In [14]:
import os

import openmeteo_requests
import pandas as pd
import requests_cache
from dagster import op, Out, In, get_dagster_logger, job
from pymongo import MongoClient, errors
from retry_requests import retry
from sqlalchemy import create_engine

log = get_dagster_logger()

# Open-Meteo API client. Responses are cached on disk under ".cache" and never
# expire (expire_after=-1); failed requests are retried up to 5 times with a
# backoff factor of 0.2.
cache_session = requests_cache.CachedSession(".cache", expire_after=-1)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
openmeteo = openmeteo_requests.Client(session=retry_session)

# Database connection strings. The defaults target databases published on
# localhost; set POSTGRES_CONNECT / MONGO_CONNECT to point somewhere else
# without editing the notebook, e.g. when running inside the compose network:
#   POSTGRES_CONNECT=postgresql://dap:dap@postgres_database:5432/projectdb
#   MONGO_CONNECT=mongodb://dap:dap@mongodb_database
postgres_connect = os.environ.get(
    "POSTGRES_CONNECT", "postgresql://dap:dap@127.0.0.1:5432/projectdb"
)
mongo_connect = os.environ.get("MONGO_CONNECT", "mongodb://dap:dap@127.0.0.1")



In [15]:
@op()
def transform_weather() -> pd.DataFrame:
    """Load the weather readings from MongoDB and keep only those from 2023.

    Reads every document of ``weather_collection`` in the ``projectdb_mongo``
    database, converts the ``date`` column to pandas datetimes and keeps the
    rows with ``2023-01-01 <= date < 2024-01-01``.

    Returns:
        pd.DataFrame: the filtered rows, keeping their original index labels.

    Raises:
        KeyError: if the collection yields no ``date`` column (for example
            when it is empty).
    """
    # The context manager closes the client once the documents have been
    # materialised, so repeated calls do not pile up open connections.
    with MongoClient(mongo_connect) as client:
        documents = list(client["projectdb_mongo"]["weather_collection"].find({}))

    weather_df = pd.DataFrame(documents)
    weather_df['date'] = pd.to_datetime(weather_df['date'])

    # Half-open interval: the upper bound is exclusive so that every timestamp
    # on 2023-12-31 is kept.
    in_2023 = (weather_df['date'] >= '2023-01-01') & (weather_df['date'] < '2024-01-01')
    return weather_df[in_2023]

In [16]:
# Count the null entries in each column of the filtered weather frame
missing_data = transform_weather().isna().sum()
missing_data


_id                     0
date                    0
temperature_2m          0
relative_humidity_2m    0
dew_point_2m            0
apparent_temperature    0
precipitation           0
rain                    0
snowfall                0
weather_code            0
cloud_cover             0
wind_speed_10m          0
wind_direction_10m      0
is_day                  0
sunshine_duration       0
dtype: int64

In [32]:
# Preview the filtered weather frame (calling the op queries MongoDB again)
display(transform_weather())

Unnamed: 0,_id,date,temperature_2m,relative_humidity_2m,dew_point_2m,apparent_temperature,precipitation,rain,snowfall,weather_code,cloud_cover,wind_speed_10m,wind_direction_10m,is_day,sunshine_duration
1,1672531200,2023-01-01 00:00:00,6.27,91.387459,4.97,2.430736,0.0,0.0,0.0,3.0,100.000000,17.935081,218.480225,0.0,0.0
2,1672534800,2023-01-01 01:00:00,5.57,90.385803,4.12,1.473115,0.1,0.1,0.0,51.0,6.900000,18.504139,217.092911,0.0,0.0
3,1672538400,2023-01-01 02:00:00,4.72,92.905891,3.67,0.670981,0.0,0.0,0.0,2.0,55.500000,17.566378,225.830231,0.0,0.0
4,1672542000,2023-01-01 03:00:00,4.92,93.571838,3.97,1.040413,0.0,0.0,0.0,3.0,100.000000,16.808571,223.264328,0.0,0.0
5,1672545600,2023-01-01 04:00:00,5.47,91.976448,4.27,1.724712,0.0,0.0,0.0,2.0,61.500000,16.299694,223.210114,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8755,1704045600,2023-12-31 18:00:00,6.82,87.689705,4.92,1.515806,0.2,0.2,0.0,51.0,100.000000,27.908709,263.333435,0.0,0.0
8756,1704049200,2023-12-31 19:00:00,7.12,85.905251,4.92,1.778767,0.1,0.1,0.0,51.0,100.000000,28.162956,265.601379,0.0,0.0
8757,1704052800,2023-12-31 20:00:00,6.97,86.189529,4.82,1.755062,0.2,0.2,0.0,51.0,93.299995,27.153164,263.911560,0.0,0.0
8758,1704056400,2023-12-31 21:00:00,6.57,87.056412,4.57,1.318548,0.0,0.0,0.0,2.0,66.899994,27.047956,260.036255,0.0,0.0


