# Extract/Load/Transform 
**SQLite $\rightarrow$ BigQuery**

This notebook steps through the process of ...
1. extracting data from a local SQLite database
2. loading it into a BigQuery dataset
3. transforming the data to suit BigQuery's columnar tech
4. cleaning up ELT artifacts

## **Preliminaries**

In [None]:
# Install required python libaries 
!pip install --user --upgrade google-api-python-client
!pip install --user google-cloud-bigquery
!pip install --user pyarrow
!pip install --upgrade pandas-gbq 'google-cloud-bigquery[bqstorage,pandas]'
!pip install --user pyarrow
!pip install --user fastparquet

## **1. Extract from SQLite**

In [13]:
# Load %%sql magic
%load_ext sql

import pandas as pd
import sqlite3

# Create/Connect to local CDW
%sql sqlite:///CourseDataWarehouse.db

In [97]:
# Export to CSVs for bulk storage
from sqlalchemy import create_engine  

def table_export(tnames, dest):
    engine = create_engine("sqlite:///CourseDataWarehouse.db")
    with engine.connect() as conn, conn.begin():
        for tname in tnames:
            print(tname)
            df = pd.read_sql(tname,conn)
            df.to_csv(f'{dest}/{tname}.csv')
            df.to_parquet(f'{dest}/{tname}.parquet',engine='pyarrow')

            

table_export( ['PROGRAMS_DIM','LOCATIONS_DIM',
             'DAYS_OF_WEEK_UTIL','TERMS_DIM',
             'TIMECODES_DIM','TIME_SEGMENTS_UTIL', 'TIMECODE_SEGMENTS_INTERSECT',
             'INSTRUCTORS_DIM','COURSE_OFFERINGS_DIM',
             'CLASS_MEETING_FACTS','COURSE_SECTION_FACTS'
            ], 
            'TransferFiles')
'TransferFiles')


TIMECODES_DIM


## **2. Load into BigQuery**

### **Authenticate with Google to get BigQuery access**

In [None]:
%%bash
gcloud auth application-default login

### **Use the BigQuery Python API to load the tables**

In [5]:
from google.cloud import bigquery

def load_bq_transfers(tables,project,dataset):
    
    '''
    Loads local data into BigQuery. 
    
    Input Params
    - tables -- a list of table names (one per data file)
    - project -- a BigQuery project name
    - dataset -- the database name
    
    Adapted from https://cloud.google.com/bigquery/docs/batch-loading-data
    '''
    
    # Construct a BigQuery client object
    client = bigquery.Client(project=project)

    # Configure the upload job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET
    )
    
    # Load each table, one at a time
    for table in tables:
        # Use table name to constuct paths and destination tables
        file_path = f"TransferFiles/{table}.parquet"
        table_id = f"{project}.{dataset}.IMPORT_{table}"
        print(file_path)
        print(table_id)
        
        # Load using the selected file
        with open(file_path, "rb") as source_file:
            job = client.load_table_from_file(source_file, table_id, job_config=job_config)
        job.result()  # Waits for the job to complete.

        # Check that it worked
        table = client.get_table(table_id)  # Make an API request.
        print(
            "Loaded {} rows and {} columns to {}".format(
                table.num_rows, len(table.schema), table_id
            )
        )

# A configurable list of tables
tables = ["CLASS_MEETING_FACTS",
          "COURSE_SECTION_FACTS",
          "COURSE_OFFERINGS_DIM",
          "DAYS_OF_WEEK_UTIL",
          "INSTRUCTORS_DIM",
          "LOCATIONS_DIM",
          "PROGRAMS_DIM",
          "TERMS_DIM",
          "TIMECODES_DIM",
          "TIME_SEGMENTS_UTIL",
          "TIMECODE_SEGMENTS_INTERSECT",
         ]

# The specific project and dataset; in Airflow use environment vars
# project = 'banner-data-366520'
# dataset = 'course_data_warehouse'

project = 'data6510-banner-project'
dataset = 'course_data_warehouse'

