In [1]:
from operator import add, sub
from time import sleep
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [2]:
# Demo: push five small RDDs through a queue-backed DStream and print, for
# every batch, each element ("Input: ...") and the batch sum ("Output: ...").
sc = SparkContext(appName="PysparkNotebook")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

try:
    # Batch i carries the pair [i, i + 1].
    rddQueue = [sc.parallelize([i, i + 1]) for i in range(5)]

    inputStream = ssc.queueStream(rddQueue)

    inputStream.map(lambda x: "Input: " + str(x)).pprint()
    inputStream.reduce(add)\
        .map(lambda x: "Output: " + str(x))\
        .pprint()

    ssc.start()
    sleep(5)  # give the five queued batches time to be processed
finally:
    # Always tear the contexts down, even if the cell is interrupted or a step
    # above raises; otherwise the next cell's SparkContext(...) call fails
    # because a context is still running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2016-08-10 00:06:15
-------------------------------------------
Input: 0
Input: 1

-------------------------------------------
Time: 2016-08-10 00:06:15
-------------------------------------------
Output: 1

-------------------------------------------
Time: 2016-08-10 00:06:16
-------------------------------------------
Input: 1
Input: 2

-------------------------------------------
Time: 2016-08-10 00:06:16
-------------------------------------------
Output: 3

-------------------------------------------
Time: 2016-08-10 00:06:17
-------------------------------------------
Input: 2
Input: 3

-------------------------------------------
Time: 2016-08-10 00:06:17
-------------------------------------------
Output: 5

-------------------------------------------
Time: 2016-08-10 00:06:18
-------------------------------------------
Input: 3
Input: 4

-------------------------------------------
Time: 2016-08-10 00:06:18
-----------------------

In [3]:
# Demo: sum the numbers of each batch. Every inner list below becomes one RDD
# in the queue, and reduce(add) prints one total per batch.
sc = SparkContext(appName="PysparkNotebook")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

try:
    inputData = [
        [1, 2, 3],
        [0],
        [4, 4, 4],
        [0, 0, 0, 25],
        [1, -1, 10],
    ]

    rddQueue = [sc.parallelize(datum) for datum in inputData]

    inputStream = ssc.queueStream(rddQueue)
    inputStream.reduce(add).pprint()

    ssc.start()
    sleep(5)  # give the five queued batches time to be processed
finally:
    # Always tear the contexts down, even if the cell is interrupted or a step
    # above raises; otherwise the next cell's SparkContext(...) call fails
    # because a context is still running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2016-08-08 13:43:38
-------------------------------------------
6

-------------------------------------------
Time: 2016-08-08 13:43:39
-------------------------------------------
0

-------------------------------------------
Time: 2016-08-08 13:43:40
-------------------------------------------
12

-------------------------------------------
Time: 2016-08-08 13:43:41
-------------------------------------------
25

-------------------------------------------
Time: 2016-08-08 13:43:42
-------------------------------------------
10

-------------------------------------------
Time: 2016-08-08 13:43:43
-------------------------------------------

-------------------------------------------
Time: 2016-08-08 13:43:44
-------------------------------------------



In [4]:
# Demo: distinct active users over a sliding window. Each inner list is the
# set of users seen in one batch.
sc = SparkContext(appName="ActiveUsers")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

try:
    activeUsers = [
        ["Alice", "Bob"],
        ["Bob"],
        ["Carlos", "Dan"],
        ["Carlos", "Dan", "Erin"],
        ["Carlos", "Frank"],
    ]

    rddQueue = [sc.parallelize(datum) for datum in activeUsers]

    inputStream = ssc.queueStream(rddQueue)
    # window(5, 1): 5-second window, re-evaluated every second. Each name is
    # wrapped in a singleton set so that the reduce can union them, which
    # removes duplicates across the batches inside the window.
    inputStream.window(5, 1)\
        .map(lambda x: set([x]))\
        .reduce(lambda x, y: x.union(y))\
        .pprint()

    ssc.start()
    sleep(5)  # give the five queued batches time to be processed
finally:
    # Always tear the contexts down, even if the cell is interrupted or a step
    # above raises; otherwise the next cell's SparkContext(...) call fails
    # because a context is still running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2016-08-08 13:43:46
