In [None]:
# Notebook Summary:

# V.History: 
# Date Last Modified: 14 May 2025

# EA has provided one file per year of WQ observations, from 2000 to 2024, totalling 25 files.
# This notebook reads all 25 yearly WQ files (EA's open-source data) and
# combines them into one dataset per five-year period.
# Processing five files at a time avoids the memory issues seen on GCP when all 25 files are read at once.

# Output File Locations of this notebook
# 1) gcs://rdmai_data/cleansed/02_WQEA_2000_2004_Raw_New.csv
# 2) gcs://rdmai_data/cleansed/02_WQEA_2005_2009_Raw_New.csv
# 3) gcs://rdmai_data/cleansed/02_WQEA_2010_2014_Raw_New.csv
# 4) gcs://rdmai_data/cleansed/02_WQEA_2015_2019_Raw_New.csv
# 5) gcs://rdmai_data/cleansed/02_WQEA_2020_2024_Raw_New.csv

#Pre-Requisite : 
    #Kernel Python 3 (ipykernel) is required to run this notebook 
    #Required python version - Python 3.10.15 and its compatible Numpy , ScikitLearn libraries
    
#Old Name: 02_NB_EDA_pre_procs.ipynb

In [2]:
#Check python version compatibility 3.10 or above is required
import platform
import sys
import warnings

# Inspect the interpreter that is running this kernel. A shell call such as
# `!python -V` reports whichever `python` is first on the shell PATH, which is
# not guaranteed to be the kernel's own interpreter.
python_version = ['Python ' + platform.python_version()]
print (python_version[0])
print (python_version)

# Surface an incompatible interpreter immediately instead of failing later in the run.
if sys.version_info < (3, 10):
    warnings.warn(
        "This notebook requires Python 3.10 or above; the kernel is running "
        + platform.python_version()
    )

Python 3.12.3
['Python 3.12.3']


In [3]:
#Begin CARD
print("Begin Card")

#User-Defined functions

#To display the server time
def showtime():
    """Print the server's current local time as HH:MM:SS.

    Returns:
        tuple: always the empty tuple ``()``.
    """
    from time import localtime, strftime

    # Format "now" in the server's local timezone and emit it on stdout.
    print(strftime("%H:%M:%S", localtime()))
    return tuple()

#To save the output as CSV files from a Pandas Dataframe
def savedata(tDF, tname) :
    """Write a DataFrame as CSV into the cleansed area of the rdmai_data bucket.

    Args:
        tDF: object exposing ``to_csv(path)``; the notebook passes pandas DataFrames.
        tname: file name (string) appended to ``gcs://rdmai_data/cleansed/``.

    Returns:
        tuple: always the empty tuple ``()``.
    """
    # Bucket root + cleansed folder + caller-supplied file name.
    destination = "".join(('gcs://rdmai_data/', 'cleansed/', tname))
    tDF.to_csv(destination)
    return tuple()


Begin Card


In [4]:
showtime()

#Library Declaration section - Installing or Initiating all required Python Libraries

#Dataset Maths and OS
#pip install numpy==1.24.3
#pip install seaborn

import pandas as pd
import numpy as np
import os

#Graphs
import matplotlib.pyplot as plt
%matplotlib inline
from matplotlib import pyplot as plt
import matplotlib.pyplot as plt
import seaborn as sns

pd.set_option("display.max_rows", None) 

#Library Declaration section 1
print("welcome")

#Constants declaration for the folder path for the local notebook path
# NOTE: these four local-path values are overwritten by the Google Cloud Storage
# values assigned immediately below, so they never take effect as the cell stands.
path = 'DataEAOLD/'
raw = 'Raw/'
curated = 'Curated/'
cleansed = 'Cleansed/'

#Constants declaration for the folder path for files stored under Google Cloud Storage 
# These assignments win: after this cell runs, path/raw/curated/cleansed hold the GCS values.
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

#Library Declaration section 3
#building the Auto Arima model
#import pmdarima as aa

#Library Declaration section 4
import re
#from datetime_truncate import truncate
from functools import reduce
from datetime import datetime, timezone, timedelta
from datetime import datetime
from datetime import datetime as dt
pd.set_option('display.float_format', lambda x: '%.2f' % x)

