In [25]:
# findspark.init() locates the Spark installation and makes pyspark importable,
# so it has to run before the pyspark import below.
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Reuse the active session if one already exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .getOrCreate()
)

In [26]:
# Load the arrests CSV: the first row is the header and column types are inferred
# from the data rather than declared.
arrests_csv_path = 'NYPD_Arrest_Data__Year_to_Date_.csv'
df = spark.read.options(header=True, inferSchema=True).csv(arrests_csv_path)

# Preview the first rows as a text table.
df.show()

+----------+-----------+-----+--------------------+-----+--------------------+----------+----------+-----------+---------------+-----------------+---------+--------+--------------+----------+----------+----------------+-----------------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|             PD_DESC|KY_CD|           OFNS_DESC|  LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|     PERP_RACE|X_COORD_CD|Y_COORD_CD|        Latitude|        Longitude|New Georeferenced Column|
+----------+-----------+-----+--------------------+-----+--------------------+----------+----------+-----------+---------------+-----------------+---------+--------+--------------+----------+----------+----------------+-----------------+------------------------+
| 261249590| 01/02/2023|  339|LARCENY,PETIT FRO...|  341|       PETIT LARCENY|PL 1552500|         M|          M|              6|                0|    25-44|       M|         BLACK|    982745|    206647|       40

In [27]:
# Print each column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- ARREST_KEY: integer (nullable = true)
 |-- ARREST_DATE: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- LAW_CODE: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- ARREST_BORO: string (nullable = true)
 |-- ARREST_PRECINCT: integer (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- AGE_GROUP: string (nullable = true)
 |-- PERP_SEX: string (nullable = true)
 |-- PERP_RACE: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- New Georeferenced Column: string (nullable = true)



In [28]:
# Summary statistics for the columns of df; the result is kept in `stats`
# and printed as a text table.
stats = df.describe()
stats.show()

+-------+--------------------+-----------+------------------+--------------------+------------------+--------------------+----------+----------+-----------+------------------+------------------+---------+--------+--------------------+------------------+------------------+-------------------+-------------------+------------------------+
|summary|          ARREST_KEY|ARREST_DATE|             PD_CD|             PD_DESC|             KY_CD|           OFNS_DESC|  LAW_CODE|LAW_CAT_CD|ARREST_BORO|   ARREST_PRECINCT| JURISDICTION_CODE|AGE_GROUP|PERP_SEX|           PERP_RACE|        X_COORD_CD|        Y_COORD_CD|           Latitude|          Longitude|New Georeferenced Column|
+-------+--------------------+-----------+------------------+--------------------+------------------+--------------------+----------+----------+-----------+------------------+------------------+---------+--------+--------------------+------------------+------------------+-------------------+-------------------+------------

In [29]:
# Dimensions of the frame: the row count is computed by Spark, the column
# count is just the length of the column-name list.
num_rows, num_cols = df.count(), len(df.columns)

print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_cols}")

Number of Rows: 112571
Number of Columns: 19


In [30]:
# NOTE: importing `sum` here rebinds the name in the notebook namespace, so
# from this cell on `sum` is the Spark aggregate, not Python's built-in.
from pyspark.sql.functions import col, sum

# One aggregate expression per column: the isNull flag is cast to 0/1 and
# summed, which yields the number of nulls in that column.
null_flag_totals = []
for column_name in df.columns:
    is_null_as_int = col(column_name).isNull().cast("int")
    null_flag_totals.append(sum(is_null_as_int).alias(column_name))

null_counts = df.agg(*null_flag_totals)
null_counts.show()


+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|PD_DESC|KY_CD|OFNS_DESC|LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|PERP_RACE|X_COORD_CD|Y_COORD_CD|Latitude|Longitude|New Georeferenced Column|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|         0|          0|  461|      0|  466|        0|       0|       846|          0|              0|                0|        0|       0|        0|         0|         0|       0|        0|                       0|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+------

