In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Tag every query issued through this session so it can be identified later
# when troubleshooting or monitoring performance.
session.query_tag = {
    "origin": "sf_sit-is",
    "name": "notebook_demo_pack",
    "version": {"major": 1, "minor": 0},
    "attributes": {
        "is_quickstart": 1,
        "source": "notebook",
        "vignette": "csv_from_s3",
    },
}
print(session)

In [None]:
-- Make sure the demo database exists, then point the session at it and at
-- its PUBLIC schema.
CREATE DATABASE IF NOT EXISTS CITIBIKE;
USE DATABASE CITIBIKE;
USE SCHEMA PUBLIC;

-- External stage over the S3 prefix that the trip CSV files are read from.
CREATE STAGE IF NOT EXISTS CITIBIKE_STAGE
    URL = 's3://logbrain-datalake/datasets/citibike-trips-csv/';
     

In [None]:

-- List the files that are visible through the stage.
LIST @CITIBIKE_STAGE;

In [None]:
# Build a DataFrame over one staged CSV file, asking the reader to infer the
# schema from the file instead of declaring it up front.
df = (
    session.read
    .option("infer_schema", True)
    .csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')
)

In [None]:
# Display the DataFrame built from the staged CSV as this cell's output.
df

In [None]:
# Summarize the DataFrame's columns with describe().
df.describe()

In [None]:
# Show the column names together with their data types.
df.dtypes

In [None]:
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType

# Explicit schema for the trips CSV. Each pair is (column name, Snowpark type
# class) in column order; the type is instantiated once per field.
trips_schema = StructType([
    StructField(column_name, column_type())
    for column_name, column_type in (
        ("tripduration", IntegerType),
        ("starttime", TimestampType),
        ("stoptime", TimestampType),
        ("start_station_id", IntegerType),
        ("start_station_name", StringType),
        ("start_station_latitude", FloatType),
        ("start_station_longitude", FloatType),
        ("end_station_id", IntegerType),
        ("end_station_name", StringType),
        ("end_station_latitude", FloatType),
        ("end_station_longitude", FloatType),
        ("bikeid", IntegerType),
        ("membership_type", StringType),
        ("usertype", StringType),
        ("birth_year", IntegerType),
        ("gender", IntegerType),
    )
])
     

In [None]:

# Re-create the DataFrame over the same staged CSV file, this time with the
# explicit trips_schema. Schema inference is not requested here: asking for
# inference while also supplying a schema is contradictory.
df = (
    session.read
    .options({
        "field_delimiter": ",",
        "skip_header": 1,  # skip the first line of the file
        "field_optionally_enclosed_by": '"',  # same character as the octal escape '\042'
        "null_if": "",  # the former ('') was a parenthesised empty string, not a tuple
    })
    .schema(trips_schema)
    .csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')
)

In [None]:
# Display the DataFrame that was read with the explicit trips_schema.
df

In [None]:
# Create (or replace) a named CSV file format so the parsing options live in
# one reusable object. The result of collect() is intentionally not stored:
# binding it to the name `format` would shadow Python's builtin format() for
# every later cell.
# Note: this is a normal (non-raw) Python string, so '\n' and '\042' reach
# Snowflake as a literal newline and a double-quote character.
session.sql(
    "create or replace file format csv_format type = 'CSV' field_delimiter = ',' "
    "record_delimiter = '\n' skip_header = 1  field_optionally_enclosed_by = '\042' "
    "null_if = (''); "
).collect()

# Read the staged CSV again, referring to the named file format instead of
# passing the individual parsing options.
df = (
    session.read
    .option("format_name", "csv_format")
    .schema(trips_schema)
    .csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')
)

In [None]:
# Display the DataFrame that was read through the csv_format file format.
df

In [None]:
# Persist the DataFrame as the TRIPS table, replacing any existing table.
df.write.save_as_table("TRIPS", mode="overwrite")

