# Crime-Type Multiclass Classification

In [29]:
import wget
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import Row

from pyspark.sql import SparkSession

In [30]:
session=SparkSession.builder.appName("CrimeMCCML").master("local").getOrCreate()
session.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
data=session.read.csv("Crime_Data_from_2020_to_Present_20250510.csv", header=True, inferSchema=True)

In [31]:
data.show(5, truncate=False)

print("Before cleansing:", data.count())

+---------+----------------------+----------------------+--------+----+-----------+-----------+--------+------+-----------------+------------------------+--------+--------+------------+---------+----------------------+--------------+----------------------------------------------+------+-----------+--------+--------+--------+--------+---------------------------------------+-------------------------------+-------+---------+
|DR_NO    |Date Rptd             |DATE OCC              |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc      |Mocodes                 |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc           |Weapon Used Cd|Weapon Desc                                   |Status|Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                               |Cross Street                   |LAT    |LON      |
+---------+----------------------+----------------------+--------+----+-----------+-----------+--------+------+-----------------+-------------------

In [32]:
data = data.dropna(subset=["Crm Cd Desc", "Premis Cd", "DATE OCC", "TIME OCC", "LAT", "LON", "Vict Age", "Vict Sex", "Vict Descent", "Cross Street"])

In [33]:
from pyspark.sql.functions import when

data = data.fillna({"Weapon Used Cd": "Unknown", "Weapon Desc": "Unknown", "Premis Desc": "Unknown", "Mocodes": "Unknown", "Crm Cd 1": "Unknown", "Crm Cd 2": "None", "Crm Cd 3": "None", "Crm Cd 4": "None"})

In [34]:
from pyspark.sql.functions import regexp_replace, to_date

# Remove the " 12:00:00 AM" suffix from the date string
data = data.withColumn("date_occ_clean", regexp_replace("DATE OCC", " 12:00:00 AM", ""))

# Convert to Spark DateType
data = data.withColumn("date_occ_date", to_date("date_occ_clean", "MM/dd/yyyy"))

In [35]:
from pyspark.sql.functions import month, dayofweek

# Extract month and day of week
data = data.withColumn("month", month("date_occ_date"))
data = data.withColumn("day_of_week", dayofweek("date_occ_date"))  # 1=Sunday, 7=Saturday

In [36]:
from pyspark.sql.functions import lpad, col

# Pad time string to always be 4 digits (e.g., 900 → 0900)
data = data.withColumn("time_occ_padded", lpad(col("TIME OCC").cast("string"), 4, "0"))

# Extract hour as integer
data = data.withColumn("hour", col("time_occ_padded").substr(1, 2).cast("int"))

# Flag for night time
data = data.withColumn("is_night", (col("hour") < 6) | (col("hour") >= 18))

In [37]:
print("After cleaning:", data.count())

After cleaning: 38566


In [38]:
data.columns

['DR_NO',
 'Date Rptd',
 'DATE OCC',
 'TIME OCC',
 'AREA',
 'AREA NAME',
 'Rpt Dist No',
 'Part 1-2',
 'Crm Cd',
 'Crm Cd Desc',
 'Mocodes',
 'Vict Age',
 'Vict Sex',
 'Vict Descent',
 'Premis Cd',
 'Premis Desc',
 'Weapon Used Cd',
 'Weapon Desc',
 'Status',
 'Status Desc',
 'Crm Cd 1',
 'Crm Cd 2',
 'Crm Cd 3',
 'Crm Cd 4',
 'LOCATION',
 'Cross Street',
 'LAT',
 'LON',
 'date_occ_clean',
 'date_occ_date',
 'month',
 'day_of_week',
 'time_occ_padded',
 'hour',
 'is_night']

In [39]:
# copy or duplicate data

mcc_data = data

In [40]:
mcc_data.show(5, truncate=False)

