In [1]:
# import all necessary libraries
import pandas as pd
from functools import reduce
from datetime import timedelta
import numpy as np

Function declarations

In [None]:
# Clean one raw historian export and persist it as parquet.
def cleanData(df, substance):
    """Normalise a raw measurement table for one substance and save it.

    Drops the ``historianTagnummer`` and ``datumBeginMeting`` columns, renames
    ``hstWaarde`` to ``<substance>Value`` (cast to float) and
    ``datumEindeMeting`` to ``measurementDate`` (parsed with
    ``pd.to_datetime``), then writes the result to
    ``../../data/cleanedData/<substance>.parquet`` without the index.

    Args:
        df: raw measurements containing the four columns named above. It is
            not modified; all work happens on a new frame.
        substance: name used both for the value column and the output file.

    Returns:
        None -- the cleaned frame is only written to disk.
    """
    valueColumn = substance + 'Value'
    cleaned = df.drop(columns=['historianTagnummer', 'datumBeginMeting']).rename(
        columns={'hstWaarde': valueColumn, 'datumEindeMeting': 'measurementDate'}
    )
    cleaned[valueColumn] = cleaned[valueColumn].astype(float)
    cleaned['measurementDate'] = pd.to_datetime(cleaned['measurementDate'])
    cleaned.to_parquet('../../data/cleanedData/' + substance + '.parquet', index=False)

In [None]:
# Daylight saving time correction.
def shiftDates(df, name):
    """Subtract one hour from a hard-coded summer-time range and save the result.

    Works on index labels ``startTime..endTime`` (inclusive):
      * ``startTime`` is the first row stamped '2021-03-28 03:00:00';
      * ``endTime`` is the last row whose ``measurementDate`` is duplicated.
    Labels below ``endTime - len(duplicates)`` are all shifted back one hour.
    From there up to ``endTime``, only odd labels are shifted. The frame is
    then sorted by ``measurementDate`` and written to
    ``../../data/shiftedDates/<name>.parquet`` without the index.

    NOTE(review): presumably this converts summer-time stamps to the
    winter-time offset (EU DST in 2021: 28 March to 31 October) -- confirm the
    source timezone. Other assumptions to confirm before reuse:
      * ``df`` has a 0..n-1 integer index in chronological order, because the
        loop treats index labels as positions via ``range`` and ``df.loc``;
      * the only duplicated timestamps in ``df`` are the repeated hour at the
        end of summer time (``keep=False`` counts every duplicate anywhere).

    Args:
        df: frame with a ``measurementDate`` datetime column. The caller's
            frame is modified in place by the ``df.loc`` assignments; the
            sorted copy is only written to disk.
        name: base file name of the output parquet.

    Returns:
        None.

    Raises:
        IndexError: if no row equals '2021-03-28 03:00:00', or if there are
            no duplicated timestamps.
    """
    # Index labels of every row whose timestamp occurs more than once
    # (keep=False marks all copies, not only the later ones).
    duplicates = df[df.duplicated(subset='measurementDate', keep=False)].index.tolist()
    # First timestamp after the spring-forward jump; the year is hard-coded.
    marchDate = df.loc[df['measurementDate'] == '2021-03-28 03:00:00']
    startTime = marchDate.index.values[0]
    # Last row of the duplicated block (presumably the repeated autumn hour).
    endTime = duplicates[-1]

    # Row-by-row Python loop with scalar df.loc writes; slow on large frames.
    for i in range(startTime, endTime + 1):
        # Rows well before the duplicated block: plain one-hour shift.
        if i < (endTime - len(duplicates)):
            df.loc[i, 'measurementDate'] -= timedelta(hours=1)
        # Inside the duplicated block only odd labels are shifted. This
        # presumably relies on the two copies of each repeated timestamp
        # sitting on alternating even/odd labels.
        # NOTE(review): the row at exactly endTime - len(duplicates) also
        # falls into this branch, so it is shifted only if its label is odd --
        # confirm this boundary is intended.
        elif not ((i % 2) == 0):
            df.loc[i, 'measurementDate'] -= timedelta(hours=1)
    
    df = df.sort_values(by='measurementDate')
    df.to_parquet('../../data/shiftedDates/'+ name +'.parquet', index=False)
    pass


