In [1]:
#INPUT: train.csv
#Output: 
"""

1. Clean up wrong X and Y values (very few of them).

2. Visualize data.

3. Parse input to get features (e.g. date, time, year, month, etc.).

4. Select and generate features.

5. Remove outliers.

6. Do PCA.

Output: train dataframe with features and labels column

        test dataframe with features and labels column

        visuals to provide insights on data that help select, and tune the models.       

 a toolbox list to choose from:

         Typical graphical techniques used in EDA are


Box plot

Histogram

Multi-vari chart

Run chart

Pareto chart

Scatter plot

Stem-and-leaf plot

Parallel coordinates

Odds ratio

Targeted projection pursuit

Glyph-based visualization methods such as PhenoPlot[8] and Chernoff faces

Projection methods such as grand tour, guided tour and manual tour

Interactive versions of these plots

        Dimensionality reduction:

Multidimensional scaling

Principal component analysis (PCA)

Multilinear PCA

Nonlinear dimensionality reduction (NLDR)

        Typical quantitative techniques are:
Median polish

Trimean

Ordination

History

        
"""
"""
Dates - timestamp of the crime incident
Category - category of the crime incident (only in train.csv). This is the target variable you are going to predict.
Descript - detailed description of the crime incident (only in train.csv)
DayOfWeek - the day of the week
PdDistrict - name of the Police Department District
Resolution - how the crime incident was resolved (only in train.csv)
Address - the approximate street address of the crime incident 
X - Longitude
Y - Latitude
"""

In [2]:
# Read the training CSV (first row is the header, column types inferred by Spark),
# then report the row count, the column count and the inferred schema.
filename = "/FileStore/tables/train.csv"
data = spark.read.csv(filename, inferSchema=True, header=True)
n_rows, n_cols = data.count(), len(data.columns)
print(n_rows)
print(n_cols)
# NOTE(review): the original remark here read "Class is an int. Features are double.";
# later cells treat Category as a string column, so confirm against the printed schema.
data.printSchema()

In [3]:
# Descript and Resolution are dropped because they are not available in the test data.
data = data.drop('Descript', 'Resolution')

In [4]:
#1. Data wrangling: audit the quality of the data and perform the actions needed to clean it.
#1- Split the columns by Spark dtype into categorical (string) and numerical (int/double) features.
#   Category is the target, so it is kept out of the categorical feature list.
cat_cols = [name for name, dtype in data.dtypes if dtype == 'string' and name != 'Category']
print(str(len(cat_cols)) + '  categorical features')
num_var = [name for name, dtype in data.dtypes if dtype in ('int', 'double')]
print(str(len(num_var)) + '  numerical features')
# The remaining column is the timestamp (presumably Dates - confirm with printSchema()).

In [5]:
# Number of distinct crime categories (the class labels).
n_labels = data.select('Category').distinct().count()
print('Number of labels is', n_labels)

In [6]:
# Distinct categories collected to the driver as a list of one-element lists
# ([[name], [name], ...]); later cells read the name as crime[0].
labels = data.select('Category').distinct().toPandas().values.tolist()
labels


