In [1]:
# Example weather station data (streaming measurements coming from 
# the weather station)
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M

Each line contains a timestamp and a set of measurements. We are interested in the average wind direction:
Dm - wind direction average, in degrees (Dn and Dx are the corresponding wind direction minimum and maximum).

In [2]:
# Parse a line and return the average wind direction measurement (Dm)
import re
def parse(line):
    """Extract the average wind direction (the ``Dm`` field) from one line.

    Args:
        line: one text line of weather-station output, e.g.
            ``"1419408015\\t0R1,Dn=059D,Dm=066D,Dx=080D,..."``.

    Returns:
        A one-element list holding the digits that follow ``Dm=`` converted
        to ``int``, or an empty list when the line has no ``Dm=<digits>``
        field. Returning a list lets the function be passed to ``flatMap``
        so that lines without a reading simply contribute nothing.
    """
    # Raw string: "\d" inside a normal string literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    match = re.search(r"Dm=(\d+)", line)
    if match:
        return [int(match.group(1))]
    return []

In [3]:
# Create a new instance of Spark's StreamingContext.
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext provided by the kernel -- confirm before running.
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1) # batch interval: 1 second

In [4]:
# Create a DStream of weather data: text lines read from the weather
# station's TCP socket (host, port).
weather_data = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)

In [5]:
# Apply parse() to every incoming line; flatMap flattens the returned lists,
# so lines without a Dm field contribute no elements.
# NOTE(review): despite its name, this stream holds the Dm (average wind
# direction) values extracted by parse(), not wind speeds.
av_wind_speed =   weather_data.flatMap(parse)

In [6]:
# Create a sliding window over the parsed measurements.
window = av_wind_speed.window(10, 5)
# The window combines 10 seconds' worth of data and moves forward every 5 seconds.

In [7]:
#this function will print max and min values for the window
def max_min(rdd):
    """Print the values in a window RDD, then their maximum and minimum.

    Args:
        rdd: the RDD for one window; only its ``collect()`` method is used.

    The values are collected to the driver once and the max/min are computed
    from that local list. This avoids running separate ``count()``, ``max()``
    and ``min()`` actions on the same RDD, and guarantees the printed list
    and the printed extremes come from the same snapshot. Nothing but the
    list itself is printed when the window is empty.
    """
    values = rdd.collect()
    print(values)
    if values:
        print("max = {}, min = {}".format(max(values), min(values)))

In [8]:
# Register max_min as the action to run on the RDD of each window.
# Nothing is printed until the streaming context is started.
window.foreachRDD(lambda rdd: max_min(rdd))

In [9]:
# Now we are ready to start stream processing!
# The lines printed below this cell are produced by max_min, one list and
# one max/min summary per window.
ssc.start()

[310, 311]
max = 311, min = 310
[310, 311, 312, 310, 318, 326, 312]
max = 326, min = 310
[312, 310, 318, 326, 312, 312, 320, 320, 323, 326, 327]
max = 327, min = 310
[312, 320, 320, 323, 326, 327, 331, 339, 7, 1, 337]
max = 339, min = 1
[331, 339, 7, 1, 337, 338, 328, 324, 315, 316]
max = 339, min = 1
[338, 328, 324, 315, 316, 311, 309, 306, 318, 318]


In [10]:
# Stop the streaming computation started by ssc.start().
# NOTE(review): called with default arguments, stop() presumably also stops
# the underlying SparkContext -- confirm before re-running earlier cells.
ssc.stop()

max = 338, min = 306


Our sliding window has 10 seconds worth of data and it moves every 5 seconds. Thus, the second half of a window becomes the first half of the next window.