# HISevent processing and storage script

The goal of this script is to process incoming HISevent log files from a HISevent folder into clean data, which is then stored in a SQL Server database. SQLite was not chosen because of connection issues with Power BI.

**Workflow Overview:**
1. Initialise Python environment
2. Initialise directory
3. Initialise connection to SQL Server database (Database and tables created separately)
4. Initiate Processing Loop Sequence
4.1. Read in oldest raw HISevent log file (target file) from target folder (Raw File Source)
4.2. Clean and Process raw HISevent 
4.3. Append cleaned data into SQL Server database table
4.4. Delete target file from target folder
4.5. Save cleaned data as a new CSV in a separate folder (Optional Step)
5. End loop when there are no more files in folder

## Initialisation

### Initialise Python Environment

In [1]:
# Check python environment
# Prints the path of the interpreter running this kernel, to confirm the
# expected environment is active.
import sys
print(sys.executable)

C:\ProgramData\Anaconda3\envs\Python_3-9-master\python.exe


In [2]:
#import libraries
import numpy as np
import pandas as pd
import regex
import datetime as dt
import time
import memory_profiler # for logging memory consumption at the function level
import psutil # or logging memory consumption at that point instant
import os # for manipulating file directories
import pyodbc # for sql operations
import sqlalchemy # for sql operations with pandas
import urllib # for defining sql connection parameters
import shutil # for transferring files between folders
#import swifter # for speeding up pandas apply functions on vectorised data

In [3]:
# Load memory profiler to log memory usage by function
# uses the "%memit" prefix before each function to start logging
%load_ext memory_profiler

In [10]:
# Report this process's peak memory so far, in MiB (bytes / 1024**2).
# NOTE: `peak_wset` (peak working-set size) is a Windows-only field of
# psutil's memory_info(); on other platforms this attribute does not exist.
import psutil
p = psutil.Process()
# Peak working set since the process started (not an instantaneous reading)
print(p.memory_info().peak_wset / 1024 ** 2)

102.55859375
102.55859375


### Configure display

In [None]:
# Enable display of all columns for dataframes with many variables
# (None removes the limit on how many columns are shown when printing)
pd.set_option('display.max_columns', None)

### Initialise Directories

In [None]:
# Check current directory location
cwd = os.getcwd()
cwd

In [None]:
# Define root file directory folder where the files are being stored
# Moves to the sibling folder 'alarm-event-logs' next to the current directory.
# NOTE: the '\\' separator assumes Windows paths.
#os.chdir(cwd + alarmLoc)
os.chdir(os.path.dirname(os.getcwd()) + '\\alarm-event-logs')

# Check current directory location
cwd = os.getcwd()

# Check directory location
cwd

In [None]:
# Location of source files (relative to cwd; leading separator is required
# because it is appended to cwd by plain string concatenation below)
srcLoc = '\\HISevent_data\\Logs_test'

In [None]:
# Define root file directory folder where the files are being stored
# Changes into the raw-log source folder so later file names can be relative.
os.chdir(cwd + srcLoc)
#os.chdir(cwd + eventLoc)
# Check directory location
os.getcwd()

### Initialise SQL Server Database
Used to store the cleaned log files.

#### Connect to SQL Server Database & Define Core Functions
The database and its tables are created separately in Microsoft SQL Server Management Studio. Remember to close the connection when done.

In [None]:
# Choose to activate Test Mode or Not
# testMode: when True, the cells below build a dummy dataframe, append it to
# the target table and delete rows again to exercise the helper functions.
testMode = True

# Choose whether to reset database table (clear all data)
# resetTableData: when True, a later cell runs delDataAll on the target table.
resetTableData = True

In [None]:
# Get pyodbc version
print(pyodbc.version)

In [None]:
# Find available server drivers (names usable in the Driver={...} setting)
pyodbc.drivers()

In [None]:
# Open a database connection to target database
# All subsequent functions will depend on this connection
# Remember to close connection when done
conn = pyodbc.connect('Driver={SQL Server};'
                      'Server=SBSR-RD-0K00200;'
                      'Database=HISevent_DBtest;'
                      'Trusted_Connection=yes;')

# Define Server Parameters to Initiate Connection Engine via SQL Alchemy
# This repeats the connection string above (keep the two in sync), URL-encoded
# so it can be passed as the odbc_connect query parameter used by appendData.
# Trusted_Connection=yes uses Windows authentication; otherwise define
# "UID" (user ID) + "PWD" (password).
# NOTE(review): only `import urllib` appears in the imports cell; `urllib.parse`
# is available here only if another import has already loaded it -- consider
# an explicit `import urllib.parse`.
serverParams = urllib.parse.quote_plus('Driver={SQL Server};'
                                       'Server=SBSR-RD-0K00200;'
                                       'Database=HISevent_DBtest;'
                                       'Trusted_Connection=yes;'
                                       #"UID=user;"
                                       #"PWD=password"
                                      )

# Create cursor to work in database
# pyodbc connections default to autocommit=False, so statements that modify
# data must be followed by conn.commit() (as the delete helpers do).
cursor = conn.cursor()

In [None]:
# Function to Inspect All Tables
def list_dbTables():
    """Print every table name reported by INFORMATION_SCHEMA.TABLES.

    Uses the module-level ``cursor`` and prints the raw list of result rows.
    """
    tableQuery = "SELECT table_name FROM INFORMATION_SCHEMA.TABLES "
    cursor.execute(tableQuery)
    tableRows = cursor.fetchall()
    print(tableRows)

# List tables in database
# Note: the "%memit" prefix logs the peak memory usage of the statement
%memit list_dbTables()

