In [1]:
import os
import sys
import pandas as pd
import logging
from google.cloud import bigquery
from hashlib import md5
from typing import List

In [2]:
df = pd.read_json('./data/tickets.json', lines=True)

#create airline df
d_airline_df=pd.concat([pd.json_normalize(df['airline'][key]) for key in df.index]).reset_index(drop=True)
d_airline_df = d_airline_df.rename(columns={'iata': 'airline_iata'})
#create a df for d_airports by extrating the dict values into a new table and appending origin airport with dest airport then dropping duplicated to get unique airort iata codes
d_origin_airport_df=pd.json_normalize(df['origin'])
d_destination_df=d_destination_df=pd.json_normalize(df['destination'])
#create passenger df
d_passenger_df=pd.concat([pd.json_normalize(df['passenger'][key]) for key in df.index]).reset_index(drop=True)
#create f_tickets df
f_tickets_df = df[['eticket_num', 'confirmation','ticket_date','price','seat','status']]
f_tickets_df = f_tickets_df.join(d_origin_airport_df[['iata']])
f_tickets_df = f_tickets_df.rename(columns={'iata': 'origin_airport_iata'})
f_tickets_df = f_tickets_df.join(d_destination_df[['iata']])
f_tickets_df = f_tickets_df.rename(columns={'iata': 'dest_airport_iata'})
f_tickets_df = f_tickets_df.join(d_airline_df[['airline_iata']])
f_tickets_df = f_tickets_df.join(d_passenger_df[['email']])
f_tickets_df.head()

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,origin_airport_iata,dest_airport_iata,airline_iata,email
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,active,YUL,MDW,MU,robert.brown.69@hotmail.com
1,482-850738-6048,IL5GUI,2022-03-23,765.18,29B,active,KWE,MNL,HA,laura.kent.98@hotmail.com
2,275-207321-8092,CYEFBC,2022-03-21,753.89,26I,active,PVR,IBZ,W6,lisa.tucker.65@hotmail.com
3,246-793315-3102,ZNGPC2,2022-03-22,793.89,15A,active,PMC,GDN,AK,matthew.yates.75@yahoo.com
4,091-128904-1226,MGSBD9,2022-03-24,820.25,17F,active,BWI,LGW,MF,megan.villanueva.45@hotmail.com


In [3]:
# **** SETUP ****
# change to match your gcloud project 
PROJECT_NAME = "deb-01-372116"
DATASET_NAME = "airline_ticket_processor"


# **** TABLE SCHEMAS ****

