In [29]:
from pyspark.sql import SparkSession
from pymongo import MongoClient
import pandas as pd
import os

In [30]:
os.environ["PYSPARK_PYTHON"] = os.path.join(os.getcwd(), "venv", "Scripts", "python.exe")

In [31]:
# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("Churn Prediction") \
    .config("spark.mongodb.input.uri", "mongodb+srv://nguyenminhy7714:minhy112@cluster0.xxkrzas.mongodb.net/telco_churn") \
    .config("spark.mongodb.output.uri", "mongodb+srv://nguyenminhy7714:minhy112@cluster0.xxkrzas.mongodb.net/telco_churn") \
    .getOrCreate()

# Kết nối MongoDB bằng pymongo
client = MongoClient("mongodb+srv://nguyenminhy7714:minhy112@cluster0.xxkrzas.mongodb.net")
db = client["telco_churn"]  

In [1]:
# Lấy dữ liệu từ các collection và gộp bằng Aggregation Pipeline
pipeline = [
    {"$lookup": {"from": "contract_info", "localField": "customerID", "foreignField": "customerID", "as": "contract"}},
    {"$lookup": {"from": "internet_service", "localField": "customerID", "foreignField": "customerID", "as": "internet"}},
    {"$unwind": "$contract"},
    {"$unwind": "$internet"},
    {"$project": {
        "customerID": 1,
        "PaymentMethod": "$contract.PaymentMethod",
        "InternetService": "$internet.InternetService",
        "Churn": 1
    }}
]

data = list(db.churn_info.aggregate(pipeline))
df_pandas = pd.DataFrame(data)
df_pandas = df_pandas.drop("_id", axis=1)

NameError: name 'db' is not defined

In [37]:
df_pandas.head()

Unnamed: 0,customerID,Churn,PaymentMethod,InternetService
0,7590-VHVEG,No,Electronic check,DSL
1,5575-GNVDE,No,Mailed check,DSL
2,3668-QPYBK,Yes,Mailed check,DSL
3,7795-CFOCW,No,Bank transfer (automatic),DSL
4,9237-HQITU,Yes,Electronic check,Fiber optic


In [38]:
# Chuyển sang PySpark DataFrame
df = spark.createDataFrame(df_pandas)

In [39]:
df.printSchema()
df.show(5)

root
 |-- customerID: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- InternetService: string (nullable = true)

+----------+-----+--------------------+---------------+
|customerID|Churn|       PaymentMethod|InternetService|
+----------+-----+--------------------+---------------+
|7590-VHVEG|   No|    Electronic check|            DSL|
|5575-GNVDE|   No|        Mailed check|            DSL|
|3668-QPYBK|  Yes|        Mailed check|            DSL|
|7795-CFOCW|   No|Bank transfer (au...|            DSL|
|9237-HQITU|  Yes|    Electronic check|    Fiber optic|
+----------+-----+--------------------+---------------+
only showing top 5 rows



In [40]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from imblearn.over_sampling import SMOTE

In [41]:
# Chuyển đổi dữ liệu phân loại
indexers = [
    StringIndexer(inputCol="PaymentMethod", outputCol="PaymentMethodIdx"),
    StringIndexer(inputCol="InternetService", outputCol="InternetServiceIdx"),
    StringIndexer(inputCol="Churn", outputCol="ChurnIdx")
]

for indexer in indexers:
    df = indexer.fit(df).transform(df)

In [42]:
# OneHotEncoding
encoders = [
    OneHotEncoder(inputCols=["PaymentMethodIdx"], outputCols=["PaymentMethodVec"]),
    OneHotEncoder(inputCols=["InternetServiceIdx"], outputCols=["InternetServiceVec"])
]

for encoder in encoders:
    df = encoder.fit(df).transform(df)

In [43]:
# Tạo vector đặc trưng
assembler = VectorAssembler(
    inputCols=["PaymentMethodVec", "InternetServiceVec"],
    outputCol="features"
)
df = assembler.transform(df)

In [44]:
# Chuẩn hóa dữ liệu
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
df = scaler.fit(df).transform(df)

In [50]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType

In [51]:
# Hàm chuyển đổi array thành VectorUDT
array_to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())

In [52]:
# Xử lý mất cân bằng dữ liệu (nếu cần)
# Chuyển về Pandas để dùng SMOTE
pandas_df = df.select("scaledFeatures", "ChurnIdx").toPandas()
X = pandas_df["scaledFeatures"].apply(lambda x: x.toArray()).tolist()
y = pandas_df["ChurnIdx"]

