# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"><img src="../../images/icon102.png" width="38px"/> **Hopsworks Feature Store** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Feature Pipeline</span>

## 🗒️ This notebook is divided into the following sections:
1. Parse Data
2. Feature Group Insertion

### <span style='color:#ff5f27'> 📝 Imports</span>

In [None]:
import datetime
import time
import requests
import pandas as pd
import json

from features import air_quality
from functions import *

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Load the cities to monitor: a mapping of continent -> {city_name: coordinates},
# iterated by both fetch loops below.
# JSON text is UTF-8 by specification, so decode explicitly instead of relying on
# the platform's locale encoding (which can mangle non-ASCII city names).
with open('target_cities.json', encoding='utf-8') as json_file:
    target_cities = json.load(json_file)

In [None]:
# Today's date closes the backfill window for both the PM2.5 and weather fetches
today = datetime.date.today()

# Show the date object next to the string form that is passed as `end_date`
today, today.isoformat()

### <span style="color:#ff5f27;"> 🔮 Connecting to Hopsworks Feature Store </span>

In [None]:
import hopsworks

# Log in to Hopsworks and get a handle on the project's feature store
project = hopsworks.login()
fs = project.get_feature_store()

# Look up the two existing version-1 feature groups this pipeline appends to
air_quality_fg = fs.get_feature_group(name='air_quality', version=1)
weather_fg = fs.get_feature_group(name='weather', version=1)

---

## <span style='color:#ff5f27'> 🌫 Filling gaps in Air Quality data (PM2.5)</span>

In [None]:
# Pull everything currently stored in each feature group; only the per-city
# maximum 'date' is used below to decide where each fetch should start
df_air_quality, df_weather = air_quality_fg.read(), weather_fg.read()

In [None]:
# Most recent stored PM2.5 date for each city, as a {city_name: date-as-string}
# dict; these strings become the `start_date` of each city's fetch window
last_dates_aq = (
    df_air_quality.groupby("city_name")["date"]
    .max()
    .astype(str)
    .to_dict()
)

In [None]:
# Spot-check the last stored PM2.5 date for two example cities; a city that is
# absent from the feature group shows "Not available" instead of raising
paris_last_date, columbus_last_date = (
    last_dates_aq.get(city, "Not available") for city in ("Paris", "Columbus")
)

print("⛳️ Last update for Paris:", paris_last_date)
print("⛳️ Last update for Columbus:", columbus_last_date)

### <span style='color:#ff5f27'>  🧙🏼‍♂️ Parsing PM2.5 data</span>

In [None]:
# Fetch PM2.5 readings for every target city, from the last date already stored
# in the air_quality feature group up to today.
# NOTE: `last_dates_aq[city_name]` raises KeyError for a city listed in
# target_cities.json that has no rows in the feature group yet.
start_of_cell = time.perf_counter()  # monotonic clock, suited to measuring durations
end_date = str(today)

# Collect one DataFrame per city and concatenate once at the end; calling
# pd.concat inside the loop re-copies every accumulated row on each iteration.
aq_frames = []
for cities in target_cities.values():
    for city_name, coords in cities.items():
        aq_frames.append(
            get_aqi_data_from_open_meteo(
                city_name=city_name,
                coordinates=coords,
                start_date=last_dates_aq[city_name],
                end_date=end_date,
            )
        )

# pd.concat raises ValueError on an empty list, so fall back to an empty frame
df_aq_raw = pd.concat(aq_frames, ignore_index=True) if aq_frames else pd.DataFrame()

end_of_cell = time.perf_counter()