# call the function to execute the load
load_bq_transfers(tables,project,dataset)



TransferFiles/CLASS_MEETING_FACTS.parquet
data6510-banner-project.course_data_warehouse.IMPORT_CLASS_MEETING_FACTS
Loaded 509152 rows and 8 columns to data6510-banner-project.course_data_warehouse.IMPORT_CLASS_MEETING_FACTS
TransferFiles/COURSE_SECTION_FACTS.parquet
data6510-banner-project.course_data_warehouse.IMPORT_COURSE_SECTION_FACTS
Loaded 28971 rows and 9 columns to data6510-banner-project.course_data_warehouse.IMPORT_COURSE_SECTION_FACTS
TransferFiles/COURSE_OFFERINGS_DIM.parquet
data6510-banner-project.course_data_warehouse.IMPORT_COURSE_OFFERINGS_DIM
Loaded 28971 rows and 14 columns to data6510-banner-project.course_data_warehouse.IMPORT_COURSE_OFFERINGS_DIM
TransferFiles/DAYS_OF_WEEK_UTIL.parquet
data6510-banner-project.course_data_warehouse.IMPORT_DAYS_OF_WEEK_UTIL
Loaded 7 rows and 3 columns to data6510-banner-project.course_data_warehouse.IMPORT_DAYS_OF_WEEK_UTIL
TransferFiles/INSTRUCTORS_DIM.parquet
data6510-banner-project.course_data_warehouse.IMPORT_INSTRUCTORS_DIM
Loa

## **3. Transform to build BigQuery tables**

In [6]:
# Configure BigQuery magic

%load_ext google.cloud.bigquery
# %env GCLOUD_PROJECT=banner-data-366520
%env GCLOUD_PROJECT=data6510-banner-project

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery
env: GCLOUD_PROJECT=data6510-banner-project


### **First, A few random BigQuery Quirks**

In moving from SQLite we will need to account for the following BigQuery quirks:
- **Read-optimized;** Writing new data is **much** more expensive than storing it 
- **No explicit primary keys;** essentially, every column is treated like an index anyway
- **No foreign key constraints;** the data is essentially "write once" anyway, so no need to keep checking integrity
- **Uses partitioning and clustering** to segment rows into smaller chunks; only using segments you actually need saves $

### The `TERMS_DIM` Table

In [7]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.TERMS_DIM; 
CREATE TABLE course_data_warehouse.TERMS_DIM (
    -- Note: no PK constraint
    TermID INTEGER NOT NULL, 
    TermCode STRING NOT NULL,
    TermName STRING NOT NULL,
    CalendarYear INTEGER NOT NULL,
    SchoolYearEnd INTEGER NOT NULL,
    SchoolYearText STRING NOT NULL
) AS
SELECT TermID,TermCode,TermName,CalendarYear,SchoolYearEnd,SchoolYearText
FROM course_data_warehouse.IMPORT_TERMS_DIM
ORDER BY TermID;

Query complete after 0.05s: 100%|██████████| 1/1 [00:00<00:00, 381.54query/s] 


### The `PROGRAMS_DIM` Table

In [8]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.PROGRAMS_DIM;
CREATE TABLE course_data_warehouse.PROGRAMS_DIM (
    ProgramID INTEGER NOT NULL,
    ProgCode STRING NOT NULL,
    ProgName STRING NOT NULL,
    School STRING NOT NULL
) AS
SELECT ProgramID, ProgCode,ProgName,School
FROM course_data_warehouse.IMPORT_PROGRAMS_DIM
ORDER BY ProgramID;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1044.92query/s] 


### The `LOCATIONS_DIM` Table

In [9]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.LOCATIONS_DIM;
CREATE TABLE course_data_warehouse.LOCATIONS_DIM (
    LocationID INTEGER NOT NULL, 
    LocationCode STRING NOT NULL, 
    Building STRING, 
    Room STRING, 
    Capacity INTEGER, 
    SeatsMax INTEGER
) AS
SELECT LocationID, LocationCode, Building, Room, Capacity, SeatsMax
FROM course_data_warehouse.IMPORT_LOCATIONS_DIM
ORDER BY LocationID;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1337.04query/s] 


