# ETL Pipeline (from calling the APIs to loading the data into MySQL on AWS RDS)

## Load Dependencies

In [2]:
import getpass
import json
import os
from urllib.parse import quote_plus

import pandas as pd
import requests
#import marcus_keys

In [None]:
pip install pymysql

In [10]:
!pip install session_info
import session_info

Collecting session_info
  Downloading session_info-1.0.0.tar.gz (24 kB)
  Preparing metadata (setup.py) ... done
Collecting stdlib_list (from session_info)
  Downloading stdlib_list-0.9.0-py3-none-any.whl (75 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 75.6/75.6 kB 2.7 MB/s eta 0:00:00
Building wheels for collected packages: session_info
  Building wheel for session_info (setup.py) ... done
  Created wheel for session_info: filename=session_info-1.0.0-py3-none-any.whl size=8026 sha256=02ea280275886604b1e59cee2375db44a6b56f2702132d6ba56780429a6a9be3
  Stored in directory: /root/.cache/pip/wheels/6a/aa/b9/eb5d4031476ec10802795b97ccf937b9bd998d68a9b268765a
Successfully built session_info
Installing collected packages: stdlib_list, session_info
Successfully installed session_info-1.0.0 stdlib_list-0.9.0


In [11]:
# Display the session_info report for this kernel (kept for reproducibility).
session_info.show()

## Logging onto AWS RDS MySQL Instance

In [None]:
# Connection settings for the MySQL instance on AWS RDS.
schema = "jam_fp_db"
host = "jam-project-2023-db.cjdcbdhnueky.eu-north-1.rds.amazonaws.com"
user = "mkadmin"
port = 3306

# The key lives in the untracked local module `marcus_keys`. Its top-level
# import is commented out in the dependency cell, so import it here and fall
# back to the AWS_RDS_KEY environment variable or an interactive prompt when
# the module is not available. The secret is never written into the notebook.
try:
    import marcus_keys
    password = marcus_keys.aws_rds_key
except ImportError:
    password = os.environ.get("AWS_RDS_KEY") or getpass.getpass("AWS RDS password: ")

# URL-escape the password: characters such as '@', ':' or '/' would otherwise
# corrupt the connection URL handed to pandas/SQLAlchemy.
con = f'mysql+pymysql://{user}:{quote_plus(password)}@{host}:{port}/{schema}'

# Loading Cities Data

In [None]:
# Rewrite the Google Drive "share" link into a direct-download link (the file
# id is the second-to-last path segment) and read the CSV straight from it.
url = "https://drive.google.com/file/d/1TdhZIuhQfKVSHSnQK2Epst8ktkdo14DY/view?usp=sharing"
path = f"https://drive.google.com/uc?export=download&id={url.split('/')[-2]}"
worldcities_df = pd.read_csv(filepath_or_buffer=path)

In [None]:
# Preview the first rows of the raw world-cities table.
worldcities_df.head()

#### List of selected Cities

In [None]:
# Cities whose weather data is fetched by the multi-city calls below.
# Grouped by region for readability; the resulting order is unchanged.
city_list = (
    # Americas
    ["Toronto", "Mexico City", "São Paulo", "Buenos Aires", "Rio de Janeiro", "Bogotá", "Lima"]
    # Europe
    + ["London", "Paris", "Berlin", "Rome", "Madrid"]
    # Oceania
    + ["Sydney", "Melbourne", "Brisbane", "Perth", "Adelaide"]
    # Africa
    + ["Cairo", "Lagos", "Johannesburg", "Nairobi", "Casablanca",
       "Addis Ababa", "Dakar", "Accra", "Cape Town"]
    # Asia / Middle East
    + ["Tokyo", "Mumbai", "Beijing", "Istanbul", "Bangkok",
       "Seoul", "Jakarta", "Karachi", "Riyadh", "Manila"]
)

#### Get longitude and latitude for the cities in `city_list`

In [None]:
# City -> country lookup; it is later joined with the world-cities table on
# both columns. Built from ordered pairs so the insertion order is preserved.
city_country_dict = dict([
    ("New York", "USA"), ("Los Angeles", "USA"), ("Toronto", "Canada"),
    ("Mexico City", "Mexico"), ("Chicago", "USA"), ("São Paulo", "Brazil"),
    ("Buenos Aires", "Argentina"), ("Rio de Janeiro", "Brazil"),
    ("Bogotá", "Colombia"), ("Lima", "Peru"),
    ("London", "United Kingdom"), ("Paris", "France"), ("Berlin", "Germany"),
    ("Rome", "Italy"), ("Madrid", "Spain"),
    ("Sydney", "Australia"), ("Melbourne", "Australia"), ("Brisbane", "Australia"),
    ("Perth", "Australia"), ("Adelaide", "Australia"),
    ("Cairo", "Egypt"), ("Lagos", "Nigeria"), ("Johannesburg", "South Africa"),
    ("Nairobi", "Kenya"), ("Casablanca", "Morocco"), ("Addis Ababa", "Ethiopia"),
    ("Dakar", "Senegal"), ("Accra", "Ghana"), ("Cape Town", "South Africa"),
    ("Abidjan", "Ivory Coast"),
    ("Tokyo", "Japan"), ("Mumbai", "India"), ("Beijing", "China"),
    ("Istanbul", "Turkey"), ("Bangkok", "Thailand"), ("Seoul", "South Korea"),
    ("Jakarta", "Indonesia"), ("Karachi", "Pakistan"), ("Riyadh", "Saudi Arabia"),
    ("Manila", "Philippines"),
])


In [None]:
# Two-column frame (city, country) with one row per dictionary entry.
city_country_df = pd.DataFrame({
    'city': list(city_country_dict.keys()),
    'country': list(city_country_dict.values()),
})

In [None]:
# Inner join: keep only world-cities rows whose (city, country) pair is listed.
extended_df = city_country_df.merge(
    worldcities_df,
    how="inner",
    on=["city", "country"],
)

In [None]:
# Drop the listed columns from the joined frame, keeping the remaining ones.
columns_to_remove = [
    'city_ascii', 'iso2', 'iso3', 'admin_name',
    'capital', 'id', 'population', 'country',
]
extracted_data_df = extended_df.drop(labels=columns_to_remove, axis="columns")

In [None]:
# Preview the trimmed frame for the selected cities.
extracted_data_df.head()

In [None]:
# Colab-specific helper module; this cell only works inside Google Colab.
from google.colab import files

# Write the trimmed city table to a CSV (without the index column), then hand
# that file to Colab's `files.download`.
extracted_data_df.to_csv("extracted_data.csv", index=False)
files.download("extracted_data.csv")

## Data Cleaning

In [None]:
# Build the cities table for the database: the full world-cities frame minus
# the identifier/administrative columns listed here.
columns_to_remove = [
    'city_ascii', 'iso2', 'iso3',
    'admin_name', 'capital', 'id',
]
cities_data_df = worldcities_df.drop(labels=columns_to_remove, axis="columns")

In [None]:
# Preview the cities table after dropping the unused columns.
cities_data_df.head()

In [None]:
# Rename the city/coordinate columns to the names the multi-city fetch
# functions look up (`city_name`, `latitude`, `longitude`). Done in place.
column_mapping = dict(city='city_name', lat='latitude', lng='longitude')

cities_data_df.rename(mapper=column_mapping, axis="columns", inplace=True)

# Getting Historical Weather Data

#### Single City Data Call

In [None]:
def get_historical_weather_data(city_name, start_date, end_date):
    """Fetch daily historical weather for one city from the Open-Meteo archive API.

    The coordinates come from the first row of the notebook-level ``city_data``
    frame, which the city-lookup cell slices out of ``worldcities_df``.
    ``city_name`` is kept for interface compatibility and is not used for the
    lookup itself.

    Args:
        city_name: Name of the requested city (informational only).
        start_date: First day of the range, sent to the API as ``start_date``.
        end_date: Last day of the range, sent to the API as ``end_date``.

    Returns:
        pandas.DataFrame built from the ``daily`` section of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    # `worldcities_df` names its coordinate columns 'lat'/'lng'; also accept the
    # renamed 'latitude'/'longitude' columns of `cities_data_df`.
    lat_col = 'lat' if 'lat' in city_data.columns else 'latitude'
    lng_col = 'lng' if 'lng' in city_data.columns else 'longitude'
    latitude = city_data[lat_col].values[0]
    longitude = city_data[lng_col].values[0]

    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": "weathercode,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,rain_sum,snowfall_sum,windspeed_10m_max,shortwave_radiation_sum",
        "timezone": "auto"
    }

    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, params=params, timeout=60)
    # Surface API errors here instead of as a KeyError on 'daily' below.
    response.raise_for_status()

    historical_data = response.json()

    return pd.DataFrame(historical_data['daily'])

In [None]:
print("Historical Weather Data")
city_name = input("Enter a city name: ").lower()

# Case-insensitive match of the entered name against the world-cities table.
worldcities_df['city_lower'] = worldcities_df['city'].str.lower()
city_data = worldcities_df[worldcities_df['city_lower'] == city_name]

if city_data.empty:
    # Stop here: fetching with no matching row (or with a stale `city_data`
    # left over from an earlier run) would fail or return the wrong city.
    print(f"The city '{city_name}' was not found our database.")
else:
    start_date = input("Enter a start date [i.e.: 1900-01-01]: ")
    end_date = input("Enter an end date [i.e.: 1900-01-01]: ")

    historical_daily_df = get_historical_weather_data(city_name, start_date, end_date)

#### Multiple Cities Data Call

In [None]:
def get_historical_weather_data(city_list, start_date, end_date):
    """Fetch daily historical weather for several cities and stack the results.

    Coordinates are looked up by name in the notebook-level ``cities_data_df``
    (columns ``city_name``, ``latitude``, ``longitude``); the first matching
    row is used. One request per city is sent to the Open-Meteo archive API.

    Note: this definition shadows the single-city function of the same name
    defined earlier in the notebook.

    Args:
        city_list: Iterable of city names to fetch.
        start_date: First day of the range, sent to the API as ``start_date``.
        end_date: Last day of the range, sent to the API as ``end_date``.

    Returns:
        pandas.DataFrame with the ``daily`` rows of every city concatenated
        (fresh index) and a ``city_name`` column identifying each row's city.

    Raises:
        IndexError: If a city is not present in ``cities_data_df``.
        requests.HTTPError: If the API answers with an error status code.
        ValueError: If ``city_list`` is empty (nothing to concatenate).
    """
    url = "https://archive-api.open-meteo.com/v1/archive"
    historical_data_list = []

    for city_name in city_list:
        # Filter once and reuse the row for both coordinates.
        city_rows = cities_data_df[cities_data_df['city_name'] == city_name]
        if city_rows.empty:
            raise IndexError(f"City '{city_name}' was not found in cities_data_df")
        latitude = city_rows['latitude'].values[0]
        longitude = city_rows['longitude'].values[0]

        params = {
            "latitude": latitude,
            "longitude": longitude,
            # The date range was accepted but never forwarded before.
            "start_date": start_date,
            "end_date": end_date,
            "daily": "weathercode,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,rain_sum,snowfall_sum,windspeed_10m_max,shortwave_radiation_sum",
            # Pass the plain value: requests URL-encodes params itself, so a
            # pre-encoded "Europe%2FBerlin" would be encoded a second time.
            "timezone": "Europe/Berlin"
        }

        # A timeout keeps the loop from hanging forever on a stalled connection.
        response = requests.get(url, params=params, timeout=60)
        # Surface API errors here instead of as a KeyError on 'daily' below.
        response.raise_for_status()
        historical_data = response.json()

        historical_daily_df = pd.DataFrame(historical_data['daily'])
        historical_daily_df['city_name'] = city_name

        historical_data_list.append(historical_daily_df)

    combined_data = pd.concat(historical_data_list, ignore_index=True)
    return combined_data

In [None]:
# Date range for the multi-city historical pull.
start_date, end_date = "1940-01-01", "2022-12-31"

historical_data = get_historical_weather_data(
    city_list,
    start_date=start_date,
    end_date=end_date,
)

#### Data Cleaning

In [None]:
# Inspect columns, dtypes and non-null counts of the combined historical frame.
historical_data.info()

In [None]:
# Map the API's daily field names onto the column names used downstream.
# NOTE(review): 'percipitation' (sic) is kept as-is because it becomes a
# database column name; the future-weather mapping in this notebook spells it
# 'precipitation' — confirm which spelling the schema expects.
column_mapping_hist = dict(
    weathercode='weather_code',
    city='city_name',
    temperature_2m_max='temperature_max',
    temperature_2m_min='temperature_min',
    temperature_2m_mean='temperature_avg',
    precipitation_sum='percipitation',
    windspeed_10m_max='wind_speed',
    rain_sum='rain',
    snowfall_sum='snowfall',
    shortwave_radiation_sum='shortwave_radiation',
    time='date_time',
)

# In-place rename; keys that match no column are ignored by pandas.
historical_data.rename(mapper=column_mapping_hist, axis="columns", inplace=True)

In [None]:
# Alias, not a copy: both names refer to the same DataFrame object from here on.
historical_daily_df = historical_data

In [None]:
# Check the renamed columns and their dtypes before converting the date column.
historical_daily_df.info()

In [None]:
# Parse the date strings returned by the API into pandas datetimes.
historical_daily_df["date_time"] = historical_daily_df["date_time"].pipe(pd.to_datetime)

# Getting Current Weather Data

NOTE: The `Multiple Cities Data Call` part of this section is the code of the AWS Lambda function, which is invoked daily at 1 a.m. to extract the current weather forecast data.

#### Single City Data Call

In [None]:
def get_current_weather_data(city_name, start_date, end_date):
    """Fetch hourly and daily forecast data for one city from the Open-Meteo forecast API.

    The coordinates come from the first row of the notebook-level ``city_data``
    frame (columns ``lat``/``lng``), which the city-lookup cell slices out of
    ``worldcities_df``. ``city_name`` is kept for interface compatibility and
    is not used for the lookup itself.

    Args:
        city_name: Name of the requested city (informational only).
        start_date: First day of the range, sent to the API as ``start_date``.
        end_date: Last day of the range, sent to the API as ``end_date``.

    Returns:
        Tuple ``(current_hourly_df, current_daily_df)`` of pandas.DataFrames
        built from the ``hourly`` and ``daily`` sections of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    latitude = city_data['lat'].values[0]
    longitude = city_data['lng'].values[0]

    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "models": "best_match",
        "daily": "weathercode,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,rain_sum,snowfall_sum,windspeed_10m_max,shortwave_radiation_sum",
        "hourly": "temperature_2m,relativehumidity_2m,precipitation,rain,snowfall,weathercode,surface_pressure,cloudcover,visibility,evapotranspiration,windspeed_10m,winddirection_10m",
        "timezone": "auto"
    }

    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, params=params, timeout=60)
    # Surface API errors here instead of as a KeyError on 'hourly' below.
    response.raise_for_status()

    current_data = response.json()

    current_hourly_df = pd.DataFrame(current_data['hourly'])
    current_daily_df = pd.DataFrame(current_data['daily'])

    return current_hourly_df, current_daily_df

