In [2]:
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, stddev, collect_list, explode, array, lit, expr, udf, desc, struct, rand
from pyspark.sql.types import DoubleType, ArrayType, BooleanType, IntegerType, StructType, StructField, StringType, FloatType
import math

# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("CollaborativeFiltering") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

# Tạo schema để đọc dữ liệu
schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("user", IntegerType(), True),
    StructField("item", IntegerType(), True),
    StructField("rating", DoubleType(), True)
])

class CollaborativeFilteringSpark:
    """
    Lớp triển khai thuật toán Collaborative Filtering sử dụng hệ số tương quan Pearson
    để đo độ tương đồng giữa các người dùng và gợi ý sản phẩm trên nền tảng PySpark.
    """

    def __init__(self, N, data_frame):
        """
        Hàm khởi tạo cho CollaborativeFilteringSpark

        Tham số:
        - N: Số lượng người dùng tương tự cần xem xét
        - data_frame: DataFrame (PySpark) chứa dữ liệu ratings
        """
        self.N = N
        self.data = data_frame
        self.user_item_matrix = None
        self.user_similarity_matrix = None
        self.valid_users = None
        self.user_avg_ratings = None
        self.normalized_data = None

        # Loại bỏ users không khách quan
        self._remove_constant_rating_users()

        # Tính rating trung bình cho mỗi user
        self._calculate_user_avg_ratings()

        # Chuẩn hóa dữ liệu ratings
        self._normalize_ratings()

        # Xây dựng ma trận user-item
        self._build_user_item_matrix()

        # Tính ma trận độ tương đồng
        self._calculate_user_similarity()

    def _remove_constant_rating_users(self):
        """
        Loại bỏ các users có constant ratings (đánh giá tất cả items với cùng một rating value)
        """
        # Tính độ lệch chuẩn của ratings cho mỗi user
        user_ratings_std = self.data.groupBy("user").agg(
            stddev("rating").alias("std_rating"),
            count("rating").alias("count_rating")
        )
        
        # Lọc ra các users có std = 0 (constant ratings) hoặc chỉ có 1 rating
        constant_users = user_ratings_std.filter(
            (col("std_rating").isNull()) | (col("std_rating") == 0)
        ).select("user").rdd.flatMap(lambda x: x).collect()
        
        # In thông tin về constant users
        print(f"Tổng số lượng người dùng: {self.data.select('user').distinct().count()}")
        print(f"Số người dùng đánh giá không khách quan: {len(constant_users)}")
        print(f"Tỉ lệ phần trăm: {len(constant_users) / self.data.select('user').distinct().count() * 100:.2f}%")
        
        # In thông tin chi tiết về các constant users
        print("\n Các người dùng đánh giá không khách quan:")
        constant_user_info = self.data.filter(col("user").isin(constant_users)) \
                                  .groupBy("user") \
                                  .agg(count("rating").alias("count_ratings"), 
                                       avg("rating").alias("avg_rating")) \
                                  .collect()
        
        for row in constant_user_info[:8]:  # Giới hạn hiển thị 8 người dùng
            print(f"Người dùng {row['user']}: {row['count_ratings']} lượt đánh giá, giá trị {row['avg_rating']}")
        
        # Lưu lại danh sách users hợp lệ
        all_users = self.data.select("user").distinct().rdd.flatMap(lambda x: x).collect()
        self.valid_users = list(set(all_users) - set(constant_users))
        
        # Cập nhật data chỉ giữ lại valid users
        self.data = self.data.filter(col("user").isin(self.valid_users))

    def _calculate_user_avg_ratings(self):
        """
        Tính rating trung bình cho mỗi user
        """
        self.user_avg_ratings = self.data.groupBy("user").agg(
            avg("rating").alias("avg_rating")
        )

    def _normalize_ratings(self):
        """
        Chuẩn hóa ratings để xử lý vấn đề người dùng đánh giá khó tính và người dùng đánh giá dễ tính.
        Phương pháp: Trừ đi rating trung bình của từng người dùng.
        """
        # Join với rating trung bình của user
        joined_data = self.data.join(
            self.user_avg_ratings,
            on="user",
            how="inner"
        )
        
        # Tính normalized rating = rating - avg_rating
        self.normalized_data = joined_data.withColumn(
            "normalized_rating",
            col("rating") - col("avg_rating")
        ).select("user", "item", "rating", "avg_rating", "normalized_rating")
        
        # In thông tin về dữ liệu chuẩn hóa
        print("\nThông tin về dữ liệu chuẩn hóa:")
        normalized_stats = self.normalized_data.agg(
            avg("normalized_rating").alias("avg_normalized"),
            stddev("normalized_rating").alias("std_normalized"),
            min("normalized_rating").alias("min_normalized"),
            max("normalized_rating").alias("max_normalized")
        ).collect()[0]
        
        print(f"Giá trị trung bình normalized: {normalized_stats['avg_normalized']}")
        print(f"Độ lệch chuẩn normalized: {normalized_stats['std_normalized']}")
        print(f"Giá trị nhỏ nhất normalized: {normalized_stats['min_normalized']}")
        print(f"Giá trị lớn nhất normalized: {normalized_stats['max_normalized']}")

    def _build_user_item_matrix(self):
        """
        Xây dựng ma trận user-item từ dữ liệu đã chuẩn hóa
        """
        # Sử dụng dữ liệu đã chuẩn hóa để xây dựng ma trận user-item
        self.user_item_matrix = self.normalized_data.select("user", "item", "normalized_rating")

    def _calculate_user_similarity(self):
        """
        Tính ma trận độ tương đồng giữa người dùng sử dụng hệ số tương quan Pearson
        dựa trên dữ liệu đã chuẩn hóa
        """
        # Tạo các vế để join
        ratings1 = self.user_item_matrix.alias("r1")
        ratings2 = self.user_item_matrix.alias("r2")
        
        # Join dữ liệu để tạo các cặp người dùng có cùng item
        user_pairs = ratings1.join(
            ratings2,
            ratings1["item"] == ratings2["item"]
        ).filter(
            ratings1["user"] < ratings2["user"]  # Chỉ lấy nửa trên ma trận (vì đối xứng)
        )
        
        # Tạo UDF để tính tương quan Pearson
        def pearson_correlation(sum_xx, sum_xy, sum_yy, sum_x, sum_y, n):
            """
            Tính hệ số tương quan Pearson giữa hai users
            """
            if n < 2:  # Cần ít nhất 2 item chung
                return 0.0
                
            denominator = math.sqrt(sum_xx - (sum_x ** 2) / n) * math.sqrt(sum_yy - (sum_y ** 2) / n)
            
            if denominator == 0:
                return 0.0
                
            numerator = sum_xy - (sum_x * sum_y) / n
            return numerator / denominator
            
        pearson_udf = udf(pearson_correlation, DoubleType())
        
        # Nhóm theo các cặp user và tính tương quan Pearson trên dữ liệu đã chuẩn hóa
        similarities = user_pairs.groupBy(
            col("r1.user").alias("user_i"),
            col("r2.user").alias("user_j")
        ).agg(
            count("r1.item").alias("n"),
            sum(col("r1.normalized_rating") * col("r2.normalized_rating")).alias("sum_xy"),
            sum(col("r1.normalized_rating") * col("r1.normalized_rating")).alias("sum_xx"),
            sum(col("r2.normalized_rating") * col("r2.normalized_rating")).alias("sum_yy"),
            sum(col("r1.normalized_rating")).alias("sum_x"),
            sum(col("r2.normalized_rating")).alias("sum_y")
        ).withColumn(
            "pearson",
            pearson_udf(
                col("sum_xx"), col("sum_xy"), col("sum_yy"),
                col("sum_x"), col("sum_y"), col("n")
            )
        ).select("user_i", "user_j", "pearson")
        
        # Tạo ma trận đối xứng (thêm các cặp (user_j, user_i))
        reversed_similarities = similarities.select(
            col("user_j").alias("user_i"),
            col("user_i").alias("user_j"),
            col("pearson")
        )
        
        # Thêm tự tương quan (người dùng hoàn toàn tương đồng với chính họ)
        self_similarities = spark.createDataFrame(
            [(user, user, 1.0) for user in self.valid_users],
            ["user_i", "user_j", "pearson"]
        )
        
        # Kết hợp tất cả thành ma trận tương đồng đầy đủ
        self.user_similarity_matrix = similarities.union(reversed_similarities).union(self_similarities)

    def predict(self, user_ratings, num_recommendations=10):
        """
        Dự đoán và gợi ý các sản phẩm cho người dùng
        
        Tham số:
        - user_ratings: Vector ratings của người dùng (DataFrame hoặc dict)
        - num_recommendations: Số lượng sản phẩm cần gợi ý
        
        Trả về:
        - DataFrame (PySpark) với các sản phẩm được sắp xếp theo điểm dự đoán giảm dần
        """
        # Xác định user_id từ user_ratings
        user_id = None
        
        if isinstance(user_ratings, dict):
            # Nếu là dict, trích xuất user_id trực tiếp
            user_id = user_ratings.get("user")
            
            # Nếu dict chứa các đánh giá, chuyển đổi thành DataFrame
            if "ratings" in user_ratings:
                user_items = []
                for item_id, rating in user_ratings["ratings"].items():
                    user_items.append((user_id, int(item_id), float(rating)))
                
                if user_items:
                    user_ratings_df = spark.createDataFrame(
                        user_items, 
                        ["user", "item", "rating"]
                    )
                    # Cập nhật ratings cho người dùng này
                    self.update_user_ratings(user_ratings_df)
        
        elif hasattr(user_ratings, "select"):
            # Nếu là Spark DataFrame
            try:
                user_id = user_ratings.select("user").first()[0]
                self.update_user_ratings(user_ratings)
            except:
                pass
        
        # Nếu không xác định được user_id, trả về DataFrame rỗng
        if user_id is None or user_id not in self.valid_users:
            print(f"User không hợp lệ hoặc không có trong danh sách valid users")
            return spark.createDataFrame([], schema=StructType([
                StructField("user", IntegerType(), True),
                StructField("item", IntegerType(), True),
                StructField("predicted_rating", DoubleType(), True)
            ]))
        
        # Lấy N người dùng tương tự nhất
        top_similar_users = self.user_similarity_matrix \
            .filter(col("user_i") == user_id) \
            .filter(col("user_j") != user_id) \
            .orderBy(col("pearson").desc()) \
            .limit(self.N)
        
        # Lấy danh sách item đã được đánh giá bởi người dùng
        rated_items = self.normalized_data \
            .filter(col("user") == user_id) \
            .select("item") \
            .rdd.flatMap(lambda x: x).collect()
        
        # Lấy ratings của những người dùng tương tự cho các item chưa được đánh giá
        similar_users_ratings = self.normalized_data \
            .filter(col("user").isin([row["user_j"] for row in top_similar_users.collect()])) \
            .filter(~col("item").isin(rated_items))
        
        # Join với thông tin độ tương đồng
        joined_ratings = similar_users_ratings.join(
            top_similar_users,
            (similar_users_ratings["user"] == top_similar_users["user_j"]),
            "inner"
        )
        
        # Tính weighted normalized ratings
        weighted_ratings = joined_ratings.select(
            col("item"),
            (col("normalized_rating") * col("pearson")).alias("weighted_normalized_rating"),
            col("pearson").alias("similarity"),
            col("avg_rating").alias("avg_rating")
        )
        
        # Nhóm theo item và tính normalized rating dự đoán
        predictions_normalized = weighted_ratings.groupBy("item") \
            .agg(
                sum("weighted_normalized_rating").alias("sum_weighted_ratings"),
                sum("similarity").alias("sum_similarities"),
                avg("avg_rating").alias("avg_item_avg_ratings")
            ) \
            .filter(col("sum_similarities") > 0) \
            .withColumn(
                "normalized_predicted_rating", 
                col("sum_weighted_ratings") / col("sum_similarities")
            )
        
        # Lấy rating trung bình của user hiện tại
        user_avg = self.user_avg_ratings.filter(col("user") == user_id).select("avg_rating").first()
        user_avg_rating = user_avg["avg_rating"] if user_avg else 0
        
        # Tính predicted rating bằng cách thêm rating trung bình của user
        predictions = predictions_normalized \
            .withColumn(
                "predicted_rating",
                col("normalized_predicted_rating") + lit(user_avg_rating)
            ) \
            .withColumn("user", lit(user_id)) \
            .select("user", "item", "predicted_rating") \
            .orderBy(col("predicted_rating").desc()) \
            .limit(num_recommendations)
        
        return predictions

    def update_user_ratings(self, user_ratings_df):
        """
        Cập nhật ratings của người dùng trong dataset
        
        Tham số:
        - user_ratings_df: DataFrame chứa ratings mới
        """
        # Lấy user_id từ DataFrame
        user_id = user_ratings_df.select("user").first()[0]
        
        # Xóa ratings hiện tại của user
        self.data = self.data.filter(col("user") != user_id)
        
        # Thêm ratings mới vào
        self.data = self.data.union(user_ratings_df)
        
        # Cập nhật rating trung bình
        self._calculate_user_avg_ratings()
        
        # Cập nhật dữ liệu chuẩn hóa
        self._normalize_ratings()
        
        # Cập nhật ma trận user-item
        self._build_user_item_matrix()
        
        # Cập nhật ma trận tương đồng
        self._calculate_user_similarity()
        
        # Thêm user vào valid_users nếu chưa có
        if user_id not in self.valid_users:
            self.valid_users.append(user_id)

    def handle_cold_start(self, user_id, num_recommendations=10):
        """
        Xử lý trường hợp người dùng mới không có đủ dữ liệu đánh giá
        
        Tham số:
        - user_id: ID của người dùng
        - num_recommendations: Số lượng sản phẩm cần gợi ý
        
        Trả về:
        - DataFrame (PySpark) với các sản phẩm phổ biến nhất
        """
        # Lấy danh sách item đã được đánh giá bởi người dùng
        rated_items = []
        if user_id in self.valid_users:
            rated_items = self.data \
                .filter(col("user") == user_id) \
                .select("item") \
                .rdd.flatMap(lambda x: x).collect()
        
        # Tìm các sản phẩm phổ biến nhất (được đánh giá cao nhất)
        popular_items = self.data \
            .groupBy("item") \
            .agg(
                count("rating").alias("count"),
                avg("rating").alias("avg_rating")
            ) \
            .filter(~col("item").isin(rated_items)) \
            .orderBy(col("avg_rating").desc(), col("count").desc()) \
            .limit(num_recommendations) \
            .withColumn("user", lit(user_id)) \
            .withColumn("predicted_rating", col("avg_rating")) \
            .select("user", "item", "predicted_rating")
            
        return popular_items

