# Spark streaming: 2 + 1

    We chose the following queries: 2 -> Q1 and Q5; 1 -> Q6.
    These queries only aggregate statistics and do not require pattern matching.

In [270]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import socket
from pyspark.streaming.kafka import KafkaUtils


# Replay speed-up of the taxi data stream; later cells express window lengths
# as "simulated duration / SPEED_UP_FACTOR" seconds.
SPEED_UP_FACTOR = 60

# Local Spark with two worker threads.
sc = SparkContext(master="local[2]", appName="KafkaExample")
print("SparkContext created!")

# Length of each streaming micro-batch, in seconds.
BATCH_INTERVAL_SECONDS = 5
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)
print("StreamingContext created!")

SparkContext created!
StreamingContext created!


In [271]:
def to_dict(item):
    """Turn one stream record into a dict keyed by the trip CSV column names.

    Parameters
    ----------
    item : sequence
        Record whose last element is a comma-separated trip line. For the
        Kafka direct stream this is presumably the message value of a
        ``(key, value)`` pair -- TODO confirm against the producer.

    Returns
    -------
    dict
        Column name -> raw string field, in column order. Fields beyond the
        17 known columns are ignored.

    Raises
    ------
    IndexError
        If the line has fewer than 17 comma-separated fields.
    """
    fields = item[-1].split(',')

    columns = (
        'medallion', 'hack_license',
        'pickup_datetime', 'dropoff_datetime',
        'trip_time_in_secs', 'trip_distance',
        'pickup_longitude', 'pickup_latitude',
        'dropoff_longitude', 'dropoff_latitude',
        'payment_type', 'fare_amount', 'surcharge', 'mta_tax',
        'tip_amount', 'tolls_amount', 'total_amount',
    )

    return {name: fields[pos] for pos, name in enumerate(columns)}

## 300x300 grid
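Maps a trip's pickup and dropoff coordinates (longitude/latitude) to cells of a 300x300 grid; the pair of start and end cells identifies the route.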

In [272]:
def _grid_cell(lon, lat, origin_lon, origin_lat, cell_lon, cell_lat):
    """Return the 1-based (x, y) grid cell of one longitude/latitude pair.

    x grows eastwards from ``origin_lon`` and y grows southwards from
    ``origin_lat``. ``lon``/``lat`` may be numbers or numeric strings.
    """
    x = int((float(lon) - origin_lon) / cell_lon + 1)
    y = int((origin_lat - float(lat)) / cell_lat + 1)
    return x, y


def to_route_300_grid(d, origin_lon=-74.913585, origin_lat=41.474937,
                      cell_lon=0.005986, cell_lat=0.004491556, n_cells=300):
    """Map a trip to a route key on an ``n_cells`` x ``n_cells`` grid.

    Parameters
    ----------
    d : dict
        Trip record (as produced by ``to_dict``) with ``pickup_longitude``,
        ``pickup_latitude``, ``dropoff_longitude`` and ``dropoff_latitude``
        given as numbers or numeric strings.
    origin_lon, origin_lat : float
        Reference coordinate of cell 1.1 (the grid's north-west reference).
    cell_lon, cell_lat : float
        Size of one cell in degrees of longitude / latitude.
    n_cells : int
        Number of cells per side of the grid.

    Returns
    -------
    tuple[str, int]
        ``("<start_x>_<start_y>_<end_x>_<end_y>", flag)``. ``flag`` is 1 only
        when all four cell indices lie in ``[1, n_cells]``, else 0, so summing
        the flags per key counts in-grid trips only.

    Raises
    ------
    KeyError, ValueError
        If a coordinate field is missing or not numeric.
    """
    # NOTE(review): the origin is treated as the corner of cell 1.1. If it is
    # meant to be the *centre* of cell 1.1, shift it by half a cell first --
    # TODO confirm against the query specification.
    start_x, start_y = _grid_cell(d['pickup_longitude'], d['pickup_latitude'],
                                  origin_lon, origin_lat, cell_lon, cell_lat)
    end_x, end_y = _grid_cell(d['dropoff_longitude'], d['dropoff_latitude'],
                              origin_lon, origin_lat, cell_lon, cell_lat)

    key = f'{start_x}_{start_y}_{end_x}_{end_y}'

    # Every coordinate must be bounded on both sides. Checking only
    # start >= 1 and end <= n_cells let e.g. (0, 0) pickups count as in-grid.
    in_grid = all(1 <= cell <= n_cells for cell in (start_x, start_y, end_x, end_y))
    return key, int(in_grid)

## Q1: Find the top 10 most frequent routes during the last 30 minutes

In [273]:
# Q1: sliding window covering 30 minutes of simulated time, i.e.
# 30 min / SPEED_UP_FACTOR seconds of wall-clock time (30 s at factor 60).
Q1_WINDOW_SECONDS = 30 * 60 / SPEED_UP_FACTOR

stream = KafkaUtils.createDirectStream(ssc, ["debs"],
                                       {"metadata.broker.list": "kafka:9092"})

result = (
    stream.window(Q1_WINDOW_SECONDS)
    # Records are (key, value) pairs, so len(record) is always 2; test the
    # CSV payload itself to drop empty (or null) messages.
    .filter(lambda record: bool(record[-1]))
    .map(to_dict)
    .map(to_route_300_grid)
    # Drop trips flagged as outside the grid (flag 0) so they cannot appear
    # in the top 10 with a zero count.
    .filter(lambda route: route[1] > 0)
    .reduceByKey(lambda a, b: a + b)
    .transform(lambda rdd: rdd.context.parallelize(rdd.top(10, key=lambda r: r[1])))
)

result.pprint()
# NOTE: after ssc.start() no further inputs, transformations or output
# operations can be added to this StreamingContext. Any other query must be
# defined before this call or run in a fresh context.
ssc.start()

-------------------------------------------
Time: 2019-06-13 22:38:05
-------------------------------------------
('189_186_165_160', 1)
('156_165_157_162', 1)
('154_160_153_162', 1)
('156_159_155_168', 1)
('154_167_150_187', 1)
('155_160_157_162', 1)
('157_164_175_157', 1)
('162_155_190_185', 1)
('188_186_158_161', 1)
('190_185_165_165', 1)

-------------------------------------------
Time: 2019-06-13 22:38:10
-------------------------------------------
('156_165_157_162', 2)
('154_162_155_160', 2)
('156_160_156_160', 2)
('162_155_160_158', 2)
('156_157_175_157', 2)
('190_185_151_171', 2)
('189_186_165_160', 1)
('154_160_153_162', 1)
('156_159_155_168', 1)
('154_167_150_187', 1)

-------------------------------------------
Time: 2019-06-13 22:38:15
-------------------------------------------
('156_165_157_162', 2)
('154_162_155_160', 2)
('157_159_157_164', 2)
('156_160_156_160', 2)
('157_165_155_162', 2)
('155_158_155_160', 2)
('156_163_175_157', 2)
('155_162_156_162', 2)
('156_156_15

## Q5: Select the most pleasant taxi driver

# Q5: tumbling window covering one day of simulated time
# (24 h / SPEED_UP_FACTOR seconds of wall-clock time).
Q5_WINDOW_SECONDS = (24 * 60 * 60) / SPEED_UP_FACTOR

stream = KafkaUtils.createDirectStream(ssc, ["debs"],
                                       {"metadata.broker.list": "kafka:9092"})

result = (
    stream.window(Q5_WINDOW_SECONDS, Q5_WINDOW_SECONDS)
    # Records are (key, value) pairs; test the payload, not the pair length.
    .filter(lambda record: bool(record[-1]))
    .map(to_dict)
    # NOTE(review): keyed by medallion (the taxi). If the query targets the
    # individual driver, hack_license may be the intended key -- TODO confirm.
    # Tips arrive as strings; convert so they are summed, not concatenated.
    .map(lambda trip: (trip["medallion"], float(trip["tip_amount"])))
    # Total tips per taxi. The previous .reduce() merged every
    # (medallion, tip) pair into one concatenated tuple instead.
    .reduceByKey(lambda a, b: a + b)
    .transform(lambda rdd: rdd.context.parallelize(rdd.top(1, key=lambda r: r[1])))
)

# Without an output operation Spark Streaming never computes this DStream.
# Output operations can only be registered before ssc.start() is called.
result.pprint()

## Q6: Tips hall of fame!
Store the biggest tip per route.

In [None]:
def _route_and_tip(trip):
    """Pair a trip's grid route key with its tip as a float.

    Returns ``None`` for trips outside the grid so they can be filtered out.
    """
    route, in_grid = to_route_300_grid(trip)
    return (route, float(trip["tip_amount"])) if in_grid else None


stream = KafkaUtils.createDirectStream(ssc, ["debs"],
                                       {"metadata.broker.list": "kafka:9092"})

# Biggest tip per route within each batch.
# NOTE(review): an all-time "hall of fame" would need stateful aggregation
# (e.g. updateStateByKey with checkpointing) -- TODO decide.
result = (
    # Records are (key, value) pairs; test the payload, not the pair length.
    stream.filter(lambda record: bool(record[-1]))
    .map(to_dict)
    .map(_route_and_tip)
    .filter(lambda pair: pair is not None)
    # Tips are floats here, so max() compares numerically rather than
    # lexicographically ("9.5" > "10.0" as strings).
    .reduceByKey(max)
)

# Output operations can only be registered before ssc.start() is called.
result.pprint()

In [274]:
# Stop streaming. stopSparkContext=True (the default) asks Spark to stop the
# underlying SparkContext too; sc.stop() stays as an explicit final step.
ssc.stop(stopSparkContext=True)
sc.stop()