In [None]:
# Function to list contents in target table
def list_tableContents(targetTable, rowLimit = 10):
    """Print the first ``rowLimit`` rows of ``targetTable``, oldest DATEANDTIME first.

    Only a limited number of rows is fetched because the table can be very
    large. Uses the module-level ``cursor``.

    Parameters
    ----------
    targetTable : str
        Table name. It is interpolated into the SQL text (identifiers cannot be
        bound as parameters), so only pass trusted names.
    rowLimit : int, optional
        Maximum number of rows to fetch (default 10).
    """
    # ORDER BY must name the column itself. The quoted 'DATEANDTIME' used
    # before is a string constant, which SQL Server rejects in ORDER BY.
    # int() ensures only a number is spliced into the TOP clause.
    cursor.execute(f"SELECT TOP {int(rowLimit)} * FROM {targetTable} ORDER BY DATEANDTIME ASC")
    results = cursor.fetchall()
    print(targetTable, "Contents")
    print("Table Values")
    print("---START---")
    for counter, row in enumerate(results, start=1):
        print(counter, row)
    print("---END---")

# Define the target table
# This table is where the cleaned data is saved to
# and manipulated for future operations
targetTable = "TestLog_cleaned"

# Inspect target table (prints its first rows)
%memit list_tableContents(targetTable)

In [None]:
# Function to list the column names and SQL data types of the target table
def list_tableColDtype(targetTable):
    """Print each column name and SQL data type of ``targetTable``.

    Columns are listed in their defined order (ORDINAL_POSITION). Uses the
    module-level ``cursor``.

    Parameters
    ----------
    targetTable : str
        Table name as stored in INFORMATION_SCHEMA.COLUMNS.TABLE_NAME.
    """
    # The table name is a value in the WHERE clause, so it is bound as a
    # parameter instead of being spliced into a quoted literal.
    cursor.execute(
        "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns "
        "WHERE TABLE_NAME = ? ORDER BY ORDINAL_POSITION",
        targetTable,
    )
    results = cursor.fetchall()
    print(targetTable, "Contents")
    print("Table Column Data Type")
    print("---START---")
    for counter, row in enumerate(results, start=1):
        print(counter, row)
    print("---END---")

# Inspect target table
# Prints the column names and SQL data types of the target table
targetTable = "TestLog_cleaned"
%memit list_tableColDtype(targetTable)

In [None]:
# Function to Create a dataframe with dummy data for testing purposes
def createDummyDataDF():
    """Build a three-row DataFrame shaped like the cleaned table, for testing.

    Numeric placeholder columns hold 1, 2, 3 and text columns hold 'Alpha',
    'Bravo', 'Charlie'. DATEANDTIME is converted from epoch seconds to
    datetime64. DATE (that day at midnight, datetime64) and TIME
    (datetime.time of day) are derived from it and appended as the last two
    columns.

    Returns
    -------
    pandas.DataFrame
    """
    nums = [1, 2, 3]
    words = ['Alpha', 'Bravo', 'Charlie']
    epochSeconds = [1644223829, 1644310229, 1644396629]

    # The order of this spec is the column order of the resulting frame.
    columnSpec = [
        ('ALARMID', nums),
        ('ENVIRONMENT', words),
        ('VALUE', nums),
        ('ACKREQUIRED', nums),
        ('SEVERITY', nums),
        ('EQUIPMENTCLASS', nums),
        ('FUNCTIONALCAT', nums),
        ('GEOGRAPHICALCAT', nums),
        ('DATEANDTIME', epochSeconds),
        ('EQUIPMENTNAME', words),
        ('ASSETNAME', words),
        ('MESSAGE', words),
        ('STATUS', words),
        ('GROUP1', nums),
        ('GROUP2', nums),
        ('FORMAT', nums),
        ('DSSEVENTTYPE', words),
        ('OPER', words),
        ('ASSET_DESC_CAT', words),
        ('EVENT_DESC_CAT', words),
        ('TrainID', nums),
        ('CarID', nums),
        ('ServiceID', nums),
        ('AssetClass', words),
        ('AssetSubClass', words),
    ]
    frame = pd.DataFrame({name: list(values) for name, values in columnSpec})

    stamps = pd.to_datetime(frame['DATEANDTIME'], unit='s')
    frame['DATEANDTIME'] = stamps
    frame['DATE'] = pd.to_datetime(stamps.dt.date)
    frame['TIME'] = stamps.dt.time
    # A seconds-resolution TIME could be derived with
    # stamps.dt.floor("s").dt.time if higher-resolution input ever appears.

    return frame

# Generate test dataframe in testing mode
# testDF only exists after this cell when testMode is True.
if (testMode == True):
    # Create a dummy dataframe for testing purposes
    %memit testDF = createDummyDataDF()
    print("Dataframe columns would have the same data type as SQL table values")
    testDF.info()
else:
    print("Test script skipped")

In [None]:
# Inspect test dataframe
if (testMode == True):
    # Inspect Data (first rows of the dummy frame)
    print(testDF.head())
else:
    print("Test script skipped")

In [None]:
# Function to append data to database as an entire dataframe
def appendData(tableName, inputDF, serverParams):
    """Append every row of ``inputDF`` to the SQL Server table ``tableName``.

    Parameters
    ----------
    tableName : str
        Target table; rows are appended (``if_exists="append"``) without
        writing the DataFrame index.
    inputDF : pandas.DataFrame
        Rows to insert; column names must match the table's columns.
    serverParams : str
        URL-encoded ODBC connection string (``urllib.parse.quote_plus``),
        passed to SQLAlchemy via ``odbc_connect``.

    Returns
    -------
    Whatever ``DataFrame.to_sql`` returns (rows affected, or None).
    """
    # A SQLAlchemy engine is needed here: pandas' to_sql accepts a raw DBAPI
    # connection only for SQLite.
    engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect={}".format(serverParams))
    try:
        return inputDF.to_sql(tableName, con=engine, if_exists="append", index=False)
    finally:
        # Each call builds its own engine. Dispose it so its pooled ODBC
        # connections are closed instead of accumulating across calls.
        engine.dispose()

In [None]:
# Test data append function
if (testMode == True):
    # Append the dummy rows to the test table (this writes to the database),
    # then print the table's first rows to confirm the insert
    %memit appendData("TestLog_cleaned", testDF, serverParams)
    %memit list_tableContents("TestLog_cleaned")
else:
    print("Test script skipped")

