#Prediction for IoT Traffic Simulation

###Author: Neeraj Asthana and Professor Robert Brunner

###Collaborators: Anchal, Rishabh, Vaishali

___

##Problem Statement
With recent advances in sensor technology, infrastructure for measuring temperature, pressure, humidity, precipitation, etc. is readily available on roadways and can be used to help predict traffic states. The goal and novelty of our project is to combine streamed GPS traffic estimation algorithms with dynamic road sensor data to accurately predict traffic states and to advise and direct drivers efficiently.

![alt text](car_road.png "Cars and Sensors")

##Cluster Details

All of these resources are on the NCSA ACX cluster. 

###Spark Details

Master: spark://10.0.3.70:7077

Version: 1.5.0 (pre-built for Hadoop 2.6)


###Kafka Details

Nodes (brokers): 141.142.236.172, 141.142.236.194

Port: 9092

Zookeeper: 10.0.3.130:2181,10.0.3.131:2181

###Cassandra Details

Table name: traffic

Schema:

traffic (id uuid, time_stamp timestamp, latitude decimal, longitude decimal, PRIMARY KEY (id, time_stamp));

____

Visual of all resources:

![alt text](Data Pipeline.png "Data Pipeline")

Details of the cluster:

![alt text](Cluster Details.png "Cluster Details")

##Time Decay Model for Speed Estimation

This model estimates the average road speed for a specific road segment by weighting individual speed observations.

####Observations
* Higher speeds are more valuable than lower speeds
* Recent observations are favored over historical observations
* Each data point will be of the form ($t_i, v_i, l_i$), or (time, velocity, location)


####Time Weight
An observation's timestamp ($t_i$) is weighted  by: $w(i,t) = \frac{f(t_i - L)}{f(t - L)}$

*f* is some positive, monotonic, non-decreasing function

*L* is some Landmark time (starting time)

*t* is the most recent timestamp
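
As a quick illustration of the time weight, the sketch below assumes the identity function for *f* and made-up timestamps in seconds. The helper name `time_weight` and all the numbers are hypothetical and are not part of the project code.

```python
def f(elapsed):
    # Assumed choice of f: the identity, one simple positive,
    # non-decreasing function of the time elapsed since the landmark
    return elapsed

def time_weight(t_i, t, landmark):
    """w(i, t) = f(t_i - L) / f(t - L)"""
    # float() keeps the division exact under Python 2 as well
    return float(f(t_i - landmark)) / f(t - landmark)

landmark = 0   # hypothetical landmark (starting) time
t = 100        # hypothetical most recent timestamp

old = time_weight(25, t, landmark)      # observation from early in the window
recent = time_weight(100, t, landmark)  # the newest observation

print(old, recent)
```

With these numbers the older observation carries a quarter of the weight of the newest one, which always has weight 1.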

####Velocity Weight
An observation's velocity ($v_i$) is weighted  by: $w^v (i) = g (v_i)$

*g* is some positive, monotonic, non-decreasing function

####Combination of Weights
The combined weight is the product of the time and velocity weights:

$w^* (i,t) = w(i,t) \cdot w^v (i) = \frac{f(t_i - L)}{f(t - L)} \cdot g(v_i)$
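
The combined weight can be tried out on two hypothetical observations. This is a minimal sketch that assumes $f$ is the identity and $g(v) = v^2$; `combined_weight` and the sample values are illustrative names and numbers only.

```python
def f(elapsed):
    return elapsed          # assumed time function (identity)

def g(velocity):
    return velocity ** 2    # assumed velocity function (square)

def combined_weight(t_i, v_i, t, landmark):
    """w*(i, t) = f(t_i - L) / f(t - L) * g(v_i)"""
    time_w = float(f(t_i - landmark)) / f(t - landmark)
    return time_w * g(v_i)

# An old, slow observation versus a recent, fast one (made-up values)
w_old_slow = combined_weight(t_i=25, v_i=20, t=100, landmark=0)
w_new_fast = combined_weight(t_i=100, v_i=60, t=100, landmark=0)

print(w_old_slow, w_new_fast)
```

Under these assumptions the recent, fast observation outweighs the old, slow one by a factor of 36, reflecting both preferences listed in the observations above.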

####Aggregating observations
The average velocity of a road segment (*r*) is calculated by aggregating the most recent and historical observations. In order to compute these values efficiently and to update them as new observations arrive, I will persist two quantities in Spark, $X$ and $Y$, which will then be used to calculate $\overline{V}(r) = \frac{X}{Y}$, where $m$ is the number of observations matched to the segment:

$$X = \sum_{i=1}^{m} f(t_i - L) \cdot g(v_i) \cdot v_i$$

$$Y = \sum_{i=1}^{m} f(t_i - L) \cdot g(v_i)$$
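
A short derivation, assuming the segment speed is defined as the weighted mean of the observed velocities under $w^*$, shows why $X$ and $Y$ can leave out the normalizing term $f(t - L)$: it is the same for every observation at a given $t$, so it cancels in the ratio.

$$\overline{V}(r) = \frac{\sum_{i=1}^{m} w^*(i,t) \cdot v_i}{\sum_{i=1}^{m} w^*(i,t)} = \frac{\frac{1}{f(t - L)} \sum_{i=1}^{m} f(t_i - L) \cdot g(v_i) \cdot v_i}{\frac{1}{f(t - L)} \sum_{i=1}^{m} f(t_i - L) \cdot g(v_i)} = \frac{X}{Y}$$

This is also what keeps the running sums cheap to maintain: a new observation adds one term to each sum, and no stored term needs rescaling when $t$ advances.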

Visual of the Time Decay Model:

![alt text](Time Decay Visual.png "Time Decay")
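
To make the aggregation concrete, here is a small worked example with two made-up observations. It assumes $f$ is the identity and $g(v) = v^2$, and it checks that updating $X$ and $Y$ one observation at a time gives the same result as recomputing the sums from scratch. The landmark and the observation values are hypothetical.

```python
def f(elapsed):
    return elapsed          # assumed time function (identity)

def g(velocity):
    return velocity ** 2    # assumed velocity function (square)

LANDMARK = 0
observations = [(10, 30.0), (20, 60.0)]   # (t_i, v_i), made-up values

# Batch computation: evaluate both sums over all observations
X_batch = sum(f(t - LANDMARK) * g(v) * v for t, v in observations)
Y_batch = sum(f(t - LANDMARK) * g(v) for t, v in observations)

# Incremental computation: keep only X and Y, add one term per observation
X = 0.0
Y = 0.0
for t, v in observations:
    w = f(t - LANDMARK) * g(v)
    X += w * v
    Y += w

speed = X / Y
plain_mean = sum(v for _, v in observations) / len(observations)

print(speed, plain_mean)
```

The weighted estimate comes out near 56.7, well above the unweighted mean of 45, because the later and faster observation dominates both sums.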

In [None]:
##Road Segments

class Segment:
    def __init__(self, latitude, longitude, landmark_time, identification):
        self.latitude = latitude
        self.longitude = longitude
        self.landmark = landmark_time
        self.current_time = landmark_time
        self.id = identification
        self.obs = 0
        self.X = 0
        self.Y = 0
        self.speed = 0
        
segments = [Segment(47.1,47.1,0, 0)]

In [8]:
##Dynamic Map Matching Algorithm
import math

#Distance in kilometers between two latitude and longitude locations
def distance(obsLoc, segmentLoc):
    lat1, lon1 = obsLoc
    lat2, lon2 = segmentLoc
    radius = 6371 # km

    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c

    return d

#match an observation to the nearest road segment within .25 km
#returns None when no segment is close enough
def findSegment(latitude, longitude):
    obsLoc = (latitude,longitude)
    
    closest = None
    closestDist = None
    for segment in segments:
        segmentLoc = (segment.latitude, segment.longitude)
        dist = distance(obsLoc, segmentLoc)
        #keep the segment only if it is in range and nearer than the best so far
        if dist <= .25 and (closestDist is None or dist < closestDist):
            closest = segment
            closestDist = dist
    return closest
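
The matching step can be sanity-checked on its own. The following standalone sketch reimplements the haversine distance and a nearest-within-threshold search over a plain dictionary of hypothetical segment coordinates; `haversine_km`, `nearest_within`, and the coordinates are illustrative and do not use the `Segment` class.

```python
import math

EARTH_RADIUS_KM = 6371

def haversine_km(loc1, loc2):
    """Great-circle distance in km between two (lat, lon) pairs in degrees."""
    lat1, lon1 = loc1
    lat2, lon2 = loc2
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.atan2(math.sqrt(a), math.sqrt(1 - a))

def nearest_within(obs_loc, candidates, max_km=0.25):
    """Return the key of the nearest candidate within max_km, else None."""
    best, best_dist = None, None
    for name, loc in candidates.items():
        d = haversine_km(obs_loc, loc)
        if d <= max_km and (best_dist is None or d < best_dist):
            best, best_dist = name, d
    return best

# One degree of latitude should be roughly 111 km
one_degree = haversine_km((47.0, 8.0), (48.0, 8.0))

# Two hypothetical segments about 110 m apart
candidates = {"A": (47.1000, 8.5000), "B": (47.1010, 8.5000)}
match = nearest_within((47.1008, 8.5000), candidates)     # closer to B
no_match = nearest_within((47.2000, 8.5000), candidates)  # about 11 km away

print(one_degree, match, no_match)
```

Returning `None` for out-of-range observations lets the caller decide whether to drop the point or create a new segment.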

In [3]:
##Time Decay Model

def f(time):
    return time

def g(velocity):
    return (velocity ** 2)

def update(latitude, longitude, velocity, time):
    #find segment using dynamic map matching algorithm
    seg = findSegment(latitude,longitude)
    if seg is None:
        #observation is not within range of any known segment
        return
    
    #weight time and velocity
    wtime = f(time-seg.landmark)
    wvel = g(velocity)
    
    #produce X and Y for this observation and add these to the segment speed
    X = wtime*wvel*velocity
    Y = wtime*wvel
    seg.X += X
    seg.Y += Y
    #skip the division while the accumulated weight is still zero
    if seg.Y > 0:
        seg.speed = float(seg.X)/seg.Y
    seg.obs += 1
    seg.current_time = time
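
Because $X$ and $Y$ are plain sums, contributions for the same segment can be merged in any order, which is the property a keyed reduction relies on. The sketch below imitates that pattern in pure Python with a dictionary instead of Spark. The segment ids, the observations, and the helper names `contribution` and `merge` are all hypothetical, and $f$ and $g$ are assumed to be the identity and the square.

```python
from collections import defaultdict

def f(elapsed):
    return elapsed          # assumed time function (identity)

def g(velocity):
    return velocity ** 2    # assumed velocity function (square)

LANDMARK = 0

# Made-up observations already matched to segments: (segment_id, time, velocity)
batch = [
    (0, 10, 30.0),
    (1, 10, 50.0),
    (0, 20, 60.0),
]

def contribution(time, velocity):
    """(X_i, Y_i) contributed by a single observation."""
    w = f(time - LANDMARK) * g(velocity)
    return (w * velocity, w)

def merge(a, b):
    """Combine two (X, Y) pairs; associative and commutative."""
    return (a[0] + b[0], a[1] + b[1])

# Running (X, Y) per segment, starting from zero
totals = defaultdict(lambda: (0.0, 0.0))
for seg_id, time, velocity in batch:
    totals[seg_id] = merge(totals[seg_id], contribution(time, velocity))

speeds = {seg_id: x / y for seg_id, (x, y) in totals.items()}
print(speeds)
```

A segment with a single observation simply reports that observation's velocity, since the weight cancels in $X / Y$.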

##Spark Kafka Integration

Steps:

1. Write the Kafka streaming script to a file (using the IPython `%%writefile` cell magic)

2. Use spark-submit to submit the file, along with the appropriate jars, to the Spark cluster

In [5]:
%%writefile timedecaykafka.py

from __future__ import print_function

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# create SparkContext
conf = SparkConf().setAppName("Kafka-Spark")
sc = SparkContext(conf=conf)

# create StreamingContext
ssc = StreamingContext(sc, 30)

# new approach (w/o receivers)
topic = ["mytopic"]
brokers =  "141.142.236.172:9092,141.142.236.194:9092"
directKafkaStream = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})

lines = directKafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

print("finished this")

Overwriting timedecaykafka.py


In [6]:
!../../../spark/spark-1.5.0-bin-hadoop2.6/bin/spark-submit --master spark://10.0.3.70:7077 \
        --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.0 timedecaykafka.py

Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
:: loading settings :: url = jar:file:/home/ubuntu/spark/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka_2.10;1.5.0 in central
	found org.apache.kafka#kafka_2.10;0.8.2.1 in central
	found com.yammer.metrics#metrics-core;2.2.0 in central
	found org.slf4j#slf4j-api;1.7.10 in central
	found org.apache.kafka#kafka-clients;0.8.2.1 in central
	found net.jpountz.lz4#lz4;1.3.0 in central
	found org.xerial.snappy#snappy-java;1.1.1.7 in central
	found com.101tec#zkclient;0.3 in central
	found log4j#log4j;1.2.17 in central
	found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apa