smote = SMOTE()
X_res, y_res = smote.fit_resample(X, y)

In [53]:
# Chuyển lại thành PySpark DataFrame
resampled_pd = pd.DataFrame({"features": list(X_res), "label": y_res})
resampled_df = spark.createDataFrame(resampled_pd)

In [54]:
# Chuyển cột features từ ArrayType sang VectorUDT
resampled_df = resampled_df.withColumn("features", array_to_vector("features"))

In [60]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col, when

In [56]:
# Chia dữ liệu
train_data, test_data = resampled_df.randomSplit([0.8, 0.2], seed=42)

In [57]:
# Huấn luyện mô hình Random Forest
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    featureSubsetStrategy="auto"
)
rf_model = rf.fit(train_data)

In [67]:
# Lưu mô hình vào đĩa
model_path = "rf_churn_model"
rf_model.write().overwrite().save(model_path)
print(f"Mô hình đã được lưu tại: {model_path}")

Mô hình đã được lưu tại: rf_churn_model


In [70]:
# Chuyển train_data từ PySpark DataFrame sang Pandas DataFrame
train_pandas = train_data.toPandas()

# Tách X_train (đặc trưng) và y_train (nhãn)
X_train = train_pandas["features"].apply(lambda x: x.toArray()).tolist()  # Chuyển Vector thành list
y_train = train_pandas["label"].tolist()  # Nhãn đã ở dạng số (0 hoặc 1)

# Tương tự với test_data (nếu cần)
test_pandas = test_data.toPandas()
X_test = test_pandas["features"].apply(lambda x: x.toArray()).tolist()
y_test = test_pandas["label"].tolist()

In [71]:
import numpy as np

# Chuyển X_train và y_train thành mảng NumPy
X_train = np.array(X_train)
y_train = np.array(y_train)

# Tương tự với X_test và y_test
X_test = np.array(X_test)
y_test = np.array(y_test)

# Kiểm tra kích thước
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

X_train shape: (8290, 5)
y_train shape: (8290,)
X_test shape: (2058, 5)
y_test shape: (2058,)


In [72]:
from sklearn.ensemble import RandomForestClassifier
import joblib

# Huấn luyện mô hình
model = RandomForestClassifier(n_estimators=100, max_depth=10)
model.fit(X_train, y_train)

# Lưu mô hình
joblib.dump(model, "rf_model.pkl")

['rf_model.pkl']

In [58]:
# Dự đoán trên tập kiểm tra
predictions = rf_model.transform(test_data)

In [61]:
# Chuyển đổi kết quả dự đoán về nhãn Yes/No
predictions = predictions.withColumn(
    "Churn_Predicted",
    when(col("prediction") == 0, "No").otherwise("Yes")
)

In [64]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [63]:
# Đánh giá các chỉ số
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")

Accuracy: 0.684645286686103
Precision: 0.6995299658651501
Recall: 0.684645286686103
F1-Score: 0.6802746515246025


In [65]:
# Tính Confusion Matrix và Classification Report
# Chuyển đổi predictions thành RDD để sử dụng MulticlassMetrics
prediction_and_labels = predictions.select("prediction", "label") \
    .rdd.map(lambda row: (float(row["prediction"]), float(row["label"])))

# Khởi tạo MulticlassMetrics
metrics = MulticlassMetrics(prediction_and_labels)

# In Confusion Matrix
print("\nConfusion Matrix:")
confusion_matrix = metrics.confusionMatrix().toArray()
print(confusion_matrix)


Confusion Matrix:
[[595. 457.]
 [192. 814.]]


In [66]:
# Tính toán thủ công các chỉ số cho từng lớp (classification report)
labels = [0.0, 1.0]  # Các nhãn: 0 (No), 1 (Yes)
print("\nClassification Report:")
print("Class\tPrecision\tRecall\t\tF1-Score")
for label in labels:
    precision = metrics.precision(label)
    recall = metrics.recall(label)
    f1 = metrics.fMeasure(label)
    print(f"{label}\t{precision:.4f}\t\t{recall:.4f}\t\t{f1:.4f}")

# Tổng số mẫu trong từng lớp (support)
support_0 = predictions.filter(col("label") == 0).count()
support_1 = predictions.filter(col("label") == 1).count()
print(f"\nSupport: Class 0 (No): {support_0}, Class 1 (Yes): {support_1}")


Classification Report:
Class	Precision	Recall		F1-Score
0.0	0.7560		0.5656		0.6471
1.0	0.6404		0.8091		0.7150

Support: Class 0 (No): 1052, Class 1 (Yes): 1006
