#### This notebook is part 2 of the quickstart. In part1, we have written pyspark code to perform analysis on uber dataset. In this notebook, we will utilize same dataset and perform same analysis using snowpark python.

In [1]:
#import required libraries
from snowflake.snowpark import Session

from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
from snowflake.snowpark import functions as F
from snowflake.snowpark import Window
import pandas as pd
import numpy as np

- create a snowflake trail account if you do not have one.
- create a database, schema and a stage
- Upload uber dataset to your stage. We will read data from this stage and make some transformations

### Connect to Snowflake and load data

In [2]:
connection_parameters = {
       "account": "<your snowflake account excluding snowflakecomputing.com>",
       "user": "<snowflake username>",
       "password": "<password>",
       "warehouse": "<snowflake warehouse name>"
}  

session = Session.builder.configs(connection_parameters).create()  

In [3]:
session.sql("use database snowpark").collect()
session.sql("use schema dev").collect()
session.sql("list @snowpark_stage/").collect()

[Row(name='snowpark_stage/uber-dataset-1.csv.gz', size=2320, md5='6b963527e5d908ea9bff6e7438a9fbff', last_modified='Thu, 6 Apr 2023 13:14:08 GMT'),
 Row(name='snowpark_stage/uber_transformed.csv.gz', size=3232, md5='ea9f1c4a40a6269767aa5b94f803a3c1', last_modified='Thu, 6 Apr 2023 13:26:12 GMT')]

In [4]:
uber_schema = StructType([
    StructField("Date", StringType()),
    StructField("Time_Local", IntegerType()),
    StructField("Eyeballs", IntegerType()),
    StructField("Zeroes", IntegerType()),
    StructField("Completed_Trips", IntegerType()),
    StructField("Requests", IntegerType()),
    StructField("Unique_Drivers", IntegerType())
])

In [5]:
uber_pd = session.read.option("field_delimiter", ",").option("skip_header", 1)\
                 .schema(uber_schema).csv("@snowpark_stage/uber-dataset-1.csv.gz")

In [6]:
uber_df = uber_pd.toPandas()
uber_df

Unnamed: 0,DATE,TIME_LOCAL,EYEBALLS,ZEROES,COMPLETED_TRIPS,REQUESTS,UNIQUE_DRIVERS
0,10-Sep-12,7,5,0,2,2,9
1,10-Sep-12,8,6,0,2,2,14
2,10-Sep-12,9,8,3,0,0,14
3,10-Sep-12,10,9,2,0,1,14
4,10-Sep-12,11,11,1,4,4,11
...,...,...,...,...,...,...,...
331,24-Sep-12,2,3,3,0,2,0
332,24-Sep-12,3,3,3,0,1,0
333,24-Sep-12,4,1,1,0,0,0
334,24-Sep-12,5,4,2,1,1,3


In [7]:
uber_df['Datetime'] = uber_df['DATE'].astype('str')+' '+uber_df['TIME_LOCAL'].astype('str')

In [8]:
uber_df['Datetime']=pd.to_datetime(uber_df['Datetime'], format='%d-%b-%y %H')
uber_df['DATE']=pd.to_datetime(uber_df['DATE'], format='%d-%b-%y')
uber_df.head()

Unnamed: 0,DATE,TIME_LOCAL,EYEBALLS,ZEROES,COMPLETED_TRIPS,REQUESTS,UNIQUE_DRIVERS,Datetime
0,2012-09-10,7,5,0,2,2,9,2012-09-10 07:00:00
1,2012-09-10,8,6,0,2,2,14,2012-09-10 08:00:00
2,2012-09-10,9,8,3,0,0,14,2012-09-10 09:00:00
3,2012-09-10,10,9,2,0,1,14,2012-09-10 10:00:00
4,2012-09-10,11,11,1,4,4,11,2012-09-10 11:00:00


In [9]:
uber_df.to_csv('uber_transformed.csv',index=False)

The above command will generate a file in your pc at the current location of this notebook. Upload this transformed dataset to snowflake stage

In [10]:
uber_schema1 = StructType([
    StructField("Date", DateType()),
    StructField("Time_Local", IntegerType()),
    StructField("Eyeballs", IntegerType()),
    StructField("Zeroes", IntegerType()),
    StructField("Completed_Trips", IntegerType()),
    StructField("Requests", IntegerType()),
    StructField("Unique_Drivers", IntegerType()),
    StructField("Datetime", TimestampType())
])

In [11]:
uber_df = session.read.option("field_delimiter", ",").option("skip_header", 1)\
                 .schema(uber_schema1).csv("@snowpark_stage/uber_transformed.csv.gz")

In [12]:
uber_df.show()

