# <span style="color:#ff5f27"> 📝 Imports </span>

In [None]:
import os
from datetime import datetime
import requests
import json
import pandas as pd

In [None]:
# Install geopy, which get_city_coordinates() imports for geocoding city names.
# NOTE(review): `!pip` runs in a subshell; consider `%pip install geopy --quiet`
# so the package is installed into the running kernel's environment — confirm.
!pip install geopy --quiet

# <span style="color:#ff5f27"> ⚙️ Functions </span>

In [None]:
def get_city_coordinates(city_name: str):
    """
    Look up a city by name and return its coordinates.

    The lookup uses geopy's Nominatim geocoder. geopy is imported inside the
    function, so it is only required when the function is actually called.

    Args:
        city_name: Name of the city to geocode.

    Returns:
        A ``(latitude, longitude)`` tuple, each value rounded to 2 digits
        after the decimal point.

    Raises:
        ValueError: If the geocoder finds no match for ``city_name``.
    """
    from geopy.geocoders import Nominatim

    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    city = geolocator.geocode(city_name)

    # geocode() returns None when nothing matches; fail with a clear message
    # instead of an AttributeError on `None.latitude`.
    if city is None:
        raise ValueError(f"Could not find coordinates for city '{city_name}'.")

    latitude = round(city.latitude, 2)
    longitude = round(city.longitude, 2)

    return latitude, longitude

In [None]:
def get_weather_data(city_name: str,
                     start_date: str = None,
                     end_date: str = None,
                     forecast: bool = False):
    """
    Fetch hourly weather data for a city from the Open-Meteo API.

    The city is geocoded with ``get_city_coordinates`` and the hourly series
    is requested from the forecast endpoint (``forecast=True``) or from the
    historical archive endpoint (``forecast=False``).

    Args:
        city_name: Name of the city; also stored in the ``city_name`` column.
        start_date: First day of the requested range, as a date string
            (the calls in this notebook use ``YYYY-MM-DD``).
        end_date: Last day of the requested range, same format.
        forecast: If True, query the forecast endpoint; the dates are then
            optional and are simply omitted from the request when None.
            If False, query the archive endpoint; both dates are required.

    Returns:
        A ``(res_df, some_metadata)`` tuple. ``res_df`` has the columns
        ``city_name``, ``base_time`` (datetime), ``forecast_hr``,
        ``temperature``, ``relative_humidity``, ``weather_code``,
        ``wind_speed`` and ``wind_direction``. ``forecast_hr`` is the row
        position for forecasts and 0 for observations. ``some_metadata`` holds
        ``latitude``, ``longitude``, ``timezone`` and ``hourly_units`` from
        the response, plus ``city_name``.

    Raises:
        ValueError: If ``forecast`` is False and a date is missing.
        requests.HTTPError: If the API answers with an error status.
    """
    # Fail fast, before any network call: the archive endpoint needs a date range.
    if not forecast and not (start_date and end_date):
        raise ValueError(
            "'start_date' and 'end_date' are required when forecast=False."
        )

    latitude, longitude = get_city_coordinates(city_name=city_name)

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': ['temperature_2m','relativehumidity_2m','precipitation',
                   'weathercode','windspeed_10m','winddirection_10m'],
        'start_date': start_date,
        'end_date': end_date
    }

    if forecast:
        # historical forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive?'

    response = requests.get(base_url, params=params)
    # Surface API errors as an HTTP error instead of a KeyError on the
    # missing keys of an error payload further down.
    response.raise_for_status()

    response_json = response.json()

    some_metadata = {key: response_json[key] for key in ('latitude', 'longitude',
                                                         'timezone', 'hourly_units')}
    some_metadata["city_name"] = city_name

    res_df = pd.DataFrame(response_json["hourly"])

    # Forecast rows are numbered by their position in the response;
    # observations all get 0.
    res_df["forecast_hr"] = res_df.index if forecast else 0
    res_df["city_name"] = city_name

    # rename columns
    res_df = res_df.rename(columns={
        "time": "base_time",
        "temperature_2m": "temperature",
        "relativehumidity_2m": "relative_humidity",
        "weathercode": "weather_code",
        "windspeed_10m": "wind_speed",
        "winddirection_10m": "wind_direction"
    })

    # convert dates (done before the column subset below, so the assignment
    # is not made on a frame derived from a selection)
    res_df["base_time"] = pd.to_datetime(res_df["base_time"])

    # change columns order
    # NOTE(review): 'precipitation' is requested above but is not part of the
    # returned columns — confirm whether dropping it is intended.
    res_df = res_df[["city_name", "base_time", "forecast_hr", "temperature",
                     "relative_humidity", "weather_code", "wind_speed", "wind_direction"]]

    return res_df, some_metadata