print("-" * 64)
print(f"Parsed new PM2.5 data for ALL locations up to {end_date}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

In [None]:
# Preview the last three raw rows returned by the API
df_aq_raw.iloc[-3:]

### <span style="color:#ff5f27;">🛠 Feature Engineering PM2.5</span>

In [None]:
# Convert the 'date' column of the raw API data (df_aq_raw) to pandas datetime64
# before feature engineering; pd.to_datetime is a no-op on values that are
# already datetimes, so re-running this cell is safe.
df_aq_raw['date'] = pd.to_datetime(df_aq_raw['date'])

In [None]:
# Build the engineered PM2.5 feature frame from the raw API data (df_aq_raw);
# see `air_quality.feature_engineer_aq` for the exact features it derives.
df_aq_update = air_quality.feature_engineer_aq(df_aq_raw)

# Drop every row containing a missing value. NOTE(review): these are presumably
# the leading rows of each city's series where lag/rolling features cannot be
# computed yet — confirm against feature_engineer_aq.
df_aq_update = df_aq_update.dropna()
df_aq_update.tail(3)

In [None]:
# Sanity check: dropna() above should have removed every missing value.
# Fail loudly here rather than uploading rows with gaps to the feature store.
n_missing_aq = df_aq_update.isna().sum().sum()
assert n_missing_aq == 0, f"df_aq_update still has {n_missing_aq} missing values"
n_missing_aq

In [None]:
# (rows, columns) of the cleaned PM2.5 update frame
len(df_aq_update.index), len(df_aq_update.columns)

---

## <span style='color:#ff5f27'> 🌦 Filling gaps in Weather data</span>

In [None]:
# Most recent stored weather date for each city, as a {city_name: date-as-string}
# dict; these strings become the `start_date` of each city's weather fetch
last_dates_weather = (
    df_weather.groupby("city_name")["date"]
    .max()
    .astype(str)
    .to_dict()
)

### <span style='color:#ff5f27'>  🧙🏼‍♂️ Parsing Weather data</span>

In [None]:
# Fetch weather data for every target city, from the last date already stored
# in the weather feature group up to today.
# NOTE: `last_dates_weather[city_name]` raises KeyError for a city listed in
# target_cities.json that has no rows in the feature group yet.
start_of_cell = time.perf_counter()  # monotonic clock, suited to measuring durations
end_date = str(today)

# Collect one DataFrame per city and concatenate once at the end; calling
# pd.concat inside the loop re-copies every accumulated row on each iteration.
weather_frames = []
for cities in target_cities.values():
    for city_name, coords in cities.items():
        weather_frames.append(
            get_weather_data_from_open_meteo(
                city_name=city_name,
                coordinates=coords,
                start_date=last_dates_weather[city_name],
                end_date=end_date,
                # NOTE(review): presumably makes the helper query the forecast API — confirm
                forecast=True,
            )
        )

# pd.concat raises ValueError on an empty list, so fall back to an empty frame.
# Rows with any missing value are dropped without mutating in place, so
# re-running the cell always rebuilds the same result.
df_weather_update = (
    pd.concat(weather_frames, ignore_index=True) if weather_frames else pd.DataFrame()
).dropna()

end_of_cell = time.perf_counter()

print("-" * 64)
print(f"Parsed new weather data for ALL cities up to {end_date}.")
print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

In [None]:
# Normalise both update frames the same way: parse 'date' into datetime64, then
# derive a 'unix_time' column from it with the shared `convert_date_to_unix` helper.
for update_frame in (df_aq_update, df_weather_update):
    update_frame["date"] = pd.to_datetime(update_frame["date"])
    update_frame["unix_time"] = update_frame["date"].apply(convert_date_to_unix)

In [None]:
# Prepare both frames for insertion: store 'date' as strings again (unix_time
# already holds the numeric form) and cast the dominant wind direction to int.
df_aq_update = df_aq_update.astype({"date": str})
df_weather_update = df_weather_update.astype(
    {"wind_direction_dominant": 'int', "date": str}
)
df_weather_update.tail(3)

---

## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

In [None]:
# Upload the newly engineered PM2.5 rows to the 'air_quality' feature group.
# NOTE(review): each city's fetch started at the last date already stored, so the
# first day probably overlaps existing rows; this assumes the feature group's
# primary key turns the insert into an upsert rather than a duplicate — confirm.
air_quality_fg.insert(df_aq_update)

In [None]:
# Upload the newly fetched weather rows to the 'weather' feature group.
# NOTE(review): each city's fetch started at the last date already stored, so the
# first day probably overlaps existing rows; this assumes the feature group's
# primary key turns the insert into an upsert rather than a duplicate — confirm.
weather_fg.insert(df_weather_update)

## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline</span>

In the following notebook, you will read from a feature group and create a training dataset within the feature store.
