# **Logistic Regression with Mathematical Insights using PySpark ML**

### **`Dr Amin Karami (PhD, FHEA, EE), UEL UK - Docklands Campus`**

`E: amin.karami@ymail.com`

`W: https://www.youtube.com/@AminKarami`

`W: https://www.aminkarami.com`

---

**Learning Outcomes**:

`Master Logistic Regression and Optimization`: Understand and apply Logistic Regression and Gradient Descent optimization to classify data using PySpark ML.

`Data Handling Expertise`: Acquire skills in preprocessing, feature selection, and data management within a Jupyter Notebook environment for large-scale datasets.

`Accuracy Assessment`: Gain proficiency in evaluating model performance with metrics derived from the confusion matrix (accuracy, precision, recall, and F1 score), ensuring reliable classification insights.


# **Step 1:** Import the required libraries and initialize SparkSession.

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Initialize SparkSession
spark = SparkSession.builder \
                    .appName("LogisticRegressionExample") \
                    .master("local[*]") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
                    .getOrCreate()

spark



# **Step 2:** Load and preprocess the data.

# **KDDCup Data**:

The dataset contains network intrusion detection data, which is used to develop and test algorithms for detecting unauthorized access to computer networks. The KDD Cup 1999 Data has been widely used in the research community for developing and testing intrusion detection systems.

URL: https://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

Data Description: https://kdd.ics.uci.edu/databases/kddcup99/task.html

Features: https://kdd.ics.uci.edu/databases/kddcup99/kddcup.names

Labels: https://kdd.ics.uci.edu/databases/kddcup99/training_attack_types


# Data Collection:

[The full data set](https://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz) (18M; 743M Uncompressed)

[A 10% subset](https://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz) (2.1M; 75M Uncompressed)


In [5]:
# Read the gzip-compressed CSV file (Spark decompresses .gz input transparently)
df = spark.read.csv("kddcup.data_10_percent.gz", header = False)

# Display the DataFrame
df.show(5)

# more info
print(df.count())
print(df.rdd.getNumPartitions())



+---+---+----+---+---+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|_c0|_c1| _c2|_c3|_c4| _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|   _c41|
+---+---+----+---+---+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|  0|tcp|http| SF|181|5450|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   8|   8|0.00|0.00|0.00|0.00|1.00|0.00|0.00|   9|   9|1.00|0.00|0.11|0.00|0.00|0.00|0.00|0.00|normal.|
|  0|tcp|http| SF|239| 486|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   8|   8|0.00|0.00|0.00|0.00|1.00|0.00|0.00|  19|  19|1.00|0.00|0.05


494021
1


                                                                                

# Add column names

In [6]:
df = df.withColumnRenamed("_c0","duration") \
      .withColumnRenamed("_c1","protocol_type")\
      .withColumnRenamed("_c2","service")\
      .withColumnRenamed("_c3","flag")\
      .withColumnRenamed("_c4","src_bytes")\
      .withColumnRenamed("_c5","dst_bytes")\
      .withColumnRenamed("_c6","land")\
      .withColumnRenamed("_c7","wrong_fragment")\
      .withColumnRenamed("_c8","urgent")\
      .withColumnRenamed("_c9","host")\
      .withColumnRenamed("_c10","num_failed_logins")\
      .withColumnRenamed("_c11","logged_in")\
      .withColumnRenamed("_c12","num_compromised")\
      .withColumnRenamed("_c13","root_shell")\
      .withColumnRenamed("_c14","su_attempted")\
      .withColumnRenamed("_c15","num_root")\
      .withColumnRenamed("_c16","num_file_creations")\
      .withColumnRenamed("_c17","num_shells")\
      .withColumnRenamed("_c18","num_access_files")\
      .withColumnRenamed("_c19","num_outbound_cmds")\
      .withColumnRenamed("_c20","is_host_login")\
      .withColumnRenamed("_c21","is_guest_login")\
      .withColumnRenamed("_c22","count")\
      .withColumnRenamed("_c23","srv_count")\
      .withColumnRenamed("_c24","serror_rate")\
      .withColumnRenamed("_c25","srv_serror_rate")\
      .withColumnRenamed("_c26","rerror_rate")\
      .withColumnRenamed("_c27","srv_rerror_rate")\
      .withColumnRenamed("_c28","same_srv_rate")\
      .withColumnRenamed("_c29","diff_srv_rate")\
      .withColumnRenamed("_c30","srv_diff_host_rate")\
      .withColumnRenamed("_c31","dst_host_count")\
      .withColumnRenamed("_c32","dst_host_srv_count")\
      .withColumnRenamed("_c33","dst_host_same_srv_rate")\
      .withColumnRenamed("_c34","dst_host_diff_srv_rate")\
      .withColumnRenamed("_c35","dst_host_same_src_port_rate")\
      .withColumnRenamed("_c36","dst_host_srv_diff_host_rate")\
      .withColumnRenamed("_c37","dst_host_serror_rate")\
      .withColumnRenamed("_c38","dst_host_srv_serror_rate")\
      .withColumnRenamed("_c39","dst_host_rerror_rate")\
      .withColumnRenamed("_c40","dst_host_srv_rerror_rate")\
      .withColumnRenamed("_c41","connection_status")

df.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_

# Check the Binary labels

In [7]:
# List the distinct connection labels ('normal.' plus 22 attack types) before mapping them to binary labels
df.select("connection_status").distinct().show(30)


+-----------------+
|connection_status|
+-----------------+
|     warezmaster.|
|           smurf.|
|             pod.|
|            imap.|
|            nmap.|
|    guess_passwd.|
|         ipsweep.|
|       portsweep.|
|           satan.|
|            land.|
|      loadmodule.|
|       ftp_write.|
| buffer_overflow.|
|         rootkit.|
|     warezclient.|
|        teardrop.|
|            perl.|
|             phf.|
|        multihop.|
|         neptune.|
|            back.|
|             spy.|
|          normal.|
+-----------------+



                                                                                

In [9]:
from pyspark.sql.functions import when

df = df.withColumn("label", when(df["connection_status"] != 'normal.', 1).otherwise(0))

df.select("label").distinct().show()


+-----+
|label|
+-----+
|    1|
|    0|
+-----+



                                                                                

# Count the labels

In [11]:
# Count labels
df.groupBy("label").count().show()

# ~20% normal traffic
# ~80% attack traffic


# Ways of dealing with imbalanced labels:
 # Resampling
 # Weighted loss
 # Data augmentation: SMOTE method
 # Ensemble methods
 # Evaluation metrics: precision, recall, F1 score, AUC-ROC


+-----+------+
|label| count|
+-----+------+
|    1|396743|
|    0| 97278|
+-----+------+
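
One of the options listed in the cell above, a weighted loss, can be sketched directly from the counts just printed. The snippet below is a minimal illustration in plain Python and is not part of the original notebook: it computes "balanced" class weights as total / (number of classes × class count), a common heuristic that is assumed here rather than taken from the source. The helper name `balanced_class_weights` is hypothetical. In PySpark, such weights could be stored in an extra column and passed to `LogisticRegression` through its `weightCol` parameter.

```python
def balanced_class_weights(class_counts):
    """Return one weight per class so that every class has the same total weight."""
    total = sum(class_counts.values())
    n_classes = len(class_counts)
    return {label: total / (n_classes * count) for label, count in class_counts.items()}

# Label counts reported by df.groupBy("label").count() above
label_counts = {1: 396743, 0: 97278}
class_weights = balanced_class_weights(label_counts)

for label, weight in sorted(class_weights.items()):
    print(f"label {label}: weight {weight:.4f}")
```

With these weights the minority class (normal traffic, label 0) receives the larger weight, so both classes contribute equally to the loss in total.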



                                                                                

# StringIndexer

In [12]:
from pyspark.ml.feature import StringIndexer

columns_to_index = ["protocol_type", "service", "flag"]

for column in columns_to_index:
  indexer = StringIndexer(inputCol = column, outputCol = column + "_indexed")
  df = indexer.fit(df).transform(df)

df.show(5)


                                                                                

+--------+-------------+-------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+-----+---------------------+---------------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|s
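
To make the indexing step concrete, here is a small plain-Python sketch of what `StringIndexer` does with its default ordering (`frequencyDesc`): the most frequent category receives index 0.0, the next most frequent 1.0, and so on. The toy values and the helper name `fit_string_index` are illustrative assumptions, and breaking ties alphabetically is an assumption about recent Spark versions.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties: alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Toy column, not taken from the KDD data
protocol_type = ["icmp", "tcp", "icmp", "udp", "tcp", "icmp"]
mapping = fit_string_index(protocol_type)
indexed = [mapping[v] for v in protocol_type]

print(mapping)
print(indexed)
```

The resulting indices are arbitrary codes, so a linear model will read them as ordered magnitudes; one-hot encoding with `OneHotEncoder` is a common follow-up step when that is not desired.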

# Missing values

In [13]:
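# Import Spark's sum under an alias so it does not shadow Python's built-in sum()
from pyspark.sql.functions import col, sum as spark_sum

# Count the null values in every column
missing_values = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])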

missing_values.show()


+--------+-------------+-------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+-----+---------------------+---------------+------------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|s

                                                                                

# To assess the linear separability of a feature


In [14]:
from pyspark.sql.functions import avg, stddev

# avg & std for class 0
class_0_stats = df.filter(df['label'] == 0).select(avg('src_bytes').alias("avg_src_bytes_0"),
                                                   stddev('src_bytes').alias("stddev_src_bytes_0")).first()

# avg & std for class 1
class_1_stats = df.filter(df['label'] == 1).select(avg('src_bytes').alias("avg_src_bytes_1"),
                                                   stddev('src_bytes').alias("stddev_src_bytes_1")).first()


print("Class 0 distribution: ")
print("AVG src_bytes: ", class_0_stats["avg_src_bytes_0"])
print("STD src_bytes: ", class_0_stats["stddev_src_bytes_0"])

print("Class 1 distribution: ")
print("AVG src_bytes: ", class_1_stats["avg_src_bytes_1"])
print("STD src_bytes: ", class_1_stats["stddev_src_bytes_1"])


Class 0 distribution: 
AVG src_bytes:  1157.047523592179
STD src_bytes:  34226.124718051324
Class 1 distribution: 
AVG src_bytes:  3483.7659517622237
STD src_bytes:  1102603.8255053996
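
The averages and standard deviations above can be condensed into a single rough separability score. The sketch below is an addition, not part of the original analysis: it computes the standardized mean difference, i.e. the gap between the class means divided by a pooled standard deviation. The pooling formula used here (root mean square of the two standard deviations) is one simple choice among several, and the function name is hypothetical.

```python
import math

def standardized_mean_difference(mean_0, std_0, mean_1, std_1):
    """Gap between the class means, measured in pooled standard deviations."""
    pooled_std = math.sqrt((std_0 ** 2 + std_1 ** 2) / 2)
    return abs(mean_1 - mean_0) / pooled_std

# Values printed by the cell above for src_bytes
score = standardized_mean_difference(
    mean_0=1157.047523592179, std_0=34226.124718051324,
    mean_1=3483.7659517622237, std_1=1102603.8255053996,
)
print(f"standardized mean difference for src_bytes: {score:.4f}")
```

A score far below 1 means the class means differ by only a small fraction of the spread, which suggests that `src_bytes` on its own is unlikely to separate the two classes linearly.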


                                                                                

# VectorAssembler
Prepare the features column: use VectorAssembler to combine the selected features into a single vector column.

In [15]:
df.printSchema()

root
 |-- duration: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: string (nullable = true)
 |-- dst_bytes: string (nullable = true)
 |-- land: string (nullable = true)
 |-- wrong_fragment: string (nullable = true)
 |-- urgent: string (nullable = true)
 |-- host: string (nullable = true)
 |-- num_failed_logins: string (nullable = true)
 |-- logged_in: string (nullable = true)
 |-- num_compromised: string (nullable = true)
 |-- root_shell: string (nullable = true)
 |-- su_attempted: string (nullable = true)
 |-- num_root: string (nullable = true)
 |-- num_file_creations: string (nullable = true)
 |-- num_shells: string (nullable = true)
 |-- num_access_files: string (nullable = true)
 |-- num_outbound_cmds: string (nullable = true)
 |-- is_host_login: string (nullable = true)
 |-- is_guest_login: string (nullable = true)
 |-- count: string (nullable = true)
 |-- srv_count: s

In [17]:
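from pyspark.sql.functions import col

# Cast the numeric columns (read as strings) to double.
# The text columns are skipped: casting them to double would replace every value with null.
text_columns = ["protocol_type", "service", "flag", "connection_status"]

for column in df.columns:
  if column not in text_columns:
    df = df.withColumn(column, col(column).cast("double"))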

In [18]:
assembler = VectorAssembler(inputCols = ["src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
                                         "host", "num_failed_logins", "logged_in", "num_compromised",
                                         "root_shell", "num_root", "num_file_creations", "num_shells",
                                         "num_outbound_cmds", "is_host_login", "count", "srv_count",
                                         "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
                                         "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
                                         "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
                                         "dst_host_srv_rerror_rate",
                                         "protocol_type_indexed", "service_indexed", "flag_indexed"],
                            outputCol = "features")

data = assembler.transform(df)

data = data.select('features', 'label')
data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(31,[0,1,7,15,16,...|  0.0|
|(31,[0,1,7,15,16,...|  0.0|
|(31,[0,1,7,15,16,...|  0.0|
|(31,[0,1,7,15,16,...|  0.0|
|(31,[0,1,7,15,16,...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [20]:
data.show(5, truncate = False)

# The features are stored as sparse vectors: (size, [indices of non-zero entries], [values])

+-----------------------------------------------------------------------------------------+-----+
|features                                                                                 |label|
+-----------------------------------------------------------------------------------------+-----+
|(31,[0,1,7,15,16,21,24,25,26,28,29],[181.0,5450.0,1.0,8.0,8.0,1.0,9.0,9.0,1.0,1.0,2.0])  |0.0  |
|(31,[0,1,7,15,16,21,24,25,26,28,29],[239.0,486.0,1.0,8.0,8.0,1.0,19.0,19.0,1.0,1.0,2.0]) |0.0  |
|(31,[0,1,7,15,16,21,24,25,26,28,29],[235.0,1337.0,1.0,8.0,8.0,1.0,29.0,29.0,1.0,1.0,2.0])|0.0  |
|(31,[0,1,7,15,16,21,24,25,26,28,29],[219.0,1337.0,1.0,6.0,6.0,1.0,39.0,39.0,1.0,1.0,2.0])|0.0  |
|(31,[0,1,7,15,16,21,24,25,26,28,29],[217.0,2032.0,1.0,6.0,6.0,1.0,49.0,49.0,1.0,1.0,2.0])|0.0  |
+-----------------------------------------------------------------------------------------+-----+
only showing top 5 rows



In [21]:
# Convert the sparse vectors of the first two rows to dense NumPy arrays
selected_data = data.select('features').limit(2).collect()

for row in selected_data:
  dense_vector = row[0].toArray()
  print(dense_vector)

[1.81e+02 5.45e+03 0.00e+00 0.00e+00 0.00e+00 0.00e+00 0.00e+00 1.00e+00
 0.00e+00 0.00e+00 0.00e+00 0.00e+00 0.00e+00 0.00e+00 0.00e+00 8.00e+00
 8.00e+00 0.00e+00 0.00e+00 0.00e+00 0.00e+00 1.00e+00 0.00e+00 0.00e+00
 9.00e+00 9.00e+00 1.00e+00 0.00e+00 1.00e+00 2.00e+00 0.00e+00]
[239. 486.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.
   0.   8.   8.   0.   0.   0.   0.   1.   0.   0.  19.  19.   1.   0.
   1.   2.   0.]
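
The sparse notation `(size, [indices], [values])` stores only the non-zero entries. As a plain-Python sketch of what `toArray()` does conceptually (the helper `sparse_to_dense` is hypothetical, not a PySpark function), the first row shown above expands as follows:

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a plain list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

# First row of the features column printed above
size = 31
indices = [0, 1, 7, 15, 16, 21, 24, 25, 26, 28, 29]
values = [181.0, 5450.0, 1.0, 8.0, 8.0, 1.0, 9.0, 9.0, 1.0, 1.0, 2.0]

dense = sparse_to_dense(size, indices, values)
print(dense)
print(f"{len(values)} of {size} entries are non-zero")
```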


# StandardScaler
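StandardScaler rescales each feature so that features measured on very different scales contribute comparably to the model. With its default settings (`withStd = True`, `withMean = False`), PySpark's StandardScaler divides each feature by its standard deviation, giving unit standard deviation, but does not subtract the mean. Set `withMean = True` to also center each feature at 0; this produces dense vectors, because zero entries no longer stay zero.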

In [22]:
scaler = StandardScaler(inputCol = 'features', outputCol = 'scaledFeatures')

scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

data = data.select("scaledFeatures", "label")
data.show(3, truncate = False)

                                                                                

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|scaledFeatures                                                                                                                                                                                                                                                |label|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(31,[0,1,7,15,16,21,24,25,26,28,29],[1.83157948440338E-4,0.16495156759878016,2.814168444874875,0.03753270996838467,0.032477705818327186,2.5760614800997756,0.1390060564670239,0.08487328273976809,2.43438731331729
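
As a rough illustration of the scaling step, the plain-Python sketch below divides a feature by its standard deviation without subtracting the mean, which matches the default `StandardScaler` settings (`withStd=True`, `withMean=False`), and then shows the centered variant for comparison. The toy values and the function name `standard_scale` are assumptions for illustration, and the use of the sample (n - 1) standard deviation is an assumption about Spark's behaviour.

```python
import statistics

def standard_scale(values, with_mean=False, with_std=True):
    """Scale one feature column, mirroring StandardScaler's two switches."""
    mean = statistics.mean(values) if with_mean else 0.0
    std = statistics.stdev(values) if with_std else 1.0  # sample std (n - 1)
    return [(v - mean) / std for v in values]

# Toy feature column, not taken from the KDD data
src_bytes = [0.0, 0.0, 181.0, 239.0, 5450.0]

scaled_only = standard_scale(src_bytes)               # zeros stay zero
centered = standard_scale(src_bytes, with_mean=True)  # zeros become non-zero

print(scaled_only)
print(centered)
```

Keeping zeros at zero is what allows the scaled vectors in the output above to remain sparse.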

# **Step 3:** Apply Logistic Regression model.


Useful Parameters:

**maxIter:** Specifies the maximum number of iterations (or epochs) for the optimization algorithm. It controls how many times the algorithm iterates to optimize the model parameters. The default value is 100.

**regParam:** Controls the regularization parameter, which helps prevent overfitting by adding a penalty term to the loss function. A higher value of regParam increases the regularization strength. The default value is 0.0.

**elasticNetParam:** Allows you to tune the balance between L1 and L2 regularization. A value of 0.0 corresponds to L2 regularization, 1.0 corresponds to L1 regularization, and any value in between represents a combination of both. The default value is 0.0.

**threshold:** Sets the threshold for binary classification. Predicted probabilities above this threshold are classified as positive, while those below are classified as negative. The default value is 0.5.
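
To connect these parameters to the underlying math, here is a minimal from-scratch sketch of binary logistic regression trained with batch gradient descent on a toy one-feature dataset. It is an illustration only: the function names, the learning rate, and the data are assumptions, only an L2 penalty is shown (the `elasticNetParam = 0.0` case), and Spark ML relies on a more sophisticated optimizer internally rather than this plain update rule.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    exp_z = math.exp(z)
    return exp_z / (1.0 + exp_z)

def predict_proba(x, weights, bias):
    """Probability of class 1 for one row."""
    return sigmoid(sum(w * xi for w, xi in zip(weights, x)) + bias)

def fit_logistic_regression(rows, labels, max_iter=100, reg_param=0.01, learning_rate=0.5):
    """Minimize mean log loss + (reg_param / 2) * ||weights||^2 by gradient descent."""
    n_rows, n_features = len(rows), len(rows[0])
    weights, bias = [0.0] * n_features, 0.0
    for _ in range(max_iter):
        grad_w, grad_b = [0.0] * n_features, 0.0
        for x, y in zip(rows, labels):
            error = predict_proba(x, weights, bias) - y  # derivative of the log loss w.r.t. z
            for j in range(n_features):
                grad_w[j] += error * x[j] / n_rows
            grad_b += error / n_rows
        # The L2 penalty adds reg_param * w to the gradient; the bias is not penalized
        weights = [w - learning_rate * (g + reg_param * w) for w, g in zip(weights, grad_w)]
        bias -= learning_rate * grad_b
    return weights, bias

def predict(x, weights, bias, threshold=0.5):
    """Return 1 when the predicted probability exceeds the threshold, else 0."""
    return 1 if predict_proba(x, weights, bias) > threshold else 0

# Toy, already-centered data: negative values are class 0, positive values are class 1
rows = [[-1.5], [-0.5], [0.5], [1.5]]
labels = [0, 0, 1, 1]

weights, bias = fit_logistic_regression(rows, labels)
predictions = [predict(x, weights, bias) for x in rows]
print("weights:", weights, "bias:", bias)
print("predictions:", predictions)
```

Raising `reg_param` shrinks the learned weight, raising `max_iter` lets the updates run longer, and raising `threshold` makes the model more reluctant to predict the positive class.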

In [23]:
#Split data
train_data, test_data = data.randomSplit([0.7, 0.3], seed = 1234)
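
Conceptually, `randomSplit` assigns each row to one of the splits at random according to the normalized weights, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below imitates that idea under this assumption; the helper name `random_split` and the toy data are hypothetical, and it will not reproduce Spark's actual row assignment for the same seed.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    cumulative, running = [], 0.0
    for weight in weights:
        running += weight / total
        cumulative.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(cumulative):
            # The last split also catches any rounding gap just below 1.0
            if draw < upper or i == len(weights) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10_000))  # toy stand-in for DataFrame rows
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=1234)
print(len(train_rows), len(test_rows))
```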

In [24]:
lr = LogisticRegression(featuresCol = "scaledFeatures", labelCol = 'label',
                        threshold = 0.5, regParam = 0.01)

lr_model = lr.fit(train_data)

lr_predictions_train = lr_model.transform(train_data)
lr_predictions_test = lr_model.transform(test_data)


In [25]:
lr_predictions_test.select("label", "prediction").show(20)


+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



                                                                                

# **Step 4:** Evaluate the model.

In [26]:
# Confusion Matrix

confusion_matrix = lr_predictions_test.groupBy("label", "prediction").count()
confusion_matrix.show()


+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  1.0|       1.0|117437|
|  0.0|       1.0|   469|
|  1.0|       0.0|  1303|
|  0.0|       0.0| 28649|
+-----+----------+------+



                                                                                

In [27]:
cm_pandas = confusion_matrix.toPandas()
cm_pandas.pivot(index = 'label', columns = 'prediction', values = 'count')

                                                                                

prediction    0.0     1.0
label
0.0         28649     469
1.0          1303  117437

In [28]:
tp = lr_predictions_test[(lr_predictions_test.label == 1) & (lr_predictions_test.prediction == 1)].count()
fp = lr_predictions_test[(lr_predictions_test.label == 0) & (lr_predictions_test.prediction == 1)].count()
fn = lr_predictions_test[(lr_predictions_test.label == 1) & (lr_predictions_test.prediction == 0)].count()
tn = lr_predictions_test[(lr_predictions_test.label == 0) & (lr_predictions_test.prediction == 0)].count()

                                                                                

In [None]:
accuracy = (tp + tn) / (tp + fp + fn + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * (precision * recall) / (precision + recall)

print('accuracy: ', round(accuracy, 4) * 100)
print('precision: ', round(precision, 4) * 100)
print('recall: ', round(recall, 4) * 100)
print('f1 score: ', round(f1, 4) * 100)

accuracy:  98.8
precision:  99.6
recall:  98.9
f1 score:  99.25
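
Because the classes are imbalanced, it can help to look at the negative class as well. The sketch below reuses the confusion-matrix counts reported above to compute specificity, the false positive rate, and balanced accuracy in plain Python. These extra metrics are an addition to the original notebook, and the variable names are chosen for illustration.

```python
# Confusion-matrix counts taken from the output above
tp, fp, fn, tn = 117437, 469, 1303, 28649

recall = tp / (tp + fn)               # true positive rate (attacks detected)
specificity = tn / (tn + fp)          # true negative rate (normal traffic recognized)
false_positive_rate = fp / (fp + tn)  # normal traffic flagged as an attack
balanced_accuracy = (recall + specificity) / 2

print(f"specificity: {specificity * 100:.2f}")
print(f"false positive rate: {false_positive_rate * 100:.2f}")
print(f"balanced accuracy: {balanced_accuracy * 100:.2f}")
```

Balanced accuracy averages the per-class recall, so it is not inflated by the majority class the way plain accuracy can be.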

