In [1]:
# if required
import findspark
findspark.init()
findspark.find()
from pyspark import SparkConf

# all imports
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
# Configure a Spark application named 'Crime Data' with master 'local' and
# obtain (or reuse) the session built from that configuration.
conf = SparkConf()
conf.setAppName('Crime Data')
conf.setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Read the crime CSV: the first row is the header and column types are inferred.
crime_reader = spark.read.option('header', True).option('inferSchema', True)
crime_df = crime_reader.csv('Final_Crime_Data.csv')

# Report the dataset size and preview the first five rows.
print("no. of rows:", crime_df.count())
print("Final Crime Dataset")
crime_df.show(5)

no. of rows: 29362
Final Crime Dataset
+--------+--------------------+------------+-----------+--------------+------+-------------------+------------+--------+----+----+----+
|RecordID|             Offense|  IncidentID|BlockNumber|    StreetName|Agency|       DateReported|HourReported|Holidays|TMAX|TMIN|SNOW|
+--------+--------------------+------------+-----------+--------------+------+-------------------+------------+--------+----+----+----+
|       1|      Assault Simple|201900031384|      100.0| ELLIEWOOD AVE|   CPD|2019-10-26 22:47:59|        2247|       0|69.0|  51| 0.0|
|       2|Drug/Narcotics Vi...|201900031350|      400.0|   E MARKET ST|   CPD|2019-10-26 16:55:25|        1655|       0|69.0|  51| 0.0|
|       3| Larceny - All Other|201900031344|      600.0|  MONTROSE AVE|   CPD|2019-10-26 16:22:06|        1622|       0|69.0|  51| 0.0|
|       4| Motor Vehicle Theft|201900031326|      200.0|    14TH ST NW|   CPD|2019-10-26 12:43:43|        1243|       0|69.0|  51| 0.0|
|       5

In [3]:
# Collapse offense sub-categories into one label per category.

# Step 1: keep the text before the first ' - ' separator
# (e.g. 'Larceny - All Other' becomes 'Larceny').
final_split = split(crime_df['Offense'], ' - ')
final1 = crime_df.withColumn('Offense1', final_split[0])

# Step 2: keep the text before any remaining bare '-' separator.
final_split2 = split(final1['Offense1'], '-')
final2 = final1.withColumn('OffenseType', final_split2[0])

# Record count per grouped label, most frequent first.
offense = final2.groupBy('OffenseType').count().orderBy(desc('count'))
n = offense.count()
print("No. of labels after grouping:", n)

No. of labels after grouping: 92


In [4]:
## Prediction using fillna for handling missing values ##

In [5]:
# Median of BlockNumber; a relative error of 0 requests the exact quantile.
val = final2.approxQuantile('BlockNumber', [0.5], 0)

# NOTE(review): no `subset` is passed, so the BlockNumber median is used as the
# fill value for every column the numeric fill applies to, not only BlockNumber
# -- confirm this is intended before relying on the other filled columns.
crime_fill = final2.fillna(val[0])

# Keep only the frequent labels: any OffenseType with fewer than 2000 records
# is excluded by the sub-query below.
offense2 = crime_fill.groupBy('OffenseType').count().sort('count')
crime_fill.createOrReplaceTempView("CRIME_TABLE")
offense2.createOrReplaceTempView("OFF_TABLE")
crime_fill_df = spark.sql("SELECT * FROM CRIME_TABLE WHERE OFFENSETYPE NOT IN (SELECT OFFENSETYPE FROM OFF_TABLE WHERE COUNT<2000)")

In [7]:
# Record counts of the labels that survived the frequency filter, rarest first.
offense3 = crime_fill_df.groupBy('OffenseType').count().sort('count')
print("labels used for prediciton")
offense3.show()

labels used for prediciton
+--------------+-----+
|   OffenseType|count|
+--------------+-----+
| Towed Vehicle| 2116|
|   Hit and Run| 2277|
|Assault Simple| 2526|
|       Larceny| 4585|
+--------------+-----+



In [8]:
# Derive calendar features from the DateReported timestamp. Each entry maps the
# new column name to the Spark function that extracts it; insertion order fixes
# the order in which the columns are appended.
date_features = {
    'DayOfWeek': dayofweek,
    'DayOfMonth': dayofmonth,
    'MonthOfYear': month,
    'Hour': hour,
}
for feature_name, extract in date_features.items():
    crime_fill_df = crime_fill_df.withColumn(feature_name, extract(crime_fill_df['DateReported']))