In [None]:
print("Current Weather Forecast")
city_name = input("Enter a city name: ").lower()

# Case-insensitive match of the entered name against the world-cities table.
worldcities_df['city_lower'] = worldcities_df['city'].str.lower()
city_data = worldcities_df[worldcities_df['city_lower'] == city_name]

if city_data.empty:
    # Stop here: fetching with no matching row (or with a stale `city_data`
    # left over from an earlier run) would fail or return the wrong city.
    print(f"The city '{city_name}' was not found our database.")
else:
    start_date = input("Enter a start date [i.e.: 1900-01-01]: ")
    end_date = input("Enter an end date [i.e.: 1900-01-01]: ")

    current_hourly_df, current_daily_df = get_current_weather_data(city_name, start_date, end_date)

#### Multiple Cities Data Call

In [None]:
def get_current_weather_data(city_list):
    """Fetch the one-day forecast (hourly and daily) for several cities.

    Coordinates are looked up by name in the notebook-level ``cities_data_df``
    (columns ``city_name``, ``latitude``, ``longitude``); the first matching
    row is used. One request per city is sent to the Open-Meteo forecast API
    with ``forecast_days=1``.

    Note: this definition shadows the single-city function of the same name
    defined earlier in the notebook.

    Args:
        city_list: Iterable of city names to fetch.

    Returns:
        Tuple ``(all_hourly_data, all_daily_data)`` of pandas.DataFrames, each
        the concatenation (fresh index) of the per-city frames with a
        ``city_name`` column identifying each row's city.

    Raises:
        IndexError: If a city is not present in ``cities_data_df``.
        requests.HTTPError: If the API answers with an error status code.
        ValueError: If ``city_list`` is empty (nothing to concatenate).
    """
    url = "https://api.open-meteo.com/v1/forecast"
    hourly_frames = []
    daily_frames = []

    for city_name in city_list:
        # Filter once and reuse the row for both coordinates.
        city_rows = cities_data_df[cities_data_df['city_name'] == city_name]
        if city_rows.empty:
            raise IndexError(f"City '{city_name}' was not found in cities_data_df")
        latitude = city_rows['latitude'].values[0]
        longitude = city_rows['longitude'].values[0]

        params = {
            "latitude": latitude,
            "longitude": longitude,
            "forecast_days": 1,
            "models": "best_match",
            "daily": "weathercode,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,rain_sum,snowfall_sum,windspeed_10m_max,shortwave_radiation_sum",
            "hourly": "temperature_2m,relativehumidity_2m,precipitation,rain,snowfall,weathercode,surface_pressure,cloudcover,visibility,evapotranspiration,windspeed_10m,winddirection_10m",
            "timezone": "Europe/Berlin"
        }

        # A timeout keeps the loop from hanging forever on a stalled connection.
        response = requests.get(url, params=params, timeout=60)
        # Surface API errors here instead of as a KeyError on 'hourly' below.
        response.raise_for_status()
        current_data = response.json()

        current_hourly_df = pd.DataFrame(current_data['hourly'])
        current_daily_df = pd.DataFrame(current_data['daily'])

        current_hourly_df['city_name'] = city_name
        current_daily_df['city_name'] = city_name

        hourly_frames.append(current_hourly_df)
        daily_frames.append(current_daily_df)

    all_hourly_data = pd.concat(hourly_frames, ignore_index=True)
    all_daily_data = pd.concat(daily_frames, ignore_index=True)

    return all_hourly_data, all_daily_data

