In this notebook I collected different external data, created and automated pipeline in the cloud.
- Collect data from different sources (flights data flight arrivals with Rapid API, weather data with Pyowm library to request API from OpenWeather and popupation data from Wikipedia using python and BeautifulSoup

- Set up a local database in MySQL

- Move pipeline to the cloud (AWS)

- Automated the above steps


1. collect data from different sources

Flights data (API)

In [77]:
import requests

API_key = "Your API key"

airport_icoa = "EDDB"
to_local_time = "2022-04-06T00:00"
from_local_time = "2022-04-06T12:00"

url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{airport_icoa}/{to_local_time}/{from_local_time}"

querystring = {"withLeg":"true","withCancelled":"true","withCodeshared":"true","withCargo":"true","withPrivate":"false","withLocation":"false"}

headers = {
	"X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
	"X-RapidAPI-Key": API_key
}

response = requests.request("GET", url, headers=headers, params=querystring)

from IPython.display import JSON
JSON(response.json())

<IPython.core.display.JSON object>

In [75]:
response.status_code

200

In [78]:
for i in response.json():
    print(i)

departures
arrivals


In [79]:
### Option 1
arrivals_berlin = response.json()['arrivals']

def get_flight_info(flight_json):
    # terminal
    try: terminal = flight_json['arrival']['terminal']
    except: terminal = None
    # aircraft
    try: aircraft = flight_json['aircraft']['model']
    except: aircraft = None

    return {
        'dep_airport':flight_json['departure']['airport']['name'],
        'sched_arr_loc_time':flight_json['arrival']['scheduledTimeLocal'],
        'terminal':terminal,
        'status':flight_json['status'],
        'aircraft':aircraft,
        'icao_code':airport_icoa
    }


import pandas as pd
# [get_flight_info(flight) for flight in arrivals_berlin]
arrivals_berlin = pd.DataFrame([get_flight_info(flight) for flight in arrivals_berlin])
arrivals_berlin

Unnamed: 0,dep_airport,sched_arr_loc_time,terminal,status,aircraft,icao_code
0,Duesseldorf,2022-04-06 07:35+02:00,1,Unknown,Airbus A319,EDDB
1,Munich,2022-04-06 07:35+02:00,1,Arrived,Airbus A319,EDDB
2,Stuttgart,2022-04-06 07:29+02:00,,Arrived,Airbus A319,EDDB
3,Stuttgart,2022-04-06 07:40+02:00,1,Unknown,Airbus A319,EDDB
4,Paris,2022-04-06 07:36+02:00,,Arrived,Airbus A320-100/200,EDDB
...,...,...,...,...,...,...
86,London,2022-04-06 11:46+02:00,,Arrived,Embraer EMB 190,EDDB
87,Istanbul,2022-04-06 11:20+02:00,1,Arrived,Boeing 737-800,EDDB
88,Unknown,2022-04-06 11:42+02:00,,Arrived,Airbus A319,EDDB
89,Vienna,2022-04-06 11:36+02:00,,Arrived,Airbus A320-100/200,EDDB


In [41]:
### Option 2
import pandas as pd
arrivals = pd.json_normalize(response.json()['arrivals'])
# next step: select the columns you want to incude on your database
(
arrivals
    .filter(['departure.airport.name','arrival.scheduledTimeLocal',
             'arrival.terminal','status','aircraft.model'])
    .assign(icao_code = airport_icoa)
)

Unnamed: 0,departure.airport.name,arrival.scheduledTimeLocal,arrival.terminal,status,aircraft.model,icao_code
0,Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,EDDB
1,Cologne,2022-04-06 07:25+02:00,1,Arrived,Boeing 737,EDDB
2,Duesseldorf,2022-04-06 07:35+02:00,1,Unknown,Airbus A319,EDDB
3,Munich,2022-04-06 07:35+02:00,1,Arrived,Airbus A319,EDDB
4,Stuttgart,2022-04-06 07:29+02:00,,Arrived,Airbus A319,EDDB
...,...,...,...,...,...,...
86,London,2022-04-06 11:46+02:00,,Arrived,Embraer EMB 190,EDDB
87,Istanbul,2022-04-06 11:20+02:00,1,Arrived,Boeing 737-800,EDDB
88,Unknown,2022-04-06 11:42+02:00,,Arrived,Airbus A319,EDDB
89,Vienna,2022-04-06 11:36+02:00,,Arrived,Airbus A320-100/200,EDDB


Wheather data (API)

### Using `pyowm` library

In [42]:
from pyowm import OWM
from pyowm.utils import config
from pyowm.utils import timestamps
import pandas as pd

OWM_key = 'Your OWM key'

city = "Berlin"
country = "DE"

owm = OWM(OWM_key)
mgr = owm.weather_manager()

# Will it be clear tomorrow at this time in Milan (Italy) ?
forecast = mgr.forecast_at_place(city + ',' + country, '3h')
# answer = forecast.will_be_clear_at(timestamps.tomorrow())

from numpy import nan

def forecast_extraction(forecast_object): 
    """
    Given a forecast object from Open weathor Pyhton API, 
    return a structured dataframe. 

    :param: forecast object
    :returns: a dataframe with specific weather information
    """
    forecast_df = pd.json_normalize({
        'time' : forecast_object.reference_time(timeformat='date'), 
        'temperature' : forecast_object.temperature(unit='celsius'), 
        'precipitation_probabily' : forecast_object.precipitation_probability, 
        'rain' : forecast_object.rain, 
        'humidity' : forecast_object.humidity, 
        'status' : forecast_object.status, 
        'snow' : [nan if forecast_object.snow is None else forecast_object.snow], 
        'wind' : forecast_object.wind('km_hour')
    })

    return(forecast_df)

weather_forecast = pd.concat(
    [forecast_extraction(weather) for weather in forecast.forecast.weathers]
)

weather_forecast.head()

Unnamed: 0,time,precipitation_probabily,humidity,status,snow,temperature.temp,temperature.temp_kf,temperature.temp_max,temperature.temp_min,temperature.feels_like,rain.3h,wind.speed,wind.deg,wind.gust
0,2022-04-07 09:00:00+00:00,0.2,69,Rain,[{}],12.43,-273.04,12.43,12.32,11.53,0.15,32.796,216,54.108
0,2022-04-07 12:00:00+00:00,0.41,68,Rain,[{}],12.18,-272.65,12.18,11.68,11.23,0.47,36.792,208,74.844
0,2022-04-07 15:00:00+00:00,1.0,71,Rain,[{}],10.66,-272.27,10.66,9.78,9.64,3.1,30.924,251,62.028
0,2022-04-07 18:00:00+00:00,1.0,65,Rain,[{}],8.68,-273.15,8.68,8.68,4.72,0.19,32.148,246,65.448
0,2022-04-07 21:00:00+00:00,0.27,68,Rain,[{}],7.37,-273.15,7.37,7.37,2.79,0.16,35.1,242,68.04


Connecting to OWM API

In [43]:
# achieve the same result with the wather api
#response = requests.get(f'http://api.openweathermap.org/data/2.5/forecast/?q={city},{country}&appid={OWM_key}&units=metric&lang=en')
#response
#from IPython.display import JSON
#JSON(response.json())

<IPython.core.display.JSON object>

In [45]:
forecast_api = response.json()['list']
# look for the fields that could ve relevant: 
# better field descriptions https://www.weatherbit.io/api/weather-forecast-5-day

weather_info=[]

# datetime, temperature, wind, prob_perc, rain_qty, snow = [], [], [], [], [], []
for forecast_3h in forecast_api: 
    weather_hour = {}
    # datetime utc
    weather_hour['datetime'] = forecast_3h['dt_txt']
    # temperature 
    weather_hour['temperature'] = forecast_3h['main']['temp']
    # wind
    weather_hour['wind'] = forecast_3h['wind']['speed']
    # probability precipitation 
    try: weather_hour['prob_perc'] = float(forecast_3h['pop'])
    except: weather_hour['prob_perc'] = 0
    # rain
    try: weather_hour['rain_qty'] = float(forecast_3h['rain']['3h'])
    except: weather_hour['rain_qty'] = 0
    # wind 
    try: weather_hour['snow'] = float(forecast_3h['snow']['3h'])
    except: weather_hour['snow'] = 0
    weather_hour['municipality_iso_country'] = city + ',' + country
    weather_info.append(weather_hour)
    
weather_data = pd.DataFrame(weather_info)
weather_data.head()

Unnamed: 0,datetime,temperature,wind,prob_perc,rain_qty,snow,municipality_iso_country
0,2022-04-07 09:00:00,12.41,9.11,0.2,0.15,0,"Berlin,DE"
1,2022-04-07 12:00:00,12.17,10.22,0.41,0.47,0,"Berlin,DE"
2,2022-04-07 15:00:00,10.66,8.59,1.0,3.1,0,"Berlin,DE"
3,2022-04-07 18:00:00,8.68,8.93,1.0,0.19,0,"Berlin,DE"
4,2022-04-07 21:00:00,7.37,9.75,0.27,0.16,0,"Berlin,DE"


Population data (Beautiful Soup)

In [46]:
import requests
import bs4
from bs4 import BeautifulSoup as bs
import pandas as pd
import unicodedata

# cities = ['Berlin', 'Hamburg', 'Frankfurt','Munich','Stuttgart','Leipzig','Cologne','Dresden','Hannover','Paris', 'Barcelona','Lisbon','Madrid']
cities = ['Berlin','Paris','Amsterdam','Barcelona','Rome','Lisbon','Prague','Vienna','Madrid']

def City_info(soup):
    
    ret_dict = {}
    ret_dict['city'] = soup.h1.get_text()
    
    
    if soup.select_one('.mergedrow:-soup-contains("Mayor")>.infobox-label') != None:
        i = soup.select_one('.mergedrow:-soup-contains("Mayor")>.infobox-label')
        mayor_name_html = i.find_next_sibling()
        mayor_name = unicodedata.normalize('NFKD',mayor_name_html.get_text())
        ret_dict['mayor']  = mayor_name
    
    if soup.select_one('.mergedrow:-soup-contains("City")>.infobox-label') != None:
        j =  soup.select_one('.mergedrow:-soup-contains("City")>.infobox-label')
        area = j.find_next_sibling('td').get_text()
        ret_dict['city_size'] = unicodedata.normalize('NFKD',area)

    if soup.select_one('.mergedtoprow:-soup-contains("Elevation")>.infobox-data') != None:
        k = soup.select_one('.mergedtoprow:-soup-contains("Elevation")>.infobox-data')
        elevation_html = k.get_text()
        ret_dict['elevation'] = unicodedata.normalize('NFKD',elevation_html)
    
    if soup.select_one('.mergedtoprow:-soup-contains("Population")') != None:
        l = soup.select_one('.mergedtoprow:-soup-contains("Population")')
        c_pop = l.findNext('td').get_text()
        ret_dict['city_population'] = c_pop
    
    if soup.select_one('.infobox-label>[title^=Urban]') != None:
        m = soup.select_one('.infobox-label>[title^=Urban]')
        u_pop = m.findNext('td')
        ret_dict['urban_population'] = u_pop.get_text()

    if soup.select_one('.infobox-label>[title^=Metro]') != None:
        n = soup.select_one('.infobox-label>[title^=Metro]')
        m_pop = n.findNext('td')
        ret_dict['metro_population'] = m_pop.get_text()
    
    if soup.select_one('.latitude') != None:
        o = soup.select_one('.latitude')
        ret_dict['lat'] = o.get_text()

    if soup.select_one('.longitude') != None:    
        p = soup.select_one('.longitude')
        ret_dict['long'] = p.get_text()
    
    return ret_dict

list_of_city_info = []
for city in cities:
    url = 'https://en.wikipedia.org/wiki/{}'.format(city)
    web = requests.get(url,'html.parser')
    soup = bs(web.content)
    list_of_city_info.append(City_info(soup))
df_cities = pd.DataFrame(list_of_city_info)
# df_cities = df_cities.set_index('city')
df_cities

Unnamed: 0,city,mayor,city_size,elevation,city_population,urban_population,metro_population,lat,long
0,Berlin,Franziska Giffey (SPD),891.7 km2 (344.3 sq mi),34 m (112 ft),3769495,4473101,6144600,52°31′12″N,13°24′18″E
1,Paris,Anne Hidalgo (PS),,28–131 m (92–430 ft) (avg. 78 m or 256 ft),2165423,10785092,13024518,48°51′24″N,2°21′08″E
2,Amsterdam,Femke Halsema (GL),,−2 m (−7 ft),905234,1558755,,52°22′N,4°54′E
3,Barcelona,Ada Colau Ballano[1] (Barcelona en Comú),101.4 km2 (39.2 sq mi),12 m (39 ft),1620343,"4,840,000[3]","5,474,482[4]",41°23′N,2°11′E
4,Rome,Strong Mayor–Council,"4,342,212[2]",21 m (69 ft),1st in Italy (3rd in the EU),,Rome Capital,41°53′36″N,12°28′58″E
5,Lisbon,Carlos Moedas,,2 m (7 ft),"544,851[1]","2,719,000[4]","2,871,133[2][3]",38°43′31″N,9°09′00″W
6,Prague,Zdeněk Hřib (Pirates),,,1335084,,"2,709,418[4]",50°5′N,14°25′E
7,Vienna,Michael Ludwig (SPÖ),,"151 (Lobau) – 542 (Hermannskogel) m (495–1,778...",1st in Austria (6th in EU),"1,911,191 (01−01−20)",2600000,48°12′N,16°22′E
8,Madrid,José Luis Martínez-Almeida (PP),,"650 m (2,130 ft)",3223334,"6,345,000 (2,019)[3]","6,791,667 (2,018)[2]",40°25′N,3°43′W


 Airports data (Imported)

In [47]:
import pandas as pd

airports_cities = (
pd.read_csv('airports.csv')
    .query('type == "large_airport"')
    .filter(['name','latitude_deg','longitude_deg','iso_country','iso_region','municipality','gps_code','iata_code'])
    .rename(columns={'gps_code':'icao_code'})
    .assign(municipality_iso_country = lambda x: x['municipality'] + ',' + x['iso_country'])
)
airports_cities.head()

Unnamed: 0,name,latitude_deg,longitude_deg,iso_country,iso_region,municipality,icao_code,iata_code,municipality_iso_country
10890,Honiara International Airport,-9.428,160.054993,SB,SB-CT,Honiara,AGGH,HIR,"Honiara,SB"
12461,Port Moresby Jacksons International Airport,-9.44338,147.220001,PG,PG-NCD,Port Moresby,AYPY,POM,"Port Moresby,PG"
12981,Keflavik International Airport,63.985001,-22.6056,IS,IS-2,Reykjavík,BIKF,KEF,"Reykjavík,IS"
13028,Priština Adem Jashari International Airport,42.5728,21.035801,XK,XK-01,Prishtina,BKPR,PRN,"Prishtina,XK"
17254,Guodu Air Base,36.001741,117.63201,CN,CN-37,"Xintai, Tai'an",,,"Xintai, Tai'an,CN"


In [None]:
airports_cities.query('municipality == "Berlin"')

Check all tables

In [None]:
arrivals_berlin.head()

In [None]:
arrivals_berlin.info()

In [None]:
weather_data.head()

In [None]:
weather_data.info()

In [None]:
airports_cities.head()

In [None]:
airports_cities.info()

Clean table cities

In [55]:
cities = airports_cities.filter(['municipality','iso_country','municipality_iso_country']).drop_duplicates()
cities.head()

Unnamed: 0,municipality,iso_country,municipality_iso_country
10890,Honiara,SB,"Honiara,SB"
12461,Port Moresby,PG,"Port Moresby,PG"
12981,Reykjavík,IS,"Reykjavík,IS"
13028,Prishtina,XK,"Prishtina,XK"
17254,"Xintai, Tai'an",CN,"Xintai, Tai'an,CN"


In [56]:
airports_cities.merge(arrivals_berlin, on='icao_code', how='inner').merge(weather_data, on='municipality_iso_country', how='inner').head()

Unnamed: 0,name,latitude_deg,longitude_deg,iso_country,iso_region,municipality,icao_code,iata_code,municipality_iso_country,dep_airport,sched_arr_loc_time,terminal,status,aircraft,datetime,temperature,wind,prob_perc,rain_qty,snow
0,Berlin Brandenburg Airport,52.351389,13.493889,DE,DE-BR,Berlin,EDDB,BER,"Berlin,DE",Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,2022-04-07 09:00:00,12.41,9.11,0.2,0.15,0
1,Berlin Brandenburg Airport,52.351389,13.493889,DE,DE-BR,Berlin,EDDB,BER,"Berlin,DE",Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,2022-04-07 12:00:00,12.17,10.22,0.41,0.47,0
2,Berlin Brandenburg Airport,52.351389,13.493889,DE,DE-BR,Berlin,EDDB,BER,"Berlin,DE",Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,2022-04-07 15:00:00,10.66,8.59,1.0,3.1,0
3,Berlin Brandenburg Airport,52.351389,13.493889,DE,DE-BR,Berlin,EDDB,BER,"Berlin,DE",Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,2022-04-07 18:00:00,8.68,8.93,1.0,0.19,0
4,Berlin Brandenburg Airport,52.351389,13.493889,DE,DE-BR,Berlin,EDDB,BER,"Berlin,DE",Stuttgart,2022-04-06 01:16+02:00,,Arrived,Airbus A320-100/200,2022-04-07 21:00:00,7.37,9.75,0.27,0.16,0


2. Update data into MySQL database

### `sqlalchemy` is the simplest way to connect Python to any SQL

Establish the connection

In [64]:
#%pip install sqlalchemy

Note: you may need to restart the kernel to use updated packages.


In [68]:
#import pymysql
#%pip install pymysql

Collecting pymysql
  Downloading PyMySQL-1.0.2-py3-none-any.whl (43 kB)
Installing collected packages: pymysql
Successfully installed pymysql-1.0.2
Note: you may need to restart the kernel to use updated packages.


In [69]:
import pandas as pd
import sqlalchemy

In [66]:
schema="Name of the schema"
host="127.0.0.1"
user="root"
password="Your password*"
port=3306
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

#### Update the tables

In [70]:
cities.dropna().to_sql('cities', con=con, if_exists='append', index=False)

In [71]:
airports_cities.dropna().to_sql('airports_cities', if_exists='append', con=con, index=False)

In [None]:
# cities.isna().sum()
# cities.dropna().to_sql('cities', if_exists='append', con=con, index=False)
# airports_cities.dropna().to_sql('airports_cities', if_exists='append', con=con, index=False)

In [72]:
weather_data.assign(datetime = lambda x: pd.to_datetime(x['datetime'])).to_sql('weather', if_exists='append', con=con, index=False)

In [73]:
import numpy as np
(
arrivals_berlin
    .replace({np.nan},'unknown')
    .assign(sched_arr_loc_time = lambda x: pd.to_datetime(x['sched_arr_loc_time']))
    .to_sql('arrivals', if_exists='append', con=con, index=False))

#### Run sql queries in our python session

In [74]:
pd.read_sql(
    sql = """
        select * from arrivals
        where status = "Unknown"
    """,
    con = con
)

Unnamed: 0,dep_airport,sched_arr_loc_time,terminal,status,aircraft,icao_code
0,Duesseldorf,2022-04-06 07:35:00,1,Unknown,Airbus A319,EDDB
1,Stuttgart,2022-04-06 07:40:00,1,Unknown,Airbus A319,EDDB
2,Paris,2022-04-06 07:55:00,1,Unknown,Airbus A320,EDDB
3,Saarbrücken,2022-04-06 08:10:00,0,Unknown,ATR 42-300,EDDB
4,Milan,2022-04-06 08:45:00,1,Unknown,Airbus A319,EDDB
5,Oslo,2022-04-06 09:05:00,1,Unknown,Boeing 737-800,EDDB
6,London,2022-04-06 09:10:00,1,Unknown,Airbus A320,EDDB
7,Duesseldorf,2022-04-06 09:15:00,1,Unknown,Airbus A320,EDDB
8,Nápoli,2022-04-06 09:20:00,1,Unknown,Airbus A319,EDDB
9,Cologne,2022-04-06 10:00:00,1,Unknown,Airbus A320,EDDB


3&4. Move pipeline to the cloud (AWS) and automate the above steps using AWS Lambda function https://aws.amazon.com/lambda/
 

The Lambda function 

In [None]:
import json
import pandas as pd
import numpy as np
import requests
# from my_module import get_flight_info

def get_flight_info(flight_json, airport_icoa):
    """
    This function helps to structure json data gathared from flights api
    """
    # terminal
    try: terminal = flight_json['arrival']['terminal']
    except: terminal = None
    # aircraft
    try: aircraft = flight_json['aircraft']['model']
    except: aircraft = None
    return {
        'dep_airport':flight_json['departure']['airport']['name'],
        'sched_arr_loc_time':flight_json['arrival']['scheduledTimeLocal'],
        'terminal':terminal,
        'status':flight_json['status'],
        'aircraft':aircraft,
        'icao_code':airport_icoa
        }

def lambda_handler(event, context):
    # flights api
    airport_icoa = "EDDS"
    to_local_time = "2022-04-06T20:00"
    from_local_time = "2022-04-06T08:00"
    flight_api_key = "Your API key"
    url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{airport_icoa}/{to_local_time}/{from_local_time}"
    querystring = {"withLeg":"true","withCancelled":"true","withCodeshared":"true","withCargo":"true","withPrivate":"false","withLocation":"false"}
    headers = {
        'x-rapidapi-host': "aerodatabox.p.rapidapi.com",
        'x-rapidapi-key': flight_api_key
        }
    response = requests.request("GET", url, headers=headers, params=querystring)
    arrivals_berlin =response.json()['arrivals']
    arrivals_berlin = pd.DataFrame([get_flight_info(flight, airport_icoa) for flight in arrivals_berlin])
    
    # Weather api
    city = "Berlin"
    country = "DE"
    OWM_key = "Your OWM key"
    response = requests.get(f'http://api.openweathermap.org/data/2.5/forecast/?q={city},{country}&appid={OWM_key}&units=metric&lang=en')
    forecast_api = response.json()['list']
    weather_info = []
    for forecast_3h in forecast_api: 
        weather_hour = {}
        # datetime utc
        weather_hour['datetime'] = forecast_3h['dt_txt']
        # temperature 
        weather_hour['temperature'] = forecast_3h['main']['temp']
        # wind
        weather_hour['wind'] = forecast_3h['wind']['speed']
        # probability precipitation 
        try: weather_hour['prob_perc'] = float(forecast_3h['pop'])
        except: weather_hour['prob_perc'] = 0
        # rain
        try: weather_hour['rain_qty'] = float(forecast_3h['rain']['3h'])
        except: weather_hour['rain_qty'] = 0
        # wind 
        try: weather_hour['snow'] = float(forecast_3h['snow']['3h'])
        except: weather_hour['snow'] = 0
        weather_hour['municipality_iso_country'] = city + ',' + country
        weather_info.append(weather_hour)
    weather_data = pd.DataFrame(weather_info)
    
    # inserting data in our RDS
    schema="Name of the schema"
    host="Your RDS Host"
    user="Your User"
    password="Your Password"
    port=3306
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'
    # weather data
    (weather_data
     .assign(datetime = lambda x: pd.to_datetime(x['datetime']))
     .to_sql('weather', if_exists='append', con=con, index=False))
    # flight arrivals data
    (arrivals_berlin
     .replace({np.nan},'unknown')
     .assign(sched_arr_loc_time = lambda x: pd.to_datetime(x['sched_arr_loc_time']))
     .to_sql('arrivals', if_exists='append', con=con, index=False))
