### Setting up connection to EMS

In [1]:
import pandas as pd
import re

from pandas.errors import EmptyDataError
from pycelonis import get_celonis, pql
from pycelonis.ems import ColumnTransport, ColumnType

In [2]:
celonis = get_celonis()

[2023-10-02 09:13:49,991] INFO: No `base_url` given. Using environment variable 'CELONIS_URL'
[2023-10-02 09:13:49,992] INFO: No `api_token` given. Using environment variable 'CELONIS_API_TOKEN'




[2023-10-02 09:13:50,140] INFO: Initial connect successful! PyCelonis Version: 2.4.1
[2023-10-02 09:13:50,173] INFO: `package-manager` permissions: ['$ACCESS_CHILD']
[2023-10-02 09:13:50,174] INFO: `workflows` permissions: []
[2023-10-02 09:13:50,174] INFO: `task-mining` permissions: []
[2023-10-02 09:13:50,175] INFO: `action-engine` permissions: []
[2023-10-02 09:13:50,175] INFO: `team` permissions: []
[2023-10-02 09:13:50,176] INFO: `process-repository` permissions: []
[2023-10-02 09:13:50,176] INFO: `process-analytics` permissions: []
[2023-10-02 09:13:50,177] INFO: `transformation-center` permissions: []
[2023-10-02 09:13:50,177] INFO: `storage-manager` permissions: []
[2023-10-02 09:13:50,178] INFO: `event-collection` permissions: ['USE_ALL_DATA_MODELS', '$ACCESS_CHILD', 'CREATE_DATA_POOL', 'EDIT_ALL_DATA_POOLS']
[2023-10-02 09:13:50,178] INFO: `user-provisioning` permissions: []
[2023-10-02 09:13:50,179] INFO: `ml-workbench` permissions: []


In [3]:
data_pool = celonis.data_integration.get_data_pools().find("P2P")
data_pool = celonis.data_integration.get_data_pool(data_pool.id)
data_model = data_pool.get_data_models().find('BANF (STXL table) for Python')

### Creation of initials dataframe

In [4]:
query = pql.PQL()
query += pql.PQLColumn(query = "BNF_STXL.MANDT", name = "MANDT")
query += pql.PQLColumn(query = "BNF_STXL.BANFN", name = "BANFN")
query += pql.PQLColumn(query = "BNF_STXL.BNFPO", name = "BNFPO")
query += pql.PQLColumn(query = "BNF_STXL.DATAORIGIN", name = "DATAORIGIN")
query += pql.PQLColumn(query = "BNF_STXL.TDNAME", name = "TDNAME")
query += pql.PQLColumn(query = "BNF_STXL.TEXT_", name = "TEXT_")

# FILTERS - already in SQL table:
# EBAN.BLCKD = '1' AND  BLCKT = 'Logbuch in Positionsnotiz'

df = data_model.export_data_frame(query)
    
if df.empty:
    print('DataFrame contains no data empty. Script failed!')
    raise EmptyDataError

[2023-10-02 09:13:51,456] INFO: Successfully created data export with id '99b7346b-3bd9-4a43-aead-dfe3b56bbe32'
[2023-10-02 09:13:51,457] INFO: Wait for execution of data export with id '99b7346b-3bd9-4a43-aead-dfe3b56bbe32'


The method `data_model.export_data_frame` has been deprecated and will be removed in future versions.
Please use SaolaPy from now on to export PQL queries:

	import pycelonis.pql as pql

	df = pql.DataFrame.from_pql(query, data_model=data_model)
	df.head()

For more information on SaolaPy, please visit https://celonis.github.io/pycelonis/2.4.1/tutorials/executed/05_saolapy/01_saolapy_quickstart/


0it [00:00, ?it/s]

[2023-10-02 09:13:51,510] INFO: Export result chunks for data export with id '99b7346b-3bd9-4a43-aead-dfe3b56bbe32'


### Filtering dataframe