In [None]:
# Functions to delete rows from a table (both commit immediately)
def delDataNRow(targetTable, nRow=3):
    """Delete the ``nRow`` most recent rows (highest DATEANDTIME) and commit.

    Uses the module-level ``cursor`` and ``conn``.

    Parameters
    ----------
    targetTable : str
        Table name; interpolated into the SQL text, so only pass trusted names.
    nRow : int, optional
        Number of newest rows to delete (default 3).

    Raises
    ------
    ValueError
        If ``nRow`` is not a non-negative integer.
    """
    n = int(nRow)  # only a validated integer is spliced into TOP
    if n < 0:
        raise ValueError(f"nRow must be non-negative, got {nRow!r}")
    # DESC ordering selects the newest rows, so those are the ones removed.
    cursor.execute(f"WITH CTE AS (SELECT TOP {n} * FROM {targetTable} ORDER BY DATEANDTIME DESC) DELETE FROM CTE")
    conn.commit()


def delDataAll(targetTable):
    """Delete every row in ``targetTable`` and commit (module-level cursor/conn)."""
    cursor.execute(f"DELETE FROM {targetTable}")
    conn.commit()

In [None]:
# Test data delete latest nRows function
# (delDataNRow removes the nRow rows with the LATEST DATEANDTIME values)
if (testMode == True):
    %memit delDataNRow(targetTable, nRow=2)
    %memit list_tableContents(targetTable)
else:
    print("Test script skipped")

In [None]:
# Reset: delete ALL rows from targetTable when resetTableData is True.
# This is gated only by resetTableData (not testMode), so it also clears real data.
if (resetTableData == True):
    %memit delDataAll(targetTable)
    %memit list_tableContents(targetTable)
else:
    print("Test script skipped")

In [None]:
# Delete redundant variables used in test.
# testDF is only created when testMode is True, so check that it exists
# rather than keying off resetTableData (which raised NameError when
# resetTableData was True but testMode was False).
if "testDF" in globals():
    del testDF
else:
    print("Test script skipped")


## Loop to Process Log Files

### Define Dependent Functions to Process Files

In [None]:
# Define function to transfer files which has been processed successfully to an archive folder
def fileTransfer(file_name, dst_folder, src_folder=""):
    """Move ``file_name`` from ``src_folder`` into ``dst_folder`` without overwriting.

    If a file of the same name already exists in the destination, the moved
    file is renamed ``<name>_new<ext>``. If that exists too, it becomes
    ``<name>_new2<ext>``, ``<name>_new3<ext>``, and so on, so nothing
    already archived is ever replaced.

    Parameters
    ----------
    file_name : str
        Name of the file to move.
    dst_folder : str
        Destination folder (with or without a trailing separator).
    src_folder : str, optional
        Folder holding the file; leave empty when it is the current directory.

    Returns
    -------
    str
        Path the file was moved to.
    """
    # os.path.join works whether or not the folders end with a separator.
    src_path = os.path.join(src_folder, file_name)
    dst_path = os.path.join(dst_folder, file_name)

    if os.path.exists(dst_path):
        only_name, extension = os.path.splitext(file_name)
        dst_path = os.path.join(dst_folder, only_name + '_new' + extension)
        counter = 2
        # Keep searching: moving onto an existing file would overwrite it.
        while os.path.exists(dst_path):
            dst_path = os.path.join(dst_folder, f"{only_name}_new{counter}{extension}")
            counter += 1

    shutil.move(src_path, dst_path)
    print(file_name, "Transferred to destination folder", dst_folder)
    return dst_path

In [None]:
def fileIngestor(inputFile, encoding=None, errors=None):
    """Read a raw HISevent log into a DataFrame with one string column per field.

    ``read_csv`` cannot be used because some logs are partly corrupted, so
    each line is split on ';' manually. The first two lines of the file are
    skipped. Everything after the 18th ';' lands in an overflow column,
    which is discarded.

    Parameters
    ----------
    inputFile : str
        Path to the log file.
    encoding, errors : str, optional
        Passed to ``open``; the defaults (None) keep the platform behaviour.

    Returns
    -------
    pandas.DataFrame
        Columns ALARMID ... OPER (18 columns) with a fresh 0-based index.
        Missing trailing fields are None/NaN, and each line's last field
        keeps its newline. A file with no data lines yields an empty frame
        with the same columns.
    """
    headerList_core = [
        "ALARMID",
        "ENVIRONMENT",
        "VALUE",
        "ACKREQUIRED",
        "SEVERITY",
        "EQUIPMENTCLASS",
        "FUNCTIONALCAT",
        "GEOGRAPHICALCAT",
        "DATEANDTIME",
        "EQUIPMENTNAME",
        "ASSETNAME",
        "MESSAGE",
        "STATUS",
        "GROUP1",
        "GROUP2",
        "FORMAT",
        "DSSEVENTTYPE",
        "OPER",
        "UnknownVariable",  # overflow: remainder after the 18th ';'
    ]
    nFields = len(headerList_core)

    with open(inputFile, 'r', encoding=encoding, errors=errors) as file:
        fileContents = list(file)

    # The first two lines are not event rows
    dataLines = fileContents[2:]
    if not dataLines:
        return pd.DataFrame(columns=headerList_core[:-1])

    raw = pd.Series(dataLines, dtype=object)
    df = raw.str.split(pat=";", n=nFields - 1, expand=True)
    # expand=True only creates as many columns as the widest line needs; pad to
    # the full layout so every header exists (previously a KeyError on `del`).
    df = df.reindex(columns=range(nFields))
    df.columns = headerList_core
    del df["UnknownVariable"]

    return df

