### WBS Coding School
___
# Data Engineering Project

This is a data engineering project, in which I use Python, MySQL and AWS Services to create and automatically update an online database.

It is a learning project, in which I practise several data engineering techniques, such as API calls and AWS Lambda functions.

The tasks are:
- [x] Collect data
- [x] Clean data
- [x] Create a database
- [x] Update the database with the latest data
- [ ] Move the data pipeline to the Cloud (AWS)


#### Next Steps
The next task is to move the updating of the dynamic tables to the Cloud and automate it.
To do this, I pasted the code from this script into an AWS Lambda function. I then set up an AWS EventBridge schedule that triggers the Lambda function once a day.

Et voilà, there you have an automated Data Pipeline running in the Cloud.
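
A minimal sketch of how the script could be wrapped for Lambda, assuming the notebook cells are collected into two functions. `update_weather` and `update_arrivals` are hypothetical names standing in for the weather and arrivals steps further down; here they are stubs that only return a row count. `lambda_handler(event, context)` is the entry-point signature the Lambda Python runtime calls.

```python
import json


def update_weather() -> int:
    # Hypothetical stub: would fetch the forecast and append it to `weather`.
    return 0


def update_arrivals() -> int:
    # Hypothetical stub: would fetch tomorrow's arrivals and append them to `arrivals`.
    return 0


def lambda_handler(event, context):
    # Entry point invoked by Lambda; the EventBridge schedule supplies `event`.
    rows_written = {
        "weather": update_weather(),
        "arrivals": update_arrivals(),
    }
    return {"statusCode": 200, "body": json.dumps(rows_written)}
```

Third-party packages such as pandas and pymysql are not part of the default Lambda Python runtime, so they would have to be supplied separately, for example through a Lambda layer.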

___

# Update the Cloud database
This script fetches the latest data for the dynamic tables (`weather`, `arrivals`) and appends it to the Cloud database.

### Table of contents:
- [Connect to the Cloud MySQL instance](#connect)
- [Update `weather` table](#update_weather)
- [Update `arrivals` table](#update_arrivals)

#### Import libraries

In [1]:
import pandas as pd
import requests
from datetime import datetime, timedelta, date
import config_file

<a id="connect"></a>
#### Connect to the Cloud MySQL instance
Prerequisite: a MySQL database running on an AWS RDS instance.

In [2]:
# Cloud MySQL server connection information
user = config_file.AWS_DATABASE_USER
password = config_file.AWS_DATABASE_PASSWORD
host = config_file.AWS_DATABASE_HOST
port = config_file.AWS_DATABASE_PORT
schema = config_file.AWS_DATABASE_SCHEMA

# Connect to RDS MySQL server
connection = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'
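
One caveat with building the connection string by hand: a password containing characters such as `@` or `/` would be misread as part of the URL. A minimal sketch of a safer variant, assuming the same five settings as above; `build_connection_string` is a hypothetical helper and all values shown are placeholders, not real credentials.

```python
from urllib.parse import quote_plus


def build_connection_string(user: str, password: str, host: str, port: int, schema: str) -> str:
    # Percent-encode the credentials so that characters like "@" or "/" stay inside them.
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{schema}"
    )


# Placeholder values, not real credentials:
example = build_connection_string("admin", "p@ss/word", "example.rds.amazonaws.com", 3306, "gans")
print(example)
# mysql+pymysql://admin:p%40ss%2Fword@example.rds.amazonaws.com:3306/gans
```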

<a id="update_weather"></a>
## Update `weather` table

In [3]:
def create_weather_response(city: str, cnt: int = 50):
    # Request the forecast for one city from the OpenWeatherMap forecast endpoint.
    base_url = "https://api.openweathermap.org/data/2.5/forecast"
    params = {
        "appid": config_file.OPEN_WEATHER_API_KEY,
        "q": city,
        "cnt": cnt,
        "units": "metric",
        "lang": "en",
    }
    # Passing `params` lets requests URL-encode city names with spaces or umlauts.
    weather_response = requests.get(base_url, params=params)
    return weather_response


def create_weather_dataframe(response: requests.models.Response):
    # Save the weather data:
    weather_json = response.json()
    weather_df = pd.json_normalize(weather_json["list"]) # Column "weather" contains a list of dictionaries ...
    weather_df["city"] = weather_json["city"]["name"]
    weather_exploded_df = pd.json_normalize(weather_df.explode("weather")["weather"]) # ... we therefore explode that column ...
    weather_exploded_df["city"] = weather_json["city"]["name"]
    weather_exploded_df["dt"] = weather_df["dt"]
    weather_all_df = weather_exploded_df.merge(weather_df, on=["city", "dt"]) # ... and re-merge the two dataframes.
    weather_clean_df = weather_all_df[["city", "dt_txt", "main", "description", "main.temp", "main.feels_like", "wind.speed"]]
    weather_clean_df = weather_clean_df.rename(columns={
        "dt_txt": "forecast_time",
        "main": "outlook",
        "description": "outlook_description",
        "main.temp": "temperature",
        "main.feels_like": "feels_like",
        "wind.speed": "wind_speed"
    })
    # Save city data:
    city = weather_json['city']['name']
    latitude = weather_json['city']['coord']['lat']
    longitude = weather_json['city']['coord']['lon']
    country = weather_json['city']['country']
    city_data = {
        "city": city,
        "latitude": latitude,
        "longitude": longitude,
        "country": country
    }
    city_data_df = pd.DataFrame(city_data, index=[0])
    # Join the two dataframes:
    weather_city_df = pd.merge(weather_clean_df, city_data_df, on="city", how="inner")
    return weather_city_df


def get_weather_data(cities: list):
    weather_cities_df = pd.DataFrame()
    for city in cities:
        weather_response = create_weather_response(city)
        weather_city_df = create_weather_dataframe(weather_response)
        weather_cities_df = pd.concat([weather_cities_df, weather_city_df])
    return weather_cities_df.reset_index(drop=True)


def clean_weather(weather_df: pd.DataFrame, city_ids: dict):
    weather_df_clean = weather_df.copy()
    # Create city_id column:
    weather_df_clean["city_id"] = weather_df_clean["city"].map(city_ids)
    columns_to_keep = ["city_id", "forecast_time", "outlook", "outlook_description", "temperature", "feels_like", "wind_speed"]
    weather_df_clean = weather_df_clean[columns_to_keep]
    # Change forecast_time column type:
    weather_df_clean['forecast_time'] = pd.to_datetime(weather_df_clean['forecast_time'])
    return weather_df_clean
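
The explode-and-merge step in `create_weather_dataframe` can be hard to picture. The sketch below uses a hand-made payload (invented values, only the fields needed here) to show the shape of the data and one alternative way to flatten it: `pd.json_normalize` with `record_path` and `meta` yields one row per `weather` entry in a single call. This is an alternative for illustration, not what the function above does.

```python
import pandas as pd

# Invented sample in the shape of the forecast response (values are made up):
sample_json = {
    "city": {"name": "Berlin"},
    "list": [
        {"dt": 1, "dt_txt": "2024-01-01 12:00:00", "main": {"temp": 3.5},
         "weather": [{"main": "Clouds", "description": "overcast clouds"}]},
        {"dt": 2, "dt_txt": "2024-01-01 15:00:00", "main": {"temp": 4.0},
         "weather": [{"main": "Rain", "description": "light rain"}]},
    ],
}

flat_df = pd.json_normalize(
    sample_json["list"],
    record_path="weather",              # one output row per entry in the "weather" list
    meta=["dt_txt", ["main", "temp"]],  # fields copied from the enclosing forecast entry
    record_prefix="outlook.",           # avoids a clash between weather "main" and "main.temp"
)
flat_df["city"] = sample_json["city"]["name"]
print(flat_df)
```

The prefix keeps the two meanings of `main` apart: `outlook.main` is the weather category, while `main.temp` is the temperature.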

#### Get cities and `city_ids`

In [4]:
gans_cities_data = pd.read_sql_table("cities", con=connection, columns=['city_id', 'city_name'])
cities_of_interest = list(gans_cities_data["city_name"])
city_ids = dict(zip(gans_cities_data['city_name'], gans_cities_data['city_id']))

#### Get up-to-date `weather` data

In [5]:
weather_df = get_weather_data(cities_of_interest)
weather_df_clean = clean_weather(weather_df, city_ids)

#### Push `weather` data to Cloud database

In [6]:
# Update weather data
weather_df_clean.to_sql("weather", if_exists="append", con=connection, index=False)

240

<a id="update_arrivals"></a>
## Update `arrivals` table

In [7]:
def create_date_range():
    # Two consecutive 12-hour windows that together cover all of tomorrow.
    midnight_tomorrow = datetime.combine(date.today(), datetime.min.time()) + timedelta(days=1)
    noon_tomorrow = midnight_tomorrow + timedelta(hours=12)
    midnight_day_after = midnight_tomorrow + timedelta(days=1)
    time_format = '%Y-%m-%dT%H:%M'
    date_range_dict = {
        "morning": [midnight_tomorrow.strftime(time_format), noon_tomorrow.strftime(time_format)],
        "evening": [noon_tomorrow.strftime(time_format), midnight_day_after.strftime(time_format)],
    }
    return date_range_dict


def get_arrivals_response(icao_code: str, date_range: dict, day_time: str):
    url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao_code}/{date_range[day_time][0]}/{date_range[day_time][1]}"
    querystring = {
        "withLeg": "false", "direction": "Arrival",
        "withCancelled": "false", "withCodeshared": "false",
        "withCargo": "false", "withPrivate": "false", "withLocation": "false"
    }
    headers = {
        "X-RapidAPI-Key": config_file.AERODATABOX_API_KEY,
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    return response


def create_arrivals_df(response: requests.models.Response, icao_code: str):
    arrivals_json = response.json()["arrivals"]
    arrivals_df = pd.json_normalize(arrivals_json)
    arrivals_df["icao"] = icao_code
    new_columns_dict = {
        "icao": "icao",
        "number": "flight_number", 
        "movement.scheduledTime.utc": "arrival_time_utc", 
        "movement.scheduledTime.local": "arrival_time_local",
        "airline.name": "airline", 
        "movement.airport.name": "departing_airport", 
        "movement.airport.icao": "departing_airport_icao"
    }
    old_columns = list(new_columns_dict.keys())
    arrivals_df_clean = arrivals_df[old_columns].rename(columns=new_columns_dict)
    return arrivals_df_clean


def get_arrivals_data(icao_codes: list):
    # Initiate empty list:
    arrivals_list = []
    # Get start & end date:
    date_range = create_date_range()

    for icao in icao_codes:
        for day_time in ['morning', 'evening']:
            response = get_arrivals_response(icao_code=icao, date_range=date_range, day_time=day_time)
            if response.status_code != 200:
                # Skip time windows for which the API returned no usable response:
                continue
            arrivals_df = create_arrivals_df(response, icao_code=icao)
            arrivals_list.append(arrivals_df)
    # Concatenate once, after all airports have been queried:
    arrivals = pd.concat(arrivals_list)
    return arrivals.reset_index(drop=True)


def clean_arrivals(arrivals_df: pd.DataFrame):
    arrivals_df_clean = arrivals_df.copy().rename(columns={
        "icao": "arrival_icao", 
        "arrival_time_local": "arrival_time", 
        "departing_airport_icao": "departure_icao"
    })[["flight_number", "arrival_icao", "arrival_time", "departure_icao"]]
    arrivals_df_clean["arrival_time"] = pd.to_datetime(arrivals_df_clean["arrival_time"].str[:-6]) # drop the UTC offset (e.g. +01:00)
    return arrivals_df_clean
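
To make the time windows used by `create_date_range` concrete, here is a small sketch with a fixed reference day. `build_arrival_windows` is a hypothetical variant that takes the day as an argument instead of calling `date.today()`, which makes the result reproducible; the window logic is assumed to match the function above (two consecutive 12-hour windows covering the following day).

```python
from datetime import date, datetime, timedelta


def build_arrival_windows(reference_day: date) -> dict:
    # Hypothetical variant of create_date_range() with an explicit reference day.
    start = datetime.combine(reference_day, datetime.min.time()) + timedelta(days=1)
    time_format = "%Y-%m-%dT%H:%M"
    # Three boundaries, 12 hours apart: midnight, noon, next midnight.
    boundaries = [(start + timedelta(hours=12 * i)).strftime(time_format) for i in range(3)]
    return {"morning": boundaries[0:2], "evening": boundaries[1:3]}


windows = build_arrival_windows(date(2024, 1, 1))
print(windows)
# {'morning': ['2024-01-02T00:00', '2024-01-02T12:00'], 'evening': ['2024-01-02T12:00', '2024-01-03T00:00']}
```

The two windows share the noon boundary, so each airport needs two requests per run to cover the full day.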

#### Get airports and `icao_codes`

In [8]:
gans_airports_data = pd.read_sql_table("airports", con=connection, columns=['airport_icao'])
icao_codes = list(gans_airports_data["airport_icao"])

#### Get up-to-date `arrivals` data

In [9]:
arrivals_df = get_arrivals_data(icao_codes)
arrivals_df_clean = clean_arrivals(arrivals_df)

#### Push `arrivals` data to the Cloud database

In [10]:
arrivals_df_clean.to_sql("arrivals", if_exists="append", con=connection, index=False)

3544