-------------------------------------------
{'Bob', 'Alice'}

-------------------------------------------
Time: 2016-08-08 13:43:47
-------------------------------------------
{'Bob', 'Alice'}

-------------------------------------------
Time: 2016-08-08 13:43:48
-------------------------------------------
{'Carlos', 'Alice', 'Bob', 'Dan'}

-------------------------------------------
Time: 2016-08-08 13:43:49
-------------------------------------------
{'Carlos', 'Alice', 'Erin', 'Bob', 'Dan'}

-------------------------------------------
Time: 2016-08-08 13:43:50
-------------------------------------------
{'Erin', 'Alice', 'Frank', 'Dan', 'Carlos', 'Bob'}

-------------------------------------------
Time: 2016-08-08 13:43:51
-------------------------------------------
{'Carlos', 'Erin', 'Frank', 'Bob', 'Dan'}

-------------------------------------------
Time: 2016-08-08 13:43:52
-------------------------------------

In [5]:
# Demo: per-player score totals over a sliding window. Each inner list holds
# the (player, score) pairs that arrive in one batch.
sc = SparkContext(appName="HighScores")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

try:
    # Checkpointing is enabled because the windowed reduce below is given an
    # inverse function (see the Spark Streaming docs for reduceByKeyAndWindow).
    ssc.checkpoint("highscore-checkpoints")

    player_score_pairs = [
        [("Alice", 100), ("Bob", 60)],
        [("Bob", 60)],
        [("Carlos", 90), ("Dan", 40)],
        [("Carlos", 10), ("Dan", 20), ("Erin", 90)],
        [("Carlos", 20), ("Frank", 200)],
    ]

    rddQueue = [sc.parallelize(datum) for datum in player_score_pairs]

    inputStream = ssc.queueStream(rddQueue)
    # 3-second window sliding every second: `add` folds in scores entering the
    # window and `sub` removes the scores that slide out of it.
    inputStream.reduceByKeyAndWindow(add, sub, 3, 1)\
        .pprint()

    ssc.start()
    sleep(5)  # give the five queued batches time to be processed
finally:
    # Always tear the contexts down, even if the cell is interrupted or a step
    # above raises; otherwise the next cell's SparkContext(...) call fails
    # because a context is still running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2016-08-08 13:43:54
-------------------------------------------
('Bob', 60)
('Alice', 100)

-------------------------------------------
Time: 2016-08-08 13:43:55
-------------------------------------------
('Bob', 120)
('Alice', 100)

-------------------------------------------
Time: 2016-08-08 13:43:56
-------------------------------------------
('Carlos', 90)
('Dan', 40)
('Bob', 120)
('Alice', 100)

-------------------------------------------
Time: 2016-08-08 13:43:57
-------------------------------------------
('Carlos', 100)
('Erin', 90)
('Dan', 60)
('Bob', 60)

-------------------------------------------
Time: 2016-08-08 13:43:58
-------------------------------------------
('Carlos', 120)
('Erin', 90)
('Dan', 60)
('Frank', 200)

-------------------------------------------
Time: 2016-08-08 13:43:59
-------------------------------------------
('Carlos', 30)
('Erin', 90)
('Dan', 20)
('Frank', 200)

------------------------------------

In [4]:
from pyspark.mllib.regression import LabeledPoint
import random

# Load the sensor readings, turn every row into a LabeledPoint and split the
# points at random: about 20% go to `test_data`, the rest are spread over
# `train_data`, a list of 10 chunks (one chunk per streaming training batch).
random.seed(1111)  # fixed seed so the split is reproducible

# Zero-based positions of the whitespace-separated columns that are used.
# The trailing notes give the dataset's own 1-based column descriptions.
FEATURE_COLUMNS = [
    7,   # 8.  Relative humidity (dining room), in %.
    8,   # 9.  Relative humidity (room), in %.
    11,  # 12. Rain, the proportion of the last 15 minutes where rain was
         #     detected (a value in range [0,1]).
    21,  # 22. Outdoor temperature, in C.
]
LABEL_COLUMN = 22  # 23. Outdoor relative humidity, in %.

