# Etl Functions

In [None]:
# Read Parquet from Part

def _fn(
    filePathName, 
    fileSystemClient,
    dataTypes=None,
    showMessages=True):
    """Download one parquet file from the Datalake and load it as a DataFrame.

    Parameters
    ----------
    filePathName : str
        Full path of the parquet file inside the file system.
    fileSystemClient :
        Datalake file-system client (must provide ``get_directory_client``).
    dataTypes : dict, optional
        ``{column: dtype}``. ``'int'`` columns are converted to the nullable
        ``Int64`` dtype (via float, so NaNs are supported); ``'category'``
        columns are NOT converted here (that must happen after all parts are
        concatenated); every other entry is passed to ``DataFrame.astype``.
    showMessages : bool
        When True, send a Pyplan message for every ``dataTypes`` column that is
        missing from the file.

    Returns
    -------
    pandas.DataFrame

    Notes
    -----
    A missing non-category column is only reported here; the subsequent dtype
    conversion makes pandas raise for it.
    """
    import os
    from io import BytesIO

    _path, _file = os.path.split(filePathName)
    _file_client = fileSystemClient.get_directory_client(_path).get_file_client(_file)

    _stream = BytesIO()
    _file_client.download_file().readinto(_stream)
    # readinto leaves the cursor at the end of the buffer; rewind so the parquet
    # reader starts at the first byte regardless of the engine in use.
    _stream.seek(0)
    _df = pd.read_parquet(_stream)

    if dataTypes is not None:
        # Check if every col exists
        if showMessages:
            for col in dataTypes:
                if col not in _df:
                    pp.send_message(f"Column '{col}' is not present in '{os.path.split(_path)[-1]}'")

        # Must convert to float and then to pandas Int to support NaNs
        _int_cols = [k for k, v in dataTypes.items() if v == 'int']
        # Category columns must be converted at the end (with all parts concatenated)
        _other_cols = {k: v for k, v in dataTypes.items() if v not in ['int', 'category']}
        _df = _df.astype(_other_cols)
        for col in _int_cols:
            _df[col] = _df[col].astype(float).astype('Int64')

    return _df

result = _fn

In [None]:
# Dataframe from Parquet Parts

def _fn(
    filesPath, 
    fileSystemClient,
    dataTypes=None,
    verbose=False,
    showMessages=True):
    """Read and concatenate every parquet part found under a Datalake folder.

    Every file (not folder) whose full path contains ``'.parquet'`` is read with
    ``read_parquet_from_part``. A part that fails to read is skipped, and a
    Pyplan message is sent when ``showMessages`` is True. ``'category'`` columns
    of ``dataTypes`` are converted after concatenation so their categories
    cover all parts.

    Parameters
    ----------
    filesPath : str
        Folder to list recursively.
    fileSystemClient :
        Datalake file-system client (must provide ``get_paths``).
    dataTypes : dict, optional
        ``{column: dtype}`` forwarded to ``read_parquet_from_part``.
    verbose : bool
        Print each successfully read path.
    showMessages : bool
        Send Pyplan messages about missing columns / unreadable parts.

    Returns
    -------
    pandas.DataFrame
        The concatenated parts. If nothing was read, an empty frame with the
        ``dataTypes`` columns (or no columns) is returned.
    """
    # Path generator (recursive: yields files AND folders)
    _path_list = fileSystemClient.get_paths(path=filesPath, recursive=True)

    _dfs = []
    for path in _path_list:
        _path_name = path.name  # full path string
        # Folders may also contain '.parquet' in their name (e.g. "table.parquet/");
        # downloading a folder always fails, so only files are considered.
        if getattr(path, 'is_directory', False) or '.parquet' not in _path_name:
            continue
        try:
            _file_df = read_parquet_from_part(
                filePathName=_path_name,
                fileSystemClient=fileSystemClient,
                dataTypes=dataTypes,
                showMessages=showMessages)
        except Exception as e:
            # Best-effort: report and skip the broken part
            if showMessages:
                pp.send_message(f"There was an error reading '{_path_name}' file. Error: {e}")
            continue
        _dfs.append(_file_df)

        if verbose:
            print(_path_name + '\n')

    _df = pd.DataFrame(columns=list(dataTypes.keys()) if dataTypes is not None else [])
    if len(_dfs) > 0:
        _df = pd.concat(_dfs, ignore_index=True)

    if dataTypes is not None:
        # Convert to categorical (only now, with all parts concatenated)
        _cat_cols = [k for k, v in dataTypes.items() if v == 'category']
        for col in _cat_cols:
            _df[col] = _df[col].astype('category')

    return _df

