In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
from pyspark.sql import functions as F


spark = SparkSession.builder.appName("ChurnFix").getOrCreate()

# Đọc file gốc bạn đã upload
df = spark.read.option("header", True).option("inferSchema", True).csv(r"C:\Users\nguye\CODE\TimeSeries\BTL\Data\2019_Data.csv")

# Đảm bảo cột churn là số
df = df.withColumn("churn", col("churn").cast("int"))


In [None]:
# Lấy top 2000 user_id xuất hiện nhiều nhất (giảm dùng collect)
top_users_df = df.groupBy("user_id") \
                 .count() \
                 .orderBy(F.desc("count")) \
                 .limit(2000)

# Hiển thị kiểm tra nhanh (không collect)
top_users_df.show()

# Lấy danh sách user_id thành list (ít tốn bộ nhớ hơn)
top_users = [row['user_id'] for row in top_users_df.collect()]


+---------+-----+
|  user_id|count|
+---------+-----+
|568778435|22929|
|569335945|14810|
|512475445|13547|
|512365995|10055|
|568818636| 6171|
|514649263| 5779|
|512505687| 5568|
|559249905| 5394|
|512388419| 5225|
|532769022| 5014|
|568793129| 4771|
|513021392| 4461|
|537873067| 4391|
|568804062| 4257|
|544123737| 4161|
|573277455| 4128|
|512401084| 4081|
|512786243| 4034|
|546159478| 4029|
|516308435| 3750|
+---------+-----+
only showing top 20 rows



In [20]:
# 5. Lọc dữ liệu chỉ chứa các user này
filtered_df = df.filter(col("user_id").isin(top_users))

# 6. Kiểm tra phân phối churn (tùy chọn)
filtered_df.groupBy("churn").count().show()


+-----+-------+
|churn|  count|
+-----+-------+
|    1|1215607|
|    0|1271593|
+-----+-------+



In [21]:
# Chuyển sang pandas
filtered_df = filtered_df.toPandas()

In [22]:
filtered_df.to_csv("2019_Data_Cleaned.csv", index=False)