# Text classification on Spark with MMLSpark

This notebook shows how to build a text classification web service using MMLSpark Serving and deploy it to a Spark cluster.

## Get data

In [1]:
# Download the text data archive (unless it is already here) and unzip it.
from fit_and_store_pipeline import unzip_file_here
# A bare `import urllib` does not guarantee that the `request` submodule is
# loaded; import it explicitly so urllib.request.urlretrieve works on a
# fresh kernel.
import urllib.request
import os

# Only fetch/unpack when the extracted CSV is missing.
if not os.path.isfile('./text_data/attack_data.csv'):
    # Reuse a previously downloaded archive if there is one.
    if not os.path.isfile('./text_data.zip'):
        urllib.request.urlretrieve('https://activelearning.blob.core.windows.net/activelearningdemo/text_data.zip', 'text_data.zip')
    unzip_file_here('text_data.zip')

# The word-vector archive is expected to sit next to the notebook already;
# it is only unpacked here, never downloaded.
if not os.path.isfile('miniglove_6B_50d_w2v.txt'):
    unzip_file_here('miniglove_6B_50d_w2v.zip')

print('Data files here')

Data files here


In [2]:
# Make spawned workers use the same Python executable (and therefore the
# same environment) as this notebook.
import os
import sys

os.environ.update({"PYSPARK_PYTHON": sys.executable})

In [3]:
# Build the train/test data pair.

from fit_and_store_pipeline import create_train_test_split

# training_set_01.csv and test_set_01.csv must already be present.
(training_data, test_data) = create_train_test_split()

In [4]:
# Should pyspark be missing on this machine, it can be installed with
# !{sys.executable} -m pip install pyspark

from pyspark.sql import SparkSession

# Session builder: request mmlspark v0.13 (the DSVM ships with 0.12) and set
# both spark.pyspark.python and spark.pyspark.driver.python to the
# interpreter running this notebook.
sparkSB = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", "Azure:mmlspark:0.13")
    .config("spark.pyspark.python", sys.executable)
    .config("spark.pyspark.driver.python", sys.executable)
)

spark = sparkSB.getOrCreate()

# Imported only after the session has been created, as in the original cell
# (presumably the package is resolved via spark.jars.packages — confirm).
import mmlspark
spark

In [5]:
# Convert the train/test data into Spark DataFrames.

def _as_labelled_sdf(raw_data):
    """Return a Spark DataFrame holding `comment` and an integer `label`.

    `raw_data` is handed to `spark.createDataFrame`; the `label` column is
    its `is_attack` column cast to integer.
    """
    sdf = spark.createDataFrame(raw_data)
    with_label = sdf.withColumn("label", sdf["is_attack"].cast('integer'))
    return with_label.select(["comment", "label"])


train_sdf = _as_labelled_sdf(training_data)
test_sdf = _as_labelled_sdf(test_data)

# To inspect the result:
# train_sdf.limit(10).toPandas()
# train_sdf.groupBy("label").count().toPandas()

In [6]:
# Assemble and fit an ML-Lib pipeline: tokenizer -> Word2Vec -> random forest.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Tokenizer, Word2Vec

# The text to classify is in the "comment" column.
tokenizer = Tokenizer(inputCol="comment", outputCol="words")

# Only referenced by the disabled numPartitions argument below.
partitions = train_sdf.rdd.getNumPartitions()

word2vec = Word2Vec(
    maxIter=4,
    seed=44,
    inputCol="words",
    outputCol="features",
    # numPartitions=partitions,
)

# NOTE(review): unlike Word2Vec above, no seed is passed here — consider
# fixing one if reproducible results are needed.
rfc = RandomForestClassifier(labelCol="label")

textClassifier = Pipeline(stages=[tokenizer, word2vec, rfc])
textClassifier = textClassifier.fit(train_sdf)

In [7]:
# Featurize once up front, so that several models can be tried on the same
# features.
textFeaturizer = Pipeline(stages=[tokenizer, word2vec])
textFeaturizer = textFeaturizer.fit(train_sdf)

# Keep just the columns a classifier needs.
ptrain = textFeaturizer.transform(train_sdf).select("label", "features")
ptest = textFeaturizer.transform(test_sdf).select("label", "features")

ptrain.limit(5).toPandas()

Unnamed: 0,label,features
0,0,"[-0.023442650213837624, 0.02603577682748437, -..."
1,0,"[-0.02291071817411908, 0.0390844551979431, -0...."
2,0,"[-0.030262907435436075, 0.04049098381727207, -..."
3,0,"[-0.053226795885711914, 0.04567255172878504, -..."
4,0,"[-0.03638704048875624, 0.04077244628793918, -0..."


In [8]:
# Try the fitted classifier on a handful of new comments.
import pandas as pd

test_attacks = [
    'You are scum.',
    'I like your shoes.',
    'You are pxzx.',
    'Your mother was a hamster and your father smelt of elderberries',
    'One bag of hagfish slime, please',
]