In [None]:
# Fetch the hourly and daily forecast frames for every city in `city_list`.
hourly_data, daily_data = get_current_weather_data(city_list)

### Data Cleaning

In [None]:
# Inspect columns, dtypes and non-null counts of the combined hourly frame.
hourly_data.info()

In [None]:
# Map the API's hourly field names onto the column names used downstream.
column_mapping_hrl = dict(
    city='city_name',
    time='date_time',
    weathercode='weather_code',
    temperature_2m='temperature',
    windspeed_10m='wind_speed',
    winddirection_10m='wind_direction',
    relativehumidity_2m='humidity',
)

# In-place rename; keys that match no column are ignored by pandas.
hourly_data.rename(mapper=column_mapping_hrl, axis="columns", inplace=True)

# Alias (same object), then parse the timestamp strings into datetimes.
current_hourly_df = hourly_data
current_hourly_df["date_time"] = current_hourly_df["date_time"].pipe(pd.to_datetime)

In [None]:
# Map the API's daily field names onto the column names used downstream.
# NOTE(review): 'percipitation' (sic) is kept as-is because it becomes a
# database column name; the future-weather mapping in this notebook spells it
# 'precipitation' — confirm which spelling the schema expects.
column_mapping_dl = dict(
    weathercode='weather_code',
    city='city_name',
    temperature_2m_max='temperature_max',
    temperature_2m_min='temperature_min',
    temperature_2m_mean='temperature_avg',
    precipitation_sum='percipitation',
    windspeed_10m_max='wind_speed',
    rain_sum='rain',
    snowfall_sum='snowfall',
    shortwave_radiation_sum='shortwave_radiation',
    time='date_time',
)