In [None]:
def removeDataAnomalies(dfInput):
    # Clean up exception cases
    dfInput.loc[df["EQUIPMENTNAME"].str.contains("\n", na = False, regex = False), "EQUIPMENTNAME"] = ""
    dfInput["EQUIPMENTNAME"] = dfInput["EQUIPMENTNAME"].str.replace("", "", regex = False)
    dfInput["ASSETNAME"] = dfInput["ASSETNAME"].str.replace("", "", regex = False)
    dfInput["MESSAGE"] = dfInput["MESSAGE"].str.replace("", "", regex = False)
    dfInput["EQUIPMENTNAME"] = dfInput["EQUIPMENTNAME"].str.replace("?", "", regex = False)
    dfInput["ASSETNAME"] = dfInput["ASSETNAME"].str.replace("?", "", regex = False)
    dfInput["MESSAGE"] = dfInput["MESSAGE"].str.replace("?", "", regex = False)

    dfInput.loc[df["DATEANDTIME"] == "SKY", ["DATEANDTIME", "OPER"]] = None, "SKY"
    dfInput.loc[df["ACKREQUIRED"] == 1, "ACKREQUIRED"] = True
    dfInput.loc[df["ACKREQUIRED"] == 0, "ACKREQUIRED"] = False
    dfInput.loc[(df["OPER"] == "\x18\x1a\x11") | 
           (dfInput["OPER"] == "(^+") | 
           (dfInput["OPER"] == "\x18*V") | 
           (dfInput["OPER"] == "\x18\x1aM") |
           (dfInput["OPER"] == "\x18Z\x0c") |
           (dfInput["OPER"] == "\x18JI") |
           (dfInput["OPER"] == "\x18\n") |
           (dfInput["OPER"] == "\x18j\x14") |
           (dfInput["OPER"] == "\x18:H"), "OPER"] = None

    # Delete corrupted rows
    dfInput = dfInput.drop(dfInput[dfInput["ENVIRONMENT"].str.contains("TRACTION", na = False, regex = False) | 
                    (dfInput["ENVIRONMENT"] == "Executed")].index)
    dfInput = dfInput.drop(dfInput[(dfInput["ENVIRONMENT"] == "\n")].index)

    # Clean up partially corrupted rows
    dfInput.loc[(dfInput["GROUP1"] == "NORMAL"),
          ["EQUIPMENTNAME",
           "ASSETNAME",
           "MESSAGE",
           "STATUS",
           "GROUP1",
           "GROUP2",
           "FORMAT",
           "DSSEVENTTYPE" 
          ]] = dfInput.iloc [::, 9:17].shift(periods=-1, axis="columns")
    dfInput.loc[(df["GROUP1"] == "REQUESTED") & 
           (dfInput["MESSAGE"].str.contains("TRACTION", na = False, regex = False)),
          "EQUIPMENTNAME"] = dfInput["EQUIPMENTNAME"] + dfInput["ASSETNAME"]
    dfInput.loc[(dfInput["GROUP1"] == "REQUESTED") & 
           (dfInput["MESSAGE"].str.contains("TRACTION", na = False, regex = False)),
          ["ASSETNAME", 
           "MESSAGE",
           "STATUS",
           "GROUP1",
           "GROUP2",
           "FORMAT",
           "DSSEVENTTYPE" 
          ]] = dfInput.iloc [::, 10:17].shift(periods=-1, axis="columns")

    # Remove trailing non-printable characters
    dfInput["ALARMID"] = dfInput["ALARMID"].str.strip()
    dfInput["EQUIPMENTNAME"] = dfInput["EQUIPMENTNAME"].str.strip()
    dfInput["MESSAGE"] = dfInput["MESSAGE"].str.strip()
    dfInput["ASSETNAME"] = dfInput["ASSETNAME"].str.strip()
    
    # Return dataframe with data anomalies removed
    return dfInput

In [None]:
# Location-name regex patterns and their replacements, applied in this order.
# Each pattern sits next to its replacement in one ordered table, so they can
# never drift apart. (A set literal has no defined order, which made pandas
# pair patterns with replacement values arbitrarily.)
# NOTE(review): short unanchored patterns such as 'AL', 'SER' or 'B1' also
# match inside longer words/codes (e.g. 'AL' in "ALARM") -- confirm intended.
_LOC_NAME_RULES = [
    ('NED', ''),  # 001
    ('FRP', ''),  # 002
    ('SKG', ''),  # 003
    ('HGN', ''),  # 004
    ('KVN', ''),  # 005
    ('SER', ''),  # 006
    ('HBF', ''),  # 007
    ('DBG', ''),  # 008
    ('OTP', ''),  # 009
    ('CNT', ''),  # 010
    ('LTI', ''),  # 011
    ('CQY', ''),  # 012
    ('BGK', ''),  # 013
    ('OCC', ''),  # 014
    ('WLH', ''),  # 015
    ('PTP', ''),  # 016
    ('BNK', ''),  # 017
    ('PGL', ''),  # 018
    ('TUNNEL', ''),  # 019
    ('Sector', ''),  # 020
    ('Concourse', 'SUBLOCATION'),  # 021
    ('Mezzaninne', 'SUBLOCATION'),  # 022
    ('Mid-Landing Entrance', 'SUBLOCATION'),  # 023
    ('AL', 'SUBLOCATION'),  # 024
    ('Dirty Area', 'SUBLOCATION'),  # 025
    ('IAP', 'SUBLOCATION'),  # 026
    ('1st Storey', 'SUBLOCATION'),  # 027
    ('2nd Storey', 'SUBLOCATION'),  # 028
    ('3rd Storey', 'SUBLOCATION'),  # 029
    ('B1', ''),  # 030
    ('B2', ''),  # 031
    ('B3', ''),  # 032
    ('Entrance', 'SUBLOCATION'),  # 033
    ('Mid Landing', ''),  # 034
    ('Mid-Landing', 'SUBLOCATION'),  # 035
    ('Subway', 'SUBLOCATION'),  # 036
    ('Underpass Link', 'SUBLOCATION'),  # 037
    ("Underpass To EXT'G  STN", 'SUBLOCATION'),  # 038
    ("1st", ""),  # 039
    ("2nd", ""),  # 040
    ("SUBLOCATIONN", "SUBLOCATION"),  # 041
    ("SUBLOCATIONS", "SUBLOCATION"),  # 042
    ("North End", ""),  # 043
    ("South End", ""),  # 044
    ("South Adjacent", ""),  # 045
    ("North Adjacent", ""),  # 046
    ("Mezzanine", "SUBLOCATION"),  # 047
    ("Linkway", "SUBLOCATION"),  # 048
    ("Smoke Free Lobby", "SUBLOCATION"),  # 049
    ("Storey", "SUBLOCATION"),  # 050
    ("Underpass to EXT'G STN", "SUBLOCATION"),  # 051
    ("-SUBLOCATION", ""),  # 052
    ("SUBLOCATION-", ""),  # 053
]
# Module-level so the other cleaning functions can reuse the same lists.
locNamesList = [pattern for pattern, _ in _LOC_NAME_RULES]
locNamesVal = [value for _, value in _LOC_NAME_RULES]