In [None]:
-- Look at the rows that were just written to TRIPS.
SELECT *
FROM TRIPS;
     

In [None]:
# Rebind df to the saved TRIPS table and display it as the cell output.
df = session.table("TRIPS")
df

In [None]:

# Count the rows per start station name.
df.group_by('"START_STATION_NAME"').count()

In [None]:
-- Work inside the demo database.
USE DATABASE CITIBIKE;

-- External stage over the S3 prefix that the weather JSON files are read from.
CREATE STAGE IF NOT EXISTS WEATHER_STAGE
    URL = 's3://logbrain-datalake/datasets/weather-nyc-json/';

In [None]:
-- List the files that are visible through the stage.
LIST @WEATHER_STAGE;

In [None]:
# Build a DataFrame over one staged JSON file, declaring its compression as
# gzip.
df = (
    session.read
    .options({"compression": "gzip"})
    .json('@WEATHER_STAGE/hourlyData-2018-1.json.gz')
)

In [None]:
# Display the DataFrame built from the staged JSON file as this cell's output.
df

In [None]:

# Summarize the DataFrame's columns with describe().
df.describe()

In [None]:
# Show the column names together with their data types.
df.dtypes

In [None]:
# Persist the DataFrame as the WEATHER_JSON table, replacing any existing table.
df.write.save_as_table("WEATHER_JSON", mode="overwrite")

In [None]:

-- Look at the rows that were just written to WEATHER_JSON.
SELECT *
FROM WEATHER_JSON;

In [None]:
# Rebind df to the saved WEATHER_JSON table and display it as the cell output.
df = session.table("WEATHER_JSON")
df

In [None]:
USE DATABASE CITIBIKE;

-- Project the JSON held in the first column ($1) of WEATHER_JSON into one
-- typed column per key.
--
-- CREATE OR REPLACE (rather than IF NOT EXISTS) so that re-running the
-- notebook rebuilds this table from the freshly overwritten WEATHER_JSON
-- instead of silently keeping a stale copy.
--
-- Fractional measurements are cast to FLOAT: a bare DECIMAL is NUMBER(38,0)
-- in Snowflake and would round latitude, longitude, pressure, temperature
-- and wind speed to whole numbers.
CREATE OR REPLACE TABLE weather AS
SELECT
   $1:"coco"::STRING             AS "coco",
   $1:"country"::STRING          AS "country",
   $1:"dwpt"::FLOAT              AS "dwpt",
   -- The key was previously spelled "ele$1ation" (a search/replace artifact),
   -- which matched nothing and produced an all-NULL column.
   $1:"elevation"::STRING        AS "elevation",
   $1:"icao"::STRING             AS "icao",
   $1:"latitude"::FLOAT          AS "latitude",
   $1:"longitude"::FLOAT         AS "longitude",
   $1:"name"::STRING             AS "name",
   $1:"obsTime"::TIMESTAMP       AS "obsTime",
   $1:"prcp"::STRING             AS "prcp",
   $1:"pres"::FLOAT              AS "pres",
   $1:"region"::STRING           AS "region",
   $1:"rhum"::STRING             AS "rhum",
   $1:"snow"::STRING             AS "snow",
   $1:"station"::STRING          AS "station",
   $1:"temp"::FLOAT              AS "temp",
   $1:"timezone"::STRING         AS "timezone",
   $1:"tsun"::STRING             AS "tsun",
   $1:"wdir"::STRING             AS "wdir",
   $1:"weatherCondition"::STRING AS "weatherCondition",
   $1:"wmo"::STRING              AS "wmo",
   $1:"wpgt"::STRING             AS "wpgt",
   $1:"wspd"::FLOAT              AS "wspd"
FROM WEATHER_JSON;


In [None]:

-- Look at the rows of the typed WEATHER table.
SELECT *
FROM WEATHER;

In [None]:
# Rebind df to the WEATHER table and display it as the cell output.
df = session.table("WEATHER")
df