In [1]:
import os
import sys
from typing import Any, Optional, Tuple

import numpy as np
import pandas as pd
import torch
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, ArrayType

from ml_hadoop_experiment.common.spark_inference import SerializableObj, artifact_type
from ml_hadoop_experiment.pytorch.spark_inference import with_inference_column, with_inference_column_and_preprocessing

## Import your model

Here our model is a very simple neural network whose hidden size is configurable.
Your model will be broadcast to the executors at some point and thus must be serializable. For that reason,
its definition must live in a Python module of your environment (and not in the notebook).

In [2]:
from ml_hadoop_experiment.pytorch.fixtures.test_models import ToyModel

In [3]:
# parameter: hidden size
model = ToyModel(500)
model

ToyModel(
  (hidden1): Linear(in_features=2, out_features=500, bias=True)
  (hidden2): Linear(in_features=500, out_features=10, bias=True)
  (softmax): Softmax(dim=None)
)

## Load your dataset used for inference

In [None]:
def create_local_spark_session(app_name: Optional[str] = None) -> SparkSession:
    """Create (or reuse) a single-core local Spark session for demonstration.

    Side effects on the current process:
      * ``SPARK_HOME`` is removed from the environment if it is set.
      * ``PYSPARK_PYTHON`` is pointed at the interpreter running this notebook.
      * ``ARROW_PRE_0_15_IPC_FORMAT`` is set to ``"1"``.
      * The options stored on the shared ``SparkSession.builder`` are reset.

    Args:
        app_name: Optional application name given to the Spark builder. When
            ``None`` (the default) no name is set, which matches the previous
            zero-argument behaviour of this function.

    Returns:
        The session returned by ``getOrCreate()`` on a ``local[1]`` builder
        configured with client deploy mode.
    """
    # presumably so that the pyspark installed in this environment is used instead of
    # an external Spark distribution -- TODO confirm
    os.environ.pop("SPARK_HOME", None)
    # Python workers must use the same interpreter as the driver.
    os.environ["PYSPARK_PYTHON"] = sys.executable
    # NOTE(review): legacy Arrow IPC flag; the Spark docs require it when pyarrow >= 0.15
    # is used with Spark 2.3.x / 2.4.x -- confirm it is still needed for your Spark version.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    # Clear whatever options an earlier builder call left on the shared builder.
    SparkSession.builder._options = {}
    builder = SparkSession.builder.master("local[1]").config("spark.submit.deployMode", "client")
    if app_name is not None:
        builder = builder.appName(app_name)
    return builder.getOrCreate()

In [4]:
# To run on a YARN cluster, create the Spark session with create_remote_spark_session instead.
# We create a local Spark session here only for demonstration.
ss = create_local_spark_session("Inference with PyTorch")

For the sake of this example notebook, our dataset is composed of two features. The examples are randomly generated.

In [5]:
n_examples = 200

In [6]:
# Use a seeded, local generator so that Restart & Run All always produces the same
# synthetic dataset (the original unseeded calls gave different values on every run).
rng = np.random.RandomState(42)
# Each feature is a 1-D array of n_examples floats drawn uniformly from [0, 1).
feature1 = rng.random_sample([n_examples])
feature2 = rng.random_sample([n_examples])

In [7]:
# Pair the two feature arrays row by row: one (feature1, feature2) tuple per example.
feature_rows = list(zip(feature1.tolist(), feature2.tolist()))
# Column names are given explicitly; column types are left to Spark to infer.
df = ss.createDataFrame(feature_rows, ["feature1", "feature2"])
df

DataFrame[feature1: double, feature2: double]

In [8]:
df.count()

200

## Make sure your model is serializable

Your model is going to be broadcast to the executors, so it must be serializable. We provide the wrapper SerializableObj, which guarantees that your model is serializable. Simply define and import the function that loads your model, and pass it to SerializableObj along with any parameters needed to load your model. Again, it is important that your function lives in a Python module of your environment (and not in the notebook).

