# Import Modules

In [1]:
import os
import pandas as pd
from credentials import *
import snowflake.connector
from Queries.BIKE_SHARE import *

  warn_incompatible_dep(
Failed to import ArrowResult. No Apache Arrow result set format can be used. ImportError: DLL load failed while importing arrow_iterator: The specified procedure could not be found.


# Helper Functions

In [2]:
def create_wh(name: str, size: str, initially_suspended: str, statement_timeout_in_seconds: int, comment: str):
    """
    Creates a new virtual warehouse in the system (CREATE WAREHOUSE IF NOT EXISTS).

    https://docs.snowflake.com/en/sql-reference/sql/create-warehouse

    Parameters

    name: str
        Identifier for the virtual warehouse; must be unique for your account.
        Interpolated into the statement verbatim.

    size: str
        Specifies the size of the virtual warehouse. The size determines the amount of compute resources in each
        cluster in the warehouse and, therefore, the number of credits consumed while the warehouse is running.

    initially_suspended: str
        Specifies whether the warehouse is created initially in the 'Suspended' state ('TRUE' or 'FALSE').

    statement_timeout_in_seconds: int
        Object parameter that specifies the time, in seconds, after which a running SQL statement (query, DDL, DML, etc.) is canceled by the system.

    comment: str
        Specifies a comment for the warehouse. Single quotes are escaped before the value is embedded in the statement.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.
        Because the statement uses IF NOT EXISTS, the success message is also returned
        when the warehouse already existed.

    """
    # Double any single quote so the comment cannot terminate the SQL string literal early.
    escaped_comment = str(comment).replace("'", "''")
    create_warehouse_query = f"""CREATE WAREHOUSE IF NOT EXISTS {name}
                                 WAREHOUSE_SIZE = {size}
                                 INITIALLY_SUSPENDED = {initially_suspended}
                                 STATEMENT_TIMEOUT_IN_SECONDS = {statement_timeout_in_seconds}
                                 COMMENT = '{escaped_comment}'
                              """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID)
        try:
            conn.cursor().execute(command=create_warehouse_query)
        finally:
            # Release the session even when the statement fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"WAREHOUSE '{name}' with size '{size}' was created successfully!"
    
    
def create_db(name: str, comment: str):
    """
    Creates a new database in the system (CREATE DATABASE IF NOT EXISTS).

    https://docs.snowflake.com/en/sql-reference/sql/create-database

    Parameters

    name: str
        Specifies the identifier for the database; must be unique for your account.
        Interpolated into the statement verbatim.

    comment: str
        Specifies a comment for the database. Single quotes are escaped before the value is embedded in the statement.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.
        Because the statement uses IF NOT EXISTS, the success message is also returned
        when the database already existed.

    """
    # Double any single quote so the comment cannot terminate the SQL string literal early.
    escaped_comment = str(comment).replace("'", "''")
    create_database_query = f"""CREATE DATABASE IF NOT EXISTS {name}
                                COMMENT = '{escaped_comment}'
                             """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID)
        try:
            conn.cursor().execute(command=create_database_query)
        finally:
            # Release the session even when the statement fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"DATABASE '{name}' was created successfully!"
    
    
def create_schema(db_name: str, name: str, comment: str):
    """
    Creates a new schema in the given database (CREATE SCHEMA IF NOT EXISTS).

    https://docs.snowflake.com/en/sql-reference/sql/create-schema

    Parameters

    db_name: str
        Database the connection is opened against; the schema is created inside it.

    name: str
        Specifies the identifier for the schema; must be unique for the database in which the schema is created.
        Interpolated into the statement verbatim.

    comment: str
        Specifies a comment for the schema. Single quotes are escaped before the value is embedded in the statement.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.
        Because the statement uses IF NOT EXISTS, the success message is also returned
        when the schema already existed.

    """
    # Double any single quote so the comment cannot terminate the SQL string literal early.
    escaped_comment = str(comment).replace("'", "''")
    create_schema_query = f"""CREATE SCHEMA IF NOT EXISTS {name}
                              COMMENT = '{escaped_comment}'
                           """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID, database=db_name)
        try:
            conn.cursor().execute(command=create_schema_query)
        finally:
            # Release the session even when the statement fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"SCHEMA '{name}' was created successfully!"
    

