# Working with JSON files from S3 in Snowflake

In [None]:
# Packages used by this notebook.
import streamlit as st
import snowflake.snowpark.functions as F
from snowflake.snowpark.context import get_active_session

# Optional: Snowpark pandas API (left disabled).
# import modin.pandas as spd
# Import the Snowpark pandas plugin for modin
# import snowflake.snowpark.modin.plugin

# Obtain the active Snowpark session; the DataFrames in the cells below are built from it.
session = get_active_session()

List files located on AWS S3 bucket 

In [None]:
-- Show the files available in the citibike_trips_json stage.
LIST @citibike_trips_json;

### Read compressed JSON from AWS S3

In [None]:
# Stage path of the single .json.gz file read in this section.
single_file_path = '@citibike_trips_json/2015-09-01/data_01a304b5-0601-4bbe-0045-e8030021523e_005_1_0.json.gz'

# Configure the reader with the "file_format" option, then read the staged file as JSON.
json_reader = session.read.option("file_format", "json")
json_file_raw = json_reader.json(single_file_path)

### Preview the JSON file

In [None]:
# Print a single row of the raw DataFrame to inspect the JSON structure.
json_file_raw.show(1)

### Adding the data from the JSON file to a DataFrame

Use bracket notation to navigate the JSON structure and cast each field to an explicit data type.


In [None]:
# Every field is read from column "$1"; name the nested nodes once so the
# projection below stays short.
record = json_file_raw["$1"]
bike = record["BIKE"]
rider = record["RIDER"]
payment = rider["PAYMENT"]

# Flatten the nested document into typed, named columns.
json_file_raw_df = json_file_raw.select(
    F.to_varchar(bike["BIKEID"]).as_("BIKE_ID"),
    F.to_varchar(bike["BIKE_TYPE"]).as_("BIKE_TYPE"),
    F.to_timestamp(record["ENDTIME"]).as_("END_TIME"),
    F.to_varchar(record["END_STATION_ID"]).as_("END_STATION_ID"),
    F.to_varchar(rider["DOB"]).as_("DOB"),
    F.to_varchar(rider["EMAIL"]).as_("EMAIL"),
    F.to_varchar(rider["FIRST_NAME"]).as_("FIRST_NAME"),
    F.to_varchar(rider["GENDER"]).as_("GENDER"),
    F.to_varchar(rider["LAST_NAME"]).as_("LAST_NAME"),
    F.to_varchar(rider["MEMBER_TYPE"]).as_("MEMBER_TYPE"),
    F.to_varchar(payment["CC_NUM"]).as_("PAYMENT_CC_NUM"),
    F.to_varchar(payment["CC_TYPE"]).as_("PAYMENT_CC_TYPE"),
    F.to_varchar(payment["TYPE"]).as_("PAYMENT_TYPE"),
    F.to_varchar(rider["RIDERID"]).as_("RIDER_ID"),
    F.to_timestamp(record["STARTTIME"]).as_("START_TIME"),
    F.to_varchar(record["START_STATION_ID"]).as_("START_STATION_ID"),
)

# Preview the first five flattened rows.
json_file_raw_df.show(5)

Count the number of rows in the DataFrame

In [None]:
# Number of rows in the flattened single-file DataFrame.
json_file_raw_df.count()

### Same thing as before but using SQL instead of Python

In [None]:
-- Query the staged JSON file directly: each field of the document ($1) is
-- cast to a type and given a column alias.
SELECT
    $1:STARTTIME::TIMESTAMP_NTZ                 AS starttime,
    $1:ENDTIME::TIMESTAMP_NTZ                   AS endtime,
    -- Reuses the starttime/endtime aliases defined just above.
    DATEDIFF('minute', starttime, endtime)      AS duration,
    $1:START_STATION_ID::INTEGER                AS start_station_id,
    $1:END_STATION_ID::INTEGER                  AS end_station_id,
    $1:BIKE.BIKEID::STRING                      AS bikeid,
    $1:BIKE.BIKE_TYPE::STRING                   AS bike_type,
    $1:RIDER.RIDERID::INTEGER                   AS riderid,
    $1:RIDER.FIRST_NAME::STRING || ' ' || $1:RIDER.LAST_NAME::STRING
                                                AS rider_name,
    TO_DATE($1:RIDER.DOB::STRING, 'YYYY-MM-DD') AS dob,
    $1:RIDER.GENDER::STRING                     AS gender,
    $1:RIDER.MEMBER_TYPE::STRING                AS member_type,
    $1:RIDER.PAYMENT.TYPE::STRING               AS payment,
    -- Falls back to the phone field when the card field is NULL.
    IFNULL($1:RIDER.PAYMENT.CC_TYPE::STRING,
           $1:RIDER.PAYMENT.PHONE_TYPE::STRING) AS payment_type,
    -- Falls back to the card number when the phone number is NULL.
    IFNULL($1:RIDER.PAYMENT.PHONE_NUM::STRING,
           $1:RIDER.PAYMENT.CC_NUM::STRING)     AS payment_num