In [9]:
from ml_hadoop_experiment.pytorch.fixtures.test_models import load_toy_model
load_toy_model

<function thx.pytorch.fixtures.test_models.load_toy_model(hidden_size:int) -> thx.pytorch.fixtures.test_models.ToyModel>

In [10]:
# Create a serializable model by simply providing the function and the parameters needed to load our model
serializable_model = SerializableObj(ss, load_toy_model, 500)

## Inference only

### Define the inference function

This function describes how to call our model and how to run inference.

In [11]:
# The 1st parameter is a list of artifacts that we need to run inference.
# In our example, this is simply our model. It could be a tokenizer + model for example.

# The 2nd parameter is a tuple of pandas Series, one series per input features.
# So in our example, it is a tuple of two pandas Series.

# The 3rd parameter is the device name ("cpu", "cuda:0", "cuda:1" ...). When running on CPU-only machines,
# the device is "cpu". When running on GPU machines, the device is "cuda:0" or "cuda:1" or ...
# depending on the number of GPUs available on the machine and the number of tasks per GPU machine.
# Tasks are uniformly distributed on all GPUs.

# The output must be a pandas series containing all your outputs. In our example, each element of the
# pandas series is a list of 10 doubles (output of the model)
def inference_fn(
    artifacts: artifact_type, features: Tuple[pd.Series, ...], device: str
) -> pd.Series:
    """Run the model on one batch of raw feature columns.

    Args:
        artifacts: The model itself in this example; it is moved to ``device``
            and then called with the two feature tensors.
        features: Tuple of exactly two pandas Series (feature1, feature2).
        device: Torch device name such as ``"cpu"`` or ``"cuda:0"``.

    Returns:
        A pandas Series with one element per row of the model output, each
        element being that row converted to a (nested) Python list.
    """
    model = artifacts
    model.to(device)
    feature1, feature2 = features
    # Build float32 tensors directly on the target device: the model has just been moved
    # there, and feeding it CPU tensors would fail on a GPU device.
    feature_1_as_tensor = torch.tensor(feature1.to_list(), dtype=torch.float32, device=device)
    feature_2_as_tensor = torch.tensor(feature2.to_list(), dtype=torch.float32, device=device)
    # Inference only: without no_grad the output may require grad and .numpy() would raise.
    with torch.no_grad():
        results = model(feature_1_as_tensor, feature_2_as_tensor)
    # .cpu() is a no-op on CPU and mandatory before .numpy() for CUDA tensors.
    return pd.Series(results.cpu().numpy().tolist())

### Run inference

In [12]:
df_with_predictions = with_inference_column(
    df=df,
    artifacts=serializable_model,
    input_cols=["feature1", "feature2"],
    inference_fn=inference_fn,
    # Our model output is an array of doubles
    output_type=ArrayType(DoubleType()),
    batch_size=50,
    output_col="predictions",
    num_threads=1
)

In [13]:
pdf = df_with_predictions.toPandas()



In [14]:
pdf

Unnamed: 0,feature1,feature2,predictions
0,0.585711,0.622785,"[0.14496324956417084, 0.06333291530609131, 0.0..."
1,0.260878,0.160317,"[0.11619735509157181, 0.07071886956691742, 0.0..."
2,0.979533,0.947563,"[0.1776239275932312, 0.052888333797454834, 0.0..."
3,0.212702,0.091959,"[0.11229074001312256, 0.07178390026092529, 0.0..."
4,0.098237,0.062555,"[0.10662975162267685, 0.07581344991922379, 0.0..."
...,...,...,...
195,0.826296,0.181083,"[0.1443796455860138, 0.052474234253168106, 0.0..."
196,0.206053,0.881988,"[0.13214512169361115, 0.07988481223583221, 0.0..."
197,0.689100,0.887212,"[0.15844134986400604, 0.06172007694840431, 0.0..."
198,0.089348,0.853725,"[0.12560969591140747, 0.0846186950802803, 0.07..."


## Preprocessing and inference

### Define the preprocessing function

In [15]:
# The 1st parameter is a list of artifacts that we need to run inference.
# In our example, this is simply our model