# Preview the enriched rows, then the resulting schema.
print("Dataset sample")
crime_fill_df.show(5)
print("Dataset Schema")
crime_fill_df.printSchema()

Dataset sample
+--------+--------------------+------------+-----------+--------------+------+-------------------+------------+--------+----+----+----+--------------+--------------+---------+----------+-----------+----+
|RecordID|             Offense|  IncidentID|BlockNumber|    StreetName|Agency|       DateReported|HourReported|Holidays|TMAX|TMIN|SNOW|      Offense1|   OffenseType|DayOfWeek|DayOfMonth|MonthOfYear|Hour|
+--------+--------------------+------------+-----------+--------------+------+-------------------+------------+--------+----+----+----+--------------+--------------+---------+----------+-----------+----+
|       1|      Assault Simple|201900031384|      100.0| ELLIEWOOD AVE|   CPD|2019-10-26 22:47:59|        2247|       0|69.0|  51| 0.0|Assault Simple|Assault Simple|        7|        26|         10|  22|
|       3| Larceny - All Other|201900031344|      600.0|  MONTROSE AVE|   CPD|2019-10-26 16:22:06|        1622|       0|69.0|  51| 0.0|       Larceny|       Larceny|    

In [9]:
from pyspark.sql.types import IntegerType

# Model input including the OffenseType label; BlockNumber is cast from its
# loaded type to an integer so it one-hot encodes as '100', '200', ...
selected_columns = ['BlockNumber', 'StreetName', 'DayOfWeek', 'DayOfMonth', 'MonthOfYear', 'Hour', 'OffenseType', 'Holidays', 'SNOW']
X_df = crime_fill_df.select(*selected_columns)
X_df = X_df.withColumn("BlockNumber", X_df["BlockNumber"].cast(IntegerType()))
X_df.show(5)
X_df.printSchema()

# The same projection without the OffenseType label column.
selected_columns = [name for name in selected_columns if name != 'OffenseType']
X_df2 = crime_fill_df.select(*selected_columns)
X_df2 = X_df2.withColumn("BlockNumber", X_df2["BlockNumber"].cast(IntegerType()))
X_df2.show(5)
X_df2.printSchema()

+-----------+--------------+---------+----------+-----------+----+--------------+--------+----+
|BlockNumber|    StreetName|DayOfWeek|DayOfMonth|MonthOfYear|Hour|   OffenseType|Holidays|SNOW|
+-----------+--------------+---------+----------+-----------+----+--------------+--------+----+
|        100| ELLIEWOOD AVE|        7|        26|         10|  22|Assault Simple|       0| 0.0|
|        600|  MONTROSE AVE|        7|        26|         10|  16|       Larceny|       0| 0.0|
|        100|UNIVERSITY MNR|        7|        26|         10|  11|       Larceny|       0| 0.0|
|        900|    EMMET ST N|        7|        26|         10|  10|       Larceny|       0| 0.0|
|       1500|UNIVERSITY AVE|        6|        25|         10|  23|       Larceny|       0| 0.0|
+-----------+--------------+---------+----------+-----------+----+--------------+--------+----+
only showing top 5 rows

root
 |-- BlockNumber: integer (nullable = true)
 |-- StreetName: string (nullable = true)
 |-- DayOfWeek: inte

In [10]:
# One hot encoding using pandas get_dummies
import pandas as pd

# Columns expanded into one indicator column per distinct value; each
# indicator is named '<column>_<value>'. Holidays, SNOW and the OffenseType
# label are left as they are.
one_hot_columns = ['BlockNumber', 'StreetName', 'DayOfWeek', 'DayOfMonth', 'MonthOfYear', 'Hour']

# Collect the selected Spark columns to the driver as a pandas frame.
crime_pd = X_df.toPandas()

# dtype is pinned to int64 because the default indicator dtype differs between
# pandas versions (boolean from pandas 2.0 onward). Pinning it keeps the
# indicators as 0/1 integers, so the Spark frame built from this table gets the
# same numeric column types whichever pandas version is installed.
crime_pd2 = pd.get_dummies(
    crime_pd,
    columns=one_hot_columns,
    prefix=one_hot_columns,
    dtype='int64',
)
crime_pd2.head(5)