# Ordered ASSET_DESC_CAT overrides: (literal substring, category, case_sensitive).
# Each mask is evaluated after the previous override has been applied.
_ASSET_DESC_OVERRIDES = [
    ("CBN Access Multiplexer", "CBN Access Multiplexer", True),
    ("CI Gas Panel", "CI Gas Panel", False),
    # NOTE(review): RI Gas Panel is mapped to "CI Gas Panel" -- possibly a
    # copy-paste slip; confirm the intended category.
    ("RI Gas Panel", "CI Gas Panel", False),
    ("CROSS-CONNECT ACCESS Multiplexer", "CROSS-CONNECT ACCESS Multiplexer", True),
    ("Electrically Supervised Valve", "Electrically Supervised Valve", True),
    ("Hosereel Pump", "Hosereel Pump", True),
    ("Level Fire Shutter", "Level Fire Shutter", True),
    ("Level Roller Shutter", "Level Roller Shutter", True),
    ("Main Fire Alarm Panel", "Main Fire Alarm Panel", True),
    ("Traffic Direction", "Traffic Direction", False),
    ("Tunnel LTG Ctrl Panel", "Tunnel LTG Ctrl Panel", False),
    ("Zone -", "ZONE SUBLOCATION", False),
]


def cleanLocNames(dfInput):
    """Derive ASSET_DESC_CAT / EVENT_DESC_CAT from MESSAGE and generalise locations.

    MESSAGE is split on its first ": " into an asset part and an event part.
    Messages without ": " are treated as event-only: the text moves to
    EVENT_DESC_CAT and ASSET_DESC_CAT becomes NaN. The asset part is then
    stripped of location names (via ``locNamesList``/``locNamesVal``), digits,
    redundant whitespace and trailing "at/for <place>" clauses, and known
    assets are mapped onto fixed categories.

    Parameters
    ----------
    dfInput : pandas.DataFrame
        Frame with a string MESSAGE column; modified in place.

    Returns
    -------
    pandas.DataFrame
        ``dfInput`` with ASSET_DESC_CAT and EVENT_DESC_CAT set.
    """
    # --- Split MESSAGE on the first ": " ---
    parts = dfInput["MESSAGE"].str.split(pat=": ", n=1)
    # .str[i] always yields a column, even when no message contains ": "
    # (expand=True then produced a single column and the assignment failed).
    # astype(object) keeps an all-NaN result usable with the .str accessor.
    dfInput["ASSET_DESC_CAT"] = parts.str[0].astype(object).str.strip()
    dfInput["EVENT_DESC_CAT"] = parts.str[1].astype(object).str.strip()

    # Messages without ": " only carry an event description
    dfInput.loc[dfInput["EVENT_DESC_CAT"].isna(), "EVENT_DESC_CAT"] = dfInput["ASSET_DESC_CAT"]
    dfInput.loc[dfInput["EVENT_DESC_CAT"] == dfInput["ASSET_DESC_CAT"], "ASSET_DESC_CAT"] = np.nan

    # --- Remove location names (best effort, but no longer silent) ---
    try:
        dfInput["ASSET_DESC_CAT"] = (dfInput["ASSET_DESC_CAT"]
                                     .replace(regex=locNamesList, value=locNamesVal)
                                     .astype(object))
    except Exception as err:
        print("WARNING: location-name replacement skipped for ASSET_DESC_CAT -", err)

    asset = dfInput["ASSET_DESC_CAT"]
    # Remove numbers, then restore the ones that carry meaning
    asset = asset.str.replace(r'\d+', '', regex=True)
    asset = asset.str.replace(" kV", "22 kV", regex=False)
    asset = asset.str.replace("at KV SW", "at 22 kV SW", regex=False)
    asset = asset.str.replace("DC  V", "DC 1500 V", regex=False)
    # Collapse redundant whitespace
    asset = asset.str.strip().str.replace(r'\s+', ' ', regex=True)
    # Misc exceptions
    asset = asset.str.replace('SUBLOCATION SUBLOCATION', 'SUBLOCATION', regex=False)
    asset = asset.str.replace('( ', '(', regex=False)
    asset = asset.str.replace(r'\A(: )', '', regex=True)
    asset = asset.str.replace('Cameras', 'Camera', case=False, regex=False)
    dfInput["ASSET_DESC_CAT"] = asset

    # --- Map known assets onto fixed categories ---
    cctvMask = dfInput["MESSAGE"].str.contains("CCTV Controller Power Supply", na=False, regex=False)
    dfInput.loc[cctvMask, "ASSET_DESC_CAT"] = "CCTV Controller Power Supply"
    for needle, category, caseSensitive in _ASSET_DESC_OVERRIDES:
        hit = dfInput["ASSET_DESC_CAT"].str.contains(needle, case=caseSensitive, na=False, regex=False)
        dfInput.loc[hit, "ASSET_DESC_CAT"] = category

    # --- Remove trailing location clauses and leftover separators ---
    asset = dfInput["ASSET_DESC_CAT"]
    asset = asset.str.split(" at ", n=1).str[0]
    asset = asset.str.split(" for ", n=1).str[0]
    asset = asset.str.replace('SUBLOCATION-SUBLOCATION', 'SUBLOCATION', regex=False)
    asset = asset.str.replace('SUBLOCATION-', 'SUBLOCATION', regex=False)
    asset = asset.str.replace('-SUBLOCATION', 'SUBLOCATION', regex=False)
    asset = asset.str.replace(r'( at)$', '', case=False, regex=True)
    asset = asset.str.replace(r'( for)$', '', case=False, regex=True)
    asset = asset.str.replace(r'^(:)', '', regex=True)
    dfInput["ASSET_DESC_CAT"] = asset

    return dfInput

