In [0]:
%pyspark
spark

<pyspark.sql.session.SparkSession object at 0x7f2ee5105c10>


In [1]:
%pyspark
# Create a socket readStream
fares_raw = (
    spark
    .readStream
    .format("socket")
    .option("host", "edge-1.au.adaltas.cloud")
    .option("port", 11111)
    .load()
)



In [2]:
%pyspark
from pyspark.sql import functions as F

# Parse the socket message "manually"
fares = fares_raw.select(
    F.split(F.col('value'), ',')[0].alias('ride_id').cast('int'),
    F.split(F.col('value'), ',')[1].alias('taxi_id').cast('int'),
    F.split(F.col('value'), ',')[2].alias('driver_id').cast('int'),
    F.split(F.col('value'), ',')[3].alias('start_time').cast('timestamp'),
    F.split(F.col('value'), ',')[4].alias('payment_type'),
    F.split(F.col('value'), ',')[5].alias('tip').cast('float'),
    F.split(F.col('value'), ',')[6].alias('tolls').cast('float'),
    F.split(F.col('value'), ',')[7].alias('total_fare').cast('float')
)



In [3]:
%pyspark
# Write all fares events to an in-memory table named "fares"
fares_query = (
    fares
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("fares")
    .start()
)



In [4]:
%pyspark
# Pretty print the result table
z.show(spark.table("fares"))

ride_id	taxi_id	driver_id	start_time	payment_type	tip	tolls	total_fare
1	2013000001	2013000001	2020-11-13 15:02:50.0	CSH	0.0	0.0	21.5
2	2013000002	2013000002	2020-11-13 15:02:50.0	CSH	0.0	0.0	7.0
3	2013000003	2013000003	2020-11-13 15:02:50.0	CRD	2.2	0.0	13.7
4	2013000004	2013000004	2020-11-13 15:02:50.0	CRD	1.7	0.0	10.7
5	2013000005	2013000005	2020-11-13 15:02:50.0	CRD	4.65	0.0	20.15
6	2013000006	2013000006	2020-11-13 15:02:50.0	CSH	0.0	4.8	34.3
7	2013000007	2013000007	2020-11-13 15:02:50.0	CRD	1.9	0.0	11.9
8	2013000008	2013000008	2020-11-13 15:02:50.0	CSH	0.0	0.0	6.0
9	2013000009	2013000009	2020-11-13 15:02:50.0	CRD	1.0	0.0	6.0
10	2013000010	2013000010	2020-11-13 15:02:50.0	CSH	0.0	0.0	15.5
11	2013000011	2013000011	2020-11-13 15:02:50.0	CRD	4.7	0.0	28.7
12	2013000012	2013000012	2020-11-13 15:02:50.0	CRD	4.0	0.0	30.5
13	2013000013	2013000013	2020-11-13 15:02:50.0	CRD	3.0	0.0	17.0
14	2013000014	2013000014	2020-11-13 15:02:50.0	CRD	1.65	0.0	7.65
15	2013000015	2013000015	2020-11-13 15:02:

In [5]:
%pyspark
fares_query.stop()



In [6]:
%pyspark
# Compute the number of rides and the average 
fares_count = (
    fares
    .withWatermark('start_time', '5 minutes')
    .groupBy(F.window(fares.start_time, '2 minutes', '2 minutes'))
    .agg({'ride_id': 'count', 'total_fare': 'mean'})
)

# Start writting the stream to an in-memory table
fares_count_query = (
    fares_count
    .writeStream
    .trigger(processingTime='30 seconds')
    .outputMode("complete")
    .format("memory")
    .queryName("fares_count")
    .start()
)



In [7]:
%pyspark
z.show(spark.table("fares_count").orderBy('window'))

window	count(ride_id)	avg(total_fare)
[2020-11-13 15:06:00.0,2020-11-13 15:08:00.0]	94	14.799574436025416
[2020-11-13 15:08:00.0,2020-11-13 15:10:00.0]	267	14.601498103766852
[2020-11-13 15:10:00.0,2020-11-13 15:12:00.0]	329	14.501246210289581


In [8]:
%pyspark
fares_count_query.stop()



In [9]:
%pyspark
rides_raw = (
    spark
    .readStream
    .format("socket")
    .option("host", "edge-1.au.adaltas.cloud")
    .option("port", 11113)
    .load()
)



In [10]:
%pyspark
rides = rides_raw.select(
    F.split(rides_raw.value, ',')[0].alias('ride_id').cast('int'),
    F.split(rides_raw.value, ',')[1].alias('is_start'),
    F.split(rides_raw.value, ',')[2].alias('start_time').cast('timestamp'),
    F.split(rides_raw.value, ',')[3].alias('end_time').cast('timestamp'),
    F.split(rides_raw.value, ',')[4].alias('start_lon').cast('float'),
    F.split(rides_raw.value, ',')[5].alias('start_lat').cast('float'),
    F.split(rides_raw.value, ',')[6].alias('end_lon').cast('float'),
    F.split(rides_raw.value, ',')[7].alias('end_lat').cast('float'),
    F.split(rides_raw.value, ',')[7].alias('passenger_count').cast('int'),
    F.split(rides_raw.value, ',')[8].alias('taxi_id').cast('int'),
    F.split(rides_raw.value, ',')[9].alias('driver_id').cast('int')
)

# Filter the rides dataset to keep only "END" events
rides_end = rides.where(rides.is_start == 'END').drop('is_start')



In [11]:
%pyspark
rides_query = (
    rides_end
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("rides")
    .start()
)



In [12]:
%pyspark
rides_query.stop()