# In-place rename; keys that match no column are ignored by pandas.
daily_data.rename(mapper=column_mapping_dl, axis="columns", inplace=True)

# Alias (same object), then parse the date strings into datetimes.
current_daily_df = daily_data
current_daily_df["date_time"] = current_daily_df["date_time"].pipe(pd.to_datetime)

# Getting Future Weather Data (`MPI Model`)

#### Single City Data Call

In [None]:
def get_future_weather_data(city_name, start_date, end_date):
    """Fetch daily climate-model projections for one city from the Open-Meteo climate API.

    The coordinates come from the first row of the notebook-level ``city_data``
    frame (columns ``lat``/``lng``), which the city-lookup cell slices out of
    ``worldcities_df``. ``city_name`` is kept for interface compatibility and
    is not used for the lookup itself. The request asks for the
    ``MPI_ESM1_2_XR`` model.

    Args:
        city_name: Name of the requested city (informational only).
        start_date: First day of the range, sent to the API as ``start_date``.
        end_date: Last day of the range, sent to the API as ``end_date``.

    Returns:
        pandas.DataFrame built from the ``daily`` section of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    latitude = city_data['lat'].values[0]
    longitude = city_data['lng'].values[0]

    url = "https://climate-api.open-meteo.com/v1/climate"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "models": "MPI_ESM1_2_XR",
        "daily": "temperature_2m_mean,temperature_2m_max,temperature_2m_min,windspeed_10m_mean,shortwave_radiation_sum,precipitation_sum,rain_sum,snowfall_sum,et0_fao_evapotranspiration_sum"
    }

    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, params=params, timeout=60)
    # Surface API errors here instead of as a KeyError on 'daily' below.
    response.raise_for_status()

    future_data = response.json()

    return pd.DataFrame(future_data['daily'])

In [None]:
print("Future Weather Data")
city_name = input("Enter a city name: ").lower()

# Case-insensitive match of the entered name against the world-cities table.
worldcities_df['city_lower'] = worldcities_df['city'].str.lower()
city_data = worldcities_df[worldcities_df['city_lower'] == city_name]

if city_data.empty:
    # Stop here: fetching with no matching row (or with a stale `city_data`
    # left over from an earlier run) would fail or return the wrong city.
    print(f"The city '{city_name}' was not found our database.")
else:
    start_date = input("Enter a start date [i.e.: 1900-01-01]: ")
    end_date = input("Enter an end date [i.e.: 1900-01-01]: ")

    future_daily_df = get_future_weather_data(city_name, start_date, end_date)

#### Multiple Cities Data Call

In [None]:
def get_future_weather_data(city_list, start_date, end_date, city_data):
    """Fetch daily climate-model projections for several cities and stack the results.

    Coordinates are looked up by name in the notebook-level ``cities_data_df``
    (columns ``city_name``, ``latitude``, ``longitude``); the first matching
    row is used. One request per city is sent to the Open-Meteo climate API,
    asking for the ``MPI_ESM1_2_XR`` model.

    Note: this definition shadows the single-city function of the same name
    defined earlier in the notebook.

    Args:
        city_list: Iterable of city names to fetch.
        start_date: First day of the range, sent to the API as ``start_date``.
        end_date: Last day of the range, sent to the API as ``end_date``.
        city_data: Unused; kept so existing calls keep working.

    Returns:
        pandas.DataFrame with the ``daily`` rows of every city concatenated
        (fresh index) and a ``city_name`` column identifying each row's city.

    Raises:
        IndexError: If a city is not present in ``cities_data_df``.
        requests.HTTPError: If the API answers with an error status code.
        ValueError: If ``city_list`` is empty (nothing to concatenate).
    """
    url = "https://climate-api.open-meteo.com/v1/climate"
    future_frames = []

    for city_name in city_list:
        # Filter once and reuse the row for both coordinates.
        city_rows = cities_data_df[cities_data_df['city_name'] == city_name]
        if city_rows.empty:
            raise IndexError(f"City '{city_name}' was not found in cities_data_df")
        latitude = city_rows['latitude'].values[0]
        longitude = city_rows['longitude'].values[0]

        params = {
            "latitude": latitude,
            "longitude": longitude,
            "start_date": start_date,
            "end_date": end_date,
            "models": "MPI_ESM1_2_XR",
            "daily": "temperature_2m_mean,temperature_2m_max,temperature_2m_min,windspeed_10m_mean,shortwave_radiation_sum,precipitation_sum,rain_sum,snowfall_sum,et0_fao_evapotranspiration_sum"
        }

        # A timeout keeps the loop from hanging forever on a stalled connection.
        response = requests.get(url, params=params, timeout=60)
        # Surface API errors here instead of as a KeyError on 'daily' below.
        response.raise_for_status()
        future_data = response.json()

        future_daily_df = pd.DataFrame(future_data['daily'])
        future_daily_df['city_name'] = city_name

        future_frames.append(future_daily_df)

    all_future_data = pd.concat(future_frames, ignore_index=True)
    return all_future_data

In [None]:
# Date range for the multi-city climate-model pull.
start_date, end_date = "2024-01-01", "2050-12-31"

future_data = get_future_weather_data(
    city_list,
    start_date,
    end_date,
    worldcities_df,
)

### Data Cleaning

In [None]:
# Inspect columns, dtypes and non-null counts of the combined projection frame.
future_data.info()

In [None]:
# Map the climate API's daily field names onto the column names used downstream.
column_mapping_fut = dict(
    city='city_name',
    temperature_2m_max='temperature_max',
    temperature_2m_min='temperature_min',
    temperature_2m_mean='temperature_avg',
    precipitation_sum='precipitation',
    windspeed_10m_mean='wind_speed',
    rain_sum='rain',
    snowfall_sum='snowfall',
    shortwave_radiation_sum='shortwave_radiation',
    et0_fao_evapotranspiration_sum='evapotranspiration',
    time='date_time',
)

# In-place rename; keys that match no column are ignored by pandas.
future_data.rename(mapper=column_mapping_fut, axis="columns", inplace=True)

# Alias (same object), then parse the date strings into datetimes.
future_daily_df = future_data
future_daily_df["date_time"] = future_daily_df["date_time"].pipe(pd.to_datetime)

# Checking Datatypes

In [None]:
# Final dtype check of the cities table before upload.
cities_data_df.info()

In [None]:
# Final dtype check of the historical daily table before upload.
historical_daily_df.info()

In [None]:
# Final dtype check of the current daily table before upload.
current_daily_df.info()

In [None]:
# Final dtype check of the current hourly table before upload.
current_hourly_df.info()

In [None]:
# Final dtype check of the future (MPI model) daily table before upload.
future_daily_df.info()

# Data Upload onto MySQL on AWS RDS

## Uploading Cities Data

In [None]:
# Insert the city rows into the `cities_data` table ('append' adds to an
# existing table, so re-running this cell inserts the rows again).
cities_data_df.to_sql(
    name='cities_data',
    con=con,
    index=False,
    if_exists='append',
)

## Uploading Historical Weather Data

In [None]:
# Re-check the historical frame's columns and dtypes right before the upload.
historical_daily_df.info()

In [None]:
# Insert the historical rows into the `historical_weather` table ('append'
# adds to an existing table, so re-running this cell inserts the rows again).
historical_daily_df.to_sql(
    name='historical_weather',
    con=con,
    index=False,
    if_exists='append',
)

## Uploading Current Weather Data

In [None]:
# Overwrite the `current_weather_daily` table with the latest daily forecast.
current_daily_df.to_sql(
    name='current_weather_daily',
    con=con,
    index=False,
    if_exists='replace',
)

In [None]:
# Overwrite the `current_weather_hourly` table with the latest hourly forecast.
current_hourly_df.to_sql(
    name='current_weather_hourly',
    con=con,
    index=False,
    if_exists='replace',
)

## Uploading Future Weather Data

In [None]:
# Count the distinct cities present in the future-weather frame before uploading.
future_daily_df['city_name'].nunique()

In [None]:
# Overwrite the `future_weather_mpi` table with the MPI model projections.
future_daily_df.to_sql(
    name='future_weather_mpi',
    con=con,
    index=False,
    if_exists='replace',
)

# Future Weather Prediction (`JAM Model`)

## Load predicted data

In [None]:
# Rewrite the Google Drive "share" link into a direct-download link (the file
# id is the second-to-last path segment) and read the predictions CSV from it.
url = "https://drive.google.com/file/d/1IK4sb8Jj0BI-ay2kED004AfA3D_ntG10/view?usp=sharing"
path = f"https://drive.google.com/uc?export=download&id={url.split('/')[-2]}"
future_weather_jam_df = pd.read_csv(filepath_or_buffer=path)

## Clean predicted data

In [None]:
# Map the prediction file's column names onto the names used by the other tables.
column_mapping_jam = dict(
    City='city_name',
    Future_temperature='temperature',
    Future_precipitation='precipitation',
    Date_future='date_time',
)

# In-place rename, then parse the date strings into datetimes.
future_weather_jam_df.rename(mapper=column_mapping_jam, axis="columns", inplace=True)
future_weather_jam_df["date_time"] = future_weather_jam_df["date_time"].pipe(pd.to_datetime)

In [None]:
# Drop the reference-period columns; only the predicted values are uploaded.
columns_to_remove = [
    'Date_reference',
    'Ref_temperature',
    'Ref_precipitation',
]
future_weather_jam_df = future_weather_jam_df.drop(labels=columns_to_remove, axis="columns")

In [None]:
# Inspect the remaining columns and dtypes of the prediction frame.
future_weather_jam_df.info()

Shorten the DataFrame: keep only the rows dated on or before 2050-12-31.

In [None]:
# Keep only predictions dated on or before 2050-12-31.
future_weather_jam_df = future_weather_jam_df.loc[
    future_weather_jam_df['date_time'] <= '2050-12-31'
]

## Load data into database

In [None]:
# Overwrite the `future_weather_jam` table with the JAM model predictions.
future_weather_jam_df.to_sql(
    name='future_weather_jam',
    con=con,
    index=False,
    if_exists='replace',
)