def create_table(db_name: str, schema_name: str, name: str, comment: str):
    """
    Creates the bike-share trip table in the given schema (CREATE TABLE IF NOT EXISTS).

    The column list is fixed: fifteen text columns (VARCHAR/STRING) from trip_id through bike_type.

    https://docs.snowflake.com/en/sql-reference/sql/create-table

    Parameters

    db_name: str
        Database the connection is opened against.

    schema_name: str
        Schema the connection is opened against; the table is created inside it.

    name: str
        Specifies the identifier (i.e. name) for the table; must be unique for the schema in which the table is created.
        Interpolated into the statement verbatim.

    comment: str
        Specifies a comment for the table. Single quotes are escaped before the value is embedded in the statement.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.
        Because the statement uses IF NOT EXISTS, the success message is also returned
        when the table already existed.

    """
    # Double any single quote so the comment cannot terminate the SQL string literal early.
    escaped_comment = str(comment).replace("'", "''")
    create_table_query = f"""CREATE TABLE IF NOT EXISTS {name} (
                                 trip_id VARCHAR,
                                 duration VARCHAR,
                                 start_time VARCHAR,
                                 end_time VARCHAR,
                                 start_station VARCHAR,
                                 start_lat VARCHAR,
                                 start_lon VARCHAR,
                                 end_station VARCHAR,
                                 end_lat VARCHAR,
                                 end_lon VARCHAR,
                                 bike_id VARCHAR,
                                 plan_duration VARCHAR,
                                 trip_route_category STRING,
                                 passholder_type STRING,
                                 bike_type STRING
                             )
                             COMMENT = '{escaped_comment}'
                          """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID, database=db_name, schema=schema_name)
        try:
            conn.cursor().execute(command=create_table_query)
        finally:
            # Release the session even when the statement fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"TABLE '{name}' was created successfully!"
    
    
def create_stage(db_name: str, schema_name: str, name: str, comment: str):
    """
    Creates a new named stage to use for loading data from files into Snowflake tables
    and unloading data from tables into files (CREATE STAGE IF NOT EXISTS).

    The stage is created with a directory table enabled, a CSV file format,
    SNOWFLAKE_SSE encryption and ON_ERROR = 'abort_statement' copy options.

    https://docs.snowflake.com/en/sql-reference/sql/create-stage

    Parameters

    db_name: str
        Database the connection is opened against.

    schema_name: str
        Schema the connection is opened against; the stage is created inside it.

    name: str
        Specifies the identifier for the stage; must be unique for the schema in which the stage is created.
        Interpolated into the statement verbatim.

    comment: str
        Specifies a comment for the stage. Single quotes are escaped before the value is embedded in the statement.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.
        Because the statement uses IF NOT EXISTS, the success message is also returned
        when the stage already existed.

    """
    # Double any single quote so the comment cannot terminate the SQL string literal early.
    escaped_comment = str(comment).replace("'", "''")
    create_stage_query = f"""CREATE STAGE IF NOT EXISTS {name}
                             DIRECTORY = (ENABLE = TRUE)
                             FILE_FORMAT = (TYPE = 'CSV')
                             ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
                             COPY_OPTIONS = (ON_ERROR='abort_statement')
                             COMMENT = '{escaped_comment}'
                          """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID, database=db_name, schema=schema_name)
        try:
            conn.cursor().execute(create_stage_query)
        finally:
            # Release the session even when the statement fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"STAGE '{name}' was created successfully!"
    

def upload_file_to_stage(db_name: str, schema_name: str, full_file_path: str, stage_name: str, filename: str):
    """
    Uploads (i.e. stages) a data file from a local directory/folder on the client machine
    to a named internal stage using the PUT command.

    https://docs.snowflake.com/en/sql-reference/sql/put

    Parameters

    db_name: str
        Database the connection is opened against.

    schema_name: str
        Schema the connection is opened against; the stage is looked up inside it.

    full_file_path: str
        Full local path to the file. It may contain spaces: the file URI is
        sent quoted, with "/" as the path separator.

    stage_name: str
        Identifier of the target stage (without the leading "@").

    filename: str
        Name of the file; only used in the returned success message.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.

    """
    # Snowflake requires the whole URI to be single-quoted when the path has
    # special characters such as spaces, and a quoted URI must use "/" as the
    # separator, including on Windows.
    file_uri = "file://" + str(full_file_path).replace(os.sep, "/")
    upload_file_query = f"""PUT '{file_uri}' @{stage_name}"""

    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID, database=db_name, schema=schema_name)
        try:
            conn.cursor().execute(upload_file_query)
        finally:
            # Release the session even when the upload fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"'{filename}' was successfully uploaded!"
    

