## Streams

In [None]:
from pyspark.streaming import StreamingContext

### Extract stationId and timestamp

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 2)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Keep only the readings of the full stations, i.e., the lines whose
# second field (number of free slots) is equal to 0
def hasNoFreeSlots(reading):
    fields = reading.split(",")
    return int(fields[1]) == 0

fullReadingsDStream = linesDStream.filter(hasNoFreeSlots)

In [None]:
# Map each selected reading to the pair (stationId, timestamp):
# stationId is the first field of the line, timestamp is the fourth one
def extractStationIdTimestamp(reading):
    fields = reading.split(",")
    return (fields[0], fields[3])

stationIdTimestampDStream = fullReadingsDStream.map(extractStationIdTimestamp)

In [None]:
stationIdTimestampDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Count the selected readings

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 2)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Keep only the readings of the full stations, i.e., the lines whose
# second field (number of free slots) is equal to 0
def hasNoFreeSlots(reading):
    fields = reading.split(",")
    return int(fields[1]) == 0

fullReadingsDStream = linesDStream.filter(hasNoFreeSlots)

In [None]:
# Count the number of selected readings.
# count() is applied independently to each batch: the returned DStream
# contains, for each batch, one single value (the number of selected
# readings received in that batch)
numReadingsFullDStream = fullReadingsDStream.count()

In [None]:
numReadingsFullDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Print the stationId removing the duplicates

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 2)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Keep only the readings of the full stations, i.e., the lines whose
# second field (number of free slots) is equal to 0
def hasNoFreeSlots(reading):
    fields = reading.split(",")
    return int(fields[1]) == 0

fullReadingsDStream = linesDStream.filter(hasNoFreeSlots)

In [None]:
# Keep only the stationId (first field) of each selected reading.
# The same stationId can occur several times in the same batch
def extractStationId(reading):
    return reading.split(",")[0]

stationIDsDuplicatesDStream = fullReadingsDStream.map(extractStationId)

In [None]:
# Remove duplicates
# distinct() is not available for DStreams. We must use transform to apply
# the RDD-level distinct() to the RDD associated with each batch
def removeDuplicates(batchRDD):
    return batchRDD.distinct()

distinctStationIDsDStream = stationIDsDuplicatesDStream.transform(removeDuplicates)

In [None]:
distinctStationIDsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Compute the maximum number of free slots

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 2)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Extract the number of free slots (second field, converted to int)
# from each reading
def extractFreeSlots(reading):
    fields = reading.split(",")
    return int(fields[1])

freeSlotsDStream = linesDStream.map(extractFreeSlots)

In [None]:
# Compute, for each batch, the maximum number of free slots.
# The built-in max is used as the (binary) reduce function
maxFreeSlotsDStream = freeSlotsDStream.reduce(max)

In [None]:
maxFreeSlotsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Every 30 seconds print the stocks with a variation greater than 0.5% in the last 30 seconds

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 30)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Compute for each stockID the price variation (compute it for each batch).
# Select only the stocks with a price variation (%) greater than 0.5%

In [None]:
# Return one pair (stockId, (price, price) )  for each input record

def extractStockIdPricePrice(line):
    """Map one comma-separated record to (stockId, (price, price)).

    The stockId is the second field of the line and the price is the
    third one, converted to float. The price appears twice in the value
    so that the first copy can be reduced to a maximum and the second
    copy to a minimum.
    """
    columns = line.split(",")
    identifier = columns[1]
    value = float(columns[2])
    return (identifier, (value, value))

stockIdPriceDStream = linesDStream.map(extractStockIdPricePrice)

In [None]:
# For each stockId, fold the (price, price) pairs of the batch into one
# single (maximum price, minimum price) pair
def mergeMaxMin(v1, v2):
    highest = max(v1[0], v2[0])
    lowest = min(v1[1], v2[1])
    return (highest, lowest)

stockIdMaxMinPriceDStream = stockIdPriceDStream.reduceByKey(mergeMaxMin)

In [None]:
# Compute the variation (%) of each stock: 100*(max-min)/max
# NOTE: a maximum price equal to 0 raises ZeroDivisionError
def computeVariation(maxMinValue):
    maxPrice = maxMinValue[0]
    minPrice = maxMinValue[1]
    return 100.0*(maxPrice-minPrice)/maxPrice

stockIdVariationDStream = stockIdMaxMinPriceDStream.mapValues(computeVariation)