#Library Declaration section 5
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import RFECV

#Library Declaration section 6
import sklearn
#print(sklearn.show_versions())

#Library Declaration section 7
#pip install imbalanced-learn --user
#pip install imblearn --user
#pip install -U threadpoolctl --user
#import imblearn
#print(imblearn.__version__)

#Library Declaration section 8
# Library Declarations - Model performance matrics 
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score

#Library Declaration section 9
# Display configurations
pd.set_option('display.max_columns', 400)
#pd.set_option('max_rows', None)

#Read input file
import glob

#import dask.dataframe as dd

import pandas

!pip install pyspark | grep -v 'already satisfied'

import pyspark

#Install findspark
!pip install findspark | grep -v 'already satisfied'

# Import findspark
import findspark
findspark.init()

#import pyspark
import pyspark
from pyspark.sql import SparkSession

print (path+raw)

09:37:35
welcome
Looking in indexes: https://europe-python.pkg.dev/artifact-registry-python-cache/virtual-python/simple/
Collecting pyspark
  Using cached pyspark-3.5.3-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading https://europe-python.pkg.dev/artifact-registry-python-cache/virtual-python/py4j/py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3
[0mLooking in indexes: https://europe-python.pkg.dev/artifact-registry-python-cache/virtual-python/simple/
Collecting findspark
  Downloading https://europe-python.pkg.dev/artifact-registry-python-cache/virtual-python/findspark/findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
[0mgcs://rdmai_data/raw/


In [7]:
# Function to load 24 years of WQ observations obtained from EA
def loaddata():
    """Load every yearly EA water-quality CSV (2000-2024) into one DataFrame.

    The 25 yearly files are read from
    ``gcs://rdmai_data/raw/Water_Quality_EA/<year>.csv`` in five batches of five
    consecutive years. Each batch is concatenated on its own, then the five
    batch frames are concatenated into the returned frame; the index is
    renumbered from 0 at both stages.

    Note:
        All batches are held in memory together before the final concatenation.
        The per-five-year cells of this notebook are the lower-memory alternative.

    Returns:
        pandas.DataFrame: rows of all yearly files in year order.
    """
    #Constants declaration for the folder path for files stored under Google Cloud Storage 
    path = 'gcs://rdmai_data/'
    raw = 'raw/'
    wqpath = 'Water_Quality_EA/'
    #wqpath = '' #Make this commented when reading from Google Cloud Storage

    source_dir = path + raw + wqpath
    print (source_dir)

    first_year, last_year, years_per_batch = 2000, 2024, 5

    # One frame per five-year batch: 2000-2004, 2005-2009, 2010-2014, 2015-2019, 2020-2024.
    batch_frames = []
    for batch_start in range(first_year, last_year + 1, years_per_batch):
        batch_years = range(batch_start, batch_start + years_per_batch)
        batch_frames.append(
            pd.concat(
                (pd.read_csv(f"{source_dir}{year}.csv") for year in batch_years),
                ignore_index=True,
            )
        )

    print ("Concat df1 to df5 - Begin: ")
    retdf = pd.concat(batch_frames, ignore_index=True)
    print ("Concat df1 to df5 - End: ")

    #Clearing the memory used by the temporary datasets
    del batch_frames
    print ("deleted temp Pandas datasets")

    return retdf


In [8]:
##################################################################################
# The following 5 steps read the full 2000-2024 data into 5 separate five-year datasets.
# These steps were introduced to avoid high memory utilization and related failures.
##################################################################################

In [10]:
#Main Read
#1) Combining Files from the year 2000 to 2004

#Constants declaration for the folder path for files stored under Google Cloud Storage 
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

wqpath = 'Water_Quality_EA/'
#wqpath = '' #Make this commented when reading from Google Cloud Storage

print (path+raw+wqpath )

#Read 1st set: one file per year from 2000 till 2004
csv_filenames1 = ['2000.csv', '2001.csv', '2002.csv', '2003.csv', '2004.csv']

# Load the yearly files one at a time, printing each row count as a progress check.
yearly_frames = []
expected_rows = 0
for csv_name in csv_filenames1:
    year_df = pd.read_csv(path+raw+wqpath+csv_name)
    print(len(year_df))
    expected_rows += len(year_df)
    yearly_frames.append(year_df)

df_2000_2004 = pd.concat(yearly_frames, ignore_index=True)
print(len(df_2000_2004))
# The combined frame must contain exactly the rows of its yearly inputs.
assert len(df_2000_2004) == expected_rows, "row count changed while combining 2000-2004"

#Clearing the memory used by the temporary datasets
del yearly_frames, year_df, csv_name, expected_rows
print ("deleted temp Pandas datasets")


gcs://rdmai_data/raw/Water_Quality_EA/
4951327
3922008
4587444
4483915
4265352
22210046
deleted temp Pandas datasets


In [11]:
# ---------------------------------------------------------------
# 1) Write the combined 2000-2004 dataset out in one go.
#    showtime() runs before and after the write so the two printed
#    timestamps bracket how long it took.
# ---------------------------------------------------------------
showtime()
savedata(df_2000_2004, "02_WQEA_2000_2004_Raw_New.csv")
showtime()
# Drop the reference so the frame's memory can be reclaimed.
del df_2000_2004

07:41:10
07:45:35


()

In [13]:
#2) Combining Files from the year 2005 to 2009

#Constants declaration for the folder path for files stored under Google Cloud Storage 
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

wqpath = 'Water_Quality_EA/'
#wqpath = '' #Make this commented when reading from Google Cloud Storage

print (path+raw+wqpath )

#Read 2nd set: one file per year from 2005 till 2009
csv_filenames2 = ['2005.csv', '2006.csv', '2007.csv', '2008.csv', '2009.csv']

# Load the yearly files one at a time, printing each row count as a progress check.
yearly_frames = []
expected_rows = 0
for csv_name in csv_filenames2:
    year_df = pd.read_csv(path+raw+wqpath+csv_name)
    print(len(year_df))
    expected_rows += len(year_df)
    yearly_frames.append(year_df)

df_2005_2009 = pd.concat(yearly_frames, ignore_index=True)
print(len(df_2005_2009))
# The combined frame must contain exactly the rows of its yearly inputs.
assert len(df_2005_2009) == expected_rows, "row count changed while combining 2005-2009"

#Clearing the memory used by the temporary datasets
del yearly_frames, year_df, csv_name, expected_rows
print ("deleted temp Pandas datasets")


gcs://rdmai_data/raw/Water_Quality_EA/
4117693
4003373
3709967
3285780
2787597
17904410
deleted temp Pandas datasets


In [14]:
# ---------------------------------------------------------------
# 2) Write the combined 2005-2009 dataset out in one go.
#    showtime() runs before and after the write so the two printed
#    timestamps bracket how long it took.
# ---------------------------------------------------------------
showtime()
savedata(df_2005_2009, "02_WQEA_2005_2009_Raw_New.csv")
showtime()
# Drop the reference so the frame's memory can be reclaimed.
del df_2005_2009

08:00:33
08:04:04


In [15]:
#3) Combining Files from the year 2010 to 2014

#Constants declaration for the folder path for files stored under Google Cloud Storage 
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

wqpath = 'Water_Quality_EA/'
#wqpath = '' #Make this commented when reading from Google Cloud Storage

print (path+raw+wqpath )

#Read 3rd set: one file per year from 2010 till 2014
csv_filenames3 = ['2010.csv', '2011.csv', '2012.csv', '2013.csv', '2014.csv']

# Load the yearly files one at a time, printing each row count as a progress check.
yearly_frames = []
expected_rows = 0
for csv_name in csv_filenames3:
    year_df = pd.read_csv(path+raw+wqpath+csv_name)
    print(len(year_df))
    expected_rows += len(year_df)
    yearly_frames.append(year_df)

df_2010_2014 = pd.concat(yearly_frames, ignore_index=True)
print(len(df_2010_2014))
# The combined frame must contain exactly the rows of its yearly inputs.
assert len(df_2010_2014) == expected_rows, "row count changed while combining 2010-2014"

#Clearing the memory used by the temporary datasets
del yearly_frames, year_df, csv_name, expected_rows
print ("deleted temp Pandas datasets")


gcs://rdmai_data/raw/Water_Quality_EA/
2610447
2657107
2762712
2852883
2380052
13263201
deleted temp Pandas datasets


In [16]:
# ---------------------------------------------------------------
# 3) Write the combined 2010-2014 dataset out in one go.
#    showtime() runs before and after the write so the two printed
#    timestamps bracket how long it took.
# ---------------------------------------------------------------
showtime()
savedata(df_2010_2014, "02_WQEA_2010_2014_Raw_New.csv")
showtime()
# Drop the reference so the frame's memory can be reclaimed.
del df_2010_2014

08:10:54
08:13:40


In [17]:
#4) Combining Files from the year 2015 to 2019

#Constants declaration for the folder path for files stored under Google Cloud Storage 
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

wqpath = 'Water_Quality_EA/'
#wqpath = '' #Make this commented when reading from Google Cloud Storage

print (path+raw+wqpath )

#Read 4th set: one file per year from 2015 till 2019
csv_filenames4 = ['2015.csv', '2016.csv', '2017.csv', '2018.csv', '2019.csv']

# Load the yearly files one at a time, printing each row count as a progress check.
yearly_frames = []
expected_rows = 0
for csv_name in csv_filenames4:
    year_df = pd.read_csv(path+raw+wqpath+csv_name)
    print(len(year_df))
    expected_rows += len(year_df)
    yearly_frames.append(year_df)

df_2015_2019 = pd.concat(yearly_frames, ignore_index=True)
print(len(df_2015_2019))
# The combined frame must contain exactly the rows of its yearly inputs.
assert len(df_2015_2019) == expected_rows, "row count changed while combining 2015-2019"

#Clearing the memory used by the temporary datasets
del yearly_frames, year_df, csv_name, expected_rows
print ("deleted temp Pandas datasets")


gcs://rdmai_data/raw/Water_Quality_EA/
2217068
2097613
1851074
1567779
1678148
9411682
deleted temp Pandas datasets


In [18]:
# ---------------------------------------------------------------
# 4) Write the combined 2015-2019 dataset out in one go.
#    showtime() runs before and after the write so the two printed
#    timestamps bracket how long it took.
# ---------------------------------------------------------------
showtime()
savedata(df_2015_2019, "02_WQEA_2015_2019_Raw_New.csv")
showtime()
# Drop the reference so the frame's memory can be reclaimed.
del df_2015_2019

08:18:12
08:20:17


In [19]:
#5) Combining Files from the year 2020 to 2024

#Constants declaration for the folder path for files stored under Google Cloud Storage 
path = 'gcs://rdmai_data/'
raw = 'raw/'
curated = 'curated/'
cleansed = 'cleansed/'

wqpath = 'Water_Quality_EA/'
#wqpath = '' #Make this commented when reading from Google Cloud Storage

print (path+raw+wqpath )

#Read 5th set: one file per year from 2020 till 2024
csv_filenames5 = ['2020.csv', '2021.csv', '2022.csv', '2023.csv', '2024.csv']

# Load the yearly files one at a time, printing each row count as a progress check.
yearly_frames = []
expected_rows = 0
for csv_name in csv_filenames5:
    year_df = pd.read_csv(path+raw+wqpath+csv_name)
    print(len(year_df))
    expected_rows += len(year_df)
    yearly_frames.append(year_df)

df_2020_2024 = pd.concat(yearly_frames, ignore_index=True)
print(len(df_2020_2024))
# The combined frame must contain exactly the rows of its yearly inputs.
assert len(df_2020_2024) == expected_rows, "row count changed while combining 2020-2024"

#Clearing the memory used by the temporary datasets
del yearly_frames, year_df, csv_name, expected_rows
print ("deleted temp Pandas datasets")


gcs://rdmai_data/raw/Water_Quality_EA/
668453
1229860
1513944
1726183
542881
5681321
deleted temp Pandas datasets


In [20]:
# ---------------------------------------------------------------
# 5) Write the combined 2020-2024 dataset out in one go.
#    showtime() runs before and after the write so the two printed
#    timestamps bracket how long it took.
# ---------------------------------------------------------------
showtime()
savedata(df_2020_2024, "02_WQEA_2020_2024_Raw_New.csv")
showtime()
# Drop the reference so the frame's memory can be reclaimed.
del df_2020_2024

08:32:39
08:33:50


In [None]:
#End CARD
#In line comments completed 09-May-2025