In [13]:
%pyspark
rides_with_watermark = rides_end.withWatermark('start_time', '1 hour')
fares_with_watermark = fares.withWatermark('start_time', '1 hour')

full_rides = (
    rides_with_watermark
    .join(
        fares_with_watermark,
        'ride_id'
    )
)



In [14]:
%pyspark
full_rides_query = (
    full_rides
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("full_rides")
    .start()
)



In [15]:
%pyspark
z.show(spark.table("full_rides"))

ride_id	start_time	end_time	start_lon	start_lat	end_lon	end_lat	passenger_count	taxi_id	driver_id	taxi_id	driver_id	start_time	payment_type	tip	tolls	total_fare
65	2020-11-13 15:16:47.0	2020-11-13 15:16:20.0	-73.99221	40.725124	-73.991646	40.726658	40	1	2013000065	2013000065	2013000065	2020-11-13 15:15:59.0	CSH	0.0	0.0	3.5
137	2020-11-13 15:17:11.0	2020-11-13 15:17:11.0	0.0	0.0	0.0	0.0	0	1	2013000137	2013000137	2013000137	2020-11-13 15:16:50.0	CSH	0.0	0.0	3.5
77	2020-11-13 15:17:33.0	2020-11-13 15:16:31.0	-73.9701	40.768005	-73.96977	40.767834	40	1	2013000077	2013000077	2013000077	2020-11-13 15:16:10.0	CSH	0.0	0.0	4.0
94	2020-11-13 15:17:51.0	2020-11-13 15:16:51.0	-74.005165	40.72053	-74.00393	40.725655	40	1	2013000094	2013000094	2013000094	2020-11-13 15:16:30.0	CSH	0.0	0.0	4.0
70	2020-11-13 15:17:48.0	2020-11-13 15:16:25.0	-73.97544	40.749657	-73.97733	40.75199	40	1	2013000070	2013000070	2013000070	2020-11-13 15:16:04.0	CSH	0.0	0.0	4.0
53	2020-11-13 15:18:11.0	2020-11-13 15:16:11.0	-7

In [16]:
%pyspark
full_rides_query.stop()



In [17]:
%pyspark
# Expected lab code for 1 KPI

# Create a socket readStream
fares_raw = (
    spark
    .readStream
    .format("socket")
    .option("host", "edge-1.au.adaltas.cloud")
    .option("port", 11115)
    .load()
)


# Parse the socket message "manually"
fares = fares_raw.select(
    F.split(fares_raw.value, ',')[0].alias('ride_id').cast('int'),
    F.split(fares_raw.value, ',')[1].alias('taxi_id').cast('int'),
    F.split(fares_raw.value, ',')[2].alias('driver_id').cast('int'),
    F.split(fares_raw.value, ',')[3].alias('start_time').cast('timestamp'),
    F.split(fares_raw.value, ',')[4].alias('payment_type'),
    F.split(fares_raw.value, ',')[5].alias('tip').cast('float'),
    F.split(fares_raw.value, ',')[6].alias('tolls').cast('float'),
    F.split(fares_raw.value, ',')[7].alias('total_fare').cast('float')
)

fares_count = (
    fares
    .withWatermark('start_time', '5 minutes')
    .groupBy(F.window(F.col('start_time'), '2 minutes', '2 minutes'), F.col('payment_type'))
    .agg({'ride_id': 'count', 'total_fare': 'mean', 'tip': 'mean'})
    .withColumn('payment_type',
                F.when((F.lit('CRD') == F.col('payment_type')) | (F.lit('CSH') == F.col('payment_type')), F.col('payment_type'))
                .otherwise(F.lit('OTHER'))
               )
)

# Start writting the stream to an in-memory table
fares_count_query = (
    fares_count
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("fares_count")
    .option('checkpointLocation', '/education/ece/big-data/2020/fall/bda/gr1/gauthier/spark-streaming/checkpoint')
    .start()
)



In [18]:
%pyspark
z.show(spark.table("fares_count"))

window	payment_type	count(ride_id)	avg(total_fare)	avg(tip)
[2020-11-13 16:00:00.0,2020-11-13 16:02:00.0]	OTHER	1	3.5	0.0
[2020-11-13 15:52:00.0,2020-11-13 15:54:00.0]	CRD	32	17.816249907016754	2.2521875016391277
[2020-11-13 15:58:00.0,2020-11-13 16:00:00.0]	OTHER	7	19.257142748151505	0.0
[2020-11-13 15:54:00.0,2020-11-13 15:56:00.0]	CRD	104	18.063076872092026	2.780865376385359
[2020-11-13 16:02:00.0,2020-11-13 16:04:00.0]	CRD	159	17.07559746316394	2.8001257897173084
[2020-11-13 16:00:00.0,2020-11-13 16:02:00.0]	CRD	240	18.214708373943964	2.845749986326943
[2020-11-13 16:02:00.0,2020-11-13 16:04:00.0]	OTHER	2	7.0	0.0
[2020-11-13 15:58:00.0,2020-11-13 16:00:00.0]	OTHER	1	6.5	0.0
[2020-11-13 15:58:00.0,2020-11-13 16:00:00.0]	OTHER	1	6.0	1.0
[2020-11-13 16:00:00.0,2020-11-13 16:02:00.0]	CSH	457	12.681619247670247	0.0
[2020-11-13 16:00:00.0,2020-11-13 16:02:00.0]	OTHER	5	10.7	0.0
[2020-11-13 15:56:00.0,2020-11-13 15:58:00.0]	CRD	122	17.001967254232188	2.660163929518007
[2020-11-13 16:02:00

In [19]:
%pyspark
fares_count_query.stop()



In [20]:
%pyspark