In [None]:
# Keep only the pairs (stockId, variation) with a variation strictly
# greater than 0.5%
def hasHighVariation(pair):
    return pair[1] > 0.5

selectedStockIdsVariationsDStream = stockIdVariationDStream.filter(hasHighVariation)

In [None]:
selectedStockIdsVariationsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Every 30 seconds print the stocks with a variation greater than 0.5% in the last 60 seconds

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 30)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
linesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Compute for each stockID the price variation over the last 60 seconds (sliding window).
# Select only the stocks with a price variation (%) greater than 0.5%

In [None]:
# Return one pair (stockId, (price, price) )  for each input record

def extractStockIdPricePrice(line):
    """Map one comma-separated record to (stockId, (price, price)).

    The stockId is the second field of the line and the price is the
    third one, converted to float. The price appears twice in the value
    so that the first copy can be reduced to a maximum and the second
    copy to a minimum.
    """
    columns = line.split(",")
    identifier = columns[1]
    value = float(columns[2])
    return (identifier, (value, value))

stockIdPriceDStream = linesDStream.map(extractStockIdPricePrice)

In [None]:
# Compute max and min for each stockId over a sliding window
# - The window size is set to 60 seconds
# - The sliding interval is not specified: it is equal to the batch
#   interval (30 seconds, i.e., 1 batch)
# - No inverse reduce function is provided (second argument set to None)
def mergeMaxMin(v1, v2):
    highest = max(v1[0], v2[0])
    lowest = min(v1[1], v2[1])
    return (highest, lowest)

stockIdMaxMinPriceDStream = stockIdPriceDStream.reduceByKeyAndWindow(mergeMaxMin, None, 60)

In [None]:
# Compute the variation (%) of each stock: 100*(max-min)/max
# NOTE: a maximum price equal to 0 raises ZeroDivisionError
def computeVariation(maxMinValue):
    maxPrice = maxMinValue[0]
    minPrice = maxMinValue[1]
    return 100.0*(maxPrice-minPrice)/maxPrice

stockIdVariationDStream = stockIdMaxMinPriceDStream.mapValues(computeVariation)

In [None]:
# Keep only the pairs (stockId, variation) with a variation strictly
# greater than 0.5%
def hasHighVariation(pair):
    return pair[1] > 0.5

selectedStockIdsVariationsDStream = stockIdVariationDStream.filter(hasHighVariation)

In [None]:
selectedStockIdsVariationsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 200 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(200)
finally:
    ssc.stop(stopSparkContext=False)

### Every 2 seconds print the names of the stations that had 0 free slots in the last 2 seconds

In [None]:
# Create a Spark Streaming Context object
ssc = StreamingContext(sc, 2)

In [None]:
inputFileStations = "stations.csv"

In [None]:
# "Standard" (non-streaming) RDD associated with the characteristics of
# the stations.
# Each line of the input file is split on the tab character and mapped to
# the pair (stationId, name): first and fourth field, respectively.
# The RDD is cached
def extractStationIdName(line):
    fields = line.split("\t")
    return (fields[0], fields[3])

stationNameRDD = sc.textFile(inputFileStations).map(extractStationIdName).cache()

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
readingsDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Each reading has the format:
# stationId,#free slots,#used slots,timestamp
# Keep only the readings with a number of free slots equal to 0
def hasNoFreeSlots(reading):
    fields = reading.split(",")
    return int(fields[1]) == 0

fullReadingsDStream = readingsDStream.filter(hasNoFreeSlots)

In [None]:
# Map each selected reading to the pair (stationId, timestamp):
# stationId is the first field of the line, timestamp is the fourth one
def extractStationIdTimestamp(reading):
    fields = reading.split(",")
    return (fields[0], fields[3])

stationIdTimestampDStream = fullReadingsDStream.map(extractStationIdTimestamp)

In [None]:
# Retrieve the name of each station by joining, on stationId, the content
# of the DStream with the "standard" RDD stationNameRDD.
# A join between a DStream and a non-streaming RDD is not directly
# available: transform is used to join the RDD of each batch.
# Each returned pair is (stationId, (timestamp, name))
def joinWithStationNames(batchRDD):
    return batchRDD.join(stationNameRDD)

joinDStream = stationIdTimestampDStream.transform(joinWithStationNames)