In [7]:
#check for nulls
from pyspark.sql.functions import isnan, when, count, col
# One aggregate per column: count() only counts the rows where when(...) yields a value,
# i.e. the rows in which that column is null.
null_counts = [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
data.select(null_counts).show()
#Conclusion: from the above the data seems clean, with no missing values.

In [8]:
#Check for class imbalance: report each distinct class together with its prior
#probability (in percent) as observed in the dataset.

n_total = data.count()
prior_pct = (100*count('Category')/n_total).alias('prior')
classList = data.groupBy('Category').agg(prior_pct)
print(classList.toPandas())

#Consider removing 'Other offences'; note that it accounts for 14%.

In [9]:
#Explore categorical features: number of unique values per categorical column.
#Category is added to cat_cols (in place) so the label's unique values are reported too
#and so the following cells see it.
if 'Category' not in cat_cols:
  cat_cols.append('Category')
countUniqueValues = []
for c in cat_cols:
  countUniqueValues.append(data.select(c).distinct().count())
print(cat_cols)
print(countUniqueValues)

In [10]:
#Identify the most frequent values of each categorical feature.
from pyspark.sql.functions import desc
N=10 #the N number of most frequent points.
#Count the rows once: the total is the same for every column, and each
#data.count() call is a separate Spark job.
n_rows_total = data.count()
frequentCategories = [
  data.groupBy(c)
      .agg((100*count(c)/n_rows_total).alias('Percentage'))
      .sort(desc('Percentage'))
      .limit(N)
  for c in cat_cols
]
for top_values in frequentCategories:
  print(top_values.toPandas())

In [11]:
#Check how the categorical features co-occur with the label column (crime category).
N=10 #the N number of most frequent pairs of features and class.
#Category must not be paired with itself, so it is removed from cat_cols (in place).
if 'Category' in cat_cols:
  cat_cols.remove('Category')
#Count the rows once: the total is the same for every column, and each
#data.count() call is a separate Spark job.
n_rows_for_pairs = data.count()
frequentCategories = [
  data.groupBy(c, 'Category')
      .agg((100*count(c)/n_rows_for_pairs).alias('PairFrequencyPercentage'))
      .sort(desc('PairFrequencyPercentage'))
      .limit(N)
  for c in cat_cols
]
for top_pairs in frequentCategories:
  print(top_pairs.toPandas())

In [12]:
#Explore numerical features. 

In [13]:
# Number of distinct values per numerical column.
countUniqueValuesN = []
for c in num_var:
  countUniqueValuesN.append(data.select(c).distinct().count())
print(num_var)
print(countUniqueValuesN)
#X and Y values repeat: possibly the same locations witness several crimes over and over.

In [14]:
#Confirm that X repeats when Y repeats: count the distinct (X, Y) pairs.
NcrimeLocations = data.select('X', 'Y').distinct().count()
print(NcrimeLocations) #shows only 34243 unique pairs

In [15]:
#we will need to visualize the data
import matplotlib.pyplot as plt
import pandas as pd
#Cap on the number of rows collected to the driver for plotting (at most 1,000,000).
#min() needs a single data.count() Spark job; the conditional expression ran it twice.
N1 = min(data.count(), 1000000)

In [16]:
#plot a histogram of timestamp to check if a pattern is there.
import numpy as np
# Collect at most N1 values of the Dates column to the driver as a pandas frame.
data_pd_timestamp = data[['Dates']].limit(N1).toPandas()
# Clear the current pyplot figure before drawing.
plt.clf()
# Histogram of the timestamps over 100 bins.
plt.hist(np.array(data_pd_timestamp['Dates']), bins = 100)
# display() is not defined in this file - presumably the notebook environment's helper; confirm.
display(plt.show())

In [17]:
# Check for outliers:
# plot the X/Y spatial data as a scatter plot so that outliers can be spotted visually.
# At most N1 rows of the two columns are collected to the driver for plotting.

data_pd = data[['X', 'Y']].limit(N1).toPandas()
# Clear the current pyplot figure before drawing.
plt.clf()
data_pd.plot(kind="scatter", x="X", y="Y")
# display() is not defined in this file - presumably the notebook environment's helper; confirm.
display(plt.show())

In [18]:
#Outliers can be seen. Dropping them.

In [19]:
#Drop the spatial outliers (only rows with X < -122 are kept), remember how many rows
#were removed, and re-plot the remaining X/Y points.
#NOTE: this cell overwrites `data`; re-running it reports 0 removed rows.
rows_before = data.count()
data = data.where('X<-122')
outliersCount = rows_before - data.count()
#Only X and Y are needed for the plot, so collect just those columns (capped at N1 rows,
#like the scatter plot above) instead of pulling the whole table to the driver.
data_pd = data[['X', 'Y']].limit(N1).toPandas()
plt.clf()
data_pd.plot(kind="scatter", x="X", y="Y")
display(plt.show())

In [20]:
# Report how many rows the spatial filter removed (print() stringifies the count itself).
print('Removing ', outliersCount, ' outliers in spatial data.') #67 cases.

In [21]:
#Prepare the XY data per category: one (pandas frame, category name) tuple per label.
NX=100
NY=100

groups = []
for crime in labels:
    name = crime[0]
    groups.append((data.where(col('Category') == name).toPandas(), name))



In [22]:
# crimeRegionsHistogram is only assigned by the 2-D histogram cell that follows, so on a
# fresh kernel (Restart & Run All) the name does not exist yet when this cell runs.
# Guard the lookup so the notebook keeps running instead of stopping with a NameError.
try:
    print(len(crimeRegionsHistogram.T))
except NameError:
    print('crimeRegionsHistogram is not defined yet; run the histogram cell first.')

In [23]:
#Bin the X and Y values of every category into a 2-D histogram and display the bin counts
#with imshow(); the light colors indicate a high number of incidents.
NX=70 #I find 70 to give good visualization.
NY=70
fig= plt.figure(figsize=(30, 30)) #width and height in inches.
#One panel per category on an 8x5 grid (subplot indices start at 1).
for i, (group, name) in enumerate(groups, start=1):
    plt.subplot(8,5,i)
    #bins=(NX, NY) gives the number of bins along x and along y.
    crimeRegionsHistogram, xedges, yedges = np.histogram2d(np.array(group.X),np.array(group.Y), bins=(NX,NY))
    #xedges/yedges hold the bin boundaries; the first and last boundary define the plotted region.
    histoExtent  = [xedges[0],xedges[-1],yedges[0],yedges[-1]]
    #Transpose so that each row holds the bins sharing a y range. origin must be 'upper' or
    #'lower'; the previous value 'low' is not an accepted option.
    #imshow scales the values linearly, mapping the lowest to 0 and the highest to 1.
    plt.imshow(crimeRegionsHistogram.T, extent=histoExtent, aspect='auto', origin='lower')
    plt.title(name)
display(plt.show())
# We can tell how important the location features are, especially for categories of crime that depend strongly on location.

In [24]:
#Scatter plot of X/Y for each category on an 8x5 grid of axes, filled row by row.
#You can tell that some crimes take place only in a short list of places.
fig, axs = plt.subplots(8, 5, figsize=(30, 30))
for idx, g in enumerate(groups):
    group, name = g[0], g[1]
    i, j = divmod(idx, 5)  # row and column of the idx-th panel in a 5-column grid
    axs[i,j].scatter(group.X, group.Y)
    axs[i,j].title.set_text(name)

display(plt.show())

In [25]:
#SECOND BLOCK. PARSING AND FEATURE GENERATION.

In [26]:
#Parsing the time column to generate features, year, month, day, hour, season
from pyspark.sql.functions import col, hour, minute, second, year, month, dayofmonth, date_format

def season(month):
  """Return the season name for a month number.

  Months 12, 1 and 2 map to 'winter', 3-5 to 'spring', 6-8 to 'summer' and
  9-11 to 'autumn'. Any other value yields the string "NA".
  """
  months_by_season = (
    ('winter', (12, 1, 2)),
    ('spring', (3, 4, 5)),
    ('summer', (6, 7, 8)),
    ('autumn', (9, 10, 11)),
  )
  lookup = {m: name for name, months in months_by_season for m in months}
  return lookup.get(month, "NA")

# udf is used below but was never imported in this notebook; import it explicitly so the
# cell does not depend on the environment pre-defining the name.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap season() as a Spark UDF that returns a string.
season_udf_string= udf(lambda x: season(x), StringType())

# Derive the time features from the Dates column, then drop Dates itself.
# "season" is computed from the freshly added "month" column, so it must come after it.
data = (
    data
    .withColumn("hour", hour(col("Dates")))
    .withColumn("minute", minute(col("Dates")))
    .withColumn("dayOfMonth", dayofmonth(col("Dates")))
    .withColumn("year", year(col("Dates")))
    .withColumn("month", month(col("Dates")))
    .withColumn("weekday", date_format(col("Dates"), "EEEE"))
    .withColumn("season", season_udf_string(col("month")))
    .drop(col("Dates"))
)

data.printSchema()

In [27]:
#generate more features here... get either numerical or string valued features. 

In [28]:
#Generate features:
#1
#define a pipeline to cluster X, Y data and generate a new feature; the cluster in which the crime happened
#we can use the district from the database, but this clusters are more representing since they section the regions according to observed crimes, not as dictated by authorities.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
#

#2
#Generate distance from points of interests. e.g: police station, banks, casinos, pubs, clubs, business

In [29]:
#Update the num_var list of numerical (int/double) columns so that it includes the newly generated features.
num_var = [name for name, dtype in data.dtypes if dtype in ('int', 'double')]

In [30]:
#Normalization / standardization of numerical features.

#1. Get the mean and stddev of each numerical feature, then scale the features so that
#   each has a mean of 0 and a stddev of 1.
from pyspark.sql.functions import mean, stddev

#Collect every mean and stddev in ONE aggregation job instead of two jobs per column.
stat_exprs = []
for c in num_var:
  stat_exprs.append(mean(data[c]).alias(c + '_mean'))
  stat_exprs.append(stddev(data[c]).alias(c + '_stddev'))

#data_stats maps column name -> [mean, stddev]; it stays empty when there are no numerical columns.
data_stats = {}
if num_var:
  stats_row = data.agg(*stat_exprs).first()
  data_stats = {c: [stats_row[c + '_mean'], stats_row[c + '_stddev']] for c in num_var}

for c in num_var:
  col_mean, col_std = data_stats[c]
  if col_mean is None:
    #No non-null values in this column: nothing to scale.
    continue
  centered = data[c] - col_mean
  #A zero or undefined stddev (constant column / single row) would turn the whole column
  #into nulls when dividing, so such columns are only centered.
  data = data.withColumn(c, centered / col_std if col_std else centered)
  




  

In [31]:
#The next part is for steps of the pipeline:
#1.encoding categorical features.
#2.encoding labels
#3.Vector assembler
#4.Model(s)
#5.evaluation(s)

In [32]:
#PCA:
from pyspark.ml.feature import PCA
from pyspark.ml.feature import  VectorAssembler
from pyspark.ml import Pipeline

# Assemble the numerical columns into a single "features" vector and project it onto
# 3 principal components ("PCAFeatures"). The result goes to dataPCA; `data` is untouched.
assemblerForPCA = VectorAssembler(outputCol="features", inputCols=num_var)
pca = PCA(inputCol="features", outputCol="PCAFeatures", k=3)

PCApipeline = Pipeline(stages=[assemblerForPCA, pca])

PCAmodel = PCApipeline.fit(data)
dataPCA = PCAmodel.transform(data)
dataPCA.printSchema()

#unpack the PCA feature vector to different columns and drop the old features column. 
#code here.

In [33]:
# String columns to encode; Category is excluded because it is the label/target.
encoding_var = [name for name, dtype in data.dtypes if dtype == 'string' and name != 'Category']

print(encoding_var)

In [34]:
#Build one StringIndexer per categorical column; each writes its indices to 'IDX_<column>'.
from pyspark.ml.feature import StringIndexer
string_indexes = []
for c in encoding_var:
  string_indexes.append(StringIndexer(inputCol=c, outputCol='IDX_' + c, handleInvalid='keep'))
string_indexes

In [35]:
#ONE Hot Encoding 
from pyspark.ml.feature import OneHotEncoderEstimator
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
onehot_indexes

In [36]:
# Index the target column: Category (string) -> label (numeric index).
label_indexes = StringIndexer(
    inputCol='Category',
    outputCol='label',
    handleInvalid='keep',
)

In [37]:
#the below section is to specify the pipeline

In [38]:
# The final feature vector is the numerical columns followed by the one-hot encoded columns.
ohe_cols = ['OHE_' + c for c in encoding_var]
assembler = VectorAssembler(inputCols=num_var + ohe_cols, outputCol="features")

In [39]:
from pyspark.ml.classification import  RandomForestClassifier
# Random forest on the assembled "features" vector predicting "label";
# the seed is fixed so repeated runs build the same forest.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=10,
    subsamplingRate=0.7,
    cacheNodeIds=True,
    seed=8464,
)

In [40]:
from pyspark.ml import Pipeline
# Stage order: index the string columns, one-hot encode them, assemble the feature vector,
# index the label, then fit the random forest.
# (The statement was previously duplicated on the same line, which is a SyntaxError.)
pipeline = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, rf])

In [41]:
# model the 

In [42]:
#Fit the data: run every pipeline stage on `data`.
#(This heading was previously bare text rather than a comment, which is a SyntaxError.)

pipelineModel = pipeline.fit(data)

In [43]:
#Transform: apply the fitted pipeline to `data`, then keep only the assembled
#feature vector and the indexed label for inspection.
new_df = pipelineModel.transform(data)
vhouse_df = new_df.select('features', 'label')
vhouse_df.show()