### The `DAYS_OF_WEEK_UTIL` Table

In [10]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.DAYS_OF_WEEK_UTIL;
CREATE TABLE course_data_warehouse.DAYS_OF_WEEK_UTIL (
    DayCode STRING NOT NULL, 
    DayShort STRING NOT NULL, 
    DayLong STRING NOT NULL
) AS
SELECT *
FROM course_data_warehouse.IMPORT_DAYS_OF_WEEK_UTIL;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1040.77query/s]


### The `TIME_SEGMENTS_UTIL` Table

In [11]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.TIME_SEGMENTS_UTIL;
CREATE TABLE course_data_warehouse.TIME_SEGMENTS_UTIL (
    TimeSegmentID INTEGER NOT NULL, 
    StartSegTime TIME NOT NULL, 
    EndSegTime TIME NOT NULL
) AS
SELECT 
    TimeSegmentID, 
    
    -- STRING --> TIME
    parse_time('%R',StartSegTime) AS StartTime, 
    parse_time('%R',EndSegTime) AS EndTime
FROM course_data_warehouse.IMPORT_TIME_SEGMENTS_UTIL;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1661.11query/s] 


### The `TIMECODES_DIM` Table

In [12]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.TIMECODES_DIM;
CREATE TABLE course_data_warehouse.TIMECODES_DIM (
    TimeCodeID INTEGER NOT NULL, 
    DayCode STRING,
    DayNum INTEGER,
    StartTime TIME, 
    EndTime TIME, 
    DurationMins FLOAT64
) AS
SELECT 
    TimeCodeID, 
    DayCode,DayNum,
    
    -- STRING --> TIME
    parse_time('%T',StartTime) AS StartTime, 
    parse_time('%T',EndTime) AS EndTime, 
    DurationMins
FROM course_data_warehouse.IMPORT_TIMECODES_DIM;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 848.19query/s] 


### The `TIMECODE_SEGMENTS_INTERSECT` Table

In [13]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.TIMECODE_SEGMENTS_INTERSECT;
CREATE TABLE course_data_warehouse.TIMECODE_SEGMENTS_INTERSECT (
    TimecodeID INTEGER NOT NULL,
    TimeSegmentID INTEGER NOT NULL
) AS
SELECT 
    TimecodeID, 
    TimeSegmentID 
FROM course_data_warehouse.IMPORT_TIMECODE_SEGMENTS_INTERSECT;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1689.21query/s] 


### The `INSTRUCTORS_DIM` Table

In [14]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.INSTRUCTORS_DIM;
CREATE TABLE course_data_warehouse.INSTRUCTORS_DIM (
    InstructorID INTEGER NOT NULL,
    InstructorName STRING
) AS
SELECT 
    InstructorID,
    InstructorName
FROM course_data_warehouse.IMPORT_INSTRUCTORS_DIM;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 804.59query/s] 


### The `COURSE_OFFERINGS_DIM` Table

In [15]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.COURSE_OFFERINGS_DIM;
CREATE TABLE course_data_warehouse.COURSE_OFFERINGS_DIM (
    CourseOfferingID INTEGER NOT NULL,
    CRN INTEGER NOT NULL, 
    TermCode STRING, 
    CourseCode STRING, 
    Section STRING, 
    CourseTitle STRING, 
    CreditsTxt STRING, 
    CreditsMin FLOAT64, 
    CreditsMax FLOAT64, 
    NumStudents INTEGER, 
    CapStudents INTEGER, 
    ScheduleSpec STRING, 
    DeliveryStyle STRING, 
    ScheduleType STRING
) AS
SELECT 
    CourseOfferingID, 
    CRN, 
    TermCode, 
    CourseCode, 
    Section, 
    CourseTitle, 
    CreditsTxt, 
    CreditsMin, 
    CreditsMax, 
    NumStudents, 
    CapStudents, 
    ScheduleSpec, 
    DeliveryStyle, 
    ScheduleType