In [None]:
# Ordered substitutions applied to EVENT_DESC_CAT: (pattern, replacement, is_regex).
_EVENT_DESC_SUBSTITUTIONS = [
    (' /, /...', '', False),
    ('__:', '', False),
    ('_:', '', False),
    (r'([.]+){2}', '', True),
    (r'(_+){2}', '', True),
    ('::', '', False),
    (' : ', ': ', False),
    ('@n', '', False),
    (' ,', ',', False),
    ('< >', '', False),
    ('-:', ':', False),
    (r'^(:)', '', True),
    # Add a space after " -" when text follows directly. The lookahead keeps
    # that following character (r'( -)\S' consumed and deleted it).
    (r' -(?=\S)', ' - ', True),
    (' )', ')', False),
    ('()', '', False),
    (' ump Rm', ' Pump Rm', False),
    (' latform', ' Platform', False),
    (r'( )+', ' ', True),
    ('SUBLOCATION-SUBLOCATION', 'SUBLOCATION', False),
    ('SUBLOCATION-', 'SUBLOCATION', False),
    ('-SUBLOCATION', 'SUBLOCATION', False),
]

# Ordered category overrides: (column searched, literal substrings that must
# ALL be present, category written to EVENT_DESC_CAT). Later rules win.
_EVENT_DESC_CATEGORY_RULES = [
    ("EVENT_DESC_CAT", ("Train", "Car", "assigned", "Manoeuvre"), "Manoeuvre assigned to Train Car"),
    ("EVENT_DESC_CAT", ("Train", "Car", "abandoned", "Manoeuvre"), "Manoeuvre abandoned by Train Car"),
    ("MESSAGE", ("Display of Free-Text",), "Display of Free-Text"),
    ("EVENT_DESC_CAT", ("DVA version mismatch",), "DVA version mismatch"),
    ("EVENT_DESC_CAT", ("Automatic hand-over",), "Automatic hand-over"),
    ("EVENT_DESC_CAT", ("Automatic Hold Applied",), "Automatic Hold Applied"),
    ("EVENT_DESC_CAT", ("Communication between",), "Communication between Nodes"),
]


def _containsAll(series, needles):
    """Return a boolean mask: True where ``series`` contains every literal needle (NaN -> False)."""
    mask = pd.Series(True, index=series.index)
    for needle in needles:
        mask &= series.str.contains(needle, na=False, regex=False)
    return mask


def cleanEventDesc(dfInput):
    """Normalise EVENT_DESC_CAT into a generic event-description category.

    Steps, in order:
    1. Remove location names, if the module-level regex lists
       ``locNamesList``/``locNamesVal`` exist.
    2. Remove digits, restoring meaningful numbers ("22 kV", "1500 V").
    3. Collapse whitespace.
    4. Apply the punctuation/typo substitutions.
    5. Apply the rule-based category overrides.

    Every step operates on ``dfInput``.

    Parameters
    ----------
    dfInput : pandas.DataFrame
        Frame with string columns EVENT_DESC_CAT and MESSAGE; modified in place.

    Returns
    -------
    pandas.DataFrame
        ``dfInput`` with EVENT_DESC_CAT cleaned.
    """
    col = "EVENT_DESC_CAT"

    # --- Remove location names ---
    # Expects module-level locNamesList (regex patterns) and locNamesVal
    # (replacements). A missing definition is reported instead of being
    # silently skipped.
    try:
        patterns, values = locNamesList, locNamesVal
    except NameError:
        print("WARNING: locNamesList / locNamesVal not defined; location names not removed from", col)
    else:
        try:
            dfInput[col] = dfInput[col].replace(regex=patterns, value=values).astype(object)
        except Exception as err:
            print("WARNING: location-name replacement skipped for", col, "-", err)

    # --- Remove numbers, then restore the ones that carry meaning ---
    dfInput[col] = dfInput[col].str.replace(r'\d+', '', regex=True)
    dfInput[col] = dfInput[col].str.replace(" kV", "22 kV", regex=False)
    dfInput[col] = dfInput[col].str.replace("at KV SW", "at 22 kV SW", regex=False)
    dfInput[col] = dfInput[col].str.replace("DC  V", "DC 1500 V", regex=False)
    # Exact match runs before whitespace collapsing (note the double space)
    dfInput.loc[dfInput[col] == "MP  VDC Status", col] = "MP 1500 VDC Status"

    # --- Collapse redundant whitespace ---
    dfInput[col] = dfInput[col].str.strip().str.replace(r'\s+', ' ', regex=True)

    # --- Misc exceptions ---
    dfInput.loc[_containsAll(dfInput[col], ("logged", "Operator", "NelVisu")), col] = "Operator Logged In/Out of NelVisu"

    for pattern, replacement, isRegex in _EVENT_DESC_SUBSTITUTIONS:
        dfInput[col] = dfInput[col].str.replace(pattern, replacement, regex=isRegex)

    # "<what> at <where>: <event>" -> "<what>: <event>".
    # Split on " at " (a bare "at" also cuts words such as "Status"); n is a
    # keyword because pandas 2 made it keyword-only.
    desc = dfInput[col]
    atMask = _containsAll(desc, (" at ", ": "))
    head = desc.str.split(" at ", n=1).str[0]
    tail = desc.str.split(": ", n=1).str[1]
    dfInput.loc[atMask, col] = head + ": " + tail
    dfInput[col] = dfInput[col].str.replace(' : ', ': ', regex=False)

    # Gateway messages: keep the text up to the keyword, generalise the location
    for keyword in ("msg in", "bcast in"):
        desc = dfInput[col]
        gwsMask = _containsAll(desc, ("Gws", keyword))
        dfInput.loc[gwsMask, col] = desc.str.split(keyword, n=1).str[0] + keyword + " SUBLOCATION"

    for sourceCol, needles, category in _EVENT_DESC_CATEGORY_RULES:
        dfInput.loc[_containsAll(dfInput[sourceCol], needles), col] = category

    dfInput[col] = dfInput[col].str.strip()

    return dfInput