result = _fn

In [1]:
# Read CSV from Part
def _fn(
    filePathName, 
    fileSystemClient,
    sep=',',
    header='infer',
    dataTypes=None,
    skipBadLines=True, 
    names = None):
    """Download one CSV file from the Datalake and parse it with ``pandas.read_csv``.

    Parameters
    ----------
    filePathName : str
        Full path of the CSV file inside the file system.
    fileSystemClient :
        Datalake file-system client (must provide ``get_directory_client``).
    sep, header, names :
        Forwarded to ``pandas.read_csv``.
    dataTypes : dict, optional
        ``{column: dtype}``. ``'int'`` becomes the nullable ``Int64`` dtype.
        ``'category'`` columns are left out here; they are converted after all
        parts are concatenated.
    skipBadLines : bool
        If True, malformed lines are skipped with a warning; otherwise parsing
        raises.

    Returns
    -------
    pandas.DataFrame
    """
    import os
    import inspect
    from io import StringIO

    _path, _file = os.path.split(filePathName)
    _file_client = fileSystemClient.get_directory_client(_path).get_file_client(_file)
    _file_data = StringIO(_file_client.download_file().readall().decode())

    _dtype = None
    if dataTypes is not None:
        # Category columns must be converted at the end (with all parts concatenated)
        _dtype = {k: ('Int64' if v == 'int' else v) for k, v in dataTypes.items() if v != 'category'}

    _read_kwargs = dict(sep=sep, header=header, dtype=_dtype, names=names)
    # `error_bad_lines` was deprecated in pandas 1.3 and removed in 2.0 in favour of
    # `on_bad_lines`; use whichever keyword the installed pandas understands.
    if 'on_bad_lines' in inspect.signature(pd.read_csv).parameters:
        # 'warn' matches the old error_bad_lines=False behaviour (skip + warning)
        _read_kwargs['on_bad_lines'] = 'warn' if skipBadLines else 'error'
    else:
        _read_kwargs['error_bad_lines'] = not skipBadLines

    return pd.read_csv(_file_data, **_read_kwargs)

result = _fn

In [2]:
# Dataframe from CSV Parts

def _fn(
    filesPath, 
    fileSystemClient,
    sep=',',
    header='infer',
    dataTypes=None,
    skipBadLines=False,
    verbose=False,
    showMessages=True,
    names = None
    ):
    """Read and concatenate every CSV part found under a Datalake folder.

    Every file (not folder) whose full path contains both ``'part'`` and
    ``'.csv'`` is read with ``read_csv_from_part``. The parts are concatenated,
    and every column of ``dataTypes`` must then be present. ``'category'``
    columns are converted last so their categories span all parts.

    Parameters
    ----------
    filesPath : str
        Folder to list recursively.
    fileSystemClient :
        Datalake file-system client (must provide ``get_paths``).
    sep, header, dataTypes, skipBadLines, names :
        Forwarded to ``read_csv_from_part``.
    verbose : bool
        Print each path that was read.
    showMessages : bool
        Send a Pyplan message for each missing column.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    Exception
        If a column listed in ``dataTypes`` is missing from the concatenated data.
    """
    import os

    _table_name = os.path.split(filesPath)[-1]

    # Path generator (recursive: yields files AND folders)
    _path_list = fileSystemClient.get_paths(path=filesPath, recursive=True)

    _dfs = []
    for path in _path_list:
        _path_name = path.name  # full path string
        # Folder names can contain 'part'/'.csv' too; downloading a folder fails
        # (and is not caught here), so only files are considered.
        if getattr(path, 'is_directory', False):
            continue
        if 'part' in _path_name and '.csv' in _path_name:
            _file_df = read_csv_from_part(
                filePathName=_path_name,
                fileSystemClient=fileSystemClient,
                sep=sep,
                header=header,
                dataTypes=dataTypes,
                skipBadLines=skipBadLines,
                names=names)
            _dfs.append(_file_df)

            if verbose:
                print(_path_name + '\n')

    _df = pd.DataFrame(columns=list(dataTypes.keys()) if dataTypes is not None else [])
    if len(_dfs) > 0:
        _df = pd.concat(_dfs, ignore_index=True)

    if dataTypes is not None:
        # Check if every col exists (report all of them before failing)
        _flag = False
        for col in dataTypes:
            if col not in _df:
                if showMessages:
                    pp.send_message(f"Column '{col}' is not present in '{_table_name}'")
                _flag = True

        if _flag:
            raise Exception(f"Check columns from table '{_table_name}'")

        # Convert to categorical (only now, with all parts concatenated)
        _cat_cols = [k for k, v in dataTypes.items() if v == 'category']
        for col in _cat_cols:
            _df[col] = _df[col].astype('category')

    return _df

