### Import Required Libraries

In [1]:
import os
import json
import pymongo
import pandas as pd
import numpy as np
import pymysql
from sqlalchemy import create_engine
import requests

#### Declare & Assign Connection Variables for the MySQL Server & Databases Working With

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "arong"
pwd = "Passw0rd123"

dst_dbname = "ds3002_capstone"

#### Define Functions for Getting Data From and Setting Data into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')

    connection.close()

#### Create New Data Warehouse database, and to Use it, Switch the Connection Context

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.result.ResultProxy at 0x7fcf78005c10>

### Connection to the MongoDB Instance

In [5]:
# pip install pymongo[srv]

In [6]:
host_name = "localhost"
port = "27017"

atlas_cluster_name = "sandbox"
atlas_default_dbname = "sample_airbnb"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://{host_name}:{port}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.zibbf.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
}

#### Extract

#### Interogate the MongoDB Atlas instance for the databases it hosts.

In [7]:
client = pymongo.MongoClient(conn_str["atlas"])
client.list_database_names()

['adventure_works',
 'blog',
 'sample_airbnb',
 'sample_analytics',
 'sample_geospatial',
 'sample_mflix',
 'sample_restaurants',
 'sample_supplies',
 'sample_training',
 'sample_weatherdata',
 'admin',
 'local']

#### Connect to the "sample_airbnb" database, and interogate it for the collections it contains.

In [8]:
db_name = "sample_airbnb"

db = client[db_name]
db.list_collection_names()

['listingsAndReviews']

#### Connect the listsAndReviews collection to query data

In [9]:
'''
Columns I am interested in:
    - price, security_deposit, weekly_price, monthly_price, address
- get address (dict-like) by itself and get new df from that 
    - from the address, extract additional data 
'''

'\nColumns I am interested in:\n    - price, security_deposit, weekly_price, monthly_price, address\n- get address (dict-like) by itself and get new df from that \n    - from the address, extract additional data \n'

In [10]:
collection = "listingsAndReviews"
listingsAndReviews = db[collection]

df = pd.DataFrame(list(listingsAndReviews.find()))

#### Transform

In [11]:
df.columns

# extract columns of interest
new_df = df[['name','price','security_deposit', 'weekly_price', 'monthly_price']]

new_df.head(5)


Unnamed: 0,name,price,security_deposit,weekly_price,monthly_price
0,Ribeira Charming Duplex,80.0,200.0,,
1,Horto flat with small garden,317.0,,1492.0,4849.0
2,Ocean View Waikiki Marina w/prkg,115.0,,650.0,2150.0
3,Private Room in Bushwick,40.0,,,
4,Apt Linda Vista Lagoa - Rio,701.0,1000.0,,


In [12]:
df.head(5)
addresses = pd.json_normalize(df['address'])
country_codes = addresses['country_code']

In [13]:
new_df['country_codes'] = country_codes

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [14]:
new_df

Unnamed: 0,name,price,security_deposit,weekly_price,monthly_price,country_codes
0,Ribeira Charming Duplex,80.00,200.00,,,PT
1,Horto flat with small garden,317.00,,1492.00,4849.00,BR
2,Ocean View Waikiki Marina w/prkg,115.00,,650.00,2150.00,US
3,Private Room in Bushwick,40.00,,,,US
4,Apt Linda Vista Lagoa - Rio,701.00,1000.00,,,BR
...,...,...,...,...,...,...
5550,Cozy apartment downtown Porto,58.00,500.00,,,PT
5551,Kadıköy-Altıyol,501.00,,,,TR
5552,The best suite in Copacabana (total privacy),101.00,0.00,,,BR
5553,March Madness Special! Aina Nalu Platinum D107,227.00,200.00,,,US


In [15]:
airports_and_locations = pd.read_csv('/Users/annedarong/Desktop/airport-codes_csv.csv')
airports_and_locations.columns

airports_and_locations = airports_and_locations[['name','type','iso_country']]
airports = airports_and_locations[airports_and_locations['type'].str.contains('airport')]

airports_and_locations.head(5)
airports.head(10)

Unnamed: 0,name,type,iso_country
1,Aero B Ranch Airport,small_airport,US
2,Lowell Field,small_airport,US
3,Epps Airpark,small_airport,US
5,Fulton Airport,small_airport,US
6,Cordes Airport,small_airport,US
7,Goldstone /Gts/ Airport,small_airport,US
8,Williams Ag Airport,small_airport,US
11,Grass Patch Airport,small_airport,US
13,River Oak Airport,small_airport,US
14,Lt World Airport,small_airport,US


## API - Aviation Stack