+---------+----------------------+----------------------+--------+----+-----------+-----------+--------+------+-----------------------------------------------+------------------------+--------+--------+------------+---------+-----------+--------------+----------------------------------------------+------+-----------+--------+--------+--------+--------+-------------------------------+----------------------------------+-------+---------+--------------+-------------+-----+-----------+---------------+----+--------+
|DR_NO    |Date Rptd             |DATE OCC              |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                                    |Mocodes                 |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc|Weapon Used Cd|Weapon Desc                                   |Status|Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                       |Cross Street                      |LAT    |LON      |date_occ_clean|date_occ_date|month|day_of_we

In [41]:
mcc_data = mcc_data.drop("DR_NO", "Date Rptd", "Crm Cd", "Crm Cd 1", "Crm Cd 2", "Crm Cd 3", "Crm Cd 4",
                        "Mocodes", "Status", "Status Desc", "LOCATION", "Cross Street", "Rpt Dist No")
mcc_data.show(5, truncate=False)

+----------------------+--------+----+-----------+--------+-----------------------------------------------+--------+--------+------------+---------+-----------+--------------+----------------------------------------------+-------+---------+--------------+-------------+-----+-----------+---------------+----+--------+
|DATE OCC              |TIME OCC|AREA|AREA NAME  |Part 1-2|Crm Cd Desc                                    |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc|Weapon Used Cd|Weapon Desc                                   |LAT    |LON      |date_occ_clean|date_occ_date|month|day_of_week|time_occ_padded|hour|is_night|
+----------------------+--------+----+-----------+--------+-----------------------------------------------+--------+--------+------------+---------+-----------+--------------+----------------------------------------------+-------+---------+--------------+-------------+-----+-----------+---------------+----+--------+
|02/01/2020 12:00:00 AM|1700    |21  |Topanga 

In [42]:
from pyspark.sql.functions import col, lpad

# Pad the time with zeros to ensure it’s 4 digits (e.g., 900 -> 0900)
mcc_data = mcc_data.withColumn("time_occ_padded", lpad(col("TIME OCC").cast("string"), 4, "0"))

# Extract hour
mcc_data = mcc_data.withColumn("hour", col("time_occ_padded").substr(1, 2).cast("int"))

# Create a binary feature for Night (1) vs Day (0) — night = 8 PM to 6 AM
mcc_data = mcc_data.withColumn("is_night", ((col("hour") >= 20) | (col("hour") < 6)).cast("int"))

In [43]:
from pyspark.sql.functions import to_date, month, dayofweek

# Convert to Spark date
mcc_data = mcc_data.withColumn("date_occ_date", to_date(col("DATE OCC"), "MM/dd/yyyy"))

# Extract month and day of week (1 = Sunday, 7 = Saturday)
mcc_data = mcc_data.withColumn("month", month(col("date_occ_date")))
mcc_data = mcc_data.withColumn("day_of_week", dayofweek(col("date_occ_date")))

In [44]:
from pyspark.sql.functions import when

mcc_data = mcc_data.withColumn("vict_age_group",
    when(col("Vict Age") < 18, "Under 18")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 35), "18-35")
    .when((col("Vict Age") > 35) & (col("Vict Age") <= 65), "36-65")
    .when(col("Vict Age") > 65, "65+")
    .otherwise("Unknown")
)

In [45]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Drop rows with null or invalid lat/lon (e.g., (0.0, 0.0))
df_geo = mcc_data.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()) &
                   (~((col("LAT") == 0.0) & (col("LON") == 0.0))))

# Assemble features
geo_assembler = VectorAssembler(inputCols=["LAT", "LON"], outputCol="geo_features")
df_geo_vec = geo_assembler.transform(df_geo)

# Train KMeans (you can tune k = number of zones)
kmeans = KMeans(featuresCol="geo_features", predictionCol="geo_cluster", k=10)
model = kmeans.fit(df_geo_vec)

