# Spark streaming: 2 + 1

    We chose the following queries: 2-> Q1 and Q5, 1-> Q6
    This queries only aggregate statistics and do not need pattern matching

In [32]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import socket
from pyspark.streaming.kafka import KafkaUtils


SPEED_UP_FACTOR = 1000
sc = SparkContext("local[2]", "KafkaExample")
print("SparkContext created!")
ssc = StreamingContext(sc, 5)
print("StreamingContext created!")

SparkContext created!
StreamingContext created!


In [33]:
def to_dict(item):
    event = item[-1].split(',')

    keys = [
        'medallion',
        'hack_license',
        'pickup_datetime',
        'dropoff_datetime',
        'trip_time_in_secs',
        'trip_distance',
        'pickup_longitude',
        'pickup_latitude',
        'dropoff_longitude',
        'dropoff_latitude',
        'payment_type',
        'fare_amount',
        'surcharge',
        'mta_tax',
        'tip_amount',
        'tolls_amount',
        'total_amount'
    ]

    return {keys[i]: event[i] for i in range(len(keys))}

## 300x300 grid
Maps longitude and latitude to 300x300 grid

In [34]:
def to_route_300_grid(d, value):
    start_cell_x = int(-1*((-74.913585 - float(d['pickup_longitude'])) / 0.005986 ) + 1)
    start_cell_y = int(((41.474937 - float(d['pickup_latitude']))/ 0.004491556) + 1)
    ending_cell_x = int(-1*((-74.913585 - float(d['dropoff_longitude'])) / 0.005986 ) + 1)
    ending_cell_y = int(((41.474937 - float(d['dropoff_latitude'])) / 0.004491556) + 1)
    
    key = f'{start_cell_x}_{start_cell_y}_{ending_cell_x}_{ending_cell_y}'
    
    if start_cell_x >= 1 and start_cell_y >= 1 and ending_cell_x <= 300 and ending_cell_y <= 300:
        return key, value
    
    return key, 0

## Q1: Find the top 10 most frequent routes during the last 30 minutes

In [4]:
stream = KafkaUtils.createDirectStream(ssc, ["debs"], \
            {"metadata.broker.list": "kafka:9092"})

result = stream.window((30*60)/SPEED_UP_FACTOR) \
            .filter(lambda line : len(line) > 0) \
            .map(lambda line: to_dict(line)) \
            .map(lambda d: to_route_300_grid(d, 1)) \
            .reduceByKey(lambda a, b: a + b) \
            .transform(lambda rdd: sc.parallelize(rdd.top(10, lambda r: r[1])))

result.pprint()
ssc.start()

-------------------------------------------
Time: 2019-06-17 17:47:00
-------------------------------------------
('156_163_156_163', 11)
('154_162_154_162', 9)
('156_164_157_163', 9)
('155_163_155_163', 8)
('157_162_157_162', 8)
('161_156_161_155', 8)
('155_163_155_166', 7)
('158_159_158_161', 7)
('156_164_161_156', 7)
('155_166_157_162', 7)

-------------------------------------------
Time: 2019-06-17 17:47:05
-------------------------------------------
('158_161_157_163', 30)
('159_160_157_163', 28)
('159_160_155_162', 26)
('161_156_160_158', 26)
('157_163_155_162', 26)
('159_160_159_160', 24)
('159_160_161_157', 21)
('157_161_155_162', 20)
('157_162_157_162', 20)
('161_156_161_155', 20)

-------------------------------------------
Time: 2019-06-17 17:47:10
-------------------------------------------
('158_161_157_163', 33)
('159_160_157_163', 32)
('159_160_155_162', 30)
('159_160_159_160', 26)
('161_156_160_158', 26)
('157_163_155_162', 26)
('159_160_161_157', 23)
('156_163_156_163

## Q5: Select the most pleasant taxi driver

In [9]:
stream = KafkaUtils.createDirectStream(ssc, ["debs"], \
            {"metadata.broker.list": "kafka:9092"})

result = stream.window(30, 30) \
            .filter(lambda line : len(line) > 0) \
            .map(lambda line: to_dict(line)) \
            .map(lambda line: ( line["medallion"], float(line["tip_amount"]) ) ) \
            .reduceByKey(lambda a, b: a + b) \
            .transform(lambda rdd: sc.parallelize(rdd.top(1, lambda r: r[1])))
result.pprint()
ssc.start()

-------------------------------------------
Time: 2019-06-17 18:13:00
-------------------------------------------
('DB6FC742268C83E83A069DE6B3FB3BA1', 81.45)

-------------------------------------------
Time: 2019-06-17 18:13:30
-------------------------------------------
('EE8A706C1D8EE8897A1FEB591B5E5884', 80.11)

-------------------------------------------
Time: 2019-06-17 18:14:00
-------------------------------------------
('7D61EF14AA7A367FB6F8D66AB32A1E49', 97.73)

-------------------------------------------
Time: 2019-06-17 18:14:30
-------------------------------------------
('DF55E62C009EE4F61F8F34552741884B', 45.0)



## Q6: Tips hall of fame!
Store biggest tip per route - sorted for convenience

In [36]:
stream = KafkaUtils.createDirectStream(ssc, ["debs"], \
            {"metadata.broker.list": "kafka:9092"})

result = stream.filter(lambda line : len(line) > 0) \
                .map(lambda line: to_dict(line)) \
                .map(lambda d: to_route_300_grid(d, float(d["tip_amount"]))) \
                .reduceByKey(lambda a, b: max(a,b)) 
result.pprint()
ssc.start()

-------------------------------------------
Time: 2019-06-17 18:21:35
-------------------------------------------
('158_159_157_161', 0.0)
('156_159_155_162', 1.5)
('154_163_152_166', 1.0)
('157_160_155_159', 1.95)
('156_159_156_159', 0.0)
('157_154_189_186', 0.0)
('155_160_189_186', 0.0)
('156_163_161_148', 0.0)
('155_159_155_162', 1.38)
('158_158_155_160', 0.0)
...

-------------------------------------------
Time: 2019-06-17 18:21:40
-------------------------------------------
('160_158_190_185', 10.4)
('152_164_159_159', 0.0)
('156_164_164_151', 0.0)
('156_166_153_161', 0.0)
('156_156_156_162', 1.0)
('156_159_176_158', 7.0)
('158_155_159_156', 0.0)
('162_154_160_154', 0.0)
('156_162_158_159', 1.8)
('153_160_151_170', 0.0)
...

-------------------------------------------
Time: 2019-06-17 18:21:45
-------------------------------------------
('154_167_159_160', 2.0)
('161_157_159_159', 1.2)
('156_166_155_162', 2.75)
('155_179_155_162', 6.6)
('160_155_154_165', 3.88)
('155_173_153_167'

In [37]:
ssc.stop()
sc.stop()