In [1]:
# Read the officer CSV: the first row supplies column names and column types are inferred.
recent_complaints = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load('/FileStore/tables/data_officerallegationofficerallegation.csv')
)

# Register the frame as the temp view "data_recentcomplaint".
recent_complaints.createOrReplaceTempView("data_recentcomplaint")

# Mark the frame for caching; later cells read it more than once.
recent_complaints.cache()

In [2]:
display(recent_complaints)

# Remove a portion of the officers with recent complaints: keep rows where
# has_recent_complaint is 0 or officer_id is above 10000; every other row is dropped.
rc = recent_complaints.where(
    (recent_complaints["has_recent_complaint"] == 0)
    | (recent_complaints["officer_id"] > 10000)
)

# Number of officers with a recent complaint that survive the filter.
rc.where(rc["has_recent_complaint"] == 1).count()

officer_id,gender,race,rank,age,has_recent_complaint
287,M,Hispanic,Police Officer,43,1
2083,M,White,Police Officer,46,1
2114,M,Black,Police Officer,42,1
2147,M,Black,Police Officer,49,1
2781,F,Hispanic,Detective,43,1
5096,M,White,Police Officer,47,1
6404,F,White,Police Officer,38,0
8691,M,White,Police Officer,40,1
9295,M,White,Sergeant,48,1
14230,M,White,Police Officer,45,1


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Index all string columns.
# Each StringIndexer turns one string column into a numeric "<column>_index" column.
# The indexers are passed to the Pipeline unfitted, so pipeline.fit() is the single place
# where fitting happens. Previously every indexer was fitted eagerly while the list was
# being built, which left pipeline.fit() with nothing to learn.
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['gender', 'race', 'rank']]
pipeline = Pipeline(stages=indexers)

# Fit on the full data set so the label-to-index mapping covers every category present,
# then apply that mapping to the filtered subset `rc`.
indexed_df = pipeline.fit(recent_complaints).transform(rc)
display(indexed_df)

officer_id,gender,race,rank,age,has_recent_complaint,gender_index,race_index,rank_index
6404,F,White,Police Officer,38,0,1.0,0.0,0.0
14230,M,White,Police Officer,45,1,0.0,0.0,0.0
14667,M,White,Police Officer,47,1,0.0,0.0,0.0
15539,M,White,Police Officer,33,1,0.0,0.0,0.0
15574,M,Asian/Pacific,Police Officer,32,1,0.0,3.0,0.0
16251,M,Hispanic,Police Officer,52,1,0.0,2.0,0.0
16952,M,White,Sergeant,43,1,0.0,0.0,1.0
20056,M,White,Sergeant,54,1,0.0,0.0,1.0
20477,M,Black,Field Training Officer,54,1,0.0,1.0,4.0
20561,F,Black,Detective,45,0,1.0,1.0,2.0


In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Model inputs: the three indexed categorical columns plus the raw age column.
feature_cols = ['gender_index', 'race_index', 'rank_index', 'age']

# Pack the input columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
vectorized_df = assembler.transform(indexed_df)

display(vectorized_df)

officer_id,gender,race,rank,age,has_recent_complaint,gender_index,race_index,rank_index,features
6404,F,White,Police Officer,38,0,1.0,0.0,0.0,"List(1, 4, List(), List(1.0, 0.0, 0.0, 38.0))"
14230,M,White,Police Officer,45,1,0.0,0.0,0.0,"List(0, 4, List(3), List(45.0))"
14667,M,White,Police Officer,47,1,0.0,0.0,0.0,"List(0, 4, List(3), List(47.0))"
15539,M,White,Police Officer,33,1,0.0,0.0,0.0,"List(0, 4, List(3), List(33.0))"
15574,M,Asian/Pacific,Police Officer,32,1,0.0,3.0,0.0,"List(1, 4, List(), List(0.0, 3.0, 0.0, 32.0))"
16251,M,Hispanic,Police Officer,52,1,0.0,2.0,0.0,"List(1, 4, List(), List(0.0, 2.0, 0.0, 52.0))"
16952,M,White,Sergeant,43,1,0.0,0.0,1.0,"List(1, 4, List(), List(0.0, 0.0, 1.0, 43.0))"
20056,M,White,Sergeant,54,1,0.0,0.0,1.0,"List(1, 4, List(), List(0.0, 0.0, 1.0, 54.0))"
20477,M,Black,Field Training Officer,54,1,0.0,1.0,4.0,"List(1, 4, List(), List(0.0, 1.0, 4.0, 54.0))"
20561,F,Black,Detective,45,0,1.0,1.0,2.0,"List(1, 4, List(), List(1.0, 1.0, 2.0, 45.0))"


In [5]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split roughly 70/30 into training and test rows. The fixed seed keeps the split,
# and therefore the reported accuracy and printed tree, repeatable across runs;
# without it every execution of this cell samples a different split.
(trainingData, testData) = vectorized_df.randomSplit([0.7, 0.3], seed=42)

# Train a decision tree that predicts has_recent_complaint from the 'features' vector.
dt = DecisionTreeClassifier(labelCol="has_recent_complaint", featuresCol="features")
model = dt.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Accuracy of the predictions on the held-out test rows.
evaluator = MulticlassClassificationEvaluator(labelCol="has_recent_complaint", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

# Print the learned tree with the generic "feature <i>" labels replaced by column names.
# Highest indices are substituted first so that "feature 1" can never overwrite the
# prefix of "feature 10" if the feature list grows beyond ten entries.
tree_model = model.toDebugString
for i, feat in reversed(list(enumerate(feature_cols))):
    tree_model = tree_model.replace('feature ' + str(i), feat)
print(tree_model)

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Create feature vector column (same experiment as the previous cell, but without age).
# NOTE: this cell rebinds feature_cols, assembler, vectorized_df, dt, model, predictions,
# evaluator, accuracy and tree_model, which earlier cells also assign.
feature_cols = ['gender_index', 'race_index', 'rank_index']
assembler = VectorAssembler(
  inputCols=feature_cols,
  outputCol='features_without_age')

vectorized_df = assembler.transform(indexed_df)
dt = DecisionTreeClassifier(labelCol="has_recent_complaint", featuresCol="features_without_age")

# Split roughly 70/30 into training and test rows. The fixed seed keeps the split,
# and therefore the reported accuracy and printed tree, repeatable across runs;
# without it every execution of this cell samples a different split.
(trainingData, testData) = vectorized_df.randomSplit([0.7, 0.3], seed=42)
model = dt.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Accuracy of the predictions on the held-out test rows.
evaluator = MulticlassClassificationEvaluator(labelCol="has_recent_complaint", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

# Print the learned tree with the generic "feature <i>" labels replaced by column names.
# Highest indices are substituted first so that "feature 1" can never overwrite the
# prefix of "feature 10" if the feature list grows beyond ten entries.
tree_model = model.toDebugString
for i, feat in reversed(list(enumerate(feature_cols))):
    tree_model = tree_model.replace('feature ' + str(i), feat)
print(tree_model)