# <span style="color:#ff5f27"> 🔮 Data Parsing </span>

In [None]:
# Example: historical observations for Madrid over a fixed three-day window.
weather_df, metadata = get_weather_data(
    "Madrid",
    start_date="2023-02-10",
    end_date="2023-02-12",
    forecast=False,
)

In [None]:
weather_df

In [None]:
# Example: the same three-day window for Madrid, requested from the forecast endpoint.
weather_df, metadata = get_weather_data(
    "Madrid",
    start_date="2023-02-10",
    end_date="2023-02-12",
    forecast=True,
)

In [None]:
weather_df

In [None]:
metadata

---

#### Code to transform df to 1-n dimensional dataframe (Jim's first idea)

In [None]:
# def get_transformed_forecast_df(weather_df):
#     # take every second hour-observation, transpose the dataframe
#     df_temp = weather_df.iloc[::2].T

#     res_dict = dict()
    
#     # add 'city_name' and 'base_date' columns
#     res_dict["city_name"] = weather_df.loc[0, "city_name"]
#     res_dict["base_date"] = weather_df.loc[0, "base_date"]

#     for i in df_temp.columns:
#         res_dict[f"{i}hr_temperature"] = df_temp.loc["temperature_2m", i]
#         res_dict[f"{i}hr_humidity"] = df_temp.loc["relativehumidity_2m", i]
#         res_dict[f"{i}hr_precipitation"] = df_temp.loc["precipitation", i]
#         res_dict[f"{i}hr_weathercode"] = df_temp.loc["weathercode", i]
#         res_dict[f"{i}hr_windspeed"] = df_temp.loc["windspeed_10m", i]
#         res_dict[f"{i}hr_winddirection"] = df_temp.loc["winddirection_10m", i]


#     res_df = pd.DataFrame(res_dict, index=[0])

#     return res_df

In [None]:
# df_transformed = get_transformed_forecast_df(weather_df)

In [None]:
# df_transformed

---
# <span style="color:#ff5f27"> 👩🏻‍🔬 Backfill Pipeline </span>

In [None]:
# Cities to backfill. Each name is passed to get_weather_data() and stored in
# the `city_name` column of the resulting frames.
city_names = [
    'Kyiv',
    'London',
    'Paris',
    'Stockholm',
    'New_York',
    'Los_Angeles',
    'Singapore',
    'Sydney',  # was misspelled 'Sidney', which is a different place name
    'Hong_Kong',
    'Rome',
]

In [None]:
# NOTE: this rebinds the name `datetime` from the class imported at the top of
# the notebook (`from datetime import datetime`) to the module itself; the
# cells below rely on the module (`datetime.date`, `datetime.timedelta`).
import datetime

# Current local date (a datetime.date object), the anchor for all date windows below.
today = datetime.date.today()

In [None]:
# Anchor date for the backfill windows (a datetime.date, not a string).
today = datetime.date.today()

# Window boundaries as strings, ready to be passed as start/end dates.
one_week = datetime.timedelta(days=7)
day7next = str(today + one_week)
day7ago = str(today - one_week)
tomorrow = str(today + datetime.timedelta(days=1))

In [None]:
str(today)

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing historical weather observations from January 1, 2000 until 7 days before today (a restriction of this particular API) </span>

In [None]:
# Collect one frame per city and concatenate once at the end: calling
# pd.concat inside the loop re-copies all previously accumulated rows on
# every iteration.
observation_frames = []

for city_name in city_names:
    weather_df_temp, metadata_temp = get_weather_data(city_name,
                                                      start_date="2000-01-01", end_date=day7ago)
    observation_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so keep the empty-frame result when
# there are no cities.
observations_df = pd.concat(observation_frames) if observation_frames else pd.DataFrame()

In [None]:
observations_df