In [None]:
# Combine the oxygen readings of both tanks.
def calculateOxygen(oxygenA, oxygenB):
    """Average the scalar oxygen readings of tank A and tank B.

    A missing reading (NaN/None/NaT, as detected by ``pd.isna``) counts as 0
    before averaging. This is consistent with zero-filling the oxygen columns
    before the readings are combined.

    Args:
        oxygenA: scalar oxygen reading of tank A, or a missing value.
        oxygenB: scalar oxygen reading of tank B, or a missing value.

    Returns:
        ``(oxygenA + oxygenB) / 2`` with missing readings replaced by 0.
    """
    # The inputs are scalars, so they are replaced directly. Subscripting them
    # like a DataFrame raises TypeError.
    # NOTE(review): zero-filling halves the average when only one tank reports;
    # averaging just the available reading may be more appropriate -- confirm.
    valueA = 0 if pd.isna(oxygenA) else oxygenA
    valueB = 0 if pd.isna(oxygenB) else oxygenB
    return (valueA + valueB) / 2

Load the precipitation data and rename its columns

In [2]:
# Keep the total precipitation column (drop 'rain (mm)'), align the time
# column name with the other datasets and parse it as datetime.
# NOTE(review): this file is written straight to shiftedDates/ without going
# through shiftDates -- presumably its timestamps need no DST correction; confirm.
precepitation = (
    pd.read_csv('../../data/precipitation.csv')
    .rename(columns={'time': 'measurementDate', 'precipitation (mm)': 'precipitation'})
    .drop(columns=['rain (mm)'])
    .assign(measurementDate=lambda frame: pd.to_datetime(frame['measurementDate']))
)
precepitation.to_parquet('../../data/shiftedDates/precipitation.parquet', index=False)

Load all files that need to be cleaned

In [None]:
# (raw parquet path, substance name) pairs. cleanData writes each cleaned
# table to ../../data/cleanedData/<substance>.parquet.
data_files = [
    ('../../data/Ammonium_measurements.parquet', 'ammonium'),
    ('../../data/Nitrate_measurements.parquet', 'nitrate'),
    ('../../data/Oxygen_A.parquet', 'oxygenA'),
    ('../../data/Oxygen_B.parquet', 'oxygenB'),
    ('../../data/Phosphate_measurements.parquet', 'phosphate')
]

for raw_path, substance in data_files:
    raw_frame = pd.read_parquet(raw_path)
    cleanData(raw_frame, substance)

Load all files that need a daylight saving time shift

In [None]:
# (cleaned parquet path, output name) pairs. shiftDates writes each shifted
# table to ../../data/shiftedDates/<name>.parquet.
# NOTE(review): phosphate is cleaned but not listed here, and the final merge
# reads it from cleanedData/ -- confirm its timestamps need no DST correction.
cleaned_data = [
    ('../../data/cleanedData/ammonium.parquet', 'cleanedAmmonium'),
    ('../../data/cleanedData/nitrate.parquet', 'cleanedNitrate'),
    ('../../data/cleanedData/oxygenA.parquet', 'cleanedOxygenA'),
    ('../../data/cleanedData/oxygenB.parquet', 'cleanedOxygenB'),
]

for cleaned_path, shifted_name in cleaned_data:
    cleaned_frame = pd.read_parquet(cleaned_path)
    shiftDates(cleaned_frame, shifted_name)

Merge the oxygen A and oxygen B datasets

In [None]:
# Combine the two tanks' oxygen readings into a single oxygenValue column.
oxygenA = pd.read_parquet('../../data/shiftedDates/cleanedOxygenA.parquet')
oxygenB = pd.read_parquet('../../data/shiftedDates/cleanedOxygenB.parquet')

# measurementDate is the only column the two frames share, so it is the key.
# NOTE(review): how="right" keeps only timestamps present in oxygen B; readings
# that exist only in A are dropped -- confirm this is intended (vs. "outer").
oxygenAB = pd.merge(oxygenA, oxygenB, how="right", on="measurementDate")

# A missing reading counts as 0, then the two tanks are averaged.
tankColumns = ["oxygenAValue", "oxygenBValue"]
oxygenAB[tankColumns] = oxygenAB[tankColumns].fillna(0)
oxygenAB["oxygenValue"] = (oxygenAB["oxygenAValue"] + oxygenAB["oxygenBValue"]) / 2
oxygenAB = oxygenAB.drop(columns=tankColumns)
oxygenAB.to_parquet('../../data/shiftedDates/oxygenAB.parquet')

Clean the total influent water flow dataset and prepare it for the daylight saving shift, as it requires special care

