# Citibike ML
In this example, we use the [Citibike dataset](https://ride.citibikenyc.com/system-data). Citibike is a bike-sharing system in New York City. Every day, users choose from 20,000 bicycles at 1,300 stations around the city.

To ensure customer satisfaction, Citibike needs to predict how many bicycles will be needed at each station. Citibike maintenance teams check each station and repair or replace bicycles, and they relocate bicycles between stations based on predicted demand. The business needs to be able to run reports on how many bicycles will be needed at a given station on a given day.



## Data Engineering
We begin where all ML use cases do: with data engineering. In this section of the demo, we use Snowpark's Python client-side DataFrame API to build an **ELT pipeline**. We extract the data from the source system (S3), load it into Snowflake, and add transformations to clean the data before analysis.

The data engineer has been told that there is historical data going back to 2013 and new data will be made available at the end of each month. 

For this demo flow, we assume that the organization has the following **policies and processes**:
- **Dev Tools**: The data engineer can develop in their tool of choice (e.g., VS Code, IntelliJ, PyCharm, Eclipse). Snowpark Python makes it possible to use any environment that has a Python kernel. For this demo, we use Jupyter.
- **Data Governance**: To preserve customer privacy, no data can be stored locally. The ingest system may store data temporarily, but it must be assumed that, in production, the ingest system will not preserve intermediate data products between runs. Snowpark Python allows the user to push down all operations to Snowflake and bring the code to the data.
- **Automation**: Although the data engineer can use any IDE or notebook for development, the final product of the work stream must be Python code. Well-documented, modular code is necessary for good ML operations and to interface with the company's CI/CD and orchestration tools.
- **Compliance**: Any ML model must be traceable back to the original dataset used for training. The business needs to be able to easily remove specific user data from training datasets and retrain models.

Input: Historical bulk data at `https://s3.amazonaws.com/tripdata/`. Incremental data to be loaded one month at a time.  
Output: `trips` table
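Because new data is published at the end of each month, a scheduled incremental run needs to work out which month to fetch. A minimal sketch, assuming the job runs after the previous month's file has been published; `previous_month_yyyymm` is a hypothetical helper, not part of the original pipeline.

```python
from datetime import date, timedelta


def previous_month_yyyymm(run_date: date) -> str:
    """Return the YYYYMM string of the month before run_date (hypothetical helper)."""
    # The day before the 1st of the current month always falls in the previous month.
    last_day_prev_month = run_date.replace(day=1) - timedelta(days=1)
    return last_day_prev_month.strftime("%Y%m")


print(previous_month_yyyymm(date(2020, 3, 5)))   # 202002
print(previous_month_yyyymm(date(2021, 1, 15)))  # 202012
```

Using `timedelta` avoids special-casing January, since stepping back one day from January 1 lands in December of the previous year.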

In [None]:
import snowflake.snowpark as snp
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

import pandas as pd
from datetime import datetime
import requests
from zipfile import ZipFile
import gzip
import os

#import logging
#logging.basicConfig(level=logging.WARN)
#logging.getLogger().setLevel(logging.DEBUG)



### 1. Load credentials and connect to Snowflake


We use a simple JSON file to store our credentials. This should **never** be done in production and is for demo purposes only.

In [None]:
import json, getpass
with open('creds.json') as f:
    data = json.load(f)
    connection_parameters = {
      'account': data['account'],
      'user': data['username'],
      'password': data['password'],  # or prompt interactively with getpass.getpass()
      'role': data['role'],
      'warehouse': data['warehouse']}

session = snp.Session.builder.configs(connection_parameters).create()
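Outside a demo, one common alternative to a JSON file is to read credentials from environment variables populated by a secrets manager or CI system. A minimal sketch; the `SNOWFLAKE_*` variable names and the `connection_parameters_from_env` helper are hypothetical, and the returned dictionary uses the same keys as the `connection_parameters` dictionary above.

```python
import os


def connection_parameters_from_env(prefix: str = "SNOWFLAKE_") -> dict:
    """Build connection parameters from environment variables (hypothetical names)."""
    keys = ["account", "user", "password", "role", "warehouse"]
    params = {}
    for key in keys:
        env_name = prefix + key.upper()  # e.g. SNOWFLAKE_ACCOUNT
        if env_name not in os.environ:
            raise KeyError(f"Missing environment variable: {env_name}")
        params[key] = os.environ[env_name]
    return params


# Usage, once the variables are set:
# session = snp.Session.builder.configs(connection_parameters_from_env()).create()
```

Failing fast on a missing variable surfaces configuration errors before any connection attempt.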

### 2. Create objects in Snowflake
With Snowpark, we can utilize our session object to execute DDL SQL statements.

In [None]:
download_base_url = 'https://s3.amazonaws.com/tripdata/'

project_db_name = 'CITIBIKEML'
project_schema_name = 'DEMO'
project_db_schema = f'{project_db_name}.{project_schema_name}'

load_table_name = f'{project_db_schema}.RAW_'
trips_table_name = f'{project_db_schema}.TRIPS'

# CREATE OR REPLACE drops any existing CITIBIKEML database, so each run starts from a clean slate.
_ = session.sql(f'CREATE OR REPLACE DATABASE {project_db_name}').collect()
_ = session.sql(f'USE DATABASE {project_db_name}').collect()

_ = session.sql(f'CREATE SCHEMA {project_db_schema}').collect()
_ = session.sql(f'USE SCHEMA {project_db_schema}').collect()

import uuid
# A uniquely named temporary stage; temporary stages are dropped when the session ends.
stage_id = str(uuid.uuid1()).replace('-', '_')
load_stage_name = f'load_stage_{stage_id}'

_ = session.sql(f'CREATE OR REPLACE TEMPORARY STAGE {load_stage_name}').collect()
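Because `session.sql(...)` takes a plain string, the setup DDL can also be produced by a pure function, which keeps SQL generation separate from execution and makes it easy to log or unit-test before anything touches Snowflake. A minimal sketch; `build_setup_ddl` and the stage name `load_stage_example` are hypothetical, and each statement would still be executed with `session.sql(stmt).collect()`.

```python
def build_setup_ddl(db_name: str, schema_name: str, stage_name: str) -> list:
    """Return the setup statements in the order the cell above runs them."""
    db_schema = f"{db_name}.{schema_name}"
    return [
        f"CREATE OR REPLACE DATABASE {db_name}",
        f"USE DATABASE {db_name}",
        f"CREATE SCHEMA {db_schema}",
        f"USE SCHEMA {db_schema}",
        f"CREATE OR REPLACE TEMPORARY STAGE {stage_name}",
    ]


for stmt in build_setup_ddl("CITIBIKEML", "DEMO", "load_stage_example"):
    print(stmt)  # in the notebook: session.sql(stmt).collect()
```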

### 3. Extract: Load Historical Data


Create a list of files to download and upload to the stage.

In [None]:
import pandas as pd
from datetime import datetime

#For files like 201306-citibike-tripdata.zip
date_range1 = pd.period_range(start=datetime.strptime("201306", "%Y%m"), 
                             end=datetime.strptime("201612", "%Y%m"), 
                             freq='M').strftime("%Y%m")
file_name_end1 = '-citibike-tripdata.zip'
files_to_download = [date+file_name_end1 for date in date_range1.to_list()]


Starting in January 2017, Citibike changed the file name format.

In [None]:
#For files like 201701-citibike-tripdata.csv.zip
date_range2 = pd.period_range(start=datetime.strptime("201701", "%Y%m"), 
                             end=datetime.strptime("202002", "%Y%m"), 
                             freq='M').strftime("%Y%m")
file_name_end2 = '-citibike-tripdata.csv.zip'
files_to_download = files_to_download + [date+file_name_end2 for date in date_range2.to_list()]
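For incremental monthly loads, the two naming conventions can be folded into one function that maps a `YYYYMM` string to its file name. A minimal sketch that covers only the two conventions described above (files later than the range used here may follow other patterns); `citibike_file_name` and `NAMING_CHANGE` are hypothetical names.

```python
from datetime import datetime

# First month published under the "-citibike-tripdata.csv.zip" naming (January 2017).
NAMING_CHANGE = datetime(2017, 1, 1)


def citibike_file_name(yyyymm: str) -> str:
    """Return the S3 object name for one month of trip data."""
    month = datetime.strptime(yyyymm, "%Y%m")  # raises ValueError on malformed input
    if month < NAMING_CHANGE:
        suffix = "-citibike-tripdata.zip"
    else:
        suffix = "-citibike-tripdata.csv.zip"
    return month.strftime("%Y%m") + suffix


print(citibike_file_name("201612"))  # 201612-citibike-tripdata.zip
print(citibike_file_name("201701"))  # 201701-citibike-tripdata.csv.zip
```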

For development purposes, we start by loading just a few files. We will build a bulk load process afterwards.

In [None]:
files_to_download = [files_to_download[i] for i in [0,14,18,19,50]]
files_to_download

In [None]:
%%time 
files_to_load = list()
for zip_file_name in files_to_download:
    gz_file_name = os.path.splitext(zip_file_name)[0]+'.gz'
    url = download_base_url+zip_file_name
    
    print('Downloading: '+url)
    r = requests.get(url)
    r.raise_for_status()  # stop on HTTP errors instead of writing an error response to disk
    with open(zip_file_name, 'wb') as fh:
        fh.write(r.content)
        
    with ZipFile(zip_file_name, 'r') as zipObj:
        csv_file_names = zipObj.namelist()
        with zipObj.open(name=csv_file_names[0], mode='r') as zf:
            print('Gzipping: '+csv_file_names[0])
            with gzip.open(gz_file_name, 'wb') as gzf:
                gzf.write(zf.read())

    print('Putting to stage: '+gz_file_name)
    session.file.put(gz_file_name, '@'+load_stage_name)
    files_to_load.append(gz_file_name)
    os.remove(zip_file_name)
    os.remove(gz_file_name)
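The loop above reads each CSV fully into memory with `zf.read()` before gzipping it; for larger monthly files, a chunked copy keeps memory use flat. A minimal sketch of the conversion step only, assuming (as the loop does) that the CSV is the first member of the ZIP; `zip_to_gzip` is a hypothetical helper. On the download side, `requests.get(url, stream=True)` combined with `r.iter_content(chunk_size=...)` can similarly avoid holding the whole ZIP in memory.

```python
import gzip
import shutil
from zipfile import ZipFile


def zip_to_gzip(zip_path: str, gz_path: str, chunk_size: int = 1024 * 1024) -> str:
    """Re-compress the first member of a ZIP archive as a gzip file.

    Copies in chunks of chunk_size bytes and returns the member name.
    """
    with ZipFile(zip_path, "r") as zip_obj:
        member = zip_obj.namelist()[0]
        with zip_obj.open(member, "r") as src, gzip.open(gz_path, "wb") as dst:
            shutil.copyfileobj(src, dst, chunk_size)
    return member
```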

In [None]:
session.sql("list @"+load_stage_name+" pattern='.*20.*[.]gz'").collect()
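The `pattern` used here (and again in the reader below) is a regular expression over staged file paths. Snowflake's documented examples use a leading `.*`, which suggests the expression must match the whole path, so this sketch assumes full-match semantics and uses `re.fullmatch` to show locally which names are selected. The paths and the `matches_stage_pattern` helper are illustrative only.

```python
import re

# The regular expression used with LIST and the reader's "pattern" option.
STAGE_PATTERN = r".*20.*[.]gz"


def matches_stage_pattern(path: str, pattern: str = STAGE_PATTERN) -> bool:
    # Assumes full-path matching, hence re.fullmatch rather than re.search.
    return re.fullmatch(pattern, path) is not None


for path in ["load_stage_x/201306-citibike-tripdata.gz",
             "load_stage_x/201306-citibike-tripdata.zip",
             "load_stage_x/notes.gz"]:
    print(path, matches_stage_pattern(path))
```

Note that `20` may appear anywhere in the path, so the pattern is loose; it works here because every data file name starts with its year.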

### 4. ELT - Load into raw table
Load the raw data with every column typed as a string. We fix the data types in the transform stage.

In [None]:
load_schema1 = T.StructType([T.StructField("TRIPDURATION", T.StringType()),
                           T.StructField("STARTTIME", T.StringType()), 
                           T.StructField("STOPTIME", T.StringType()), 
                           T.StructField("START_STATION_ID", T.StringType()),
                           T.StructField("START_STATION_NAME", T.StringType()), 
                           T.StructField("START_STATION_LATITUDE", T.StringType()),
                           T.StructField("START_STATION_LONGITUDE", T.StringType()),
                           T.StructField("END_STATION_ID", T.StringType()),
                           T.StructField("END_STATION_NAME", T.StringType()), 
                           T.StructField("END_STATION_LATITUDE", T.StringType()),
                           T.StructField("END_STATION_LONGITUDE", T.StringType()),
                           T.StructField("BIKEID", T.StringType()),
                           T.StructField("USERTYPE", T.StringType()), 
                           T.StructField("BIRTH_YEAR", T.StringType()),
                           T.StructField("GENDER", T.StringType())])

In [None]:
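# Reader options are stored by name, so the second NULL_IF call below appears to
# replace the first; only "NULL" may end up being treated as NULL.
loaddf = session.read.option("SKIP_HEADER", 1)\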
                     .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                     .option("COMPRESSION", "GZIP")\
                     .option("NULL_IF", "\\\\N")\
                     .option("NULL_IF", "NULL")\
                     .option("pattern", "'.*20.*[.]gz'")\
                     .schema(load_schema1)\
                     .csv("@"+load_stage_name)
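Loading every column as a string pushes all type decisions into the transform step; the only interpretation done at load time is mapping null markers to NULL. A plain-Python sketch of that intent, assuming both `\N` and `NULL` should become NULL; `to_raw_row` and `NULL_TOKENS` are hypothetical names, and this does not call Snowpark.

```python
from typing import Iterable, List, Optional

# Tokens the raw-file reader is meant to treat as SQL NULL (assumed: both of them).
NULL_TOKENS = ("\\N", "NULL")


def to_raw_row(fields: Iterable[str], null_tokens=NULL_TOKENS) -> List[Optional[str]]:
    """Keep every value as text, except null markers, which become None."""
    return [None if value in null_tokens else value for value in fields]


print(to_raw_row(["695", "\\N", "NULL", "Subscriber"]))  # ['695', None, None, 'Subscriber']
```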

In [None]:
%%time
csv_file_format_options = {"FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'", "skip_header": 1}
raw_table_name = load_table_name + 'schema1'

print(f'Loading {loaddf.count()} records to table {raw_table_name}')
loaddf.copy_into_table(raw_table_name,
                       files=files_to_load,
                       format_type_options=csv_file_format_options)

### 5. ELT - Transform and load to trips table
We have loaded the raw data. Now let's transform and clean it. This writes the data to a final "transformed" table (`TRIPS`) to be consumed by our Data Science team.


In [None]:
transdf = session.table(load_table_name+'schema1')

The raw data uses three different date formats: `2014-08-10 15:21:22`, `1/1/2015 1:30`, and `12/1/2014 02:04:53`.

In [None]:
# Example inputs for the two non-ISO formats:
#   "1/1/2015 1:30"      -> M/D/YYYY H:MM     (month, day and hour may have one or two digits)
#   "12/1/2014 02:04:53" -> M/D/YYYY HH:MM:SS
# ISO-style values such as "2014-08-10 15:21:22" do not match the pattern and are left unchanged.

# Reorder M/D/YYYY H:MM[:SS] into YYYY-M-D H:MM[:SS] before converting with to_timestamp
date_format_match = "^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) ([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$"
date_format_repl = "\\3-\\1-\\2 \\4:\\5\\6"
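The regular expression only reorders the date parts; it does not zero-pad them, so `1/1/2015 1:30` becomes `2015-1-1 1:30` rather than a strict `YYYY-MM-DD HH:MI:SS` string. The pattern can be checked locally with Python's `re` module, which uses the same `\1`-style back-references. `reorder_date` and `normalize_date` are hypothetical helpers for illustration, and `normalize_date` handles only the three formats listed above.

```python
import re
from datetime import datetime

# Same pattern and replacement as date_format_match / date_format_repl above.
DATE_MATCH = r"^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) ([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$"
DATE_REPL = r"\3-\1-\2 \4:\5\6"


def reorder_date(value: str) -> str:
    """Rewrite M/D/YYYY H:MM[:SS] as YYYY-M-D H:MM[:SS]; other inputs pass through."""
    # Since Python 3.5, an unmatched optional group (the seconds) is replaced by "".
    return re.sub(DATE_MATCH, DATE_REPL, value)


def normalize_date(value: str) -> str:
    """Reorder, then zero-pad into YYYY-MM-DD HH:MM:SS."""
    reordered = reorder_date(value)
    fmt = "%Y-%m-%d %H:%M:%S" if reordered.count(":") == 2 else "%Y-%m-%d %H:%M"
    return datetime.strptime(reordered, fmt).strftime("%Y-%m-%d %H:%M:%S")


print(reorder_date("1/1/2015 1:30"))    # 2015-1-1 1:30
print(normalize_date("1/1/2015 1:30"))  # 2015-01-01 01:30:00
```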

In [None]:
transdf.withColumn('STARTTIME', F.regexp_replace(F.col('STARTTIME'),
                                            F.lit(date_format_match), 
                                            F.lit(date_format_repl)))\
      .withColumn('STARTTIME', F.to_timestamp('STARTTIME'))\
      .withColumn('STOPTIME', F.regexp_replace(F.col('STOPTIME'),
                                            F.lit(date_format_match), 
                                            F.lit(date_format_repl)))\
      .withColumn('STOPTIME', F.to_timestamp('STOPTIME'))\
      .select(F.col('STARTTIME'), 
              F.col('STOPTIME'), 
              F.col('START_STATION_ID'), 
              F.col('START_STATION_NAME'), 
              F.col('START_STATION_LATITUDE'), 
              F.col('START_STATION_LONGITUDE'), 
              F.col('END_STATION_ID'), 
              F.col('END_STATION_NAME'), F.col('END_STATION_LATITUDE'), 
              F.col('END_STATION_LONGITUDE'), 
              F.col('USERTYPE'))\
      .write.mode('overwrite').saveAsTable(trips_table_name)

In [None]:
testdf = session.table(trips_table_name)
testdf.schema

In [None]:
testdf.count()

### 6. Export code in functional modules for MLOps and orchestration

In [None]:
%%writefile citibike_ml/elt.py

def extract_trips_to_stage(session, files_to_download: list, download_base_url: str, load_stage_name:str):
    import os 
    import requests
    from zipfile import ZipFile
    import gzip
    
    files_to_load = list()
    
    for zip_file_name in files_to_download:
        gz_file_name = os.path.splitext(zip_file_name)[0]+'.gz'
        url = download_base_url+zip_file_name

        print('Downloading file '+url)
        r = requests.get(url)
        r.raise_for_status()  # stop on HTTP errors instead of writing an error response to disk
        with open(zip_file_name, 'wb') as fh:
            fh.write(r.content)

        with ZipFile(zip_file_name, 'r') as zipObj:
            csv_file_names = zipObj.namelist()
            with zipObj.open(name=csv_file_names[0], mode='r') as zf:
                print('Gzipping file '+csv_file_names[0])
                with gzip.open(gz_file_name, 'wb') as gzf:
                    gzf.write(zf.read())

        print('Putting file '+gz_file_name+' to stage '+load_stage_name)
        session.file.put(gz_file_name, '@'+load_stage_name)
        
        files_to_load.append(gz_file_name)
        os.remove(zip_file_name)
        os.remove(gz_file_name)
    
    return load_stage_name, files_to_load
        
def load_trips_to_raw(session, files_to_load:list, load_stage_name:str, load_table_name:str):
    from snowflake.snowpark import functions as F
    from snowflake.snowpark import types as T
    from datetime import datetime

    stage_table_names = list()
    schema1_files = list()
    schema2_files = list()
    schema2_start_date = datetime.strptime('202102', "%Y%m")
    
    for file_name in files_to_load:
        file_start_date = datetime.strptime(file_name.split("-")[0], "%Y%m")
        if file_start_date < schema2_start_date:
            schema1_files.append(file_name)
        else:
            schema2_files.append(file_name)

    if len(schema1_files) > 0:
        load_schema1 = T.StructType([T.StructField("tripduration", T.StringType()),
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("BIKEID", T.StringType()),
                             T.StructField("USERTYPE", T.StringType()), 
                             T.StructField("birth_year", T.StringType()),
                             T.StructField("gender", T.StringType())])
        csv_file_format_options = {"FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'", "skip_header": 1}
        
        stage_table_name = load_table_name + str('schema1')
        
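        # Reader options are stored by name, so the second NULL_IF call below appears to
        # replace the first; only "NULL" may end up being treated as NULL.
        # Run the COPY INTO directly; its result is not used.
        session.read.option("SKIP_HEADER", 1)\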
                              .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                              .option("COMPRESSION", "GZIP")\
                              .option("NULL_IF", "\\\\N")\
                              .option("NULL_IF", "NULL")\
                              .schema(load_schema1)\
                              .csv("@"+load_stage_name)\
                              .copy_into_table(stage_table_name, 
                                               files=schema1_files, 
                                               format_type_options=csv_file_format_options)
        stage_table_names.append(stage_table_name)


    if len(schema2_files) > 0:
        load_schema2 = T.StructType([T.StructField("ride_id", T.StringType()), 
                             T.StructField("rideable_type", T.StringType()), 
                             T.StructField("STARTTIME", T.StringType()), 
                             T.StructField("STOPTIME", T.StringType()), 
                             T.StructField("START_STATION_NAME", T.StringType()), 
                             T.StructField("START_STATION_ID", T.StringType()),
                             T.StructField("END_STATION_NAME", T.StringType()), 
                             T.StructField("END_STATION_ID", T.StringType()),
                             T.StructField("START_STATION_LATITUDE", T.StringType()),
                             T.StructField("START_STATION_LONGITUDE", T.StringType()),
                             T.StructField("END_STATION_LATITUDE", T.StringType()),
                             T.StructField("END_STATION_LONGITUDE", T.StringType()),
                             T.StructField("USERTYPE", T.StringType())])
        csv_file_format_options = {"FIELD_OPTIONALLY_ENCLOSED_BY": "'\"'", "skip_header": 1}

        stage_table_name = load_table_name + str('schema2')
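        # Reader options are stored by name, so the second NULL_IF call below appears to
        # replace the first; only "NULL" may end up being treated as NULL.
        # Run the COPY INTO directly; its result is not used.
        session.read.option("SKIP_HEADER", 1)\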
                              .option("FIELD_OPTIONALLY_ENCLOSED_BY", "\042")\
                              .option("COMPRESSION", "GZIP")\
                              .option("NULL_IF", "\\\\N")\
                              .option("NULL_IF", "NULL")\
                              .schema(load_schema2)\
                              .csv("@"+load_stage_name)\
                              .copy_into_table(stage_table_name, 
                                               files=schema2_files, 
                                               format_type_options=csv_file_format_options)
        stage_table_names.append(stage_table_name)
        
    return list(set(stage_table_names))
    
def transform_trips(session, stage_table_names:list, trips_table_name:str):
    from snowflake.snowpark import functions as F
        
    #Change all dates to YYYY-MM-DD HH:MI:SS format
    date_format_match = "^([0-9]?[0-9])/([0-9]?[0-9])/([0-9][0-9][0-9][0-9]) ([0-9]?[0-9]):([0-9][0-9])(:[0-9][0-9])?.*$"
    date_format_repl = "\\3-\\1-\\2 \\4:\\5\\6"
    
    for stage_table_name in stage_table_names:
        
        transdf = session.table(stage_table_name)
        transdf.withColumn('STARTTIME', F.regexp_replace(F.col('STARTTIME'),
                                                F.lit(date_format_match), 
                                                F.lit(date_format_repl)))\
               .withColumn('STARTTIME', F.to_timestamp('STARTTIME'))\
               .withColumn('STOPTIME', F.regexp_replace(F.col('STOPTIME'),
                                                F.lit(date_format_match), 
                                                F.lit(date_format_repl)))\
               .withColumn('STOPTIME', F.to_timestamp('STOPTIME'))\
               .select(F.col('STARTTIME'), 
                       F.col('STOPTIME'), 
                       F.col('START_STATION_ID'), 
                       F.col('START_STATION_NAME'), 
                       F.col('START_STATION_LATITUDE'), 
                       F.col('START_STATION_LONGITUDE'), 
                       F.col('END_STATION_ID'), 
                       F.col('END_STATION_NAME'), F.col('END_STATION_LATITUDE'), 
                       F.col('END_STATION_LONGITUDE'), 
                       F.col('USERTYPE'))\
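               .write.mode('append').saveAsTable(trips_table_name)  # the default mode errors if the table exists; append so each stage table and monthly run adds rows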

    return trips_table_name
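Pulling the date-based routing in `load_trips_to_raw` out into a pure function makes it testable without a Snowflake session, which fits the automation policy above. A minimal sketch that mirrors the existing logic (the same `YYYYMM-` prefix parsing and the same `202102` cutoff); `split_files_by_schema` is a hypothetical name, and the file names in the usage line are illustrative.

```python
from datetime import datetime
from typing import List, Tuple


def split_files_by_schema(file_names: List[str], cutoff: str = "202102") -> Tuple[List[str], List[str]]:
    """Partition staged files into (schema1, schema2) by their YYYYMM prefix."""
    cutoff_date = datetime.strptime(cutoff, "%Y%m")
    schema1, schema2 = [], []
    for name in file_names:
        file_date = datetime.strptime(name.split("-")[0], "%Y%m")
        (schema1 if file_date < cutoff_date else schema2).append(name)
    return schema1, schema2


print(split_files_by_schema(["201306-citibike-tripdata.gz", "202102-citibike-tripdata.csv.gz"]))
```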
    

In [None]:
session.close()

In [None]:
# TODO: Some stations have duplicate station_id and station_name values that need cleaning. Because we use station_id for analysis, this must be cleaned up.
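A first step toward that cleanup is finding which station IDs appear with more than one name. A minimal local sketch over `(station_id, station_name)` pairs; `conflicting_station_names` is hypothetical and the station names in the usage line are made-up sample values. In keeping with the data-governance policy, the same check on real data would be pushed down to Snowflake, for example as `GROUP BY START_STATION_ID HAVING COUNT(DISTINCT START_STATION_NAME) > 1` over the `TRIPS` table.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def conflicting_station_names(pairs: Iterable[Tuple[str, str]]) -> Dict[str, Set[str]]:
    """Return station IDs that are associated with more than one station name."""
    names_by_id: Dict[str, Set[str]] = defaultdict(set)
    for station_id, station_name in pairs:
        names_by_id[station_id].add(station_name)
    return {sid: names for sid, names in names_by_id.items() if len(names) > 1}


# Made-up sample values for illustration.
print(conflicting_station_names([("72", "W 52 St & 11 Ave"), ("72", "W 52 St"), ("79", "Franklin St")]))
```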
