## Data Engineering
We begin where all ML use cases do: data engineering. In this section of the demo, we will use Snowpark's Python client-side DataFrame API to build an **ELT pipeline**. We will extract the data from the source system (S3), load it into Snowflake, and add transformations to clean the data before analysis.

The data engineer has been told that there is historical data going back to 2013 and new data will be made available at the end of each month. 

Input: Historical bulk data at `https://s3.amazonaws.com/tripdata/`. Incremental data to be loaded one month at a time.  
Output: `trips` table

In [1]:
import snowflake.snowpark as snp
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

import pandas as pd
from datetime import datetime
import requests
from zipfile import ZipFile
from io import BytesIO
import os

### 1. Load credentials and connect to Snowflake


We will use a simple JSON file to store our credentials. This should **never** be done in production and is for demo purposes only.

In [2]:
from dags.snowpark_connection import snowpark_connect
session, state_dict = snowpark_connect('./include/state.json')
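
The `snowpark_connect` helper lives elsewhere in the repository and is not shown here. As a rough sketch of a check that could run before connecting, the snippet below verifies that a state dictionary carries the keys this notebook reads later (`connection_parameters` with `database` and `schema`, and `compute_parameters` with `fe_warehouse`). The function name `missing_state_keys` and the sample values are hypothetical and not part of the demo code.

```python
# Keys this notebook reads from the state dictionary in later cells.
REQUIRED_KEYS = {
    "connection_parameters": ["database", "schema"],
    "compute_parameters": ["fe_warehouse"],
}

def missing_state_keys(state: dict) -> list:
    """Return dotted names of required keys that are absent from `state`."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        if section not in state:
            missing.append(section)
            continue
        for key in keys:
            if key not in state[section]:
                missing.append(f"{section}.{key}")
    return missing

# Placeholder values for illustration only (not real credentials).
example_state = {
    "connection_parameters": {"database": "DEMO_DB", "schema": "DEMO_SCHEMA"},
    "compute_parameters": {},
}
print(missing_state_keys(example_state))
```

Failing early on a missing key gives a clearer error than a `KeyError` raised halfway through the pipeline.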

### 2. Create a stage for loading data to Snowflake

In [3]:
state_dict['load_stage_name']='LOAD_STAGE' 
state_dict['download_base_url']='https://s3.amazonaws.com/tripdata/'
state_dict['trips_table_name']='TRIPS'
state_dict['load_table_name'] = 'RAW_'

import json
with open('./include/state.json', 'w') as sdf:
    json.dump(state_dict, sdf)

In [4]:
def reset_database(session, state_dict:dict, prestaged=False):
    _ = session.sql('CREATE OR REPLACE DATABASE '+state_dict['connection_parameters']['database']).collect()
    _ = session.sql('CREATE SCHEMA '+state_dict['connection_parameters']['schema']).collect() 

    if prestaged:
        # download_base_url is stored at the top level of state_dict (see In [3]),
        # not under connection_parameters.
        sql_cmd = 'CREATE OR REPLACE STAGE '+state_dict['load_stage_name']+\
                  ' url='+state_dict['download_base_url']
        _ = session.sql(sql_cmd).collect()
    else: 
        _ = session.sql('CREATE STAGE IF NOT EXISTS '+state_dict['load_stage_name']).collect()

In [5]:
reset_database(session, state_dict)
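
The two branches of `reset_database` differ only in the stage statement they issue. A minimal sketch of that choice as a pure function makes the SQL easy to inspect before it is sent to Snowflake. The name `build_stage_sql` is hypothetical, the sketch assumes the base URL is read from the top-level `download_base_url` key set in the previous cell, and wrapping the URL in single quotes is an assumption about what the `CREATE STAGE` statement expects. The `s3://tripdata/` value is likewise an assumed stand-in: external stages generally take an `s3://` style location rather than the HTTPS address used for downloads.

```python
def build_stage_sql(state: dict, prestaged: bool = False) -> str:
    """Return the stage DDL for an external (prestaged) or internal stage."""
    stage = state["load_stage_name"]
    if prestaged:
        # External stage pointing at data that already sits in cloud storage.
        url = state["download_base_url"]
        return f"CREATE OR REPLACE STAGE {stage} url='{url}'"
    # Internal stage that files are uploaded into from the client.
    return f"CREATE STAGE IF NOT EXISTS {stage}"

# Illustrative values; the s3:// URL is an assumption, not taken from the demo.
state = {"load_stage_name": "LOAD_STAGE", "download_base_url": "s3://tripdata/"}
print(build_stage_sql(state))
print(build_stage_sql(state, prestaged=True))
```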

### 3. Extract:  


Create a list of files to download and upload to the stage.

In [6]:
import pandas as pd
from datetime import datetime

#For files like 201306-citibike-tripdata.zip
date_range1 = pd.period_range(start=datetime.strptime("201306", "%Y%m"), 
                             end=datetime.strptime("201612", "%Y%m"), 
                             freq='M').strftime("%Y%m")
file_name_end1 = '-citibike-tripdata.zip'
files_to_download = [date+file_name_end1 for date in date_range1.to_list()]


Starting in January 2017, Citibike changed the format of the file name.

In [7]:
#For files like 201701-citibike-tripdata.csv.zip
date_range2 = pd.period_range(start=datetime.strptime("201701", "%Y%m"), 
                             end=datetime.strptime("202112", "%Y%m"), 
                             freq='M').strftime("%Y%m")
file_name_end2 = '-citibike-tripdata.csv.zip'
files_to_download = files_to_download + [date+file_name_end2 for date in date_range2.to_list()]
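
The same list can be built without pandas, which is handy for checking the indices used in the next cell. This is a sketch using only the standard library; the helper names `month_range` and `archive_name` are made up for illustration.

```python
def month_range(start: str, end: str) -> list:
    """Return every month from start to end (inclusive) as 'YYYYMM' strings."""
    year, month = int(start[:4]), int(start[4:])
    end_year, end_month = int(end[:4]), int(end[4:])
    months = []
    while (year, month) <= (end_year, end_month):
        months.append(f"{year}{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months

def archive_name(yyyymm: str) -> str:
    """Apply the file name suffix in use for the given month."""
    # 'YYYYMM' strings sort chronologically, so a string comparison is enough.
    if yyyymm < "201701":
        return yyyymm + "-citibike-tripdata.zip"
    return yyyymm + "-citibike-tripdata.csv.zip"

all_files = [archive_name(m) for m in month_range("201306", "202112")]
print(len(all_files), all_files[0], all_files[-1])
```

June 2013 through December 2021 spans 103 months, so index `0` is the first archive and index `102` is the last one.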

For development purposes, we will start by loading just a couple of files. We will create a bulk load process afterwards.

In [8]:
files_to_download = [files_to_download[i] for i in [0,102]] #19,50,100,102]]
files_to_download

['201306-citibike-tripdata.zip', '202112-citibike-tripdata.csv.zip']

In [9]:
session.use_warehouse(state_dict['compute_parameters']['fe_warehouse'])

In [10]:
schema1_download_files = list()
schema2_download_files = list()
schema2_start_date = datetime.strptime('202102', "%Y%m")

for file_name in files_to_download:
    file_start_date = datetime.strptime(file_name.split("-")[0], "%Y%m")
    if file_start_date < schema2_start_date:
        schema1_download_files.append(file_name)
    else:
        schema2_download_files.append(file_name)

In [11]:
schema1_download_files, schema2_download_files

(['201306-citibike-tripdata.zip'], ['202112-citibike-tripdata.csv.zip'])
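
The routing rule above can also be written as a small function that returns both lists at once. This is an illustrative sketch; `split_by_schema` is a hypothetical name, and it compares the `YYYYMM` prefixes as strings, which orders them the same way as the `datetime` comparison in the cell.

```python
SCHEMA2_START = "202102"  # first month that uses the newer column layout

def split_by_schema(file_names: list) -> dict:
    """Group archive names by the schema their month belongs to."""
    buckets = {"schema1": [], "schema2": []}
    for name in file_names:
        month = name.split("-")[0]  # leading 'YYYYMM' part of the file name
        key = "schema1" if month < SCHEMA2_START else "schema2"
        buckets[key].append(name)
    return buckets

print(split_by_schema(["201306-citibike-tripdata.zip",
                       "202101-citibike-tripdata.csv.zip",
                       "202102-citibike-tripdata.csv.zip"]))
```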

In [12]:
schema1_load_stage = state_dict['load_stage_name']+'/schema1/'
schema2_load_stage = state_dict['load_stage_name']+'/schema2/'

schema1_files_to_load = list()
for zip_file_name in schema1_download_files:
    
    url = state_dict['download_base_url']+zip_file_name
    
    print('Downloading and unzipping: '+url)
    r = requests.get(url)
    file = ZipFile(BytesIO(r.content))
    csv_file_name=file.namelist()[0]
    file.extract(csv_file_name)
    file.close()
    
    print('Putting '+csv_file_name+' to stage: '+schema1_load_stage)
    session.file.put(local_file_name=csv_file_name, 
                     stage_location=schema1_load_stage, 
                     source_compression='NONE', 
                     overwrite=True)
    schema1_files_to_load.append(csv_file_name)
    os.remove(csv_file_name)
    
schema2_files_to_load = list()
for zip_file_name in schema2_download_files:
    
    url = state_dict['download_base_url']+zip_file_name
    
    print('Downloading and unzipping: '+url)
    r = requests.get(url)
    file = ZipFile(BytesIO(r.content))
    csv_file_name=file.namelist()[0]
    file.extract(csv_file_name)
    file.close()
    
    print('Putting '+csv_file_name+' to stage: '+schema2_load_stage)
    session.file.put(local_file_name=csv_file_name, 
                     stage_location=schema2_load_stage, 
                     source_compression='NONE', 
                     overwrite=True)
    schema2_files_to_load.append(csv_file_name)
    os.remove(csv_file_name)

Downloading and unzipping: https://s3.amazonaws.com/tripdata/201306-citibike-tripdata.zip
Putting 201306-citibike-tripdata.csv to stage: LOAD_STAGE/schema1/
Downloading and unzipping: https://s3.amazonaws.com/tripdata/202112-citibike-tripdata.csv.zip
Putting 202112-citibike-tripdata.csv to stage: LOAD_STAGE/schema2/
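
The two loops above are identical apart from the target stage path, so the unzip step is a natural candidate for a shared helper. The sketch below shows one possible shape for it. `first_member` is a hypothetical name, and like the cell above it assumes the first entry in the archive is the CSV of interest. The archive is built in memory with made-up rows so the example runs without network access.

```python
from io import BytesIO
from zipfile import ZipFile

def first_member(zip_bytes: bytes):
    """Return (name, content) of the first file in a zip archive held in memory."""
    # The context manager closes the archive even if reading raises an error.
    with ZipFile(BytesIO(zip_bytes)) as archive:
        name = archive.namelist()[0]
        return name, archive.read(name)

# Build a tiny stand-in archive; the rows are invented sample data.
buffer = BytesIO()
with ZipFile(buffer, "w") as archive:
    archive.writestr("201306-citibike-tripdata.csv",
                     "tripduration,starttime\n634,2013-06-01 00:00:01\n")

name, content = first_member(buffer.getvalue())
print(name, len(content.splitlines()))
```

In the notebook, `buffer.getvalue()` would be replaced by `r.content` from the `requests.get` call.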


In [13]:
session.sql("list @"+state_dict['load_stage_name']+" pattern='.*20.*[.]gz'").collect()

[Row(name='load_stage/schema1/201306-citibike-tripdata.csv.gz', size=16218896, md5='bf8dd4843ed88b92567342257ddaa05c', last_modified='Tue, 31 May 2022 09:20:50 GMT'),
 Row(name='load_stage/schema2/202112-citibike-tripdata.csv.gz', size=57392960, md5='77d3efd8e12f3e9138964fedbb36b3a9', last_modified='Tue, 31 May 2022 09:21:11 GMT')]
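
The listing shows `.csv.gz` names even though plain CSV files were uploaded, which is consistent with the upload step compressing files with gzip by default. The sketch below mirrors that naming and checks it against the `pattern` used in the `list` command. It uses Python's `re` module as a stand-in for Snowflake's pattern matching, which may differ in details, and `staged_name` is a hypothetical helper.

```python
import re

STAGE_PATTERN = r".*20.*[.]gz"  # same expression as in the list command

def staged_name(stage_path: str, local_name: str) -> str:
    """Name a file is expected to have on the stage after gzip compression."""
    return f"{stage_path}{local_name}.gz"

compressed = staged_name("load_stage/schema1/", "201306-citibike-tripdata.csv")
uncompressed = "load_stage/schema1/201306-citibike-tripdata.csv"

print(compressed, bool(re.fullmatch(STAGE_PATTERN, compressed)))
print(uncompressed, bool(re.fullmatch(STAGE_PATTERN, uncompressed)))
```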

### 4. Load: 
Load the raw data with every column typed as a string. We will fix data types in the transform stage.

There are two schema types, so we will create two ingest tables.

In [14]:
#Upper case fields are common to both schemas.
#Schema from June 2013 through January 2021
load_schema1 = T.StructType([T.StructField("tripduration", T.StringType()),
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("bike_id", T.StringType()),
                             T.StructField("USERTYPE", T.StringType()), 
                             T.StructField("birth_year", T.StringType()),
                             T.StructField("gender", T.StringType())])

#starting in February 2021 the schema changed
load_schema2 = T.StructType([T.StructField("ride_id", T.StringType()), 
                             T.StructField("rideable_type", T.StringType()), 
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("USERTYPE", T.StringType())])

trips_table_schema = T.StructType([T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("USERTYPE", T.StringType())])
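
The conformed `trips_table_schema` is simply the set of columns the two raw schemas share. A quick way to confirm this, sketched here with plain lists of the field names copied from the definitions above, is to intersect the two name lists.

```python
schema1_names = ["tripduration", "STARTTIME", "STOPTIME", "START_STATION_ID",
                 "START_STATION_NAME", "START_STATION_LATITUDE",
                 "START_STATION_LONGITUDE", "END_STATION_ID", "END_STATION_NAME",
                 "END_STATION_LATITUDE", "END_STATION_LONGITUDE", "bike_id",
                 "USERTYPE", "birth_year", "gender"]

schema2_names = ["ride_id", "rideable_type", "STARTTIME", "STOPTIME",
                 "START_STATION_NAME", "START_STATION_ID", "END_STATION_NAME",
                 "END_STATION_ID", "START_STATION_LATITUDE",
                 "START_STATION_LONGITUDE", "END_STATION_LATITUDE",
                 "END_STATION_LONGITUDE", "USERTYPE"]

# Keep schema2's column order and retain only names that also occur in schema1.
schema1_set = set(schema1_names)
common_names = [name for name in schema2_names if name in schema1_set]

print(len(common_names))
print(common_names)
```

The shared names are exactly the upper-case fields, in the same order as `trips_table_schema`.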

Create empty tables.

In [15]:
session.create_dataframe([[None]*len(load_schema1.names)], schema=load_schema1)\
       .na.drop()\
       .write\
       .save_as_table(state_dict['load_table_name']+'schema1')

In [16]:
session.create_dataframe([[None]*len(load_schema2.names)], schema=load_schema2)\
       .na.drop()\
       .write\
       .save_as_table(state_dict['load_table_name']+'schema2')

Load schema1 data

In [17]:
csv_file_format_options = {"FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'", "skip_header": 1}

loaddf = session.read.option("SKIP_HEADER", 1)\
                     .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                     .option("COMPRESSION", "GZIP")\
                     .option("NULL_IF", "\\\\N")\
                     .option("NULL_IF", "NULL")\
                     .option("pattern", "'.*20.*[.]gz'")\
                     .schema(load_schema1)\
                     .csv('@'+schema1_load_stage)\
                     .copy_into_table(state_dict['load_table_name']+str('schema1'), 
                                      format_type_options=csv_file_format_options)

Load schema2 data

In [18]:
loaddf = session.read.option("SKIP_HEADER", 1)\
                     .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                     .option("COMPRESSION", "GZIP")\
                     .option("NULL_IF", "\\\\N")\
                     .option("NULL_IF", "NULL")\
                     .option("pattern", "'.*20.*[.]gz'")\
                     .schema(load_schema2)\
                     .csv('@'+schema2_load_stage)\
                     .copy_into_table(state_dict['load_table_name']+str('schema2'), 
                                      format_type_options=csv_file_format_options)
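
To make the reader options concrete, here is a small standard-library sketch of what they ask for: skip one header row, treat double quotes as optional field enclosures, and turn the markers `\N` and `NULL` into nulls. It treats both markers as null, which is what the two `NULL_IF` calls appear intended to achieve. The function `parse_rows` and the sample rows are invented for illustration, and Python's `csv` module is only an approximation of how Snowflake parses files.

```python
import csv
from io import StringIO

NULL_MARKERS = {"\\N", "NULL"}  # the literal text \N, and the word NULL

def parse_rows(text: str) -> list:
    """Parse CSV text, skipping the header and mapping null markers to None."""
    reader = csv.reader(StringIO(text), quotechar='"')
    next(reader)  # SKIP_HEADER = 1
    return [[None if value in NULL_MARKERS else value for value in row]
            for row in reader]

# Made-up sample rows; the quoted field contains a comma on purpose.
sample = (
    'tripduration,start station name,birth year\n'
    '634,"Main St & 1 Ave",NULL\n'
    '1547,"Broadway, Suite 2",\\N\n'
)

for row in parse_rows(sample):
    print(row)
```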

### 5. Transform:
We have the raw data loaded. Now let's transform and clean it. The result is written to a final "transformed" table for our Data Science team to consume. We start by combining the two tables on their common columns.


In [19]:
trips_table_schema_names = [field.name for field in trips_table_schema.fields]
transdf1 = session.table(state_dict['load_table_name']+'schema1')[trips_table_schema_names]
transdf2 = session.table(state_dict['load_table_name']+'schema2')[trips_table_schema_names]
transdf = transdf1.union_by_name(transdf2)

There are three different date formats: "2014-08-10 15:21:22", "1/1/2015 1:30", and "12/1/2014 02:04:53".

In [20]:
date_format_2 = "1/1/2015 [0-9]:.*$"      #1/1/2015 1:30 -> #M*M/D*D/YYYY H*H:M*M(:SS)*
date_format_3 = "1/1/2015 [0-9][0-9]:.*$" #1/1/2015 10:30 -> #M*M/D*D/YYYY H*H:M*M(:SS)*
date_format_4 = "12/1/2014.*"             #12/1/2014 02:04:53 -> M*M/D*D/YYYY 

#Change all dates to YYYY-MM-DD HH:MI:SS format
date_format_match = "^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) ([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$"
date_format_repl = "\\3-\\1-\\2 \\4:\\5\\6"
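
The effect of the pattern and replacement can be checked locally with Python's `re` module before running it in Snowflake. This is a sketch only: Python's regex engine stands in for Snowflake's `regexp_replace`, and `normalize` is a hypothetical helper name.

```python
import re

date_format_match = (r"^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) "
                     r"([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$")
date_format_repl = r"\3-\1-\2 \4:\5\6"  # year-month-day hour:minute[:second]

def normalize(timestamp: str) -> str:
    """Rewrite M/D/YYYY timestamps as YYYY-M-D; leave other formats untouched."""
    return re.sub(date_format_match, date_format_repl, timestamp)

for raw in ["2014-08-10 15:21:22", "1/1/2015 1:30", "12/1/2014 02:04:53"]:
    print(raw, "->", normalize(raw))
```

Values already in ISO order do not match the pattern and pass through unchanged. The rewritten values are not zero-padded, so the pipeline relies on the timestamp conversion in the next cell to parse them.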

In [21]:
transdf.with_column('STARTTIME', F.regexp_replace(F.col('STARTTIME'),
                                            F.lit(date_format_match), 
                                            F.lit(date_format_repl)))\
      .with_column('STARTTIME', F.to_timestamp('STARTTIME'))\
      .with_column('STOPTIME', F.regexp_replace(F.col('STOPTIME'),
                                            F.lit(date_format_match), 
                                            F.lit(date_format_repl)))\
      .with_column('STOPTIME', F.to_timestamp('STOPTIME'))\
      .select(F.col('STARTTIME'), 
              F.col('STOPTIME'), 
              F.col('START_STATION_ID'), 
              F.col('START_STATION_NAME'), 
              F.col('START_STATION_LATITUDE'), 
              F.col('START_STATION_LONGITUDE'), 
              F.col('END_STATION_ID'), 
              F.col('END_STATION_NAME'), F.col('END_STATION_LATITUDE'), 
              F.col('END_STATION_LONGITUDE'), 
              F.col('USERTYPE'))\
      .write.mode('overwrite').save_as_table(state_dict['trips_table_name'])

In [22]:
testdf = session.table(state_dict['trips_table_name'])
testdf.schema

StructType[StructField(STARTTIME, Timestamp, Nullable=True), StructField(STOPTIME, Timestamp, Nullable=True), StructField(START_STATION_ID, String, Nullable=True), StructField(START_STATION_NAME, String, Nullable=True), StructField(START_STATION_LATITUDE, String, Nullable=True), StructField(START_STATION_LONGITUDE, String, Nullable=True), StructField(END_STATION_ID, String, Nullable=True), StructField(END_STATION_NAME, String, Nullable=True), StructField(END_STATION_LATITUDE, String, Nullable=True), StructField(END_STATION_LONGITUDE, String, Nullable=True), StructField(USERTYPE, String, Nullable=True)]

In [23]:
testdf.count()

2325908

### 6. Export code in functional modules for MLOps and orchestration

In [24]:
%%writefile dags/elt.py
def schema1_definition():
    from snowflake.snowpark import types as T
    load_schema1 = T.StructType([T.StructField("TRIPDURATION", T.StringType()),
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("BIKEID", T.StringType()),
                             T.StructField("USERTYPE", T.StringType()), 
                             T.StructField("BIRTH_YEAR", T.StringType()),
                             T.StructField("GENDER", T.StringType())])
    return load_schema1

def schema2_definition():
    from snowflake.snowpark import types as T
    load_schema2 = T.StructType([T.StructField("ride_id", T.StringType()), 
                             T.StructField("rideable_type", T.StringType()), 
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("USERTYPE", T.StringType())])
    return load_schema2

def conformed_schema():
    from snowflake.snowpark import types as T
    trips_table_schema = T.StructType([T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("USERTYPE", T.StringType())])
    return trips_table_schema

def extract_trips_to_stage(session, files_to_download: list, download_base_url: str, load_stage_name:str):
    import os 
    import requests
    from zipfile import ZipFile
    import gzip
    from datetime import datetime
    from io import BytesIO
    
    schema1_download_files = list()
    schema2_download_files = list()
    schema2_start_date = datetime.strptime('202102', "%Y%m")

    for file_name in files_to_download:
        file_start_date = datetime.strptime(file_name.split("-")[0], "%Y%m")
        if file_start_date < schema2_start_date:
            schema1_download_files.append(file_name)
        else:
            schema2_download_files.append(file_name)
         
        
    schema1_load_stage = load_stage_name+'/schema1/'
    schema1_files_to_load = list()
    for zip_file_name in schema1_download_files:

        url = download_base_url+zip_file_name

        print('Downloading and unzipping: '+url)
        r = requests.get(url)
        file = ZipFile(BytesIO(r.content))
        csv_file_name=file.namelist()[0]
        file.extract(csv_file_name)
        file.close()

        print('Putting '+csv_file_name+' to stage: '+schema1_load_stage)
        session.file.put(local_file_name=csv_file_name, 
                         stage_location=schema1_load_stage, 
                         source_compression='NONE', 
                         overwrite=True)
        schema1_files_to_load.append(csv_file_name)
        os.remove(csv_file_name)

        
    schema2_load_stage = load_stage_name+'/schema2/'
    schema2_files_to_load = list()
    for zip_file_name in schema2_download_files:

        url = download_base_url+zip_file_name

        print('Downloading and unzipping: '+url)
        r = requests.get(url)
        file = ZipFile(BytesIO(r.content))
        csv_file_name=file.namelist()[0]
        file.extract(csv_file_name)
        file.close()

        print('Putting '+csv_file_name+' to stage: '+schema2_load_stage)
        session.file.put(local_file_name=csv_file_name, 
                         stage_location=schema2_load_stage, 
                         source_compression='NONE', 
                         overwrite=True)
        schema2_files_to_load.append(csv_file_name)
        os.remove(csv_file_name)
        
    load_stage_names = {'schema1' : schema1_load_stage, 'schema2' : schema2_load_stage}
    files_to_load = {'schema1': schema1_files_to_load, 'schema2': schema2_files_to_load}

    return load_stage_names, files_to_load
    
def load_trips_to_raw(session, files_to_load:dict, load_stage_names:dict, load_table_name:str):
    from snowflake.snowpark import functions as F
    from snowflake.snowpark import types as T
    from datetime import datetime
    
    csv_file_format_options = {"FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'", "skip_header": 1}

    if len(files_to_load['schema1']) > 0:
        load_schema1 = schema1_definition()
        loaddf = session.read.option("SKIP_HEADER", 1)\
                         .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                         .option("COMPRESSION", "GZIP")\
                         .option("NULL_IF", "\\\\N")\
                         .option("NULL_IF", "NULL")\
                         .schema(load_schema1)\
                         .csv('@'+load_stage_names['schema1'])\
                         .copy_into_table(load_table_name+'schema1', 
                                          files=files_to_load['schema1'],
                                          format_type_options=csv_file_format_options)
                              
    if len(files_to_load['schema2']) > 0:
        load_schema2 = schema2_definition()
        loaddf = session.read.option("SKIP_HEADER", 1)\
                         .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                         .option("COMPRESSION", "GZIP")\
                         .option("NULL_IF", "\\\\N")\
                         .option("NULL_IF", "NULL")\
                         .schema(load_schema2)\
                         .csv('@'+load_stage_names['schema2'])\
                         .copy_into_table(load_table_name+'schema2', 
                                          files=files_to_load['schema2'],
                                          format_type_options=csv_file_format_options)
        
    load_table_names = {'schema1' : load_table_name+str('schema1'), 
                         'schema2' : load_table_name+str('schema2')}
                         
    return load_table_names
    
def transform_trips(session, stage_table_names:dict, trips_table_name:str):
    from snowflake.snowpark import functions as F
        
    #Change all dates to YYYY-MM-DD HH:MI:SS format
    date_format_match = "^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) ([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$"
    date_format_repl = "\\3-\\1-\\2 \\4:\\5\\6"

    trips_table_schema = conformed_schema()
                         
    trips_table_schema_names = [field.name for field in trips_table_schema.fields]
                         
    transdf1 = session.table(stage_table_names['schema1'])[trips_table_schema_names]
    transdf2 = session.table(stage_table_names['schema2'])[trips_table_schema_names]
                         
    transdf = transdf1.union_by_name(transdf2)\
                      .with_column('STARTTIME', F.regexp_replace(F.col('STARTTIME'),
                                                                F.lit(date_format_match), 
                                                                F.lit(date_format_repl)))\
                      .with_column('STARTTIME', F.to_timestamp('STARTTIME'))\
                      .with_column('STOPTIME', F.regexp_replace(F.col('STOPTIME'),
                                                               F.lit(date_format_match), 
                                                               F.lit(date_format_repl)))\
                      .with_column('STOPTIME', F.to_timestamp('STOPTIME'))\
                      .write.mode('overwrite').save_as_table(trips_table_name)

    return trips_table_name

def reset_database(session, state_dict:dict, prestaged=False):
    _ = session.sql('CREATE OR REPLACE DATABASE '+state_dict['connection_parameters']['database']).collect()
    _ = session.sql('CREATE SCHEMA '+state_dict['connection_parameters']['schema']).collect() 

    if prestaged:
        # download_base_url is stored at the top level of state_dict,
        # not under connection_parameters.
        sql_cmd = 'CREATE OR REPLACE STAGE '+state_dict['load_stage_name']+\
                  ' url='+state_dict['download_base_url']
        _ = session.sql(sql_cmd).collect()
    else: 
        _ = session.sql('CREATE STAGE IF NOT EXISTS '+state_dict['load_stage_name']).collect()

    load_schema1=schema1_definition()
    session.create_dataframe([[None]*len(load_schema1.names)], schema=load_schema1)\
           .na.drop()\
           .write\
           .save_as_table(state_dict['load_table_name']+'schema1')

    load_schema2=schema2_definition()
    session.create_dataframe([[None]*len(load_schema2.names)], schema=load_schema2)\
           .na.drop()\
           .write\
           .save_as_table(state_dict['load_table_name']+'schema2')
    

Overwriting dags/elt.py
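
With the functions exported, a monthly incremental run is a matter of chaining them in order. The sketch below shows one possible wiring that takes the three steps as arguments, so it can be exercised with stand-ins and without a Snowflake session. `run_incremental_elt` and `incremental_file_name` are hypothetical names, and the file name helper assumes the naming used since 2017 continues for new months.

```python
def incremental_file_name(year: int, month: int) -> str:
    """Archive name for one month, assuming the post-2017 naming continues."""
    return f"{year}{month:02d}-citibike-tripdata.csv.zip"

def run_incremental_elt(session, state, year, month, extract, load, transform):
    """Run extract -> load -> transform for a single month of data."""
    files = [incremental_file_name(year, month)]
    stage_names, staged_files = extract(
        session, files, state["download_base_url"], state["load_stage_name"])
    table_names = load(session, staged_files, stage_names, state["load_table_name"])
    return transform(session, table_names, state["trips_table_name"])

# Stand-in steps that only record the call order; they do no real work.
calls = []

def fake_extract(session, files, base_url, stage):
    calls.append(("extract", files))
    return ({"schema1": stage + "/schema1/", "schema2": stage + "/schema2/"},
            {"schema1": [], "schema2": [f[:-4] for f in files]})

def fake_load(session, files_to_load, stage_names, table_prefix):
    calls.append(("load", files_to_load))
    return {"schema1": table_prefix + "schema1", "schema2": table_prefix + "schema2"}

def fake_transform(session, table_names, trips_table):
    calls.append(("transform", table_names))
    return trips_table

state = {"download_base_url": "https://s3.amazonaws.com/tripdata/",
         "load_stage_name": "LOAD_STAGE",
         "load_table_name": "RAW_",
         "trips_table_name": "TRIPS"}

result = run_incremental_elt(None, state, 2022, 1,
                             fake_extract, fake_load, fake_transform)
print(result, [step for step, _ in calls])
```

With the real module, `extract_trips_to_stage`, `load_trips_to_raw`, and `transform_trips` from `dags/elt.py` would be passed in place of the stand-ins, along with a live session.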


In [25]:
session.close()