In [None]:
# Keep only the value part of the pairs returned by the join.
# The join returns (stationId, (timestamp, name of the station)), hence
# each extracted value is the pair (timestamp, name of the station)
stationNameTimestampDStream = joinDStream.map(lambda pair: pair[1])

In [None]:
stationNameTimestampDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Every 60 seconds print the stock with a price lower than the historical minimum or higher than the historical maximum in the last 60 seconds

In [None]:
historicalInputFile = "historicalData.txt"

In [None]:
# Read the historical data and compute the maximum and minimum price for each stock
# Non-streaming RDD
historicalDataRDD = sc.textFile(historicalInputFile)

In [None]:
# Return one pair (stockId, (price, price) )  for each input record
def extractStockIdPricePrice(line):
    """Map one comma-separated record to (stockId, (price, price)).

    The stockId is the second field of the line and the price is the
    third one, converted to float. The price appears twice in the value
    so that the first copy can be reduced to a maximum and the second
    copy to a minimum.
    """
    columns = line.split(",")
    identifier = columns[1]
    value = float(columns[2])
    return (identifier, (value, value))

stockIdPriceHistoricalRDD = historicalDataRDD.map(extractStockIdPricePrice)

In [None]:
# For each stockId, compute the (maximum price, minimum price) pair
# based on the historical data. The resulting RDD is cached
def mergeMaxMin(v1, v2):
    highest = max(v1[0], v2[0])
    lowest = min(v1[1], v2[1])
    return (highest, lowest)

stockIdPriceHistoricalMaxMinRDD = stockIdPriceHistoricalRDD.reduceByKey(mergeMaxMin).cache()

In [None]:
# Create a Spark Streaming Context object.
# The value consistent with the section title (one result every 60 seconds)
# is StreamingContext(sc, 60); a shorter batch interval (10 seconds) is used
# here instead - presumably to see several results within a short run
ssc = StreamingContext(sc, 10)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
pricesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# Join on the stockid each input record of the input stream with the 
# content of stockIdPriceHistoricalMaxMinRDD to retrieve 
# the historical maximum-minimum range of the stock

In [None]:
# Map each input record to the pair (stockId, price):
# stockId is the second field, price (converted to float) is the third one
def extractStockIdPrice(record):
    fields = record.split(",")
    return (fields[1], float(fields[2]))

stockIdPriceDStream = pricesDStream.map(extractStockIdPrice)

In [None]:
# Join, on stockId, the RDD associated with the content of the current
# batch and the non-streaming RDD stockIdPriceHistoricalMaxMinRDD.
# Each returned pair is (stockId, (price, (historical max, historical min)))
def joinWithHistoricalMaxMin(batchRDD):
    return batchRDD.join(stockIdPriceHistoricalMaxMinRDD)

stockIdPriceMaxMinDStream = stockIdPriceDStream.transform(joinWithHistoricalMaxMin)

In [None]:
# Select only lines with price > maximum historical price 
# or price < minimum historical price
def anomalyValue(pair):
    """Tell whether a price is outside the historical range of its stock.

    Args:
        pair: (stockId, (currentPrice, (historicalMaxPrice, historicalMinPrice)))

    Returns:
        True if currentPrice is strictly greater than the historical
        maximum or strictly lower than the historical minimum,
        False otherwise.
    """
    currentPrice, historicalMaxMin = pair[1][0], pair[1][1]
    stockHistoricalMaxPrice, stockHistoricalMinPrice = historicalMaxMin[0], historicalMaxMin[1]

    # The comparison already evaluates to a boolean: return it directly
    return currentPrice > stockHistoricalMaxPrice or currentPrice < stockHistoricalMinPrice

selectedStockPricesDStream = stockIdPriceMaxMinDStream.filter(anomalyValue)

In [None]:
# Retrieve only the stockIDs and remove the duplicates.
# keys and distinct are not available for DStreams: transform is used
# to apply them to the RDD associated with each batch
def distinctKeys(batchRDD):
    return batchRDD.keys().distinct()

selectStockIdsDStream = selectedStockPricesDStream.transform(distinctKeys)

In [None]:
selectStockIdsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

### Every 30 seconds print the stock with a price lower than the historical minimum or higher than the historical maximum in the last 60 seconds

In [None]:
historicalInputFile = "historicalData.txt"

In [None]:
# Read the historical data and compute the maximum and minimum price for each stock
# Non-streaming RDD
historicalDataRDD = sc.textFile(historicalInputFile)