In [19]:
@op()
def transform_aqi() -> pd.DataFrame:
    """Load the air-quality readings from MongoDB and keep only those from 2023.

    Reads every document of ``aqi_collection`` in the ``projectdb_mongo``
    database, converts the ``date`` column to pandas datetimes and keeps the
    rows with ``2023-01-01 <= date < 2024-01-01``.

    Returns:
        pd.DataFrame: the filtered rows, keeping their original index labels.

    Raises:
        KeyError: if the collection yields no ``date`` column (for example
            when it is empty).
    """
    # The context manager closes the client once the documents have been
    # materialised, so repeated calls do not pile up open connections.
    with MongoClient(mongo_connect) as client:
        documents = list(client["projectdb_mongo"]["aqi_collection"].find({}))

    aqi_df = pd.DataFrame(documents)
    aqi_df['date'] = pd.to_datetime(aqi_df['date'])

    # Half-open interval: the upper bound is exclusive so that every timestamp
    # on 2023-12-31 is kept.
    in_2023 = (aqi_df['date'] >= '2023-01-01') & (aqi_df['date'] < '2024-01-01')
    return aqi_df[in_2023]

In [20]:
# Preview the filtered air-quality frame (calling the op queries MongoDB again)
display(transform_aqi())

Unnamed: 0,_id,date,pm10,pm2_5,carbon_monoxide,nitrogen_dioxide,sulphur_dioxide,dust,european_aqi,european_aqi_pm2_5,european_aqi_pm10,european_aqi_nitrogen_dioxide,european_aqi_ozone,european_aqi_sulphur_dioxide
1,1672531200,2023-01-01 00:00:00,5.6,1.8,135.0,11.75,0.85,0.0,23.199999,8.391667,11.700000,5.875,20.400000,0.17
2,1672534800,2023-01-01 01:00:00,4.4,1.0,133.0,11.20,1.00,0.0,24.400002,8.308334,11.483334,5.600,21.600000,0.20
3,1672538400,2023-01-01 02:00:00,4.1,0.9,130.0,9.25,1.00,0.0,26.399998,8.108335,11.212500,4.625,22.000000,0.20
4,1672542000,2023-01-01 03:00:00,3.8,1.0,127.0,8.35,0.85,0.0,28.400000,7.866668,10.929165,4.175,25.200001,0.17
5,1672545600,2023-01-01 04:00:00,4.0,1.2,124.0,6.95,0.80,0.0,29.200001,7.641668,10.637498,3.475,24.000000,0.16
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8755,1704045600,2023-12-31 18:00:00,6.2,2.7,158.0,4.50,0.50,0.0,26.399998,6.316667,9.437501,2.250,26.399998,0.10
8756,1704049200,2023-12-31 19:00:00,7.7,2.7,158.0,5.10,0.60,0.0,26.000000,6.083333,9.029166,2.550,26.000000,0.12
8757,1704052800,2023-12-31 20:00:00,8.1,2.7,157.0,4.40,0.60,0.0,24.400002,5.841666,8.625000,2.200,24.400002,0.12
8758,1704056400,2023-12-31 21:00:00,6.7,2.6,157.0,4.60,0.70,0.0,23.600002,5.616667,8.175000,2.300,23.600002,0.14