In [None]:
# observations_df.to_csv("observations_df.csv", index=False)

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing historical weather forecasts from 7 days before today until today (processed as observations) </span>

In [None]:
# Collect one frame per city and concatenate once at the end: calling
# pd.concat inside the loop re-copies all previously accumulated rows on
# every iteration.
forecast_batch_frames = []

for city_name in city_names:
    weather_df_temp, metadata_temp = get_weather_data(city_name, forecast=True,
                                                      start_date=day7ago, end_date=str(today))
    forecast_batch_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so keep the empty-frame result when
# there are no cities.
forecast_batch_df = pd.concat(forecast_batch_frames) if forecast_batch_frames else pd.DataFrame()

# These rows come from the forecast endpoint but are treated as observations,
# so the per-row forecast offset is reset to 0.
forecast_batch_df["forecast_hr"] = 0

In [None]:
forecast_batch_df

In [None]:
# forecast_batch_df.to_csv("forecast_batch_df.csv", index=False)

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing weather forecasts for the next 7 days </span>

In [None]:
# Collect one frame per city and concatenate once at the end: calling
# pd.concat inside the loop re-copies all previously accumulated rows on
# every iteration.
forecast_frames = []

for city_name in city_names:
    weather_df_temp, metadata_temp = get_weather_data(city_name, forecast=True,
                                                      start_date=tomorrow, # start with tomorrow
                                                      end_date=day7next)
    forecast_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so keep the empty-frame result when
# there are no cities.
forecast_df = pd.concat(forecast_frames) if forecast_frames else pd.DataFrame()

In [None]:
forecast_df

In [None]:
# forecast_df.to_csv("forecast_df.csv", index=False)

---
# <span style="color:#ff5f27"> ⬇️ Insert all data into Feature Store </span>

In [None]:
import hopsworks

# Log in to Hopsworks, selecting the project named 'weather'.
# NOTE(review): presumably requires credentials (e.g. an API key) to be
# available or entered interactively — confirm against the hopsworks docs.
project = hopsworks.login(project='weather')
# Handle to the project's feature store; used below to get or create the feature group.
fs = project.get_feature_store() 

In [None]:
# Feature group settings gathered in one place, then passed as keyword arguments.
weather_fg_settings = {
    "name": 'weather_data',
    "description": "Public Weather Data. Updates every day.",
    "version": 1,
    "primary_key": ["city_name", "forecast_hr"],
    "event_time": "base_time",
}

# Fetch the feature group if it already exists, otherwise define it.
weather_fg = fs.get_or_create_feature_group(**weather_fg_settings)

In [None]:
# weather_fg.insert(observations_df, write_options={"wait_for_job": False})
# weather_fg.insert(forecast_batch_df, write_options={"wait_for_job": False})
# weather_fg.insert(forecast_df, write_options={"wait_for_job": False})

---
# <span style="color:#ff5f27"> 👨🏻‍🏫 Retrieve and check data consistency </span>

In [None]:
# Read the feature group contents back for the consistency check below.
weather_retrieved = weather_fg.read()

In [None]:
weather_retrieved

In [None]:
# Order the retrieved rows chronologically by their base timestamp.
weather_retrieved = weather_retrieved.sort_values(by="base_time")

In [None]:
weather_retrieved

In [None]:
# Create a datetime index object with one entry per hour over the whole backfill range.
dt_index = pd.date_range(start='2000-01-01',
                         end=str(today + datetime.timedelta(8)), # to include last, "seventh" day.
                         # A timedelta frequency is equivalent to the 'H' alias but avoids the
                         # deprecation warning recent pandas versions emit for uppercase 'H'.
                         freq=datetime.timedelta(hours=1))

In [None]:
# Compare the length of the dataframe and datetime index.
# dt_index is built to end at 00:00 of today + 8 days, so it holds one more
# entry than the number of hourly rows expected for a single city.
expected_rows_per_city = len(dt_index) - 1

# Compare total row counts exactly. Dividing by the number of cities and
# truncating would hide a row count that is not a whole multiple of it
# (and would divide by zero for an empty city list).
if len(weather_retrieved) != expected_rows_per_city * len(city_names):
    print('Inconsistent dates in dataframe.')
else:
    print("Everything seems fine.")

In [None]:
# I subtract 1 from len(dt_index) because it includes the 00:00 hour of the 8th day.
dt_index

---