In [None]:
# Return one pair (stockId, (price, price) )  for each input record
def extractStockIdPricePrice(line):
    """Map one comma-separated record to (stockId, (price, price)).

    The stockId is the second field of the line and the price is the
    third one, converted to float. The price appears twice in the value
    so that the first copy can be reduced to a maximum and the second
    copy to a minimum.
    """
    columns = line.split(",")
    identifier = columns[1]
    value = float(columns[2])
    return (identifier, (value, value))

stockIdPriceHistoricalRDD = historicalDataRDD.map(extractStockIdPricePrice)

In [None]:
# For each stockId, compute the (maximum price, minimum price) pair
# based on the historical data. The resulting RDD is cached
def mergeMaxMin(v1, v2):
    highest = max(v1[0], v2[0])
    lowest = min(v1[1], v2[1])
    return (highest, lowest)

stockIdPriceHistoricalMaxMinRDD = stockIdPriceHistoricalRDD.reduceByKey(mergeMaxMin).cache()

In [None]:
# Create a Spark Streaming Context object.
# The value consistent with the section title (one result every 30 seconds)
# is StreamingContext(sc, 30); a shorter batch interval (5 seconds) is used
# here instead - presumably to see several results within a short run
ssc = StreamingContext(sc, 5)

In [None]:
# Create a (Receiver) DStream that will connect to localhost:9999
# and apply a sliding window on the input stream.
# The section title asks for
# - windowDuration = 60 seconds
# - slideDuration = 30 seconds
# while the active code uses shorter values
# - windowDuration = 10 seconds
# - slideDuration = 5 seconds
# Both durations must be multiples of the batch interval of the
# StreamingContext
pricesDStream = ssc.socketTextStream("localhost", 9999)\
.window(10, 5)
# Values consistent with the section title: .window(60, 30)

In [None]:
# Join on the stockid each input record of the input stream with the 
# content of stockIdPriceHistoricalMaxMinRDD to retrieve 
# the historical maximum-minimum range of the stock

In [None]:
# Map each input record to the pair (stockId, price):
# stockId is the second field, price (converted to float) is the third one
def extractStockIdPrice(record):
    fields = record.split(",")
    return (fields[1], float(fields[2]))

stockIdPriceDStream = pricesDStream.map(extractStockIdPrice)

In [None]:
# Join, on stockId, the RDD associated with the content of the current
# window and the non-streaming RDD stockIdPriceHistoricalMaxMinRDD.
# Each returned pair is (stockId, (price, (historical max, historical min)))
def joinWithHistoricalMaxMin(batchRDD):
    return batchRDD.join(stockIdPriceHistoricalMaxMinRDD)

stockIdPriceMaxMinDStream = stockIdPriceDStream.transform(joinWithHistoricalMaxMin)

In [None]:
# Select only lines with price > maximum historical price 
# or price < minimum historical price
def anomalyValue(pair):
    """Tell whether a price is outside the historical range of its stock.

    Args:
        pair: (stockId, (currentPrice, (historicalMaxPrice, historicalMinPrice)))

    Returns:
        True if currentPrice is strictly greater than the historical
        maximum or strictly lower than the historical minimum,
        False otherwise.
    """
    currentPrice, historicalMaxMin = pair[1][0], pair[1][1]
    stockHistoricalMaxPrice, stockHistoricalMinPrice = historicalMaxMin[0], historicalMaxMin[1]

    # The comparison already evaluates to a boolean: return it directly
    return currentPrice > stockHistoricalMaxPrice or currentPrice < stockHistoricalMinPrice

selectedStockPricesDStream = stockIdPriceMaxMinDStream.filter(anomalyValue)

In [None]:
# Retrieve only the stockIDs and remove the duplicates.
# keys and distinct are not available for DStreams: transform is used
# to apply them to the RDD associated with each window
def distinctKeys(batchRDD):
    return batchRDD.keys().distinct()

selectStockIdsDStream = selectedStockPricesDStream.transform(distinctKeys)

In [None]:
selectStockIdsDStream.pprint()

In [None]:
#Start the computation
ssc.start()

In [None]:
# Run this application for 90 seconds, then stop the streaming context.
# The finally clause stops the context also when the wait raises an error
# or is interrupted; otherwise the context would stay active and the next
# StreamingContext could not be started.
# The SparkContext is kept alive (stopSparkContext=False) so that it can be
# reused by the following cells.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)