TABLE_METADATA = {
    'f_tickets': {
        'table_name': 'f_tickets',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('eticket_num', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('confirmation', 'string', mode='NULLABLE'),
            bigquery.SchemaField('ticket_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('origin_airport_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('dest_airport_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('price', 'FLOAT', mode='NULLABLE'),
            bigquery.SchemaField('seat', 'string', mode='NULLABLE'),
            bigquery.SchemaField('status', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_sk', 'STRING', mode='NULLABLE'),
        ],
    },
}


# **** SETUP LOGGING ****
# setup logging and logger
logging.basicConfig(            # setting up the root logger
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                          # programmatically reassign the logging level


# **** BIGQUERY CLIENT ****
logger.debug(f"Creating bigquery client")
client = bigquery.Client()

logger.info(f"Setup Completed")

[DEBUG][2023-01-10 14:11:02,351][1969854602:0041] : Creating bigquery client
[INFO ][2023-01-10 14:11:02,356][1969854602:0044] : Setup Completed


In [4]:
# d_passengers table query
query = f"""
SELECT
  passenger_sk,
  email
FROM 
  `{PROJECT_NAME}.{DATASET_NAME}.d_passengers`
"""
# query bigquery table into dataframe
pdf = client.query(query).to_dataframe()
# index by product_name for the join
pdf = pdf.set_index(keys='email')
logger.info(f"query {len(pdf.index)} rows from d_passengers table")


# drop product_id and customer_id columns if they already exist
#   this line will allow us to re-run this cell
df = df.drop(columns=['passenger_sk'], errors='ignore')

logger.debug(f"preparing for join")
# join with the main dataframe to get product_id and customer_id
df = f_tickets_df.join(pdf, on='email', how='inner')
logger.info(f"looked up passenger_sk")

# sort back by receipt number
logger.debug(f"sorting back by receipt id to get the rows")
df = df.sort_values(by=['passenger_sk'])


logger.debug(f"dataframe:")
df.head()

[INFO ][2023-01-10 14:11:04,952][3219843830:0013] : query 32 rows from d_passengers table
[DEBUG][2023-01-10 14:11:04,954][3219843830:0020] : preparing for join
[INFO ][2023-01-10 14:11:04,966][3219843830:0023] : looked up passenger_sk
[DEBUG][2023-01-10 14:11:04,967][3219843830:0026] : sorting back by receipt id to get the rows
[DEBUG][2023-01-10 14:11:04,972][3219843830:0030] : dataframe:


Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,origin_airport_iata,dest_airport_iata,airline_iata,email,passenger_sk
3215,894-815591-6709,P66218,2022-03-22,977.24,21A,active,GRR,BOS,W6,rachel.duffy.60@hotmail.com,0beb14f7-9853-496e-98d3-b5b495e0fd65
3001,665-079459-8176,6WAJVB,2022-03-21,204.78,22F,active,BLQ,REL,AT,rachel.duffy.60@hotmail.com,0beb14f7-9853-496e-98d3-b5b495e0fd65
2946,131-453198-8824,ARXPMP,2022-03-23,539.13,19I,active,IST,TDD,9K,rachel.duffy.60@hotmail.com,0beb14f7-9853-496e-98d3-b5b495e0fd65
2883,148-774896-6572,BSW7TA,2022-03-24,740.89,18E,active,SPR,GYE,AK,rachel.duffy.60@hotmail.com,0beb14f7-9853-496e-98d3-b5b495e0fd65
2790,057-550131-2580,ZGYZ8M,2022-03-23,514.84,32A,active,PNZ,SAT,HO,rachel.duffy.60@hotmail.com,0beb14f7-9853-496e-98d3-b5b495e0fd65


In [5]:
#define the load_table function
def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """load dataframe into bigquery table

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.
    """
    # *** run some checks ***
    # test table name to be full table name including project and dataset name. It must contain to dots
    assert len(table_name.split('.')) == 3, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # get the resulting table
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

In [6]:
#----------------load f_tickets---------------------
# get unique receipts (not line items)
f_tickets = df[['eticket_num','airline_iata', 'confirmation', 'ticket_date','origin_airport_iata', 'dest_airport_iata', 'price','seat','status','passenger_sk']].drop_duplicates(keep='first')
logger.info(f"Preparing {len(f_tickets.index)} unique receipts to load to bigquery")
f_tickets = f_tickets.astype({"passenger_sk": str})

# load to bigquery
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['f_tickets']['table_name']}"
schema = TABLE_METADATA['f_tickets']['schema']
load_table(f_tickets, client, table_name, schema)
display(f_tickets.head())

[INFO ][2023-01-10 14:11:05,131][2896866174:0004] : Preparing 4096 unique receipts to load to bigquery
[INFO ][2023-01-10 14:11:05,135][1547112885:0030] : loading table: 'deb-01-372116.airline_ticket_processor.f_tickets'
[INFO ][2023-01-10 14:11:17,517][1547112885:0035] : loaded 4096 rows into deb-01-372116:airline_ticket_processor.f_tickets


Unnamed: 0,eticket_num,airline_iata,confirmation,ticket_date,origin_airport_iata,dest_airport_iata,price,seat,status,passenger_sk
3215,894-815591-6709,W6,P66218,2022-03-22,GRR,BOS,977.24,21A,active,0beb14f7-9853-496e-98d3-b5b495e0fd65
3001,665-079459-8176,AT,6WAJVB,2022-03-21,BLQ,REL,204.78,22F,active,0beb14f7-9853-496e-98d3-b5b495e0fd65
2946,131-453198-8824,9K,ARXPMP,2022-03-23,IST,TDD,539.13,19I,active,0beb14f7-9853-496e-98d3-b5b495e0fd65
2883,148-774896-6572,AK,BSW7TA,2022-03-24,SPR,GYE,740.89,18E,active,0beb14f7-9853-496e-98d3-b5b495e0fd65
2790,057-550131-2580,HO,ZGYZ8M,2022-03-23,PNZ,SAT,514.84,32A,active,0beb14f7-9853-496e-98d3-b5b495e0fd65