FROM
    @citibike_trips_json/2015-09-01/data_01a304b5-0601-4bbe-0045-e8030021523e_005_1_0.json.gz
    (FILE_FORMAT => json);


Filter the DataFrame using the start time

In [None]:
# Keep only the rows whose start timestamp equals 2015-09-27 21:54:00.
start_time_ts = F.to_timestamp(F.col("START_TIME"))
json_file_raw_df.where(start_time_ts == '2015-09-27 21:54:00')

### Create a new column using DATEDIFF to calculate the trip duration of each ride

In [None]:
# Seconds between the start and end timestamps of each trip.
trip_duration = F.datediff("seconds", F.col("START_TIME"), F.col("END_TIME"))

# Show the rider and both timestamps next to the derived duration column.
json_file_raw_df.select(
    "RIDER_ID",
    "START_TIME",
    "END_TIME",
    trip_duration.as_("TRIP_DURATION"),
)

Calculate average trip duration in seconds per bike

In [None]:
# Seconds between the start and end timestamps of each trip.
trip_seconds = F.datediff("seconds", F.col("START_TIME"), F.col("END_TIME"))

# One output row per bike, holding the mean trip length in seconds.
(
    json_file_raw_df
    .group_by("BIKE_ID")
    .agg(F.avg(trip_seconds).as_("AVG_TRIP_DURATION_PER_BIKE"))
)

Calculate average trip duration in seconds per bike per day

In [None]:
# Grouping key: the calendar date of the trip start.
trip_day = F.to_date(F.col("START_TIME"))
# Aggregate: mean trip length in seconds.
avg_trip_seconds = F.avg(F.datediff("seconds", F.col("START_TIME"), F.col("END_TIME")))

AVG_TRIPDURATION_PER_DAY = (
    json_file_raw_df
    .group_by(trip_day, F.col("BIKE_ID"))
    .agg(avg_trip_seconds.as_("AVG_TRIP_PER_DAY(SEC)"))
    # The date key is not aliased, so the sort refers to it by the
    # generated column name "TO_DATE(START_TIME)".
    .order_by(F.col("TO_DATE(START_TIME)"))
)

AVG_TRIPDURATION_PER_DAY

Create a Snowflake table from the DataFrame. \
Alternatively, create a Snowflake view or a dynamic table. \
A dynamic table can be scheduled to refresh on a schedule. \
The notebook itself can also be scheduled to create the table on a schedule.

In [None]:

# Persist the per-day averages as a table, overwriting any existing table of that name.
AVG_TRIPDURATION_PER_DAY.write.save_as_table("AVG_TRIPDURATION_PER_DAY", mode="overwrite")

# Alternatives to a plain table (disabled).
# NOTE(review): create_or_replace_dynamic_table presumably also needs warehouse= and lag= arguments — confirm before enabling.
#AVG_TRIPDURATION_PER_DAY.create_or_replace_dynamic_table("AVG_TRIPDURATION_PER_DAY_DT") 
#AVG_TRIPDURATION_PER_DAY.create_or_replace_view("AVG_TRIPDURATION_PER_DAY_VW")


Query the new table using SQL

In [None]:
-- Return every row and column of the AVG_TRIPDURATION_PER_DAY table.
SELECT * FROM AVG_TRIPDURATION_PER_DAY;

Query the new table using Python

In [None]:
# Reference the AVG_TRIPDURATION_PER_DAY table from Python through the session.
session.table('AVG_TRIPDURATION_PER_DAY')

### Load all data (41,695,813 rows)

In [None]:
# Read from the stage root rather than from a single file path.
full_data_reader = session.read.option("file_format", "json")
json_full_data_raw = full_data_reader.json('@citibike_trips_json')


In [None]:
# Number of rows read from the whole stage.
json_full_data_raw.count()

In [None]:
# Every field is read from column "$1"; name the nested nodes once so the
# projection below stays short.
full_record = json_full_data_raw["$1"]
full_bike = full_record["BIKE"]
full_rider = full_record["RIDER"]
full_payment = full_rider["PAYMENT"]

# Flatten the nested document into typed, named columns.
json_full_data_raw_df = json_full_data_raw.select(
    F.to_varchar(full_bike["BIKEID"]).as_("BIKE_ID"),
    F.to_varchar(full_bike["BIKE_TYPE"]).as_("BIKE_TYPE"),
    F.to_timestamp(full_record["ENDTIME"]).as_("END_TIME"),
    F.to_varchar(full_record["END_STATION_ID"]).as_("END_STATION_ID"),
    F.to_varchar(full_rider["DOB"]).as_("DOB"),
    F.to_varchar(full_rider["EMAIL"]).as_("EMAIL"),
    F.to_varchar(full_rider["FIRST_NAME"]).as_("FIRST_NAME"),
    F.to_varchar(full_rider["GENDER"]).as_("GENDER"),
    F.to_varchar(full_rider["LAST_NAME"]).as_("LAST_NAME"),
    F.to_varchar(full_rider["MEMBER_TYPE"]).as_("MEMBER_TYPE"),
    F.to_varchar(full_payment["CC_NUM"]).as_("PAYMENT_CC_NUM"),
    F.to_varchar(full_payment["CC_TYPE"]).as_("PAYMENT_CC_TYPE"),
    F.to_varchar(full_payment["TYPE"]).as_("PAYMENT_TYPE"),
    F.to_varchar(full_rider["RIDERID"]).as_("RIDER_ID"),
    F.to_timestamp(full_record["STARTTIME"]).as_("START_TIME"),
    F.to_varchar(full_record["START_STATION_ID"]).as_("START_STATION_ID"),
)

