In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import joblib
import ast
from datetime import datetime

StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 3, Finished, Available, Finished)

In [2]:
# --- Load saved models ---
# Every saved artifact is read from the same folder; keep the path in one place
# so it only has to be changed once.
MODEL_DIR = '/lakehouse/default/Files/model_dir'

# NOTE(security): joblib.load unpickles the file, which can execute arbitrary code.
# Only load artifacts from a trusted, access-controlled location.
kmeans = joblib.load(f'{MODEL_DIR}/kmeans_model.pkl')
scaler = joblib.load(f'{MODEL_DIR}/scaler.pkl')
pca = joblib.load(f'{MODEL_DIR}/pca.pkl')

# --- Load feature-label mapping ---
# The file holds a Python literal; literal_eval parses it without executing code.
with open(f'{MODEL_DIR}/feature_label_mapping.txt', 'r') as file:
    feature_label_mapping = ast.literal_eval(file.read())

# Fail here, next to the file that caused it, rather than later in the labeling
# step (which calls .get on this object).
if not isinstance(feature_label_mapping, dict):
    raise TypeError(
        "feature_label_mapping.txt must contain a dict literal, got "
        f"{type(feature_label_mapping).__name__}"
    )



StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 4, Finished, Available, Finished)

In [3]:
# --- Load new data ---
# Read the raw call records once; `data` and `df` are two names for the same frame.
df = pd.read_csv("/lakehouse/default/Files/raw/call_records.csv")
data = df

# Bare last expression: the notebook displays (rows, columns).
data.shape

StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 5, Finished, Available, Finished)

(600, 66)

In [4]:
# --- Keep retail columns for final output ---
# These columns are copied aside untouched so they can be re-attached to the
# scored rows in the final output.
retail_cols = ['msisdn', 'activation_date', 'cell_id', 'Date_of_birth', 'Value_band','tenure']
df_retail = data[retail_cols].copy()

# --- Data Preprocessing (same as training) ---
# errors='ignore' lets the cell run even when some of these columns are absent.
df_cleaned = data.drop(columns=['momo_bundle', 'int_inc_dur', 'total_freebiz','tenure','age','bonus_duration'], errors='ignore')

# One reference timestamp for every date-derived feature, so tenure and age are
# measured from the same instant (previously 'today' was evaluated twice).
reference_date = pd.to_datetime('today')

# Unparseable dates become NaT (errors='coerce'); the resulting NaN features are
# filled by the median imputation below.
df_cleaned['activation_date'] = pd.to_datetime(df_cleaned['activation_date'], errors='coerce')
df_cleaned['tenure_years'] = (reference_date - df_cleaned['activation_date']).dt.days / 365

df_cleaned['Date_of_birth'] = pd.to_datetime(df_cleaned['Date_of_birth'], errors='coerce')
df_cleaned['age'] = (reference_date - df_cleaned['Date_of_birth']).dt.days / 365

# NOTE(review): the integer codes are derived from the categories present in THIS
# batch, so the same service class can receive a different code than it did when
# the models were fitted. Confirm against the training notebook or persist the mapping.
df_cleaned['service_class'] = df_cleaned['service_class'].astype('category').cat.codes

# Raw identifier/date columns are not model inputs.
cols_to_drop = ['msisdn', 'activation_date', 'cell_id', 'Date_of_birth', 'Value_band']
df_cleaned = df_cleaned.drop(columns=cols_to_drop, errors='ignore')

# Median imputation. numeric_only=True keeps this working on pandas >= 2, where
# median() raises if a non-numeric column is present.
# NOTE(review): medians come from the scoring batch, not from the training data.
df_cleaned = df_cleaned.fillna(df_cleaned.median(numeric_only=True))

# Engineered ratios; the +1 in the denominator avoids division by zero.
df_cleaned['Revenue_per_Minute'] = df_cleaned['voice_spent'] / (df_cleaned['out_call_duration_billed'] + 1)
df_cleaned['Recharge_Frequency'] = df_cleaned['total_recharge_amount'] / (df_cleaned['total_recharge_count'] + 1)



StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 6, Finished, Available, Finished)

In [5]:
# --- Apply loaded scaler and PCA ---
# Feed the models only the feature columns. This cell writes 'Cluster' back onto
# df_cleaned and the labeling cell adds 'Segment_Label', so without this filter a
# re-run would hand the scaler extra, non-feature columns and fail.
feature_frame = df_cleaned.drop(columns=['Cluster', 'Segment_Label'], errors='ignore')

df_scaled = scaler.transform(feature_frame)
df_pca = pca.transform(df_scaled)

