## 6.1.8 Window operations

In [1]:
from pyspark.streaming import StreamingContext

In [2]:
# Mini-batch interval, in seconds, used by the streaming context below.
batchDurationInSecs = 5
ssc = StreamingContext(sc, batchDurationInSecs)

#### Monitors the given dir and reads each newly created file
[Ref: textFileStream](http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.textFileStream)

In [3]:
# textFileStream yields a DStream (discretized stream): a sequence of RDDs
# created periodically from the files that appear in the monitored directory.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
ordersInputDir = '/home/deepakt/Documents/dev/spark/sia/sia-ch6/dataIn'
fileStream = ssc.textFileStream(ordersInputDir)

In [4]:
# Parse each line of DStream
from datetime import datetime
def parseOrder(line):
    """Parse one comma-separated order line into a list of zero or one orders.

    Fields read, by position: time ('%Y-%m-%d %H:%M:%S'), orderId, clientId,
    symbol, amount, price, and a 'B'/'S' buy-or-sell flag.

    Returns a one-element list holding the order dict, or an empty list when
    the line has fewer than 7 fields, the flag is neither 'B' nor 'S', or a
    field cannot be converted. Returning a list lets flatMap() drop bad lines
    instead of failing the whole batch.
    """
    f = line.split(',')
    # Short or blank lines would otherwise raise IndexError on f[6].
    if len(f) < 7 or f[6] not in ('B', 'S'):
        return []
    try:
        return [{
            'time': datetime.strptime(f[0], '%Y-%m-%d %H:%M:%S'),
            # int() instead of long(): long() does not exist on Python 3, and
            # Python 2 ints promote to long automatically when needed.
            'orderId': int(f[1]),
            'clientId': int(f[2]),
            'symbol': f[3],
            'amount': int(f[4]),
            'price': float(f[5]),
            'buy': f[6] == 'B'
        }]
    except ValueError:
        # Malformed timestamp or number: skip the line.
        return []

In [5]:
# parseOrder returns a list with zero or one order per line; flatMap flattens
# those lists into a stream of order dicts.
ds_orders = fileStream.flatMap(parseOrder)

In [6]:
# Count number of buys and sells: map every order to (buy_flag, 1) and sum per key.
from operator import add
# A plain int literal is used: the Python 2-only `1L` is a SyntaxError on Python 3.
ds_numPerType = ds_orders.map(lambda order: (order['buy'], 1)).reduceByKey(add)

In [7]:
# Traded value of each order keyed by client: (clientId, amount * price).
# The per-client total is accumulated from these pairs further down.
ds_amountPerClient = ds_orders.map(
    lambda o: (o['clientId'], o['amount'] * o['price'])
)

#### updateStateByKey(updateFunction)
updateFunction : Function to update state for each key
    
The first argument of this function is a sequence (a `Seq` in the Scala API, a plain Python list in PySpark) with the new values of a key that came in during the current mini-batch. The second argument is the state value of the key, or None if the state for that key hasn’t been calculated yet. If the state for the key has been calculated, but no new values for the key were received in the current mini-batch, the first argument will be an empty sequence. The function should return the new value for the key’s state.

In [8]:
def _addToClientTotal(l_amount, total_amount):
    """Fold the current mini-batch's amounts into a client's running total.

    l_amount     -- amounts received for the key in this mini-batch (may be empty)
    total_amount -- previous state for the key, or None if there is none yet
    Returns the new state for the key.
    """
    batchSum = sum(l_amount)
    if total_amount is None:
        return batchSum
    return batchSum + total_amount

ds_amountState = ds_amountPerClient.updateStateByKey(_addToClientTotal)

In [9]:
# In each batch/RDD, sort by total amount (descending) and keep the top 5 clients.
# transform() is used to apply RDD operations to each RDD of the DStream.
# Pairs are indexed positionally because tuple-unpacking lambda parameters
# (`lambda (a, b): ...`) are a SyntaxError on Python 3.
ds_top5Clients = ds_amountState \
    .transform(lambda rdd: rdd.sortBy(lambda pair: pair[1], ascending=False)
                              .map(lambda pair: pair[0])
                              .zipWithIndex()
                              .filter(lambda indexed: indexed[1] < 5)
                              .map(lambda indexed: indexed[0])
              )

In [10]:
# Register a console print of up to 5 elements of ds_top5Clients per batch.
ds_top5Clients.pprint(5)

### Find top 5 most traded securities during last hour

In [11]:
# Amount traded per symbol over a sliding window.
# For testing, the window is 1 minute; use 60 * 60 for the "last hour" in the heading.
windowDurationInSecs = 60
# Slide by exactly one mini-batch instead of repeating the literal 5: window and
# slide durations must be multiples of the batch interval, so deriving the slide
# from batchDurationInSecs keeps this valid if the batch interval is changed.
ds_stocksPerWindow = ds_orders.map(lambda order: (order['symbol'], order['amount'])) \
                                .window(windowDurationInSecs, slideDuration=batchDurationInSecs) \
                                .reduceByKey(add)

