In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("spark://MacBook-Pro.lan:7077") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/09 14:33:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/09 14:33:41 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master MacBook-Pro.lan:7077
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon$1.run(StandaloneAppClient.scala:108)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executor

25/03/09 14:34:41 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
25/03/09 14:34:41 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
25/03/09 14:34:42 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master


In [3]:
spark

In [5]:
df_green = spark.read.parquet('data/pq/green/*/*')

In [6]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [7]:
df_green.printSchema()
# df_green.columns

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
df_yellow.printSchema()
# df_yellow.columns

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [9]:
##### Get common columns 

# Change datetimes to 'pickup_datetime' and 'dropoff_datetime'
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

set(df_green.columns) & set(df_yellow.columns)


{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [10]:
# Saving common columns (preserving order!)
common_columns = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns: 
    if col in yellow_columns:
        common_columns.append(col)
        
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [11]:
common_columns = [
    'VendorID',
    'pickup_datetime',
    'dropoff_datetime',
    'store_and_fwd_flag',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'payment_type',
    'congestion_surcharge'
]

In [12]:
from pyspark.sql import functions as F

In [13]:
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [14]:
df_trips_data.groupBy('service_type').count().show()

                                                                                

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In order to run `spark.sql()`, we need to tell Spark what tables/dataframes exist for querying!

In [15]:
df_trips_data.registerTempTable('trips_data')



In [16]:
spark.sql("""

SELECT 
  service_type
, COUNT(1)
FROM trips_data
GROUP BY 1
-- LIMIT 10;

""").show()



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+





                                                                                

Now, executing a more complicated query (taking `dim_monthly_zone_revenue.sql` from `04-analytics-engineering`, with some slight adjustments)...

In [None]:
df_result = spark.sql("""
SELECT 
  PULocationID AS revenue_zone 
, DATE_TRUNC('month', pickup_datetime) AS revenue_month
, service_type 

/* Revenues */
, SUM(fare_amount) AS revenue_monthly_fare
, SUM(extra) AS revenue_monthly_extra
, SUM(mta_tax) AS revenue_monthly_mta_tax
, SUM(tip_amount) AS revenue_monthly_tip_amount
, SUM(tolls_amount) AS revenue_monthly_tolls_amount
, SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge
, SUM(total_amount) AS revenue_monthly_total_amount
, SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge

/* Additional Calculations */
, AVG(passenger_count) AS avg_monthly_passenger_count
, AVG(trip_distance) AS avg_monthly_trip_distance
FROM trips_data 
WHERE 1=1
GROUP BY 1,2,3
""")

25/03/08 12:49:14 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
25/03/08 12:49:14 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:981)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

In [None]:
# # Use `.coalesce()` (which acts like `.repartition()` to REDUCE the number of partitions!)
# df_result.coalesce(1).write.parquet('data/report/revenue/', mode = 'overwrite')


In [None]:
df_result.show()