In [2]:
# Example weather station data
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
# 1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
# 1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
# 1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

In [1]:
# Key for measurements:
#
# Sn      Wind speed minimum m/s, km/h, mph, knots #,M, K, S, N
# Sm      Wind speed average m/s, km/h, mph, knots #,M, K, S, N
# Sx      Wind speed maximum m/s, km/h, mph, knots #,M, K, S, N
# Dn      Wind direction minimum deg #, D
# Dm      Wind direction average deg #, D
# Dx      Wind direction maximum deg #, D
# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I
# Ta      Air temperature °C, °F #, C, F
# Tp      Internal temperature °C, °F #, C, F
# Ua      Relative humidity %RH #, P
# Rc      Rain accumulation mm, in #, M, I
# Rd      Rain duration s #, S
# Ri      Rain intensity mm/h, in/h #, M, I
# Rp      Rain peak intensity mm/h, in/h #, M, I
# Hc      Hail accumulation hits/cm2, hits/in2, hits #, M, I, H
# Hd      Hail duration s #, S
# Hi      Hail intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Hp      Hail peak intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Th      Heating temperature °C, °F #, C, F
# Vh      Heating voltage V #, N, V, W, F2
# Vs      Supply voltage V V
# Vr      3.5 V ref. voltage V V

In [2]:
# Parse a line of weather station data, returning the average wind direction measurement 
#
import re
def parse(line):
    """Extract the average wind direction (``Dm``) from one station line.

    Args:
        line: A raw text line from the weather station, e.g.
            ``"1419408015\\t0R1,Dn=059D,Dm=066D,Dx=080D,..."``.

    Returns:
        A one-element list holding the ``Dm`` reading as an ``int`` when the
        line contains a ``Dm=<digits>`` field, otherwise an empty list. The
        list form lets the function be used directly with ``flatMap``, which
        drops lines that carry no ``Dm`` measurement.
    """
    # Raw string: "\d" in a normal string literal is an invalid escape
    # sequence and triggers a warning on modern Python versions.
    match = re.search(r"Dm=(\d+)", line)
    return [int(match.group(1))] if match else []

Import and create streaming context.

In [5]:
from pyspark.streaming import StreamingContext
# Build the streaming context on top of the existing SparkContext `sc`,
# grouping incoming data into batches of one second.
ssc = StreamingContext(sc, batchDuration=1)

The StreamingContext plays a role similar to that of the SparkContext. The 1 specifies a batch interval of 1 second.

Create DStream of weather data. Let's open a connection to the streaming weather data:

In [7]:
# Open a text stream over TCP to the weather station feed; each element of
# the resulting DStream is one line of station output.
lines = ssc.socketTextStream(hostname="rtd.hpwren.ucsd.edu", port=12028)

This creates the variable lines that streams the lines of output from the weather station.

Read measurement. Let's read the average wind direction from each line and store it in a new DStream vals.

In [8]:
# parse() returns a list with zero or one integers per line; flatMap
# flattens those lists, so lines without a Dm field contribute nothing.
vals = lines.flatMap(parse)

This line uses flatMap() to iterate over the lines DStream and calls the parse() function we defined above to get the average wind direction.

Create sliding window of data. Create a new DStream called window that combines ten seconds' worth of data and slides forward every five seconds.

In [9]:
# Sliding window: each RDD covers the last 10 seconds of values and a new
# one is produced every 5 seconds.
window = vals.window(windowDuration=10, slideDuration=5)

Define and call analysis function. We would like to find the min and max values in our window. Let's define a function that prints these values for an RDD:

In [15]:
def stats(rdd):
    """Print the contents of an RDD followed by its max and min values.

    The RDD is collected to the driver exactly once and the extremes are
    computed from that local list, instead of issuing separate ``collect()``,
    ``count()``, ``max()`` and ``min()`` actions against the same RDD.

    Args:
        rdd: An RDD of mutually comparable values (in this notebook, the
            integers produced by ``parse``).

    Returns:
        None. The collected list is always printed; a
        ``max = ..., min = ...`` line follows only when the RDD is non-empty.
    """
    values = rdd.collect()
    print(values)
    # Guard against empty windows: max()/min() raise on an empty sequence.
    if values:
        print("max = {}, min = {}".format(max(values), min(values)))

max = {}, min = {}


This function first prints the entire contents of the RDD. This would not usually be practical for large datasets, but the windows here are small. Then we check the size of the RDD and, if it contains something, we compute and print the max and min values.

In [11]:
# Register stats() to run on every RDD produced by the windowed DStream.
# The lambda looks up the name `stats` each time it is called, so re-running
# the cell that defines stats() changes what is executed here.
window.foreachRDD(lambda rdd : stats(rdd))

This line calls the stats() function defined above for each RDD in the DStream window.

Start the stream processing:

In [16]:
# Start the streaming computation; the output printed by stats() appears
# below as each window is processed.
ssc.start()

max = 76, min = 49
[59, 64, 71, 76, 75, 72, 64, 63, 59, 59]
max = 76, min = 59
[72, 64, 63, 59, 59, 59, 60, 60, 55, 58]
max = 72, min = 55
[59, 60, 60, 55, 58, 55, 63, 64, 67, 62]
max = 67, min = 55
[55, 63, 64, 67, 62, 62, 60, 60, 64, 62]
max = 67, min = 55
[62, 60, 60, 64, 62, 66, 66, 66, 63, 68]
max = 68, min = 60
[66, 66, 66, 63, 68, 67, 64, 62, 58, 63]
max = 68, min = 58
[67, 64, 62, 58, 63, 55, 55, 55, 50, 48, 48]
max = 67, min = 48
[55, 55, 55, 50, 48, 48, 50, 56, 59, 64, 60]


In [17]:
# Stop the streaming computation.
# NOTE(review): with PySpark's default arguments this presumably also stops
# the underlying SparkContext `sc` — confirm before re-running earlier cells.
ssc.stop()

max = 64, min = 48
