In [1]:
from pyspark import SparkContext , SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import RDD

In [2]:
# Get the already-running SparkContext, or create one configured with a
# "local[*]" master.
sc = SparkContext.getOrCreate(
    conf=SparkConf().setMaster("local[*]")
)

In [3]:
# Parse a line of weather station data, returning the average wind direction (Dm) reading
# as a one-element list of int, or an empty list when the line has no Dm field.
import re
def parse(line):
    """Extract the ``Dm`` reading from one line of weather station data.

    Args:
        line: One text line from the station feed.

    Returns:
        A one-element list holding the digits that follow the first
        ``Dm=<digits>`` occurrence, converted to int, or an empty list when
        the line contains no such field. Returning a list (rather than a
        scalar or None) lets ``flatMap`` drop lines that carry no reading.
    """
    # Raw string: "\d" inside a plain string literal is an invalid escape
    # sequence, which recent Python versions warn about.
    match = re.search(r"Dm=(\d+)", line)
    return [int(match.group(1))] if match else []

In [4]:
from pyspark.streaming import StreamingContext
# Streaming context on top of the existing SparkContext, with a batch
# duration of 1.
ssc = StreamingContext(sc, batchDuration=1)

In [5]:
# The High Performance Wireless Research and Education Network (HPWREN), a University of 
# California San Diego partnership project led by the San Diego Supercomputer Center and the Scripps Institution 
# of Oceanography's Institute of Geophysics and Planetary Physics, supports Internet-data applications in the 
# research, education, and public safety realms.

# DStream of text lines read from the HPWREN TCP feed.
lines = ssc.socketTextStream(
    hostname="rtd.hpwren.ucsd.edu",
    port=12028,
)

In [6]:
# parse() yields a list of zero or one ints per line; flatMap flattens those
# lists so lines without a Dm field contribute nothing.
vals = lines.flatMap(parse, preservesPartitioning=False)

In [7]:
# Window of length 10 that slides forward by 5, so consecutive windows overlap.
window = vals.window(windowDuration=10, slideDuration=5)

In [8]:
def stats(rdd):
    """Print the values of one windowed RDD and, when non-empty, its max and min.

    Args:
        rdd: An RDD-like object whose ``collect()`` returns its elements as a
            list.

    Returns:
        None. The element list is always printed; a ``max = ..., min = ...``
        line follows only when the list is non-empty.
    """
    # Collect once and derive everything from that single snapshot instead of
    # running separate collect/count/max/min actions over the same RDD.
    values = rdd.collect()
    print(values)
    if values:
        print("max = {}, min = {}".format(max(values), min(values)))

In [9]:
# Register stats as the callback for the windowed stream; it takes the RDD as
# its only argument, so no lambda wrapper is needed.
window.foreachRDD(stats)

In [10]:
# Start the streaming computation. The lists and "max = ..., min = ..." lines
# shown below this cell are printed by the stats callback registered through
# foreachRDD.
ssc.start()

[]
[111, 106, 105]
max = 111, min = 105
[111, 106, 105, 99, 98, 98, 98, 97, 97]
max = 111, min = 97
[99, 98, 98, 98, 97, 97, 97, 97, 97, 97, 97]
max = 99, min = 97
[97, 97, 97, 97, 97, 97, 97, 97, 97, 97]
max = 97, min = 97
[97, 97, 97, 97, 97, 111, 110, 110, 111, 107]
max = 111, min = 97
[111, 110, 110, 111, 107, 103, 98, 96, 93, 93]
max = 111, min = 93
[103, 98, 96, 93, 93, 91, 96, 96, 96, 96]
max = 103, min = 91


In [11]:
# Stop streaming, with the default arguments spelled out.
# NOTE(review): stopSparkContext=True presumably also shuts down `sc`, so the
# context cell would need re-running before streaming again; confirm against
# the StreamingContext.stop documentation.
ssc.stop(stopSparkContext=True, stopGraceFully=False)