In [31]:
# Drop every row that contains a null in any column.
# The null check below reads `df_cleaned.columns`, but no cell ever assigned
# `df_cleaned`, so on a fresh kernel this cell raised NameError. Bind the
# cleaned frame to that name here, and keep rebinding `df` because the
# modelling cells further down continue to work on `df`.
df_cleaned = df.dropna()
df = df_cleaned

# Re-count nulls per column on the cleaned frame; every count should now be 0.
# `sum` and `col` are the Spark functions imported in the null-count cell above.
null_counts_cleaned = df_cleaned.agg(*[sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns])

null_counts_cleaned.show()

+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|PD_DESC|KY_CD|OFNS_DESC|LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|PERP_RACE|X_COORD_CD|Y_COORD_CD|Latitude|Longitude|New Georeferenced Column|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|         0|          0|    0|      0|    0|        0|       0|         0|          0|              0|                0|        0|       0|        0|         0|         0|       0|        0|                       0|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+------

In [32]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Encode the string columns as numeric category indices. PERP_SEX_Index is the
# label to predict; the borough and age-group indices are used as features.
boro_indexer = StringIndexer(inputCol='ARREST_BORO', outputCol='ARREST_BORO_Index')
age_group_indexer = StringIndexer(inputCol='AGE_GROUP', outputCol='AGE_GROUP_Index')
sex_indexer = StringIndexer(inputCol='PERP_SEX', outputCol='PERP_SEX_Index')

# Pack the feature columns into the single vector column Spark ML expects.
assembler = VectorAssembler(inputCols=['ARREST_BORO_Index', 'AGE_GROUP_Index', 'ARREST_PRECINCT'], outputCol='features')

# This cell rebinds `df` to the transformed frame, so running it a second time
# used to fail because the output columns already existed. Dropping them first
# makes the cell re-runnable; drop() ignores column names that are not present,
# so the first run is unaffected.
generated_cols = [
    boro_indexer.getOutputCol(),
    age_group_indexer.getOutputCol(),
    sex_indexer.getOutputCol(),
    assembler.getOutputCol(),
]
df = df.drop(*generated_cols)

df = boro_indexer.fit(df).transform(df)
df = age_group_indexer.fit(df).transform(df)
df = sex_indexer.fit(df).transform(df)
df = assembler.transform(df)

# 70/30 train/test split with a fixed seed.
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=42)

# The forest is seeded as well: with only the split seeded, the bootstrap and
# feature sampling (and therefore the reported accuracy) could differ between
# kernel sessions.
rf = RandomForestClassifier(labelCol='PERP_SEX_Index', featuresCol='features', numTrees=100, seed=42)
model = rf.fit(trainingData)

predictions = model.transform(testData)

# Fraction of test rows whose predicted label index equals the true one.
# NOTE(review): compare this with the share of the most frequent PERP_SEX
# value before reading much into it -- TODO confirm the model beats that baseline.
evaluator = MulticlassClassificationEvaluator(labelCol='PERP_SEX_Index', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f'Accuracy: {accuracy}')

Accuracy: 0.804122468659595


In [34]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 70/30 train/test split with a fixed seed.
trainingData, testData = df.randomSplit([0.7, 0.3], seed=42)

# Linear regression of the indexed PERP_SEX label on the assembled feature vector.
# NOTE(review): PERP_SEX_Index is a StringIndexer category code, so treating it
# as a continuous regression target looks questionable -- confirm this is intended.
lr = LinearRegression(featuresCol='features', labelCol='PERP_SEX_Index')
lr_model = lr.fit(trainingData)

# Score the held-out rows and measure the root mean squared error.
predictions = lr_model.transform(testData)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='PERP_SEX_Index', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Square Error: {rmse}')

# Fitted parameters: one coefficient per entry of the feature vector, plus the intercept.
coefficients, intercept = lr_model.coefficients, lr_model.intercept

print(f"Coefficients: {coefficients}")
print(f"Intercept: {intercept}")


Root Mean Square Error: 0.4795570089203282
Coefficients: [0.0022752942107723776,0.0016682245442554914,-1.152603446407698e-05]
Intercept: 0.2185883353705178