result = _fn

In [None]:
# List Directory Files & Folders

def _fn(
    filesPath, 
    fileSystemClient, 
    maxResults=500):
    """Print the name of every entry listed recursively under ``filesPath``.

    ``maxResults`` is forwarded to ``get_paths`` as ``max_results``.
    Each name is printed followed by an extra blank line.
    """
    for entry in fileSystemClient.get_paths(
            path=filesPath,
            recursive=True,
            max_results=maxResults):
        print(entry.name + '\n')

result = _fn

In [None]:
# Create Folder
def _fn(path):
    """Create the folder ``path``, including missing parent folders, if nothing exists there.

    Anything already present at ``path`` (folder or file) is left untouched.
    """
    import os

    if not os.path.exists(path):
        # makedirs also creates intermediate folders; exist_ok avoids a crash if
        # the folder is created concurrently between the check and this call.
        os.makedirs(path, exist_ok=True)

result = _fn

In [None]:
# Read Pickle or Datalake Table

def _fn(fileName, versionsPath, datalakeReadNode, readFromDatalakeByDefault=False):
    """Load a table from a saved pickle version or directly from the Datalake.

    datalakeReadNode must be a node object, e.g. ``etl_read_dm_material.node``.

    Source selection:
    * ``etl_datalake_tables_read_source_sel == 'Datalake'``: always the Datalake node.
    * ``readFromDatalakeByDefault`` False: the pickle ``versionsPath/fileName``;
      raises ``ValueError`` if that file does not exist.
    * ``readFromDatalakeByDefault`` True: the pickle if it exists and
      ``versionsPath != etl_no_versions_msg``. Otherwise the Datalake node is
      used and a warning message is sent.

    Reading from the Datalake returns a copy of the node result and then
    invalidates the node.
    """
    import os

    def _read_from_datalake():
        # Copy the node's result, then invalidate the node
        frame = datalakeReadNode.result.copy()
        datalakeReadNode.invalidate()
        return frame

    pickle_path = os.path.join(versionsPath, fileName)

    if etl_datalake_tables_read_source_sel == 'Datalake':
        return _read_from_datalake()

    if not readFromDatalakeByDefault:
        if not os.path.isfile(pickle_path):
            raise ValueError(f"File '{fileName}' does not exist")
        return pd.read_pickle(pickle_path, compression=etl_pkl_compression_mode)

    if versionsPath != etl_no_versions_msg and os.path.isfile(pickle_path):
        return pd.read_pickle(pickle_path, compression=etl_pkl_compression_mode)

    frame = _read_from_datalake()
    # Warn when reading from Datalake if the source should have been a saved version
    pp.send_message(message_text=f"Table '{fileName[:-4]}' was read directly from Datalake")
    return frame

result = _fn

In [3]:
# Check if Recurrent Tables are All Updated

