In [None]:
# Install all modules to help with data cleaning and storage
!pip install flatdict==4.0.1
!pip install google-cloud-bigquery
!pip install --upgrade google-cloud-bigquery-storage

In [28]:
# Import the HTTP request helpers (get and post) from the requests library
from requests import get,post
import os
import flatdict
from google.cloud import bigquery
from google.oauth2 import service_account

In [29]:
# documentation - https://aviationstack.com/documentation

In [30]:
# Api keys
# The secrets are read from the environment so they are never stored in the notebook.
# NOTE(review): the literal keys that used to be written in this cell should be
# treated as exposed and rotated with the API provider.
key = os.environ.get("API_KEY", "")
weather_api_key = os.environ.get("WEATHER_API_KEY", "")
if not (key and weather_api_key):
    print("Warning: API_KEY and/or WEATHER_API_KEY is not set; requests will be sent with an empty access key.")

In [31]:
# Every endpoint used in this notebook hangs off the same versioned base URL.
_API_BASE = "http://api.aviationstack.com/v1"

# Flight and airline information
flights_url = _API_BASE + "/flights"
airlines_url = _API_BASE + "/airlines"

# Current and historical weather
# NOTE(review): these are called with a separate weather key but point at the
# aviationstack host - confirm the host is correct for the weather API.
current_weather_url = _API_BASE + "/current"
historical_weather_url = _API_BASE + "/historical"

In [32]:
# Helper function to get data from the apis
def get_from_api(url, query, api_key=None, timeout=30):
    """Send a GET request to an API endpoint and return the decoded JSON body.

    Args:
        url: Endpoint URL without a query string.
        query: Extra query-string parameters, already formatted and starting
            with "&" (for example "&airline_iata=BA").
        api_key: Access key sent as the ``access_key`` parameter. When omitted,
            the module-level ``key`` is looked up at call time, so re-running
            the cell that defines the keys takes effect without re-running
            this cell.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the notebook forever.

    Returns:
        The parsed JSON response.
    """
    if api_key is None:
        api_key = key
    response = get(url + "?access_key=" + api_key + query, timeout=timeout)
    return response.json()

In [None]:
# Fetch flights filtered to the BA airline code
ba_flights = get_from_api(url=flights_url, query="&airline_iata=BA")
# Try fetching the current weather in London with the dedicated weather key,
# and show the raw response as the cell output
london_weather = get_from_api(
    url=current_weather_url,
    query="&query=london",
    api_key=weather_api_key,
)
london_weather

In [None]:
# Not used right now
# [airline for airline in airlines['data'] if airline['airline_name'] == 'British Airways']

In [None]:
# Fetch the BA flights again, this time starting at offset 100.
# This replaces the ba_flights value assigned in the earlier cell.
ba_flights = get_from_api(url=flights_url, query="&airline_iata=BA&offset=100")

In [33]:
# Load the service-account key file (expected next to this notebook) with
# Google's own helper so it can be handed to the BigQuery client
credentials = service_account.Credentials.from_service_account_file('credentials.json')

In [34]:
# Construct the BigQuery client object, authenticating with the service-account
# credentials loaded from credentials.json. The dataset, table and insert
# helpers below all use this module-level client.
client = bigquery.Client(credentials=credentials)

In [7]:
def create_dataset(dataset_id, location="US"):
    """Make sure a BigQuery dataset exists in the client's project.

    Args:
        dataset_id: Short dataset name, without the project prefix.
        location: Geographic location where the dataset should reside.

    Returns:
        None. The outcome is printed; errors raised by the API request are
        caught and printed rather than re-raised.
    """
    # Construct a full Dataset ID in the format `project.dataset`.
    # A separate name is used so the caller's short id is not overwritten.
    full_dataset_id = "{}.{}".format(client.project, dataset_id)

    # Construct a Dataset object and set where it should live
    dataset = bigquery.Dataset(full_dataset_id)
    dataset.location = location

    try:
        # exists_ok=True makes the call re-runnable: an already existing
        # dataset is returned instead of raising a conflict error.
        dataset = client.create_dataset(dataset, exists_ok=True)  # API request
        print("Dataset {}.{} is ready".format(client.project, dataset.dataset_id))
    except Exception as e:
        print("Error in creating dataset: ", e)

# Make the datasets that the tables in this notebook live in.
# Both are needed: the weather table is written to weather_data and the
# flight table to flight_data.
create_dataset('weather_data')
create_dataset('flight_data')

Created dataset data-engineering-416410.weather_data


