# Step 1: Import Libraries and Initialize Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when, avg, count
from pyspark.sql.types import StringType, DoubleType, IntegerType
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import classification_report

# Start the Spark session shared by every cell below; getOrCreate() returns an
# already-running session instead of creating a second one.
spark = (
    SparkSession.builder
    .appName("Telecom Churn Analysis")
    .getOrCreate()
)

# Step 2: Define Helper Functions

In [None]:
def null_prcnt_spark(df):
    """
    Calculate the percentage of missing (null) values in each column of a PySpark DataFrame.

    All per-column null counts are gathered in a single aggregation pass instead
    of running one ``filter(...).count()`` job per column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        The DataFrame to inspect.

    Returns
    -------
    dict
        Mapping of column name -> percentage (0-100) of rows where that column
        is null. Only SQL nulls are counted; NaN in floating-point columns is not.
        For an empty DataFrame every column maps to 0.0 rather than raising
        ZeroDivisionError.
    """
    total_count = df.count()
    if total_count == 0:
        return {col_name: 0.0 for col_name in df.columns}

    # when(cond, 1) without otherwise() yields null wherever cond is false, and
    # count() skips nulls, so each expression counts exactly the null rows.
    null_counts = df.select(
        [count(when(col(col_name).isNull(), 1)) for col_name in df.columns]
    ).first()

    # The result row is positional and in df.columns order.
    return {
        col_name: (n_null / total_count) * 100
        for col_name, n_null in zip(df.columns, null_counts)
    }

# Step 3: Read and Understand the Data

In [None]:
# Point this at your own copy of the dataset before running.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/drive/MyDrive/telecom_churn_data (1).csv')
)

# Inspect the inferred schema and preview the first five rows
df.printSchema()
df.show(n=5)

# Percentage of null values per column
missing_vals = null_prcnt_spark(df)
print("Missing Values:\n", missing_vals)

# Step 4: Data Cleaning and Manipulation




In [None]:
# Trim leading/trailing whitespace (any kind, not only spaces) from column names
# so later lookups by exact name succeed. Inner whitespace is left as-is.
for c in df.columns:
    stripped = c.strip()
    if stripped != c:
        df = df.withColumnRenamed(c, stripped)

# Drop exact duplicate rows
df = df.dropDuplicates()

# Fix the 'rch' -> 'rech' typo in the last-day recharge columns.
# 'last_day_rch_amt_' is a prefix (presumably month-suffixed like
# total_rech_amt_6), and withColumnRenamed is a silent no-op when the exact
# name is absent, so rename every column that starts with the prefix instead.
for c in df.columns:
    if c.startswith('last_day_rch_amt_'):
        df = df.withColumnRenamed(c, c.replace('last_day_rch_amt_', 'last_day_rech_amt_', 1))

# Derive high-value customers from the month-6 recharge amount
high_value_threshold = 1000
df = df.withColumnRenamed('total_rech_amt_6', 'monthly_revenue')

# high_value = 1 when monthly_revenue >= threshold; a null revenue yields 0
df = df.withColumn('high_value', when(col('monthly_revenue') >= high_value_threshold, 1).otherwise(0))

# Churn label: no recharge in month 9. A null total_rech_amt_9 is labelled 0,
# because `null == 0` is null and falls through to otherwise(0).
df = df.withColumn('churn', when(col('total_rech_amt_9') == 0, 1).otherwise(0))

# Drop churn-phase (month 9) columns. Month columns here use an "_N" suffix
# (total_rech_amt_6/_7/_9), so the former 'month9' pattern matched nothing and
# total_rech_amt_9, the very column the label is derived from, leaked into the
# features. The 'month9' match is kept for backward compatibility.
# NOTE(review): month-9 columns that do not end in "_9" are not caught; confirm
# against the schema.
churn_phase_cols = [
    col_name
    for col_name in df.columns
    if col_name.endswith('_9') or 'month9' in col_name
]
df = df.drop(*churn_phase_cols)

# Filter high-value customers only after the label is added and the month-9
# columns are dropped, so this subset has the same columns as df.
df_high_value = df.filter(col('high_value') == 1)