Unnamed: 0,OffenseType,Holidays,SNOW,BlockNumber_100,BlockNumber_200,BlockNumber_300,BlockNumber_400,BlockNumber_500,BlockNumber_600,BlockNumber_700,...,Hour_14,Hour_15,Hour_16,Hour_17,Hour_18,Hour_19,Hour_20,Hour_21,Hour_22,Hour_23
0,Assault Simple,0,0.0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,0
1,Larceny,0,0.0,0,0,0,0,0,1,0,...,0,0,1,0,0,0,0,0,0,0
2,Larceny,0,0.0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,Larceny,0,0.0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,Larceny,0,0.0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1


In [11]:
# Turn the one-hot encoded pandas table back into a Spark DataFrame.
X_DF = spark.createDataFrame(data=crime_pd2)

In [12]:
# Final dataframe datatypes: the (column name, Spark type) pairs of the one-hot
# encoded frame. The following cell uses these types to separate string
# columns from the rest.
X_DF.dtypes

[('OffenseType', 'string'),
 ('Holidays', 'bigint'),
 ('SNOW', 'double'),
 ('BlockNumber_100', 'bigint'),
 ('BlockNumber_200', 'bigint'),
 ('BlockNumber_300', 'bigint'),
 ('BlockNumber_400', 'bigint'),
 ('BlockNumber_500', 'bigint'),
 ('BlockNumber_600', 'bigint'),
 ('BlockNumber_700', 'bigint'),
 ('BlockNumber_800', 'bigint'),
 ('BlockNumber_900', 'bigint'),
 ('BlockNumber_1000', 'bigint'),
 ('BlockNumber_1100', 'bigint'),
 ('BlockNumber_1200', 'bigint'),
 ('BlockNumber_1300', 'bigint'),
 ('BlockNumber_1400', 'bigint'),
 ('BlockNumber_1500', 'bigint'),
 ('BlockNumber_1600', 'bigint'),
 ('BlockNumber_1700', 'bigint'),
 ('BlockNumber_1800', 'bigint'),
 ('BlockNumber_1900', 'bigint'),
 ('BlockNumber_2000', 'bigint'),
 ('BlockNumber_2100', 'bigint'),
 ('BlockNumber_2200', 'bigint'),
 ('BlockNumber_2300', 'bigint'),
 ('BlockNumber_2400', 'bigint'),
 ('BlockNumber_2500', 'bigint'),
 ('BlockNumber_2600', 'bigint'),
 ('BlockNumber_2700', 'bigint'),
 ('BlockNumber_2800', 'bigint'),
 ('BlockNum

In [13]:
# Partition the column names by Spark type: string columns are treated as
# categorical, every other column as numerical. Order follows X_DF.dtypes.
categorical_cols = [name for name, dtype in X_DF.dtypes if dtype == "string"]
numerical_cols = [name for name, dtype in X_DF.dtypes if dtype != "string"]
print(categorical_cols)
print(numerical_cols)

['OffenseType']
['Holidays', 'SNOW', 'BlockNumber_100', 'BlockNumber_200', 'BlockNumber_300', 'BlockNumber_400', 'BlockNumber_500', 'BlockNumber_600', 'BlockNumber_700', 'BlockNumber_800', 'BlockNumber_900', 'BlockNumber_1000', 'BlockNumber_1100', 'BlockNumber_1200', 'BlockNumber_1300', 'BlockNumber_1400', 'BlockNumber_1500', 'BlockNumber_1600', 'BlockNumber_1700', 'BlockNumber_1800', 'BlockNumber_1900', 'BlockNumber_2000', 'BlockNumber_2100', 'BlockNumber_2200', 'BlockNumber_2300', 'BlockNumber_2400', 'BlockNumber_2500', 'BlockNumber_2600', 'BlockNumber_2700', 'BlockNumber_2800', 'BlockNumber_3000', 'BlockNumber_3100', 'BlockNumber_3400', 'BlockNumber_4000', 'BlockNumber_4400', 'BlockNumber_5300', 'BlockNumber_7200', 'BlockNumber_9100', 'StreetName_10 1/2 ST NW', 'StreetName_10TH ST', 'StreetName_10TH ST NE', 'StreetName_10TH ST NE / E HIGH ST', 'StreetName_10TH ST NW', 'StreetName_10TH ST NW / ANDERSON ST', 'StreetName_10TH ST NW / RUN ST', 'StreetName_10TH ST NW / WERTLAND ST', 'Str

In [14]:
from pyspark.ml.feature import VectorAssembler

# Pack every non-string column into a single vector column named "features".
cols = numerical_cols
assembler = VectorAssembler(outputCol="features", inputCols=cols)
fin = assembler.transform(X_DF)

# Show the assembled vectors untruncated, then preview the full frame.
fin.select(fin["features"]).show(truncate=False)
fin.show(5)

+--------------------------------------------------------+
|features                                                |
+--------------------------------------------------------+
|(816,[2,294,748,774,789,814],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[7,504,748,774,789,808],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[2,677,748,774,789,803],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[10,304,748,774,789,802],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[16,672,747,773,789,815],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[26,563,747,773,789,811],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[17,348,747,773,789,809],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[4,279,747,773,789,805],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[8,573,747,773,789,802],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[14,214,746,772,789,814],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[6,680,746,772,789,810],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[2,153,746,772,789,810],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[14,692,746,772,789,808],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[8,104,746,772,789,808],[1.0,1.0,1.0,1.0,1.0,1.0])

In [15]:
# Number of columns that were passed to the VectorAssembler as inputs
# (every non-string column of X_DF).
len(numerical_cols)

816

In [16]:
# Naive Bayes Classification

from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Map the OffenseType strings to numeric label indices. The indexer is fitted
# on the full frame so that every label receives an index.
labelIndexer = StringIndexer(outputCol="IndOffenseType", inputCol="OffenseType").fit(fin)

# 80/20 train/test split with a fixed seed.
splits = fin.randomSplit([0.8, 0.2], seed=1234)
trainingData, testData = splits

# Multinomial Naive Bayes with a smoothing parameter of 1.0.
nb = NaiveBayes(
    modelType="multinomial",
    smoothing=1.0,
    featuresCol="features",
    labelCol="IndOffenseType",
)

# Pipeline: label indexing followed by the Naive Bayes classifier.
pipeline = Pipeline(stages=[labelIndexer, nb])

# Fit on the training split, then score the held-out split.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

# Accuracy of the predicted label index against the true label index.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="IndOffenseType",
)
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.49803750545137376


In [17]:
# Logistic Regression Classification

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Map the OffenseType strings to numeric label indices. The indexer is fitted
# on the full frame so that every label receives an index.
labelIndexer = StringIndexer(outputCol="IndOffenseType", inputCol="OffenseType").fit(fin)

# 80/20 train/test split with a fixed seed.
splits = fin.randomSplit([0.8, 0.2], seed=1234)
trainingData, testData = splits

# Multinomial logistic regression, capped at 25 iterations.
lr = LogisticRegression(
    family='multinomial',
    maxIter=25,
    featuresCol='features',
    labelCol='IndOffenseType',
)

# Pipeline: label indexing followed by the logistic regression classifier.
pipeline = Pipeline(stages=[labelIndexer, lr])

# Fit on the training split, then score the held-out split.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

# Accuracy of the predicted label index against the true label index.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="IndOffenseType",
)
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.5141735717400785


In [18]:
# Multilayer Perceptron Classification

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="OffenseType", outputCol="IndOffenseType").fit(fin)