In [21]:
@op()
def transform_footfall() -> pd.DataFrame:
    """Load the footfall counts from MongoDB and clean their missing values.

    Reads every document of ``footfall_collection`` in the ``projectdb_mongo``
    database, drops each column whose share of missing values is above 80 %,
    and replaces the missing values that remain with 0.

    Returns:
        pd.DataFrame: the cleaned frame, free of missing values.
    """
    # Columns with a larger share of nulls than this are discarded.
    null_threshold = 0.8

    # The context manager closes the client once the documents have been
    # materialised, so repeated calls do not pile up open connections.
    with MongoClient(mongo_connect) as client:
        footfall_collection = client["projectdb_mongo"]["footfall_collection"]
        footfall_df = pd.DataFrame(list(footfall_collection.find({})))

    # Share of missing values per column: the mean of the boolean null mask
    # equals the null count divided by the number of rows.
    missing_share = footfall_df.isna().mean()
    sparse_columns = missing_share[missing_share > null_threshold].index

    # fillna(0) applies to every remaining column, whatever its dtype.
    return footfall_df.drop(columns=sparse_columns).fillna(0)

In [22]:
# Preview the cleaned footfall frame (calling the op queries MongoDB again)
display(transform_footfall())

Unnamed: 0,_id,Time,Aston Quay/Fitzgeralds,Aston Quay/Fitzgeralds IN,Aston Quay/Fitzgeralds OUT,Bachelors walk/Bachelors way,Bachelors walk/Bachelors way IN,Bachelors walk/Bachelors way OUT,Baggot st lower/Wilton tce inbound,Baggot st lower/Wilton tce inbound Pedestrians IN,...,Richmond st south/Portabello Harbour outbound Pedestrians OUT,Talbot st/Guineys,Talbot st/Guineys IN,Talbot st/Guineys OUT,Westmoreland Street East/Fleet street,Westmoreland Street East/Fleet street IN,Westmoreland Street East/Fleet street OUT,Westmoreland Street West/Carrolls,Westmoreland Street West/Carrolls IN,Westmoreland Street West/Carrolls OUT
0,1672531200,01/01/2023 00:00,3700,1455,2245,0.0,0.0,0.0,74,40,...,140,2133.0,1044.0,1089.0,1011.0,397.0,614.0,1835.0,1256.0,579.0
1,1672534800,01/01/2023 01:00,4369,2142,2227,0.0,0.0,0.0,100,46,...,317,1347.0,617.0,730.0,634.0,381.0,253.0,1426.0,915.0,511.0
2,1672538400,01/01/2023 02:00,3807,2031,1776,0.0,0.0,0.0,52,15,...,166,751.0,372.0,379.0,501.0,340.0,161.0,1458.0,968.0,490.0
3,1672542000,01/01/2023 03:00,2477,1352,1125,0.0,0.0,0.0,42,18,...,388,654.0,304.0,350.0,445.0,306.0,139.0,1709.0,1198.0,511.0
4,1672545600,01/01/2023 04:00,1534,836,698,0.0,0.0,0.0,11,1,...,168,794.0,449.0,345.0,127.0,94.0,33.0,786.0,580.0,206.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8754,1704049200,31/12/2023 19:00,6906,3248,3658,0.0,0.0,0.0,75,49,...,103,207.0,94.0,113.0,0.0,0.0,0.0,1282.0,713.0,569.0
8755,1704052800,31/12/2023 20:00,5482,2655,2827,2.0,2.0,0.0,45,29,...,82,330.0,141.0,189.0,0.0,0.0,0.0,1183.0,634.0,549.0
8756,1704056400,31/12/2023 21:00,5548,2597,2951,3.0,2.0,1.0,53,36,...,103,225.0,108.0,117.0,0.0,0.0,0.0,1275.0,661.0,614.0
8757,1704060000,31/12/2023 22:00,6041,2516,3525,7.0,5.0,2.0,75,59,...,105,284.0,131.0,153.0,0.0,0.0,0.0,1553.0,883.0,670.0
