# Machine Learning on Streams with Apache Beam

# What we will build 

In this workshop, we will learn how to use Apache Beam to detect potential disasters occurring in the world. We will receive a stream of Tweets as input, use machine learning to detect anomalies, and send alert events in an output stream. These alerts can be consumed in real-time by any service of interest.

# Coding environment

You simply need to install Apache Beam using `pip install apache-beam` to run this notebook. You also need pandas and scikit-learn.

If you are using Noto, you can simply run these commands in your terminal:

```
my_venvs_create dag
my_venvs_activate dag
pip install apache-beam
my_kernels_create dag "DAG"
```

Then, switch your notebook kernel using the kernel selector on the top-right of this page. If it does not appear in the dropdown, close the window, shut down the notebook's kernel and re-open the notebook.

# Streaming Data Processing

Data processing is either offline (batch) or online (streaming). Streaming machine learning has many applications: you receive a continuous flow of data to transform, and the output is either stored or sent directly to another output stream.
For instance, you can detect anomalies in a continuous stream of events (IoT sensors, transactions, ...) and send alerts to an output stream.

![Batch vs. Stream Processing](assets/batch-stream.png)
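
To make the online setting concrete, here is a minimal plain-Python sketch (not Beam code) of a stream processor. The sensor readings, the `threshold` value and the function names are made up for illustration. The point is only that events are handled one at a time and that alerts are emitted as soon as an anomaly shows up.

```python
from typing import Iterable, Iterator


def sensor_stream() -> Iterator[float]:
    """Hypothetical source: yields readings one at a time, like a stream would."""
    for reading in [20.1, 20.4, 95.0, 20.2, 101.3]:
        yield reading


def detect_anomalies(events: Iterable[float], threshold: float) -> Iterator[str]:
    """Emit an alert for each reading above the (assumed) threshold."""
    for value in events:
        if value > threshold:
            yield f"ALERT: reading {value} exceeds {threshold}"


# A real consumer would iterate forever; we collect the alerts here
# only because the toy stream is finite.
alerts = list(detect_anomalies(sensor_stream(), threshold=90.0))
for alert in alerts:
    print(alert)
```

In a real deployment the generator would be replaced by a message queue, and the alerts would be pushed to an output stream instead of printed.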


# Apache Beam

**Apache Beam** is a unified programming model to define batch and **streaming** data processing jobs. Once you define a job, you need an execution engine, or runner, to run it. Beam is compatible with multiple runners such as *Apache Spark*, *Apache Flink* or *Google Cloud DataFlow*. It follows the philosophy "write once, run everywhere" and has SDKs in multiple languages, e.g. **Python**, Java and Go.


## Assumptions

We assume basic knowledge of machine learning (mainly pandas and scikit-learn) and of the Python programming language.






# Dataset

The dataset comes from a Kaggle competition: [Natural Language Processing with Disaster Tweets](https://www.kaggle.com/c/nlp-getting-started).

The objective is to predict which Tweets talk about natural disasters and which do not.

Examples of disaster Tweets:
> Our Deeds are the Reason of this #earthquake May ALLAH Forgive us all

> Forest fire near La Ronge Sask. Canada

> All residents asked to 'shelter in place' are being notified by officers. No other evacuation or shelter in place orders are expected

Examples of normal Tweets:
> What's up man?

> I love fruits

> Summer is lovely


# Technologies

Since we don't assume any access to cloud providers, we will run this notebook locally. To do so, we will simulate some critical parts like streaming and storage. I will give you a quick overview of the services you could use in a production setup on Google Cloud. Other cloud providers have equivalents for all these technologies.

1. **Google Cloud Storage**: bucket to store the data and the trained model (replaced with our local disk)
2. **Apache Beam**: Python SDK to create data processing jobs
3. **Google Cloud Pub/Sub**: ingestion platform for event-driven systems and streaming analytics (replaced with a simulated stream)
4. **Google Cloud DataFlow**: cloud executor for jobs expressed with the Apache Beam SDK (replaced with a local runner)


# Overall System
![overall pipeline](https://i.postimg.cc/sDSB2Tzm/overall-pipeline.png
"Overall System")





# Beam Pipeline

![beam pipeline](https://i.postimg.cc/T1rRkTD9/beam-pipeline.png
"Beam Pipeline")

# Plan

1. Quick introduction on streams
2. Train a simple model and save it locally
3. Prepare the input stream (Tweets)
4. Build a Beam streaming pipeline to process the Tweets


# Setup and Configuration
Import libraries 

In [None]:
from typing import Tuple, List, Dict, Iterable, Any

# A glance at Google Cloud Pub/Sub

Google Cloud Pub/Sub is the service that handles streams of events in the production setup (in this workshop, we replace it with a simulated stream). We briefly introduce its two main concepts to understand how it works: **Topics** and **Subscriptions**.


## Topics: enqueuing messages

A topic can be seen as a queue. It's the place where you push/enqueue messages. In our setup we want one topic to push Tweets (input) and one topic to push predictions (output).

## Subscriptions: dequeuing (consuming) messages

Now that messages are pushed to the topics (*queues*), we use subscriptions to pull messages out of them (*dequeue*). Any application that needs to access the messages in a queue does so through a subscription. In our case we want a subscription to pull Tweets from the first queue and a subscription to pull predictions from the second queue.

A topic can have multiple subscriptions. Each subscription receives its own copy of every message.
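
The relationship between topics and subscriptions can be sketched with a toy in-memory class. This is not the Pub/Sub client API: the `Topic` class and the subscription names are hypothetical, and the sketch only illustrates that a message published once can be pulled independently through each subscription.

```python
from collections import deque
from typing import Deque, Dict


class Topic:
    """Toy in-memory topic: every subscription gets its own copy of each message."""

    def __init__(self) -> None:
        self._subscriptions: Dict[str, Deque[str]] = {}

    def subscribe(self, name: str) -> None:
        # Each subscription has its own queue of pending messages.
        self._subscriptions[name] = deque()

    def publish(self, message: str) -> None:
        # Enqueue the message once per subscription.
        for queue in self._subscriptions.values():
            queue.append(message)

    def pull(self, name: str) -> str:
        # Dequeue the oldest pending message of one subscription.
        return self._subscriptions[name].popleft()


tweets = Topic()
tweets.subscribe("beam-pipeline")
tweets.subscribe("archiver")
tweets.publish("Forest fire near La Ronge Sask. Canada")

pipeline_msg = tweets.pull("beam-pipeline")
archiver_msg = tweets.pull("archiver")
print(pipeline_msg == archiver_msg)
```

Pulling from one subscription does not remove the message from the other, which is what lets several services consume the same stream.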



## Overview
![overall pipeline](https://i.postimg.cc/zvL1f2Rk/pub-sub.png
"Overall System")



# Modeling

We need a model to classify tweets as disaster or not. 

**The purpose of this workshop is not modeling, so we will not spend too much time on it and go for an easy solution. The pre-processing and modeling are purposely rudimentary.**

We build the model as a scikit-learn pipeline, which has the advantage of bundling multiple steps into one estimator. Our preprocessing is thus included in the model:

1. First we select the column of interest in the DataFrame, in our case the text column. This column does not contain any NA values, so we do not need to handle missing values.
2. We vectorize the raw text using TFIDF ([TFIDF details](https://en.wikipedia.org/wiki/Tf%E2%80%93idf), [Sklearn TFIDF](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html))
3. We apply a Random Forest classifier ([Sklearn RandomForestClassifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html))


---

Here are two links for the train and test datasets:

- [Train dataset](https://drive.google.com/file/d/1rsrAu4F13UCHsKpWjxRIh0ObsjWSMVSE/view?usp=sharing)
- [Test dataset](https://drive.google.com/file/d/1yjX4e2U2auLQn01HYUgBKEJT8q3g15BJ/view?usp=sharing)

Download them and place them in a `data` folder next to the notebook, since the code below reads `data/train.csv` and `data/test.csv`.

In [None]:
import pickle

import pandas as pd
from sklearn.model_selection import cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer

In [None]:
# Read the training data from the local disk (a Google Cloud bucket in a production setup)
df_train = pd.read_csv("data/train.csv")

df_train.dropna().head()

# Model architecture

In [None]:
def select_text(df: pd.DataFrame) -> pd.Series:
    return df["text"]

# Model pipeline
pipe = Pipeline([
    ('selector', FunctionTransformer(select_text)),
    ('tfidf', TfidfVectorizer(lowercase=True)),
    ('model', RandomForestClassifier(random_state=42))

])

In [None]:
# Cross-validation step to get an idea of the model performance

cv_results = cross_validate(pipe, df_train, df_train['target'], cv=3, scoring=["roc_auc"], return_train_score=True, n_jobs=-1)
cv_results = pd.DataFrame(cv_results)
cv_results.agg(("mean", "std"))

In [None]:
model = pipe.fit(df_train, df_train["target"])

In [None]:
# We pickle and save the model
with open("model.pkl", "wb") as f:
    pickle.dump(model, f)

# Beam Pipeline Overview

We can summarize Beam pipelines as follows:

1. Read data from any source into a `PCollection` A
2. Transform A into another `PCollection` B by applying a `PTransform` (map, filter, ...)
3. Transform B into another `PCollection`
4. ... keep chaining transforms
5. Save the final output somewhere


<br/>

![beam-pipeline-doc](https://beam.apache.org/images/design-your-pipeline-linear.svg "Windowing")
[*Image taken from official documentation*](https://beam.apache.org/documentation/programming-guide/#transforms)

<br/>

## Initialization

We first need a pipeline. It will be the entry point:

```python
pipeline = beam.Pipeline()
```

Or, to create a pipeline and run it locally right away, use it as a context manager:

```python

with beam.Pipeline() as pipeline:
    ...
```

## Applying a transform on your pipeline

Use the **pipe** operator `|` to apply a transform to a collection. You can pair it with the `>>` operator to give the transformation a name:

Without name:

```python
pipeline | transform
```

With name:

```python
pipeline | "step_name" >> transform
```
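
This syntax relies on Python operator overloading. The sketch below uses two toy classes, `Transform` and `Collection`, which are hypothetical and much simpler than Beam's own classes. It shows how `|` and `>>` can be given this meaning, and why no parentheses are needed: `>>` binds more tightly than `|` in Python.

```python
from typing import Callable, List, Optional


class Transform:
    """Toy transform: wraps a function applied to a whole list."""

    def __init__(self, fn: Callable[[List[int]], List[int]]) -> None:
        self.fn = fn
        self.label: Optional[str] = None

    def __rrshift__(self, label: str) -> "Transform":
        # Called for `"step_name" >> transform`, because str does not define `>>`.
        self.label = label
        return self


class Collection:
    """Toy collection: `collection | transform` applies the transform."""

    def __init__(self, items: List[int]) -> None:
        self.items = items
        self.applied: List[Optional[str]] = []

    def __or__(self, transform: Transform) -> "Collection":
        result = Collection(transform.fn(self.items))
        result.applied = self.applied + [transform.label]
        return result


add_one = Transform(lambda xs: [x + 1 for x in xs])
double = Transform(lambda xs: [x * 2 for x in xs])

# Evaluated as: (Collection | ("add_one" >> add_one)) | double
out = Collection([1, 2, 3]) | "add_one" >> add_one | double
print(out.items, out.applied)
```

The unnamed step is recorded with a `None` label here. Naming steps is mostly useful for reading logs and pipeline graphs.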

## Read data


In [None]:
import apache_beam as beam

In [None]:
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )


## Common operators

### beam.Map (map a function on a collection)

In [None]:
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "add_one" >> beam.Map(lambda x: x + 1)
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )

### beam.ParDo with beam.DoFn

Beam has a generic transform for data processing: `beam.ParDo`. 
It takes as input a function to apply: `beam.DoFn`. 

#### ParDo
1. run on each element in the input `PCollection`
2. apply a processing function
3. emit **zero**, **one**, or **many outputs** for each input element

It's generic and can implement any usual transform: map, flatmap, filter, ...
Indeed all these are special cases of `beam.ParDo` processing.


#### DoFn
`beam.DoFn` represents the processing function applied by `beam.ParDo`.

The `process` method of a `beam.DoFn` **should always return an iterable or None**. `beam.ParDo` flattens this iterable:
to emit a single element, you need to wrap it in an iterable, for instance a one-element list.
Only the first level of your output is flattened.
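
The flattening rule can be illustrated without Beam. The helper `par_do` below is a hypothetical plain-Python stand-in, not Beam's implementation. It shows how returning zero, one or many elements per input gives filter, map and flatmap behaviour, and that nested iterables are only flattened one level.

```python
from typing import Any, Callable, Iterable, List, Optional


def par_do(fn: Callable[[Any], Optional[Iterable[Any]]], elements: Iterable[Any]) -> List[Any]:
    """Toy stand-in for ParDo: apply fn to each element and flatten one level."""
    output: List[Any] = []
    for elem in elements:
        result = fn(elem)
        if result is None:
            continue  # None means "emit nothing" for this element
        output.extend(result)  # only the first level is flattened
    return output


numbers = [1, 2, 3, 4]

# map: exactly one output per input
mapped = par_do(lambda x: [x + 1], numbers)
# filter: zero or one output per input
filtered = par_do(lambda x: [x] if x % 2 == 0 else None, numbers)
# flatmap: several outputs per input
flat_mapped = par_do(lambda x: [x, x * 10], numbers)
# inner lists are kept as elements
nested = par_do(lambda x: [[x, x]], numbers[:2])

print(mapped, filtered, flat_mapped, nested)
```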

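Another nice feature is **stateful computation**. Since `beam.DoFn` is a class, an instance can keep an **internal state** in its attributes. Note that this state is local to each `DoFn` instance: on a distributed runner, every worker gets its own copy.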





Quick example: add an incremental index, starting from N, to elements

```
Input: 1, 2, 3, 4, 5

Apply IdxFn

Output:  (N, 1), (N+1, 2), (N+2, 3), (N+3, 4), (N+4, 5)
```

In [None]:
class IdxFn(beam.DoFn):
    """Custom DoFn function to add an index. It will be applied using beam.ParDo"""
    def __init__(self, init_state: int):
        # this is the first number to use as an index, we will increment it for each element
        self.state = init_state

    def process(self, elem: Any) -> Iterable[Any]:
        # create a tuple with the current state as index and the element
        res = (self.state, elem)
        # increment the state
        self.state += 1
        return [res]

print("Like a Map:")
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "with_idx" >> beam.ParDo(IdxFn(init_state=10))
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )

### beam.Filter (filter a collection) - Optional, not used in this workshop

Write a similar pipeline that prints only the even numbers in a created list of numbers

In [None]:
with beam.Pipeline() as pipeline:
    ### 
    ### YOUR CODE HERE
    ###

### beam.WindowInto

#### Without window

![without-window](https://i.postimg.cc/N0Nksgrx/without-window.png
"Windowing")

#### With session window
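
As a rough plain-Python sketch of the idea (not Beam's implementation), a session window groups elements that follow each other closely and starts a new window after an idle period. The `gap` value and the function name are assumptions chosen for illustration. In Beam, this kind of windowing is provided by `beam.window.Sessions`.

```python
from typing import List, Tuple


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int, List[int]]]:
    """Group timestamps into sessions: a new session starts after `gap` idle seconds.

    Returns (start, end, members) triples, where end = last timestamp + gap.
    """
    sessions: List[Tuple[int, int, List[int]]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Still inside the current session: extend its end.
            start, _, members = sessions[-1]
            sessions[-1] = (start, ts + gap, members + [ts])
        else:
            # Idle for at least `gap`: open a new session.
            sessions.append((ts, ts + gap, [ts]))
    return sessions


sessions = session_windows([100, 101, 102, 200], gap=10)
print(sessions)
```

With these timestamps, the first three events fall into one session and the event at 200 opens a second one.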

![with-window](https://i.postimg.cc/cLFWzjCV/with-window.png
"Windowing")

In [None]:
with beam.Pipeline() as pipeline:
    input_stream = [(1, 100), (2, 101), (3, 102), (4, 200)]
    input_stream = [beam.window.TimestampedValue(elem, timestamp) for elem, timestamp in input_stream]
    pc = (pipeline
        | beam.Create(input_stream)
        | 'window' >> beam.WindowInto(beam.window.FixedWindows(5))
        | 'group' >> beam.GroupBy()
        | 'pp' >> beam.Map(lambda x: print(f"Batch: {x}"))
    )
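
The cell above uses fixed windows of 5 seconds. A minimal plain-Python sketch of the window assignment, assuming windows aligned on multiples of the window size, shows why the four elements should end up in two batches. The function `fixed_window` is a hypothetical helper, not a Beam call.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def fixed_window(timestamp: int, size: int) -> Tuple[int, int]:
    """Return the [start, end) fixed window of length `size` containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


# Same (element, timestamp) pairs as in the cell above.
input_stream = [(1, 100), (2, 101), (3, 102), (4, 200)]

batches: Dict[Tuple[int, int], List[int]] = defaultdict(list)
for elem, timestamp in input_stream:
    batches[fixed_window(timestamp, size=5)].append(elem)

print(dict(batches))
```

Elements 1, 2 and 3 share the window starting at 100, while element 4 lands alone in the window starting at 200.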

# Our Beam Pipeline

As a reminder, this is what we want:

![beam pipeline](https://i.postimg.cc/xT5TvZnJ/beam-pipeline-focus.png
"Beam Pipeline")

We will focus on two important parts: **Grouping by chunk** of time and **Predicting**.






# Predict

The most custom and critical step here is the **prediction**. We need to build a custom `beam.DoFn` whose initialization step pulls the pickled model from Google Cloud (here, from the local disk) and loads it. The `DoFn` then uses the model to label the incoming tweets.

Here are the steps for the prediction part:
1. we implement a custom `beam.DoFn` class called `ApplyModel`
2. in the `__init__` method we pull and load the model
3. we implement the `process` method, which receives a batch of tweets and uses the model to label them as *DISASTER* or *NORMAL*
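
The labelling logic of step 3 can be sketched in plain Python. Everything here is an assumption made for illustration: `StubModel` is a toy rule standing in for the pickled scikit-learn pipeline, `label_batch` is a hypothetical helper, and the sketch works on dictionaries where the notebook builds a DataFrame. It only shows the shape of the data, a `(key, tweets)` group in and one labelled record per tweet out.

```python
from typing import Any, Dict, Iterable, List, Tuple


class StubModel:
    """Hypothetical stand-in for the pickled scikit-learn pipeline."""

    def predict(self, texts: List[str]) -> List[int]:
        # Toy rule, NOT the trained model: flag texts mentioning "fire".
        return [1 if "fire" in text.lower() else 0 for text in texts]


LABELS = {0: "NORMAL", 1: "DISASTER"}


def label_batch(group: Tuple[Any, Iterable[Dict[str, Any]]], model: StubModel) -> List[Dict[str, Any]]:
    """Turn one (key, tweets) group into a list of tweets with a "prediction" field."""
    _, tweets = group
    tweets = list(tweets)
    predictions = model.predict([tweet["text"] for tweet in tweets])
    return [{**tweet, "prediction": LABELS[pred]} for tweet, pred in zip(tweets, predictions)]


batch = ("window-0", [{"text": "Forest fire near La Ronge Sask. Canada"}, {"text": "I love fruits"}])
labelled = label_batch(batch, StubModel())
print(labelled)
```

Returning a list of records means each labelled tweet becomes its own element downstream, which fits a per-tweet debug or serialization step.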


![dofn](https://i.postimg.cc/KYpdxdBT/dofn.png
"Windowing")

In [None]:
import re
import json

# Create a simulated Tweet stream (from the testing set)

In [None]:
test_jsons = pd.read_csv("data/test.csv").to_dict("records")

timestamps = list(range(0, 10)) + list(range(50, 60)) 
tweets_input_stream = list(zip(test_jsons[:len(timestamps)], timestamps))

tweets_input_stream = [beam.window.TimestampedValue(elem, timestamp) for elem, timestamp in tweets_input_stream]

# Apply model in Beam pipeline

In [None]:
class ApplyModel(beam.DoFn):
    def __init__(self):
        """Initialize the function"""
        self._model = None
        # We import in the init statement to have the library available even when we run the pipeline 
        # on remote executors such as DataFlow
        import pandas as pd
        import pickle
        self._pd = pd
        with open("model.pkl", "rb") as f:
            self._model = pickle.load(f)

     
    def process(self, group: Tuple[Any, Iterable[Any]]) -> Iterable[Any]:
        """Process every batch of tweets"""
        # extract batch and build DataFrame
        
        ### 
        ### YOUR CODE HERE
        ###
        
        # predict
        # modify "prediction" column in df as "NORMAL" for 0 and "DISASTER" for 1
        # format DataFrame to JSON format in variable `res`
        
        ### 
        ### YOUR CODE HERE
        ###
    
        return res


def build_pipeline(pipeline: beam.Pipeline) -> beam.Pipeline:
    """Takes an empty beam.Pipeline in input and returns the full beam.Pipeline"""

    parse = pipeline | beam.Create(tweets_input_stream)

    
    # group the tweets in time windows, this will be our batches
    group = (parse 
        | 'window' >> beam.WindowInto(beam.window.FixedWindows(10))
        | 'group' >> beam.GroupBy()
    )

    def debug_fn(json_with_prediction: Dict) -> Dict:
        """Pretty print for model predictions"""
        txt = json_with_prediction["text"]

        dots = re.findall('.{1,80}', ' '*len(txt))
        txt = re.findall('.{1,80}', txt)

        for txt_line, dots_line in zip(txt, dots):
            if json_with_prediction["prediction"] == "NORMAL":
                ff = f"{txt_line:80s} | {dots_line:80s}"
            else:
                ff = f"{dots_line:80s} | {txt_line:80s}"
            print(ff)

        print(f"{' '*80} | {' '*80}")

        return json_with_prediction
    
    # make predictions (the commented-out step would convert them to JSON, then to bytes)
    predict = (
        group
        | 'predict' >> beam.ParDo(ApplyModel())
        | 'debug' >> beam.Map(debug_fn)
        #| 'to_bytes' >> beam.Map(lambda x: json.dumps(x).encode("utf-8"))
    )


    return pipeline
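
The commented-out `to_bytes` step matters when the predictions are published to a message queue such as Pub/Sub, whose message payloads are bytes. Here is a small sketch of that serialization and of the decoding a consumer might do. The helper names and the sample record are illustrative assumptions.

```python
import json
from typing import Any, Dict


def to_bytes(record: Dict[str, Any]) -> bytes:
    """Serialize a prediction record to UTF-8 encoded JSON."""
    return json.dumps(record).encode("utf-8")


def from_bytes(payload: bytes) -> Dict[str, Any]:
    """Inverse of to_bytes, as a downstream consumer might do."""
    return json.loads(payload.decode("utf-8"))


# Illustrative record, shaped like the output of the prediction step.
record = {"id": 0, "text": "Summer is lovely", "prediction": "NORMAL"}
payload = to_bytes(record)
print(type(payload).__name__, from_bytes(payload) == record)
```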

# How to run a pipeline

## Locally

We can run a Beam pipeline locally using the `DirectRunner`, which is useful for debugging and testing.

## Remotely
In production, we want to run the pipeline on a remote executor for maximum performance and scalability.
On Google Cloud, this executor is DataFlow. We only show how to configure it, since the workshop runs locally.

## How we proceed

We will first run it locally to make sure that everything runs smoothly.

Then we will look at the snippet needed to run it on the Google Cloud DataFlow runner, as a production application would.

> Disable warnings

In [None]:
from apache_beam.runners import DataflowRunner
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions, GoogleCloudOptions

In [None]:
print()
print(f"{'NORMAL':80s} | {'DISASTER':80s}")
print(f"{'':80s} | {'':80s}")

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    build_pipeline(pipeline)

### Necessary snippet to launch our code in a real production environment

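This snippet needs the `google.auth` module (from the `google-auth` package) and access to a Google Cloud project. Neither is set up in this workshop, so the code will not run here.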

```python
# Set up Apache Beam pipeline options.
options = PipelineOptions(streaming=True)

# Set the project to the default project in your current Google Cloud
# environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'europe-west6'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://bucket/dataflow'

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# Set the SDK location. This is used by Dataflow to locate the
# SDK needed to run the pipeline.
#options.view_as(SetupOptions).sdk_location = (
#    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
#    beam.version.__version__)

pipe = build_pipeline(beam.Pipeline())
runner = DataflowRunner()
runner.run_pipeline(pipe, options)
```

# Conclusion


## Machine Learning on Streams 
Processing streams with machine learning has many applications:

1. IoT sensors
2. Server logs ...


## Apache Beam
Apache Beam facilitates stream manipulation. It has numerous advantages:

1. Multiple compatible backends
2. Available in multiple programming languages
3. Built-in support for batch and **streaming** processing


## Serverless
We covered multiple managed services (Storage, Pub/Sub, DataFlow). They are simple to set up and scale smoothly.
Just be careful with the bill, features like auto-scaling can surprise you.





# Thank You For Attending!

## We hope this workshop will be helpful!