# Split the data into train and test (70/30, fixed seed)
splits = fin.randomSplit([0.7, 0.3], 1234)
trainingData = splits[0]
testData = splits[1]

# Specify layers for the neural network: an input layer with one unit per
# assembled feature column and an output layer with one unit per label, with no
# hidden layers. Both sizes are derived from the data instead of being
# hard-coded, so the network still matches the feature vector if the one-hot
# encoding or the label filter produces a different number of columns/labels.
layers = [len(numerical_cols), len(labelIndexer.labels)]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(labelCol='IndOffenseType', featuresCol='features', maxIter=10, layers=layers, blockSize=128, seed=1234)

# Chain the label indexer and the perceptron in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, trainer])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Score the held-out split.
predictions = model.transform(testData)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="IndOffenseType", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.47166521360069746


In [19]:
# Random Forest Classification

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="OffenseType", outputCol="IndOffenseType").fit(fin)

# Automatically identify categorical features, and index them.
# maxCategories=4: features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(fin)

# Split the data into training and test sets (20% held out for testing).
# The split is seeded with the same value the other classifier cells use, so
# the reported accuracy is reproducible from one run to the next.
(trainingData, testData) = fin.randomSplit([0.8, 0.2], seed=1234)

# Train a RandomForest model. The seed is set explicitly so the forest does not
# depend on a library-chosen default seed.
rf = RandomForestClassifier(labelCol="IndOffenseType", featuresCol="indexedFeatures", numTrees=15, maxDepth=15,
                            maxBins=32, seed=1234)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictionLabel",
                               labels=labelIndexer.labels)