# The 2nd parameter is a tuple of features, with as many components as there are input features.
# So in our example, it is a tuple of two components: one double for the first feature and
# a second double for the second feature

# The output must be Pytorch tensor(s). These tensors will be grouped by batches and provided to your
# inference function
def preprocessing_fn(
    artifacts: artifact_type, features: Tuple[Any, ...], device: str
) -> Tuple[torch.Tensor, ...]:
    """Turn one row of raw features into two shifted one-element tensors.

    ``artifacts`` and ``device`` are accepted but not used by this example.

    Args:
        artifacts: Unused here.
        features: Tuple of exactly two values (feature1, feature2).
        device: Unused here.

    Returns:
        ``(tensor([feature1 + 1]), tensor([feature2 + 2]))``.
    """
    raw_first, raw_second = features
    # Wrap each value in a one-element tensor, then apply the constant offset.
    shifted_first = torch.Tensor([raw_first]).add(1)
    shifted_second = torch.Tensor([raw_second]).add(2)
    return shifted_first, shifted_second

### Define the inference function

In [16]:
# The 1st parameter is a list of artifacts that we need to run inference.
# In our example, this is simply our model

# The 2nd parameter is a tuple of pytorch Tensors, one tensor per input features.
# So in our example, it is a tuple of two pytorch Tensors.

# The 3rd parameter is the device name ("cpu", "cuda:0", "cuda:1" ...)
def inference_fn(
    artifacts: artifact_type, features: Tuple[torch.Tensor, ...], device: str
) -> list:
    """Run the model on one batch of already-preprocessed tensors.

    Note: this definition replaces the ``inference_fn`` defined in the
    "Inference only" section, which took pandas Series as input.

    Args:
        artifacts: The model itself in this example; it is moved to ``device``
            and then called with the two feature tensors.
        features: Tuple of exactly two torch tensors (feature1, feature2).
        device: Torch device name such as ``"cpu"`` or ``"cuda:0"``.

    Returns:
        The model output converted to a (nested) Python list.
    """
    model = artifacts
    model.to(device)
    # Put the inputs on the same device as the model; .to() is a no-op if already there.
    feature1, feature2 = (tensor.to(device) for tensor in features)
    # Inference only: without no_grad the output may require grad and .numpy() would raise.
    with torch.no_grad():
        results = model(feature1, feature2)
    # .cpu() is a no-op on CPU and mandatory before .numpy() for CUDA tensors.
    return results.cpu().numpy().tolist()

### Run inference

In [17]:
df_with_predictions = with_inference_column_and_preprocessing(
    df=df,
    artifacts=serializable_model,
    input_cols=["feature1", "feature2"],
    preprocessing=preprocessing_fn,
    inference_fn=inference_fn,
    # Our model output is an array of doubles
    output_type=ArrayType(DoubleType()),
    batch_size=50,
    output_col="predictions",
    num_threads=1
)

In [18]:
pdf = df_with_predictions.toPandas()



In [19]:
pdf

Unnamed: 0,feature1,feature2,predictions
0,0.585711,0.622785,"[0.2733425796031952, 0.041139136999845505, 0.0..."
1,0.260878,0.160317,"[0.23460765182971954, 0.049187857657670975, 0...."
2,0.979533,0.947563,"[0.31539714336395264, 0.03235138952732086, 0.0..."
3,0.212702,0.091959,"[0.22901983559131622, 0.05043504014611244, 0.0..."
4,0.098237,0.062555,"[0.21981926262378693, 0.053840599954128265, 0...."
...,...,...,...
195,0.826296,0.181083,"[0.2794216275215149, 0.03498457372188568, 0.04..."
196,0.206053,0.881988,"[0.2501583993434906, 0.05209602043032646, 0.04..."
197,0.689100,0.887212,"[0.28918448090553284, 0.03880692273378372, 0.0..."
198,0.089348,0.853725,"[0.24045471847057343, 0.05580238997936249, 0.0..."


In [20]:
ss.stop()