<span style="font-weight:bold; font-size: 3rem; color:#333;">Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and Weather (Open-Meteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python orchestration tool to schedule this notebook to run daily.
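If you schedule the pipeline with your own loop or tool instead of the GitHub Action, the core calculation is how long to wait until the next daily run. Below is a minimal standard-library sketch. The 01:00 UTC run time and the function name `seconds_until_next_run` are hypothetical, and in practice a cron expression in the scheduler usually replaces this logic.

```python
import datetime

RUN_HOUR_UTC = 1  # hypothetical run time: every day at 01:00 UTC


def seconds_until_next_run(now: datetime.datetime, run_hour: int = RUN_HOUR_UTC) -> float:
    """Seconds from `now` until the next occurrence of `run_hour`:00 (same tzinfo as `now`)."""
    next_run = now.replace(hour=run_hour, minute=0, second=0, microsecond=0)
    if next_run <= now:
        # Today's slot has already started or passed, so run tomorrow
        next_run += datetime.timedelta(days=1)
    return (next_run - now).total_seconds()


now = datetime.datetime(2024, 11, 20, 0, 30, tzinfo=datetime.timezone.utc)
print(seconds_until_next_run(now))  # 1800.0
```

Working in UTC avoids daylight-saving jumps, where adding `timedelta(days=1)` to a local time would not always land on the same wall-clock hour.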

### <span style='color:#ff5f27'> 📝 Imports

In [1]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from functions import util
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> 🌍 Get the Sensor URL, Country, City, and Street Names from the Metadata CSV </span>

__The cell below reads these values from `../../data/meta.csv`; update that file to track different sensors.__

__They should be the same values as in notebook 1, the feature backfill notebook.__


In [3]:
meta_csv_file = "../../data/meta.csv"
meta_df = pd.read_csv(meta_csv_file, skipinitialspace=True)

# itertuples() yields (Index, col1, col2, ...), so row[2] is the CSV's second column.
# Build one entry per sensor, keyed by street name.
meta_dict = {}
for row in meta_df.itertuples():
    meta_dict[row[2]] = {
        'Street': row[2],
        'City': row[3],
        'Country': row[4],
        'Latitude': row[5],
        'Longitude': row[6],
        'aqicn_url': row[7],
    }

meta_dict

{'Rosenlundsgatan': {'Street': 'Rosenlundsgatan',
  'City': 'Stockholm',
  'Country': 'Sweden',
  'Latitude': 59.31430493,
  'Longitude': 18.05700184,
  'aqicn_url': 'https://api.waqi.info/feed/A129124'},
 'Erstagatan': {'Street': 'Erstagatan',
  'City': 'Stockholm',
  'Country': 'Sweden',
  'Latitude': 59.31515153,
  'Longitude': 18.0901034,
  'aqicn_url': 'https://api.waqi.info/feed/A78022'},
 'Bellmansgatan': {'Street': 'Bellmansgatan',
  'City': 'Stockholm',
  'Country': 'Sweden',
  'Latitude': 59.32099927,
  'Longitude': 18.06445284,
  'aqicn_url': 'https://api.waqi.info/feed/A56749'},
 'Hornsgatan': {'Street': 'Hornsgatan',
  'City': 'Stockholm',
  'Country': 'Sweden',
  'Latitude': 59.31711415,
  'Longitude': 18.04826249,
  'aqicn_url': 'https://api.waqi.info/feed/@10009'}}
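Positional indexing such as `row[2]` silently breaks if a column is added or reordered in the CSV. A hedged alternative selects columns by name. The lowercase header names in `SAMPLE_CSV` and `COLUMN_MAP` are hypothetical (the real `meta.csv` header is not shown in this notebook), and `load_meta` is an illustrative helper, not part of `functions.util`.

```python
import io

import pandas as pd

# Hypothetical CSV layout; the real meta.csv header is not shown in this notebook
SAMPLE_CSV = """street,city,country,latitude,longitude,aqicn_url
Rosenlundsgatan,Stockholm,Sweden,59.31430493,18.05700184,https://api.waqi.info/feed/A129124
Erstagatan,Stockholm,Sweden,59.31515153,18.0901034,https://api.waqi.info/feed/A78022
"""

# Map (hypothetical) CSV column names to the keys used in meta_dict
COLUMN_MAP = {
    "street": "Street",
    "city": "City",
    "country": "Country",
    "latitude": "Latitude",
    "longitude": "Longitude",
    "aqicn_url": "aqicn_url",
}


def load_meta(csv_source) -> dict:
    """Build {street: metadata}, selecting columns by name rather than by tuple position."""
    df = pd.read_csv(csv_source, skipinitialspace=True)
    # Raises KeyError early if an expected column is missing
    df = df[list(COLUMN_MAP)].rename(columns=COLUMN_MAP)
    return {record["Street"]: record for record in df.to_dict(orient="records")}


meta = load_meta(io.StringIO(SAMPLE_CSV))
print(meta["Erstagatan"]["aqicn_url"])  # https://api.waqi.info/feed/A78022
```

With the real file, `meta_dict = load_meta(meta_csv_file)` could replace the `itertuples()` loop once `COLUMN_MAP` matches the actual header.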

In [4]:
# If you haven't set the env variable 'HOPSWORKS_API_KEY', uncomment the next line and enter your API key
# os.environ["HOPSWORKS_API_KEY"] = "<YOUR_HOPSWORKS_API_KEY>"

project = hopsworks.login()
fs = project.get_feature_store()
secrets = util.secrets_api(project.name)

# This line will fail if you have not registered the AQI_API_KEY as a secret in Hopsworks
AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value


today = datetime.date.today()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1164444
Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.
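Hard-coding an API key in a cell leaks it into every saved or exported copy of the notebook. A safer pattern is to read keys from the environment and fail with a clear message when one is missing. This is a minimal sketch; `require_env` is a hypothetical helper, and in a GitHub Action the variable would typically be populated from a repository secret.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set. "
            "Export it in your shell or configure it as a secret in your scheduler."
        )
    return value
```

For example, calling `require_env("HOPSWORKS_API_KEY")` before `hopsworks.login()` turns a missing key into an immediate, readable error instead of a failed login later on.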


### <span style="color:#ff5f27;"> 🔮 Get references to the Feature Groups </span>

In [6]:
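# Retrieve feature groups
# This notebook uses one feature group per street (e.g. 'air_quality_hornsgatan'),
# so the per-street groups are retrieved inside the upload loops further down.
# The single-sensor version is kept here for reference:
#air_quality_fg = fs.get_feature_group(
#    name='air_quality',
#    version=1,
#)
#weather_fg = fs.get_feature_group(
#    name='weather',
#    version=1,
#)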
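The per-street naming scheme (prefix plus the lowercased street name) appears in both upload cells and in the job names such as `air_quality_rosenlundsgatan_1_offline_fg_materialization`. A small helper keeps the two cells consistent. `feature_group_name` is hypothetical and not part of the Hopsworks API.

```python
def feature_group_name(prefix: str, street: str) -> str:
    """Return the per-street feature group name, e.g. 'air_quality_hornsgatan'."""
    return f"{prefix}_{street.lower()}"


streets = ["Rosenlundsgatan", "Erstagatan", "Bellmansgatan", "Hornsgatan"]
for street in streets:
    print(feature_group_name("air_quality", street), feature_group_name("weather", street))
```

In the upload loops this would read `fs.get_feature_group(name=feature_group_name("weather", inner_dict["Street"]), version=1)`.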

---

## <span style='color:#ff5f27'> 🌫 Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [9]:
# Fetch today's PM2.5 reading for every sensor in meta_dict
for key, inner_dict in meta_dict.items():
    aq_today_df = util.get_pm25(inner_dict['aqicn_url'], inner_dict['Country'], inner_dict['City'], inner_dict['Street'], today, AQI_API_KEY)
    inner_dict['aq_today_df'] = aq_today_df
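`util.get_pm25` is not shown in this notebook. As a hedged sketch of what the parsing half of such a helper might look like, the function below turns an AQICN/WAQI feed response into a one-row DataFrame. It assumes the response shape `{"status": "ok", "data": {"iaqi": {"pm25": {"v": ...}}}}`; the name `parse_waqi_pm25` and every column other than `date` and `pm25` are hypothetical. The HTTP call itself would be along the lines of `requests.get(url, params={"token": AQI_API_KEY}).json()`.

```python
import datetime

import pandas as pd


def parse_waqi_pm25(payload: dict, country: str, city: str, street: str,
                    day: datetime.date, url: str) -> pd.DataFrame:
    """Turn a (assumed) WAQI feed response into a one-row DataFrame for `day`."""
    if payload.get("status") != "ok":
        # On errors the API is assumed to put a message in 'data'
        raise ValueError(f"AQI API error for {url}: {payload.get('data')}")
    # Not every station reports PM2.5; a missing reading becomes NaN instead of a KeyError
    pm25 = payload["data"].get("iaqi", {}).get("pm25", {}).get("v", float("nan"))
    return pd.DataFrame({
        "pm25": [float(pm25)],
        "country": [country],
        "city": [city],
        "street": [street],
        "date": [pd.Timestamp(day)],
        "url": [url],
    })
```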

In [10]:
#meta_dict['Hornsgatan']['aq_today_df'].info()

## <span style='color:#ff5f27'> 🌦 Get Weather Forecast data</span>

In [12]:
for key, inner_dict in meta_dict.items():
    city = inner_dict['City']
    latitude = inner_dict['Latitude']
    longitude = inner_dict['Longitude']
    hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude)
    hourly_df = hourly_df.set_index('date')
    
    # We make one prediction per day, so keep a single daily forecast: the reading at 12:00
    daily_df = hourly_df.between_time('11:59', '12:01')
    daily_df = daily_df.reset_index()
    # Truncate each timestamp to midnight so 'date' holds only the day
    daily_df['date'] = pd.to_datetime(daily_df['date']).dt.date
    daily_df['date'] = pd.to_datetime(daily_df['date'])
    daily_df['city'] = city
    inner_dict['daily_df'] = daily_df

Coordinates 59.25°N 18.0°E
Elevation 29.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 59.25°N 18.0°E
Elevation 34.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 59.25°N 18.0°E
Elevation 0.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 59.25°N 18.0°E
Elevation 35.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
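The noon filter and date truncation in the cell above can be checked on synthetic data. The sketch builds two days of made-up hourly values (the column name `temperature` is hypothetical), keeps the 12:00 row of each day with `between_time`, which requires a `DatetimeIndex` (hence the `set_index('date')` above), and truncates timestamps with `dt.normalize()`. For tz-naive timestamps that gives the same result as the `.dt.date` / `pd.to_datetime` round trip.

```python
import pandas as pd

# Two days of made-up hourly values; between_time() needs a DatetimeIndex
hourly = pd.DataFrame(
    {"temperature": range(48)},
    index=pd.date_range("2024-11-20 00:00", periods=48, freq="60min", name="date"),
)

# Keep only the 12:00 row of each day, then turn the index back into a column
daily = hourly.between_time("11:59", "12:01").reset_index()

# Truncate timestamps to midnight (tz-naive data: same as the .dt.date round trip)
daily["date"] = daily["date"].dt.normalize()
print(daily)
```

One difference worth knowing: on tz-aware data, `dt.normalize()` keeps the time zone, while the `.dt.date` round trip drops it.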


In [13]:
#meta_dict['Hornsgatan']['daily_df'].info()

## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

In [15]:
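MAX_RETRIES = 5  # assumption: give up after 5 attempts instead of retrying forever

for key, inner_dict in meta_dict.items():
    air_quality_fg = fs.get_feature_group(
        name='air_quality_' + inner_dict['Street'].lower(),
        version=1,
    )

    # Retry transient failures (e.g. dropped connections), but report each error
    # and stop after MAX_RETRIES instead of silently looping forever
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            air_quality_fg.insert(inner_dict['aq_today_df'], write_options={"wait_for_job": True})
            break
        except Exception as e:
            print(f"Air quality insert for {key} failed (attempt {attempt}/{MAX_RETRIES}): {e}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(10 * attempt)  # simple linear backoff between attempts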

2024-11-20 01:33:41,776 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352018


Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_rosenlundsgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/air_quality_rosenlundsgatan_1_offline_fg_materialization/executions
2024-11-20 01:35:13,478 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352019


Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_erstagatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/air_quality_erstagatan_1_offline_fg_materialization/executions
2024-11-20 01:36:32,938 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352020


Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_bellmansgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/air_quality_bellmansgatan_1_offline_fg_materialization/executions
2024-11-20 01:37:55,484 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352021


Uploading Dataframe: 0.00% |          | Rows 0/1 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_hornsgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/air_quality_hornsgatan_1_offline_fg_materialization/executions
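Both upload cells share the same retry pattern, so the logic could live in a single helper with exponential backoff. This is a hedged sketch: `with_retries`, its defaults (5 attempts, 5 s base delay), and the injectable `sleep` argument (useful for testing) are assumptions, not part of the Hopsworks API. Catching `Exception` rather than using a bare `except:` also lets Ctrl-C (`KeyboardInterrupt`) stop the notebook.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(fn: Callable[[], T], max_attempts: int = 5, base_delay: float = 5.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(); on an exception wait base_delay * 2**(attempt-1) seconds and retry.

    The last exception is re-raised once max_attempts calls have failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:  # narrow this to the transient errors you actually see
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt}/{max_attempts} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

Usage inside the upload loop: `with_retries(lambda: air_quality_fg.insert(inner_dict['aq_today_df'], write_options={"wait_for_job": True}))`.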


In [16]:
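MAX_RETRIES = 5  # assumption: give up after 5 attempts instead of retrying forever

for key, inner_dict in meta_dict.items():
    weather_fg = fs.get_feature_group(
        name='weather_' + inner_dict['Street'].lower(),
        version=1,
    )

    # Retry transient failures, but report each error and stop after MAX_RETRIES
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            weather_fg.insert(inner_dict['daily_df'], write_options={"wait_for_job": True})
            break
        except Exception as e:
            print(f"Weather insert for {key} failed (attempt {attempt}/{MAX_RETRIES}): {e}")
            if attempt == MAX_RETRIES:
                raise
            time.sleep(10 * attempt)  # simple linear backoff between attempts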

2024-11-20 01:39:17,628 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1351092


Uploading Dataframe: 0.00% |          | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_rosenlundsgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/weather_rosenlundsgatan_1_offline_fg_materialization/executions
2024-11-20 01:40:49,681 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352022


Uploading Dataframe: 0.00% |          | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_erstagatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/weather_erstagatan_1_offline_fg_materialization/executions
2024-11-20 01:42:12,229 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352023


Uploading Dataframe: 0.00% |          | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_bellmansgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/weather_bellmansgatan_1_offline_fg_materialization/executions
2024-11-20 01:43:34,741 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164444/fs/1155147/fg/1352024


Uploading Dataframe: 0.00% |          | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_hornsgatan_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1164444/jobs/named/weather_hornsgatan_1_offline_fg_materialization/executions
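Each weather upload above wrote 10 rows per street, one per forecast day, and the feature group's expectation suite validated them on the server. A cheap local check before calling `insert` can catch a broken noon filter earlier, for example an empty frame or two rows for the same date. `check_daily_weather` is a hypothetical helper that only inspects the `date` and `city` columns created in the forecast cell.

```python
import pandas as pd


def check_daily_weather(df: pd.DataFrame) -> None:
    """Raise ValueError if a daily weather frame is empty, has null keys, or repeats a date."""
    missing = {"date", "city"} - set(df.columns)
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    if df.empty:
        raise ValueError("no rows: did the 12:00 filter match anything?")
    if df[["date", "city"]].isna().any().any():
        raise ValueError("null values in 'date' or 'city'")
    duplicated = df.loc[df["date"].duplicated(), "date"]
    if not duplicated.empty:
        raise ValueError(f"more than one row per date: {sorted(set(duplicated))}")


good = pd.DataFrame({
    "date": pd.to_datetime(["2024-11-20", "2024-11-21"]),
    "city": ["Stockholm", "Stockholm"],
})
check_daily_weather(good)  # passes silently
```

In the upload loop it would be called as `check_daily_weather(inner_dict['daily_df'])` just before the insert.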


## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline</span>

In the next notebook, you will read from a feature group and create a training dataset in the feature store.


In [18]:
# Prepare each street's data for MongoDB: one document per street, keyed by street name
for key, inner_dict in meta_dict.items():
    inner_dict['_id'] = inner_dict['Street'].lower()
    for field_name, value in inner_dict.items():
        # MongoDB cannot store DataFrames, so convert each one to a list of row dicts
        if isinstance(value, pd.DataFrame):
            inner_dict[field_name] = value.to_dict(orient='records')
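The conversion loop replaces each DataFrame with `to_dict(orient='records')`, a list with one dict per row. The sketch shows that shape on a made-up one-row frame. Because `pd.Timestamp` subclasses `datetime.datetime`, pymongo should store the `date` values as BSON dates, which is what lets the later query match on `aq_today_df.date`.

```python
import datetime

import pandas as pd

# A made-up one-row frame with the two columns the MongoDB cell uses
sample_df = pd.DataFrame({"date": [pd.Timestamp("2024-11-20")], "pm25": [31.0]})

records = sample_df.to_dict(orient="records")
print(records)  # [{'date': Timestamp('2024-11-20 00:00:00'), 'pm25': 31.0}]

# pd.Timestamp is a datetime.datetime subclass
print(isinstance(records[0]["date"], datetime.datetime))  # True
```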

In [67]:
import pymongo

# Read the connection string from the environment instead of hard-coding credentials.
# MONGODB_URI is a placeholder variable name; its value should look like
# "mongodb+srv://<user>:<password>@<cluster-host>/?retryWrites=true&w=majority"
uri = os.environ["MONGODB_URI"]

client = pymongo.MongoClient(uri)

try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Select the database and collection (both are named "sodermalm" here)
db = client["sodermalm"]
collection = db["sodermalm"]


for key, inner_dict in meta_dict.items():
    street_id = inner_dict['Street'].lower()
    target_date = inner_dict['aq_today_df'][0]['date']
    new_data = inner_dict['aq_today_df'][0]['pm25']

    # Find this street's document whose 'aq_today_df' array already has an entry for today
    query = {"_id": street_id, "aq_today_df.date": target_date}

    # Overwrite pm25 in the first matching array element (positional $ operator)
    update = {"$set": {"aq_today_df.$.pm25": new_data}}
    result = collection.update_one(query, update)

    if result.matched_count == 0:
        # No entry for today yet: prepend today's rows to the street's array
        push_result = collection.update_one(
            {"_id": street_id},
            {"$push": {"aq_today_df": {"$each": inner_dict['aq_today_df'], "$position": 0}}},
        )
        if push_result.matched_count == 0:
            print(f"No MongoDB document with _id={street_id!r}; nothing was written")
        else:
            print(f"Added new date {target_date} with data {new_data}")
    else:
        print(f"Updated existing date {target_date} with new data {new_data}")

client.close()

Pinged your deployment. You successfully connected to MongoDB!
Updated existing date 2024-11-20 00:00:00 with new data 0.0
Updated existing date 2024-11-20 00:00:00 with new data 5.0
Updated existing date 2024-11-20 00:00:00 with new data 3.0
Updated existing date 2024-11-20 00:00:00 with new data 31.0
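The update logic above ("overwrite the reading for today if it exists, otherwise prepend it") is easier to test without a database. The sketch is a plain-Python model of the same rule on a list of dicts. `upsert_reading` is hypothetical and does not touch MongoDB; it mirrors the positional `$set` (which updates only the first matching array element) and `$push` with `$position: 0`.

```python
from typing import Any


def upsert_reading(readings: list[dict[str, Any]], new: dict[str, Any]) -> str:
    """Update pm25 for a matching date in place, or insert `new` at the front."""
    for reading in readings:
        if reading["date"] == new["date"]:
            # Like {"$set": {"aq_today_df.$.pm25": ...}}: first match only
            reading["pm25"] = new["pm25"]
            return "updated"
    # Like {"$push": {"aq_today_df": {"$each": [...], "$position": 0}}}
    readings.insert(0, new)
    return "added"


history = [{"date": "2024-11-19", "pm25": 4.0}]
print(upsert_reading(history, {"date": "2024-11-20", "pm25": 5.0}))  # added
print(upsert_reading(history, {"date": "2024-11-20", "pm25": 7.0}))  # updated
```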


In [69]:
inner_dict['aq_today_df'][0]['date']

Timestamp('2024-11-20 00:00:00')