
# Low-Level Streaming


This notebook focuses on *StreamParse*, which is a Python library for using Apache Storm.

Storm and its relatives process tuples one at a time, as they arrive, rather than collecting them into batches first.
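To make "one at a time" concrete, here is a plain-Python sketch with no Storm involved. Three generator functions stand in for a spout and two bolts, and a `trace` list records when each stage does its work. The function names are hypothetical illustrations, not StreamParse APIs.

```python
from collections import Counter

trace = []  # records the order in which the stages do their work

def source(lines):
    """Plays the role of a spout: hands out one tuple at a time."""
    for line in lines:
        trace.append(f"read {line!r}")
        yield (line,)

def split_words(tuples):
    """Plays the role of a stateless bolt: one line tuple -> word tuples."""
    for (line,) in tuples:
        for word in line.split():
            yield (word,)

def count_words(tuples):
    """Plays the role of a stateful bolt: emits a running count per word."""
    counts = Counter()
    for (word,) in tuples:
        counts[word] += 1
        trace.append(f"count {word!r}")
        yield (word, counts[word])

# Generators are lazy, so each line passes through every stage
# before the source reads the next one.
results = list(count_words(split_words(source(["the cat", "the dog"]))))
print(results)  # [('the', 1), ('cat', 1), ('the', 2), ('dog', 1)]
print(trace)
```

The trace interleaves reads and counts (`read 'the cat'`, `count 'the'`, `count 'cat'`, `read 'the dog'`, ...): the second line is not read until every word of the first line has been counted, which is the tuple-at-a-time behavior.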

We'll see StreamParse in action, then look at some components written **for** StreamParse but executed outside it, to get a better sense of how they work.

## Installation of Storm, StreamParse and Their Support Libraries

In [None]:
!wget -nc https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein




In [None]:
!wget -nc https://archive.apache.org/dist/storm/apache-storm-1.2.3/apache-storm-1.2.3.tar.gz




In [None]:
!tar zxvf apache-storm-1.2.3.tar.gz


In [None]:
!chmod a+x ./lein
!chmod a+x apache-storm-1.2.3/bin/storm

In [None]:
!bash lein version

Leiningen 2.9.3 on Java 11.0.6 OpenJDK 64-Bit Server VM


In [None]:
!pip install streamparse



## Initial Configuration and Execution

Here you can see StreamParse in action, using the sample word-count topology that `sparse quickstart` generates.
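Roughly speaking, the quickstart project consists of a spout that emits words and a bolt that counts them. The sketch below is *not* the generated code: it is a hypothetical `WordCountBoltSketch` that borrows the method names StreamParse bolts use (`initialize`, `process`, `emit`), with `emit` overridden to record its output, the same simulation trick this notebook uses further down.

```python
from collections import Counter

class WordCountBoltSketch:
    """Hypothetical stand-in for a StreamParse word-count bolt (no Storm needed)."""
    outputs = ["word", "count"]

    def initialize(self, conf, ctx):
        # State lives on the bolt instance and persists across tuples.
        self.counter = Counter()
        self.emitted = []

    def process(self, tup):
        # A real bolt receives a Tuple object and would read tup.values[0];
        # this sketch is handed a plain list instead.
        word = tup[0]
        self.counter[word] += 1
        self.emit([word, self.counter[word]])

    def emit(self, values):
        # A real bolt sends values downstream; here we only record them.
        self.emitted.append(values)

bolt = WordCountBoltSketch()
bolt.initialize(None, None)
for word in ["dog", "cat", "dog"]:
    bolt.process([word])
print(bolt.emitted)  # [['dog', 1], ['cat', 1], ['dog', 2]]
```

Keeping the counter on the bolt instance is what makes it stateful. In a real topology with several bolt instances, a fields grouping on the word sends every occurrence of a given word to the same instance, so each count stays correct.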

In [None]:
import os

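# Let Leiningen run as root (Colab runs as root) without stopping to prompt
os.environ['LEIN_ROOT']='1'
# Add the storm and lein executables to the PATH so that sparse can find them
os.environ['PATH'] = os.environ['PATH'] + ':/content/apache-storm-1.2.3/bin/:/content'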

In [None]:
!sparse quickstart wordcount

error: folder "wordcount" already exists


The generated `project.clj` refers to Storm 1.1.0, but we installed Storm 1.2.3, so we make a quick fix to the version number:

In [None]:
!sed 's/1\.1\.0/1.2.3/' wordcount/project.clj > wordcount/project2.clj

In [None]:
!mv wordcount/project2.clj wordcount/project.clj

## Running StreamParse for Real

Run this cell and watch the output. `sparse run` keeps the topology running until you stop it, so interrupt the cell when you are done: press [Ctrl]-[M] [I], or choose Runtime > Interrupt execution.

In [None]:
!cd /content/wordcount ; sparse run

# Streaming, Joins, and Learning

Let's look at how we might use StreamParse spouts and bolts to build an incremental machine learning application over a stream, using the airline flight data from High-Level Streaming.

Here we'll use a combination of Pandas (and PandaSQL) for our incremental joins and grouping, and Creme for incremental machine learning!

In [None]:
!pip install pandasql
!pip install creme



Next, let's download the data: one month of flights and a table of airports.

In [None]:
!pip install googledrivedownloader



In [None]:
from google_drive_downloader import GoogleDriveDownloader as gdd

# One month's flights
gdd.download_file_from_google_drive(file_id='1PPtjGx8lr_cDUfVa3qwlk1W8yY6hY91n',
                                    dest_path='/content/ontime.csv')

# Airports
gdd.download_file_from_google_drive(file_id='1Qe4FpLg473FjfVhdRGGO4dHLmJa1ETq4',
                                    dest_path='/content/airports.csv')


## Our Spout

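Let's define a spout. To run it in StreamParse for real, we would need to put it in its own source file inside the StreamParse project, which is inconvenient on Colab.

Instead, we'll simulate StreamParse and watch what happens. Each class below implements the methods StreamParse would call (`initialize`, then `next_tuple` for a spout or `process` for a bolt) and overrides `emit` so that, instead of sending tuples downstream, it appends them to a global list we can inspect.

This spout reads one row at a time from the on-time flights dataframe (after dropping rows with no `ARR_DELAY_NEW` value) and wraps around to the first row when it reaches the end.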
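The `FlightSpout` cell that follows leaves `ack` and `fail` empty, so a tuple whose processing fails is simply dropped. In Storm, a spout that emits a tuple with an id is later told `ack(id)` once the tuple has been fully processed, or `fail(id)` if it failed or timed out; replaying is up to the spout. Below is a minimal sketch of that bookkeeping, assuming the rows fit in an in-memory list. `ReplayingSpoutSketch` is a hypothetical class, and its `emit` only records output.

```python
from collections import deque

class ReplayingSpoutSketch:
    """Hypothetical spout that remembers in-flight tuples so it can replay failures."""

    def __init__(self, rows):
        self.rows = rows
        self.inx = 0            # position of the next row, wrapping around
        self.tid = 0            # id attached to each newly emitted tuple
        self.pending = {}       # tup_id -> values still awaiting ack/fail
        self.replay = deque()   # ids of failed tuples, re-emitted first
        self.emitted = []       # (tup_id, values) in emission order

    def next_tuple(self):
        if self.replay:
            # Re-send a failed tuple under its original id.
            tup_id = self.replay.popleft()
            self.emit(self.pending[tup_id], tup_id)
            return
        values = self.rows[self.inx]
        self.inx = (self.inx + 1) % len(self.rows)  # wrap around like FlightSpout
        self.pending[self.tid] = values
        self.emit(values, self.tid)
        self.tid += 1

    def ack(self, tup_id):
        # Fully processed downstream: stop tracking it.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Processing failed: schedule the tuple for replay.
        if tup_id in self.pending:
            self.replay.append(tup_id)

    def emit(self, values, tup_id):
        self.emitted.append((tup_id, values))

spout = ReplayingSpoutSketch([["SJU", "MCO"], ["SJU", "MDW"]])
spout.next_tuple()  # emits id 0
spout.next_tuple()  # emits id 1
spout.ack(0)
spout.fail(1)
spout.next_tuple()  # replays id 1
spout.next_tuple()  # wraps around: emits the first row again as id 2
print(spout.emitted)
```

A replayed tuple stays in `pending` until it is finally acked, so a tuple that keeps failing keeps being retried.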

In [None]:
from streamparse.spout import Spout
import pandas as pd

stream = []

#class FlightSpout(Spout):
class FlightSpout:
  outputs=['YEAR','MONTH','DAY_OF_MONTH','AIRLINE_ID','CARRIER','FL_NUM','ORIGIN','DEST','ARR_DELAY','CANCELLED']
  # Position of next tuple in dataframe
  inx = 0
  # Overall stream tuple id
  tid = 0

  # Open the file as a dataframe
  def initialize(self, stormconf, context):
    self.ontime_df = pd.read_csv('/content/ontime.csv')
    self.ontime_df.dropna(inplace=True,subset=['ARR_DELAY_NEW'])

  # Read one row, increment the pointer, return the
  # row as a list
  def next_tuple(self):
    tup = self.ontime_df.iloc[self.inx]
    self.inx = self.inx + 1

    # We'll wrap around if we exceed the size of the table
    if self.inx >= len(self.ontime_df):
      self.inx = 0

    # We have to turn the series into a list of columns.
    # There is also an extra, blank column in the file
    self.emit(tup.tolist()[0:-1], tup_id=self.tid)
    self.tid = self.tid + 1

  # If a tuple was processed properly, do nothing
  def ack(self, tup_id):
    pass

  # If a tuple was processed incorrectly, we'll still
  # do nothing
  def fail(self, tup_id):
    pass

  # This would be omitted in the real code
  def emit(self, tuple, tup_id):
    global stream
    stream = stream + [tuple]

## A Bolt that Joins

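As tuples arrive, we want to join each flight with the latitude and longitude of its origin and destination airports. We do this by preloading the `airports` table as a dataframe in `initialize`, then joining each incoming tuple against it in `process`. Because this is an inner join, a flight whose origin or destination is missing from the airports table produces an empty result.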
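The `JoinBolt` cell that follows builds a one-row dataframe and runs a SQL query for every tuple. Because the airports table is small and does not change, a lighter alternative is to load it once into a dictionary keyed by IATA code and look up each flight's airports, which amounts to a hash join against an in-memory table. Here is a minimal sketch with a hypothetical `LookupJoinSketch` class and a reduced (origin, destination, delay) tuple; the coordinates are the SJU and MCO values from the join output later in this notebook, and `XXX` is a made-up unknown airport.

```python
class LookupJoinSketch:
    """Hypothetical join bolt: enriches flights with airport coordinates
    from a dict built once, instead of running a SQL query per tuple."""

    def __init__(self, airports):
        # airports: iterable of (iata_code, latitude, longitude)
        self.coords = {code: (lat, lon) for code, lat, lon in airports}
        self.emitted = []

    def process(self, flight):
        origin, dest, delay = flight
        # Inner-join semantics: drop flights whose airports are unknown.
        if origin not in self.coords or dest not in self.coords:
            return
        from_lat, from_long = self.coords[origin]
        to_lat, to_long = self.coords[dest]
        self.emitted.append([origin, dest, from_lat, from_long, to_lat, to_long, delay])

joiner = LookupJoinSketch([("SJU", 18.43942, -66.00183), ("MCO", 28.42889, -81.31603)])
joiner.process(("SJU", "MCO", 9.0))
joiner.process(("SJU", "XXX", 0.0))  # unknown destination: no output
print(joiner.emitted)  # [['SJU', 'MCO', 18.43942, -66.00183, 28.42889, -81.31603, 9.0]]
```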

In [None]:
from streamparse.bolt import Bolt
from pystorm.component import Tuple
import pandas as pd
from pandasql import sqldf

stream2 = []

#class JoinBolt(Bolt):

class JoinBolt:
  outputs=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST','FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','ARR_DELAY']

  def initialize(self, stormconf, context):
    self.airports_df = pd.read_csv('/content/airports.csv')

  def process(self, tup):
    flights_df = pd.DataFrame([tup],columns=['YEAR','MONTH','DAY_OF_MONTH','AIRLINE_ID','CARRIER','FL_NUM','ORIGIN','DEST','ARR_DELAY','CANCELLED'])
    airports_df = self.airports_df
    incremental_results = \
      sqldf('select YEAR,MONTH,DAY_OF_MONTH,CARRIER,FL_NUM, ORIGIN, DEST, org.LATITUDE AS FROM_LAT, '\
            'org.LONGITUDE AS FROM_LONG, '\
            'dst.LATITUDE AS TO_LAT, dst.LONGITUDE AS TO_LONG, ARR_DELAY '\
            'from flights_df f join airports_df org on f.origin=org.IATA_CODE '\
            'join airports_df dst on f.dest=dst.IATA_CODE'
            ,locals())
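    # The inner join yields no rows if either airport is missing from
    # airports.csv; skip those flights instead of failing on iloc[0]
    if not incremental_results.empty:
      self.emit(incremental_results.iloc[0].tolist())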

  def emit(self, tuple):
    global stream2
    stream2 = stream2 + [tuple]


## Windowed Grouping

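Let's group by day, that is, use a tumbling window one day long. We assume the stream is well-behaved: dates never decrease, so once a tuple with a new date arrives, the previous day is complete.

We therefore buffer tuples as long as they share the same date. When the date changes, we group the buffered tuples by flight and route, compute the number of flights and the average arrival delay for each group, and emit one tuple per group. Note that a day is only emitted once a tuple from a later day arrives, so the last day in the stream stays buffered; that is why only 2018-1-2 is emitted in the run below.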
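The windowing logic can be sketched without pandas. `DailyWindowSketch` below is a hypothetical class that, for brevity, groups only by carrier and flight number. Like `GroupBolt`, it assumes dates never decrease and emits a day's aggregates when the date changes; it also exposes `flush` so the caller can emit the final window when the stream ends.

```python
from collections import defaultdict

class DailyWindowSketch:
    """Hypothetical tumbling-window bolt: buffers one date's tuples and emits
    per-flight aggregates when the date changes (dates must never decrease)."""

    def __init__(self):
        self.current_date = None
        self.buffer = defaultdict(list)  # (carrier, fl_num) -> list of delays
        self.emitted = []

    def process(self, date, carrier, fl_num, delay):
        if date != self.current_date:
            self.flush()  # the previous day is complete
            self.current_date = date
        self.buffer[(carrier, fl_num)].append(delay)

    def flush(self):
        # Emit [date, carrier, fl_num, nbr_flights, avg_delay] per group,
        # then start an empty window.
        for (carrier, fl_num), delays in sorted(self.buffer.items()):
            self.emitted.append([self.current_date, carrier, fl_num,
                                 len(delays), sum(delays) / len(delays)])
        self.buffer = defaultdict(list)

w = DailyWindowSketch()
w.process((2018, 1, 2), "WN", 23, 0.0)
w.process((2018, 1, 2), "WN", 23, 10.0)
w.process((2018, 1, 3), "WN", 24, 5.0)  # new date: emits the 2018-1-2 window
w.flush()                               # end of stream: emit the last window
print(w.emitted)
# [[(2018, 1, 2), 'WN', 23, 2, 5.0], [(2018, 1, 3), 'WN', 24, 1, 5.0]]
```

In a running topology there is no natural "end of stream", so a real bolt would need some other trigger for the final flush, such as a timer; this sketch simply leaves that to the caller.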

In [None]:
from streamparse.bolt import Bolt
from pystorm.component import Tuple
import pandas as pd
from pandasql import sqldf

stream3 = []

class GroupBolt:#(Bolt):
  outputs = ['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST',\
                             'FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','NBR_FLIGHTS','AVG_DELAY']

  def initialize(self, stormconf, context):
    self.groups = pd.DataFrame([],columns=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST','FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','ARR_DELAY'])
    self.last_year = None
    self.last_month = None
    self.last_day_of_month = None

  def process(self, tup):
    flights_df = pd.DataFrame([tup],columns=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST','FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','ARR_DELAY'])

    if flights_df.iloc[0,0] != self.last_year or flights_df.iloc[0,1] != self.last_month \
      or flights_df.iloc[0,2] != self.last_day_of_month:
      print('** DONE WITH {}-{}-{} with {} entries'.format(self.last_year,self.last_month,self.last_day_of_month,len(self.groups)))
      if len(self.groups) > 0:
        groups_df = self.groups
        grouped_results = sqldf('select YEAR,MONTH,DAY_OF_MONTH,CARRIER,FL_NUM,ORIGIN,DEST, '\
                        'FROM_LAT,FROM_LONG,TO_LAT,TO_LONG, COUNT(ARR_DELAY) as NbrFlights, avg(ARR_DELAY) as avg_delay '\
                        'FROM groups_df GROUP BY YEAR,MONTH,DAY_OF_MONTH,CARRIER,FL_NUM,ORIGIN,DEST,FROM_LAT,FROM_LONG,TO_LAT,TO_LONG '\
                        'ORDER BY CARRIER,FL_NUM,ORIGIN,DEST',
                        locals())
        
        for result in grouped_results.itertuples(index=False):
          self.emit(list(result))

      self.groups = flights_df
      self.last_year = flights_df.iloc[0,0]
      self.last_month = flights_df.iloc[0,1]
      self.last_day_of_month = flights_df.iloc[0,2]
    else:
      self.groups = pd.concat([self.groups, flights_df])

  def emit(self, tuple):
    global stream3
    stream3 = stream3 + [tuple]


## Incremental Learning via Creme

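We want to incrementally train a linear regression model to predict the average delay of each grouped flight.

The Creme package (https://creme-ml.github.io/) lets us process data incrementally: its linear regression is fitted with stochastic gradient descent, one example at a time. It also provides incremental versions of preprocessing steps such as one-hot encoding and scaling. `LearnBolt` predicts each tuple's delay before training on it, so every prediction is made on an example the model has not yet seen.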
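We don't reproduce Creme's internals here. As a rough, dependency-free illustration of the ideas above, here are two hypothetical classes that only borrow Creme's `fit_one` / `transform_one` / `predict_one` naming: a running standardizer based on Welford's online mean and variance, and a linear regression updated by one SGD step on squared error per example. The loop uses them in the same predict-then-fit order as `LearnBolt`, on synthetic noise-free data (y = 3a + 1).

```python
import math

class RunningScaler:
    """Hypothetical online standardizer (Welford's algorithm, per feature)."""

    def __init__(self):
        self.count, self.mean, self.m2 = {}, {}, {}

    def fit_one(self, x):
        for k, v in x.items():
            n = self.count.get(k, 0) + 1
            mean = self.mean.get(k, 0.0)
            delta = v - mean
            mean += delta / n
            # m2 accumulates the sum of squared deviations from the mean.
            self.m2[k] = self.m2.get(k, 0.0) + delta * (v - mean)
            self.count[k], self.mean[k] = n, mean
        return self

    def transform_one(self, x):
        out = {}
        for k, v in x.items():
            n = self.count.get(k, 0)
            var = self.m2.get(k, 0.0) / n if n else 0.0
            # Features with no spread yet are mapped to 0.
            out[k] = (v - self.mean.get(k, 0.0)) / math.sqrt(var) if var > 0 else 0.0
        return out

class SGDLinearRegression:
    """Hypothetical linear regression fitted one example at a time by SGD."""

    def __init__(self, lr=0.05):
        self.lr = lr
        self.weights = {}
        self.bias = 0.0

    def predict_one(self, x):
        return self.bias + sum(self.weights.get(k, 0.0) * v for k, v in x.items())

    def fit_one(self, x, y):
        # The gradient of 0.5 * (yhat - y)**2 w.r.t. weight k is err * x[k].
        err = self.predict_one(x) - y
        for k, v in x.items():
            self.weights[k] = self.weights.get(k, 0.0) - self.lr * err * v
        self.bias -= self.lr * err
        return self

scaler = RunningScaler()
model = SGDLinearRegression(lr=0.05)
errors = []
for i in range(2000):
    a = (i % 10) / 10  # synthetic feature
    y = 3 * a + 1      # synthetic target, no noise
    x = scaler.fit_one({"a": a}).transform_one({"a": a})
    yhat = model.predict_one(x)  # test on the example first...
    errors.append(abs(yhat - y))
    model.fit_one(x, y)          # ...then train on it
print(sum(errors[:100]) / 100, sum(errors[-100:]) / 100)
```

The mean absolute error over the last 100 examples is far smaller than over the first 100, which is the same kind of "error shrinks as the stream goes on" pattern one hopes to see in `LearnBolt`'s output.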

In [None]:
from streamparse.bolt import Bolt
from pystorm.component import Tuple
import pandas as pd
import creme.preprocessing
import creme.linear_model

stream4 = []

class LearnBolt:#(Bolt):
  outputs = ['predicted','actual','error']

  def initialize(self, stormconf, context):
    self.scaler = creme.preprocessing.StandardScaler()
    self.lin_reg = creme.linear_model.LinearRegression()
    self.carrier_one_hot = creme.preprocessing.OneHotEncoder('CARRIER', sparse=True)
    self.origin_one_hot = creme.preprocessing.OneHotEncoder('ORIGIN', sparse=True)
    self.dest_one_hot = creme.preprocessing.OneHotEncoder('DEST', sparse=True)

  def process(self, tup):
    x = pd.Series(tup, index=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST',\
                             'FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','NBR_FLIGHTS','AVG_DELAY'])
    x_onehot = pd.Series(self.carrier_one_hot.fit_one(x).transform_one(x))
    y = x['AVG_DELAY']
    # Series.append was removed in pandas 2.0, so we use pd.concat instead
    x = pd.concat([x, x_onehot]).drop(['CARRIER','AVG_DELAY'])
    x_onehot = pd.Series(self.origin_one_hot.fit_one(x).transform_one(x))
    x = pd.concat([x, x_onehot]).drop(['ORIGIN'])
    x_onehot = pd.Series(self.dest_one_hot.fit_one(x).transform_one(x))
    x = pd.concat([x, x_onehot]).drop(['DEST'])

    x = self.scaler.fit_one(x).transform_one(x)
    # Test-then-train: predict on the tuple before learning from it
    yhat = self.lin_reg.predict_one(x)
    print('Predicted {}, actual {}, error {}'.format(yhat, y, abs(yhat - y)))
    self.emit([yhat,y,abs(yhat-y)])

    self.lin_reg.fit_one(x, y)


  def emit(self, tuple):
    global stream4
    stream4 = stream4 + [tuple]


In [None]:
stream = []
fs = FlightSpout()

fs.initialize(None, None)
for i in range(2000):
  fs.next_tuple()

pd.DataFrame(stream,columns=['YEAR','MONTH','DAY_OF_MONTH','AIRLINE_ID','CARRIER','FL_NUM','ORIGIN','DEST','ARR_DELAY','CANCELLED'])

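| | YEAR | MONTH | DAY_OF_MONTH | AIRLINE_ID | CARRIER | FL_NUM | ORIGIN | DEST | ARR_DELAY | CANCELLED |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018 | 1 | 2 | 19393 | WN | 1325 | SJU | MCO | 0.0 | 0.0 |
| 1 | 2018 | 1 | 2 | 19393 | WN | 5159 | SJU | MCO | 0.0 | 0.0 |
| 2 | 2018 | 1 | 2 | 19393 | WN | 5890 | SJU | MCO | 9.0 | 0.0 |
| 3 | 2018 | 1 | 2 | 19393 | WN | 6618 | SJU | MCO | 0.0 | 0.0 |
| 4 | 2018 | 1 | 2 | 19393 | WN | 1701 | SJU | MDW | 8.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1995 | 2018 | 1 | 3 | 19393 | WN | 762 | LAS | OAK | 55.0 | 0.0 |
| 1996 | 2018 | 1 | 3 | 19393 | WN | 1029 | LAS | OAK | 0.0 | 0.0 |
| 1997 | 2018 | 1 | 3 | 19393 | WN | 1358 | LAS | OAK | 18.0 | 0.0 |
| 1998 | 2018 | 1 | 3 | 19393 | WN | 1396 | LAS | OAK | 0.0 | 0.0 |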


In [None]:
stream2 = []
jb = JoinBolt()

jb.initialize(None, None)

for e in stream:
  jb.process(e)

pd.DataFrame(stream2,columns=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST','FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','ARR_DELAY'])

| | YEAR | MONTH | DAY_OF_MONTH | CARRIER | FL_NUM | ORIGIN | DEST | FROM_LAT | FROM_LONG | TO_LAT | TO_LONG | ARR_DELAY |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018 | 1 | 2 | WN | 1325 | SJU | MCO | 18.43942 | -66.00183 | 28.42889 | -81.31603 | 0.0 |
| 1 | 2018 | 1 | 2 | WN | 5159 | SJU | MCO | 18.43942 | -66.00183 | 28.42889 | -81.31603 | 0.0 |
| 2 | 2018 | 1 | 2 | WN | 5890 | SJU | MCO | 18.43942 | -66.00183 | 28.42889 | -81.31603 | 9.0 |
| 3 | 2018 | 1 | 2 | WN | 6618 | SJU | MCO | 18.43942 | -66.00183 | 28.42889 | -81.31603 | 0.0 |
| 4 | 2018 | 1 | 2 | WN | 1701 | SJU | MDW | 18.43942 | -66.00183 | 41.78598 | -87.75242 | 8.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1995 | 2018 | 1 | 3 | WN | 762 | LAS | OAK | 36.08036 | -115.15233 | 37.72129 | -122.22072 | 55.0 |
| 1996 | 2018 | 1 | 3 | WN | 1029 | LAS | OAK | 36.08036 | -115.15233 | 37.72129 | -122.22072 | 0.0 |
| 1997 | 2018 | 1 | 3 | WN | 1358 | LAS | OAK | 36.08036 | -115.15233 | 37.72129 | -122.22072 | 18.0 |
| 1998 | 2018 | 1 | 3 | WN | 1396 | LAS | OAK | 36.08036 | -115.15233 | 37.72129 | -122.22072 | 0.0 |


In [None]:
stream3 = []
gb = GroupBolt()

gb.initialize(None, None)

for e in stream2:
  gb.process(e)

pd.DataFrame(stream3,columns=['YEAR','MONTH','DAY_OF_MONTH','CARRIER','FL_NUM','ORIGIN','DEST',\
                             'FROM_LAT','FROM_LONG','TO_LAT','TO_LONG','NBR_FLIGHTS','AVG_DELAY'])

** DONE WITH None-None-None with 0 entries
** DONE WITH 2018-1-2 with 383 entries


| | YEAR | MONTH | DAY_OF_MONTH | CARRIER | FL_NUM | ORIGIN | DEST | FROM_LAT | FROM_LONG | TO_LAT | TO_LONG | NBR_FLIGHTS | AVG_DELAY |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018 | 1 | 2 | WN | 23 | SMF | PDX | 38.69542 | -121.59077 | 45.58872 | -122.59750 | 1 | 0.0 |
| 1 | 2018 | 1 | 2 | WN | 24 | TPA | HOU | 27.97547 | -82.53325 | 29.64542 | -95.27889 | 1 | 0.0 |
| 2 | 2018 | 1 | 2 | WN | 28 | TPA | MSY | 27.97547 | -82.53325 | 29.99339 | -90.25803 | 1 | 5.0 |
| 3 | 2018 | 1 | 2 | WN | 29 | SNA | DEN | 33.67566 | -117.86822 | 39.85841 | -104.66700 | 1 | 19.0 |
| 4 | 2018 | 1 | 2 | WN | 50 | STL | MCO | 38.74769 | -90.35999 | 28.42889 | -81.31603 | 1 | 6.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 378 | 2018 | 1 | 2 | WN | 6963 | SNA | OAK | 33.67566 | -117.86822 | 37.72129 | -122.22072 | 1 | 25.0 |
| 379 | 2018 | 1 | 2 | WN | 6973 | STL | DAL | 38.74769 | -90.35999 | 32.84711 | -96.85177 | 1 | 8.0 |
| 380 | 2018 | 1 | 2 | WN | 6986 | STL | BNA | 38.74769 | -90.35999 | 36.12448 | -86.67818 | 1 | 10.0 |
| 381 | 2018 | 1 | 2 | WN | 6997 | STL | DAL | 38.74769 | -90.35999 | 32.84711 | -96.85177 | 1 | 0.0 |


In [None]:
stream4 = []
lb = LearnBolt()

lb.initialize(None, None)

for e in stream3:
  lb.process(e)

pd.DataFrame(stream4,columns=['predicted','actual','error'])

Predicted 0.0, actual 0.0, error 0.0
Predicted 0.0, actual 0.0, error 0.0
Predicted 0.0, actual 5.0, error 5.0
Predicted 0.11506785759732426, actual 19.0, error 18.884932142402675
Predicted 1.0301725324668687, actual 6.0, error 4.969827467533131
Predicted 1.0147117000442663, actual 0.0, error 1.0147117000442663
Predicted 1.6955509700847504, actual 0.0, error 1.6955509700847504
Predicted 1.3583920872662598, actual 0.0, error 1.3583920872662598
Predicted 1.1291731455517737, actual 0.0, error 1.1291731455517737
Predicted 1.0565963343981761, actual 0.0, error 1.0565963343981761
Predicted 0.9438499206259802, actual 6.0, error 5.05615007937402
Predicted 1.650476895487974, actual 0.0, error 1.650476895487974
Predicted 1.4574779284218136, actual 6.0, error 4.542522071578187
Predicted 1.7858917214543037, actual 0.0, error 1.7858917214543037
Predicted 1.2666363875668571, actual 0.0, error 1.2666363875668571
Predicted 1.017426845539113, actual 1.0, error 0.01742684553911289
Predicted 0.9666246618

| | predicted | actual | error |
|---|---|---|---|
| 0 | 0.000000 | 0.0 | 0.000000 |
| 1 | 0.000000 | 0.0 | 0.000000 |
| 2 | 0.000000 | 5.0 | 5.000000 |
| 3 | 0.115068 | 19.0 | 18.884932 |
| 4 | 1.030173 | 6.0 | 4.969827 |
| ... | ... | ... | ... |
| 378 | 9.584332 | 25.0 | 15.415668 |
| 379 | 9.729228 | 8.0 | 1.729228 |
| 380 | 10.207437 | 10.0 | 0.207437 |
| 381 | 9.680903 | 0.0 | 9.680903 |
