In [1]:
!pip install pyspark
!pip install -q findspark



In [2]:
import os

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import time

In [3]:
# This function creates SparkContext and StreamingContext
def initStreamingContext():
    try:
      ssc.end()
    except:
      pass
    finally:
      spark_conf = SparkConf()\
            .setAppName("IrTest")\
            .setMaster("local[*]")
      sc = SparkContext.getOrCreate(spark_conf)
      # Creating Streaming Context with batch window size of 1 second
      ssc = StreamingContext(sc, 1)
      return ssc

In [4]:
# Let's create ssc.
ssc = initStreamingContext()
# Initialize a DStream by connecting it to a TCP socket.
# The server will start sending data which goes to the robotData DStream.
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)
robotData.pprint()
ssc.start()
try:
    # Let the stream run for 5 seconds before stopping it. Unlike time.sleep,
    # this re-raises any error reported by the streaming job. The finally
    # clause guarantees the context is stopped even if the wait is interrupted.
    ssc.awaitTerminationOrTimeout(5)
finally:
    ssc.stop()



-------------------------------------------
Time: 2024-05-25 17:40:46
-------------------------------------------

-------------------------------------------
Time: 2024-05-25 17:40:47
-------------------------------------------

-------------------------------------------
Time: 2024-05-25 17:40:48
-------------------------------------------
0.438,0.498,3.625,3.645,5.000,2.918,5.000,2.351,2.332,2.643,1.698,1.687,1.698,1.717,1.744,0.593,0.502,0.493,0.504,0.445,0.431,0.444,0.440,0.429,Slight-Right-Turn
0.438,0.498,3.625,3.648,5.000,2.918,5.000,2.637,2.332,2.649,1.695,1.687,1.695,1.720,1.744,0.592,0.502,0.493,0.504,0.449,0.431,0.444,0.443,0.429,Slight-Right-Turn

-------------------------------------------
Time: 2024-05-25 17:40:49
-------------------------------------------
0.438,0.498,3.625,3.629,5.000,2.918,5.000,2.637,2.334,2.643,1.696,1.687,1.695,1.717,1.744,0.593,0.502,0.493,0.504,0.449,0.431,0.444,0.446,0.429,Slight-Right-Turn
0.437,0.501,3.625,3.626,5.000,2.918,5.000,2.353,2.334,2

In [5]:
ssc = initStreamingContext()
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)

# Flatten the first 24 (sensor) fields of every record into individual float
# readings. The readings arrive as text. Comparing them as strings orders them
# lexicographically (e.g. "10.0" < "9.0", and "" beats everything), so they are
# converted to float first. Blank fields, e.g. from an empty line, are skipped.
sensorValues = robotData.flatMap(
    lambda line: [float(v) for v in line.split(',')[:24] if v.strip()])

# Minimum reading over a 3-second window that slides every second.
robotData_min = sensorValues.window(3, 1).reduce(min)
robotData_min.pprint()

ssc.start()
try:
    # Let the stream run for 10 seconds; always stop the context afterwards.
    ssc.awaitTerminationOrTimeout(10)
finally:
    ssc.stop()

-------------------------------------------
Time: 2024-05-25 17:41:18
-------------------------------------------
0.429

-------------------------------------------
Time: 2024-05-25 17:41:19
-------------------------------------------
0.429



In [6]:
ssc = initStreamingContext()
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)

# Extract the movement label (field index 24) from each record and window it
# to the last 3 seconds, sliding every second. Lines with fewer than 25 fields
# (blank or truncated lines) are dropped. Indexing them would raise IndexError
# and fail the whole batch.
movements = (robotData
             .map(lambda line: line.split(','))
             .filter(lambda fields: len(fields) > 24)
             .map(lambda fields: fields[24])
             .window(3, 1)
             .cache())

# Total number of records per window, under a constant key for joining.
count = movements.count().map(lambda total: ('windowKey', total))

# Occurrences of each movement per window, under the same constant key.
movement_counts = movements.countByValue().map(lambda kv: ('windowKey', kv))

# Pair each (movement, n) with the window total, then emit (ratio, movement)
# so the ratio can be used as the sort key.
merged = movement_counts.join(count).map(
    lambda kv: (kv[1][0][1] / kv[1][1], kv[1][0][0]))

def print_rdd(rdd):
    """Print one window's results, largest key first.

    ``rdd`` holds (key, value) pairs; in this notebook these are
    (ratio, movement). After a "----------" separator line, each pair is
    printed as ``value key`` on its own line, in descending key order.
    """
    rows = rdd.sortByKey(ascending=False).map(lambda kv: (kv[1], kv[0])).collect()
    print("----------")
    for label, score in rows:
        print(label, score)

# perform action for each RDD
merged.foreachRDD(print_rdd)

ssc.start()
try:
    # Let the stream run for 10 seconds. Streaming errors are re-raised here,
    # and the context is stopped even if the wait is interrupted.
    ssc.awaitTerminationOrTimeout(10)
finally:
    ssc.stop()

----------
Slight-Right-Turn 1.0
