In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

In [2]:
data = spark.read.csv('crime.csv',inferSchema=True,header=True)

In [3]:
# Print data schema.
data.printSchema()

# Print data columns.
data.columns

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)



['INCIDENT_NUMBER',
 'OFFENSE_CODE',
 'OFFENSE_CODE_GROUP',
 'OFFENSE_DESCRIPTION',
 'DISTRICT',
 'REPORTING_AREA',
 'YEAR',
 'MONTH',
 'DAY_OF_WEEK',
 'HOUR',
 'UCR_PART',
 'STREET']

In [4]:
my_cols = data.select(['OFFENSE_CODE_GROUP' , 'DISTRICT','MONTH', 'DAY_OF_WEEK','HOUR'])

In [5]:
my_final_data = my_cols.na.drop()

In [6]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

Data Transformation

In [7]:
OFFENSE_indexer = StringIndexer(inputCol='OFFENSE_CODE_GROUP',outputCol='OFFENSEIndex')
OFFENSE_encoder = OneHotEncoder(inputCol='OFFENSEIndex',outputCol='OFFENSEVec')

In [8]:
WEEK_indexer = StringIndexer(inputCol='DAY_OF_WEEK',outputCol='WEEKIndex')
WEEK_encoder = OneHotEncoder(inputCol='WEEKIndex',outputCol='WEEKVec')

In [9]:
DIS_indexer = StringIndexer(inputCol='DISTRICT',outputCol='DISIndex')
DIS_encoder = OneHotEncoder(inputCol='DISIndex',outputCol='DISVec')

In [10]:
assembler = VectorAssembler(inputCols=['OFFENSEVec','WEEKVec','DISVec','HOUR','MONTH'],outputCol='features')

In [11]:
from pyspark.ml.classification import LogisticRegression

In [12]:
from pyspark.ml import Pipeline

In [13]:
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='HOUR')

In [14]:
pipeline = Pipeline(stages=[OFFENSE_indexer,WEEK_indexer,DIS_indexer,OFFENSE_encoder,WEEK_encoder,DIS_encoder,
                           assembler,log_reg_titanic])

In [15]:
train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,0.3])

In [16]:
fit_model = pipeline.fit(train_titanic_data)

In [17]:
results = fit_model.transform(test_titanic_data)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='HOUR')

In [20]:
results.select('HOUR','prediction').show()

+----+----------+
|HOUR|prediction|
+----+----------+
|   9|       9.0|
|  14|      14.0|
|  21|      21.0|
|   2|       2.0|
|  21|      22.0|
|   0|       0.0|
|   3|       3.0|
|  21|      21.0|
|   1|       1.0|
|   2|       2.0|
|  21|      21.0|
|  12|      12.0|
|  17|      18.0|
|  20|      21.0|
|  23|      23.0|
|   3|       3.0|
|  15|      15.0|
|  20|      20.0|
|  21|      21.0|
|   1|       1.0|
+----+----------+
only showing top 20 rows



In [21]:
AUC = my_eval.evaluate(results)

AUC

0.9996268492344039