In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, types as T
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np

In [2]:
# Seed NumPy's global RNG so anything drawing from np.random is repeatable across runs.
# (The IsolationForest created further down is additionally given its own random_state.)
np.random.seed(42)

In [3]:
# Start from a default SparkConf (no settings are overridden here) and
# obtain the session for this notebook under the application name 'test'.
conf = SparkConf()
spark_session = (
    SparkSession.builder
    .config(conf=conf)
    .appName('test')
    .getOrCreate()
)

In [4]:
# Toy data set: four rows, each with four numeric features.
data = [
    {'feature1': 1., 'feature2': 0., 'feature3': 0.3, 'feature4': 0.01},
    {'feature1': 10., 'feature2': 3., 'feature3': 0.9, 'feature4': 0.1},
    {'feature1': 101., 'feature2': 13., 'feature3': 0.9, 'feature4': 0.91},
    {'feature1': 111., 'feature2': 11., 'feature3': 1.2, 'feature4': 1.91},
]

# Pin the column order explicitly instead of letting Spark infer a schema from
# plain dicts: dict-based inference is deprecated in Spark 2.x (it emits a
# warning) and chooses the column order on its own. Passing tuples plus a list
# of column names yields the same four double columns, in a stated order.
feature_columns = ['feature1', 'feature2', 'feature3', 'feature4']
df = spark_session.createDataFrame(
    [tuple(row[name] for name in feature_columns) for row in data],
    schema=feature_columns,
)
df.show()



+--------+--------+--------+--------+
|feature1|feature2|feature3|feature4|
+--------+--------+--------+--------+
|     1.0|     0.0|     0.3|    0.01|
|    10.0|     3.0|     0.9|     0.1|
|   101.0|    13.0|     0.9|    0.91|
|   111.0|    11.0|     1.2|    1.91|
+--------+--------+--------+--------+



In [5]:
# Unfitted preprocessing step and anomaly detector; both are fitted in the next cell.
scaler = StandardScaler()
classifier = IsolationForest(contamination=0.3, random_state=42, n_jobs=-1)

# Build the training matrix by selecting features by name, in a fixed order.
# The prediction UDF passes feature1..feature4 positionally, so the training
# columns must be in exactly that order; relying on each dict's insertion
# order (dict.values()) would silently misalign features if a row were ever
# written with its keys in a different order.
feature_order = ('feature1', 'feature2', 'feature3', 'feature4')
x_train = [[row[name] for name in feature_order] for row in data]

In [6]:
# Fit the scaler on the raw training matrix, then fit the model on the scaled values.
# The scaled matrix gets its own name so that x_train stays raw: overwriting
# x_train in place would make a second run of this cell refit the scaler on
# already-standardised data, turning it into a near-identity transform that
# would no longer scale raw feature values at prediction time.
x_train_scaled = scaler.fit_transform(x_train)
clf = classifier.fit(x_train_scaled)

In [7]:
# Wrap the scaler and the fitted classifier in Spark broadcast variables so the
# UDF can reach them through .value.
# Keep in mind that broadcasting is meant for relatively small objects.
spark_context = spark_session.sparkContext
SCL = spark_context.broadcast(scaler)
CLF = spark_context.broadcast(clf)

In [8]:
def predict_using_broadcasts(feature1, feature2, feature3, feature4):
    """
    Score a single row of four feature values.

    The row is first transformed with the broadcast scaler (SCL.value) and
    then passed to the broadcast model (CLF.value). A ValueError raised by
    either step is reported on the standard streams and mapped to 0; any
    other exception propagates to the caller.

    :param feature1: first feature value of the row
    :param feature2: second feature value of the row
    :param feature3: third feature value of the row
    :param feature4: fourth feature value of the row
    :return: 1 if normal, -1 if abnormal, 0 if something went wrong
    """
    # A single-row, two-dimensional sample, as transform()/predict() are called with.
    sample = [[feature1, feature2, feature3, feature4]]
    try:
        sample = SCL.value.transform(sample)
        label = CLF.value.predict(sample)[0]
    except ValueError:
        import traceback
        traceback.print_exc()
        # Shows the raw row if scaling failed, or the scaled row if predict failed.
        print('Cannot predict:', sample)
        return 0

    # Cast to a plain Python int (done outside the try, so a failing cast is not swallowed).
    return int(label)

In [9]:
# Register the Python predictor as a Spark UDF whose result column is an integer.
udf_predict_using_broadcasts = F.udf(
    f=predict_using_broadcasts,
    returnType=T.IntegerType(),
)

In [11]:
# Column expression that applies the UDF to the four feature columns, in the
# same order the model was trained on.
prediction_column = udf_predict_using_broadcasts(
    'feature1', 'feature2', 'feature3', 'feature4'
)

# Attach the result as 'prediction' and display the frame.
# NOTE(review): the failure recorded for this cell ('Unsupported class file
# major version 55') refers to Java 11 class files, which suggests the JVM in
# use is newer than this Spark build supports - confirm the Java version /
# JAVA_HOME of the kernel rather than changing this code.
df = df.withColumn('prediction', prediction_column)
df.show()

IllegalArgumentException: 'Unsupported class file major version 55'