In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sqlfunc
from pyspark.sql.functions import monotonically_increasing_id as idd  # Needed to import this to add index values to data frames utilized in merging

# Create (or reuse) the Spark session for this notebook and pull in the
# BigQuery connector package so spark.read.format("bigquery") is available.
# NOTE(review): getOrCreate() returns an already-active session if one exists,
# in which case the spark.jars.packages setting may not take effect — restart
# the kernel if the connector is missing.
spark = (
    SparkSession.builder
    .appName('DATA288 HW6-2')
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.3')
    .getOrCreate()
)

## Reading data from BigQuery & storing to cache for faster processing

##### When filtering the data I used **trip_end_timestamp** for both the lower and the upper bound of the date window (trips ending on or after 2020-09-05 and before 2020-09-08). I assumed that a ride which starts on 2020-09-04 and ends at 2020-09-05 00:01 did occur on 2020-09-05, even if only for a minute, so it should count toward the totals. With this choice the filtered count is 3964 records; considering only trips that start on or after 2020-09-05 gives 3945 records instead.

In [2]:
# Source table in the BigQuery public datasets.
table = "bigquery-public-data.chicago_taxi_trips.taxi_trips"

# The "filter" option restricts the read to trips whose END timestamp falls in
# [2020-09-05, 2020-09-08), i.e. trips ending on Sept 5, 6 or 7, 2020.
taxi_data = (
    spark.read.format("bigquery")
    .option("table", table)
    .option("filter", "trip_end_timestamp >= '2020-09-05' AND trip_end_timestamp < '2020-09-08'")
    .load()
)

# cache() is lazy: rows are kept in memory only once the first action runs,
# after which later queries on taxi_data reuse them instead of re-reading BigQuery.
taxi_data.cache()


DataFrame[unique_key: string, taxi_id: string, trip_start_timestamp: timestamp, trip_end_timestamp: timestamp, trip_seconds: bigint, trip_miles: double, pickup_census_tract: bigint, dropoff_census_tract: bigint, pickup_community_area: bigint, dropoff_community_area: bigint, fare: double, tips: double, tolls: double, extras: double, trip_total: double, payment_type: string, company: string, pickup_latitude: double, pickup_longitude: double, pickup_location: string, dropoff_latitude: double, dropoff_longitude: double, dropoff_location: string]

## Filtering data & printing total rows

In [3]:
# Keep only trips longer than 5 miles that charged a positive fare, projecting
# the end timestamp plus the three numeric columns that are averaged later.
answer = (
    taxi_data
    .select("trip_end_timestamp", "trip_seconds", "trip_miles", "fare")
    .filter("trip_miles > 5 AND fare > 0")
)

print(f"The total count is {answer.count()}")

The total count is 3964


In [4]:
# One ungrouped aggregate per metric over the filtered trips; each result is a
# single-row, single-column DataFrame whose column is named by its label.
avg_seconds, avg_miles, avg_fare = (
    answer.agg(sqlfunc.avg(column).alias(label))
    for column, label in (
        ("trip_seconds", "avg_trip_seconds"),
        ("trip_miles", "avg_trip_miles"),
        ("fare", "avg_fare"),
    )
)

#### Adding index values to the three dataframes so I can merge them using outerjoin on that value & get desired format

In [5]:
# Attach a generated "index" column (monotonically_increasing_id) to each of the
# three average frames so they can be joined on it.
# NOTE(review): these ids embed the partition id, so they only coincide across
# frames when each row sits at the same position in the same partition.
i_sec, i_miles, i_fare = (
    frame.withColumn("index", idd())
    for frame in (avg_seconds, avg_miles, avg_fare)
)

In [6]:
# Combine the three averages into a single result row.
# Each input comes from an ungrouped agg(), so it holds exactly one row; a cross
# join therefore lines them up directly. Joining on the generated "index" column
# instead relies on monotonically_increasing_id() producing the same value in
# every frame. Those ids encode the partition id, and if they ever differed the
# outer join would silently return several null-padded rows instead of one.
final = (
    i_sec.drop('index')
    .crossJoin(i_miles.drop('index'))
    .crossJoin(i_fare.drop('index'))
)
final.show()

+------------------+------------------+-----------------+
|  avg_trip_seconds|    avg_trip_miles|         avg_fare|
+------------------+------------------+-----------------+
|1831.5329295987888|12.164659434914222|32.80173309788092|
+------------------+------------------+-----------------+

