# Batch Predictions
- In the previous two MLOps lessons, we implemented a real-time prediction endpoint using Flask and Docker.
- This works well when each request contains only a small number of samples but needs a fast response. For example, if we publish the model on RapidAPI, we expect users to hit the API with around 10 rows of data at a time.
- For most applications this is the way to go, since you can expect each user to need predictions only for their own data.
- Real-time APIs can be scaled to support very high concurrency (many users hitting the API at the same time, each with only a few rows to predict) by using technologies like Kubernetes.
- However, for some special use cases, especially within Data Engineering, you might need to run model predictions on thousands or millions of data points at once.
    - These applications generally run at a regular cadence, such as once per day or once per week.
    - They can also tolerate a longer running time. However, the running time must be shorter than the interval between two runs. For example, if predicting on a million data points takes 2 days, we cannot schedule this task to run every day!
    
<details><summary>Based on your current Python knowledge, how would you run predictions on several rows of data if you can only feed one row of data to the model at a time?</summary>

- You would use a `for` loop!
- Loop over the rows one by one, feed each row to the model, get its prediction and save the predictions to a new column
![](../images/for_loop.png)
</details>
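
The loop above can be sketched in plain Python. This is a minimal illustration only: `predict_one` is a hypothetical stand-in for a model that accepts one row at a time, and its toy rule (label by the sign of the feature sum) replaces a real model.

```python
def predict_one(row):
    """Hypothetical stand-in for a model that only accepts one row at a time."""
    # Toy rule instead of a real model: label by the sign of the feature sum
    return "Dog" if sum(row) > 0 else "Cat"


def predict_sequentially(rows):
    """Feed rows to the model one by one and collect the predictions in order."""
    predictions = []
    for row in rows:
        predictions.append(predict_one(row))
    return predictions


rows = [[0.2, 0.5], [-1.0, 0.3], [0.1, -0.4]]
preds = predict_sequentially(rows)
print(preds)  # ['Dog', 'Cat', 'Cat']
```

Each iteration waits for the previous one to finish, which is exactly why the total time grows linearly with the number of rows.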

<details><summary>If one row takes 1 second to get predictions, how long will the above strategy take to complete all the predictions for `N` rows?</summary>

- If each row takes 1 s and we have `N` rows in total, then since we feed one row at a time, the total time to complete all predictions is `N` seconds!
- If `N` is small this is not an issue. However, if `N` is 1 million, the predictions will take about 11.6 days to complete!
</details>
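
The arithmetic behind that estimate is simply `N × (seconds per row)`, converted to days. A small sketch (the helper name `sequential_runtime_days` is made up for illustration):

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds


def sequential_runtime_days(n_rows, seconds_per_row=1.0):
    """Runtime of a one-row-at-a-time loop: n_rows * seconds_per_row, in days."""
    return n_rows * seconds_per_row / SECONDS_PER_DAY


print(f"{sequential_runtime_days(1_000_000):.2f} days")  # 11.57 days
```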

<details><summary>How can we speed this up?</summary>

- Parallelisation!
- Instead of setting up one machine running our model, we can set up `N` machines and feed each machine 1 row of data
- All the `N` machines make predictions concurrently
- Total prediction time is only 1 second!
- This is also known as horizontal scaling
- In practice, setting up `N` machines is too expensive, especially if `N` is large. Instead, we decide how fast we want the predictions to complete, say 1 day, and then calculate the minimum number of machines needed to finish within that time.
- It's not an exact calculation, so we always add some buffer.
- This way, each machine is fed a `batch` of rows, and all machines process their batches at the same time.
![](../images/spark.png)
</details>
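
The sizing calculation described above can be sketched as follows. This is a back-of-the-envelope helper under simplifying assumptions: `machines_needed` is a hypothetical name, every machine is assumed to run at the same speed, the rows are assumed to split evenly, and the 20% buffer is an arbitrary default rather than a rule.

```python
import math


def machines_needed(n_rows, seconds_per_row, target_seconds, buffer=0.2):
    """Minimum number of machines to finish n_rows within target_seconds,
    padded by a safety buffer (20% by default, an arbitrary choice)."""
    total_work = n_rows * seconds_per_row            # machine-seconds of work
    bare_minimum = math.ceil(total_work / target_seconds)
    return math.ceil(bare_minimum * (1 + buffer))


# 1 million rows at 1 s per row, to finish within 1 day
print(machines_needed(1_000_000, 1.0, 24 * 60 * 60))  # 15
```

With these numbers the bare minimum is 12 machines; the buffer raises it to 15, so each machine would handle a batch of roughly 67,000 rows.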

<details><summary>Sounds great! How do we do this?</summary>

- As always, there are many ways. All of them have one thing in common: the ability to coordinate execution of code across a `cluster` of machines
- Some popular methods are:
    1. [Apache Spark](https://spark.apache.org/)
    1. [Apache Beam](https://beam.apache.org/)
    1. [Dask](https://dask.org/)
- Apache Spark is by far the most popular of these! Personally, I prefer Apache Beam, and in my opinion it will become more popular in the future since it's easier to implement than Spark.
- Spark is moving in the right direction too! From Spark 3.2 onwards, there is a new feature called [Pandas API on Spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html), ported from the Koalas project, which makes it easier to program in PySpark using syntax very similar to Pandas! But this feature is still very new, and not many companies will have adopted it yet (the local cluster we set up below also runs Spark 3.0, which predates it). So let's implement a Spark program to run model predictions on many images the traditional Spark way.
</details>

<details><summary> How about Kubernetes? </summary>
    
- Kubernetes is another way to parallelise predictions but it does it slightly differently
    - It is generally used when you expect a huge number (many millions or billions) of users to access your API at the same time, but each user only posts a small number of rows. Think of Google Search: each user runs only one search at a time, but an enormous number of users are searching concurrently. The key difference is that the parallelised service must respond in **REALTIME!**
    - K8S is still very complicated to manage in production. It is typically managed by a dedicated Cloud Engineering team and almost never involves a Data Scientist.
    - DO NOT attempt to manage a K8S cluster without a dedicated Cloud Engineering team!
    - For all DS purposes, we just need to wrap our Flask app in a Docker container; it can then run on either Cloud Run (medium traffic) or K8S (extremely large traffic).
![](../images/k8s.png)
</details>

<details><summary>I'm totally confused!</summary>

<img src="../images/confused_meme.png" style="width: 300px;"/>
    
- It's normal to be confused with all the options available out there. 
- That's why most companies have a dedicated team of *Solution Architects* to design the architecture for each service the company develops.
- Personally, I came up with this simple flowchart to help me decide what technology to use. Hope you find it useful!

![](../images/technology_selection.png)
</details>


# Spark
![](../images/spark.png)
- Spark is a technology to orchestrate running an application across multiple machines at the same time
- Fundamentally, if each row of data can be predicted independently of all the other rows, which is the case for a typical ML model that scores each row on its own, then we can theoretically feed each row to a different machine!
- In reality, we take our large dataframe and break it up into `N` *partitions*, each containing roughly `X` rows.
- The rows in each *partition* are then processed by one machine.
- This is how we're going to use Spark.
- Similar to Docker, Spark has tremendous depth and can do a lot of fancy things. But if we stick to the basics described above, we can achieve our use case of running predictions on millions of rows without a real-time constraint.
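
The partition idea can be illustrated without Spark. In this minimal sketch, threads stand in for worker machines, `make_partitions` and `predict_partition` are hypothetical helpers, and the round-robin split is a simplified illustration (Spark decides the actual row placement itself). Because no row depends on any other, each partition can be scored on its own and the results simply combined.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(rows, n_partitions):
    """Split rows into n_partitions chunks by dealing them out round-robin."""
    return [rows[i::n_partitions] for i in range(n_partitions)]


def predict_partition(partition):
    """Score every row in one partition; no row needs any other row."""
    # Toy 'model': each row already carries a score in [0, 1)
    return [(row_id, "Dog" if score > 0.5 else "Cat") for row_id, score in partition]


rows = [(row_id, (row_id % 10) / 10) for row_id in range(20)]  # (row_id, score)
partitions = make_partitions(rows, n_partitions=4)

# Each thread plays the role of one worker machine handling one partition
with ThreadPoolExecutor(max_workers=4) as pool:
    partition_results = list(pool.map(predict_partition, partitions))

predictions = dict(pred for part in partition_results for pred in part)
print([len(p) for p in partitions])    # [5, 5, 5, 5]
print(predictions[9], predictions[0])  # Dog Cat
```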

# Setup

## Create a Spark Cluster running on your local computer!
1. This step is only needed because we do not have access to a real Spark cluster. If you're working in a company, you'd skip this step and directly use your company's Spark cluster instead.
1. In Docker Desktop, go to Settings -> Resources and set Memory to 4.00 GB.
1. Install [Docker Compose](https://docs.docker.com/compose/install/)
1. We'll be modifying the instructions from this [Github repo](https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker)
    ![](../images/cluster-architecture.png)
1. Run the following command from a terminal: `curl -LO https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/docker-compose.yml --ssl-no-revoke` (the `--ssl-no-revoke` flag disables certificate revocation checks and only has an effect on Windows builds of curl).
1. This downloads a `docker-compose.yml` file.
1. By default, each worker node only gets 512MB of RAM. Let's increase it to 4GB. Open the newly downloaded `docker-compose.yml` and set `SPARK_WORKER_MEMORY=4G` and `SPARK_WORKER_CORES=2` for each worker.
1. Let's also change the JupyterLab service to forward host port `8889` (instead of `8888`) to the container's port `8888`.
1. Your `docker-compose.yml` should now look like this
    ```
    ---
    # ----------------------------------------------------------------------------------------
    # -- Docs: https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker --
    # ----------------------------------------------------------------------------------------
    version: "3.6"
    volumes:
      shared-workspace:
        name: "hadoop-distributed-file-system"
        driver: local
    services:
      jupyterlab:
        image: andreper/jupyterlab:3.0.0-spark-3.0.0
        container_name: jupyterlab
        ports:
          - 8889:8888
          - 4040:4040
        volumes:
          - shared-workspace:/opt/workspace
      spark-master:
        image: andreper/spark-master:3.0.0
        container_name: spark-master
        ports:
          - 8080:8080
          - 7077:7077
        volumes:
          - shared-workspace:/opt/workspace
      spark-worker-1:
        image: andreper/spark-worker:3.0.0
        container_name: spark-worker-1
        environment:
          - SPARK_WORKER_CORES=2
          - SPARK_WORKER_MEMORY=4G
        ports:
          - 8081:8081
        volumes:
          - shared-workspace:/opt/workspace
        depends_on:
          - spark-master
      spark-worker-2:
        image: andreper/spark-worker:3.0.0
        container_name: spark-worker-2
        environment:
          - SPARK_WORKER_CORES=2
          - SPARK_WORKER_MEMORY=4G
        ports:
          - 8082:8081
        volumes:
          - shared-workspace:/opt/workspace
        depends_on:
          - spark-master
    ...
    ```
1. From a terminal, in the folder containing `docker-compose.yml`, run `docker-compose up` to start the local Spark cluster!
1. Open JupyterLab running on the Spark Cluster: http://localhost:8889/. 
1. **Upload this notebook into the Spark Cluster using Jupyter Lab**
1. **Upload the model file `cats_vs_dogs.h5` into the Spark Cluster using Jupyter Lab**
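1. You can also check the Spark UI at http://localhost:4040 (available once a Spark session is running from the notebook) and the Spark master UI at http://localhost:8080.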

----

# Spark Code
- All the code below this line needs to be run from JupyterLab within your Spark cluster: http://localhost:8889/

## Download data on your Spark cluster
1. This step is only needed because we do not have access to a real Spark cluster. If you're working in a company, you'd skip this step and directly use the data already stored on your company's Spark cluster instead.

```python
# Download and unzip the data (create ./data first, since curl won't create it)
! mkdir -p data
! curl https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip --output ./data/cats_and_dogs_filtered.zip
! unzip data/cats_and_dogs_filtered.zip -d ./data/
! rm data/cats_and_dogs_filtered.zip
```

## Install the necessary libraries

```python
# Install the necessary libraries
!pip install pillow pandas tensorflow-cpu pyarrow
```

## Find the paths of all the image files we want to predict

```python
# Find the paths of all the images
from pathlib import Path
import pandas as pd

# Path to all the downloaded images
img_path = Path('data/cats_and_dogs_filtered')

# Recursively find all the .jpg files under the path
images = list(img_path.glob('**/*.jpg'))

# Load the file names into a dataframe and save them as a CSV file
image_df = pd.DataFrame(images, columns=['img_path'])
image_df.to_csv('data/images.csv', index=False)

print(image_df.shape)
image_df.head()
```

## Imports

```python
# Import necessary libraries
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
import numpy as np
import pandas as pd

# Spark works in a session. Let's create a Spark session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# To prevent memory errors: cap each Arrow batch handed to the pandas UDF
# at 5 rows, so each batch the UDF processes contains at most 5 images
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5")
```
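
To see what that setting does, here is a plain-Python sketch of the batching it controls. It is a simplified model, not Spark's implementation: `arrow_style_batches` is a hypothetical helper that splits one partition's rows into consecutive batches of at most `max_records_per_batch` rows, which is roughly how Spark limits the size of each batch passed to a pandas UDF.

```python
from itertools import islice


def arrow_style_batches(partition_rows, max_records_per_batch=5):
    """Yield consecutive batches of at most max_records_per_batch rows."""
    rows_iter = iter(partition_rows)
    while True:
        batch = list(islice(rows_iter, max_records_per_batch))
        if not batch:
            return
        yield batch


# One partition of ~300 image paths (3000 images / 10 partitions)
batch_sizes = [len(b) for b in arrow_style_batches(range(300))]
print(len(batch_sizes), max(batch_sizes))  # 60 5
```

Smaller batches mean fewer images held in memory at once, at the cost of more calls into the model.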

## Read the image paths CSV file as a Spark Dataframe
- Spark Dataframe is different from a Pandas dataframe but they both share lots of similarities

```python
spark_df = spark.read.csv('data/images.csv', header=True)

# Print number of rows
print(spark_df.count())

# Equivalent of pandas_df.head()
spark_df.show(5)
```

## Repartition the Spark Dataframe
- A Spark dataframe derives its power from its ability to process chunks of rows in parallel
- Each chunk of rows is called a *partition*
- To exploit this, let's repartition our dataframe into 10 partitions. You can use more if you have more worker nodes!

![](../images/spark.png)

```python
print(f'Initial number of partitions: {spark_df.rdd.getNumPartitions()}')

spark_df = spark_df.repartition(10)
print(f'Final number of partitions: {spark_df.rdd.getNumPartitions()}')
```

## Create the PandasUDF (Pandas user defined function)
- Spark dataframe provides a special type of operation called `PandasUDF`
- A PandasUDF lets us apply a Python function that works on Pandas objects to a Spark dataframe, one batch of rows at a time
- In this case, we'll apply the data preprocessing and model predictions on the Spark dataframe using a PandasUDF
- The syntax is a little confusing and the documentation is sparse, so you may need some trial and error to get it working. The good news is that once you have implemented it for one model, you can reuse the code for any other model!
- For more info on PandasUDF, check out this [Link](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)

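```python
# Prediction code
from typing import Iterator

# Iterator-of-Series pandas UDF (Spark 3.0+ type-hint style, which replaces the
# deprecated PandasUDFType.SCALAR_ITER). The function receives an iterator of
# batches, each batch being a pd.Series of image paths.
@F.pandas_udf('string')
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Load the model once and reuse it for every batch in the iterator
    trf_model = load_model('cats_vs_dogs.h5')

    # Iterate over the batches (at most maxRecordsPerBatch rows each)
    for image_paths in batches:
        # Load all the images in this batch and preprocess them
        test_images = [image.load_img(path, target_size=(224, 224)) for path in image_paths]
        test_images = np.stack([image.img_to_array(img) for img in test_images])
        test_images = preprocess_input(test_images)

        # Run predictions: one score per image, > 0.5 is labelled 'Dog'
        result = trf_model.predict(test_images, batch_size=5)
        result = ['Dog' if pred[0] > 0.5 else 'Cat' for pred in result]

        # Yield one prediction per input row, as a pandas Series
        yield pd.Series(result)
```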
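
The contract this kind of UDF follows can be mimicked in plain Python to make it concrete. This sketch is an illustration only: it uses lists instead of `pd.Series`, and `fake_load_model` and `predict_batches` are hypothetical names (chosen so they don't shadow `load_model` or the `predict` UDF above). It shows the two properties that matter: the expensive load happens once before the first batch, and every output batch has the same length as its input batch.

```python
from typing import Iterator, List

load_calls = 0


def fake_load_model():
    """Hypothetical stand-in for an expensive load_model() call."""
    global load_calls
    load_calls += 1
    # Toy 'model' that labels a path by its folder name
    return lambda path: "Dog" if "/dogs/" in path else "Cat"


def predict_batches(batches: Iterator[List[str]]) -> Iterator[List[str]]:
    model = fake_load_model()            # runs once, before the first batch
    for paths in batches:                # each batch holds a few rows
        yield [model(p) for p in paths]  # one output per input row


batches = [["train/dogs/dog.1.jpg", "train/cats/cat.1.jpg"], ["validation/dogs/dog.2000.jpg"]]
outputs = list(predict_batches(iter(batches)))
print(outputs, load_calls)  # [['Dog', 'Cat'], ['Dog']] 1
```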

## Run the PandasUDF on the Spark dataframe and save results to csv files

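```python
# Add a 'prediction' column computed by the pandas UDF
# (Spark is lazy, so nothing is computed yet)
spark_df = spark_df.withColumn('prediction', predict('img_path'))

# Write the results to CSV files. This action triggers the actual computation;
# Spark writes one part file per partition into the 'output' folder
spark_df.write.csv('output', mode='overwrite', header=True)
```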
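
Spark writes `output` as a folder containing one `part-*.csv` file per partition (plus a `_SUCCESS` marker) rather than a single CSV. Below is a sketch of combining those part files with the standard library; `read_spark_csv_dir` is a hypothetical helper, and the demo builds a fake folder with made-up contents shaped like Spark's output. In the notebook you would call `read_spark_csv_dir('output')` instead.

```python
import csv
import tempfile
from collections import Counter
from pathlib import Path


def read_spark_csv_dir(output_dir):
    """Concatenate the rows of every part-*.csv file in a Spark CSV output folder."""
    rows = []
    for part_file in sorted(Path(output_dir).glob("part-*.csv")):
        with part_file.open(newline="") as f:
            rows.extend(csv.DictReader(f))  # each part file has its own header
    return rows


# Demo on a fake folder shaped like Spark's output (file contents are made up)
with tempfile.TemporaryDirectory() as demo_dir:
    parts = {
        "part-00000-demo-c000.csv": "img_path,prediction\na/dog.1.jpg,Dog\na/cat.1.jpg,Cat\n",
        "part-00001-demo-c000.csv": "img_path,prediction\nb/dog.2.jpg,Dog\n",
    }
    for name, text in parts.items():
        Path(demo_dir, name).write_text(text)
    Path(demo_dir, "_SUCCESS").write_text("")  # completion marker, skipped by the glob

    results = read_spark_csv_dir(demo_dir)

print(len(results), Counter(row["prediction"] for row in results))
```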

# What's next?
- We only ran on 3000 rows. You can run essentially the same code on 300 million rows by adding more worker nodes (and partitions).
- Spark is mostly in the realm of Data Engineering.
- But as a DS, it's handy to know at least how to run predictions using Spark.
- Spark has lots more depth and we only scratched the surface to solve a very specific and narrow problem (model predictions).
- If you want to learn more about Spark, I recommend the book *Spark: The Definitive Guide*: https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/