In [None]:
# Environment setup
import pandas as pd
import os
import glob
import datetime
import sqlalchemy as db
import shutil

In [None]:
# General settings
## File & path parameters
source_path = 'logs'  # Directory scanned for raw log files
sink_path = 'processed logs'  # Directory that successfully loaded files are moved to
extensions = ['*.log']  # Glob patterns of the files to process

## DB parameters
# Credentials are read from the environment so real secrets never have to be
# written into the notebook; the placeholder values are only fallbacks.
user = os.environ.get('DB_USER', 'your_user')
passwd = os.environ.get('DB_PASSWORD', 'your_password')
host = os.environ.get('DB_HOST', 'localhost')
port = os.environ.get('DB_PORT', '5432')
db_name = os.environ.get('DB_NAME', 'your_db')
table_name = os.environ.get('DB_TABLE', 'your_table')

In [None]:
def get_files_in_dir(path, extension):
    """Return the files in ``path`` that match one or more glob patterns.

    Parameters
    ----------
    path : str or path-like
        Directory to search (not recursive).
    extension : str or iterable of str
        A single glob pattern such as ``'*.log'`` or a collection of patterns
        such as ``['*.log', '*.txt']``.

    Returns
    -------
    list of str
        Matching paths, grouped by pattern in the order the patterns were
        given and sorted within each pattern so that runs are reproducible
        (``glob`` itself returns files in arbitrary order).
    """
    # Accept a bare pattern as well as a list of patterns; iterating a bare
    # string would otherwise treat every character as a pattern.
    patterns = [extension] if isinstance(extension, str) else list(extension)

    files = []
    for pattern in patterns:
        files.extend(sorted(glob.glob(os.path.join(path, pattern))))
    return files

In [None]:
def _parse_log_line(line):
    """Extract the raw ``(datetime, temperature)`` strings from one log line.

    The line is split on single spaces: tokens 2 and 3 are joined with a space
    to form the date/time text and token 4 is the temperature text. Returns
    ``None`` when the line has fewer than five tokens so the caller can skip it.
    """
    tokens = line.rstrip('\n').split(' ')
    if len(tokens) < 5:
        return None
    return tokens[2] + ' ' + tokens[3], tokens[4]


def transform_data(file):
    """Parse a temperature log file into a typed DataFrame.

    Parameters
    ----------
    file : str or path-like
        Path of the log file to read.

    Returns
    -------
    tuple[bool, pandas.DataFrame]
        ``(transf_succesful, df)``. ``df`` has the columns
        ``measurement_datetime`` and ``measured_temperature``, indexed by the
        0-based line number each row came from. The flag is ``True`` only when
        at least one valid row was parsed and the columns ended up with a
        naive datetime64 dtype and a float dtype respectively.

    Lines with too few tokens, unparseable dates or non-numeric temperatures
    are dropped. An unexpected error during conversion is reported and yields
    ``(False, df)`` instead of propagating. Errors opening the file propagate.
    """
    columns = ['measurement_datetime', 'measured_temperature']

    # Data loading
    with open(file) as f:
        lines = f.readlines()

    # Data processing: collect the rows first and build the frame once.
    # Growing a DataFrame with `df.loc[i] = ...` inside the loop is quadratic.
    index, rows = [], []
    for i, line in enumerate(lines):
        parsed = _parse_log_line(line)
        if parsed is not None:  # malformed lines are skipped on purpose
            index.append(i)
            rows.append(parsed)
    df = pd.DataFrame(rows, index=index, columns=columns)

    try:
        # Unparseable values become NaT / NaN and are dropped below.
        df['measurement_datetime'] = pd.to_datetime(df['measurement_datetime'], errors='coerce')
        # to_numeric returns int64 when every value is integral (e.g. "25"),
        # which used to fail the float check; cast explicitly to float64.
        df['measured_temperature'] = pd.to_numeric(
            df['measured_temperature'], errors='coerce'
        ).astype('float64')

        # Drop rows with an empty datetime or temperature
        mask = df['measurement_datetime'].isna() | df['measured_temperature'].isna()
        df = df[~mask]

        # Data validation: the check does not depend on the datetime resolution
        # (ns/us/...), and a tz-aware or object column still fails.
        dtypes_ok = (
            pd.api.types.is_datetime64_dtype(df['measurement_datetime'])
            and pd.api.types.is_float_dtype(df['measured_temperature'])
        )
        # A file without a single valid row is not treated as processed, so
        # it stays in the source directory for inspection.
        transf_succesful = bool(dtypes_ok) and not df.empty

    except Exception as e:
        print(f'Transformation of "{file}" failed: {e}')
        transf_succesful = False

    return transf_succesful, df

In [None]:
def save_to_sql(df):
    """Append ``df`` to the configured PostgreSQL table.

    Connection settings (``user``, ``passwd``, ``host``, ``port``,
    ``db_name``) and the target ``table_name`` are read from the notebook's
    settings cell. The DataFrame index is not written.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to append; the table is created if it does not exist.

    Returns
    -------
    bool
        ``True`` if the rows were written, ``False`` if building the engine
        or writing failed (the error is printed, not raised).
    """
    from urllib.parse import quote_plus

    def get_engine(user, passwd, host, port, db_name, echo=False):
        """Build a SQLAlchemy engine for a PostgreSQL database."""
        # Percent-encode the credentials so characters such as '@', ':' or '/'
        # in the password do not corrupt the connection URL.
        url = f"postgresql://{quote_plus(user)}:{quote_plus(passwd)}@{host}:{port}/{db_name}"
        return db.create_engine(url, echo=echo)

    engine = None
    try:
        engine = get_engine(user, passwd, host, port, db_name)
        df.to_sql(table_name, con=engine, if_exists='append', index=False)
    except Exception as e:
        print(f'Saving to table "{table_name}" failed: {e}')
        return False
    finally:
        # A fresh engine (and its connection pool) is created per call;
        # release it so repeated calls do not leak connections.
        if engine is not None:
            engine.dispose()

    return True

In [None]:
if __name__ == '__main__':

    # Make sure the destination is an existing directory. Otherwise
    # shutil.move renames the first file to a *file* named `sink_path`, and
    # later moves can overwrite it, losing the original logs.
    os.makedirs(sink_path, exist_ok=True)

    # Get files in directory
    files = get_files_in_dir(source_path, extensions)

    for file in files:
        print(f'File being proceed: {file}')

        # ETL file
        transf_succesful, df = transform_data(file)
        if not transf_succesful:
            print(f'Skipping {file}: transformation failed, file left in place')
            continue

        # Save to SQL; the original file is only moved once its data is stored
        if not save_to_sql(df):
            print(f'Skipping {file}: could not save to SQL, file left in place')
            continue

        # Move correctly transformed file to "sink_path"
        shutil.move(file, sink_path)