In [5]:
def remove_logbuch_info(text: str) -> str:
    """
    removing logbuch redundant info
    :param _id: text content coming from STXL source table
    :return: preprocessed text
    """
    preprocessed_text = re.sub("#Logbuchende.*$", '' , text) # excluding everything after first Logbuchende, instead of row above
    
    logbuch_pattern = re.compile(r"#Logbuch.*?\d{2}.\d{2}.\d{4}")
    text = re.sub(logbuch_pattern, '', preprocessed_text)
    text += '#'
    return text

def check_conditions(text: str) -> bool:
    """
    marking records which will be deleted further
    redundant are all empty texts + texts not generated by a bot (no Logbuch and #\d)
    :text: preprocessed text after the removal of logbuch info
    :return: boolean value of condition checks
    """
    switch = True
    if not text:
        switch = False
    elif 'Logbuch' not in text:
        switch = False
    elif '#1' not in text:
        switch = False
    return switch

df['condition_check'] = df['TEXT_'].apply(check_conditions)
df_filtered = df[df['condition_check'] == True]
df_filtered.drop('condition_check', axis = 1, inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered.drop('condition_check', axis = 1, inplace = True)


In [6]:
def create_regex_patterns() -> list:
    """
    creating regex patterns from the defined strings 
    purpose is to match later only the specific information to the related string (column)
    :return: list of regex patterns
    """
    patterns = []
    for string in strings_to_catch:
        patterns.append(re.compile(fr"(?<={string}).*?(?=#)", re.IGNORECASE))
    return patterns

def create_new_columns(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    assigning defined strings as new columns to the dataframe
    :dataframe: input df
    :return: dataframe with new columns (containing empty strings for now)
    """
    for string in strings_to_catch:
        if string.endswith(':'):
            string = string[:-1]
        dataframe[string] = ''
    return dataframe

def fill_column(text: str, pattern: re.Pattern) -> str:
    """
    matching the specific information to the related column
    :text: content of preprocessed text field
    :return: text(str)
    """
    result = ''
    if match := pattern.search(text):
        result = match.group()
    return result

def check_if_ok(input_text, column) -> str:
    """
    final creation of the checks based on the conditions from business side
    :input_text: string inside the specific column
    :column: specified column of the dataframe
    :return: final check (str)
    """
    text = 'Not OK'
    #print(input_text, column)
    if input_text == '':                   # If check is not mentioned --> NULL 
        text = None
    elif column in ['Leistungsnummer/0,01 €', 'Leistungsnummer/0 Menge', 'Vertragslaufzeit', 'Neue Leistungsnummer']:
        if 'OK' in input_text:
            text = 'OK'
    elif column in ['Prüfung Bauadresse', 'Korrekte MSGK']:
        if 'Ja' in input_text:
            text = 'OK'
    elif column == 'Leistungsnummer doppelt':
        if 'Nein' in input_text:
            text = 'OK'
    elif column == 'Prüfung AVV':
        if re.search(r"(Nein)|(Ja, ADV erforderlich)|(Ja, ADV vorhanden)", input_text):
            text = 'OK'
    elif column == 'Ausführungszeitraum':
        if 'OK' in input_text or re.search(r"Alt: \d{2}.\d{2}.\d{4}, Liefertermin \d{2}.\d{2}.\d{4}", input_text):
            text = 'OK'
    return text

In [7]:
strings_to_catch = [
    'Leistungsnummer/0,01 €:',
    'Leistungsnummer/0 Menge:',
    'Ausführungszeitraum:',
    'Prüfung AVV:',
    'Prüfung Bauadresse:',
    'Korrekte MSGK:',
    'Leistungsnummer doppelt:',
    'Vertragslaufzeit:', 
    'Neue Leistungsnummer:'              
]

In [8]:
patterns = []
for string in strings_to_catch:
    patterns.append(re.compile(fr"(?<={string}).*?(?=#)", re.IGNORECASE))
    
patterns

[re.compile(r'(?<=Leistungsnummer/0,01 €:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Leistungsnummer/0 Menge:).*?(?=#)',
            re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Ausführungszeitraum:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Prüfung AVV:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Prüfung Bauadresse:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Korrekte MSGK:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Leistungsnummer doppelt:).*?(?=#)',
            re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Vertragslaufzeit:).*?(?=#)', re.IGNORECASE|re.UNICODE),
 re.compile(r'(?<=Neue Leistungsnummer:).*?(?=#)', re.IGNORECASE|re.UNICODE)]

### Main script:

In [10]:
# columns without ':' in the end
columns = [column[:-1] for column in strings_to_catch]
patterns = create_regex_patterns()

df_copy = df_filtered.copy()
df_copy = create_new_columns(df_copy)

for column, pattern in zip(df_copy[columns], patterns):
    df_copy[column] = df_copy.apply(lambda x: fill_column(x['TEXT_'], pattern),axis=1) 

for column in columns:
    df_copy[column] = df_copy.apply(lambda x: check_if_ok(x[column], column),axis=1) 

### Pushing the data into Celonis EMS

In [12]:
column_config = [
    ColumnTransport(column_name="MANDT", column_type=ColumnType.STRING, field_length=3),
    ColumnTransport(column_name="BANFN", column_type=ColumnType.STRING, field_length=10),
    ColumnTransport(column_name="BNFPO", column_type=ColumnType.STRING, field_length=5),
    ColumnTransport(column_name="DATAORIGIN", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="TDNAME", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="TEXT_", column_type=ColumnType.STRING, field_length=5000),
    ColumnTransport(column_name="Leistungsnummer/0,01 €", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Leistungsnummer/0 Menge", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Ausführungszeitraum", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Prüfung AVV", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Prüfung Bauadresse", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Korrekte MSGK", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Leistungsnummer doppelt", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Vertragslaufzeit", column_type=ColumnType.STRING, field_length=20),
    ColumnTransport(column_name="Neue Leistungsnummer", column_type=ColumnType.STRING, field_length=20)
]

In [13]:
# push to EMS - (global workbench) and execute the datajob in global workbench to transform the table to production
data_pool.create_table(table_name="BNF_CHECKS_PYTHON",
                   df=df_copy,
                   drop_if_exists=True,
                   column_config = column_config)  

# force = True # resets all columns to str(80 chars)

datajob = data_pool.get_jobs().find("Workbench for Python Pushing")
datajob.execute()
print("Data successfully pushed into production.")

[2023-10-02 09:13:52,533] INFO: Successfully created data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'
[2023-10-02 09:13:52,535] INFO: Add data frame as file chunks to data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'


  0%|          | 0/1 [00:00<?, ?it/s]

[2023-10-02 09:13:52,649] INFO: Successfully upserted file chunk to data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'
[2023-10-02 09:13:52,752] INFO: Successfully triggered execution for data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'
[2023-10-02 09:13:52,753] INFO: Wait for execution of data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'


0it [00:00, ?it/s]

[2023-10-02 09:13:54,215] INFO: Successfully created table 'BNF_CHECKS_PYTHON' in data pool
[2023-10-02 09:13:54,273] INFO: Successfully deleted data push job with id 'cdc90070-d2b6-452c-99a2-dd8f7de292de'
[2023-10-02 09:13:55,038] INFO: Successfully started execution for job with id '47e1db96-d7d2-4f5f-841f-43a798a88174'
[2023-10-02 09:13:55,038] INFO: Wait for execution of job with id '47e1db96-d7d2-4f5f-841f-43a798a88174'


0it [00:00, ?it/s]

Data successfully pushed into PHB.


In [14]:
data_model = data_pool.get_data_models().find('BANF(original_new) PHB')

data_model.reload(wait=True)
print("BANF datamodel succesfully reloaded")

[2023-10-02 09:13:59,945] INFO: Successfully triggered data model reload for data model with id '107f8869-45a0-43f1-b64b-a47e91c1cb67'
[2023-10-02 09:13:59,946] INFO: Wait for execution of data model reload for data model with id '107f8869-45a0-43f1-b64b-a47e91c1cb67'


0it [00:00, ?it/s]



BANF datamodel succesfully reloaded