# --- Predict Clusters using saved model ---
# One cluster id per row, in the same row order as df_cleaned.
df_cleaned['Cluster'] = kmeans.predict(df_pca)



StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 7, Finished, Available, Finished)

In [6]:
# --- Cluster Labeling ---
def get_top_features(cluster_centers_df, n=3):
    """Return the n largest-magnitude features of each cluster center.

    Parameters
    ----------
    cluster_centers_df : pandas.DataFrame
        One row per cluster, one column per feature.
    n : int, default 3
        Number of features to keep per cluster.

    Returns
    -------
    dict
        Maps each row label to a tuple of column names, ordered from the
        largest absolute value to the smallest.
    """
    return {
        cluster_id: tuple(center.abs().nlargest(n).index.tolist())
        for cluster_id, center in cluster_centers_df.iterrows()
    }

def assign_dynamic_labels(top_features, mapping=None):
    """Translate each cluster's top-feature tuple into a segment label.

    Parameters
    ----------
    top_features : dict
        Maps a cluster id to a tuple of feature names.
    mapping : dict, optional
        Maps a feature-name tuple to a label. When omitted, the notebook-level
        ``feature_label_mapping`` is used, so existing one-argument calls keep
        working.

    Returns
    -------
    dict
        Maps each cluster id to its label, or to "Other" when the tuple is not
        a key of the mapping. The lookup is an exact tuple match, so the order
        of the feature names matters.
    """
    if mapping is None:
        # Fall back to the mapping loaded at the top of the notebook.
        mapping = feature_label_mapping
    return {
        cluster: mapping.get(features, "Other")
        for cluster, features in top_features.items()
    }



StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 8, Finished, Available, Finished)

In [7]:
# --- Identify dominant features from saved cluster centers ---
# K-means was applied to the PCA output, so its centers are in PCA space;
# inverse_transform maps them back to the space the PCA was fed (the scaler's
# output in this notebook).
cluster_centers = pca.inverse_transform(kmeans.cluster_centers_)

# Column names must be the model features only. Exclude both derived columns:
# 'Segment_Label' is added at the end of this cell, so dropping only 'Cluster'
# made a re-run fail with a column-count mismatch.
feature_columns = df_cleaned.drop(columns=['Cluster', 'Segment_Label'], errors='ignore').columns
cluster_centers_df = pd.DataFrame(cluster_centers, columns=feature_columns)

top_features = get_top_features(cluster_centers_df)
cluster_labels = assign_dynamic_labels(top_features)

# Attach the label of each row's predicted cluster.
df_cleaned['Segment_Label'] = df_cleaned['Cluster'].map(cluster_labels)



StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 9, Finished, Available, Finished)

In [8]:
# Show the cluster-id -> segment-label mapping built in the previous cell.
cluster_labels

StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 10, Finished, Available, Finished)

{0: 'Call Heavy User',
 1: 'Subscription Heavy',
 2: 'Peak-Time Receiver',
 3: 'Spend-Driven Data User',
 4: 'Unbilled Texter'}

In [10]:
# --- Final output with retail columns ---
# Give both frames a fresh positional index so the column-wise concat pairs
# rows one-to-one.
df_output = pd.concat(
    [frame.reset_index(drop=True) for frame in (df_retail, df_cleaned)],
    axis=1,
)

# --- Add run_date column ---
# Same date string (YYYY-MM-DD) on every row.
run_date = datetime.today().strftime('%Y-%m-%d')
df_output = df_output.assign(run_date=run_date)

# --- Convert to Spark DataFrame ---
# `spark` is not defined in this notebook; it is expected to be provided by the runtime.
spark_df = spark.createDataFrame(df_output)

# --- Save as Delta Table ---
# Overwrite mode replaces the table contents, and overwriteSchema lets the
# schema change between runs.
name = 'segmentation'
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(name)
)


StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 12, Finished, Available, Finished)

In [11]:
# Print the first rows of the key output columns as a quick check.
preview_columns = ['msisdn','tenure', 'Cluster', 'Segment_Label','run_date']
print(df_output[preview_columns].head())

StatementMeta(, 3dbba8b8-dcd5-403f-9931-1ed6c8f13635, 13, Finished, Available, Finished)

             msisdn  tenure  Cluster    Segment_Label    run_date
0  02F058CB6F95494A     257        0  Call Heavy User  2025-04-07
1  79D5D6E4311059B6     168        0  Call Heavy User  2025-04-07
2  6F37F4CAE74AEA52      36        0  Call Heavy User  2025-04-07
3  A08C744676EDCEE0     106        0  Call Heavy User  2025-04-07
4  0FBB2E2CACD68F73      40        0  Call Heavy User  2025-04-07