FROM course_data_warehouse.IMPORT_COURSE_OFFERINGS_DIM
ORDER BY CourseOfferingID;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 902.58query/s] 


### The `COURSE_SECTION_FACTS` Table

In [17]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.COURSE_SECTION_FACTS;
CREATE TABLE course_data_warehouse.COURSE_SECTION_FACTS (
    CourseSectionFactID INTEGER NOT NULL,
    CourseOfferingID INTEGER NOT NULL,
    ProgramID INTEGER NOT NULL,
    PrimaryInstructorID INTEGER,
    TermID INTEGER,
    FirstClass DATE,
    LastClass DATE,
    NumMeetings INTEGER DEFAULT 0,
    MeetingHours FLOAT64,
    
    SchoolYearEnd INTEGER, -- for partitioning
    TermCode STRING        -- for clustering
) 
PARTITION BY
    -- Each school year until 2040
    RANGE_BUCKET(SchoolYearEnd, GENERATE_ARRAY(2015,2040,1)) 
CLUSTER BY
    -- Each specific term (e.g., 'Fall2014')
    TermCode 
OPTIONS 
    -- Require all queries to specify school years
    (require_partition_filter = TRUE)
AS (
    SELECT 
        CourseSectionFactID,
        CourseOfferingID,
        ProgramID,
        CAST(PrimaryInstructorID AS INTEGER),
        TermID,
    
        -- STRING --> DATE
        parse_date('%F',FirstClass) AS FirstClass,
        parse_date('%F',LastClass) AS LastClass,
        NumMeetings,
        MeetingHours,
        SchoolYearEnd,
        TermCode
    FROM course_data_warehouse.IMPORT_COURSE_SECTION_FACTS
        JOIN course_data_warehouse.TERMS_DIM USING (TermID)
);

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 876.37query/s] 


### The `CLASS_MEETING_FACTS` Table

In [18]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.CLASS_MEETING_FACTS;
CREATE TABLE course_data_warehouse.CLASS_MEETING_FACTS (
    ClassMeetingFactID INTEGER NOT NULL,
    CourseOfferingID INTEGER NOT NULL,
    ProgramID INTEGER NOT NULL,
    PrimaryInstructorID INTEGER,
    LocationID INTEGER,
    TimeCodeID INTEGER,
    TermID INTEGER,
    ClassDate DATE,
    SchoolYearEnd INTEGER,
    TermCode STRING
) 
PARTITION BY
    RANGE_BUCKET(SchoolYearEnd, GENERATE_ARRAY(2015,2040,1))
CLUSTER BY
    TermCode
OPTIONS 
    (require_partition_filter = TRUE)
AS (
    SELECT 
        ClassMeetingFactID,
        CourseOfferingID,
        ProgramID,
    
        -- FLOAT --> INTEGER
        CAST(PrimaryInstructorID AS INTEGER) AS PrimaryInstructorID,
        CAST(LocationID AS INTEGER) AS LocationID,
        CAST(TimeCodeID AS INTEGER) AS TimeCodeID,
        TermID,
    
       -- STRING --> DATE
        parse_date('%F',ClassDate) AS ClassDate,
        
        SchoolYearEnd INTEGER,
        TermCode STRING
    FROM course_data_warehouse.IMPORT_CLASS_MEETING_FACTS
    JOIN course_data_warehouse.TERMS_DIM USING (TermID)
);

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 933.10query/s] 


## **Cleanup**

In [19]:
%%bigquery
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_PROGRAMS_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_LOCATIONS_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_DAYS_OF_WEEK_UTIL;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_TERMS_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_TIMECODES_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_TIME_SEGMENTS_UTIL;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_TIMECODE_SEGMENTS_INTERSECT;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_INSTRUCTORS_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_COURSE_OFFERINGS_DIM;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_CLASS_MEETING_FACTS;
DROP TABLE IF EXISTS course_data_warehouse.IMPORT_COURSE_SECTION_FACTS;

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 863.38query/s] 
