# Analysis

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("ChurnPrediction")
    .getOrCreate()
)

In [None]:
# Load the data: first row is the header, column types are inferred by Spark.
csv_path = "ChurnModelling.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.show()

In [None]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

In [None]:
# Remove duplicate rows. With no subset given, rows are compared on ALL columns.
# NOTE(review): if the file carries a unique per-row identifier column, no row
# can be an exact duplicate and this is a no-op — confirm against the schema.
df = df.dropDuplicates()

In [None]:
from pyspark.sql.functions import col, when, isnan, count

# Count missing values per column.
# NULL is checked on every column. NaN only exists for float/double columns, so
# isnan() is applied to those alone: calling it on string or other non-numeric
# columns relies on an implicit cast to double, which is fragile and can fail
# outright for types that cannot be cast.
float_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}


def _missing_count(name):
    """Return an aggregate column counting NULL (and, for floats, NaN) values in `name`."""
    is_missing = col(name).isNull()
    if name in float_cols:
        is_missing = is_missing | isnan(col(name))
    return count(when(is_missing, name)).alias(name)


df.select([_missing_count(c) for c in df.columns]).show()

In [None]:
# Class balance of the target: number of rows for each value of Exited.
exited_counts = df.groupBy("Exited").count()
exited_counts.show()

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Pack the numeric columns into one vector column, as required by Correlation.corr.
num_cols = ["CreditScore", "Age", "Tenure", "Balance", "NumOfProducts", "EstimatedSalary"]
vec_assembler = VectorAssembler(inputCols=num_cols, outputCol="features")
df_vec = vec_assembler.transform(df)

# Pearson correlation: the result is a one-row DataFrame whose first field is the matrix.
pearson_row = Correlation.corr(df_vec, "features", method="pearson").head()
cor_matrix = pearson_row[0]

# Convert the Spark matrix to a numpy array, then to a labelled pandas DataFrame.
corr_array = cor_matrix.toArray()
corr_df = pd.DataFrame(corr_array, index=num_cols, columns=num_cols)

# Draw the annotated heatmap on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(corr_df, annot=True, cmap="coolwarm", fmt=".2f", ax=ax)
ax.set_title("Matricea de corelație (Pearson)")
fig.tight_layout()
plt.show()

In [None]:
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, StandardScaler
)

# Encode the categorical variables: string -> numeric index -> one-hot vector.
categorical_cols = ["Geography", "Gender"]
index_cols = [f"{name}Index" for name in categorical_cols]
onehot_cols = [f"{name}Vec" for name in categorical_cols]

indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_cols)
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)

encoder = OneHotEncoder(inputCols=index_cols, outputCols=onehot_cols)
encoder_model = encoder.fit(df_indexed)
df_encoded = encoder_model.transform(df_indexed)
df_encoded.show()

In [None]:
# Feature engineering: derive four extra columns from the encoded frame.
df_fe = (
    df_encoded
    # Balance relative to salary; the +1 keeps the denominator non-zero when salary is 0.
    .withColumn("BalanceToSalaryRatio", col("Balance") / (col("EstimatedSalary") + 1))
    # Flag for customers older than 60.
    .withColumn("IsSenior", when(col("Age") > 60, 1).otherwise(0))
    # Sum of product count, active-member flag and the inverse of the credit-card flag.
    .withColumn(
        "EngagementScore",
        col("NumOfProducts") + col("IsActiveMember") + (1 - col("HasCrCard")),
    )
    # Flag for accounts whose balance is exactly zero.
    .withColumn("HasZeroBalance", when(col("Balance") == 0, 1).otherwise(0))
)

In [None]:
from pyspark.sql.functions import col, when

def _add_balance_and_loyalty_features(frame):
    """Return `frame` with two extra engineered columns.

    AverageBalancePerProduct: Balance / NumOfProducts, or 0 when NumOfProducts
        is 0 (avoids division by zero).
    LoyalActiveCustomer: Tenure * IsActiveMember.
    """
    return (
        frame
        .withColumn(
            "AverageBalancePerProduct",
            when(col("NumOfProducts") == 0, 0)
            .otherwise(col("Balance") / col("NumOfProducts")),
        )
        .withColumn("LoyalActiveCustomer", col("Tenure") * col("IsActiveMember"))
    )


# The scaling and train/test cells read from `df_fe`, not `df`. Adding the
# columns only to `df` would leave them out of every downstream frame, so they
# are attached to `df_fe` as well. `df` still receives them so existing
# references keep working.
df = _add_balance_and_loyalty_features(df)
df_fe = _add_balance_and_loyalty_features(df_fe)

# Check the new columns on the frame that feeds the rest of the pipeline.
df_fe.select("Balance", "NumOfProducts", "AverageBalancePerProduct", "Tenure", "IsActiveMember", "LoyalActiveCustomer").show(5)

In [None]:
# Standardizare variabile numerice
vec_features = VectorAssembler(
    inputCols=["CreditScore", "BalanceToSalaryRatio", "EstimatedSalary", "Tenure", "EngagementScore"],
    outputCol="unscaledFeatures"
)
df_unscaled = vec_features.transform(df_fe)

scaler = StandardScaler(inputCol="unscaledFeatures", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_unscaled)
df_scaled = scaler_model.transform(df_unscaled)

In [None]:
# Împărțirea în seturi de antrenare și testare
train_data, test_data = df_scaled.randomSplit([0.7, 0.3], seed=42)
train_data.show()