In [1]:
# Install packages on the EMR cluster. Versions are pinned to the ones this
# notebook was last run with (see the pip output below), so a re-run of the
# notebook resolves the same library versions instead of whatever is newest.
sc.install_pypi_package("pandas==1.0.4")
sc.install_pypi_package("s3fs==0.4.2")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
11,application_1591548674320_0012,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Using cached https://files.pythonhosted.org/packages/a4/5f/1b6e0efab4bfb738478919d40b0e3e1a06e3d9996da45eb62a77e9a090d9/pandas-1.0.4-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.4 python-dateutil-2.8.1

Collecting s3fs
  Using cached https://files.pythonhosted.org/packages/b8/e4/b8fc59248399d2482b39340ec9be4bb2493846ac23641b43115a7e5cd675/s3fs-0.4.2-py3-none-any.whl
Collecting fsspec>=0.6.0 (from s3fs)
  Using cached https://files.pythonhosted.org/packages/0f/31/f27a81686b2f1b2f6776bd5db10efc7d88f28a50e8888f55409ef6501a50/fsspec-0.7.4-py3-none-any.whl
Collecting botocore>=1.12.91 (from s3fs)
  Using cached https://files.pythonhosted.org/packages/7f/66/e0d17816e957ee440bd75b13a7f94b

In [2]:
import random
import pyspark
import pandas as pd
import numpy as np
import s3fs
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import MultilayerPerceptronClassifier
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

First, we read the data from an AWS S3 bucket and drop rows with no complaint text. The remaining data frame contains 369 observations.

In [3]:
# Load the labelled complaints. multiLine=True lets quoted text fields span
# several physical lines of the CSV.
raw_data = spark.read.csv(
    's3://cpdp-complaints/plain_text.csv',
    header=True,
    inferSchema=True,
    multiLine=True,
)

# Keep only rows that actually have complaint text to model on.
data = raw_data.where(raw_data['text_content'].isNotNull())
data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- text_content: string (nullable = true)
 |-- tasers_baton_aggressive_physical_touch_gun: integer (nullable = true)
 |-- trespass_robbery: integer (nullable = true)
 |-- racial_slurs_xenophobic_remarks_: integer (nullable = true)
 |-- planting_drug_guns: double (nullable = true)

In [4]:
# Number of complaints left after dropping rows without text.
complaint_count = data.count()
complaint_count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

369

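Next, we define a function that builds a 0/1 label for a given complaint category, tokenizes the text, removes stop words, applies a Word2Vec embedding, and splits the data into training (60%) and test (40%) sets.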

In [5]:
def create_piped_data(target_variable, data, seed=None):
    """Label one complaint category and embed the complaint text.

    The ``text_content`` column is tokenized, stripped of English stop words
    and embedded with Word2Vec (300-dimensional vectors, ``minCount=5``).
    The rows are split 60/40 into training and test sets *before* the
    pipeline is fitted. As a result, the Word2Vec vocabulary and vectors are
    learned from the training rows only, and the test rows stay unseen.

    Parameters
    ----------
    target_variable : str
        Name of the column in ``data`` holding the category indicator.
        Null values are treated as 0; values are cast to integer.
    data : pyspark.sql.DataFrame
        Must contain ``text_content`` and ``target_variable``.
    seed : int, optional
        Seed for the random 60/40 split. With the default ``None``, Spark
        picks a new seed on each call, so the split differs between runs.
        Pass an integer for a reproducible split.

    Returns
    -------
    (training, test) : tuple of pyspark.sql.DataFrame
        Each has ``text_content``, ``label``, ``tokens``, ``filtered`` and
        ``features`` columns.
    """
    # Label: the category indicator as an integer, missing values -> 0.
    target = data[target_variable]
    model_data = (data.withColumn('label',
                                  when(target.isNull(), 0)
                                  .otherwise(target)
                                  .cast('integer'))
                  .select('text_content', 'label'))

    # Text featurization: tokenize -> drop stop words -> Word2Vec vectors.
    tokenizer = Tokenizer(inputCol='text_content', outputCol='tokens')
    # Empty and single-space tokens carry no meaning; treat them as stop words.
    stopwords = StopWordsRemover.loadDefaultStopWords('english') + [' ', '']
    remover = StopWordsRemover(inputCol='tokens', outputCol='filtered',
                               stopWords=stopwords)
    word2vec = Word2Vec(inputCol='filtered', outputCol='features',
                        vectorSize=300, minCount=5)
    pipeline = Pipeline(stages=[tokenizer, remover, word2vec])

    # Split BEFORE fitting. If Word2Vec were fitted on every row, the test
    # text would shape the embedding and leak into the training features.
    training, test = model_data.randomSplit([0.6, 0.4], seed=seed)
    fitted_pipeline = pipeline.fit(training)

    return fitted_pipeline.transform(training), fitted_pipeline.transform(test)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We then define a function that uses cross-validation to train multilayer perceptrons with different hidden-layer sizes. It returns the best-performing model (by accuracy) together with the evaluator used to select it.

In [6]:
def train_model(training_data, layer_options=None):
    """Cross-validate multilayer perceptrons and return the best one.

    Parameters
    ----------
    training_data : pyspark.sql.DataFrame
        Must contain a ``features`` vector column and an integer ``label``
        column (the default column names MultilayerPerceptronClassifier
        reads), e.g. the training output of ``create_piped_data``.
    layer_options : list of list of int, optional
        Candidate ``layers`` settings to compare. Each list gives the input
        size, the hidden layer size(s) and the output size. The input size
        must equal the feature-vector length (300 for the Word2Vec features
        built in ``create_piped_data``), and the output size must equal the
        number of classes. Defaults to ``[[300, 100, 2], [300, 50, 2]]``.

    Returns
    -------
    (best_mlp, evaluator)
        The model CrossValidator selected, and the accuracy evaluator used
        for the selection (reusable to score a held-out test set).
    """
    # None sentinel instead of a list default, to avoid a shared mutable arg.
    if layer_options is None:
        layer_options = [[300, 100, 2], [300, 50, 2]]

    # Multilayer perceptron estimator; its `layers` param is tuned below.
    mlp = MultilayerPerceptronClassifier()
    evaluator = evals.MulticlassClassificationEvaluator(metricName='accuracy')

    grid = tune.ParamGridBuilder().addGrid(mlp.layers, layer_options).build()

    cv = tune.CrossValidator(estimator=mlp,
                             estimatorParamMaps=grid,
                             evaluator=evaluator)
    best_mlp = cv.fit(training_data).bestModel

    return best_mlp, evaluator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Next, we train a model for each of the four complaint categories of interest. For each one, we record the selected layer configuration and test-set accuracy in a pandas DataFrame. Finally, we save that DataFrame to an S3 bucket.

In [7]:
# Empty results table; each training cell below appends one row per category.
RESULT_COLUMNS = ('category', 'layers', 'model_test_accuraccy')
results = pd.DataFrame(columns=list(RESULT_COLUMNS))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
category = 'tasers_baton_aggressive_physical_touch_gun'
train, test = create_piped_data(category, data)
best_model, evaluator = train_model(train)
test_results = best_model.transform(test)
# DataFrame.append is deprecated (pandas 1.4) and removed in pandas 2.0;
# pd.concat adds the same one-row record.
results = pd.concat([results, pd.DataFrame([{
    'category': category,
    'layers': best_model.layers,
    'model_test_accuraccy': evaluator.evaluate(test_results),
}])], ignore_index=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Layer sizes chosen by cross-validation for the most recently trained
# category (the previous cell): [input, hidden, output].
best_model.layers


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[300, 50, 2]

In [10]:
category = 'trespass_robbery'
train, test = create_piped_data(category, data)
best_model, evaluator = train_model(train)
test_results = best_model.transform(test)
# DataFrame.append is deprecated (pandas 1.4) and removed in pandas 2.0;
# pd.concat adds the same one-row record.
results = pd.concat([results, pd.DataFrame([{
    'category': category,
    'layers': best_model.layers,
    'model_test_accuraccy': evaluator.evaluate(test_results),
}])], ignore_index=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
category = 'racial_slurs_xenophobic_remarks_'
train, test = create_piped_data(category, data)
best_model, evaluator = train_model(train)
test_results = best_model.transform(test)
# DataFrame.append is deprecated (pandas 1.4) and removed in pandas 2.0;
# pd.concat adds the same one-row record.
results = pd.concat([results, pd.DataFrame([{
    'category': category,
    'layers': best_model.layers,
    'model_test_accuraccy': evaluator.evaluate(test_results),
}])], ignore_index=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
category = 'planting_drug_guns'
train, test = create_piped_data(category, data)
best_model, evaluator = train_model(train)
test_results = best_model.transform(test)
# DataFrame.append is deprecated (pandas 1.4) and removed in pandas 2.0;
# pd.concat adds the same one-row record.
results = pd.concat([results, pd.DataFrame([{
    'category': category,
    'layers': best_model.layers,
    'model_test_accuraccy': evaluator.evaluate(test_results),
}])], ignore_index=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Persist the results table to S3 (pandas writes through s3fs).
# - Use the same s3:// scheme as the input read at the top of the notebook;
#   s3n:// is a legacy Hadoop connector scheme.
# - The default RangeIndex carries no information, so don't write it as an
#   extra unnamed column.
results.to_csv('s3://cpdp-complaints/mlp_results.csv', index=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…