# One-column pandas frame -> Spark DataFrame with a "comment" column.
ta_sdf = spark.createDataFrame(pd.DataFrame({"comment": test_attacks}))

prediction = textClassifier.transform(ta_sdf)
prediction.toPandas()

Unnamed: 0,comment,words,features,rawPrediction,probability,prediction
0,You are scum.,"[you, are, scum.]","[-0.028500227102388937, 0.07817639410495758, 0...","[9.263528611802501, 10.736471388197497]","[0.46317643059012503, 0.5368235694098749]",1.0
1,I like your shoes.,"[i, like, your, shoes.]","[-0.029791212640702724, 0.07729054428637028, 0...","[12.289011431327564, 7.710988568672435]","[0.6144505715663782, 0.3855494284336217]",0.0
2,You are pxzx.,"[you, are, pxzx.]","[-0.028500227102388937, 0.07817639410495758, 0...","[9.263528611802501, 10.736471388197497]","[0.46317643059012503, 0.5368235694098749]",1.0
3,Your mother was a hamster and your father smel...,"[your, mother, was, a, hamster, and, your, fat...","[-0.031833043110302904, 0.05804957855831493, -...","[10.379377440137818, 9.620622559862184]","[0.5189688720068909, 0.4810311279931092]",0.0
4,"One bag of hagfish slime, please","[one, bag, of, hagfish, slime,, please]","[-0.028880382111916937, 0.04981327926119168, -...","[11.372786956168316, 8.627213043831686]","[0.5686393478084157, 0.4313606521915843]",0.0


In [9]:
# Score the larger test set and tabulate counts per (label, prediction) pair.

scored_test = textClassifier.transform(test_sdf)

(
    scored_test
    .groupBy(["label", "prediction"])
    .count()
    .toPandas()
    .pivot(index="label", columns="prediction")
)

Unnamed: 0_level_0,count,count
prediction,0.0,1.0
label,Unnamed: 1_level_2,Unnamed: 2_level_2
0,798,60
1,110,32


## Deploy the model as a Spark Streaming job

In [10]:
# Deploy the trained classifier as a streaming job: first define the
# service's input so that it looks like the model's input.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import uuid

# Read requests from the "text_api" endpoint on localhost:9977, parse the
# JSON in the "value" column with test_sdf's schema into a "variables"
# struct, and keep the request id together with the struct's fields.
serving_inputs = (
    spark.readStream.server()
    .address("localhost", 9977, "text_api")
    .load()
    .withColumn("variables", from_json(col("value"), test_sdf.schema))
    .select("id", "variables.*")
)

In [11]:
# Run the fitted pipeline over the incoming requests and turn the
# "prediction" column into a string.
serving_outputs = (
    textClassifier.transform(serving_inputs)
    .withColumn("prediction", col("prediction").cast("string"))
)

In [12]:
# Start the streaming query that replies to "text_api" requests with the
# "prediction" column; every start uses a fresh, uniquely named checkpoint
# location.
checkpoint_dir = "checkpoints-{}".format(uuid.uuid1())

server = (
    serving_outputs.writeStream
    .server()
    .option("name", "text_api")
    .queryName("mml_text_query")
    .option("replyCol", "prediction")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

In [17]:
# if we want to change something above (like the port), we'll need
# to stop the active server
# NOTE(review): this cell carries execution count 17, i.e. it was last run
# after the test cells that follow it in the notebook. Executed top to
# bottom, it stops the server before those test requests are sent —
# presumably they would then fail; consider running this cell last.

server.stop()


## Test web service

In [13]:
# inputs and outputs - schema
# Evaluating the streaming DataFrame as the cell's last expression displays
# its column names and types.
serving_inputs

DataFrame[id: bigint, comment: string, label: int]

In [14]:
# Display the column names and types of the served output DataFrame.
serving_outputs

DataFrame[id: bigint, comment: string, label: int, words: array<string>, features: vector, rawPrediction: vector, probability: vector, prediction: string]

In [16]:
# Call the web service once per test comment and print each reply.
import requests
import json
import time

# calling the service
data = pd.DataFrame({ "comment" : test_attacks })

# Build the list of row dicts once, instead of regenerating the whole list
# on every iteration just to pick out a single row.
rows = data.to_dict('records')

for comment, row_as_dict in zip(test_attacks, rows):
    # Each row is posted as a JSON document to the text_api endpoint.
    r = requests.post(data=json.dumps(row_as_dict), url="http://localhost:9977/text_api")
    time.sleep(0.2)  # short pause between consecutive requests
    print("Response to : '{}' is {}".format(comment, r.text))

Response to : 'You are scum.' is 1.0
Response to : 'I like your shoes.' is 0.0
Response to : 'You are pxzx.' is 1.0
Response to : 'Your mother was a hamster and your father smelt of elderberries' is 0.0
Response to : 'One bag of hagfish slime, please' is 0.0