-------------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"TIME_LOCAL"  |"EYEBALLS"  |"ZEROES"  |"COMPLETED_TRIPS"  |"REQUESTS"  |"UNIQUE_DRIVERS"  |"DATETIME"           |
-------------------------------------------------------------------------------------------------------------------------------
|2012-09-10  |7             |5           |0         |2                  |2           |9                 |2012-09-10 07:00:00  |
|2012-09-10  |8             |6           |0         |2                  |2           |14                |2012-09-10 08:00:00  |
|2012-09-10  |9             |8           |3         |0                  |0           |14                |2012-09-10 09:00:00  |
|2012-09-10  |10            |9           |2         |0                  |1           |14                |2012-09-10 10:00:00  |
|2012-09-10  |11            |11          |1         |4                  |4           |11                

In [13]:
uber_df.schema

StructType([StructField('DATE', DateType(), nullable=True), StructField('TIME_LOCAL', LongType(), nullable=True), StructField('EYEBALLS', LongType(), nullable=True), StructField('ZEROES', LongType(), nullable=True), StructField('COMPLETED_TRIPS', LongType(), nullable=True), StructField('REQUESTS', LongType(), nullable=True), StructField('UNIQUE_DRIVERS', LongType(), nullable=True), StructField('DATETIME', TimestampType(), nullable=True)])

In [14]:
#create table in snowflake from snowpark dataframe
uber_df.write.save_as_table("uber", mode="overwrite", table_type="transient")

### 1. Which date had the most completed trips during the two-week period?

In [16]:
df = session.sql('select * from uber')
df.show()

-------------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"TIME_LOCAL"  |"EYEBALLS"  |"ZEROES"  |"COMPLETED_TRIPS"  |"REQUESTS"  |"UNIQUE_DRIVERS"  |"DATETIME"           |
-------------------------------------------------------------------------------------------------------------------------------
|2012-09-10  |7             |5           |0         |2                  |2           |9                 |2012-09-10 07:00:00  |
|2012-09-10  |8             |6           |0         |2                  |2           |14                |2012-09-10 08:00:00  |
|2012-09-10  |9             |8           |3         |0                  |0           |14                |2012-09-10 09:00:00  |
|2012-09-10  |10            |9           |2         |0                  |1           |14                |2012-09-10 10:00:00  |
|2012-09-10  |11            |11          |1         |4                  |4           |11                

In [17]:
trips_per_date = uber_df.groupBy('DATE').agg(sum('COMPLETED_TRIPS').alias('total_completed_trips')).sort("total_completed_trips", ascending=False)
trips_per_date.show()

----------------------------------------
|"DATE"      |"TOTAL_COMPLETED_TRIPS"  |
----------------------------------------
|2012-09-22  |248                      |
|2012-09-15  |199                      |
|2012-09-21  |190                      |
|2012-09-23  |111                      |
|2012-09-14  |108                      |
|2012-09-16  |93                       |
|2012-09-12  |91                       |
|2012-09-20  |70                       |
|2012-09-17  |57                       |
|2012-09-13  |45                       |
----------------------------------------



In [18]:
trips_per_date.select('Date').show(1)

--------------
|"DATE"      |
--------------
|2012-09-22  |
--------------



### 2. What was the highest number of completed trips within a 24-hour period?

In [19]:
# Group the data by 24-hour window and sum the completed trips
completed_trips_24hrs = uber_df.groupBy(F.date_trunc('day', 'DATETIME'),floor(hour('DATETIME')/24)) \
                                   .agg(F.sum("completed_Trips").alias("Total_Completed_Trips"))

# Get the highest number of completed trips within a 24-hour period
completed_trips_24hrs.select(max('Total_Completed_Trips').alias('Total_Completed_Trips_in_24hrs')).show()

------------------------------------
|"TOTAL_COMPLETED_TRIPS_IN_24HRS"  |
------------------------------------
|248                               |
------------------------------------



### 3) Which hour of the day had the most requests during the two-week period?

In [20]:
df_hour = uber_df.groupBy("TIME_LOCAL").agg(sum('REQUESTS').alias('Total_Requests')).sort("Total_Requests", ascending=False)
df_hour.show(5)

-----------------------------------
|"TIME_LOCAL"  |"TOTAL_REQUESTS"  |
-----------------------------------
|23            |184               |
|22            |174               |
|19            |156               |
|0             |142               |
|18            |119               |
-----------------------------------



In [21]:
most_req_hr = df_hour.select('TIME_LOCAL').first()[0]
most_req_hr

23

### 4) What percentages of all zeroes during the two-week period occurred on weekend (Friday at 5 pm to Sunday at 3 am)? Tip: The local time value is the start of the hour (e.g. 15 is the hour from 3:00 pm - 4:00 pm)

In [22]:
# count number of zeros that occurred on weekends
zeroes_weekend = df.filter((df.TIME_LOCAL >= 17) | (df.TIME_LOCAL < 3)).filter((dayofweek("DATE") == 6) | (dayofweek("DATE") == 7))\
                   .agg(sum('ZEROES').alias('zeroes_weekend')).collect()[0]['ZEROES_WEEKEND']

# total number of zeros
total_zeroes = df.agg(sum("ZEROES").alias('total_zeroes')).collect()[0]['TOTAL_ZEROES']

print(zeroes_weekend/total_zeroes *100)

19.174247725682296


