#### Exercise 1: Create data model 

![Data Model](img/data_model2.png)

#### Exercise 2: 
1. _Load dimensions_  

   a. Create big query client, dataset, and table schema

   b. Create tables
   
   c. Read and transform dimension dataframes from source data

   d. Load to BigQuery

In [1]:
import os
import sys
import pandas as pd
import logging
from typing import List
from google.cloud import bigquery 
 
DATA_DIR = "../data/"
DEFAULT_TICKET_FILE = os.path.join(DATA_DIR, "tickets.json")
PROJECT_NAME = "electric-glyph-372116"
DATASET_NAME = "airlines"
 
# **** TABLE SCHEMAS ****
## Dimension Tables
TABLE_METADATA = {
   'passengers_d': {
       'table_name': 'passengers_d',
       'schema': [
           bigquery.SchemaField('sk_pass_id', 'STRING', mode='REQUIRED'),
           bigquery.SchemaField('email', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('first_name', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('last_name', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('gender', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('birth_date', 'DATE', mode='NULLABLE'),
           bigquery.SchemaField('street', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('city', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('zip', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('state', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('start_date', 'DATE', mode='NULLABLE'),
           bigquery.SchemaField('end_date', 'DATE', mode='NULLABLE')
       ]
   }, 'airlines_d': {
       'table_name': 'airlines_d',
       'schema': [
           bigquery.SchemaField('airline_id', 'STRING', mode='REQUIRED'),
           bigquery.SchemaField('name', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('icao', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('callsign', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('country', 'STRING', mode='NULLABLE')
       ]
   },  'airports_d': {
       'table_name': 'airports_d',
       'schema': [
           bigquery.SchemaField('airport_id', 'STRING', mode='REQUIRED'),
           bigquery.SchemaField('name', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('city', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('country', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('icao', 'STRING', mode='NULLABLE'),
           bigquery.SchemaField('latitude', 'FLOAT', mode='NULLABLE'),
           bigquery.SchemaField('longitude', 'FLOAT', mode='NULLABLE'),
           bigquery.SchemaField('altitude', 'INTEGER', mode='NULLABLE'),
           bigquery.SchemaField('tz_timezone', 'STRING', mode='NULLABLE'),
       ]
   }
}

# **** SETUP LOGGING ****
# setup logging and logger
logging.basicConfig(            # setting up the root logger
   format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
   level=logging.INFO,
   stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                 # programmatically reassign the logging level

 
# **** BIGQUERY CLIENT ****
logger.debug(f"Creating bigquery client")
client = bigquery.Client()

logger.info(f"Setup Completed")

# **** CREATE DATASET (IF NEEDED) ****
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

logger.info(f"Created dataset: {dataset.full_dataset_id}")

[DEBUG][2023-01-06 18:01:19,514][2441037682:0069] : Creating bigquery client
[INFO ][2023-01-06 18:01:19,518][2441037682:0072] : Setup Completed
[INFO ][2023-01-06 18:01:20,099][2441037682:0080] : Created dataset: electric-glyph-372116:airlines


In [2]:
# **** CREATE TABLES ****
logger.debug(f"Creating tables:")

for table_id in TABLE_METADATA:
  full_table_id = f"{PROJECT_NAME}.{DATASET_NAME}.{table_id}"
  schema = TABLE_METADATA[f'{table_id}']['schema']
  table = bigquery.Table(full_table_id, schema = schema)
  try:
    client.create_table(table) 
    logger.info(f"Created table: {table_id}")
  except:
    logger.info(f"Did not create table {table_id}. Already exist?")
  # List table Schema
  table_ref = client.get_table(table)
  for column in table_ref.schema:
    print(f"\t{column.name}\t{column.field_type}") 
  print("\n")

[DEBUG][2023-01-06 18:01:20,128][2989774279:0002] : Creating tables:
[INFO ][2023-01-06 18:01:20,408][2989774279:0012] : Did not create table passengers_d. Already exist?
	sk_pass_id	STRING
	first_name	STRING
	last_name	STRING
	gender	STRING
	birth_date	DATE
	email	STRING
	street	STRING
	city	STRING
	state	STRING
	zip	STRING
	start_date	DATE
	end_date	DATE


[INFO ][2023-01-06 18:01:20,791][2989774279:0012] : Did not create table airlines_d. Already exist?
	airline_id	STRING
	name	STRING
	icao	STRING
	callsign	STRING
	country	STRING


[INFO ][2023-01-06 18:01:21,124][2989774279:0012] : Did not create table airports_d. Already exist?
	airport_id	STRING
	name	STRING
	city	STRING
	country	STRING
	icao	STRING
	latitude	FLOAT
	longitude	FLOAT
	altitude	INTEGER
	tz_timezone	STRING




In [3]:
# **** READ/TRANSFORM DIMENSION DATA ****
import json
import datetime as dt

## Read JSON Row File (Nested)
output = []
with open("data/tickets.json", "r") as json_file:
  for line in json_file:
    row = json.loads(line.strip())
    output.append(row)
df = pd.json_normalize(output)
 
# Extract unnested columns and remove prefixes
def extractUnnested(prefix):
  extract = df.filter(regex = prefix)
  extract.columns = [col.split('.')[1] for col in extract.columns]
  return extract

airline_df = extractUnnested('airline\.')
pass_df = extractUnnested('passenger\.')
origin_df = extractUnnested('origin\.')
dest_df = extractUnnested('destination\.')
airports_df = origin_df.append(dest_df)

In [4]:
## Dimension 1: Airlines 
airline_df.set_index('iata', inplace=True)
airline_df.index.rename('airline_id', inplace=True)
airline_df = airline_df.drop_duplicates()

In [5]:
## Dimension 2: Airports
airports_df.set_index('iata', inplace=True)
airports_df.index.rename('airport_id', inplace=True)
airports_df = airports_df.drop_duplicates()

In [6]:
## Dimension 3: Passengers
import shortuuid as su
from datetime import date

pass_df = pass_df.drop_duplicates()
pass_df['sk_pass_id'] = [su.uuid() for _ in range(len(pass_df.index))]
pass_df['start_date'] = date.today()
pass_df['end_date'] = None
pass_df['birth_date'] = pd.to_datetime(pass_df['birth_date'])
pass_df.set_index('sk_pass_id', inplace=True)

In [7]:
# **** LOADING DIMENSION TABLES  ****
def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """load dataframe into bigquery table

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.
    """
    # *** run some checks ***
    # test table name to be full table name including project and dataset name. It must contain to dots
    assert len(table_name.split('.')) == 3, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # get the resulting table
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

## Airlines Dimension
# load to bigquery
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['airlines_d']['table_name']}"
schema = TABLE_METADATA['airlines_d']['schema']
load_table(airline_df, client, table_name, schema)

## Airports Dimension
# load to bigquery
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['airports_d']['table_name']}"
schema = TABLE_METADATA['airports_d']['schema']
load_table(airports_df, client, table_name, schema)

## Passengers Dimension
# load to bigquery
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['passengers_d']['table_name']}"
schema = TABLE_METADATA['passengers_d']['schema']
load_table(pass_df, client, table_name, schema)

[INFO ][2023-01-06 18:01:21,677][600813873:0030] : loading table: 'electric-glyph-372116.airlines.airlines_d'
[INFO ][2023-01-06 18:01:24,605][600813873:0035] : loaded 48 rows into electric-glyph-372116:airlines.airlines_d
[INFO ][2023-01-06 18:01:24,606][600813873:0030] : loading table: 'electric-glyph-372116.airlines.airports_d'
[INFO ][2023-01-06 18:01:29,130][600813873:0035] : loaded 392 rows into electric-glyph-372116:airlines.airports_d
[INFO ][2023-01-06 18:01:29,131][600813873:0030] : loading table: 'electric-glyph-372116.airlines.passengers_d'
[INFO ][2023-01-06 18:01:32,646][600813873:0035] : loaded 32 rows into electric-glyph-372116:airlines.passengers_d


2. _Load Facts_

In [8]:
tickets_df = df[['eticket_num', 
'confirmation', 
'ticket_date',
'passenger.email', # use to lookup sk_pass_id in pass_df
'price', 
'seat', 
'airline.iata', 
'origin.iata', 
'destination.iata'
]]

tickets_df.rename(columns={
  'passenger.email':'email', 
  'airline.iata':'airline_id', 
  'origin.iata':'origin_id',
  'destination.iata':'dest_id'
}, inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [9]:
%%bash

pip install db-dtypes



In [12]:
query = f"""
SELECT
  sk_pass_id, 
  email
FROM {PROJECT_NAME}.{DATASET_NAME}.passengers_d
"""
pdf = client.query(query).to_dataframe()
tickets_df = pd.merge(tickets_df, pdf, on = "email").drop("email", axis=1)




Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,airline_id,origin_id,dest_id,sk_pass_id
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,MU,YUL,MDW,iX9KXJhuMrsEgn3sskLYqe
1,858-807236-9962,C1V041,2022-03-22,329.06,33B,AT,EZE,LPA,iX9KXJhuMrsEgn3sskLYqe
2,670-168600-5273,36TZ1W,2022-03-23,696.82,26E,MW,XMN,BUR,iX9KXJhuMrsEgn3sskLYqe
3,947-437384-1202,P06H2N,2022-03-24,218.32,13C,AF,FTE,VCE,iX9KXJhuMrsEgn3sskLYqe
4,471-738372-4392,438YR7,2022-03-22,889.04,27D,DD,RNO,TAO,iX9KXJhuMrsEgn3sskLYqe
...,...,...,...,...,...,...,...,...,...
4091,760-206124-8120,NHFJK3,2022-03-23,372.00,7C,MF,SJC,VIX,JAfGaQxsj3orh8aeVhnuuB
4092,553-043261-6620,B74278,2022-03-24,626.87,28E,MF,JJN,LAS,JAfGaQxsj3orh8aeVhnuuB
4093,531-358995-2051,440THN,2022-03-23,523.76,12H,MW,GOI,MSP,JAfGaQxsj3orh8aeVhnuuB
4094,269-794782-2875,B87BMW,2022-03-23,349.08,21I,HO,LIM,ORY,JAfGaQxsj3orh8aeVhnuuB