In [26]:
from google.cloud.bigquery import SchemaField

# Use a schema to create a new table for each of the data sources, flights and weather
def setup_bigquery_table(table_id=None, schema=None):
    """Create a BigQuery table, by default the British Airways flight table.

    Args:
        table_id: Fully qualified `project.dataset.table` id. Defaults to the
            "BA flight info updated" table in the flight_data dataset.
        schema: List of SchemaField objects. Defaults to the flight schema
            built by `_default_flight_schema`. Pass both arguments to create a
            table for another source (for example the weather data), which
            needs its own schema.

    Returns:
        None. The outcome is printed; errors raised while creating the table
        are caught and printed rather than re-raised.
    """
    if table_id is None:
        table_id = "{}.{}.{}".format('data-engineering-416410', 'flight_data', 'BA flight info updated')
    if schema is None:
        schema = _default_flight_schema()

    table = bigquery.Table(table_id, schema=schema)
    try:
        client.create_table(table)
        print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
    except Exception as e:
        print("Table creation encountered an error: ", e)


def _default_flight_schema():
    """Build the schema for flight rows after they went through flatten and fix_datetime."""
    # Column names follow the keys produced by flatten(), which joins nested
    # keys with "__" (e.g. departure -> airport becomes departure__airport).
    place_parts = ["airport", "timezone", "iata", "icao", "terminal", "gate"]
    # These are the fields fix_datetime() rewrites into offset-free date-times.
    time_parts = ["scheduled", "estimated", "actual", "estimated_runway", "actual_runway"]

    columns = [("flight_date", "DATETIME"), ("flight_status", "STRING")]
    columns += [("departure__" + part, "STRING") for part in place_parts]
    columns.append(("departure__delay", "INTEGER"))
    columns += [("departure__" + part, "DATETIME") for part in time_parts]
    columns += [("arrival__" + part, "STRING") for part in place_parts + ["baggage"]]
    columns += [("arrival__" + part, "DATETIME") for part in time_parts]
    columns += [
        ("airline__name", "STRING"),
        ("airline__icao", "STRING"),
        ("flight__codeshared__airline_name", "STRING"),
    ]
    # NULLABLE because fix_datetime() already has to cope with None values,
    # so a REQUIRED column would reject those rows.
    return [SchemaField(name, field_type, mode="NULLABLE") for name, field_type in columns]

# Call this function to create the table configured inside setup_bigquery_table.
# NOTE(review): as written the function targets the BA flight table in the
# flight_data dataset, not the weather table.
setup_bigquery_table()

Created table data-engineering-416410.weather_data.weather info updated


# Process

In [35]:
# Send the rows to a BigQuery table, depending on the table id provided. If no id is provided it defaults to the flight data table
def send_to_bigquery(rows_to_insert, table_id="data-engineering-416410.flight_data.BA flight info", ignore_unknown_values=None):
    """Stream JSON rows into a BigQuery table and print the outcome.

    Args:
        rows_to_insert: List of dicts, one per row.
        table_id: Fully qualified `project.dataset.table` id.
            NOTE(review): the default is "BA flight info", while
            setup_bigquery_table creates "BA flight info updated" - confirm
            which table is intended.
        ignore_unknown_values: Forwarded to `insert_rows_json`. Set to True to
            accept rows carrying keys the table has no column for; the default
            (None) leaves the client's behaviour unchanged.

    Returns:
        None.
    """
    # Nothing to send: skip the API request and do not claim rows were added.
    if not rows_to_insert:
        print("No rows to insert.")
        return

    errors = client.insert_rows_json(
        table_id, rows_to_insert, ignore_unknown_values=ignore_unknown_values
    )  # Make an API request.
    if not errors:
        print("New rows have been added.")
    else:
        print("Encountered errors while inserting rows: {}".format(errors))

In [36]:
# Flatten a deeply nested entry
def flatten(row):
    """Collapse a nested mapping into a plain single-level dict.

    Keys of nested levels are joined with "__" by flatdict.FlatDict.
    """
    flattened = flatdict.FlatDict(row, delimiter="__")
    return dict(flattened)

# Keys of a flattened row whose values are timestamps
_DATETIME_KEYS = frozenset([
    "departure__scheduled", "departure__estimated", "departure__actual",
    "departure__estimated_runway", "departure__actual_runway",
    "arrival__scheduled", "arrival__estimated", "arrival__actual",
    "arrival__estimated_runway", "arrival__actual_runway",
    "location__localtime",
])