# Add cluster to original dataframe
df_geo_clustered = model.transform(df_geo_vec)

                                                                                

In [46]:
from pyspark.sql.functions import regexp_replace, to_date
from pyspark.sql.functions import month, dayofweek

# Remove the " 12:00:00 AM" from date strings
df_geo_clustered = df_geo_clustered.withColumn("date_occ_clean", regexp_replace("DATE OCC", " 12:00:00 AM", ""))

# Parse to Spark DateType
df_geo_clustered = df_geo_clustered.withColumn("date_occ_date", to_date("date_occ_clean", "MM/dd/yyyy"))


df_geo_clustered = df_geo_clustered.withColumn("month", month("date_occ_date"))
df_geo_clustered = df_geo_clustered.withColumn("day_of_week", dayofweek("date_occ_date"))

In [47]:
df_geo_clustered.show(5, truncate=False)

+----------------------+--------+----+-----------+--------+-----------------------------------------------+--------+--------+------------+---------+-----------+--------------+----------------------------------------------+-------+---------+--------------+-------------+-----+-----------+---------------+----+--------+--------------+-------------------+-----------+
|DATE OCC              |TIME OCC|AREA|AREA NAME  |Part 1-2|Crm Cd Desc                                    |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc|Weapon Used Cd|Weapon Desc                                   |LAT    |LON      |date_occ_clean|date_occ_date|month|day_of_week|time_occ_padded|hour|is_night|vict_age_group|geo_features       |geo_cluster|
+----------------------+--------+----+-----------+--------+-----------------------------------------------+--------+--------+------------+---------+-----------+--------------+----------------------------------------------+-------+---------+--------------+-------------+-

In [48]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# === Step 1: Index Categorical Columns ===
categorical_cols = [
    "Vict Sex", "Vict Descent", "Premis Desc",
    "Weapon Desc", "vict_age_group", "AREA NAME"
]

indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
    for col in categorical_cols
]

# === Step 2: One-Hot Encode Indexed Columns ===
encoders = [
    OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_ohe")
    for col in categorical_cols
]

# === Step 3: Label Indexer ===
label_indexer = StringIndexer(
    inputCol="Crm Cd Desc",  # this is your target variable
    outputCol="label",
    handleInvalid="skip"
)

# === Step 4: Vector Assembler ===
assembler = VectorAssembler(
    inputCols=[
        "month", "day_of_week", "hour", "is_night", "geo_cluster"
    ] + [f"{col}_ohe" for col in categorical_cols],
    outputCol="features"
)

# === Step 5: Random Forest Classifier ===
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100
)

# === Step 6: Pipeline ===
pipeline = Pipeline(
    stages=indexers + encoders + [label_indexer, assembler, rf]
)

In [49]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 80/20 split
train_data, test_data = df_geo_clustered.randomSplit([0.8, 0.2], seed=42)

# Train the model

model = pipeline.fit(train_data)
predictions = model.transform(test_data)



25/05/12 11:44:23 WARN DAGScheduler: Broadcasting large task binary with size 1198.9 KiB
25/05/12 11:44:26 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/05/12 11:44:29 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/05/12 11:44:33 WARN DAGScheduler: Broadcasting large task binary with size 1097.4 KiB
                                                                                

In [50]:
# Evaluate the model
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

f1_score = evaluator_f1.evaluate(predictions)
print("Test Set F1 Score =", f1_score)

accuracy = evaluator_accuracy.evaluate(predictions)
print("Test Set Accuracy =", accuracy)

precision = evaluator_precision.evaluate(predictions)
print("Test Set Precision =", precision)

recall = evaluator_recall.evaluate(predictions)
print("Test Set Recall =", recall)

25/05/12 11:44:33 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
25/05/12 11:44:34 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB


Test Set F1 Score = 0.22476587145517102


25/05/12 11:44:36 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB


Test Set Accuracy = 0.35590796828285454


25/05/12 11:44:37 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB


Test Set Precision = 0.2207177840784593


[Stage 1436:>                                                       (0 + 1) / 1]

Test Set Recall = 0.35590796828285454


                                                                                

In [51]:
mcc_data.groupBy("Crm Cd Desc").count().orderBy("count", ascending=False).show(30, truncate=False)

+--------------------------------------------------------+-----+
|Crm Cd Desc                                             |count|
+--------------------------------------------------------+-----+
|ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT          |5772 |
|BATTERY - SIMPLE ASSAULT                                |4653 |
|BURGLARY FROM VEHICLE                                   |3828 |
|ROBBERY                                                 |3555 |
|VANDALISM - FELONY ($400 & OVER, ALL CHURCH VANDALISMS) |2907 |
|THEFT FROM MOTOR VEHICLE - GRAND ($950.01 AND OVER)     |1781 |
|INTIMATE PARTNER - SIMPLE ASSAULT                       |1697 |
|THEFT PLAIN - PETTY ($950 & UNDER)                      |1558 |
|BRANDISH WEAPON                                         |1072 |
|THEFT-GRAND ($950.01 & OVER)EXCPT,GUNS,FOWL,LIVESTK,PROD|1068 |
|VANDALISM - MISDEAMEANOR ($399 OR UNDER)                |794  |
|FAILURE TO YIELD                                        |665  |
|ATTEMPTED ROBBERY       

In [52]:
from pyspark.sql.functions import col

# Define the top 10 crime types
top_crimes = [
    "BATTERY - SIMPLE ASSAULT",
    "BURGLARY FROM VEHICLE",
    "THEFT OF IDENTITY",
    "VANDALISM - FELONY ($400 & OVER, ALL CHURCH VANDALISMS)",
    "THEFT PLAIN - PETTY ($950 & UNDER)",
    "SHOPLIFTING - PETTY THEFT ($950 & UNDER)",
    "BURGLARY",
    "ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",
    "THEFT FROM MOTOR VEHICLE - GRAND ($950.01 AND OVER)",
    "THEFT-GRAND ($950.01 & OVER)EXCPT,GUNS,FOWL,LIVESTK,PROD"
]

# Filter the main DataFrame
filtered_data = df_geo_clustered.filter(col("Crm Cd Desc").isin(top_crimes))

In [53]:
train_data, test_data = filtered_data.randomSplit([0.8, 0.2], seed=42)

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Index categorical features
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep") for col in [
        "Vict Sex", "Vict Descent", "Premis Desc", "Weapon Desc", "vict_age_group", "AREA NAME"
    ]
]

# One-hot encode them
encoders = [
    OneHotEncoder(inputCol=col+"_idx", outputCol=col+"_ohe") for col in [
        "Vict Sex", "Vict Descent", "Premis Desc", "Weapon Desc", "vict_age_group", "AREA NAME"
    ]
]

# Label indexer for Crm Cd Desc
label_indexer = StringIndexer(inputCol="Crm Cd Desc", outputCol="label", handleInvalid="skip")

# Assemble features
assembler = VectorAssembler(
    inputCols=[
        "hour", "is_night", "month", "day_of_week", "geo_cluster",
        "Vict Sex_ohe", "Vict Descent_ohe", "Premis Desc_ohe", "Weapon Desc_ohe",
        "vict_age_group_ohe", "AREA NAME_ohe"
    ],
    outputCol="features"
)

# Random Forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Optional: label converter to decode predictions
label_converter = IndexToString(inputCol="prediction", outputCol="predicted_crime", labels=label_indexer.fit(filtered_data).labels)

# Final pipeline
pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler, rf, label_converter])

# Train the model
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# Evaluate the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

print("F1 Score:", evaluator_f1.evaluate(predictions))
print("Accuracy:", evaluator_acc.evaluate(predictions))
print("Precision:", evaluator_precision.evaluate(predictions))
print("Recall:", evaluator_recall.evaluate(predictions))

                                                                                

