# Create an automated data pipeline in the cloud using Python

In [1]:
from bs4 import BeautifulSoup
import requests
import pandas as pd
from datetime import datetime, timedelta
import sqlalchemy

In [2]:
# Credentials for the two external services queried in this notebook.
# The values here look like placeholders; put real keys in before running.
OWM_key = 'your_openweathermap_key'  # `appid` of the OpenWeatherMap forecast request
flight_api_key = 'your_flight_api_key'  # RapidAPI key header of the AeroDataBox requests

## WEB SCRAPING: Collect demographic data
- Grab data from the web about cities: city name, country, latitude, longitude, and population
- How: use Wikipedia's "List of European cities by population within city limits" (https://en.wikipedia.org/wiki/List_of_European_cities_by_population_within_city_limits)

In [None]:
# The scraping code below is wrapped in a string literal, so it is NOT executed
# when this cell runs; it is kept for reference only. If run, it would:
#   * download the Wikipedia page and walk the first 297 table cells, picking
#     city, country, population and coordinates by cell position (i % 9);
#   * keep only the first row for each country (drop_duplicates on 'countries');
#   * write the result to city_info.csv.
'''
# %% 1. WEB SCRAPING: Collect demographical data
# Grab data from the web about cities: city name, country, latitude, longitude, and population
# How: use the ist_of_major_european cities from wikipedia
city_url = 'https://en.wikipedia.org/wiki/List_of_European_cities_by_population_within_city_limits'
response = requests.get(city_url)
print(response.status_code)
soup = BeautifulSoup(response.content, "html.parser")
# print(soup.prettify)

cities, countries, population = [], [], []
latitude, longitude = [], []
len_table = 297
for i in range(len_table):
    info_i_need = soup.select("table tbody tr td")[i].getText()
    # print(i, i%9, '**',  info_i_need)
    if i % 9 == 1:
        if "[" in info_i_need:
            my_ind = info_i_need.index('[')
            info_i_need = info_i_need[:my_ind]
            # print(info_i_need)
        cities.append(info_i_need)
    elif i % 9 == 2:
        countries.append(info_i_need.replace('\n',''))
    elif i % 9 == 3:
        if "[" in info_i_need:
            my_ind = info_i_need.index('[')
            info_i_need = info_i_need[:my_ind]
        population.append(info_i_need.replace('\n',''))
    elif i % 9 == 7:
        lat = info_i_need.rsplit('/')[-1].rsplit(';')[0].strip()
        latitude.append(lat)
        long = info_i_need.rsplit('/')[-1].rsplit(';')[-1].rsplit('(')[0].strip()
        longitude.append(long)

cities_df = pd.DataFrame({"cities": cities,
                          "countries": countries,
                          "longitude": longitude,
                          "latitude": latitude,
                          "population": population})
# drop some cities:
city_info_df = cities_df.drop_duplicates(subset='countries', keep="first").reset_index(drop=True)
city_info_df.to_csv('/Users/ilkayisik/Desktop/WBS_DS/Chapter05/ilkay/city_info.csv', 
                   index=False)
'''

# Load the city table from the CSV file on disk instead of scraping again.
# NOTE(review): the path is absolute and machine-specific; adjust it when
# running on another machine.
city_info_df = pd.read_csv('/Users/ilkayisik/Desktop/WBS_DS/Chapter05/ilkay/city_info.csv')
# Preview the first rows (shown as the cell output).
city_info_df.head()

In [None]:
# Alternative scraping approach, kept for reference only: the whole cell is a
# string literal, so none of it is executed. Instead of indexing flat table
# cells, it takes the element with class 'wikitable sortable', iterates over
# the children of its second content node, reads city, country and population
# from fixed child positions of every second child, and stops at child index 40.
'''
from bs4 import BeautifulSoup
import requests
city_url='https://en.wikipedia.org/wiki/List_of_European_cities_by_population_within_city_limits'
response = requests.get(city_url)
soup = BeautifulSoup(response.content, "html.parser")

parent = soup.find(class_ = 'wikitable sortable')
children = parent.contents[1]
city_dict = {"City": [], "Country":[], "Population":[]}

for child_ind, child in enumerate(children):
    if((child_ind != 0) & (child_ind%2 == 0)):
        city_dict['City'].append(child.contents[3].get_text("|", strip=True).replace('\n',''))
        city_dict['Population'].append(child.contents[7].get_text("|", strip=True))
        city_dict['Country'].append(child.contents[5].get_text("|", strip=True))
    if(child_ind == 40):
        break 
cities_df = pd.DataFrame(city_dict)
cities_df
'''

## FLIGHT DATA
Every day, the company wants to know which flights will arrive the next day. 

###  Get the list of airports with ICAO code matching the search term

In [6]:
# The airport lookup below is wrapped in a string literal, so it is NOT
# executed when this cell runs; it is kept for reference only. If run, it would:
#   * query the AeroDataBox free-text airport search once per city in
#     city_info_df["cities"] (up to 10 hits each);
#   * keep the icao, iata, name, municipalityName and countryCode fields;
#   * drop airports whose country code is not in the listed European codes or
#     whose municipality name is not one of the searched cities;
#   * write the result to airport_names.csv.
'''
# To obtain the icao codes we use Search airports by free text
url = "https://aerodatabox.p.rapidapi.com/airports/search/term"
headers = {
    "X-RapidAPI-Key": flight_api_key,
    "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
}
cities = list(city_info_df["cities"])

airport_df_list = []
for city_name in cities:
    print(city_name)
    querystring = {"q": city_name, "limit": "10"}

    response = requests.request("GET", url, headers=headers, params=querystring)
    result = response.json()
    # print(result)
    temp_df = pd.json_normalize(response.json()['items']).filter(['icao',
                                                                  'iata',
                                                                  'name',
                                                                  'municipalityName',
                                                                  'countryCode'])
    airport_df_list.append(temp_df)

# concat the dataframes in the list:
airports_df = pd.concat(airport_df_list, axis=0, ignore_index=False)

europe_country_codes = ['TR', 'RU', 'GB', 'DE', 'ES', 'IT', 'UA', 'RO', 'FR',
                        'BY', 'AT', 'PL', 'HU', 'RS', 'BG', 'CZ', 'GE', 'NL']
# remove entries if they are not in europe
airports_df = airports_df[airports_df['countryCode'].isin(
    europe_country_codes)].reset_index(drop=True)
# remove entries if the municipality name is not the city
airports_df = airports_df[airports_df['municipalityName'].isin(
    cities)].reset_index(drop=True)

airports_df.to_csv('/Users/ilkayisik/Desktop/WBS_DS/Chapter05/ilkay/airport_names.csv', 
                   index=False)
'''


# Load the airport table from the CSV file on disk instead of calling the API again.
# NOTE(review): the output shown under this cell contains an 'Unnamed: 0'
# column, which suggests the CSV on disk was saved together with its index;
# that column would then also be written to the database — confirm.
airports_df = pd.read_csv('/Users/ilkayisik/Desktop/WBS_DS/Chapter05/ilkay/airport_names.csv')
# Preview the first rows (shown as the cell output).
airports_df.head()


Unnamed: 0,icao,iata,name,municipalityName,countryCode
0,LTFM,IST,Istanbul,Istanbul,TR
1,LTFJ,SAW,"Istanbul, Sabiha Gökçen",Istanbul,TR
2,UUEE,SVO,"Moscow, Sheremetyevo",Moscow,RU
3,UUDD,DME,"Moscow, Domodedovo",Moscow,RU
4,UUBW,ZIA,"Moscow, Zhukovsky",Moscow,RU


### Use the icao codes to obtain flight arrival info
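To automate this, we generate the query time window and convert it into the format that the particular API endpoint requires (`YYYY-MM-DDTHH:00`). Note that the cell below queries a nine-hour window starting at the current hour rather than tomorrow’s date; shift both timestamps by one day to retrieve tomorrow’s arrivals.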

In [None]:
# Query window for the arrivals endpoint: from the start of the current hour
# to nine hours later. Both ends are derived from a single clock reading so
# the window length cannot drift if the hour changes between two calls.
# NOTE(review): datetime.now() is the local time of the machine running the
# notebook; confirm that this is the time zone the endpoint expects.
now = datetime.now()
from_local_time = now.strftime('%Y-%m-%dT%H:00')
to_local_time = (now + timedelta(hours=9)).strftime('%Y-%m-%dT%H:00')

querystring = {"withLeg": "true", "withCancelled": "true", "withCodeshared": "true",
               "withCargo": "true", "withPrivate": "false", "withLocation": "false"}

headers = {
    'x-rapidapi-host': "aerodatabox.p.rapidapi.com",
    'x-rapidapi-key': flight_api_key,
}

# Fields kept from the normalised response, and the names they get in the
# arrivals table ('status' keeps its name).
wanted_columns = ['departure.airport.name',
                  'departure.airport.icao',
                  'arrival.scheduledTimeLocal',
                  'status', 'airline.name']
column_names = {'departure.airport.name': 'dep_airport',
                'arrival.scheduledTimeLocal': 'arr_sched_loc_time',
                'departure.airport.icao': 'dep_airport_icao',
                'airline.name': 'airline'}

arrival_df_list = []
# Only the first two airports are queried.
# NOTE(review): presumably to limit the number of API calls — confirm.
for icao_code in list(airports_df['icao'])[0:2]:
    print(icao_code)

    url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao_code}/{from_local_time}/{to_local_time}"
    response = requests.get(url, headers=headers, params=querystring)

    try:
        arrival_df = pd.json_normalize(response.json()['arrivals'])
    except (KeyError, TypeError, ValueError) as err:
        # No usable payload (empty/non-JSON body or no 'arrivals' key): report
        # it and skip this airport, so nothing stale or undefined is appended.
        print(f"{icao_code}: no arrivals in response (HTTP {response.status_code}): {err!r}")
        continue

    print(arrival_df.columns)
    filtered_df = arrival_df.filter(wanted_columns)
    print(filtered_df.columns)
    # Rename to the table's column names and tag every row with the airport
    # the flights arrive at.
    filtered_df = (filtered_df
                   .rename(columns=column_names)
                   .assign(arr_airport_icao=icao_code))
    arrival_df_list.append(filtered_df)

# concat the dataframes in the list (pd.concat raises on an empty list, so
# fall back to an empty frame with the expected columns):
if arrival_df_list:
    arrivals_df = pd.concat(arrival_df_list, axis=0, ignore_index=False)
else:
    arrivals_df = pd.DataFrame(columns=['dep_airport', 'dep_airport_icao',
                                        'arr_sched_loc_time', 'status',
                                        'airline', 'arr_airport_icao'])

In [None]:
arrivals_df.head()

## Collect weather data

In [None]:
def _to_float(value, default=0.0):
    """Return *value* as a float, or *default* when it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


# Forecast endpoint; HTTPS so the API key is not sent in clear text.
forecast_url = 'https://api.openweathermap.org/data/2.5/forecast'

weather_df_list = []
for city in airports_df['municipalityName'].dropna().unique():
    # Country code of the first airport row listed for this city.
    country = airports_df.loc[airports_df['municipalityName'] == city, 'countryCode'].iloc[0]

    # Let requests build and encode the query string (city names may contain
    # spaces or other characters that are not URL-safe).
    response = requests.get(forecast_url,
                            params={'q': f'{city},{country}',
                                    'appid': OWM_key,
                                    'units': 'metric',
                                    'lang': 'en'})

    try:
        forecast_api = response.json()['list']
    except (KeyError, TypeError, ValueError) as err:
        # No forecast in the payload (e.g. unknown city or an error response):
        # report it and carry on with the remaining cities.
        print(f"{city},{country}: no forecast in response (HTTP {response.status_code}): {err!r}")
        continue

    weather_info = []
    for forecast_3h in forecast_api:
        # 'pop', 'rain' and 'snow' are treated as optional. Missing values fall
        # back to 0.0 (a float) so these columns are float-typed even when
        # every entry lacks the field.
        weather_info.append({
            'datetime': forecast_3h['dt_txt'],
            'temperature': forecast_3h['main']['temp'],
            'wind': forecast_3h['wind']['speed'],
            # probability of precipitation
            'prob_perc': _to_float(forecast_3h.get('pop')),
            # rain
            'rain_qty': _to_float((forecast_3h.get('rain') or {}).get('3h')),
            # snow
            'snow': _to_float((forecast_3h.get('snow') or {}).get('3h')),
            'municipality': city,
            'country': country,
            'municipality_iso_country': f'{city},{country}',
        })

    weather_df_list.append(pd.DataFrame(weather_info))

# concat the dataframes in the list (pd.concat raises on an empty list, so
# fall back to an empty frame with the expected columns):
if weather_df_list:
    weather_df = pd.concat(weather_df_list, axis=0, ignore_index=False)
else:
    weather_df = pd.DataFrame(columns=['datetime', 'temperature', 'wind',
                                       'prob_perc', 'rain_qty', 'snow',
                                       'municipality', 'country',
                                       'municipality_iso_country'])

## Check dataframes

In [None]:
city_info_df.head()

In [None]:
airports_df.head()

In [None]:
arrivals_df.head()

In [None]:
weather_df.head()

## Update data into database

### `sqlalchemy`

#### Establish the connection:

In MySQL, first run:
CREATE DATABASE IF NOT EXISTS gans_02;

In [None]:
# Connection settings for a MySQL server on this machine (127.0.0.1).
user, password = "root", "your_pw"
host, port = "127.0.0.1", 3306
schema = "gans_02"
# Connection URL with the PyMySQL driver; used as `con` in the pandas SQL
# calls further down.
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

In [None]:
# Connection settings for my AWS instance. Running this cell rebinds `con`,
# so it replaces any connection URL defined earlier.
user, password = "admin", "your_pw"
host, port = "wbs-project3-db.c2btp8jeti51.eu-central-1.rds.amazonaws.com", 3306
schema = "gans_02"
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

#### Update the tables

In [None]:
# Append the city table to the `city_info` table. Rows with missing values
# are kept (no dropna here) and the DataFrame index is not written.
city_info_df.to_sql(name='city_info', con=con, if_exists='append', index=False)

In [None]:
# Append the airport table to the `airports` table, after dropping every row
# that contains a missing value. The DataFrame index is not written.
airports_df.dropna().to_sql(name='airports', con=con, if_exists='append', index=False)

In [None]:
# Append the arrivals to the `arrivals` table, after dropping every row that
# contains a missing value. The DataFrame index is not written.
arrivals_df.dropna().to_sql(name='arrivals', con=con, if_exists='append', index=False)

In [None]:
# Append the forecasts to the `weather` table. Rows with missing values are
# kept (no dropna here) and the DataFrame index is not written.
weather_df.to_sql(name='weather', con=con, if_exists='append', index=False)

## In MySQL, check that the tables were created correctly:
USE gans_02;

SELECT * FROM airports;

SELECT * FROM city_info;

SELECT * FROM arrivals;

SELECT * FROM weather;

### How to retrieve data from my MySQL instance:
So far we have created our database by transferring the DataFrames that we built in Python to MySQL (using `df.to_sql`).

We might want to do the opposite, namely transferring data from MySQL to python.

To do that, we need to make sure that a connection is established between our SQL instance and Python, and then use `pd.read_sql`.

In [None]:

# Connection settings for the MySQL server on this machine (127.0.0.1).
user, password = "root", "your_pw"
host, port = "127.0.0.1", 3306
schema = "gans_02"
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

# Read the icao column back from the `airports` table and show the first two
# codes as a plain list.
pd.read_sql('SELECT icao FROM airports', con)['icao'].head(2).to_list()
 

In [None]:
# Fetch city and country code together in a single query, so each city is
# paired with the country code from its own row and the table is read once
# rather than on every loop iteration. Only the first row is used.
city_country_df = pd.read_sql('SELECT municipalityName, countryCode FROM airports', con).iloc[0:1]
weather_city_list = city_country_df['municipalityName'].to_list()
for city, country in zip(weather_city_list, city_country_df['countryCode']):
    print(city, country)