def _fn(recurrentTables, maxAttempts=3, showMessages=True, waitSeconds=5*60):
    """Check whether the latest versions of all recurrent tables are identical.

    The control table ``etl_read_controle_pyplan`` is re-read on each attempt.
    For every table in ``recurrentTables``, the ``VERSAO`` of the row with the
    maximum ``ATUALIZACAO`` (filtered by ``NOME_BASE``) is collected. If the
    versions differ, the check is retried after ``waitSeconds``, for at most
    ``maxAttempts`` attempts in total.

    A table with no matching rows (or no comparable ``ATUALIZACAO``) counts as
    not updated instead of raising.

    Returns
    -------
    bool
        True if every table was found and all latest versions are equal.
    """
    import time as _time_mod

    if showMessages:
        pp.progressbar(50, message_text='Checking recurrent tables last version')

    _attempt = 1
    while True:
        # Force a fresh read of the control table on every attempt
        etl_read_controle_pyplan.node.invalidate()
        _df = etl_read_controle_pyplan.copy()

        _versions = set()
        _all_found = True
        for table in recurrentTables:
            _df_table = _df[_df['NOME_BASE'] == table]
            _last_update = _df_table['ATUALIZACAO'].max()
            _latest_rows = _df_table[_df_table['ATUALIZACAO'] == _last_update]
            if _latest_rows.empty:
                # Previously crashed with IndexError on .iloc[0]
                _all_found = False
                continue
            _versions.add(_latest_rows['VERSAO'].iloc[0])

        _updated = _all_found and len(_versions) == 1
        if _updated or _attempt >= maxAttempts:
            break
        _attempt += 1
        _time_mod.sleep(waitSeconds)

    if showMessages:
        pp.progressbar(100, message_text='Checking recurrent tables last version')

    return _updated

result = _fn

In [None]:
# Save Version of Datalake Tables

def _fn(nodesList, targetPath, processName, maxAttempts=3, showMessages=True, checkRecurrentTables=False, recurrentTables=etl_recurrent_table_names, logsPath=etl_logs_file_path, nodesToSaveAfterList=None):
    """Save the results of Datalake read nodes as a new timestamped version.

    Each node in ``nodesList`` is pickled into a temporary version folder. If
    every save succeeds, the folder is moved to ``targetPath/<version>`` and the
    nodes in ``nodesToSaveAfterList`` are pickled straight into it.

    A failed attempt discards the temporary folder. The attempt is retried after
    a 5-minute cooldown, up to ``maxAttempts`` attempts in total.

    One log row per saved table or event is collected. The rows are always
    written to ``logsPath`` at the end, even on errors; errors are recorded in
    the logs and not raised to the caller.

    Parameters
    ----------
    nodesList : list
        Pyplan nodes whose identifier starts with ``etl_read_``; the table name is
        the rest of the identifier, upper-cased.
    targetPath : str
        Folder that receives the final ``<version>`` folder.
    processName : str
        Process name written in every log row.
    maxAttempts : int
        Maximum number of save attempts (also used for the recurrent-tables check).
    showMessages : bool
        Show progress bar and elapsed-time messages.
    checkRecurrentTables : bool
        If True, only save when all ``recurrentTables`` share the same latest version.
    recurrentTables : list
        Table names passed to ``check_if_recurrent_tables_are_all_updated``.
    logsPath : str
        Pickle file where the log rows are appended.
    nodesToSaveAfterList : list, optional
        Nodes saved into the final version folder after the move.
    """
    import os
    import shutil
    import datetime, pytz
    import time as _time_mod

    def _table_name(node):
        # Drop the 'etl_read_' prefix (9 characters) from the node identifier
        return node.identifier[9:].upper()

    def _save_node(node, folder):
        # Pickle one node's result into `folder`; returns (status, message)
        _filepath = os.path.join(folder, f'{_table_name(node)}.pkl')
        try:
            node.result.to_pickle(_filepath, compression=etl_pkl_compression_mode)
            node.invalidate()  # release memory
        except Exception as e:
            return 'ERROR', str(e)
        return 'OK', ''

    _username = pyplan_user['userName']
    _failed_table = ''
    _logs = []
    try:
        _check = True
        if checkRecurrentTables:
            _check = check_if_recurrent_tables_are_all_updated(recurrentTables, maxAttempts=maxAttempts, showMessages=showMessages)

        if _check:
            _version = datetime.datetime.now(pytz.timezone(etl_pytz_timezone)).strftime('%Y-%m-%d-%H-%M-%S')
            _temp_version_path = os.path.join(etl_temp_folder_path, _version)
            _target_version_path = os.path.join(targetPath, _version)

            _start_time = _time_mod.time()

            # Create folders if they do not exist
            create_folder(etl_root_path)
            create_folder(etl_temp_folder_path)
            create_folder(targetPath)

            _attempts = 1
            _message = ''
            while True:
                # (Re)create the temp version folder on every attempt: a failed
                # attempt deletes it, so retries would otherwise fail immediately.
                create_folder(_temp_version_path)
                _ok_status = True
                _progress = 1
                for node in nodesList:
                    _tablename = _table_name(node)
                    if showMessages:
                        pp.progressbar(_progress, message_text=f'Attempt {_attempts}: Saving {_tablename}')
                    _status, _message = _save_node(node, _temp_version_path)
                    _logs.append(create_log_row(processName, _username, _version, _tablename, _status, _message))
                    if _status != 'OK':
                        # Discard the partial version; the next attempt starts from scratch
                        shutil.rmtree(_temp_version_path, ignore_errors=True)
                        _ok_status = False
                        _failed_table = _tablename
                        break
                    _progress = int(_progress + 1/len(nodesList)*100)
                if _ok_status or _attempts >= maxAttempts:
                    break
                _attempts += 1
                _time_mod.sleep(5*60)  # 5 minutes cooldown
                _logs.append(create_log_row(processName, _username, _version, _failed_table, '', 'RETRYING'))

            if not _ok_status:
                raise ValueError(f'Error: {_message}')

            # Transfer version to official versions folder
            shutil.move(_temp_version_path, _target_version_path)

            # Save nodes that must be saved after the process (one log row per node)
            for node in (nodesToSaveAfterList or []):
                _status, _message = _save_node(node, _target_version_path)
                _logs.append(create_log_row(processName, _username, _version, _table_name(node), _status, _message))

            if showMessages:
                pp.progressbar(100, message_text=f'Attempt {_attempts}: Successfully saved all tables')
            etl_refresh_node.node.invalidate()

            # Calculate elapsed time of execution
            _elapsed_time = _time_mod.time() - _start_time
            _minutes = _elapsed_time/60
            _minutes_round = int(_minutes)
            _seconds = int((_minutes-_minutes_round) * 60)
            if showMessages:
                pp.send_message(message_text=f'{_minutes_round}:{_seconds:02d} minutes', message_title='Elapsed time')
        else:
            _recurrent_tables_str = ', '.join(recurrentTables)
            _log = create_log_row(processName, _username, '', '', 'RECURRENT TABLES ERROR', f'Tables {_recurrent_tables_str} had different versions')
            _logs.append(_log)

    except Exception as e:
        _log = create_log_row(processName, _username, '', _failed_table, 'UNEXPECTED ERROR', f'Error: {e}')
        _logs.append(_log)

    finally:
        # Join logs and save them
        save_logs(_logs, logsPath=logsPath)