def copy_into_table(db_name: str, schema_name: str, stage_name: str, table_name: str):
    """
    Loads data from staged files into an existing table (COPY INTO <table>).

    Files are read as CSV, the statement aborts on the first error
    (ON_ERROR = ABORT_STATEMENT) and staged files are kept after loading (PURGE = FALSE).

    https://docs.snowflake.com/en/sql-reference/sql/copy-into-table

    Parameters

    db_name: str
        Database the connection is opened against.

    schema_name: str
        Schema the connection is opened against; stage and table are looked up inside it.

    stage_name: str
        Identifier of the source stage (without the leading "@").

    table_name: str
        Specifies the identifier (i.e. name) of the target table.

    Returns

    str
        A success message, or "Error: <details>" when Snowflake raises a ProgrammingError.

    """
    # NOTE(review): no SKIP_HEADER is set in this file format, so a header row in
    # the staged file would be loaded as a data row; confirm that is intended.
    copy_data_query = f"""COPY INTO {table_name}
                          FROM @{stage_name}
                          FILE_FORMAT = (TYPE = CSV)
                          ON_ERROR = ABORT_STATEMENT
                          PURGE = FALSE
                       """
    try:
        conn = snowflake.connector.connect(user=USERNAME, password=PASSWORD, account=ACCOUNT_ID, database=db_name, schema=schema_name)
        try:
            conn.cursor().execute(copy_data_query)
        finally:
            # Release the session even when the load fails.
            conn.close()

    except snowflake.connector.errors.ProgrammingError as e:
        return f"Error: {e}"

    return f"data was successfully copied into {table_name}"
    

def fetch_data(user: str, password: str, account: str, warehouse: str, database: str, schema: str, sql_query, col_names) -> pd.DataFrame:
    """
    Fetches data from a Snowflake database and returns it as a Pandas DataFrame.

    Parameters

    user: str
      name of snowflake user

    password: str
      password of snowflake user

    account: str
      id of account holder

    warehouse: str
      name of snowflake warehouse

    database: str
      database name

    schema: str
      schema name

    sql_query: str
        query to execute

    col_names: list[str]
        column names to use; must match the number of columns the query returns

    Returns:
        The full result set as a pandas DataFrame. If the connector raises a
        snowflake.connector.errors.Error, the string "Error executing query: <details>"
        is returned instead, so callers should check the type before using the result.

    """
    try:
        conn = snowflake.connector.connect(user=user, password=password, account=account, warehouse=warehouse, database=database, schema=schema)
        try:
            results = conn.cursor().execute(str(sql_query)).fetchall()
        finally:
            # The session is only needed for the fetch; always release it.
            conn.close()
        print("Query executed successfully!")

        return pd.DataFrame(results, columns=col_names)

    except snowflake.connector.errors.Error as e:
        return f"Error executing query: {e}"

# Create Data Warehouse

In [3]:
# Warehouse settings, passed straight through to create_wh.
warehouse_name = "MY_DATA_WH"
warehouse_size = "XSMALL"
initially_suspended = "TRUE"
statement_timeout = 300  # seconds
warehouse_comment = "MY_DATA_WH"

create_wh(
    name=warehouse_name,
    size=warehouse_size,
    initially_suspended=initially_suspended,
    statement_timeout_in_seconds=statement_timeout,
    comment=warehouse_comment,
)

"WAREHOUSE 'MY_DATA_WH' with size 'XSMALL' was created successfully!"

# Create Database

In [4]:
# Database that holds the bike-share data; later cells reuse database_name.
database_name = "Metro_Bike_Share"
database_comment = "Metro Bike Share trip data"

create_db(name=database_name, comment=database_comment)

"DATABASE 'Metro_Bike_Share' was created successfully!"

# Create Schema

In [5]:
# Schema inside database_name; later cells reuse schema_name.
schema_name = "My_Bike_Share_Schema"
schema_comment = "Metro Bike Share trip data"

create_schema(db_name=database_name, name=schema_name, comment=schema_comment)

"SCHEMA 'My_Bike_Share_Schema' was created successfully!"

# Create Table

In [6]:
# Target table for the trip data; later cells reuse table_name.
table_name = 'Bike_Share'
tbl_comment = 'Metro Bike Share trip data'

create_table(
    db_name=database_name,
    schema_name=schema_name,
    name=table_name,
    comment=tbl_comment,
)