In [None]:
# The flow export is ';'-separated, uses decimal commas, and marks missing
# readings with the literal text '(null)'.
total = pd.read_csv('../../data/Total_influent_flow_WWTP_Ede_2021_minute_data.csv', sep=';')
total = total.drop(columns=['wwResolution']).rename(
    columns={'EDE_09902MTW_K100.MTW': 'waterFlowPerMinute', 'DateTime': 'measurementDate'}
)
total['measurementDate'] = pd.to_datetime(total['measurementDate'], format='%d-%m-%Y %H:%M')
total['waterFlowPerMinute'] = (
    total['waterFlowPerMinute']
    .str.replace(',', '.')
    .replace('(null)', np.nan)
    .astype('float')
)

# shiftDates treats index labels as row positions, so the frame must be in
# time order with a fresh 0..n-1 index first.
total = total.sort_values(by='measurementDate').reset_index(drop=True)
shiftDates(total, 'cleanedTotal')

Drop row 436437 from the shifted total influent data and save it again

In [None]:
# Reload the DST-shifted flow data, remove one row by index label, save it back.
# NOTE(review): 436437 is an unexplained magic label -- document which
# measurement it removes and why. The cell also overwrites the file it reads,
# so re-running it is not idempotent.
total = pd.read_parquet('../../data/shiftedDates/cleanedTotal.parquet').drop(index=436437)
total.to_parquet('../../data/shiftedDates/cleanedTotal.parquet')

Load all files that need to be merged, merge them and save final dataset

In [None]:
# Outer-join every prepared source on measurementDate so no timestamp from any
# source is lost at this stage.
# NOTE(review): phosphate is read from cleanedData/ (not DST-shifted) while the
# other sources come from shiftedDates/ -- confirm this is intentional.
file_paths = [
              '../../data/shiftedDates/oxygenAB.parquet',
              '../../data/shiftedDates/cleanedNitrate.parquet',
              '../../data/cleanedData/phosphate.parquet',
              '../../data/shiftedDates/cleanedAmmonium.parquet',
              '../../data/shiftedDates/cleanedTotal.parquet',
              '../../data/shiftedDates/precipitation.parquet'
              ]

dfs = [pd.read_parquet(path) for path in file_paths]
merged = dfs[0]
for nextFrame in dfs[1:]:
    merged = pd.merge(merged, nextFrame, how='outer', on='measurementDate')
merged.to_parquet('../../data/cleanedData/allDataClient.parquet')

Encode missing phosphate values with an obvious outlier (-999), impute the precipitation data, and save the result

In [None]:
df = pd.read_parquet('../../data/cleanedData/allDataClient.parquet')

# Missing phosphate readings get the sentinel -999 (an obvious outlier), so
# these rows survive the dropna() below.
df['phosphateValue'] = df['phosphateValue'].fillna(-999)

# Spread each precipitation value evenly (value / 60) over the 60 rows that
# precede it. Where windows overlap, later values overwrite earlier ones. Rows
# outside every window stay NaN and are removed by dropna().
# The work is done on a NumPy array: the chained assignment
# df['precipitation'][a:b] = ... never updates df under pandas Copy-on-Write.
# The reading rows themselves also start as value / 60, so the last reading
# (which no later window overwrites) is expressed per minute too, instead of
# keeping its raw total.
# NOTE(review): assumes one row per minute after the outer merge, so 60 rows
# equal one hour -- TODO confirm.
ROWS_PER_HOUR = 60
hourlyPrecipitation = df['precipitation'].to_numpy(dtype=float)
perMinutePrecipitation = hourlyPrecipitation / ROWS_PER_HOUR
spreadPrecipitation = perMinutePrecipitation.copy()
for position in np.flatnonzero(~np.isnan(hourlyPrecipitation)):
    spreadPrecipitation[max(0, position - ROWS_PER_HOUR):position] = perMinutePrecipitation[position]
df['precipitation'] = spreadPrecipitation
df = df.dropna()


df.to_parquet('../../data/cleanedData/allDataClient.parquet')

In [None]:
# Convert units of measurement: scale waterFlowPerMinute by 1000.
# NOTE(review): source and target units are not stated (presumably m3 -> L) --
# document them. Re-running this cell multiplies the already-converted column
# again and overwrites the saved file.
df['waterFlowPerMinute'] = df['waterFlowPerMinute'] * 1000
df.to_parquet('../../data/cleanedData/allDataClient.parquet')

Get info about the final training dataset

In [None]:
# Summary statistics of the final dataset (see pandas DataFrame.describe).
df.describe()