## Random Forest classifier

Install the PySpark library.

In [None]:
!pip install --upgrade pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 1.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=eef230613ef96a9ad6556c765a990b9fbd954f700ef25231cf9227b456e49e98
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1



In [None]:
!pip install scikit-learn



A Spark session is represented by an instance of org.apache.spark.sql.SparkSession (exposed in Python as pyspark.sql.SparkSession). The session object holds information about the Spark master, the Spark application, and the configuration options.



In [None]:
# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


An RDD (Resilient Distributed Dataset) is a partitioned, parallelised data structure whose workload is distributed across the worker nodes. RDDs are the basic unit of Spark programming. To work with RDDs, we first need a SparkContext.
A SparkContext is the entry point to the Spark environment, and every Spark application needs one. It lets the application access the Spark cluster through a resource manager. Because a SparkSession already exists, we can obtain its SparkContext directly:


In [None]:
sc = spark.sparkContext
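
To make the idea of a partitioned dataset more concrete, here is a plain-Python analogy (it is not Spark code). The list is split into partitions, each partition is processed independently, and the partial results are combined. The helper names `split_into_partitions` and `sum_of_squares` are hypothetical, and a local thread pool stands in for worker nodes, which Spark would schedule across a cluster instead.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous chunks (hypothetical helper)."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions receive one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def sum_of_squares(partition):
    """Work done independently on a single partition."""
    return sum(x * x for x in partition)


data = list(range(1, 11))
partitions = split_into_partitions(data, 4)

# Each partition is handled by its own worker thread, then results are combined.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(sum_of_squares, partitions))

total = sum(partial_sums)
print(partitions)
print(total)
```

The combined total equals what a single pass over the whole list would give; the partitioning only changes where the work happens.
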

Next, we import the dataset using the spark.read.csv function:

In [None]:
df = spark.read.csv('/content/cleaned_data.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Agency_type: string (nullable = true)
 |-- Solved: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Report_status: string (nullable = true)
 |-- Crime_type: string (nullable = true)
 |-- Crime_status: string (nullable = true)
 |-- Victim_age: integer (nullable = true)
 |-- Victim_sex: string (nullable = true)
 |-- Victim_race: string (nullable = true)
 |-- Offender_age: integer (nullable = true)
 |-- Offender_sex: string (nullable = true)
 |-- Offender_race: string (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Crime_cause: string (nullable = true)
 |-- Victim_prior_offense_status: string (nullable = true)
 |-- add_victim_count: integer (nullable = true)
 |-- add_offender_count: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- Offender_demo: string (nullable = true)



## PySpark and Machine Learning

For the machine learning part, we use a Pipeline, which chains multiple Transformers and Estimators together to specify the machine learning workflow. The stages run in order: each Estimator is fitted on the training data, and the resulting Transformers are then applied in the same sequence to any data passed through the fitted model.
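
The Transformer/Estimator pattern can be sketched in plain Python. This is an illustrative model only, not the PySpark implementation: the classes `AddLength`, `LabelIndexer`, `LabelIndexerModel`, `SimplePipeline`, and `SimplePipelineModel` are hypothetical. The point is that an Estimator's `fit` returns a Transformer, and a pipeline fits its stages in order, passing each stage's transformed output to the next.

```python
class AddLength:
    """A Transformer: adds a derived column without learning anything."""

    def transform(self, rows):
        return [{**row, "name_len": len(row["name"])} for row in rows]


class LabelIndexerModel:
    """The Transformer produced by fitting LabelIndexer."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        return [{**row, "label_index": self.mapping[row["label"]]} for row in rows]


class LabelIndexer:
    """An Estimator: learns a label -> index mapping from the data."""

    def fit(self, rows):
        labels = sorted({row["label"] for row in rows})
        return LabelIndexerModel({label: i for i, label in enumerate(labels)})


class SimplePipelineModel:
    """A fitted pipeline: applies every fitted stage in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class SimplePipeline:
    """Fits stages in order; each fitted stage transforms the data for the next."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; plain Transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return SimplePipelineModel(fitted)


train = [{"name": "ann", "label": "yes"}, {"name": "robert", "label": "no"}]
model = SimplePipeline([AddLength(), LabelIndexer()]).fit(train)
result = model.transform([{"name": "li", "label": "yes"}])
print(result)
```

In this sketch the fitted stages keep the order of the input stages, so the model produced by the final Estimator is the last element of `model.stages`.
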

Using the Random Forest method in PySpark

In [None]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import roc_curve, auc
import numpy as np

# Get the SparkSession (getOrCreate returns the active session if one already exists)
spark = SparkSession.builder \
    .appName("Random Forest Classifier") \
    .getOrCreate()


# Index every feature column with StringIndexer. Numeric columns are cast to
# string and indexed as categories too.
feature_columns = [column for column in df.columns if column != "Offender_demo"]
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in feature_columns
]

# StringIndexer for the target variable
target_indexer = StringIndexer(inputCol="Offender_demo", outputCol="Offender_demo_index", handleInvalid="keep")

# Assemble the indexed columns into a single feature vector
assembler = VectorAssembler(inputCols=[column + "_index" for column in feature_columns], outputCol="features")

# Split the data into training (70%) and test (30%) sets
trainingData, testData = df.randomSplit([0.7, 0.3])

# Defining Random Forest Classifier with increased maxBins
rf = RandomForestClassifier(featuresCol="features", labelCol="Offender_demo_index", maxBins=2000)

# Creating Pipeline
pipeline = Pipeline(stages=indexers + [target_indexer,assembler, rf])

# Fit the pipeline to the training data
model = pipeline.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Show predictions
predictions.select("Offender_demo", "prediction").show()


# Compare the predictions with the indexed true labels and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="Offender_demo_index", predictionCol="prediction", metricName="accuracy")


# Evaluating the model
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = {:.2f}%".format(accuracy * 100))

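# The fitted random forest is the last pipeline stage.
# model.stages[2] is one of the StringIndexerModel stages, not the forest.
rfModel = model.stages[-1]
print(rfModel)  # summary only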

+-------------+----------+
|Offender_demo|prediction|
+-------------+----------+
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55F|       2.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
|         <18M|       0.0|
|       19-55M|       0.0|
|       19-55M|       0.0|
+-------------+----------+
only showing top 20 rows

Test Accuracy = 89.36%
StringIndexerModel: uid=StringIndexer_91cb12ce02f0, handleInvalid=keep
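
The `prediction` column holds label indices rather than the original `Offender_demo` strings. With its default ordering (`frequencyDesc`), `StringIndexer` assigns index 0 to the most frequent label, which is consistent with the `19-55M` rows being predicted as `0.0` above. The sketch below mimics that ordering in plain Python on made-up labels. The function `frequency_desc_index` and the sample counts are hypothetical, and the alphabetical tie-break is an assumption about Spark 3.x behaviour.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each distinct label to an index, most frequent first (ties alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Made-up label sample, NOT the dataset's real class distribution.
sample = ["19-55M"] * 6 + ["<18M"] * 3 + ["19-55F"] * 2
label_to_index = frequency_desc_index(sample)
index_to_label = {index: label for label, index in label_to_index.items()}

# Decode a few hypothetical predictions back to label strings.
predictions = [0.0, 0.0, 2.0]
decoded = [index_to_label[p] for p in predictions]
print(label_to_index)
print(decoded)
```

In PySpark itself, the fitted indexer exposes the learned ordering through its `labels` attribute, and `IndexToString` from `pyspark.ml.feature` can convert a prediction column back to the original strings.
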