NUM_TRAIN_CHUNKS = 10

test_data = []
train_data = [[] for _ in range(NUM_TRAIN_CHUNKS)]

with open("temperature_data_small.txt") as t:
    for l in t:
        fields = l.split()
        if not fields:
            # Blank line (e.g. trailing newline at end of file): nothing to
            # parse, and indexing into it would raise IndexError.
            continue
        point = LabeledPoint(
            float(fields[LABEL_COLUMN]),
            [float(fields[column]) for column in FEATURE_COLUMNS])
        # The order of the two random draws is kept as-is so that the seeded
        # split stays the same.
        if random.random() >= 0.8:
            test_data.append(point)
        else:
            train_data[random.randrange(NUM_TRAIN_CHUNKS)].append(point)

In [8]:
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

# Demo: train a streaming linear regression on the queued training chunks
# while scoring the held-out test set after every batch, printing the
# per-point error and the batch mean squared error.
sc = SparkContext(appName="HumidityPrediction")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

try:
    # One training chunk per batch.
    training_data_stream = ssc.queueStream(
        [sc.parallelize(d) for d in train_data])
    # The same test set is replayed once per training chunk so that every
    # batch can be evaluated against the model as trained so far. It is
    # parallelized explicitly, like the training chunks.
    test_data_stream = ssc.queueStream(
        [sc.parallelize(test_data) for _ in train_data])\
        .map(lambda lp: (lp.label, lp.features))

    model = StreamingLinearRegressionWithSGD(
        numIterations=5,
        stepSize=0.00005)
    # One initial weight per feature (LabeledPoints here carry 4 features).
    model.setInitialWeights([1.0, 1.0, 1.0, 1.0])

    model.trainOn(training_data_stream)
    # predictOnValues yields (label, prediction); append the signed error.
    predictions = model.predictOnValues(test_data_stream)\
        .map(lambda x: (x[0], x[1], (x[0] - x[1])))

    predictions.map(lambda x:
                    "Actual: " + str(x[0])
                    + ", Predicted: " + str(x[1])
                    + ", Error: " + str(x[2])).pprint()
    # Accumulate (sum of squared errors, count) and divide to get the MSE.
    predictions.map(lambda x: (x[2]**2, 1))\
        .reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
        .map(lambda x: "MSE: " + str(x[0]/x[1]))\
        .pprint()

    ssc.start()
    # Poll the model once per 2-second batch interval, 10 times, matching
    # the 10 training chunks in `train_data`.
    for _ in range(10):
        sleep(2)
        print(model.latestModel())
finally:
    # Always tear the contexts down, even if the cell is interrupted or a step
    # above raises; otherwise the next cell's SparkContext(...) call fails
    # because a context is still running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

(weights=[1.0,1.0,1.0,1.0], intercept=0.0)
-------------------------------------------
Time: 2016-08-10 00:09:28
-------------------------------------------
Actual: 45.416, Predicted: 103.288, Error: -57.872
Actual: 44.5467, Predicted: 104.292, Error: -59.7453
Actual: 45.2373, Predicted: 104.5974, Error: -59.3601
Actual: 44.128, Predicted: 101.3147, Error: -57.1867
Actual: 43.2747, Predicted: 100.0893, Error: -56.8146
Actual: 45.8293, Predicted: 98.6693, Error: -52.84
Actual: 57.1653, Predicted: 103.7994, Error: -46.6341
Actual: 57.712, Predicted: 102.092, Error: -44.38
Actual: 57.928, Predicted: 101.2813, Error: -43.3533
Actual: 57.3493, Predicted: 96.246, Error: -38.8967
...

(weights=[0.758584311581,0.7599443456,1.0,0.903152483361], intercept=0.0)
-------------------------------------------
Time: 2016-08-10 00:09:28
-------------------------------------------
MSE: 2122.49674861

-------------------------------------------
Time: 2016-08-10 00:09:30
-----------------------------------

In [7]:
# Recovery cell: run this after a cell has failed and later cells begin to
# complain that a Spark context is already running. It stops the streaming
# context held in `ssc` together with its SparkContext.
ssc.stop(stopGraceFully=True, stopSparkContext=True)