result = _fn

In [4]:
# Remove Saved Version

def _fn(versionsPath, versionName, processName):
    """Delete the saved version folder ``versionsPath/versionName`` and log it.

    If the folder exists, it is removed. One ``'[ALL]'`` log row is then
    appended to ``etl_logs_file_path`` with status ``'OK'``, or ``'ERROR'`` when
    the folder is still there afterwards. A message is sent and the refresh
    node is invalidated. If the folder does not exist, only a message is sent.
    """
    import os
    import shutil

    _version_path = os.path.join(versionsPath, versionName)

    if not os.path.exists(_version_path):
        pp.send_message(f"Version '{versionName}' does not exist")
        return

    shutil.rmtree(_version_path, ignore_errors=True)
    # rmtree(ignore_errors=True) never raises, so confirm the folder is really gone
    _removed = not os.path.exists(_version_path)

    # Update logs
    _log = create_log_row(
        processName=processName,
        userName=pyplan_user['userName'],
        version=versionName,
        tableName='[ALL]',
        status='OK' if _removed else 'ERROR',
        message='' if _removed else 'Version folder could not be fully removed')
    # Append to the existing log file (same logic as the "Save Logs" node)
    save_logs([_log], logsPath=etl_logs_file_path)

    if _removed:
        pp.send_message(f"Succesfully removed '{versionName}' version")
    else:
        pp.send_message(f"Version '{versionName}' could not be fully removed")
    etl_refresh_node.node.invalidate()

result = _fn

In [None]:
# Create Log Row

def _fn(processName, userName, version, tableName, status, message):
    """Build a single-row log DataFrame (index ``[0]``).

    ``Timestamp`` is the current time in ``etl_pytz_timezone``, formatted as
    ``'%Y/%m/%d %H:%M:%S'``.
    """
    import datetime
    import pytz

    _now = datetime.datetime.now(pytz.timezone(etl_pytz_timezone))
    _row = {
        'Process': processName,
        'Username': userName,
        'Version': version,
        'Table': tableName,
        'Timestamp': _now.strftime('%Y/%m/%d %H:%M:%S'),
        'Status': status,
        'Message': message,
    }
    return pd.DataFrame(_row, index=[0])