In [12]:
# Register a console print of up to 2 (symbol, amount) pairs per batch.
ds_stocksPerWindow.pprint(2)

In [13]:
# Find top 5 stocks: sort the windowed (symbol, amount) pairs by amount
# (descending), keep the first 5 symbols, then collapse them into a single
# ('TOP5STOCKS', [symbols]) element per batch.
# The sort key previously declared its parameter as `amout` but returned
# `amount`, which raised NameError as soon as the window held data. Pairs are
# now indexed positionally, which also avoids tuple-unpacking lambda
# parameters (a SyntaxError on Python 3).
ds_top5Stocks = ds_stocksPerWindow \
    .transform(lambda rdd: rdd.sortBy(lambda pair: pair[1], ascending=False)
                              .map(lambda pair: pair[0])
                              .zipWithIndex()
                              .filter(lambda indexed: indexed[1] < 5)
                              .map(lambda indexed: indexed[0])
              ) \
    .repartition(1) \
    .glom() \
    .map(lambda l_symbol: ('TOP5STOCKS', l_symbol))

In [14]:
# Register a console print of up to 5 elements of ds_top5Stocks per batch.
ds_top5Stocks.pprint(5)

### Start merging streams

In [15]:
# Label the per-type counts: (True, n) -> ('BUYS', [str(n)]), (False, n) -> ('SELLS', [str(n)]).
# The pair is indexed positionally because tuple-unpacking lambda parameters
# are a SyntaxError on Python 3.
ds_buySellMetric = ds_numPerType.map(
    lambda typeCount: ('BUYS' if typeCount[0] else 'SELLS', [str(typeCount[1])])
)

In [16]:
# Stringify the client ids, gather each batch into one list (single partition
# + glom) and tag that list with the 'TOP5CLIENTS' label.
ds_top5ClinetMetric = ds_top5Clients.map(str).repartition(1).glom() \
    .map(lambda l_clientId: ('TOP5CLIENTS', l_clientId))

In [17]:
# 'TOP5STOCKS' metric.
# ds_top5Stocks already ends with repartition(1).glom().map(...) and therefore
# emits one ('TOP5STOCKS', [symbols]) tuple per batch. Applying the same three
# steps again nested the result as ('TOP5STOCKS', [('TOP5STOCKS', [...])]),
# so the stream is reused as-is.
ds_top5StockMetric = ds_top5Stocks

In [17]:
# Merge the buy/sell counts and the top-5-clients metric into one stream.
# NOTE(review): ds_top5StockMetric is not part of this union - confirm whether
# leaving the stocks metric out of the saved output is intentional.
ds_final = ds_buySellMetric.union(ds_top5ClinetMetric)

In [18]:
# Write every batch of the merged stream as text under dataOut/, using the
# 'output' prefix and 'txt' suffix; the stream is reduced to one partition first.
ds_final \
    .repartition(1) \
    .saveAsTextFiles('dataOut/output', 'txt')

#### Checkpointing

`updateStateByKey` expands the RDD’s DAG in each mini-batch, and that can quickly lead to stack overflow exceptions. By periodically checkpointing RDDs, their calculation plan’s dependence on previous mini-batches is broken.

In [19]:
# Checkpoint data is written under the relative path '.'.
# Must be set before the streaming context is started.
sc.setCheckpointDir('.')

In [20]:
# Start the streaming computation; the registered pprint() and
# saveAsTextFiles() outputs are produced for each batch from here on.
ssc.start()

-------------------------------------------
Time: 2017-10-04 14:22:10
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 14:22:10
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 14:22:10
-------------------------------------------
('TOP5STOCKS', [])

-------------------------------------------
Time: 2017-10-04 14:22:15
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 14:22:15
-------------------------------------------

-------------------------------------------
Time: 2017-10-04 14:22:15
-------------------------------------------
('TOP5STOCKS', [])

-------------------------------------------
Time: 2017-10-04 14:24:10
-------------------------------------------
87
23
70
15
10

-------------------------------------------
Time: 2017-10-04 14:24:10
-------------------------------------------

---------------------------

In [21]:
# Stop streaming but keep the underlying SparkContext (sc) alive, since the
# following cells still use it to read the saved output.
ssc.stop(stopSparkContext=False)

In [22]:
# Read the result: the glob matches every output-*.txt directory under dataOut/
# (the prefix and suffix passed to saveAsTextFiles).
rdd_allMetrics = sc.textFile("dataOut/output-*.txt/")

In [23]:
# Each line is the string form of a tuple; it still needs to be converted
# back into a tuple before it can be used as data.
rdd_allMetrics.take(100)

[u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', [])",
 u"('TOP5CLIENTS', [])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])",
 u"('TOP5CLIENTS', ['87', '23', '70', '15', '10'])"]

In [None]:
# FIXME(review): incomplete scratch expression (never executed, "In [None]");
# looks like a leftover from tab completion - finish or delete this cell.
ds_amountPerClient.m