"TABLE 'Bike_Share' was created successfully!"

# Create Stage

In [7]:
# Named internal stage that receives the CSV before it is copied into the table.
internal_stage_name = 'la_metro'
stage_comment = 'la metro bikeshare stage'

create_stage(
    db_name=database_name,
    schema_name=schema_name,
    name=internal_stage_name,
    comment=stage_comment,
)

"STAGE 'la_metro' was created successfully!"

# Upload Files To Stage

In [8]:
# The file name is defined once and reused for both the local path and the
# reported name, so the two cannot drift apart.
file_to_upload = 'metro-trips-2023-q1.csv'
# Reuse the stage created earlier instead of repeating its name as a literal.
stage_name = internal_stage_name
full_file_path = os.path.join(os.getcwd(), 'Datasets', 'MetroBikeShare', file_to_upload)

upload_file_to_stage(database_name, schema_name, full_file_path, stage_name, file_to_upload)

"'metro-trips-2023-q1.csv' was successfully uploaded!"

# Copy Data Into Table

In [9]:
# Load everything currently in the stage into the target table.
copy_into_table(
    db_name=database_name,
    schema_name=schema_name,
    stage_name=stage_name,
    table_name=table_name,
)

'data was successfully copied into Bike_Share'

# Query Data

In [10]:
# Labels applied to the query result, in the order the query returns its columns.
column_names = [
    "trip_id",
    "duration",
    "start_time",
    "end_time",
    "start_station",
    "start_lat",
    "start_lon",
    "end_station",
    "end_lat",
    "end_lon",
    "bike_id",
    "trip_route_category",
    "passholder_type",
]

df = fetch_data(
    user=USERNAME,
    password=PASSWORD,
    account=ACCOUNT_ID,
    warehouse=warehouse_name,
    database=database_name,
    schema=schema_name,
    sql_query=query1,
    col_names=column_names,
)

Query executed successfully!


# View DataFrame

In [11]:
# Preview the first five rows of the query result.
df.head(n=5)

Unnamed: 0,trip_id,duration,start_time,end_time,start_station,start_lat,start_lon,end_station,end_lat,end_lon,bike_id,trip_route_category,passholder_type
0,231787362,9,2023-01-01 00:41:00,2023-01-01 00:50:00,4204,33.988419,-118.45163,4210,33.984341,-118.47155,23373,One Way,Monthly Pass
1,231801471,97,2023-01-01 01:05:00,2023-01-01 02:42:00,3054,34.039219,-118.236488,3054,34.039219,-118.236488,13870,Round Trip,Walk-up
2,231794062,15,2023-01-01 01:11:00,2023-01-01 01:26:00,3037,34.034801,-118.231277,3068,34.0532,-118.250954,23942,One Way,Walk-up
3,231794664,26,2023-01-01 01:31:00,2023-01-01 01:57:00,3063,34.048038,-118.253738,4454,34.017899,-118.291718,22873,One Way,Walk-up
4,231794562,8,2023-01-01 01:44:00,2023-01-01 01:52:00,3066,34.063389,-118.23616,3026,34.063179,-118.24588,14790,One Way,Walk-up


In [12]:
# Preview the last five rows of the query result.
df.tail(n=5)

Unnamed: 0,trip_id,duration,start_time,end_time,start_station,start_lat,start_lon,end_station,end_lat,end_lon,bike_id,trip_route_category,passholder_type
72224,249764243,10,2023-03-31 06:44:00,2023-03-31 06:54:00,4300,34.04887,-118.2743,3007,34.05048,-118.254593,19289,One Way,Monthly Pass
72225,249862961,25,2023-03-31 15:23:00,2023-03-31 15:48:00,3019,34.038609,-118.260857,3006,34.04554,-118.256668,23778,One Way,Monthly Pass
72226,249862674,2,2023-03-31 15:33:00,2023-03-31 15:35:00,4211,33.984928,-118.469963,4211,33.984928,-118.469963,14113,Round Trip,Walk-up
72227,249911940,35,2023-03-31 17:39:00,2023-03-31 18:14:00,4210,33.984341,-118.47155,4210,33.984341,-118.47155,14899,Round Trip,Walk-up
72228,249935270,14,2023-03-31 21:11:00,2023-03-31 21:25:00,4206,33.998341,-118.461014,4212,33.988129,-118.471741,22826,One Way,Monthly Pass