result = _fn

In [5]:
# Save Logs

def _fn(logs, logsPath):
    """Append a list of log DataFrames to the pickled log file at ``logsPath``.

    The new rows are concatenated and, if the file already exists, appended to
    its contents (keeping only the new rows' columns). The file is created if
    missing. An empty ``logs`` list writes nothing.
    """
    import os

    if len(logs) == 0:
        return

    _new_rows = pd.concat(logs, ignore_index=True)
    if not os.path.isfile(logsPath):
        _combined = _new_rows.copy()
    else:
        _previous = pd.read_pickle(logsPath, compression=etl_pkl_compression_mode)
        _combined = pd.concat([_previous, _new_rows], ignore_index=True)[_new_rows.columns]
    _combined.to_pickle(logsPath, compression=etl_pkl_compression_mode)

result = _fn

In [None]:
# Upload File to Datalake

def _fn(fileSystemClient, filesPath, localFilePath, targetFilename, chunkSize=4*1024*1024):
    """Upload a local file to ``filesPath/targetFilename`` in the Datalake.

    The target file is created, the local file is appended in chunks of
    ``chunkSize`` bytes at increasing offsets, and a single ``flush_data`` then
    commits the total length. A ``chunkSize`` of None or <= 0 uploads the whole
    file in one append.

    Returns
    -------
    str
        ``'OK'``
    """
    _directory_client = fileSystemClient.get_directory_client(filesPath)
    _file_client = _directory_client.create_file(targetFilename)

    # read(-1) reads everything at once; read(0) would never make progress
    _read_size = chunkSize if chunkSize and chunkSize > 0 else -1
    _offset = 0
    with open(localFilePath, 'rb') as local_file:
        # Stream the file so large exports are never fully held in memory and each
        # append request stays reasonably small.
        while True:
            _chunk = local_file.read(_read_size)
            if not _chunk:
                break
            _file_client.append_data(data=_chunk, offset=_offset, length=len(_chunk))
            _offset += len(_chunk)

    # flush_data commits the appended data; its argument is the final file length
    _file_client.flush_data(_offset)

    return 'OK'

result = _fn

In [None]:
# Remove Old Files From Exports Folder

def _fn(daysElapsed, showMessages=True, folderPath=None):
    """Delete files older than ``daysElapsed`` days from the exports folder.

    ``folderPath`` defaults to ``etl_exports_to_dl_folder_path``. The folder is
    walked recursively, and every file whose modification time is older than
    ``now - daysElapsed`` days is removed. When ``showMessages`` is True, a
    message reports how many files were removed.
    """
    import os
    import time as _time_mod

    if folderPath is None:
        folderPath = etl_exports_to_dl_folder_path

    _minimum_time = _time_mod.time() - (daysElapsed*24*60*60)
    _count = 0
    for dirpath, _, filenames in os.walk(folderPath):
        for filename in filenames:
            # Join with the folder currently being walked (not the root folder),
            # otherwise files inside sub-folders resolve to the wrong path.
            _filepath = os.path.join(dirpath, filename)
            if os.path.getmtime(_filepath) < _minimum_time:
                os.remove(_filepath)
                _count += 1

    if showMessages:
        _message = f'Removed {_count} files from Exports folder' if _count > 0 else 'No files were removed from Exports folder'
        pp.send_message(_message)

result = _fn

In [None]:
# Remove Old Records From Table

def _fn(mainTable, path, connection, cursor, elapsedDays=10, showMessages=True, currentDate=None):
    """Delete the rows of ``mainTable`` whose ``lastUpdate`` is older than a cutoff.

    The cutoff is ``currentDate`` (defaults to ``opt_current_date``) minus
    ``elapsedDays`` days, formatted as ``'YYYY-MM-DD'`` and compared against
    ``lastUpdate``. The deletion is committed on ``connection``. ``path`` is not
    used and is kept for interface compatibility.
    """
    from datetime import timedelta

    if currentDate is None:
        currentDate = opt_current_date
    _threshold_date_str = (currentDate - timedelta(days=elapsedDays)).strftime('%Y-%m-%d')

    # Remove old rows from original table. Table names cannot be bound as SQL
    # parameters, but the cutoff value can.
    _delete_query = f"""
    DELETE FROM {mainTable}
    WHERE lastUpdate < ?
    """
    cursor.execute(_delete_query, (_threshold_date_str,))

    if showMessages:
        pp.send_message(f'Succesfully deleted records from {mainTable} older than {_threshold_date_str}')

    connection.commit()