In [None]:
def extractTrainInfo(dfInput):
    """Extract train, car and service identifiers from the MESSAGE text.

    Adds three columns to ``dfInput`` (in place) and returns it:

    * ``TrainID``   - digits after ``TR___``; otherwise digits after ``Train ``
      when followed by a space or colon.
    * ``CarID``     - first number of a ``cars <n>/<m>`` pair; otherwise digits
      after ``Car `` when followed by a space or colon.
    * ``ServiceID`` - second number of a ``cars <n>/<m>`` pair.

    Values stay as strings (NaN where nothing matched); numeric conversion is
    done later by changeDType.

    Parameters
    ----------
    dfInput : pandas.DataFrame
        Must contain a string ``MESSAGE`` column.

    Returns
    -------
    pandas.DataFrame
        The same DataFrame object with the new columns added.
    """
    # Each alternative in the pattern has its own capture group, so extract()
    # yields one column per alternative; at most one is filled per row.
    dfInput[["TrainID", "TrainID_temp"]] = dfInput["MESSAGE"].str.extract(
        r"TR___(\d+)|Train (\d+)[ :]"
    )
    # Fall back to the "Train <n>" match where there was no "TR___<n>" match
    dfInput["TrainID"] = dfInput["TrainID"].fillna(dfInput["TrainID_temp"])

    dfInput[["CarID", "ServiceID", "CarID_temp"]] = dfInput["MESSAGE"].str.extract(
        r"cars (\d+)/(\d+)|Car (\d+)[ :]"
    )
    # Fall back to the "Car <n>" match where there was no "cars <n>/<m>" match.
    # Must use this frame's own columns, not a global dataframe.
    dfInput["CarID"] = dfInput["CarID"].fillna(dfInput["CarID_temp"])

    # The *_temp columns only existed to feed the fallbacks above
    del dfInput["TrainID_temp"], dfInput["CarID_temp"]

    # Return dataframe with train info extracted
    return dfInput

In [None]:
def extractAssetInfo(dfInput, locNamePatterns=None):
    """Derive ``AssetClass`` and ``AssetSubClass`` columns from ``ASSETNAME``.

    Processing steps:

    * strip location names and digits;
    * normalise two naming exceptions (anything containing TRACTION becomes
      "TRACTION/TRACTION"; TUNNEL + LIGHT becomes "TUNNEL/LIGHT");
    * trim one leading and one trailing underscore;
    * turn each remaining run of underscores into "/".

    ``AssetClass`` is the first token before a "/" and ``AssetSubClass`` the
    last token after one. Names that reduce to a single token get NaN for both.

    Parameters
    ----------
    dfInput : pandas.DataFrame
        Must contain an ``ASSETNAME`` column. Modified in place.
    locNamePatterns : list of str, optional
        Regex patterns for location names to remove. Defaults to the
        notebook-level ``locNamesList``.

    Returns
    -------
    pandas.DataFrame
        The same DataFrame with ``AssetClass`` and ``AssetSubClass`` added.
    """
    if locNamePatterns is None:
        locNamePatterns = locNamesList

    dfInput["AssetClass"] = dfInput["ASSETNAME"]

    # Remove location names. The original code treated this step as best effort
    # (its comment: "Error catch if entire column is empty"), so the step is still
    # skipped on error, but the error is now reported instead of silently hidden.
    try:
        dfInput["AssetClass"] = dfInput["AssetClass"].replace(regex=locNamePatterns, value="")
    except Exception as err:
        print("Location name removal skipped:", err)

    # Remove numbers
    dfInput["AssetClass"] = dfInput["AssetClass"].str.replace(r"\d+", "", regex=True)

    # Normalise naming exceptions. na=False keeps the masks strictly boolean;
    # a NaN in the mask would make .loc raise and skip the fix for every row.
    isTraction = dfInput["AssetClass"].str.contains("TRACTION", na=False, regex=False)
    dfInput.loc[isTraction, "AssetClass"] = "TRACTION/TRACTION"

    isTunnelLight = (
        dfInput["AssetClass"].str.contains("TUNNEL", na=False, regex=False)
        & dfInput["AssetClass"].str.contains("LIGHT", na=False, regex=False)
    )
    dfInput.loc[isTunnelLight, "AssetClass"] = "TUNNEL/LIGHT"

    # Trim a leading/trailing underscore, then turn each underscore run into "/"
    dfInput["AssetClass"] = (
        dfInput["AssetClass"]
        .str.replace(r"\A(_)", "", regex=True)
        .str.replace(r"(_)\Z", "", regex=True)
        .str.replace(r"_+", "/", regex=True)
    )

    # AssetSubClass: last token after a "/"; AssetClass: first token before one
    dfInput["AssetSubClass"] = dfInput["AssetClass"].str.extract(r"/(\w+)$")[0]
    dfInput["AssetClass"] = dfInput["AssetClass"].str.extract(r"(\w+)/")[0]

    # Return dataframe with asset info extracted
    return dfInput

In [None]:
def changeDType(dfInput):
    """Standardise missing values and cast columns to their storage dtypes.

    Steps:
    1. Replace +/-inf, empty strings and None with NaN.
    2. Fill NaN with 0 in FORMAT, GROUP1, GROUP2, TrainID, CarID and ServiceID.
    3. Cast the integer columns to int64 and parse DATEANDTIME (day first).
    4. Add DATE (DATEANDTIME truncated to the date) and TIME (time of day).

    Parameters
    ----------
    dfInput : pandas.DataFrame
        Cleaned HISevent data containing every column named below.

    Returns
    -------
    pandas.DataFrame
        A new DataFrame; the input frame is not modified because the first
        ``replace`` call returns a copy.

    Notes
    -----
    The int64 casts and ``pd.to_datetime`` raise (typically ValueError) if an
    integer column still holds NaN or non-numeric text, or a timestamp cannot
    be parsed.
    """
    # Treat non-finite numbers as missing
    dfInput = dfInput.replace([np.inf, -np.inf], np.nan)

    # Empty strings -> NaN; fillna(np.nan) also turns any remaining None into NaN
    dfInput = dfInput.replace("", np.nan).fillna(value=np.nan)

    # Columns that are often blank; default to 0 so the int64 cast cannot hit NaN
    zeroFillColumns = ["FORMAT", "GROUP1", "GROUP2", "TrainID", "CarID", "ServiceID"]
    for column in zeroFillColumns:
        dfInput[column] = dfInput[column].fillna(0)

    # Integer columns. TrainID/CarID/ServiceID are digit strings produced by
    # str.extract in extractTrainInfo and are converted to integers here.
    intColumns = [
        "ALARMID", "VALUE", "ACKREQUIRED", "SEVERITY", "EQUIPMENTCLASS",
        "FUNCTIONALCAT", "GEOGRAPHICALCAT", *zeroFillColumns,
    ]
    for column in intColumns:
        dfInput[column] = dfInput[column].astype("int64")

    # Parse timestamps from this frame (not from any global dataframe)
    dfInput["DATEANDTIME"] = pd.to_datetime(dfInput["DATEANDTIME"], dayfirst=True)

    # Infer datatypes of any remaining object columns
    dfInput = dfInput.infer_objects()

    # Date and time-of-day keys for matching with dimension tables
    dfInput["DATE"] = pd.to_datetime(dfInput["DATEANDTIME"].dt.date)
    dfInput["TIME"] = dfInput["DATEANDTIME"].dt.time

    # Return dataframe with data type updated
    return dfInput

