## Setup

In [168]:
import json
import logging
import os
import sys
from hashlib import md5
from typing import List

import pandas as pd
from google.cloud import bigquery


# **** SETUP ****
# BigQuery destination: tables are addressed as <PROJECT_NAME>.<DATASET_NAME>.<table>
PROJECT_NAME: str = "deb-01-372112"
DATASET_NAME: str = "tickets"

# local input: directory with the source data and the default tickets file inside it
DATA_DIR: str = "./data/"
DEFAULT_TICKETS_FILE: str = os.path.join(DATA_DIR, "tickets.json")

# **** TABLE SCHEMAS ****

def _column(name: str, field_type: str = 'string', mode: str = 'REQUIRED') -> bigquery.SchemaField:
    """Return one BigQuery column definition; the defaults describe a REQUIRED string column."""
    return bigquery.SchemaField(name, field_type, mode=mode)


# One entry per target table: the BigQuery table name plus the list of its columns.
# Per the original author's note, a dataframe index is only written to the table
# when the index name is listed in the schema.
TABLE_METADATA = {
    'airlines': {
        'table_name': 'airlines',
        'schema': [
            _column(name)
            for name in ('iata', 'name', 'icao', 'callsign', 'country')
        ],
    },
    'airports': {
        'table_name': 'airports',
        'schema': [
            _column('iata'),
            _column('city'),
            _column('country', mode='NULLABLE'),    # the only column that may be NULL
            _column('name'),
            _column('icao'),
            _column('latitude', 'float'),
            _column('longitude', 'float'),
            _column('altitude', 'int64'),
            _column('tz_timezone'),
        ],
    },
    'passengers': {
        'table_name': 'passengers',
        'schema': [
            _column(name)
            for name in (
                'passenger_sk', 'first_name', 'last_name', 'gender', 'birth_date', 'email',
                'street', 'city', 'state', 'zip', 'start_date', 'end_date',
            )
        ],
    },
}


