# Find the Busiest Pickup Location per Hour in Pseudo Streaming Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import time
import math

# Build (or reuse) the Spark session backing this notebook.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

# Read the raw pickup records as an RDD of text lines.
sc = spark.sparkContext
pickup_data = sc.textFile('/data/uber_pickup.csv')

In [2]:
# Peek at the first 10 raw lines to check the record layout
pickup_data.take(10)

['7/1/2014,12:00:00 AM," 874 E 139th St Mott Haven, BX",,,',
 '7/1/2014,12:01:00 AM," 628 E 141st St Mott Haven, BX",,,',
 '7/1/2014,12:01:00 AM," 601 E 156th St South Bronx, BX",,,',
 '7/1/2014,12:01:00 AM," 708 E 138th St Mott Haven, BX",,,',
 '7/1/2014,12:02:00 AM," 700 E 140th St Mott Haven, BX",,,',
 '7/1/2014,12:03:00 AM," 514 E 163rd St Cortlandt, BX",,,',
 '7/1/2014,12:08:00 AM," 300 E 150th St Cortlandt, BX",,,',
 '7/1/2014,12:10:00 AM," 370 E 153rd St South Bronx, BX",,,',
 '7/1/2014,12:11:00 AM," 455 E 148th St South Bronx, BX",,,',
 '7/1/2014,12:11:00 AM," 600 E 141st St Mott Haven, BX",,,']

## Batch Data Processing Pipeline
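* The result is ordered by key (hour of day); each value is the `(count, location)` pair of the busiest pickup location in that hour.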

In [3]:
def batch_pipeline(source):
    """Find the most frequent pickup location for every hour of the day.

    Args:
        source: RDD of raw CSV lines. The second comma-separated field is
            read as a 12-hour clock time ("H:MM:SS AM/PM") and the third
            field, minus its first two characters, as the location.

    Returns:
        RDD of ``(hour, (count, location))`` pairs sorted by hour ascending,
        with ``hour`` in 24-hour form. Ties on count are resolved by ``max``
        over the ``(count, location)`` tuple, i.e. by the greater location
        string.
    """
    def to_keyed_count(line):
        # Plain comma split: for rows like the sample data, the quoted
        # address is cut at its inner comma, so fields[2] holds only the
        # part before it.
        fields = line.split(",")
        time_parts = fields[1].split(" ")
        hour = int(time_parts[0].split(":")[0])
        meridiem = time_parts[1]
        # Convert the 12-hour clock to 24-hour form.
        if meridiem == 'PM' and hour != 12:
            hour += 12
        elif meridiem == 'AM' and hour == 12:
            hour = 0
        # Drop the first two characters of the address field.
        location = fields[2][2:]
        return ((hour, location), 1)

    # Number of pickups per (hour, location).
    per_hour_location = source.map(to_keyed_count).reduceByKey(
        lambda left, right: left + right
    )
    # Re-key by hour so locations within the same hour can be compared.
    per_hour = per_hour_location.map(
        lambda kv: (kv[0][0], (kv[1], kv[0][1]))
    )
    busiest = per_hour.reduceByKey(max)
    return busiest.sortBy(lambda kv: int(kv[0]), ascending=True)

In [4]:
batch_res = batch_pipeline(pickup_data)
batch_res.collect()

[(0, (64, '400 Brook Ave Mott Haven')),
 (1, (63, '525 Jackson Ave South Bronx')),
 (2, (35, '545 E 145th St Mott Haven')),
 (3, (65, '355 E 143rd St Mott Haven')),
 (4, (56, '752 Kelly St South Bronx')),
 (5, (52, '435 E 143rd St Mott Haven')),
 (6, (71, '671 Westchester Ave South Bronx')),
 (7, (141, '400 Brook Ave Mott Haven')),
 (8, (112, '331 E 132nd St Mott Haven')),
 (9, (104, '545 E 145th St Mott Haven')),
 (10, (62, '500 Southern Blvd South Bronx')),
 (11, (50, '400 Brook Ave Mott Haven')),
 (12, (60, '105 Willis Ave Mott Haven')),
 (13, (54, '400 Brook Ave Mott Haven')),
 (14, (44, '388 E 141st St Mott Haven')),
 (15, (45, '400 Brook Ave Mott Haven')),
 (16, (88, '350 Saint Anns Ave Mott Haven')),
 (17, (59, '436 E 149th St South Bronx')),
 (18, (55, '400 Brook Ave Mott Haven')),
 (19, (56, '281 E 143rd St Cortlandt')),
 (20, (62, '400 Brook Ave Mott Haven')),
 (21, (74, '400 Brook Ave Mott Haven')),
 (22, (60, '400 Brook Ave Mott Haven')),
 (23, (77, '400 Brook Ave Mott Have

## Streaming Data Processing

### Generate Pseudo Streaming Dataset (Queue of RDDs)

* Randomly split the original RDD into a list of `N_SPLIT` RDDs that serve as the pseudo input for the streaming pipeline.
* Each RDD in the list can be treated as the batch of data arriving at one timestamp in the stream.

In [2]:
# Create the queue through which RDDs can be pushed to a QueueInputDStream
# Number of pseudo-stream batches to cut the dataset into.
N_SPLIT = 5
# Fixed seed so the random split (and the streaming results built on it)
# can be reproduced from run to run.
SPLIT_SEED = 42

# Equal weights give N_SPLIT parts of roughly equal size; randomSplit
# normalizes the weights, so only their ratio matters.
queue_rdds = pickup_data.randomSplit([1.0] * N_SPLIT, seed=SPLIT_SEED)

In [3]:
def stream_pipeline(source):
    """Track, per hour of day, the pickup location with the highest running count.

    Args:
        source: DStream of raw CSV lines. The second comma-separated field
            is read as a 12-hour clock time ("H:MM:SS AM/PM") and the third
            field, minus its first two characters, as the location.

    Returns:
        DStream of ``(hour, (count, location))`` pairs, with ``hour`` in
        24-hour form and ``count`` accumulated across batches through
        ``updateStateByKey``. Each batch is sorted by the
        ``(count, location)`` value in descending order.
    """
    def to_keyed_count(line):
        # Plain comma split: for rows like the sample data, the quoted
        # address is cut at its inner comma, so fields[2] holds only the
        # part before it.
        fields = line.split(",")
        time_parts = fields[1].split(" ")
        hour = int(time_parts[0].split(":")[0])
        meridiem = time_parts[1]
        # Convert the 12-hour clock to 24-hour form.
        if meridiem == 'PM' and hour != 12:
            hour += 12
        elif meridiem == 'AM' and hour == 12:
            hour = 0
        # Drop the first two characters of the address field.
        location = fields[2][2:]
        return ((hour, location), 1)

    def accumulate(batch_counts, total_so_far):
        # A key seen for the first time has no state yet: start from zero.
        start = 0 if total_so_far is None else total_so_far
        # Fold this batch's counts into the running total.
        return sum(batch_counts, start)

    # Pickups per (hour, location) within the current batch.
    batch_counts = source.map(to_keyed_count).reduceByKey(
        lambda left, right: left + right
    )
    # Running totals per (hour, location) across all batches so far.
    running_counts = batch_counts.updateStateByKey(accumulate)
    # Re-key by hour so locations within the same hour can be compared.
    per_hour = running_counts.map(
        lambda kv: (kv[0][0], (kv[1], kv[0][1]))
    )
    busiest = per_hour.reduceByKey(max)
    return busiest.transform(
        lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)
    )

#     def updateFunc(new_values, running_value):
#         if not running_value:
#             running_value = 
#         key = new_values[0]
#         count = new_values[1]
#         if key not in running_values:
#             running_value[key] = 0
#         running_value[key] += count
#         return running_value

def batch_pipeline(source):
    """Find the most frequent pickup location for every hour of the day.

    Args:
        source: RDD of raw CSV lines. The second comma-separated field is
            read as a 12-hour clock time ("H:MM:SS AM/PM") and the third
            field, minus its first two characters, as the location.

    Returns:
        RDD of ``(hour, (count, location))`` pairs sorted by hour ascending,
        with ``hour`` in 24-hour form. Ties on count are resolved by ``max``
        over the ``(count, location)`` tuple, i.e. by the greater location
        string.
    """
    def to_keyed_count(line):
        # Plain comma split: for rows like the sample data, the quoted
        # address is cut at its inner comma, so fields[2] holds only the
        # part before it.
        fields = line.split(",")
        time_parts = fields[1].split(" ")
        hour = int(time_parts[0].split(":")[0])
        meridiem = time_parts[1]
        # Convert the 12-hour clock to 24-hour form.
        if meridiem == 'PM' and hour != 12:
            hour += 12
        elif meridiem == 'AM' and hour == 12:
            hour = 0
        # Drop the first two characters of the address field.
        location = fields[2][2:]
        return ((hour, location), 1)

    # Number of pickups per (hour, location).
    per_hour_location = source.map(to_keyed_count).reduceByKey(
        lambda left, right: left + right
    )
    # Re-key by hour so locations within the same hour can be compared.
    per_hour = per_hour_location.map(
        lambda kv: (kv[0][0], (kv[1], kv[0][1]))
    )
    busiest = per_hour.reduceByKey(max)
    return busiest.sortBy(lambda kv: int(kv[0]), ascending=True)

### Compute and aggregate pickup counts over streaming data

In [4]:
# initial spark streaming context with batch interval for 1 sec
# Create the Spark Streaming context with a 1-second batch interval
ssc = StreamingContext(sc, 1)

# Checkpointing is required for stateful operations (updateStateByKey)
ssc.checkpoint("./checkpoints")

# Build the input stream from the list of pre-split RDDs
inputStream = ssc.queueStream(queue_rdds)

# Attach the streaming pipeline to the input stream
out = stream_pipeline(inputStream)

# Print the first elements of every batch result
out.pprint()

# Start the stream, let it run for 10 seconds, then stop gracefully;
# stopSparkContext=True also shuts down the underlying SparkContext.
ssc.start()
time.sleep(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True) 

-------------------------------------------
Time: 2021-04-27 15:21:47
-------------------------------------------
(7, (37, '400 Brook Ave Mott Haven'))
(9, (26, '512 E 145th St Mott Haven'))
(8, (25, '331 E 132nd St Mott Haven'))
(16, (22, '350 Saint Anns Ave Mott Haven'))
(10, (19, '545 E 145th St Mott Haven'))
(21, (19, '400 Brook Ave Mott Haven'))
(6, (18, '671 Westchester Ave South Bronx'))
(23, (16, '383 E 141st St Mott Haven'))
(13, (16, '301 E 156th St Cortlandt'))
(1, (14, '525 Jackson Ave South Bronx'))
...

-------------------------------------------
Time: 2021-04-27 15:21:48
-------------------------------------------
(7, (54, '400 Brook Ave Mott Haven'))
(9, (43, '512 E 145th St Mott Haven'))
(8, (42, '331 E 132nd St Mott Haven'))
(21, (35, '400 Brook Ave Mott Haven'))
(16, (31, '350 Saint Anns Ave Mott Haven'))
(6, (30, '671 Westchester Ave South Bronx'))
(18, (29, '400 Brook Ave Mott Haven'))
(23, (28, '383 E 141st St Mott Haven'))
(10, (27, '545 E 145th St Mott Haven'))
