# Week 11 PySpark Data Processing

## Setup
Start a Spark session with 8 GB of driver memory. The `local[*]` master runs Spark on this machine with one worker thread per available CPU core, so the large dataset is processed in parallel across cores rather than distributed across a cluster.

In [16]:
from pyspark.sql import SparkSession
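# The wildcard import provides col, when, hour, count, countDistinct, avg, sum, desc, ...
# It also shadows Python built-ins such as sum, min, max and round in this notebook.
from pyspark.sql.functions import *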
import time

spark = (
    SparkSession.builder.appName("Ecommerce Analysis")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

print(f"Spark version: {spark.version} is ready")

Spark version: 4.0.1 is ready


## Load Data
Load the e-commerce dataset of 42+ million user events from October 2019. `header=True` takes column names from the first row, and `inferSchema=True` detects column types (timestamp, integer, double, and so on) instead of reading every column as a string, at the cost of an extra pass over the file. The dataset includes browsing (`view`), cart-addition (`cart`), and purchase (`purchase`) events with product, category, brand, price, user, and session details.
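
To see why type inference needs that extra pass, here is a toy sketch of per-column inference in plain Python: every value is scanned and the column keeps the widest type observed. It is an illustration only, not Spark's implementation (which also tries long, decimal, timestamp, and boolean types), and the helper names are hypothetical. On a file this size, passing an explicit schema through the `schema` parameter of `spark.read.csv` skips the inference pass.

```python
# Toy per-column type inference (hypothetical helpers, not Spark's algorithm).
WIDENING_ORDER = {"int": 0, "double": 1, "string": 2}


def infer_value_type(value):
    """Return the narrowest of 'int', 'double', 'string' that fits one CSV value."""
    if value is None or value == "":
        return None  # empty fields do not constrain the column type
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_column_type(values):
    """Scan every value (the extra pass) and keep the widest type observed."""
    widest = None
    for value in values:
        value_type = infer_value_type(value)
        if value_type is None:
            continue
        if widest is None or WIDENING_ORDER[value_type] > WIDENING_ORDER[widest]:
            widest = value_type
    return widest or "string"  # an all-empty column falls back to string


print(infer_column_type(["35.79", "29.51", "642.69"]))  # price-like values
print(infer_column_type(["44600062", "3900821"]))  # product_id-like values
print(infer_column_type(["shiseido", None, "apple"]))  # brand-like values
```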

In [17]:
csv_path = "Dataset/2019-Oct.csv"

print("Loading data...")
start = time.time()

df = spark.read.csv(csv_path, header=True, inferSchema=True)

print(f"Loaded in {time.time()-start:.1f}s")
print(f"Rows: {df.count():,}")
df.printSchema()
df.show(5)

Loading data...
Loaded in 4.5s
Rows: 42,448,764
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-09-30 20:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-09-30 20:00:00|      view|   3900821|2053013552326770905|applianc

## Data Transformations

### Filter 1: Remove Invalid Data
Clean the dataset by removing invalid records: events with a zero or negative price, events priced at $10,000 or more (treated as extreme outliers), and events missing essential information (`brand` or `user_id`).
This keeps the analysis focused on valid, complete events.
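
The Filter 1 predicate can be mirrored on a few hand-made rows in plain Python to check its boundaries. One Spark detail the sketch reproduces: comparing a null `price` with a number yields null, and `filter()` keeps only rows where the condition is true, so rows with a missing price are dropped as well. The rows and the `is_valid` helper are illustrative and not part of the notebook.

```python
def is_valid(event):
    """Plain-Python mirror of Filter 1: 0 < price < 10000, brand and user_id present."""
    price = event.get("price")
    return (
        price is not None  # Spark: null > 0 is null, which filter() treats as not true
        and 0 < price < 10000
        and event.get("brand") is not None
        and event.get("user_id") is not None
    )


sample_events = [
    {"price": 35.79, "brand": "shiseido", "user_id": 541312140},  # kept
    {"price": 0.0, "brand": "apple", "user_id": 1},  # zero price: dropped
    {"price": 10000.0, "brand": "apple", "user_id": 2},  # upper bound is exclusive: dropped
    {"price": 29.51, "brand": None, "user_id": 3},  # missing brand: dropped
    {"price": 189.91, "brand": "apple", "user_id": None},  # missing user_id: dropped
    {"price": None, "brand": "xiaomi", "user_id": 4},  # missing price: dropped
]
kept = [event for event in sample_events if is_valid(event)]
print(f"kept {len(kept)} of {len(sample_events)} rows")
```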

In [18]:
print(f"Before filtering: {df.count():,}")

df_filtered = df.filter(
    (col("price") > 0)
    & (col("price") < 10000)
    & (col("brand").isNotNull())
    & (col("user_id").isNotNull())
)

print(f"After Filter 1: {df_filtered.count():,}")

Before filtering: 42,448,764
After Filter 1: 36,335,756

### Filter 2: Focus on Purchases

Narrow the analysis to high-intent user actions: cart additions and purchases. `view` events are excluded to concentrate on buying behavior rather than browsing. This greatly reduces the data volume while keeping the cart-to-purchase stage of the conversion funnel (the view stage is no longer represented). The reduction printed below is measured against the raw dataset, not against the output of Filter 1.
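
The row counts printed in this section make it easy to check both ways of measuring the reduction: relative to the raw data (the figure the cell prints) and the effect of Filter 2 alone relative to the Filter 1 output.

```python
# Row counts printed by the cells in this section
raw_rows = 42_448_764  # df (raw CSV)
after_filter_1 = 36_335_756  # df_filtered after Filter 1
after_filter_2 = 1_592_345  # df_filtered after Filter 2


def reduction_pct(before, after):
    """Percentage of rows removed going from `before` rows to `after` rows."""
    return (1 - after / before) * 100


vs_raw = reduction_pct(raw_rows, after_filter_2)  # what the notebook prints
vs_filter_1 = reduction_pct(after_filter_1, after_filter_2)  # Filter 2 on its own
print(f"{vs_raw:.1f}% vs raw data, {vs_filter_1:.1f}% vs Filter 1 output")
```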

In [19]:
df_filtered = df_filtered.filter(col("event_type").isin(["cart", "purchase"]))

n_after = df_filtered.count()  # count once and reuse the result
n_total = df.count()

print(f"After Filter 2: {n_after:,}")
print(f"Reduction: {(1 - n_after / n_total) * 100:.1f}%")  # relative to the raw dataset

After Filter 2: 1,592,345
Reduction: 96.2%

### Column Transformations with withColumn
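Create derived features from existing columns with `withColumn()`. Extract the hour from `event_time` to analyze time-of-day patterns, and group products into price tiers for segmentation: budget (under $50), mid-range ($50 to under $200), premium ($200 to under $500), and luxury ($500 and above). `hour()` uses the session time zone (`spark.sql.session.timeZone`); the first rows of this October dataset display as 2019-09-30 20:00:00, which suggests the session zone is behind UTC, so the hours below are local hours rather than UTC.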
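
The tier boundaries can be checked with a plain-Python function that applies the same thresholds in the same order as the `when()` chain; as with Spark's `CASE WHEN`, the first matching branch wins. One difference worth knowing: in Spark a null `price` fails every `when()` test and falls through to `otherwise("luxury")`, whereas this function would raise a `TypeError` on `None`. Filter 1 already removed such rows. The function is illustrative, not part of the notebook.

```python
def price_category(price):
    """Same thresholds and order as the when(...).otherwise(...) chain in this cell."""
    if price < 50:
        return "budget"
    if price < 200:
        return "mid-range"
    if price < 500:
        return "premium"
    return "luxury"


# Boundary values plus a few prices that appear in this cell's sample output
for p in [49.99, 50.0, 199.99, 200.0, 499.99, 500.0, 130.76, 642.69, 29.51]:
    print(f"{p:>8.2f} -> {price_category(p)}")
```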

In [20]:
df_enriched = df_filtered.withColumn("event_hour", hour("event_time")).withColumn(
    "price_category",
    when(col("price") < 50, "budget")
    .when(col("price") < 200, "mid-range")
    .when(col("price") < 500, "premium")
    .otherwise("luxury"),
)

print("Added columns: event_hour, price_category")
df_enriched.select("brand", "price", "price_category", "event_hour", "event_type").show(
    10
)

Added columns: event_hour, price_category
+-------+------+--------------+----------+----------+
|  brand| price|price_category|event_hour|event_type|
+-------+------+--------------+----------+----------+
|samsung|130.76|     mid-range|        20|  purchase|
|  apple|642.69|        luxury|        20|  purchase|
| xiaomi| 29.51|        budget|        20|      cart|
| xiaomi| 29.51|        budget|        20|  purchase|
|santeri| 54.42|     mid-range|        20|  purchase|
|  apple|189.91|     mid-range|        20|  purchase|
|  apple|515.67|        luxury|        20|      cart|
|  apple|161.98|     mid-range|        20|  purchase|
|  apple|515.67|        luxury|        20|  purchase|
|  oasis| 28.03|        budget|        20|  purchase|
+-------+------+--------------+----------+----------+
only showing top 10 rows


### Aggregation 1: Brand Performance Analysis
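Analyze brand performance with `groupBy()` and several aggregation functions: total events, unique customers, average price, and summed price for each brand. Because `df_enriched` still holds both cart and purchase events, the `total_revenue` column adds up prices across both event types and overstates actual sales; SQL Query 1 below computes purchase-only revenue. `cache()` is lazy: called after `show()`, it only marks `brand_stats` for caching, and the result is stored on the next action, so later uses of `brand_stats` (its execution plan and the Parquet write) read from the cache instead of recomputing the aggregation. The `CacheManager` warning in the output most likely means the cell was run a second time in the same session, so an identical plan was already cached.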
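
The difference between summing every event's price and summing purchases only can be shown with a plain-Python version of the grouping on four hand-made rows (hypothetical data and field names). In the DataFrame API, a purchase-only total could be written as `sum(when(col("event_type") == "purchase", col("price")).otherwise(0))`, the same conditional-sum idea SQL Query 1 expresses with `CASE WHEN`.

```python
from collections import defaultdict


def aggregate_by_brand(events):
    """Per-brand event count, distinct users, average price, and two price totals."""
    acc = defaultdict(
        lambda: {"events": 0, "users": set(), "price_all": 0.0, "price_purchase": 0.0}
    )
    for event in events:
        a = acc[event["brand"]]
        a["events"] += 1
        a["users"].add(event["user_id"])
        a["price_all"] += event["price"]  # what sum("price") computes
        if event["event_type"] == "purchase":
            a["price_purchase"] += event["price"]  # purchase-only revenue
    return {
        brand: {
            "total_events": a["events"],
            "unique_users": len(a["users"]),
            "avg_price": a["price_all"] / a["events"],
            "sum_price_all_events": a["price_all"],
            "purchase_revenue": a["price_purchase"],
        }
        for brand, a in acc.items()
    }


toy_events = [
    {"brand": "xiaomi", "user_id": 1, "price": 29.51, "event_type": "cart"},
    {"brand": "xiaomi", "user_id": 1, "price": 29.51, "event_type": "purchase"},
    {"brand": "apple", "user_id": 2, "price": 515.67, "event_type": "cart"},
    {"brand": "apple", "user_id": 3, "price": 515.67, "event_type": "purchase"},
]
toy_stats = aggregate_by_brand(toy_events)
for brand, stats in toy_stats.items():
    print(brand, stats["sum_price_all_events"], stats["purchase_revenue"])
```

The item that was carted and then bought is counted twice in the all-events total but once in the purchase-only total.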

In [21]:
brand_stats = (
    df_enriched.groupBy("brand")
    .agg(
        count("*").alias("total_events"),
        countDistinct("user_id").alias("unique_users"),
        avg("price").alias("avg_price"),
        sum("price").alias("total_revenue"),  # cart + purchase prices, not purchase-only revenue
    )
    .orderBy(desc("total_revenue"))
)

print("\nTop 20 Brands by Revenue:")
brand_stats.show(20)

# Cache for reuse
brand_stats.cache()
print("Cached brand_stats for repeated use")


Top 20 Brands by Revenue:
+--------+------------+------------+------------------+-------------------+
|   brand|total_events|unique_users|         avg_price|      total_revenue|
+--------+------------+------------+------------------+-------------------+
|   apple|      351957|      100061| 766.1664613574962|2.696576492400003E8|
| samsung|      476145|      140911| 260.3464964874154|1.239626825700004E8|
|  xiaomi|      161113|       55456| 151.3008518865641|      2.437653415E7|
|  huawei|       65633|       22973|207.38721679642862|       1.36114452E7|
|    acer|       16282|        6721| 535.7303175285585|   8722761.02999999|
|      lg|       22273|        9500| 388.6474314192069|  8656344.239999995|
|    oppo|       31438|       10475|222.12704402315714|  6983230.010000014|
|    sony|       16789|        7142|359.29966168324506|  6032282.020000001|
| lucente|       11578|        6928|269.83186819830735| 3124113.3700000024|
|   bosch|       12860|        6209| 238.0437698289271| 30612

25/11/11 14:48:35 WARN CacheManager: Asked to cache already cached data.


### Aggregation 2: Time-Based Patterns
This is a more detailed aggregation with two grouping dimensions (`event_hour` and `event_type`). It shows how shopping behavior changes throughout the day for both cart additions and purchases: peak shopping hours, average prices at different times, and how many distinct users are active in each hour.
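
Results in this long format (one row per hour and event type) are often easier to compare after a pivot to one row per hour with a column per event type. In PySpark that could be written as `df_enriched.groupBy("event_hour").pivot("event_type", ["cart", "purchase"]).count()`. Below is a plain-Python sketch of the same reshaping, using the first three hours of counts from this cell's output; the helper name is illustrative.

```python
from collections import defaultdict

# (event_hour, event_type, event_count) rows from the hourly_patterns output
long_rows = [
    (0, "cart", 47020),
    (0, "purchase", 37872),
    (1, "cart", 55950),
    (1, "purchase", 44326),
    (2, "cart", 61465),
    (2, "purchase", 48065),
]


def pivot_counts(rows):
    """Reshape long rows into {hour: {event_type: count}} (long -> wide)."""
    wide = defaultdict(dict)
    for hour_of_day, event_type, event_count in rows:
        wide[hour_of_day][event_type] = event_count
    return dict(wide)


wide = pivot_counts(long_rows)
for hour_of_day in sorted(wide):
    counts = wide[hour_of_day]
    print(hour_of_day, counts["cart"], counts["purchase"])
```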

In [22]:
hourly_patterns = (
    df_enriched.groupBy("event_hour", "event_type")
    .agg(
        count("*").alias("event_count"),
        avg("price").alias("avg_price"),
        countDistinct("user_id").alias("unique_users"),
    )
    .orderBy("event_hour", "event_type")
)

print("\nShopping Patterns by Hour:")
hourly_patterns.show(48)


Shopping Patterns by Hour:
+----------+----------+-----------+------------------+------------+
|event_hour|event_type|event_count|         avg_price|unique_users|
+----------+----------+-----------+------------------+------------+
|         0|      cart|      47020| 321.9434891535514|       26194|
|         0|  purchase|      37872| 306.3722451943384|       28359|
|         1|      cart|      55950| 327.0312763181413|       30671|
|         1|  purchase|      44326| 312.5543561340972|       33153|
|         2|      cart|      61465|329.63629756772156|       33777|
|         2|  purchase|      48065|316.96922937688527|       35813|
|         3|      cart|      63350|328.30210639305466|       34515|
|         3|  purchase|      49500|318.68473030303034|       36913|
|         4|      cart|      65671| 332.6047763853149|       35551|
|         4|  purchase|      51056| 322.2660674553431|       37906|
|         5|      cart|      66024|344.42474812189516|       35443|
|         5|  purchase|      51234| 330.72904223

### Aggregation 3: Price Category Analysis
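Analyze how the price tiers perform across event types. This shows which price segments see the most activity and how many brands compete in each tier, and it checks the price categorization logic, since each tier's average price should fall inside its bounds. This helps to understand customer preferences across budget ranges and can inform pricing strategy. Note that `orderBy("price_category")` sorts the tier names alphabetically, not from cheapest to most expensive.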

In [23]:
price_analysis = (
    df_enriched.groupBy("price_category", "event_type")
    .agg(
        count("*").alias("event_count"),
        countDistinct("brand").alias("unique_brands"),
        avg("price").alias("avg_price"),
    )
    .orderBy("price_category")
)

print("\nAnalysis by Price Category:")
price_analysis.show()


Analysis by Price Category:
+--------------+----------+-----------+-------------+------------------+
|price_category|event_type|event_count|unique_brands|         avg_price|
+--------------+----------+-----------+-------------+------------------+
|        budget|  purchase|      97967|         1145| 30.33056774219888|
|        budget|      cart|      91842|          338|28.953864789529806|
|        luxury|      cart|     181149|          111|  935.822696012673|
|        luxury|  purchase|     134378|          322| 934.3565004688294|
|     mid-range|      cart|     360296|          370|135.89159713124835|
|     mid-range|  purchase|     264104|         1130|126.91886983915452|
|       premium|  purchase|     188186|          628|315.47919340439773|
|       premium|      cart|     274423|          197|313.64668446157884|
+--------------+----------+-----------+-------------+------------------+
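
In the table above, alphabetical sorting puts `luxury` between `budget` and `mid-range`, and the order of `cart` and `purchase` within a tier is not fixed because `event_type` is not a sort key. A plain-Python sketch of ordering by tier rank instead, on the counts from the table (the `TIER_RANK` mapping is an assumption matching the thresholds used earlier); in Spark the same idea would be to sort on a rank expression built with `when()`, with `event_type` as a secondary key.

```python
# Assumed logical order of the tiers, cheapest first
TIER_RANK = {"budget": 0, "mid-range": 1, "premium": 2, "luxury": 3}

# (price_category, event_type, event_count) in the order printed above
rows = [
    ("budget", "purchase", 97967),
    ("budget", "cart", 91842),
    ("luxury", "cart", 181149),
    ("luxury", "purchase", 134378),
    ("mid-range", "cart", 360296),
    ("mid-range", "purchase", 264104),
    ("premium", "purchase", 188186),
    ("premium", "cart", 274423),
]

# Sort by tier rank first, then by event type so the order is deterministic
ordered = sorted(rows, key=lambda row: (TIER_RANK[row[0]], row[1]))
for category, event_type, event_count in ordered:
    print(f"{category:<10} {event_type:<8} {event_count:>7,}")
```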

## SQL Queries

### SQL Query 1: Brand Revenue Analysis
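Use Spark SQL to analyze brand performance with conversion funnel metrics. The query counts cart additions and purchases per brand with conditional sums (`SUM(CASE WHEN ...)`), keeps only brands with significant activity (more than 1,000 events, via `HAVING`), and, unlike Aggregation 1, computes `total_revenue` from purchase events only. The `WHERE price > 0` clause is redundant after Filter 1 but harmless.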
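
The query reports cart additions and purchases side by side but not a rate. A purchase-to-cart ratio can be derived from its output, but `lucente` has 11,578 purchases and 0 cart additions, so the division needs a guard. The plain-Python sketch below uses three rows from the output. In Spark SQL a similar guard would be `purchases / NULLIF(cart_adds, 0)`, which yields null instead of dividing by zero; under ANSI mode, which Spark 4.x enables by default, an unguarded division by zero raises an error. The ratio is only a rough funnel indicator, not a per-user conversion rate, since a purchase does not always follow a recorded cart event.

```python
def purchase_to_cart_ratio(purchases, cart_adds):
    """Purchases per cart addition, or None when a brand has no cart events."""
    if cart_adds == 0:
        return None  # guard against division by zero (lucente has 0 cart_adds)
    return purchases / cart_adds


# (purchases, cart_adds) from the SQL Query 1 output
funnel = {
    "apple": (142873, 209084),
    "samsung": (172896, 303249),
    "lucente": (11578, 0),
}
ratios = {brand: purchase_to_cart_ratio(p, c) for brand, (p, c) in funnel.items()}
for brand, ratio in ratios.items():
    print(brand, "n/a" if ratio is None else f"{ratio:.3f}")
```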

In [24]:
# register DataFrame as temp view
df_enriched.createOrReplaceTempView("ecommerce")
print("Created temporary view: 'ecommerce'")

# Top brands with conversion metrics
print("Top Brands by Revenue: ")

sql_result_1 = spark.sql(
    """
    SELECT 
        brand,
        COUNT(*) as total_events,
        COUNT(DISTINCT user_id) as unique_users,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases,
        SUM(CASE WHEN event_type = 'cart' THEN 1 ELSE 0 END) as cart_adds,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END), 2) as total_revenue
    FROM ecommerce
    WHERE price > 0
    GROUP BY brand
    HAVING COUNT(*) > 1000
    ORDER BY total_revenue DESC
    LIMIT 20
"""
)

sql_result_1.show()

Created temporary view: 'ecommerce'
Top Brands by Revenue: 
+--------+------------+------------+---------+---------+---------+--------------+
|   brand|total_events|unique_users|purchases|cart_adds|avg_price| total_revenue|
+--------+------------+------------+---------+---------+---------+--------------+
|   apple|      351957|      100061|   142873|   209084|   766.17|1.1120926882E8|
| samsung|      476145|      140911|   172896|   303249|   260.35| 4.640753261E7|
|  xiaomi|      161113|       55456|    56616|   104497|    151.3|    9194033.29|
|  huawei|       65633|       22973|    23501|    42132|   207.39|    4883421.74|
|    acer|       16282|        6721|     6882|     9400|   535.73|    3576719.52|
|      lg|       22273|        9500|     8727|    13546|   388.65|    3387887.96|
| lucente|       11578|        6928|    11578|        0|   269.83|    3124113.37|
|    sony|       16789|        7142|     6729|    10060|    359.3|    2478196.68|
|    oppo|       31438|       10475|    10891|    20547|   222.13|    2412959.76|
|  lenovo|      

### SQL Query 2: Peak Shopping Hours
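Identify peak shopping hours and transaction patterns throughout the day. The query groups by hour and event type to find when customers are most active and the value of the items they add to carts or buy. The cell also mixes SQL and DataFrame operations: `spark.sql()` returns a DataFrame, so SQL handles the initial query and chained DataFrame methods apply further filtering. The 18:00 to 22:00 window passed to `between()`, which is inclusive on both ends, is chosen by hand rather than derived from the hourly results.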

In [25]:
print("Peak Shopping Hours Analysis")

sql_result_2 = spark.sql(
    """
    SELECT 
        event_hour,
        event_type,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(SUM(price), 2) as total_value
    FROM ecommerce
    WHERE event_type IN ('cart', 'purchase')
    GROUP BY event_hour, event_type
    ORDER BY event_hour, event_type
"""
)

sql_result_2.show(48)

# Mix SQL with DataFrame operations
print("\nHigh-value purchases in peak hours:")
spark.sql(
    """
    SELECT brand, price, event_type, event_hour
    FROM ecommerce 
    WHERE event_type = 'purchase' 
      AND price > 500
"""
).filter(col("event_hour").between(18, 22)).show(10)

Peak Shopping Hours Analysis
+----------+----------+-----------+------------+---------+-------------+
|event_hour|event_type|event_count|unique_users|avg_price|  total_value|
+----------+----------+-----------+------------+---------+-------------+
|         0|      cart|      47020|       26194|   321.94|1.513778286E7|
|         0|  purchase|      37872|       28359|   306.37|1.160292967E7|
|         1|      cart|      55950|       30671|   327.03|1.829739991E7|
|         1|  purchase|      44326|       33153|   312.55|1.385428439E7|
|         2|      cart|      61465|       33777|   329.64|2.026109503E7|
|         2|  purchase|      48065|       35813|   316.97|1.523512601E7|
|         3|      cart|      63350|       34515|    328.3|2.079793844E7|
|         3|  purchase|      49500|       36913|   318.68|1.577489415E7|
|         4|      cart|      65671|       35551|    332.6|2.184248827E7|
|         4|  purchase|      51056|       37906|   322.27|1.645361634E7|
|         5|      cart|      66024|       35443|   

## Performance Analysis

### Execution Plan Analysis
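Use `.explain(mode="formatted")` to view Spark's physical execution plan. The plan lists the physical operators of each query; `Exchange` nodes mark shuffles, where data moves between partitions (and, on a cluster, between executors). It does not list individual tasks, which are visible in the Spark UI. With adaptive query execution (`AdaptiveSparkPlan`), a query that has already run shows its `Final Plan` with runtime statistics for each shuffle stage, and the brand plan starts from `InMemoryTableScan` because `brand_stats` is cached.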
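
The `HashAggregate`, `Exchange`, `HashAggregate` pattern in the plans printed by this cell typically reflects two-phase aggregation: each input partition is aggregated locally first (partial aggregation), only the partial results are shuffled by key, and a final aggregation merges them. `countDistinct` adds further aggregation rounds, which is likely why the brand plan has several. The plain-Python sketch below imitates the idea for a simple count; `zlib.crc32` stands in for the Murmur3 hash Spark uses for hash partitioning, and the partitions are made up.

```python
import zlib
from collections import Counter


def shuffle_partition(key, num_partitions):
    """Deterministic hash partitioning (crc32 as a stand-in for Spark's Murmur3)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def two_phase_count(input_partitions, num_shuffle_partitions):
    # Phase 1, map side: partial counts inside each input partition
    partials = [Counter(partition) for partition in input_partitions]
    rows_shuffled = 0
    for partial in partials:
        rows_shuffled += len(partial)  # one (key, partial count) row per distinct key

    # Exchange: route every partial row to the shuffle partition that owns its key
    shuffled = [[] for _ in range(num_shuffle_partitions)]
    for partial in partials:
        for key, n in partial.items():
            shuffled[shuffle_partition(key, num_shuffle_partitions)].append((key, n))

    # Phase 2, reduce side: merge partial counts partition by partition
    # (each key lives in exactly one shuffle partition)
    totals = Counter()
    for rows in shuffled:
        for key, n in rows:
            totals[key] += n
    return totals, rows_shuffled


input_partitions = [
    ["apple", "samsung", "apple"],
    ["apple", "xiaomi"],
    ["samsung", "samsung"],
]
totals, rows_shuffled = two_phase_count(input_partitions, num_shuffle_partitions=4)
print(dict(totals), f"{rows_shuffled} partial rows shuffled instead of 7 raw rows")
```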

In [26]:
print("Execution Plan Analysis")

# analyze brand_stats query
print("\nExecution plan for brand aggregation:")
brand_stats.explain(mode="formatted")

print("Complex Query Execution Plan")

# analyze hourly patterns query
print("\nExecution plan for hourly patterns:")
hourly_patterns.explain(mode="formatted")

# check partitions
print(f"\nPartitions in df_enriched: {df_enriched.rdd.getNumPartitions()}")

Execution Plan Analysis

Execution plan for brand aggregation:
== Physical Plan ==
AdaptiveSparkPlan (31)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- AdaptiveSparkPlan (30)
               +- == Final Plan ==
                  ResultQueryStage (19)
                  +- * Sort (18)
                     +- ShuffleQueryStage (17), Statistics(sizeInBytes=112.4 KiB, rowCount=2.01E+3)
                        +- Exchange (16)
                           +- * HashAggregate (15)
                              +- AQEShuffleRead (14)
                                 +- ShuffleQueryStage (13), Statistics(sizeInBytes=918.2 KiB, rowCount=1.45E+4)
                                    +- Exchange (12)
                                       +- * HashAggregate (11)
                                          +- * HashAggregate (10)
                                             +- AQEShuffleRead (9)
                                                +- ShuffleQueryStage (8), Statistics(si

### Caching Performance
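This cell measures the benefit of `.cache()` for a DataFrame used by several actions. Without caching, Spark recomputes the entire lineage (every transformation back to the CSV scan) for each action. With caching, the first action after `cache()` computes the result and stores it (in memory, spilling to disk if it does not fit), and later actions read the stored copy. In this run, the second uncached count took 2.77 s against 0.04 s once cached, while the first cached count (3.87 s) was slower than an uncached one because it also builds the cache.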
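
A toy model of what `cache()` changes: an object that reruns its pipeline on every action until it is marked for caching, after which the first action stores the result and later actions reuse it. The class and its method names only mimic the DataFrame methods; this is a plain-Python illustration, not how Spark stores cached blocks.

```python
class ToyLineage:
    """Recomputes a filter over its source on every action unless cached."""

    def __init__(self, source, predicate):
        self.source = source
        self.predicate = predicate
        self.computations = 0  # how many times the pipeline actually ran
        self._cache_requested = False
        self._cached_rows = None

    def cache(self):
        # Like DataFrame.cache(): only marks for caching, computes nothing yet
        self._cache_requested = True
        return self

    def unpersist(self):
        self._cache_requested = False
        self._cached_rows = None
        return self

    def _materialize(self):
        if self._cached_rows is not None:
            return self._cached_rows  # served from the cache, no recomputation
        self.computations += 1
        rows = [row for row in self.source if self.predicate(row)]
        if self._cache_requested:
            self._cached_rows = rows  # first action after cache() fills the cache
        return rows

    def count(self):
        return len(self._materialize())


events = ["view", "cart", "purchase", "purchase", "view"]
purchases = ToyLineage(events, lambda event_type: event_type == "purchase")

purchases.count()
purchases.count()
uncached_runs = purchases.computations  # each action recomputed the pipeline

purchases.cache()
purchases.count()  # builds the cache
purchases.count()  # reuses it
cached_runs = purchases.computations - uncached_runs
print(uncached_runs, cached_runs)
```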

In [27]:
print("Caching Demonstration")

# create a DataFrame, this will be used multiple times
test_df = df_enriched.filter(col("event_type") == "purchase")

# Without caching
print("\nWithout Cache:")
start = time.time()
count_1 = test_df.count()
time_1 = time.time() - start

start = time.time()
count_2 = test_df.count()
time_2 = time.time() - start

print(f"First execution: {time_1:.2f}s")
print(f"Second execution: {time_2:.2f}s")

# With caching
print("\nWith Cache:")
test_df.cache()

start = time.time()
count_3 = test_df.count()
time_cached_1 = time.time() - start

start = time.time()
count_4 = test_df.count()
time_cached_2 = time.time() - start

print(f"First execution (builds cache): {time_cached_1:.2f}s")
print(f"Second execution (uses cache): {time_cached_2:.2f}s")
print(f"\nSpeedup: {time_2 / time_cached_2:.1f}x faster")

# clean up
test_df.unpersist()

Caching Demonstration

Without Cache:
First execution: 2.82s
Second execution: 2.77s

With Cache:
First execution (builds cache): 3.87s
Second execution (uses cache): 0.04s

Speedup: 71.7x faster


DataFrame[event_time: timestamp, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string, event_hour: int, price_category: string]

## Actions vs Transformations

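This cell demonstrates lazy evaluation. Transformations (`filter`, `select`, `withColumn`, `groupBy().count()`, `join`) only add steps to a logical plan and compute nothing. Actions (`show`, `count`, `collect`, `write`, `take`) trigger execution of the entire plan, which lets Spark optimize the complete pipeline before running it, for example by pruning unused columns and minimizing data shuffling. Because `lazy_df` is not cached, each action below re-executes the plan from the CSV scan, which is why every one of them takes several seconds.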
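
Python generators offer a small analogy for lazy evaluation: building the pipeline runs nothing, and work happens only when a consumer pulls results, much like an action. The sketch records every row the filter touches to show that nothing is processed before the "action". The analogy has limits: a generator can be consumed only once, while a Spark plan is re-executed from its source on every action unless cached.

```python
processed = []  # rows the filter has actually looked at


def lazy_filter(rows, predicate):
    """Generator: nothing runs until a consumer asks for rows."""
    for row in rows:
        processed.append(row)
        if predicate(row):
            yield row


prices = [35.79, 642.69, 29.51, 515.67, 189.91]

pipeline = lazy_filter(prices, lambda price: price > 100)  # like a transformation
processed_before_action = len(processed)  # no row has been touched yet

high_value = list(pipeline)  # like an action: drives the whole pipeline
print(processed_before_action, len(processed), high_value)
```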

In [28]:
print("Actions vs Transformations")

# Transformation
print("\n Transformation (Lazy, No Execution):")

lazy_df = df.filter(col("price") > 100)
print("  filter(): defined but not executed")

lazy_df = lazy_df.select("brand", "price", "event_type")
print("  select(): added to plan but not executed")

lazy_df = lazy_df.withColumn("high_value", col("price") > 500)
print("  withColumn(): added to plan but not executed")

lazy_df = lazy_df.groupBy("brand").count()
print("  groupBy().count(): transformation, not executed")

print("\nNothing computed yet. Spark is building execution plan.")

# Actions
print("Actions(Eager, trigger execution):")

print("\n1. show() - Action triggers execution")
start = time.time()
lazy_df.show(5)
print(f"   Execution time: {time.time() - start:.2f}s")

print("\n2. count() - Action triggers execution")
start = time.time()
result = lazy_df.count()
print(f"   Result: {result:,} brands")
print(f"   Execution time: {time.time() - start:.2f}s")

print("\n3. collect() - Action (use carefully!)")
start = time.time()
sample = lazy_df.limit(3).collect()
print(f"   Collected {len(sample)} rows to driver")
print(f"   Execution time: {time.time() - start:.2f}s")

print("Summary:")
print("   Transformations: filter, select, withColumn, groupBy, join")
print("      - Lazy (don't execute)")
print("   Actions: show, count, collect, write, take")
print("      - Eager (trigger execution)")

Actions vs Transformations

 Transformation (Lazy, No Execution):
  filter(): defined but not executed
  select(): added to plan but not executed
  withColumn(): added to plan but not executed
  groupBy().count(): transformation, not executed

Nothing computed yet. Spark is building execution plan.
Actions(Eager, trigger execution):

1. show() - Action triggers execution
+--------+-----+
|   brand|count|
+--------+-----+
|yokohama|30219|
|   globo|   31|
| edifier|  798|
|   sonel| 1648|
|  alutec| 2655|
+--------+-----+
only showing top 5 rows
   Execution time: 3.98s

2. count() - Action triggers execution
   Result: 1,908 brands
   Execution time: 3.92s

3. collect() - Action (use carefully!)
   Collected 3 rows to driver
   Execution time: 6.54s
Summary:
   Transformations: filter, select, withColumn, groupBy, join
      - Lazy (don't execute)
   Actions: show, count, collect, write, take
      - Eager (trigger execution)

## Machine Learning: Price Prediction
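Build a linear regression model that predicts purchase price from the hour of day. MLlib's `VectorAssembler` packs the feature into a vector column, the data is split 80/20 into training and test sets, a `LinearRegression` model is trained, and the model is evaluated with root mean squared error (RMSE). The result indicates whether shopping time is related to purchase value. Here it is not, in any useful sense: a coefficient of about $0.50 per hour shifts predictions by only about $12 across the whole day, which is tiny next to an RMSE of about $357. The sample predictions shown are all for hour 0, so each one equals the intercept.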
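
With `regParam` left at its default of 0 (as the warning in the output notes), `LinearRegression` fits ordinary least squares, which for a single feature has a closed form: slope = cov(x, y) / var(x) and intercept = mean(y) - slope * mean(x). The plain-Python sketch below computes that fit and the RMSE on a tiny made-up dataset. On the real data it should agree with MLlib's coefficient and intercept up to numerical precision, though that is an expectation this notebook does not check. It uses `math.fsum` rather than the built-in `sum`, which the notebook's wildcard import from `pyspark.sql.functions` shadows.

```python
import math


def fit_simple_ols(xs, ys):
    """Closed-form least squares for y = intercept + slope * x with one feature."""
    n = len(xs)
    mean_x = math.fsum(xs) / n
    mean_y = math.fsum(ys) / n
    sxx = math.fsum((x - mean_x) ** 2 for x in xs)
    sxy = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


def rmse_of(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    squared = math.fsum((a - p) ** 2 for a, p in zip(actual, predicted))
    return math.sqrt(squared / len(actual))


toy_hours = [0, 1, 2, 3]  # made-up feature values
toy_prices = [1.0, 2.0, 2.0, 3.0]  # made-up labels
slope, intercept = fit_simple_ols(toy_hours, toy_prices)
toy_predictions = [intercept + slope * h for h in toy_hours]
print(f"slope={slope:.2f} intercept={intercept:.2f} rmse={rmse_of(toy_prices, toy_predictions):.4f}")
```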

In [29]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

print("Machine Learning: Price Prediction")

# prepare data
# predict price based on event_hour
ml_data = df_enriched.filter(col("event_type") == "purchase").select(
    "event_hour", "price"
)

# create feature vector
assembler = VectorAssembler(inputCols=["event_hour"], outputCol="features")
ml_data = assembler.transform(ml_data)

# split data
train, test = ml_data.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train.count():,} rows")
print(f"Test set: {test.count():,} rows")

# train model
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train)

# make predictions
predictions = model.transform(test)

# evaluate
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"\nModel Performance:")
print(f"   RMSE: ${rmse:.2f}")
print(f"   Coefficient: {model.coefficients[0]:.4f}")
print(f"   Intercept: {model.intercept:.2f}")

print("\nSample Predictions:")
predictions.select("event_hour", "price", "prediction").show(10)

Machine Learning: Price Prediction
Training set: 547,618 rows
25/11/11 14:49:23 WARN Instrumentation: [a0e8ea23] regParam is zero, which might cause numerical instability and overfitting.
Test set: 137,017 rows

Model Performance:
   RMSE: $356.82
   Coefficient: 0.5047
   Intercept: 319.33

Sample Predictions:
+----------+-----+------------------+
|event_hour|price|        prediction|
+----------+-----+------------------+
|         0| 6.34|319.33095256804916|
|         0|12.02|319.33095256804916|
|         0|12.61|319.33095256804916|
|         0|15.42|319.33095256804916|
|         0|17.99|319.33095256804916|
|         0|21.85|319.33095256804916|
|         0|22.39|319.33095256804916|
|         0|24.68|319.33095256804916|
|         0|29.51|319.33095256804916|
|         0|29.51|319.33095256804916|
+----------+-----+------------------+
only showing top 10 rows

## Write Results

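Write the analysis results to disk: `brand_stats` as Parquet and `hourly_patterns` as CSV with a header row. `mode("overwrite")` replaces any existing output, so the analysis can be rerun without errors. Each output path is a directory of part files rather than a single file. Reading the Parquet data back verifies that the write succeeded. Row order is not guaranteed when multi-file output is read back, which is likely why the first rows shown are low-revenue brands rather than the top brands; sort again with `orderBy()` if order matters. The output files can be used for reporting, visualization, or further analysis.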

In [30]:
print("Writing Results...")

# Base output directory; Spark creates the directories on write if they do not exist
output_dir = "./output"

# write brand statistics to Parquet
brand_output = f"{output_dir}/brand_stats"
print(f"\nWriting to Parquet: {brand_output}")
brand_stats.write.mode("overwrite").parquet(brand_output)
print("Saved brand statistics to Parquet")

# write hourly patterns to CSV for easy viewing
hourly_output = f"{output_dir}/hourly_patterns"
print(f"\nWriting to CSV: {hourly_output}")
hourly_patterns.write.mode("overwrite").option("header", "true").csv(hourly_output)
print("Saved hourly patterns to CSV")

# Verify saved data
print("\nVerifying saved data...")
saved_df = spark.read.parquet(brand_output)
print(f"Successfully loaded {saved_df.count():,} rows from saved Parquet file")
saved_df.show(5)

Writing Results...

Writing to Parquet: ./output/brand_stats
Saved brand statistics to Parquet

Writing to CSV: ./output/hourly_patterns
Saved hourly patterns to CSV

Verifying saved data...
Successfully loaded 2,013 rows from saved Parquet file
+----------+------------+------------+------------------+------------------+
|     brand|total_events|unique_users|         avg_price|     total_revenue|
+----------+------------+------------+------------------+------------------+
|altinbasak|           1|           1|             38.66|             38.66|
|   insight|           3|           3|12.873333333333335|38.620000000000005|
|   minimen|           1|           1|             38.61|             38.61|
|  rozenbal|           3|           3|12.613333333333335|             37.84|
| allocacoc|           4|           3|              9.27|             37.08|
+----------+------------+------------+------------------+------------------+
only showing top 5 rows