# **** SETUP LOGGING ****
# Configure the root logger: records are written to stdout in the notebook's log format.
logging.basicConfig(
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
# Fetch the root logger with the no-argument form. `logging.getLogger('root')` returns the
# root logger only since Python 3.9; older versions create a separate logger named 'root'.
logger: logging.Logger = logging.getLogger()
# Lower the threshold to DEBUG so the `logger.debug(...)` progress messages are emitted.
# This is the root logger, so loggers without their own level inherit DEBUG as well.
logger.setLevel(logging.DEBUG)


# **** BIGQUERY CLIENT ****
# The client is created without arguments, so project and credentials come from the
# environment — presumably application-default credentials; verify before running.
logger.debug("Creating bigquery client")
client = bigquery.Client()

logger.info("Setup Completed")

[DEBUG][2023-01-13 15:38:59,202][406539018:0078] : Creating bigquery client
[INFO ][2023-01-13 15:38:59,211][406539018:0081] : Setup Completed


## Create Tickets' BQ Dataset

In [139]:
# Describe the dataset locally first (id and location), then send the create request.
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset_spec = bigquery.Dataset(dataset_id)
dataset_spec.location = "US"
# exists_ok=True: an already existing dataset is not treated as an error,
# so this cell can be re-run safely
dataset = client.create_dataset(dataset_spec, exists_ok=True)

logger.info(f"Created Tickets dataset: {dataset.full_dataset_id}")

[INFO ][2023-01-13 15:08:14,532][2665781056:0006] : Created Tickets dataset: deb-01-372112:tickets


## Load Original Tickets File

In [140]:
# tickets data file name
filename = DEFAULT_TICKETS_FILE
logger.debug(f"attempting to process: {filename}")

# *** always perform checks first ***
# check if the file exists
assert os.path.exists(filename), f"Data file does not exists: '{filename}'"
# check if the file contains any data: files of 78 bytes or less are rejected as empty
assert os.path.getsize(filename) > 78, f"Data file size incorrect; does not seem to contain data: '{filename}'"

# load into dataframe: every non-blank line of the file is parsed as one JSON document.
# Open `filename` (the path validated above) instead of a hard-coded copy of it, and
# decode as UTF-8 explicitly so the result does not depend on the platform default.
with open(filename, encoding='utf-8') as data_file:
    data = [json.loads(line) for line in data_file if line.strip()]

# flatten nested objects into dotted column names (e.g. 'airline.name')
df = pd.json_normalize(data)
logger.info(f"loaded {len(df.index)} rows from: {filename}")

# *** always perform check at the end ***
# check schema: contains all expected columns?
expected_columns = ['eticket_num', 'confirmation', 'ticket_date', 'price', 'seat', 'status', 'origin', 'destination']
for col in expected_columns:
    assert col in df.columns, f"Data file missing required column: {col}"

# assign & remember tickets dataframe; later cells start from `tickets_df`
tickets_df = df
display(tickets_df.head(n=10))

[DEBUG][2023-01-13 15:08:17,051][2599063938:0004] : attempting to process: ./data/tickets.json
[INFO ][2023-01-13 15:08:17,652][2599063938:0017] : loaded 4096 rows from: ./data/tickets.json


Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,airline.name,airline.iata,airline.icao,airline.callsign,...,passenger.last_name,passenger.gender,passenger.birth_date,passenger.email,passenger.street,passenger.city,passenger.state,passenger.zip,origin,destination
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,active,China Eastern Airlines,MU,CES,CHINA EASTERN,...,Brown,M,1969-02-17,robert.brown.69@hotmail.com,5007 Thomas Way,Lake Hollystad,DC,20027,,
1,482-850738-6048,IL5GUI,2022-03-23,765.18,29B,active,Hawaiian Airlines,HA,HAL,HAWAIIAN,...,Kent,F,1998-08-05,laura.kent.98@hotmail.com,13991 Davis Village,North Catherineborough,PA,16516,,
2,275-207321-8092,CYEFBC,2022-03-21,753.89,26I,active,Wizz Air,W6,WZZ,WIZZ AIR,...,Tucker,F,1965-01-22,lisa.tucker.65@hotmail.com,04135 Marvin Via,North Kristabury,MA,1093,,
3,246-793315-3102,ZNGPC2,2022-03-22,793.89,15A,active,AirAsia,AK,AXM,ASIAN EXPRESS,...,Yates,NB,1975-03-31,matthew.yates.75@yahoo.com,76045 Samantha Road Suite 111,Lake Jeffrey,DE,19898,,
4,091-128904-1226,MGSBD9,2022-03-24,820.25,17F,active,Xiamen Airlines,MF,CXA,XIAMEN AIR,...,Villanueva,NB,1945-08-14,megan.villanueva.45@hotmail.com,848 Melissa Springs Suite 947,Kellerstad,TX,76177,,
5,115-196069-8963,XFYQC0,2022-03-23,892.69,18C,active,Air New Zealand,NZ,ANZ,NEW ZEALAND,...,Hall,NB,1944-08-31,sarah.hall.44@gmail.com,75420 Michael Mountains Suite 485,New Victoria,HI,96727,,
6,396-673460-1326,N5UOOZ,2022-03-23,889.53,3C,active,Jeju Air,7C,JJA,JEJU AIR,...,Thompson,M,1968-05-02,seth.thompson.68@yahoo.com,22455 Higgins Junction Apt. 042,New Keith,OR,97405,,
7,380-894599-8109,PAA19Y,2022-03-22,706.78,7D,active,American Airlines,AA,AAL,AMERICAN,...,Garcia,F,1950-02-12,jennifer.garcia.50@gmail.com,6607 Sharp Common,Chadstad,VA,22121,,
8,614-960971-2686,EF4BHJ,2022-03-23,486.4,24J,active,Juneyao Airlines,HO,DKH,JUNEYAO AIRLINES,...,Clark,F,1991-11-09,becky.clark.91@gmail.com,691 Jones Cliffs,Michaelburgh,TX,76003,,
9,481-321233-0702,FVM9EE,2022-03-23,855.93,16A,active,Royal Air Maroc,AT,RAM,ROYALAIR MAROC,...,Cook,M,1976-07-29,ronald.cook.76@hotmail.com,93328 Davis Island,Rodriguezside,MD,21408,,


## Load Airlines

In [150]:
# start from the tickets
df = tickets_df

logger.debug("getting unique airlines...")

# airline attributes as named in the tickets data -> column names of the airlines table
airline_column_names = {
    'airline.name': 'name',
    'airline.iata': 'iata',
    'airline.icao': 'icao',
    'airline.callsign': 'callsign',
    'airline.country': 'country',
}
# set of unique columns to return
cols = list(airline_column_names)

# One row per distinct airline: rows with a missing attribute are dropped, duplicates are
# removed and the result is sorted by `cols`. Only the airline columns are touched, so no
# aggregation over the remaining ticket columns is needed.
df = (
    df.loc[:, cols]
    .dropna()
    .drop_duplicates()
    .sort_values(by=cols)
    .reset_index(drop=True)
    .rename(columns=airline_column_names)
    .set_index(keys='iata')     # set index by iata
)

# iata is the dataframe index: report codes that occur with more than one attribute combination
duplicated_iata = df.index[df.index.duplicated()].unique().tolist()
if duplicated_iata:
    logger.warning(f"airlines dim - iata codes appearing more than once: {duplicated_iata}")

logger.info(f"airlines dim - found {len(df.index)} rows")
display(df)

[DEBUG][2023-01-13 15:24:49,396][137921925:0004] : getting uniques products...
[INFO ][2023-01-13 15:24:49,687][137921925:0018] : airlines dim - found 48 rows


Unnamed: 0_level_0,name,icao,callsign,country
iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AC,Air Canada,ACA,AIR CANADA,Canada
CA,Air China,CCA,AIR CHINA,China
AF,Air France,AFR,AIRFRANS,France
NZ,Air New Zealand,ANZ,NEW ZEALAND,New Zealand
AK,AirAsia,AXM,ASIAN EXPRESS,Malaysia
AS,Alaska Airlines,ASA,Inc.,ALASKA
G4,Allegiant Air,AAY,ALLEGIANT,United States
AA,American Airlines,AAL,AMERICAN,United States
BA,British Airways,BAW,SPEEDBIRD,United Kingdom
9K,Cape Air,KAP,CAIR,United States


## Loading to BigQuery

In [151]:
# helper that loads a dataframe into a BigQuery table
def load_table(
    df: pd.DataFrame,
    client: bigquery.Client,
    table_name: str,
    schema: List[bigquery.SchemaField],
    create_disposition: str = 'CREATE_IF_NEEDED',
    write_disposition: str = 'WRITE_TRUNCATE'
) -> None:
    """Write a dataframe to a BigQuery table and wait for the load job to finish.

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client used to submit the load job
        table_name (str): full table name in the form `project.dataset.table`
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.

    Raises:
        AssertionError: if `table_name` does not consist of exactly three dot-separated parts.
    """
    # *** run some checks ***
    # a full table name (project.dataset.table) contains exactly two dots
    assert table_name.count('.') == 2, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"

    # load job options: create the table if needed, how to treat existing rows, table schema
    load_options = {
        'create_disposition': create_disposition,
        'write_disposition': write_disposition,
        'schema': schema,
    }
    job_config = bigquery.LoadJobConfig(**load_options)

    logger.info(f"loading table: '{table_name}'")
    # submit the load job and block until it has finished
    client.load_table_from_dataframe(df, destination=table_name, job_config=job_config).result()

    # read the table back to report how many rows it now holds
    loaded_table = client.get_table(table_name)
    logger.info(f"loaded {loaded_table.num_rows} rows into {loaded_table.full_table_id}")

In [152]:
# look up the airlines entry of TABLE_METADATA once, then derive the full table name
# (project.dataset.table) and the schema from it
airlines_meta = TABLE_METADATA['airlines']
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{airlines_meta['table_name']}"
schema = airlines_meta['schema']
# load dataframe; `df` is whatever the most recently executed cell left in it,
# which must be the airlines dataframe
load_table(df, client, table_name, schema)

logger.info("loaded airlines dim")

[INFO ][2023-01-13 15:25:07,520][1347112994:0030] : loading table: 'deb-01-372112.tickets.airlines'
[INFO ][2023-01-13 15:25:10,882][1347112994:0035] : loaded 48 rows into deb-01-372112:tickets.airlines
[INFO ][2023-01-13 15:25:10,885][2732175864:0007] : loaded airlines dim


## Load Airports

In [101]:
# start from the tickets
df = tickets_df

logger.debug("getting unique airports...")

# airport attributes; in the tickets data they are prefixed with 'origin.'
airport_attributes = ['name', 'city', 'country', 'iata', 'icao', 'latitude', 'longitude', 'altitude', 'tz_timezone']
airports_cols = [f"origin.{attribute}" for attribute in airport_attributes]

# NOTE(review): only the origin airport of each ticket is read here; an airport that occurs
# solely as a destination would be missing from this table — confirm whether that is intended.
# NOTE(review): rows with any missing attribute are dropped, including a missing country,
# although the airports schema declares `country` as NULLABLE — confirm the intent.

# One row per distinct airport: drop incomplete rows, remove duplicates, sort by
# `airports_cols`, then strip the 'origin.' prefix and index the result by iata.
df = (
    df.loc[:, airports_cols]
    .dropna()
    .drop_duplicates()
    .sort_values(by=airports_cols)
    .reset_index(drop=True)
    .rename(columns=dict(zip(airports_cols, airport_attributes)))
    .set_index(keys='iata')
)
logger.info(f"airports dim - found {len(df.index)} rows")

display(df)

[DEBUG][2023-01-13 14:18:21,145][2564274586:0004] : getting unique aiports...
[INFO ][2023-01-13 14:18:21,316][2564274586:0015] : customers dim - found 386 rows


Unnamed: 0_level_0,name,city,country,icao,latitude,longitude,altitude,tz_timezone
iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
AUH,Abu Dhabi International Airport,Abu Dhabi,United Arab Emirates,OMAA,24.43,54.65,88.0,Asia/Dubai
MAD,Adolfo Suárez Madrid–Barajas Airport,Madrid,Spain,LEMD,40.47,-3.56,1998.0,Europe/Madrid
CWB,Afonso Pena Airport,Curitiba,Brazil,SBCT,-25.53,-49.18,2988.0,America/Sao_Paulo
MCP,Alberto Alcolumbre Airport,Macapa,Brazil,SBMQ,0.05,-51.07,56.0,America/Fortaleza
ABQ,Albuquerque International Sunport,Albuquerque,United States,KABQ,35.04,-106.61,5355.0,America/Denver
...,...,...,...,...,...,...,...,...
ZUH,Zhuhai Jinwan Airport,Zhuhai,China,ZGSD,22.01,113.38,23.0,Asia/Shanghai
MCZ,Zumbi dos Palmares Airport,Maceio,Brazil,SBMO,-9.51,-35.79,387.0,America/Fortaleza
ZRH,Zürich Airport,Zurich,Switzerland,LSZH,47.46,8.55,1416.0,Europe/Zurich
MDQ,Ástor Piazzola International Airport,Mar Del Plata,Argentina,SAZM,-37.93,-57.57,72.0,America/Buenos_Aires


## Loading to BigQuery

In [102]:
# look up the airports entry of TABLE_METADATA once, then derive the full table name
# (project.dataset.table) and the schema from it
airports_meta = TABLE_METADATA['airports']
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{airports_meta['table_name']}"
schema = airports_meta['schema']
# load the dataframe; `df` is whatever the most recently executed cell left in it,
# which must be the airports dataframe
load_table(df, client, table_name, schema)
logger.info("loaded airports dim")

[INFO ][2023-01-13 14:18:25,425][1319364106:0030] : loading table: 'deb-01-372112.tickets.airports'
[INFO ][2023-01-13 14:18:30,287][1319364106:0035] : loaded 386 rows into deb-01-372112:tickets.airports
[INFO ][2023-01-13 14:18:30,289][339070838:0006] : loaded airports dim


## Load Passengers

In [174]:
# start from the tickets
df = tickets_df

logger.debug("getting unique passengers...")

# passenger attributes; in the tickets data they are prefixed with 'passenger.'
passenger_attributes = ['first_name', 'last_name', 'gender', 'birth_date', 'email', 'street', 'city', 'state', 'zip']
passenger_cols = [f"passenger.{attribute}" for attribute in passenger_attributes]

# NOTE(review): the displayed output contains zip values such as '1093' and '6315';
# presumably a leading zero was lost upstream — confirm against the source file.

# One row per distinct passenger: drop incomplete rows, remove duplicates, sort by
# `passenger_cols` and strip the 'passenger.' prefix.
# Then add the validity columns: start_date is today's date as 'YYYY-MM-DD'; end_date is
# the literal string 'None' because the schema declares end_date as a REQUIRED string.
df = (
    df.loc[:, passenger_cols]
    .dropna()
    .drop_duplicates()
    .sort_values(by=passenger_cols)
    .reset_index(drop=True)
    .rename(columns=dict(zip(passenger_cols, passenger_attributes)))
    .assign(
        start_date=pd.Timestamp.today().strftime('%Y-%m-%d'),
        end_date='None',
    )
)

logger.info(f"passengers dim - found {len(df.index)} rows")

display(df)

[DEBUG][2023-01-13 15:41:10,581][1153150727:0004] : getting unique passengers...
[INFO ][2023-01-13 15:41:10,952][1153150727:0019] : passengers dim - found 32 rows


Unnamed: 0,first_name,last_name,gender,birth_date,email,street,city,state,zip,start_date,end_date
0,Annette,Hawkins,F,1943-07-11,annette.hawkins.43@yahoo.com,361 Robinson Green Apt. 635,North Lynntown,NV,89825,2023-01-13,
1,Autumn,Morse,F,1960-01-18,autumn.morse.60@hotmail.com,6984 Price Shoals,Erictown,HI,96818,2023-01-13,
2,Becky,Clark,F,1991-11-09,becky.clark.91@gmail.com,691 Jones Cliffs,Michaelburgh,TX,76003,2023-01-13,
3,Belinda,Cook,F,1991-01-26,belinda.cook.91@hotmail.com,1965 Kelly Field Apt. 094,Jonesberg,IL,60613,2023-01-13,
4,Carl,Wilson,M,1980-04-24,carl.wilson.80@hotmail.com,2814 Houston Hills,Rodriguezside,IA,51971,2023-01-13,
5,Cheryl,Hughes,NB,1945-05-20,cheryl.hughes.45@gmail.com,00992 Garcia Plaza Suite 367,North Chelseamouth,CT,6315,2023-01-13,
6,Christian,Stevenson,NB,1993-06-14,christian.stevenson.93@hotmail.com,75945 Jennifer Loaf,Pooleland,KY,40009,2023-01-13,
7,Corey,Cook,M,1983-06-14,corey.cook.83@gmail.com,9606 Barton Station Apt. 271,Jacquelinemouth,IN,47081,2023-01-13,
8,Danielle,Henderson,NB,1970-08-11,danielle.henderson.70@hotmail.com,7389 Alec Squares Suite 508,Port Jonathan,NM,87320,2023-01-13,
9,Hannah,Smith,F,1966-07-01,hannah.smith.66@gmail.com,230 Donna Street,Lake Adrianstad,MN,56413,2023-01-13,


## Create passenger_sk for passenger table

In [175]:
from hashlib import md5


# generic helper: returns the md5 hash for any combination of values
def get_hash(*passenger_cols) -> str:
    """Return the md5 hex digest of all arguments joined with '-'.

    Every argument is converted with `str()`, the pieces are joined by a dash and the
    resulting text is encoded as UTF-16 before it is hashed.
    """
    # NOTE(review): the 'utf-16' codec prepends a BOM and uses the platform byte order,
    # so digests may differ between machines of different endianness — confirm acceptable.
    joined = '-'.join(map(str, passenger_cols))
    digest = md5(joined.encode(encoding='utf-16'))
    return digest.hexdigest()



logger.info("assigning passenger_sk: using md5 hash of passenger name")

# passenger_sk = md5 hash of passenger first and last name.
# The keys are built as a plain list so an empty dataframe is handled too (a row-wise
# `apply` on an empty frame can return a DataFrame, which cannot be assigned as a column).
passenger_keys = [
    get_hash(first_name, last_name)
    for first_name, last_name in zip(df['first_name'], df['last_name'])
]
# `assign` returns a new frame instead of modifying the existing one in place;
# then set index by passenger_sk
df = df.assign(passenger_sk=passenger_keys).set_index(keys='passenger_sk')

# The key is derived from the name only, so two passengers with the same first and last
# name get the same key. Report this instead of loading duplicate keys silently.
duplicated_keys = df.index[df.index.duplicated()].unique()
if len(duplicated_keys) > 0:
    logger.warning(f"passenger_sk is not unique: {len(duplicated_keys)} key(s) are shared by several passengers")

logger.info("passenger_sk generated")

display(df)

[INFO ][2023-01-13 15:41:16,924][3072027600:0013] : assigning passenger_sk: using md5 hash of passenger name
[INFO ][2023-01-13 15:41:16,935][3072027600:0020] : passenger_sk generated


Unnamed: 0_level_0,first_name,last_name,gender,birth_date,email,street,city,state,zip,start_date,end_date
passenger_sk,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
c1008b4454be4c659135a9993e56c205,Annette,Hawkins,F,1943-07-11,annette.hawkins.43@yahoo.com,361 Robinson Green Apt. 635,North Lynntown,NV,89825,2023-01-13,
3c05d5db07d4faa9e736c16bf19caa9c,Autumn,Morse,F,1960-01-18,autumn.morse.60@hotmail.com,6984 Price Shoals,Erictown,HI,96818,2023-01-13,
db0a3fa4d325c59c93b89ef0ed73d54e,Becky,Clark,F,1991-11-09,becky.clark.91@gmail.com,691 Jones Cliffs,Michaelburgh,TX,76003,2023-01-13,
c5958ca0f84695bdbc94bb12198b5a32,Belinda,Cook,F,1991-01-26,belinda.cook.91@hotmail.com,1965 Kelly Field Apt. 094,Jonesberg,IL,60613,2023-01-13,
7788ac3bbc2e2a19506636d001740279,Carl,Wilson,M,1980-04-24,carl.wilson.80@hotmail.com,2814 Houston Hills,Rodriguezside,IA,51971,2023-01-13,
bdeba4d493281c687762e58abf7bb14a,Cheryl,Hughes,NB,1945-05-20,cheryl.hughes.45@gmail.com,00992 Garcia Plaza Suite 367,North Chelseamouth,CT,6315,2023-01-13,
09024c2893a54f80d84ad22255c2d945,Christian,Stevenson,NB,1993-06-14,christian.stevenson.93@hotmail.com,75945 Jennifer Loaf,Pooleland,KY,40009,2023-01-13,
6a097926ab178272c1ff93a97010d6c7,Corey,Cook,M,1983-06-14,corey.cook.83@gmail.com,9606 Barton Station Apt. 271,Jacquelinemouth,IN,47081,2023-01-13,
d5bb7b4375d4a721564d9380659a07bd,Danielle,Henderson,NB,1970-08-11,danielle.henderson.70@hotmail.com,7389 Alec Squares Suite 508,Port Jonathan,NM,87320,2023-01-13,
be43e80ffa3ab638bec51dfb43fdc2f6,Hannah,Smith,F,1966-07-01,hannah.smith.66@gmail.com,230 Donna Street,Lake Adrianstad,MN,56413,2023-01-13,


## Loading to BigQuery

In [176]:
# look up the passengers entry of TABLE_METADATA once, then derive the full table name
# (project.dataset.table) and the schema from it
passengers_meta = TABLE_METADATA['passengers']
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{passengers_meta['table_name']}"
schema = passengers_meta['schema']
# load the dataframe; `df` is whatever the most recently executed cell left in it,
# which must be the passengers dataframe indexed by passenger_sk
load_table(df, client, table_name, schema)
logger.info("loaded passengers dim")

[INFO ][2023-01-13 15:41:21,399][1347112994:0030] : loading table: 'deb-01-372112.tickets.passengers'
[INFO ][2023-01-13 15:41:26,091][1347112994:0035] : loaded 32 rows into deb-01-372112:tickets.passengers
[INFO ][2023-01-13 15:41:26,093][3525464016:0006] : loaded passengers dim