def _strip_utc_offset(value):
    """Return value without a trailing "+HH:MM" / "-HH:MM" offset, if it has one."""
    has_offset = (
        isinstance(value, str)
        and len(value) > 6
        and value[-6] in "+-"
        and value[-5:-3].isdigit()
        and value[-3] == ":"
        and value[-2:].isdigit()
    )
    return value[:-6] if has_offset else value


# This will format the times to be used in Google BigQuery
def fix_datetime(dictionary):
    """Return a copy of a flattened row with its timestamps reformatted.

    * For the known timestamp keys, a trailing UTC offset such as "+00:00" is
      removed. Values without such an offset (or None) are left untouched, so
      a timestamp that has no offset is no longer cut short by six characters.
    * A non-empty 'flight_date' gets "T00:00:00" appended.

    Args:
        dictionary: Flattened row; it is not modified.

    Returns:
        A new dict with the adjusted values.
    """
    dict_copy = dictionary.copy()
    for field in _DATETIME_KEYS.intersection(dict_copy):
        dict_copy[field] = _strip_utc_offset(dict_copy[field])
    if dict_copy.get('flight_date'):
        dict_copy['flight_date'] = dict_copy['flight_date'] + "T00:00:00"
    return dict_copy

In [44]:
def get_flight_data_and_add_to_bigquery(max_offset=6000, page_size=100):
    """Page through BA flights departing from and arriving at LHR and store them.

    For every offset, one page of departures (dep_iata=LHR) and one page of
    arrivals (arr_iata=LHR) is requested, flattened, passed through
    fix_datetime and sent to BigQuery with send_to_bigquery's default table.

    Args:
        max_offset: Exclusive upper bound for the offset parameter.
        page_size: Step between two offsets. The request itself does not set a
            limit, so this should match the number of records the API returns
            per page.

    A direction is dropped from further paging as soon as it returns a
    response without 'data' (printed as a problem) or with an empty 'data'
    list, and paging stops once both directions are dropped.
    """
    remaining_filters = ["dep_iata", "arr_iata"]
    for offset in range(0, max_offset, page_size):
        # Iterate over a copy because exhausted filters are removed on the way.
        for airport_filter in list(remaining_filters):
            query = "&airline_iata=BA&" + airport_filter + "=LHR&offset=" + str(offset)
            flight_page = get_from_api(flights_url, query)
            flights = flight_page.get('data')
            if not flights:
                if flights is None:
                    print("No 'data' in response for {} at offset {}: {}".format(
                        airport_filter, offset, flight_page.get('error', flight_page)))
                remaining_filters.remove(airport_filter)
                continue
            send_to_bigquery([fix_datetime(flatten(item)) for item in flights])
        if not remaining_filters:
            break
# Run the flight import: fetches the BA/LHR flight pages from the API and
# streams them into BigQuery (many API requests - this cell is slow to run).
get_flight_data_and_add_to_bigquery()

Encountered errors while inserting rows: [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 1, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 2, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 3, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 4, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 5, 'errors': [{'reason': 'invalid', 'location': 'flight__codeshared', 'debugInfo': '', 'message': 'no such field: flight__codeshared.'}]}, {'index': 6, 'errors': [{'reason': 'invalid', 'location': 'fligh

KeyboardInterrupt: 

In [20]:
# Get data on weather and then put this data into big query
def get_weather_data_and_add_to_bigquery(location="london", table_id='data-engineering-416410.weather_data.weather info updated'):
    """Fetch the current weather for one location and store it in BigQuery.

    Only today's data is requested, due to a limitation on how far back data
    can be requested. The response is flattened, passed through fix_datetime
    and inserted as a single row.

    Args:
        location: Value sent as the `query` parameter of the weather request.
        table_id: Fully qualified id of the table the row is inserted into.
    """
    weather_page = get_from_api(current_weather_url, "&query=" + location, weather_api_key)
    # Assumes a failed request is reported under an "error" key - TODO confirm
    # against the API documentation. Such a payload is not a weather row.
    if "error" in weather_page:
        print("Weather request failed: {}".format(weather_page["error"]))
        return
    rows_to_insert = [fix_datetime(flatten(weather_page))]
    send_to_bigquery(rows_to_insert, table_id=table_id)
# Run the weather import: fetches the current London weather and inserts it
# into the weather table in BigQuery.
get_weather_data_and_add_to_bigquery()

Encountered errors while inserting rows: [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'location__localtime', 'debugInfo': '', 'message': 'no such field: location__localtime.'}]}]