F1 Score: 0.5704415576226407
Accuracy: 0.6370930765703806


                                                                                

Precision: 0.5397334106064371
Recall: 0.6370930765703806


                                                                                

In [54]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rfnew = RandomForestClassifier()

# ParamGridBuilder for Random Forest
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rfnew.maxDepth, [5, 8, 10]) \
    .addGrid(rfnew.maxBins, [35]) \
    .addGrid(rfnew.impurity, ["gini", "entropy"]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

#Splitting the dataset in training and test
training, test=filtered_data.randomSplit([0.8,0.2],seed=123)

#Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

#Predicting the results on test dataset
cvresults=cvModel.transform(test)

#Displaying the results a
cvresults.show(10, truncate=False)

# Evaluate the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

print("F1 Score:", evaluator_f1.evaluate(cvresults))
print("Accuracy:", evaluator_acc.evaluate(cvresults))
print("Precision:", evaluator_precision.evaluate(cvresults))
print("Recall:", evaluator_recall.evaluate(cvresults))

                                                                                

+----------------------+--------+----+-----------+--------+--------------------------------------------------------+--------+--------+------------+---------+--------------------------------+--------------+----------------------------------------------+-------+---------+--------------+-------------+-----+-----------+---------------+----+--------+--------------+-------------------+-----------+------------+----------------+---------------+---------------+------------------+-------------+-------------+----------------+-----------------+---------------+------------------+---------------+-----+-----------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------

                                                                                

F1 Score: 0.579300416707858


                                                                                

Accuracy: 0.6488227454464682


                                                                                

Precision: 0.5412973692253104
Recall: 0.6488227454464682


                                                                                

In [55]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import numpy as np

# Get label mapping from StringIndexerModel
label_indexer_index = len(indexers) + len(encoders)
label_indexer_model = model.stages[label_indexer_index]
label_mapping = label_indexer_model.labels

# UDF to check if true label is in top 5
def is_in_top_5(probability, label):
    top5 = np.argsort(probability)[-5:][::-1]
    return int(label) in top5

top5_udf = udf(is_in_top_5, BooleanType())

# Add column for top-5 correct
predictions_with_top5 = predictions.withColumn("top5_correct", top5_udf("probability", "label"))

# Compute top-5 accuracy
top5_accuracy = predictions_with_top5.filter("top5_correct = true").count() / predictions_with_top5.count()
print("Top-5 Accuracy:", top5_accuracy)

                                                                                

Top-5 Accuracy: 0.8878954607977991


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import numpy as np

# Step 1: Get label mapping from StringIndexerModel
label_indexer_index = len(indexers) + len(encoders)
label_indexer_model = model.stages[label_indexer_index]
label_mapping = label_indexer_model.labels

# Step 2: UDF to return top-5 predicted labels (descriptions)
def get_top_5_descriptions(probability_vector):
    top5_indices = np.argsort(probability_vector)[-5:][::-1]
    return [label_mapping[i] for i in top5_indices]

top5_labels_udf = udf(get_top_5_descriptions, ArrayType(StringType()))

# Step 3: Add top-5 prediction column to your DataFrame
predictions_with_top5_labels = predictions.withColumn("top5_predictions", top5_labels_udf("probability"))

# Optional: Select and show only relevant columns
predictions_with_top5_labels.select("AREA NAME", "Crm Cd Desc", "predicted_crime", "top5_predictions").show(truncate=False)

[Stage 2658:>                                                       (0 + 1) / 1]

+-----------+--------------------------------------------------------+-------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|AREA NAME  |Crm Cd Desc                                             |predicted_crime                                        |top5_predictions                                                                                                                                                                                                              |
+-----------+--------------------------------------------------------+-------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

25/05/12 17:14:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 311517 ms exceeds timeout 120000 ms
25/05/12 17:14:47 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/12 17:14:54 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$