# Preview the first five flattened rows.
json_full_data_raw_df.show(5)

In [None]:
-- Resize warehouse MYWH to XLARGE; the cells that follow aggregate the full dataset.
ALTER WAREHOUSE MYWH SET warehouse_size=XLARGE;

In [None]:
-- List warehouses whose name matches 'MYWH' (e.g. to check the size set above).
show warehouses like  'MYWH';

In [None]:
# Grouping key: the calendar date of the trip start.
full_trip_day = F.to_date(F.col("START_TIME"))
# Aggregate: mean trip length in seconds.
full_avg_trip_seconds = F.avg(F.datediff("seconds", F.col("START_TIME"), F.col("END_TIME")))

AVG_TRIPDURATION_PER_DAY_FULL = (
    json_full_data_raw_df
    .group_by(full_trip_day, F.col("BIKE_ID"))
    .agg(full_avg_trip_seconds.as_("AVG_TRIP_PER_DAY(SEC)"))
    # The date key is not aliased, so the sort refers to it by the
    # generated column name "TO_DATE(START_TIME)".
    .order_by(F.col("TO_DATE(START_TIME)"))
)

AVG_TRIPDURATION_PER_DAY_FULL

In [None]:
# Persist the full-dataset averages as a table, overwriting any existing table of that name.
AVG_TRIPDURATION_PER_DAY_FULL.write.save_as_table("AVG_TRIPDURATION_PER_DAY_FULL", mode="overwrite")

In [None]:
-- Return every row and column of the AVG_TRIPDURATION_PER_DAY_FULL table.
SELECT * FROM AVG_TRIPDURATION_PER_DAY_FULL;

In [None]:
# Reference the AVG_TRIPDURATION_PER_DAY_FULL table from Python through the session.
session.table("AVG_TRIPDURATION_PER_DAY_FULL")

In [None]:
-- Resize warehouse MYWH back down to SMALL.
ALTER WAREHOUSE MYWH SET warehouse_size=SMALL;

In [None]:
-- Suspend warehouse MYWH.
-- NOTE(review): later cells in this notebook still run queries; presumably MYWH
-- has auto-resume enabled — confirm, or move this statement to the end.
ALTER WAREHOUSE MYWH SUSPEND;

In [None]:
# Persist the flattened full dataset as a table, overwriting any existing table of that name.
# NOTE(review): the DataFrame carries rider name, e-mail and card-number columns —
# confirm that access to this table is restricted appropriately.
json_full_data_raw_df.write.save_as_table("json_full_data_raw_df", mode="overwrite")
#json_full_data_raw_df.create_or_replace_view("trips_vw")

In [None]:
-- Define trips_vw over the JSON files in the citibike_trips_json stage: each
-- field of the document ($1) is cast to a type and exposed as a named column.
-- OR REPLACE keeps this cell re-runnable; a plain CREATE VIEW fails once the
-- view already exists.
CREATE OR REPLACE VIEW trips_vw
 COMMENT='HISTORICAL TRIPS DATA'
    AS
SELECT 
    $1:STARTTIME::timestamp_ntz starttime,
    $1:ENDTIME::timestamp_ntz endtime,
    -- Trip length in minutes, computed from the two aliases above.
    datediff('minute', starttime, endtime) duration,
    $1:START_STATION_ID::integer start_station_id,
    $1:END_STATION_ID::integer end_station_id,
    $1:BIKE.BIKEID::string bikeid,
    $1:BIKE.BIKE_TYPE::string bike_type,
    $1:RIDER.RIDERID::integer riderid,
    $1:RIDER.FIRST_NAME::string || ' ' || $1:RIDER.LAST_NAME::string rider_name,
    to_date($1:RIDER.DOB::string, 'YYYY-MM-DD') dob,
    $1:RIDER.GENDER::string gender,
    $1:RIDER.MEMBER_TYPE::string member_type,
    $1:RIDER.PAYMENT.TYPE::string payment,
    -- Falls back to the phone field when the card field is NULL.
    ifnull($1:RIDER.PAYMENT.CC_TYPE::string, 
    $1:RIDER.PAYMENT.PHONE_TYPE::string) payment_type,
    -- Falls back to the card number when the phone number is NULL.
    ifnull($1:RIDER.PAYMENT.PHONE_NUM::string,
    $1:RIDER.PAYMENT.CC_NUM::string) payment_num
FROM
    @citibike_trips_json
    (file_format=>json);


In [None]:
-- Preview up to 100 rows from the trips_vw view.
SELECT * FROM trips_vw LIMIT 100;

In [None]:
-- List warehouses whose name matches 'MYWH' (e.g. to check its final size and state).
show warehouses like  'MYWH';