def calculate_rmse(actual_df, predicted_df):
    """
    Tính RMSE từ hai DataFrame
    
    Tham số:
    - actual_df: DataFrame chứa ratings thực tế
    - predicted_df: DataFrame chứa ratings dự đoán
    
    Trả về:
    - RMSE
    """
    # Join hai DataFrame
    joined_df = actual_df.join(
        predicted_df,
        ["user", "item"]
    )
    
    # Tính bình phương sai số
    joined_df = joined_df.withColumn(
        "squared_error",
        (col("rating") - col("predicted_rating")) * (col("rating") - col("predicted_rating"))
    )
    
    # Tính RMSE
    mse = joined_df.agg(avg("squared_error")).collect()[0][0]
    rmse = math.sqrt(mse) if mse is not None else None
    
    return rmse

def main():
    # Load dataset và in thông tin
    data = spark.read.csv("ratings2k.csv", header=True, schema=schema)
    
    print(f"Cấu trúc của dataset: ({data.count()}, {len(data.columns)})")
    print(f"Các trường của dataset: {data.columns}")
    
    # Chia dữ liệu thành training và test sets với tỷ lệ 8:2 sử dụng PySpark
    # Thêm cột ngẫu nhiên để chia dữ liệu
    data_with_rand = data.withColumn("rand", rand())
    
    # Chia dữ liệu dựa trên cột ngẫu nhiên
    train_data = data_with_rand.filter(col("rand") < 0.8).drop("rand")
    test_data = data_with_rand.filter(col("rand") >= 0.8).drop("rand")
    
    print(f"Training set size: {train_data.count()}")
    print(f"Test set size: {test_data.count()}")
    
    # Đánh giá thuật toán với N trong khoảng [2, 16]
    print("\nĐánh giá thuật toán với N từ 2 đến 16...")
    N_values = list(range(2, 17))
    rmse_values = []
    
    for N in N_values:
        print(f"\nN = {N}")
        
        # Khởi tạo model với training data
        cf_model = CollaborativeFilteringSpark(N, train_data)
        
        # Lấy danh sách người dùng trong test set
        test_users = test_data.select("user").distinct().rdd.flatMap(lambda x: x).collect()
        
        # Lưu trữ tất cả predictions cho việc tính RMSE
        all_predictions_list = []
        
        for user in test_users:
            # Bỏ qua users có constant ratings
            if user not in cf_model.valid_users:
                continue
                
            # Lấy tất cả ratings của user trong test set
            user_test_data = test_data.filter(col("user") == user)
            
            # Dự đoán cho các item trong test set
            user_test_items = user_test_data.select("item").rdd.flatMap(lambda x: x).collect()
            if not user_test_items:
                continue
                
            # Lấy các ratings hiện tại của user từ training set
            user_train_data = train_data.filter(col("user") == user)
            
            # Dự đoán cho user này
            predictions = cf_model.predict(user_train_data, num_recommendations=len(user_test_items))
            
            # Lấy các predictions cho các item trong test set
            valid_predictions = predictions.filter(col("item").isin(user_test_items))
            
            if valid_predictions.count() > 0:
                all_predictions_list.append(valid_predictions)
        
        # Tính RMSE
        if all_predictions_list:
            # Gộp tất cả predictions lại
            all_predictions_df = all_predictions_list[0]
            for i in range(1, len(all_predictions_list)):
                all_predictions_df = all_predictions_df.union(all_predictions_list[i])
            
            if all_predictions_df.count() > 0:
                # Join với test data để lấy ratings thực tế
                joined_preds = all_predictions_df.join(
                    test_data.select("user", "item", "rating"),
                    ["user", "item"],
                    "inner"
                )
                
                if joined_preds.count() > 0:
                    # Tính RMSE
                    rmse = calculate_rmse(
                        joined_preds.select("user", "item", "rating"),
                        joined_preds.select("user", "item", "predicted_rating")
                    )
                    
                    rmse_values.append(rmse)
                    print(f"RMSE: {rmse:.4f}")
                else:
                    rmse_values.append(None)
                    print("Không có dự đoán trùng với test set")
            else:
                rmse_values.append(None)
                print("Không có giá trị dự đoán hợp lệ")
        else:
            rmse_values.append(None)
            print("Không dự đoán được")
    
    # Vẽ biểu đồ cho RMSE values bằng matplotlib
    import matplotlib.pyplot as plt
    
    plt.figure(figsize=(10, 6))
    valid_indices = [i for i, rmse in enumerate(rmse_values) if rmse is not None]
    valid_N = [N_values[i] for i in valid_indices]
    valid_rmse = [rmse_values[i] for i in valid_indices]
    
    bars = plt.bar(valid_N, valid_rmse, color='skyblue', edgecolor='black')
    plt.xlabel('N (Số lượng người dùng cần xét)')
    plt.ylabel('RMSE')
    plt.title('Biểu đồ so sánh RMSE với các giá trị N khác nhau')
    plt.xticks(valid_N)
    plt.grid(axis='y', alpha=0.3)
    
    for bar, rmse in zip(bars, valid_rmse):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
                f'{rmse:.4f}', ha='center', va='bottom')
    
    plt.tight_layout()
    plt.show()
    
    # Tìm N với RMSE tốt nhất
    if valid_rmse:
        best_index = valid_rmse.index(min(valid_rmse))
        best_N = valid_N[best_index]
        best_rmse = valid_rmse[best_index]
        print(f"\nVới N = {best_N} có giá trị RMSE tốt nhất = {best_rmse:.4f}")
    
        # Tạo model cuối cùng với toàn bộ dataset để deploy
        final_model = CollaborativeFilteringSpark(best_N, data)
        return final_model
    else:
        print("\nKhông thể xác định N tốt nhất do không có RMSE hợp lệ")
        return None

if __name__ == "__main__":
    model = main()
    
    # Đóng Spark session khi hoàn thành
    spark.stop()

Cấu trúc của dataset: (2365, 4)
Các trường của dataset: ['index', 'user', 'item', 'rating']
Training set size: 1894
Test set size: 471

Đánh giá thuật toán với N từ 2 đến 16...

N = 2


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 15) (KakaSheesh executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more
