**Libraries Imported**

- `dask.dataframe`: parallelizes `pandas` to process large tabular data, either on a regular laptop for larger-than-memory computing or on a distributed cluster of computers.
- `pandas`: provides the `DataFrame` data structure and the functions used to handle, manipulate and analyze large datasets.

In [None]:
import dask.dataframe as dd  # aliased as dd so the pandas alias pd below does not shadow it
import pandas as pd

This step mounts Google Drive (when running in the Google Colab environment) so that the notebook can access the files stored on the cloud drive.

In [None]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


The `.csv` files produced by the `Preprocessing.ipynb` notebook are read in here, to then be merged to create the `final_df` dataframe.

In [None]:
csv_station = pd.read_csv('/content/drive/MyDrive/datascience/station_df.csv')
csv_pollutant = pd.read_csv('/content/drive/MyDrive/datascience/pollutant_df.csv')
csv_weather = pd.read_csv('/content/drive/MyDrive/datascience/weather_df.csv')
csv_geography = pd.read_csv('/content/drive/MyDrive/datascience/geography_df.csv')
csv_date = pd.read_csv('/content/drive/MyDrive/datascience/date_main.csv')
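
Columns such as `Unnamed: 0`, which appear in the outputs and are dropped later in this notebook, typically arise when a CSV was written together with its index and then read back without `index_col`. The following is a minimal sketch of that behaviour, using an in-memory buffer and made-up values rather than the project's files:

```python
import io

import pandas as pd

# Hypothetical two-row frame standing in for one of the preprocessed tables.
sample = pd.DataFrame({"State": ["StateA", "StateB"], "City": ["CityA", "CityB"]})

# Writing with the default index=True stores the index as an unnamed first column.
buffer = io.StringIO()
sample.to_csv(buffer)

# Reading it back without index_col yields an extra "Unnamed: 0" column.
buffer.seek(0)
naive = pd.read_csv(buffer)

# Passing index_col=0 restores the index instead of creating that column.
buffer.seek(0)
clean = pd.read_csv(buffer, index_col=0)

print(list(naive.columns))
print(list(clean.columns))
```

Writing the files with `index=False`, as is done for `final_df.csv` at the end of this notebook, avoids the extra column in the first place.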

In [None]:
weather_station_df = pd.merge(csv_weather, csv_station, left_on=['station_id'], right_on=['ID'], how="inner")
weather_station_df = weather_station_df.drop(["Surrogate Keys_y","ID", 'Lat', 'Lon'],axis=1)
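
The `Surrogate Keys_y` column dropped above is a by-product of `pd.merge`: when both inputs share a non-key column name, pandas appends `_x` and `_y` to tell them apart. A small sketch with invented values (the station IDs and readings below are placeholders) shows the default behaviour and the `suffixes` argument, which can make the origin of each column more obvious:

```python
import pandas as pd

# Toy stand-ins for the weather and station tables; all values are invented.
weather = pd.DataFrame(
    {"Surrogate Keys": [1, 2], "station_id": ["A", "B"], "tmax": [63, 60]}
)
station = pd.DataFrame(
    {"Surrogate Keys": [10, 20], "ID": ["A", "B"], "City": ["CityA", "CityB"]}
)

# Default suffixes: the shared column becomes Surrogate Keys_x / Surrogate Keys_y.
default = pd.merge(weather, station, left_on="station_id", right_on="ID", how="inner")

# Explicit suffixes name the source table instead.
named = pd.merge(
    weather,
    station,
    left_on="station_id",
    right_on="ID",
    how="inner",
    suffixes=("_weather", "_station"),
)

print(list(default.columns))
print(list(named.columns))
```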

In [None]:
cutoff_date = pd.to_datetime('2020-01-01')
csv_date['Date'] = pd.to_datetime(csv_date['Date'])
weather_station_df['Date'] = pd.to_datetime(weather_station_df['Date'])
date_weather_station_df = pd.merge(csv_date, weather_station_df, on="Date", how="inner")
date_weather_station_df = date_weather_station_df.drop(["Surrogate Keys_x"],axis=1)
# date_weather_station_df['Date'] = pd.to_datetime(date_weather_station_df['Date'])
date_weather_station_df = date_weather_station_df[date_weather_station_df['Date'] >= cutoff_date]
date_weather_station_df = date_weather_station_df.drop_duplicates()

# geography and pollutant dataframes handling
geography_pollutant_df = pd.merge(csv_pollutant, csv_date, on=['Date'], how="inner")
geography_pollutant_df = geography_pollutant_df.drop(["Unnamed: 0", "Surrogate Keys_y"], axis=1)
geography_pollutant_df['Date'] = pd.to_datetime(geography_pollutant_df['Date'])
geography_pollutant_df = geography_pollutant_df[geography_pollutant_df['Date'] >= cutoff_date]
geography_pollutant_df = geography_pollutant_df.drop_duplicates()

# test = pd.merge(geography_pollutant_df, csv_date, on=['Date'], how="inner")

In [None]:
# The dataframes' `Date` column is converted to datetime format and the dataframes are filtered to only include data from 1980 onwards.
csv_pollutant['Date'] = pd.to_datetime(csv_pollutant['Date'])
csv_date['Date'] = pd.to_datetime(csv_date['Date'])
csv_weather['Date'] = pd.to_datetime(csv_weather['Date'])

# print(csv_pollutant.shape[0])
# test_df = pd.merge( csv_pollutant, csv_date, on=['Date'], how="inner")
# print(test_df.shape[0])
# print(test_df.head())

cutoff_date = pd.to_datetime('1980-01-01')

csv_pollutant = csv_pollutant[csv_pollutant['Date'] >= cutoff_date]
csv_weather = csv_weather[csv_weather['Date'] >= cutoff_date]
csv_date = csv_date[csv_date['Date'] >= cutoff_date]
csv_weather = csv_weather.drop_duplicates()
csv_pollutant = csv_pollutant.drop_duplicates()
csv_date = csv_date.drop_duplicates()

pollutant_date = pd.merge( csv_pollutant, csv_date, how="inner", on=['Date'])

pollutant_date = pollutant_date.drop_duplicates()
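
The filtering pattern used in the cell above (parse `Date`, keep rows on or after the cutoff, drop exact duplicates) can be checked on a toy frame. This is only a sketch with invented readings, not the project's data:

```python
import pandas as pd

# Four invented rows: one before the cutoff and one exact duplicate.
toy = pd.DataFrame(
    {
        "Date": ["1979-12-31", "1980-01-01", "1980-01-01", "1980-01-02"],
        "NO2 Mean": [0.01, 0.0, 0.0, 0.03],
    }
)

# Convert the text dates to datetime64 so comparisons are chronological.
toy["Date"] = pd.to_datetime(toy["Date"])
cutoff = pd.Timestamp("1980-01-01")

# Keep rows from the cutoff onwards, then remove the duplicated row.
filtered = toy[toy["Date"] >= cutoff].drop_duplicates().reset_index(drop=True)

print(filtered)
```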


In [None]:
# pollutant_date_weather = pd.merge(pollutant_date, csv_weather, how="inner", on=['Date'])
# pollutant_date_weather = pollutant_date_weather.drop_duplicates()

In [None]:
# Merging of the `pollutant_date` and `csv_station` dataframes on the `State` and `City` columns
pollutant_date_station = pd.merge(pollutant_date, csv_station, left_on=['State',"City"], right_on=['State',"City"], how="inner")

# Merging of the `pollutant_date_station` and `csv_geography` dataframes on the `State` and `City` columns
pollutant_date_station_geo = pd.merge(pollutant_date_station, csv_geography, on=['State',"City"], how="inner")



In [None]:
print(pollutant_date_station.shape[0])

1608382


In [None]:
print(pollutant_date_station.head(1608382))

         Unnamed: 0  Surrogate Keys_x       Date     State       City  \
0                20                21 1980-01-01     Texas     Dallas   
1               602               603 1980-01-02     Texas     Dallas   
2              1461              1462 1980-01-03     Texas     Dallas   
3              2016              2017 1980-01-04     Texas     Dallas   
4              2315              2316 1980-01-05     Texas     Dallas   
...             ...               ...        ...       ...        ...   
1608377    13041939          13041940 2022-05-27  Missouri  Bridgeton   
1608378    13042807          13042808 2022-05-28  Missouri  Bridgeton   
1608379    13043964          13043965 2022-05-29  Missouri  Bridgeton   
1608380    13045065          13045066 2022-05-30  Missouri  Bridgeton   
1608381    13045205          13045206 2022-05-31  Missouri  Bridgeton   

         NO2 Mean   CO Mean  SO2 Mean   O3 Mean  PM2.5 Mean  ...  month  year  \
0        0.000000  1.206250  0.002083  0.0

In [None]:
print(pollutant_date_station_geo.shape[0])

1467284
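
The row count falls from 1608382 to 1467284 after the inner join with `csv_geography`, which suggests that some `State`/`City` pairs have no counterpart in the geography table. One way to find them, sketched here on invented data, is a left join with `indicator=True`, which adds a `_merge` column recording where each row was found:

```python
import pandas as pd

# Invented stand-ins: the second State/City pair is missing from the geography table.
left = pd.DataFrame(
    {
        "State": ["StateA", "StateA", "StateB"],
        "City": ["CityA", "CityA", "CityB"],
        "NO2 Mean": [0.0, 0.03, 0.01],
    }
)
geo = pd.DataFrame({"State": ["StateA"], "City": ["CityA"], "Population": [1000]})

# A left join keeps every row of `left`; _merge says whether a match existed.
checked = pd.merge(left, geo, on=["State", "City"], how="left", indicator=True)

# Distinct pairs that an inner join would have silently discarded.
unmatched = checked.loc[
    checked["_merge"] == "left_only", ["State", "City"]
].drop_duplicates()

print(unmatched)
```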


In [None]:
pollutant_date_station_geo.head(1467284)

Unnamed: 0.1,Unnamed: 0,Surrogate Keys_x,Date,State,City,NO2 Mean,CO Mean,SO2 Mean,O3 Mean,PM2.5 Mean,...,ID,Lat,Lon,Status,Scale Type,Surrogate Keys_y,Area Code,Population,Population Density,Timezone
0,20,21,1980-01-01,Texas,Dallas,0.000000,1.206250,0.002083,0.020117,0.015688,...,USW00003927,32.8978,-97.0189,Active,Middle,78,19100,5910669,1522.0,America/Chicago
1,602,603,1980-01-02,Texas,Dallas,0.030000,1.571667,0.001500,0.011064,0.015688,...,USW00003927,32.8978,-97.0189,Active,Middle,78,19100,5910669,1522.0,America/Chicago
2,1461,1462,1980-01-03,Texas,Dallas,0.066957,1.549185,0.000000,0.008588,0.015688,...,USW00003927,32.8978,-97.0189,Active,Middle,78,19100,5910669,1522.0,America/Chicago
3,2016,2017,1980-01-04,Texas,Dallas,0.038636,1.038095,0.000000,0.007403,0.015688,...,USW00003927,32.8978,-97.0189,Active,Middle,78,19100,5910669,1522.0,America/Chicago
4,2315,2316,1980-01-05,Texas,Dallas,0.000000,0.637500,0.000000,0.006442,0.015688,...,USW00003927,32.8978,-97.0189,Active,Middle,78,19100,5910669,1522.0,America/Chicago
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1467279,11573817,11573818,2017-12-27,Mississippi,Hattiesburg,0.002908,0.140250,0.000042,0.035765,0.005283,...,USW00013833,31.2819,-89.2531,Active,Micro,132,25620,85086,332.0,America/Chicago
1467280,11574667,11574668,2017-12-28,Mississippi,Hattiesburg,0.011277,0.232208,0.000065,0.027324,0.006829,...,USW00013833,31.2819,-89.2531,Active,Micro,132,25620,85086,332.0,America/Chicago
1467281,11575848,11575849,2017-12-29,Mississippi,Hattiesburg,0.013500,0.947261,0.001157,0.038941,0.009467,...,USW00013833,31.2819,-89.2531,Active,Micro,132,25620,85086,332.0,America/Chicago
1467282,11576292,11576293,2017-12-30,Mississippi,Hattiesburg,0.009100,0.333333,0.000697,0.014000,0.010025,...,USW00013833,31.2819,-89.2531,Active,Micro,132,25620,85086,332.0,America/Chicago


In [None]:
# print(test.shape[0])
print(pollutant_date_station_geo.head())
# print(geography_pollutant_df.head())


   Unnamed: 0  Surrogate Keys_x       Date  State    City  NO2 Mean   CO Mean  \
0          20                21 1980-01-01  Texas  Dallas  0.000000  1.206250   
1         602               603 1980-01-02  Texas  Dallas  0.030000  1.571667   
2        1461              1462 1980-01-03  Texas  Dallas  0.066957  1.549185   
3        2016              2017 1980-01-04  Texas  Dallas  0.038636  1.038095   
4        2315              2316 1980-01-05  Texas  Dallas  0.000000  0.637500   

   SO2 Mean   O3 Mean  PM2.5 Mean  ...           ID      Lat      Lon  Status  \
0  0.002083  0.020117    0.015688  ...  USW00003927  32.8978 -97.0189  Active   
1  0.001500  0.011064    0.015688  ...  USW00003927  32.8978 -97.0189  Active   
2  0.000000  0.008588    0.015688  ...  USW00003927  32.8978 -97.0189  Active   
3  0.000000  0.007403    0.015688  ...  USW00003927  32.8978 -97.0189  Active   
4  0.000000  0.006442    0.015688  ...  USW00003927  32.8978 -97.0189  Active   

   Scale Type  Surrogate K

In [None]:
pollutant_date_station_geo_weather = pd.merge(pollutant_date_station_geo, csv_weather, left_on=['Date',"ID"],right_on=['Date','station_id'], how="inner")
pollutant_date_station_geo_weather_date = pd.merge(pollutant_date_station_geo_weather, csv_date, on=['Date'], how="inner")

In [None]:
print(pollutant_date_station_geo_weather.shape[0])
# print(pollutant_date_station_geo_weather.head())
# pollutant_date_station_geo_weather = pollutant_date_station_geo_weather.drop(['Unnamed: 0',"Surrogate Keys_y","Surrogate Keys"],axis=1)
# pollutant_date_station_geo_weather.set_index("Surrogate Keys_x")
# pollutant_date_station_geo_weather['Surrogate Keys'] = pollutant_date_station_geo_weather['Surrogate Keys_x']
# pollutant_date_station_geo_weather = pollutant_date_station_geo_weather.drop(['Surrogate Keys_x'],axis=1)
# pollutant_date_station_geo_weather = pollutant_date_station_geo_weather.drop(['Surrogate Keys_x'],axis=1)

pollutant_date_station_geo_weather['Surrogate Keys'] = range(1,len(pollutant_date_station_geo_weather)+1)
pollutant_date_station_geo_weather = pollutant_date_station_geo_weather.reindex(columns=['Surrogate Keys'] + list([c for c in pollutant_date_station_geo_weather.columns if c!= 'Surrogate Keys'])).reset_index()

1439667


In [None]:
pollutant_date_station_geo_weather.head(1439667)

Unnamed: 0,Surrogate Keys,Date,State,City,NO2 Mean,CO Mean,SO2 Mean,O3 Mean,PM2.5 Mean,PM10 Mean,...,Status,Scale Type,Area Code,Population,Population Density,Timezone,tmax,tmin,prcp,station_id
0,1,1980-01-01,Texas,Dallas,0.000000,1.206250,0.002083,0.020117,0.015688,0.024,...,Active,Middle,19100,5910669,1522.0,America/Chicago,63,32,0.00,USW00003927
1,2,1980-01-02,Texas,Dallas,0.030000,1.571667,0.001500,0.011064,0.015688,0.024,...,Active,Middle,19100,5910669,1522.0,America/Chicago,60,41,0.00,USW00003927
2,3,1980-01-03,Texas,Dallas,0.066957,1.549185,0.000000,0.008588,0.015688,0.024,...,Active,Middle,19100,5910669,1522.0,America/Chicago,51,33,0.00,USW00003927
3,4,1980-01-04,Texas,Dallas,0.038636,1.038095,0.000000,0.007403,0.015688,0.024,...,Active,Middle,19100,5910669,1522.0,America/Chicago,45,32,0.00,USW00003927
4,5,1980-01-05,Texas,Dallas,0.000000,0.637500,0.000000,0.006442,0.015688,0.024,...,Active,Middle,19100,5910669,1522.0,America/Chicago,47,30,0.00,USW00003927
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1439662,1439663,2017-12-27,Mississippi,Hattiesburg,0.002908,0.140250,0.000042,0.035765,0.005283,0.011,...,Active,Micro,25620,85086,332.0,America/Chicago,43,39,0.82,USW00013833
1439663,1439664,2017-12-28,Mississippi,Hattiesburg,0.011277,0.232208,0.000065,0.027324,0.006829,0.008,...,Active,Micro,25620,85086,332.0,America/Chicago,46,35,0.00,USW00013833
1439664,1439665,2017-12-29,Mississippi,Hattiesburg,0.013500,0.947261,0.001157,0.038941,0.009467,0.036,...,Active,Micro,25620,85086,332.0,America/Chicago,53,29,0.00,USW00013833
1439665,1439666,2017-12-30,Mississippi,Hattiesburg,0.009100,0.333333,0.000697,0.014000,0.010025,0.036,...,Active,Micro,25620,85086,332.0,America/Chicago,52,35,0.04,USW00013833


In [None]:
pollutant_date_station_geo_weather = pollutant_date_station_geo_weather.drop('index',axis=1)
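
The `index` column removed here was created by the earlier `reset_index()` call, which by default moves the old index into a new column. As a sketch on a toy frame (values invented), sequential keys can be placed in the first column without producing that extra column by combining `reset_index(drop=True)` with `DataFrame.insert`:

```python
import pandas as pd

# Toy frame with a non-contiguous index, as is common after merges and filters.
toy = pd.DataFrame(
    {"City": ["CityA", "CityA", "CityB"], "tmax": [63, 60, 43]},
    index=[5, 9, 12],
)

# drop=True discards the old index instead of turning it into an "index" column.
toy = toy.reset_index(drop=True)

# Insert the sequential keys as the first column (position 0).
toy.insert(0, "Surrogate Keys", list(range(1, len(toy) + 1)))

print(toy)
```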

In [None]:
print(pollutant_date_station_geo_weather_date.head(0))

Empty DataFrame
Columns: [Surrogate Keys_x, Date, State, City, NO2 Mean, CO Mean, SO2 Mean, O3 Mean, PM2.5 Mean, PM10 Mean, Category, day_x, month_x, year_x, day_of_week_x, season_x, ID, Lat, Lon, Status, Scale Type, Area Code, Population, Population Density, Timezone, tmax, tmin, prcp, station_id, Surrogate Keys_y, day_y, month_y, year_y, day_of_week_y, season_y]
Index: []

[0 rows x 35 columns]


In [None]:
def get_season(month):
    if month in [3, 4, 5]:
        return 'Spring'
    elif month in [6, 7, 8]:
        return 'Summer'
    elif month in [9, 10, 11]:
        return 'Autumn'
    else:
        return 'Winter'
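
`get_season` is applied row by row further down. A dictionary lookup through `Series.map` produces the same labels and is usually quicker on large columns. The mapping name `SEASON_BY_MONTH` below is introduced only for this sketch:

```python
import pandas as pd

# Same month-to-season rule as get_season, written as a lookup table.
SEASON_BY_MONTH = {
    12: "Winter", 1: "Winter", 2: "Winter",
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Autumn", 10: "Autumn", 11: "Autumn",
}

# A few sample month numbers standing in for the `month` column.
months = pd.Series([1, 4, 7, 10, 12])

# map() looks each month up in the dictionary in one vectorized call.
seasons = months.map(SEASON_BY_MONTH)

print(seasons.tolist())
```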

In [None]:
# Extract year, month, and day
pollutant_date_station_geo_weather['Date'] = pd.to_datetime(pollutant_date_station_geo_weather['Date'])
pollutant_date_station_geo_weather['year'] = pollutant_date_station_geo_weather['Date'].dt.year
pollutant_date_station_geo_weather['month'] = pollutant_date_station_geo_weather['Date'].dt.month
pollutant_date_station_geo_weather['day'] = pollutant_date_station_geo_weather['Date'].dt.day
pollutant_date_station_geo_weather['season'] = pollutant_date_station_geo_weather['month'].apply(get_season)
print(pollutant_date_station_geo_weather['year'])

# date_df['date'] = pd.to_datetime(date_df[['year', 'month', 'day']])

# Calculate the day of the week and assign it to a new column
pollutant_date_station_geo_weather['day_of_week'] = pollutant_date_station_geo_weather['Date'].dt.day_name()
# date_df = date_df
# date_df = date_df.drop("date", axis=1)


0          1980
1          1980
2          1980
3          1980
4          1980
           ... 
1439662    2017
1439663    2017
1439664    2017
1439665    2017
1439666    2017
Name: year, Length: 1439667, dtype: int64


In [None]:
print(pollutant_date_station_geo_weather.head(0))

Empty DataFrame
Columns: [Surrogate Keys, Date, State, City, NO2 Mean, CO Mean, SO2 Mean, O3 Mean, PM2.5 Mean, PM10 Mean, Category, day, month, year, day_of_week, season, ID, Lat, Lon, Status, Scale Type, Area Code, Population, Population Density, Timezone, tmax, tmin, prcp, station_id]
Index: []

[0 rows x 29 columns]


The final dataframe is now created and saved as `final_df.csv`, which is then used in the ETL processing file.

In [None]:
final_df = pollutant_date_station_geo_weather[["Surrogate Keys","day","month","year","season","day_of_week","City","State","Population","Population Density","Timezone","Area Code","tmax","tmin","prcp","ID", "Scale Type","Status",'NO2 Mean', 'CO Mean', 'SO2 Mean', 'O3 Mean', 'PM2.5 Mean', 'PM10 Mean', 'Category']]
final_df.head()
final_df.to_csv('/content/drive/MyDrive/datascience/final_df.csv',index=False)

The remaining commands can be ignored; they were only used to explore further preprocessing of the final dataset, to make it easier to load into the database instance.

In [None]:
full_merge = pd.merge(date_weather_station_df,geography_pollutant_df, on=['Date'], how="inner")
full_merge = full_merge.drop_duplicates()


In [None]:
print(full_merge.shape[0])

77461000
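
A count this large is consistent with a many-to-many join: merging on `Date` alone pairs every weather row for a given day with every pollutant row for that day. The sketch below reproduces the effect on invented data and shows the `validate` argument of `pd.merge`, which raises an error when the key is not unique on the side expected to be unique:

```python
import pandas as pd

# Three weather rows and four pollutant rows that all share one date (invented).
weather = pd.DataFrame({"Date": ["2020-01-01"] * 3, "station_id": ["A", "B", "C"]})
pollutant = pd.DataFrame({"Date": ["2020-01-01"] * 4, "City": ["W", "X", "Y", "Z"]})

# Joining on Date alone yields every combination: 3 x 4 rows.
crossed = pd.merge(weather, pollutant, on="Date", how="inner")
print(len(crossed))

# validate="one_to_many" requires unique keys on the left, so this merge is rejected.
try:
    pd.merge(weather, pollutant, on="Date", how="inner", validate="one_to_many")
    raised = False
except pd.errors.MergeError:
    raised = True

print(raised)
```

Adding a station or location column to the join keys, as the `final_df` pipeline above does with `Date` and `ID`, keeps the result from growing multiplicatively.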


In [None]:
full_merge.head()

Unnamed: 0,Surrogate Keys,day,month,year,day_of_week,season,Date_x,tmax,tmin,prcp,...,CO Mean,SO2 Mean,O3 Mean,PM2.5 Mean,PM10 Mean,Category,Area Code,Population,Population Density,Timezone
0,14611,1,1,2020,Wednesday,Winter,2020-01-01,63,39,0.0,...,0.16087,0.002058,0.026765,0.021333,0.017,Good,42340,284429,525.0,America/New_York
1,14611,1,1,2020,Wednesday,Winter,2020-01-01,63,39,0.0,...,0.308696,0.000592,0.028353,0.006562,0.006,Good,42340,284429,525.0,America/New_York
2,14611,1,1,2020,Wednesday,Winter,2020-01-01,63,39,0.0,...,0.316667,0.000252,0.018706,0.004265,0.008,Good,42340,284429,525.0,America/New_York
3,14611,1,1,2020,Wednesday,Winter,2020-01-01,63,39,0.0,...,0.370833,0.001215,0.024059,0.007242,0.016,Good,42340,284429,525.0,America/New_York
4,14611,1,1,2020,Wednesday,Winter,2020-01-01,63,39,0.0,...,0.218261,0.001725,0.024588,0.00375,0.015,Good,42340,284429,525.0,America/New_York


In [None]:
full_merge.to_csv('/content/drive/MyDrive/datascience/full_merge.csv',index=False)

IsADirectoryError: [Errno 21] Is a directory: '/content/drive/MyDrive/datascience/full_merge.csv'
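
The error indicates that something named `full_merge.csv` already exists at that location as a directory, so pandas cannot open it as a file. One possible cause, which is only a guess here, is an earlier write through Dask, whose `to_csv` treats a path without `*` as a directory and writes one file per partition into it. The sketch below simulates the situation in a temporary folder; `safe_csv_path` is a hypothetical helper and not part of pandas:

```python
import os
import tempfile

import pandas as pd


def safe_csv_path(path):
    """Hypothetical helper: return `path`, or a sibling file name if `path` is a directory."""
    if os.path.isdir(path):
        root, ext = os.path.splitext(path)
        return f"{root}_file{ext}"
    return path


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "full_merge.csv")
    os.mkdir(target)  # simulate a directory occupying the intended file name

    # The helper detects the clash and proposes a different file name.
    out_path = safe_csv_path(target)
    pd.DataFrame({"a": [1, 2]}).to_csv(out_path, index=False)

    written = os.path.isfile(out_path)
    out_name = os.path.basename(out_path)

print(out_name, written)
```

Removing or renaming the existing directory in Drive would be an equally valid fix.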