### 5) What is the weighted average ratio of completed trips per driver during the two-week period? Tip: “Weighted average” means your answer should account for the total trip volume in each hour to determine the most accurate number in the whole period.

In [29]:
uber_df.select(min('Unique_Drivers')).show()

-----------------------------
|"MIN(""UNIQUE_DRIVERS"")"  |
-----------------------------
|0                          |
-----------------------------



In [30]:
weighted_avg_ratio = uber_df.withColumn("trips_per_driver", when(uber_df["Unique_Drivers"]==0, 1).otherwise(uber_df["Completed_Trips"] / uber_df["Unique_Drivers"])) \
                 .groupBy("Date", "Time_Local") \
                 .agg(avg("trips_per_driver").alias("avg_completed_per_driver"), sum("Completed_trips").alias("total_completed_trips")) \
                 .withColumn("weighted_ratio", col("avg_completed_per_driver") * col("total_completed_trips")) \
                 .agg(sum("weighted_ratio") / sum("total_completed_trips"))
weighted_avg_ratio.show()

------------------------------------------------------
|"DIVIDE(SUM(WEIGHTED_RATIO), SUM(TOTAL_COMPLETE...  |
------------------------------------------------------
|0.828403309158                                      |
------------------------------------------------------



### 6) In drafting a driver schedule in terms of 8 hours shifts, when are the busiest 8 consecutive hours over the two-week period in terms of unique requests? A new shift starts every 8 hours. Assume that a driver will work the same shift each day.

In [None]:
df.groupBy('Time_Local').agg(countDistinct('Requests').alias('Total_Requests')).show()

In [31]:

requests_per_hour = uber_df.groupBy('Time_Local').agg(countDistinct('Requests').alias('Total_Requests'))
window_8hr = Window.orderBy(col('Total_Requests').desc()).rowsBetween(0,7)

busiest_8_hrs = requests_per_hour.select('*', sum('Total_Requests').over(window_8hr).alias("sum_8_hrs"))\
                                 .orderBy(col("sum_8_hrs").desc())
busiest_8_hrs.show()

-------------------------------------------------
|"TIME_LOCAL"  |"TOTAL_REQUESTS"  |"SUM_8_HRS"  |
-------------------------------------------------
|20            |12                |80           |
|0             |11                |76           |
|17            |10                |73           |
|19            |10                |71           |
|21            |10                |69           |
|22            |10                |67           |
|18            |9                 |65           |
|15            |8                 |64           |
|2             |8                 |64           |
|23            |8                 |63           |
-------------------------------------------------



In [33]:
busiest_8_hrs.limit(1).show()

-------------------------------------------------
|"TIME_LOCAL"  |"TOTAL_REQUESTS"  |"SUM_8_HRS"  |
-------------------------------------------------
|20            |12                |80           |
-------------------------------------------------



### 8) In which 72-hour period is the ratio of Zeroes to Eyeballs the highest?

In [50]:
# Group the data by 72-hour periods and calculate the ratio of zeroes to eyeballs for each period
period_ratios = (uber_df.groupBy((hour(col("Datetime")) / (72*3600)).cast("int"))\
                   .agg(sum("Zeroes").alias("zeroes"), sum("Eyeballs").alias("eyeballs"))\
                   .withColumn("ratio", col("zeroes") / col("eyeballs"))
)

# Find the period with the highest ratio
highest_ratio_period = period_ratios.orderBy(col("ratio").desc()).limit(1)

# Print the result
highest_ratio_period.show()

--------------------------------------------------------------------------------
|"CAST(DIVIDE(HOUR(DATETIME), LITERAL()))"  |"ZEROES"  |"EYEBALLS"  |"RATIO"   |
--------------------------------------------------------------------------------
|0                                          |1429      |6687        |0.213698  |
--------------------------------------------------------------------------------



### 8. If you could add 5 drivers to any single hour of every day during the two-week period, which hour should you add them to? Hint: Consider both rider eyeballs and driver supply when choosing

In [53]:
requests_per_driver = (uber_df.groupBy('Time_Local')\
                         .agg((sum('Requests') / countDistinct('Unique_Drivers')).alias('requests_per_driver'))
)

requests_per_driver.sort('requests_per_driver', ascending=False).show(1)

----------------------------------------
|"TIME_LOCAL"  |"REQUESTS_PER_DRIVER"  |
----------------------------------------
|2             |20.000000              |
----------------------------------------



### 9. Looking at the data from all two weeks, which time might make the most sense to consider a true “end day” instead of midnight? (i.e when are supply and demand at both their natural minimums)

In [58]:
uber_df.groupBy('Time_Local')\
       .agg(avg('Completed_Trips').alias('Avg_Completed_Trips'), avg('Unique_Drivers').alias('avg_unique_drivers'))\
       .orderBy('Avg_Completed_Trips', 'avg_unique_drivers').show(1)

---------------------------------------------------------------
|"TIME_LOCAL"  |"AVG_COMPLETED_TRIPS"  |"AVG_UNIQUE_DRIVERS"  |
---------------------------------------------------------------
|4             |0.142857               |0.642857              |
---------------------------------------------------------------