In [16]:
def get_airline_info(offset: int = 0):
    params = {
    'access_key': 'fcca77556058dc509fe815718612c4f9',
    'offset': offset}
    api_result = requests.get('http://api.aviationstack.com/v1/flights', params)
    return api_result.json()
api_response = get_airline_info()

print(api_response)

{'pagination': {'limit': 100, 'offset': 0, 'count': 100, 'total': 277664}, 'data': [{'flight_date': '2022-05-13', 'flight_status': 'active', 'departure': {'airport': 'Jackson Field', 'timezone': 'Pacific/Port_Moresby', 'iata': 'POM', 'icao': 'AYPY', 'terminal': 'D', 'gate': None, 'delay': None, 'scheduled': '2022-05-13T05:05:00+00:00', 'estimated': '2022-05-13T05:05:00+00:00', 'actual': None, 'estimated_runway': None, 'actual_runway': None}, 'arrival': {'airport': 'Nadzab', 'timezone': 'Pacific/Port_Moresby', 'iata': 'LAE', 'icao': 'AYNZ', 'terminal': 'D', 'gate': None, 'baggage': None, 'delay': None, 'scheduled': '2022-05-13T06:05:00+00:00', 'estimated': '2022-05-13T06:05:00+00:00', 'actual': None, 'estimated_runway': None, 'actual_runway': None}, 'airline': {'name': 'Air Niugini', 'iata': 'PX', 'icao': 'ANG'}, 'flight': {'number': '200', 'iata': 'PX200', 'icao': 'ANG200', 'codeshared': None}, 'aircraft': None, 'live': None}, {'flight_date': '2022-05-13', 'flight_status': 'landed', 'd

In [17]:
def create_df_from_json(json_data):
    return pd.json_normalize(json_data['data'])

flight_df = create_df_from_json(api_response)
flight_df.columns

Index(['flight_date', 'flight_status', 'aircraft', 'live', 'departure.airport',
       'departure.timezone', 'departure.iata', 'departure.icao',
       'departure.terminal', 'departure.gate', 'departure.delay',
       'departure.scheduled', 'departure.estimated', 'departure.actual',
       'departure.estimated_runway', 'departure.actual_runway',
       'arrival.airport', 'arrival.timezone', 'arrival.iata', 'arrival.icao',
       'arrival.terminal', 'arrival.gate', 'arrival.baggage', 'arrival.delay',
       'arrival.scheduled', 'arrival.estimated', 'arrival.actual',
       'arrival.estimated_runway', 'arrival.actual_runway', 'airline.name',
       'airline.iata', 'airline.icao', 'flight.number', 'flight.iata',
       'flight.icao', 'flight.codeshared', 'flight.codeshared.airline_name',
       'flight.codeshared.airline_iata', 'flight.codeshared.airline_icao',
       'flight.codeshared.flight_number', 'flight.codeshared.flight_iata',
       'flight.codeshared.flight_icao', 'aircraft.regi

In [18]:
def clean_df(df):
    filtered_flight = df[['flight_date','departure.airport','arrival.airport','departure.estimated','arrival.estimated']]
    filtered_flight['Estimated_travel_time'] = (pd.to_datetime(filtered_flight['arrival.estimated']) -      pd.to_datetime(filtered_flight['departure.estimated'])).dt.total_seconds()/3600
    return filtered_flight
filtered_flight = clean_df(flight_df)
filtered_flight

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,flight_date,departure.airport,arrival.airport,departure.estimated,arrival.estimated,Estimated_travel_time
0,2022-05-13,Jackson Field,Nadzab,2022-05-13T05:05:00+00:00,2022-05-13T06:05:00+00:00,1.000000
1,2022-05-13,Hasanudin,Timika,2022-05-13T03:05:00+00:00,2022-05-13T07:00:00+00:00,3.916667
2,2022-05-13,Hasanudin,Timika,2022-05-13T03:00:00+00:00,2022-05-13T07:05:00+00:00,4.083333
3,2022-05-13,Beijing Daxing International Airport,Chongqing Jiangbei International,2022-05-13T07:00:00+00:00,2022-05-13T10:00:00+00:00,3.000000
4,2022-05-13,Kerikeri/Bay of Islands,Auckland International,2022-05-13T06:00:00+00:00,2022-05-13T06:45:00+00:00,0.750000
...,...,...,...,...,...,...
95,2022-05-13,Belgrade Nikola Tesla,Rinas Mother Teresa,2022-05-13T00:35:00+00:00,2022-05-13T01:45:00+00:00,1.166667
96,2022-05-13,Oliver Reginald Tambo International (Jan Smuts...,Jomo Kenyatta International,2022-05-13T00:50:00+00:00,2022-05-13T06:10:00+00:00,5.333333
97,2022-05-13,Matei,Nausori,2022-05-13T09:55:00+00:00,2022-05-13T10:55:00+00:00,1.000000
98,2022-05-13,Labasa,Nausori,2022-05-13T09:40:00+00:00,2022-05-13T10:20:00+00:00,0.666667


In [19]:
def concat_df(old,new):
    return pd.concat([old,new])
def apply_pk(df):
    df['Id'] = np.arange(df.shape[0]) + 1
    return df
final_df = filtered_flight
for i in range(9):
    api_response = get_airline_info(100*(i+1))
    df = create_df_from_json(api_response)
    cleaned_df = clean_df(df)
    final_df = concat_df(final_df,cleaned_df)
final_df = apply_pk(final_df)
final_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,flight_date,departure.airport,arrival.airport,departure.estimated,arrival.estimated,Estimated_travel_time,Id
0,2022-05-13,Jackson Field,Nadzab,2022-05-13T05:05:00+00:00,2022-05-13T06:05:00+00:00,1.000000,1
1,2022-05-13,Hasanudin,Timika,2022-05-13T03:05:00+00:00,2022-05-13T07:00:00+00:00,3.916667,2
2,2022-05-13,Hasanudin,Timika,2022-05-13T03:00:00+00:00,2022-05-13T07:05:00+00:00,4.083333,3
3,2022-05-13,Beijing Daxing International Airport,Chongqing Jiangbei International,2022-05-13T07:00:00+00:00,2022-05-13T10:00:00+00:00,3.000000,4
4,2022-05-13,Kerikeri/Bay of Islands,Auckland International,2022-05-13T06:00:00+00:00,2022-05-13T06:45:00+00:00,0.750000,5
...,...,...,...,...,...,...,...
95,2022-05-13,Dalian,Hangzhou,2022-05-13T12:40:00+00:00,2022-05-13T14:55:00+00:00,2.250000,996
96,2022-05-13,Dalian,Shanghai Pudong International,2022-05-13T12:40:00+00:00,2022-05-13T14:45:00+00:00,2.083333,997
97,2022-05-13,Dalian,Linyi,2022-05-13T12:35:00+00:00,2022-05-13T13:45:00+00:00,1.166667,998
98,2022-05-13,Dalian,Dongying,2022-05-13T12:35:00+00:00,2022-05-13T13:40:00+00:00,1.083333,999


In [20]:
new_df = apply_pk(new_df)
new_air_df = apply_pk(airports)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  after removing the cwd from sys.path.


In [21]:
new_df

Unnamed: 0,name,price,security_deposit,weekly_price,monthly_price,country_codes,Id
0,Ribeira Charming Duplex,80.00,200.00,,,PT,1
1,Horto flat with small garden,317.00,,1492.00,4849.00,BR,2
2,Ocean View Waikiki Marina w/prkg,115.00,,650.00,2150.00,US,3
3,Private Room in Bushwick,40.00,,,,US,4
4,Apt Linda Vista Lagoa - Rio,701.00,1000.00,,,BR,5
...,...,...,...,...,...,...,...
5550,Cozy apartment downtown Porto,58.00,500.00,,,PT,5551
5551,Kadıköy-Altıyol,501.00,,,,TR,5552
5552,The best suite in Copacabana (total privacy),101.00,0.00,,,BR,5553
5553,March Madness Special! Aina Nalu Platinum D107,227.00,200.00,,,US,5554


In [22]:
new_air_df

Unnamed: 0,name,type,iso_country,Id
1,Aero B Ranch Airport,small_airport,US,1
2,Lowell Field,small_airport,US,2
3,Epps Airpark,small_airport,US,3
5,Fulton Airport,small_airport,US,4
6,Cordes Airport,small_airport,US,5
...,...,...,...,...
57415,Yanji Chaoyangchuan Airport,medium_airport,CN,39957
57416,Yingkou Lanqi Airport,medium_airport,CN,39958
57417,Shenyang Dongta Airport,medium_airport,CN,39959
57419,Glorioso Islands Airstrip,small_airport,TF,39960


## Load

In [23]:
#Creating tables

airbnb_table = '''
    CREATE TABLE `ds3002_capstone`.`airbnb` (
      `id` INT,
      `name` VARCHAR(200) NOT NULL,
      `airbnb_price` FLOAT NULL,
      `airbnb_security_deposit` FLOAT NULL,
      `airbnb_weekly_price` FLOAT NULL,
      `airbnb_monthly_price` FLOAT NULL,
      `airbnb_country_codes` VARCHAR(200) NOT NULL,
      PRIMARY KEY (`id`));
'''
airports_table = '''
    CREATE TABLE `ds3002_capstone`.`airports` (
      `id` INT,
      `name` VARCHAR(200) NOT NULL,
      `type` VARCHAR(200) NOT NULL,
      `iso_country` VARCHAR(200) NOT NULL,
      PRIMARY KEY (`id`));
      '''

flights_table = '''
    CREATE TABLE `ds3002_capstone`.`flights` (
      `date` DATE,
      `departure_airport` VARCHAR(200) NULL,
      `arrival_airport` VARCHAR(200) NULL,
      `departure_estimated_time` DATETIME,
      `arrival_estimated_time` DATETIME,
      `estimated_travel_time` FLOAT NULL,
      `id` INT,
      PRIMARY KEY (`id`));
      '''

drop_tag = '''DROP TABLE IF EXISTS ds3002_capstone.airbnb;'''

drop_table = '''DROP TABLE IF EXISTS ds3002_capstone.airports;'''

drop_flat = '''DROP TABLE IF EXISTS ds3002_capstone.flights;'''

sqlEngine.execute(drop_tag)
sqlEngine.execute(drop_table)
sqlEngine.execute(drop_flat)

sqlEngine.execute(airbnb_table)
sqlEngine.execute(airports_table)
sqlEngine.execute(flights_table)

<sqlalchemy.engine.result.ResultProxy at 0x7fcf625d1e50>

In [24]:
table_name = 'flights'
pk_column = 'id'
db_operation = 'insert'

set_dataframe(user_id, pwd, host_name, dst_dbname, final_df, table_name, pk_column, db_operation)

In [25]:
table_name = 'airports'
pk_column = 'id'
db_operation = 'insert'

set_dataframe(user_id, pwd, host_name, dst_dbname, airports, table_name, pk_column, db_operation)

In [26]:
table_name = 'airbnb'
pk_column = 'id'
db_operation = 'insert'

set_dataframe(user_id, pwd, host_name, dst_dbname, new_df, table_name, pk_column, db_operation)

In [27]:
final_etl_pipeline_query = '''
SELECT ab.name,ab.country_codes,time_country.avg_estimated_travel_time,ab.security_deposit,ab.weekly_price,ab.monthly_price,ab.price FROM airbnb ab 
LEFT JOIN (SELECT AVG(f.Estimated_travel_time) AS avg_estimated_travel_time,a.iso_country FROM (ds3002_capstone.flights f
LEFT JOIN airports a ON f.`arrival.airport` = a.name)
WHERE a.name is NOT NULL
GROUP BY a.iso_country) as time_country ON ab.country_codes = time_country.iso_country;
'''

r = get_dataframe(user_id, pwd, host_name, dst_dbname, final_etl_pipeline_query)

In [28]:
r

Unnamed: 0,name,country_codes,avg_estimated_travel_time,security_deposit,weekly_price,monthly_price,price
0,Ribeira Charming Duplex,PT,,200.00,,,80.00
1,Horto flat with small garden,BR,1.000000,,1492.00,4849.00,317.00
2,Ocean View Waikiki Marina w/prkg,US,1.412963,,650.00,2150.00,115.00
3,Private Room in Bushwick,US,1.412963,,,,40.00
4,Apt Linda Vista Lagoa - Rio,BR,1.000000,1000.00,,,701.00
...,...,...,...,...,...,...,...
5550,Cozy apartment downtown Porto,PT,,500.00,,,58.00
5551,Kadıköy-Altıyol,TR,2.155556,,,,501.00
5552,The best suite in Copacabana (total privacy),BR,1.000000,0.00,,,101.00
5553,March Madness Special! Aina Nalu Platinum D107,US,1.412963,200.00,,,227.00


In [29]:
airbnb_travel = r[~np.isnan(r['avg_estimated_travel_time'])]
airbnb_travel = apply_pk(airbnb_travel)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  after removing the cwd from sys.path.


In [30]:
airbnb_travel_table = '''
    CREATE TABLE `ds3002_capstone`.`airbnb_travel` (
      `id` INT,
      `name` VARCHAR(200) NOT NULL,
      `country_codes` VARCHAR(2) NOT NULL,
      `avg_estimated_travel_time` FLOAT NOT NULL,
      `security_deposit` FLOAT NULL,
      `weekly_price` FLOAT NULL,
      `monthly_price` FLOAT NULL,
      `price` FLOAT NOT NULL,
      PRIMARY KEY (`id`));
'''

drop_airbnb_travel = '''DROP TABLE IF EXISTS ds3002_capstone.airbnb_travel;'''

sqlEngine.execute(drop_airbnb_travel)

sqlEngine.execute(airbnb_travel_table)


<sqlalchemy.engine.result.ResultProxy at 0x7fcf6c6944d0>

In [31]:
table_name = 'airbnb_travel'
pk_column = 'id'
db_operation = 'insert'

set_dataframe(user_id, pwd, host_name, dst_dbname, airbnb_travel, table_name, pk_column, db_operation)