result = _fn

In [6]:
# Remove Old Records From Forms Tables

def _fn(tables, pathNode, elapsedDays=10, showMessages=True):
    """Delete old records from each table in ``tables``, then VACUUM the database.

    The SQLite file at ``pathNode.result`` is opened, and
    ``remove_old_records_from_table`` runs for every table. The file is then
    rebuilt with ``VACUUM`` and ``pathNode`` is invalidated. The connection is
    closed even if one of the steps fails.
    """
    import sqlite3

    _db_path = pathNode.result
    _conn = sqlite3.connect(_db_path)
    try:
        _cursor = _conn.cursor()

        for table in tables:
            remove_old_records_from_table(
                mainTable=table,
                path=_db_path,
                connection=_conn,
                cursor=_cursor,
                elapsedDays=elapsedDays,
                showMessages=showMessages)

        # Rebuild database (shrinks file size)
        _cursor.execute('VACUUM')
        _conn.commit()
    finally:
        # Always release the database file
        _conn.close()

    pathNode.invalidate()

result = _fn

In [None]:
# Archive Old Records From Table

def _fn(mainTable, archiveTable, path, connection, cursor, elapsedDays=10, showMessages=True, currentDate=None):
    """Move rows of ``mainTable`` older than a cutoff into ``archiveTable``.

    The cutoff is ``currentDate`` (defaults to ``opt_current_date``) minus
    ``elapsedDays`` days, formatted as ``'YYYY-MM-DD'`` and compared against
    ``lastUpdate``. Matching rows are copied into ``archiveTable`` (every column
    except ``id``) and then deleted from ``mainTable``. Both statements are
    committed together on ``connection``. ``path`` is not used and is kept for
    interface compatibility.
    """
    from datetime import timedelta

    if currentDate is None:
        currentDate = opt_current_date
    _threshold_date_str = (currentDate - timedelta(days=elapsedDays)).strftime('%Y-%m-%d')

    # Get columns for insert statement
    _cols_query = f"""
    SELECT *
    FROM {mainTable}
    LIMIT 1
    """
    _cols = pd.read_sql_query(_cols_query, connection).columns.tolist()
    if 'id' in _cols:
        # 'id' is not copied; presumably the archive table generates its own
        _cols.remove('id')
    _cols_str = ','.join([f'[{col}]' for col in _cols])

    # Insert old rows into archive table (cutoff bound as a parameter)
    _insert_query = f"""
    INSERT INTO {archiveTable} ({_cols_str})
    SELECT {_cols_str}
    FROM {mainTable}
    WHERE lastUpdate < ?
    """
    cursor.execute(_insert_query, (_threshold_date_str,))

    # Remove old rows from original table
    _delete_query = f"""
    DELETE FROM {mainTable}
    WHERE lastUpdate < ?
    """
    cursor.execute(_delete_query, (_threshold_date_str,))

    if showMessages:
        pp.send_message(f'Succesfully archived records from {mainTable} older than {_threshold_date_str}')

    connection.commit()

result = _fn

In [None]:
# Archive Old Records From Forms Tables

def _fn(tablePairs, pathNode, elapsedDays=10, showMessages=True):
    """Archive old records for each ``(mainTable, archiveTable)`` pair, then VACUUM.

    The SQLite file at ``pathNode.result`` is opened, and
    ``archive_old_records_from_table`` runs for every pair (``pair[0]`` is the
    main table, ``pair[1]`` the archive table). The file is then rebuilt with
    ``VACUUM`` and ``pathNode`` is invalidated. The connection is closed even if
    one of the steps fails.
    """
    import sqlite3

    _db_path = pathNode.result
    _conn = sqlite3.connect(_db_path)
    try:
        _cursor = _conn.cursor()

        for pair in tablePairs:
            archive_old_records_from_table(
                mainTable=pair[0],
                archiveTable=pair[1],
                path=_db_path,
                connection=_conn,
                cursor=_cursor,
                elapsedDays=elapsedDays,
                showMessages=showMessages)

        # Rebuild database (shrinks file size)
        _cursor.execute('VACUUM')
        _conn.commit()
    finally:
        # Always release the database file
        _conn.close()

    pathNode.invalidate()