#### Test dataframe processing script

In [None]:
# Test dataframe processing script: run the full cleaning pipeline on one file
# from the working directory and print a summary. Nothing is written to the
# database and no file is deleted or archived.
if testMode:
    # Candidate files in the working directory (sub-folders are ignored)
    fileList = [name for name in os.listdir() if os.path.isfile(name)]

    if not fileList:
        print("Test script skipped: no files found in", os.getcwd())
    else:
        # Use the same oldest-first (FIFO) choice as the production loop, so the
        # test previews the file that will be processed first
        inputFile = min(fileList, key=os.path.getctime)

        # Ingest log files as a dataframe for further processing
        df = fileIngestor(inputFile)

        # Remove data anomalies
        df = removeDataAnomalies(df)

        # Clean up location names
        df = cleanLocNames(df)

        # Clean event description data
        df = cleanEventDesc(df)

        # Extract train info
        df = extractTrainInfo(df)

        # Extract asset info
        df = extractAssetInfo(df)

        # Change data type of columns
        df = changeDType(df)

        # Inspect data
        df.info()
        print(df.head())

        # Release the test frame
        del df

else:
    print("Test script skipped")


### Process Data (Packaged Version)

In [None]:
# Get timestamp of operation start
dateTimeStartOp = dt.datetime.now()
print("Start Operation")
print(dateTimeStartOp)

# Folder that processed files are moved to when they are not deleted
archiveFolderLoc = "../Logs_archive/"

# True: delete processed files; False: archive them to archiveFolderLoc
delFileCondition = False


def listPendingFiles():
    """Return names of regular files in the working directory (sub-folders are ignored)."""
    # A sub-folder would otherwise be fed to the ingestor and never leave the
    # folder, keeping the outer while-loop alive forever.
    return [name for name in os.listdir() if os.path.isfile(name)]


fileList = listPendingFiles()

# Keep going until the folder is still empty after a batch plus the wait below
while fileList:
    # Initiate batch run
    print("Start batch run")
    dateTimeStartBatch = dt.datetime.now()
    print(dateTimeStartBatch)

    # A batch handles as many files as were present when it started, oldest
    # first (FIFO). The list is refreshed after each file, and the batch ends
    # early if the folder empties (e.g. a file removed externally) instead of
    # calling min() on an empty list.
    for _ in range(len(fileList)):
        if not fileList:
            break
        inputFile = min(fileList, key=os.path.getctime)

        #############################################
        # Process file as a cleaned dataframe - START
        #############################################

        print("Commence processing of", inputFile)
        dateTimeStartRun = dt.datetime.now()
        print(dateTimeStartRun)

        # Ingest log files as a dataframe for further processing
        df = fileIngestor(inputFile)

        # Remove data anomalies
        df = removeDataAnomalies(df)

        # Clean up location names
        df = cleanLocNames(df)

        # Clean event description data
        df = cleanEventDesc(df)

        # Extract train info
        df = extractTrainInfo(df)

        # Extract asset info
        df = extractAssetInfo(df)

        # Change data type of columns
        df = changeDType(df)

        #############################################
        # Process file as a cleaned dataframe - END
        #############################################

        # Append data to SQL DB
        appendData(targetTable, df, serverParams)

        # Release this frame before the next file is loaded to keep peak memory down
        del df

        if delFileCondition:
            # Delete processed file
            os.remove(inputFile)
            print(inputFile, "deleted")
        else:
            # Archive processed file
            fileTransfer(inputFile, archiveFolderLoc)

        # The loop relies on each processed file leaving the folder; otherwise
        # the same file would be picked again and re-inserted into the database.
        if os.path.exists(inputFile):
            raise RuntimeError(
                f"{inputFile} is still in the input folder after processing; "
                "stopping to avoid inserting it into the database again"
            )

        # Update list of files
        fileList = listPendingFiles()

        # Get timestamp of run end
        dateTimeEndRun = dt.datetime.now()
        print("Run Elapsed Time")
        print(dateTimeEndRun - dateTimeStartRun)
        del dateTimeEndRun, dateTimeStartRun

    print("Batch run completed")
    dateTimeEndBatch = dt.datetime.now()
    print(dateTimeEndBatch)
    print("Batch Elapsed Time")
    print(dateTimeEndBatch - dateTimeStartBatch)
    del dateTimeEndBatch, dateTimeStartBatch

    # Wait 10 seconds before re-checking for new files, to allow for lag or
    # latency in the upstream file transfer
    time.sleep(10)
    fileList = listPendingFiles()

print("Script Terminated")
dateTimeEndOp = dt.datetime.now()
print(dateTimeEndOp)
print("Operation Elapsed Time")
print(dateTimeEndOp - dateTimeStartOp)

In [None]:
# Close Connection to Database
# NOTE(review): `conn` is opened in an earlier cell not visible here. The
# processing loop calls appendData(targetTable, df, serverParams) rather than
# using `conn` directly, so confirm that this is the connection that needs closing.
conn.close()