# Step 5: Exploratory Data Analysis (EDA)

In [None]:
# Convert to Pandas for visualizations (collects the whole DataFrame to the driver)
df_pandas = df.toPandas()

# Select only numeric columns for correlation
numeric_df = df_pandas.select_dtypes(include=['number'])

# Plot monthly revenue distribution
plt.figure(figsize=(10, 6))
sns.histplot(df_pandas['monthly_revenue'], kde=True)
plt.title('Monthly Revenue Distribution')
plt.show()

# Density plot of monthly revenue split by churn label. common_norm=False
# normalises each class's density separately, so the smaller class stays visible.
plt.figure(figsize=(10, 6))
sns.kdeplot(data=df_pandas, x='monthly_revenue', hue='churn', fill=True, common_norm=False, alpha=0.5)
plt.title('Density Plot: Monthly Revenue vs Churn')
plt.xlabel('Monthly Revenue')
plt.ylabel('Density')
plt.show()

# Correlation heatmap over numeric columns only. Writing a number into every
# cell is unreadable and very slow for wide frames, so annotate only small matrices.
max_annotated_cols = 20
corr = numeric_df.corr()
plt.figure(figsize=(14, 10))
sns.heatmap(corr, annot=len(corr.columns) <= max_annotated_cols, cmap='coolwarm')
plt.title('Correlation Heatmap')
plt.show()

# Step 6: Feature Combination and Modeling

In [None]:
from pyspark.sql.types import BooleanType, NumericType

# Remove any 'features' vector left on df by an earlier, interrupted run of this
# cell (drop() is a no-op when the column is absent); otherwise the assembler
# below fails because its output column already exists.
df = df.drop('features')

# Average recharge amount over months 6 and 7
# ('monthly_revenue' is the renamed total_rech_amt_6).
df = df.withColumn('avg_revenue_6_7', (col('monthly_revenue') + col('total_rech_amt_7')) / 2)

# Feature selection. Keep only numeric/boolean columns, the scalar types
# VectorAssembler accepts (date/timestamp columns would make it fail), and exclude:
#   - 'churn' (the label) and 'high_value' (segment flag),
#   - 'features' (the assembler's output column),
#   - month-9 ("_9") columns: the churn label is derived from total_rech_amt_9,
#     so any of these still present would leak the target into the model.
# NOTE(review): identifier-like numeric columns, if present, are still selected;
# confirm against the schema and exclude them.
excluded_cols = {'churn', 'high_value', 'features'}
feature_columns = [
    field.name
    for field in df.schema.fields
    if field.name not in excluded_cols
    and not field.name.endswith('_9')
    and isinstance(field.dataType, (NumericType, BooleanType))
]

# Impute missing numeric values with 0 in one call. A per-column fillna loop
# stacks one projection per feature onto the query plan.
# NOTE(review): mean/median imputation may suit some features better.
df = df.fillna(0, subset=feature_columns)

# Combine features into a single vector column. handleInvalid="skip" drops rows
# that still hold a null/NaN in an input column (e.g. null booleans, which
# fillna(0) does not touch).
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")

# Assemble into a separate frame so df never carries the vector column. Cache it
# so both randomSplit samples come from one materialisation; recomputing a
# shuffled lineage (dropDuplicates upstream) can reorder rows between samples.
assembled = assembler.transform(df).cache()

# Split data into training and testing sets (70/30)
train, test = assembled.randomSplit([0.7, 0.3], seed=42)

# Logistic Regression model
# NOTE(review): churners are likely a minority class; consider weightCol or
# resampling if recall on churn=1 is poor.
lr = LogisticRegression(labelCol="churn", featuresCol="features", maxIter=100)
model = lr.fit(train)

# Predictions
predictions = model.transform(test)

# Evaluation: area under the ROC curve (threshold-independent, not accuracy)
evaluator = BinaryClassificationEvaluator(labelCol="churn", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")

# Optional: per-class precision/recall of the model's hard predictions via sklearn
predictions_pandas = predictions.select('churn', 'prediction').toPandas()
print(classification_report(predictions_pandas['churn'], predictions_pandas['prediction']))