# Chain indexers, forest and label converter in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Compare the predicted label index with the true label index and report accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="IndOffenseType", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)
0.4418394950405771


In [20]:
from pyspark.ml.feature import VectorAssembler

# Pack every non-string column into a single vector column named "features".
cols = numerical_cols
assembler = VectorAssembler(outputCol="features", inputCols=cols)

# Apply the assembler to the one-hot encoded frame.
fin2 = assembler.transform(X_DF)

# Show the assembled vectors untruncated, preview the frame, then its row count.
fin2.select(fin2["features"]).show(truncate=False)
fin2.show(5)
fin2.count()

+--------------------------------------------------------+
|features                                                |
+--------------------------------------------------------+
|(816,[2,294,748,774,789,814],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[7,504,748,774,789,808],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[2,677,748,774,789,803],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[10,304,748,774,789,802],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[16,672,747,773,789,815],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[26,563,747,773,789,811],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[17,348,747,773,789,809],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[4,279,747,773,789,805],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[8,573,747,773,789,802],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[14,214,746,772,789,814],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[6,680,746,772,789,810],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[2,153,746,772,789,810],[1.0,1.0,1.0,1.0,1.0,1.0]) |
|(816,[14,692,746,772,789,808],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(816,[8,104,746,772,789,808],[1.0,1.0,1.0,1.0,1.0,1.0])

11504

In [21]:
# Decision Tree Classification

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="OffenseType", outputCol="IndOffenseType").fit(fin2)

# Automatically identify categorical features, and index them.
# maxCategories=4: features with more than 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(fin2)

# Split the data into training and test sets (20% held out for testing).
# The split is seeded with the same value the other classifier cells use, so
# the reported accuracy is reproducible from one run to the next.
(trainingData, testData) = fin2.randomSplit([0.8, 0.2], seed=1234)

# Train a decision tree model.
dt = DecisionTreeClassifier(labelCol="IndOffenseType", featuresCol="indexedFeatures", maxDepth=15)

# Chain indexers and the decision tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Compare the predicted label index with the true label index and report accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="IndOffenseType", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.43588621444201314


In [22]:
# OneVsRest Classification

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Map the OffenseType strings to numeric label indices. The indexer is fitted
# on the full frame so that every label receives an index.
labelIndexer = StringIndexer(outputCol="IndOffenseType", inputCol="OffenseType").fit(fin2)

# 80/20 train/test split with a fixed seed.
splits = fin2.randomSplit([0.8, 0.2], seed=1234)
trainingData, testData = splits

# Base classifier handed to OneVsRest: logistic regression with an intercept,
# at most 20 iterations and a convergence tolerance of 1e-6.
lr = LogisticRegression(fitIntercept=True, tol=1E-6, maxIter=20)

# One-vs-rest wrapper around the base classifier; the label and feature
# columns are configured on the wrapper.
ovr = OneVsRest(classifier=lr, featuresCol='features', labelCol='IndOffenseType')

# Pipeline: label indexing followed by the one-vs-rest classifier.
pipeline = Pipeline(stages=[labelIndexer, ovr])

# Fit on the training split, then score the held-out split.
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

# Accuracy of the predicted label index against the true label index.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="IndOffenseType",
)
accuracy = evaluator.evaluate(predictions)
print("accuracy = " + str(accuracy))

accuracy = 0.5106846925425207


In [23]:
# Reference: the Spark documentation was consulted for the PySpark functions used above, in particular the following page on implementing classification algorithms:
"https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier"

'https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier'