In [1]:
from pyspark.sql import *
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import unix_timestamp, from_unixtime, date_format, col, when
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, Bucketizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [2]:
# --- Load the source tables and build the two modelling frames -----------------------
# Allegations with an incident date on or after 2001-01-01.
# NOTE(review): `select distinct(ID) ...` is parsed as SELECT DISTINCT over the whole
# selected row, not "distinct per ID"; one-row-per-allegation is only enforced further
# down by drop_duplicates(["Allegation"]).
allegations = spark.sql("select distinct(ID) as Allegation, add2 as Address, incident_date as Date, is_officer_complaint as IsOfficerComplaint from allegations where incident_date >= '2001-01-01'")

# Category lookup table and the allegation -> category-id link table.
categories = spark.sql("Select * from a_category")
connection = spark.sql("select allegation_id as Allegation, allegation_category_id as id from officer_allegation_csv")

# Attach category columns to each allegation; the join key "id" is dropped afterwards.
allegations = allegations.join(connection, "Allegation").join(categories, "id").drop("id")

# Encoded per-allegation columns; Address/Date are dropped here because the
# allegations frame already carries them.
encoded = spark.sql("select * from encoded_allegations").drop("Address").drop("Date")

# One row per allegation. Allegations without an encoded row get 0 in the numeric
# columns (left join + na.fill(0)). Which category survives drop_duplicates for an
# allegation with several categories is not specified by this code.
df = (
    allegations.join(encoded, "Allegation", how="left")
    .drop_duplicates(["Allegation"])
    .na.fill(0)
    .drop("Allegation", "subcategory")
)

sal = spark.sql("select officer_id as Officer, salary as Salary from salary_csv")

# Officer-level allegation rows with their category.
# The category id is selected in the same query as the officer, so each
# (allegation, officer) row keeps its own category. Joining back to `connection` on
# Allegation alone would pair every officer on an allegation with every category
# recorded for that allegation (including other officers' categories).
oa = spark.sql("select allegation_id as Allegation, officer_id as Officer, disciplined, allegation_category_id as id from officer_allegation_csv").join(categories, "id")

officers = spark.sql("select id as Officer, rank, complaint_percentile as ComplaintPercentile from data_officer_csv")


In [3]:
# Use a Random Forest classifier to predict the allegation category from the
# officer-complaint flag plus the encoded per-allegation columns.
assembler = VectorAssembler(
    inputCols=[
        "IsOfficerComplaint",
        "ARSON",
        "ASSAULT",
        "BATTERY",
        "BURGLARY",
        "CRIM SEXUAL ASSAULT",
        "CRIMINAL DAMAGE",
        "CRIMINAL TRESPASS",
        "DECEPTIVE PRACTICE",
        "GAMBLING",
        "INTERFERENCE WITH PUBLIC OFFICER",
        "KIDNAPPING",
        "LIQUOR LAW VIOLATION",
        "MOTOR VEHICLE THEFT",
        "NARCOTICS",
        "OFFENSE INVOLVING CHILDREN",
        "OTHER OFFENSE",
        "PROSTITUTION",
        "PUBLIC PEACE VIOLATION",
        "ROBBERY",
        "SEX OFFENSE",
        "STALKING",
        "THEFT",
        "WEAPONS VIOLATION",
    ],
    outputCol="features",
)

# `df` is the per-allegation frame built in the data-loading cell (the previous code
# referenced an undefined name here, which fails on a fresh kernel).
output = assembler.transform(df)

# Features with at most 4 distinct values are treated as categorical by the forest.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(output)

# Fixed seed so the 70/30 split (and therefore the reported error) is reproducible.
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed=42)

# The label indexer is fitted on the full frame so every category seen in the test
# split has an index.
labelIndexer = StringIndexer(inputCol="category", outputCol="indexedCategory").fit(output)

rf = RandomForestClassifier(labelCol="indexedCategory", featuresCol="indexedFeatures", numTrees=10, seed=42)

# Map numeric predictions back to the original category strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="indexedCategory", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

In [4]:
# Salary bin edges: 2,500-wide bins from 50,000 to 90,000, then 95,000 and 100,000,
# then 10,000-wide bins from 110,000 to 180,000, with open-ended bins at both extremes.
splits = (
    [-float("inf")]
    + list(range(50000, 90001, 2500))
    + [95000, 100000]
    + list(range(110000, 180001, 10000))
    + [float("inf")]
)
bucketizer = Bucketizer(splits=splits, inputCol="Salary", outputCol="SalaryBin")
bucketSalary = bucketizer.transform(sal)

# Officer-level allegation rows enriched with the salary bin and officer attributes,
# reduced to one row per allegation with no null values.
dk = (
    oa.join(bucketSalary, "Officer")
    .join(officers, "Officer")
    .drop_duplicates(["Allegation"])
    .dropna()
)

# Fit one string indexer per categorical column (each is fitted on dk right here).
categoryIndexer, subcategoryIndexer, rankIndexer = (
    StringIndexer(inputCol=source_col, outputCol=indexed_col).fit(dk)
    for source_col, indexed_col in [
        ("category", "indexedCategory"),
        ("subcategory", "indexedSubcategory"),
        ("rank", "indexedRank"),
    ]
)

# Chain the three fitted indexers and apply them to dk.
CategoricalPipeline = Pipeline(stages=[categoryIndexer, subcategoryIndexer, rankIndexer])
model = CategoricalPipeline.fit(dk)
output = model.transform(dk)


In [5]:
# Predict the salary bin from the indexed allegation category, the disciplined flag,
# the indexed rank and the complaint percentile.
assembler = VectorAssembler(inputCols=["indexedCategory", "disciplined", "indexedRank", "ComplaintPercentile"], outputCol="features")
out2 = assembler.transform(output)

# Drop identifiers and the raw columns that have already been indexed or bucketed.
data = out2.drop("Officer", "id", "Allegation", "category", "subcategory", "Salary", "rank")

# Fixed seed so the 70/30 split (and therefore the reported error) is reproducible.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

# Features with at most 30 distinct values are marked as categorical. The indexer is
# fitted on the full frame, so the pipeline below reuses it as an already-fitted stage.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=30).fit(data)

# Train on the VectorIndexer output. Reading the raw "features" column would leave the
# indexer stage unused and treat the category/rank indices as continuous values.
rf = RandomForestClassifier(labelCol="SalaryBin", featuresCol="indexedFeatures", numTrees=10, seed=42)

pipe = Pipeline(stages=[featureIndexer, rf])
salary_model = pipe.fit(trainingData)
predictions = salary_model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="SalaryBin", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