result = _fn

In [None]:
# Create XML Message

def _fn(message):
    """Wrap ``message`` in the ``<QueueMessage><MessageText>`` XML envelope.

    The text is XML-escaped (``&``, ``<``, ``>``) so arbitrary message content
    cannot produce malformed XML. The envelope layout (indentation and trailing
    spaces) is kept exactly as before.
    """
    from xml.sax.saxutils import escape

    _xml = (
        "<?xml version='1.0' encoding='utf-8'?>\n"
        "    <QueueMessage>  \n"
        f"        <MessageText>{escape(str(message))}</MessageText>  \n"
        "    </QueueMessage>"
    )

    return _xml

result = _fn

In [None]:
# POST Message to Datalake

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def _fn(message, url):
    """POST ``message`` (wrapped by ``create_xml_message``) to ``url`` with retries.

    The urllib3 ``Retry`` policy is: total=5, backoff_factor=0.1, retry on
    status 429/500/502/503/504, and any HTTP method (POST included). The request
    times out after 5 seconds. The response status code is sent as a Pyplan
    message, and the response is returned.
    """
    _retry_kwargs = dict(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    try:
        # urllib3 >= 1.26: allowed_methods=False retries on every HTTP method
        _retry_strategy = Retry(allowed_methods=False, **_retry_kwargs)
    except TypeError:
        # Older urllib3 only knows the deprecated name (removed in urllib3 2.0)
        _retry_strategy = Retry(method_whitelist=False, **_retry_kwargs)
    _adapter = HTTPAdapter(max_retries=_retry_strategy)

    _data_xml = create_xml_message(message)
    # Close the session and its pooled connections once the response is read
    with requests.Session() as _http:
        _http.mount('https://', _adapter)
        _http.mount('http://', _adapter)
        _req = _http.post(url, data=_data_xml, timeout=5)
    pp.send_message(_req.status_code)
    return _req
result = _fn

In [None]:
# POST Message to Datalake Old

def _fn(message, url):
    """Legacy single POST of ``message`` (XML-wrapped) to ``url``.

    This version has no retry policy and no timeout. It returns the
    ``requests`` response.
    """
    import requests

    return requests.post(url, data=create_xml_message(message))

result = _fn

In [None]:
# Format Time

def _fn(timeSerie):
    """Normalize a Series of year / ``year.period`` values into strings.

    Each value is first converted with ``astype(str)``, then:

    * If the first 4 characters parse as an int and the whole value is a
      non-integral finite number, the result is ``x[:4] + '.' + x[5:]``, with a
      trailing ``'0'`` when ``len(x) < 7`` (``'2020.5'`` -> ``'2020.50'``).
    * If the first 4 characters parse as an int and the value is integral, the
      result is ``str(int(x[:4]))`` (``'2020.0'`` -> ``'2020'``).
    * Anything else (text, ``'nan'``, infinities, ...) is returned unchanged.
      The previous one-line version raised TypeError on such values.

    Returns
    -------
    pandas.Series
        Series of ``str``.
    """
    import math

    def _format_value(x):
        try:
            _year = int(x[:4])
            _number = float(x)
        except (ValueError, TypeError):
            return x
        if not math.isfinite(_number):
            return x
        if _number.is_integer():
            return str(_year)
        _fraction = x[5:] + '0' if len(x) < 7 else x[5:]
        return x[:4] + '.' + _fraction

    return timeSerie.astype(str).apply(_format_value).astype(str)
result = _fn

In [None]:
# Try Convert

def _try_convert(value, default, *types):
    """Return ``t(value)`` for the first converter ``t`` in ``types`` that succeeds.

    Converters raising ``ValueError`` or ``TypeError`` are skipped. If none
    succeeds, or ``types`` is empty, ``default`` is returned.
    """
    for converter in types:
        try:
            converted = converter(value)
        except (ValueError, TypeError):
            pass
        else:
            return converted
    return default
result = _try_convert