## Important Libraries 

In [19]:
import csv
import random
from datetime import date, timedelta
import pandas as pd
import time
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType,DoubleType
from pyspark.sql.functions import col
from datetime import datetime, timedelta
from pyspark.sql.functions import current_date, date_sub
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, sum as spark_sum, isnan, when
from pyspark.sql.functions import to_timestamp,to_date,count
from pyspark.sql.functions import desc
from pyspark.sql.functions import desc, sum as sum_agg
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, datediff, sum as spark_sum, count, max as spark_max
from datetime import date
import scipy.stats as stat
import pylab  #,clust_plot
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import sys
import yellowbrick
import base64


In [11]:
# Installing libraries, because I was working in a new Python virtual environment

# #pip install prettytable
# !pip install yellowbrick
# #!pip install pandas
# #!pip install findspark
# #!pip install seaborn
# !pip install scipy
#!pip install --upgrade pandas
#!pip install --upgrade --force-reinstall pandas
#!pip install dash dash-bootstrap-components pandas plotly pyspark

## Dash Related Configuration for Visualization

In [None]:
## dash related imports

import dash
# import dash_core_components as dcc
# import dash_html_components as html
from dash.dependencies import Input, Output
from dash import Dash, dcc, html

#import pandas as pd
import plotly.express as px
#from pyspark.sql import SparkSession

## Visualization using Dash

In [None]:
# Sets up a Dash app with interactive slides to visualize customer segmentation using Spark.
# The app's layout consists of a header and a slider to switch between plots.
# The data visualizations are displayed in the callback's updated picture, which is dependent on the slider's position.
# The plotting code is split into modular functions that save each plot as an image; the Dash app then shows those images as interactive slides.


app = dash.Dash(__name__)

# Seven slides: the first shows the missing-values chart, the remaining six show
# the binned RFM feature distributions (see update_slide).
_n_slides = 7

app.layout = html.Div([
    html.H1('Visualization of Customer Segmentation using Spark'),
    html.Div(
        id='slides-container',
        style={'height': '90vh', 'background-color': 'white'},
    ),
    dcc.Slider(
        id='slide-slider',
        min=0,
        max=_n_slides - 1,
        step=1,
        value=0,
        marks={idx: f'Slide {idx + 1}' for idx in range(_n_slides)},
    ),
])

def _png_as_img(image_path):
    """Return an ``html.Img`` that embeds the PNG at ``image_path`` as a base64 data URI.

    The file is read inside a ``with`` block, so the handle is always closed. If the
    file does not exist yet (for example, the plot has not been generated), a heading
    saying so is returned instead of raising inside the Dash callback.
    """
    try:
        with open(image_path, 'rb') as fh:
            encoded = base64.b64encode(fh.read()).decode()
    except FileNotFoundError:
        return html.H2(f'Plot not available yet: {image_path}')
    return html.Img(
        src='data:image/png;base64,{}'.format(encoded),
        style={'width': '100%', 'height': '100%', 'object-fit': 'contain'},
    )


# Features for slider positions 1..6 (position 1 maps to element 0).
_SLIDE_FEATURES = ['Recency', 'Recency_Boxcox', 'Frequency', 'Frequency_log', 'Monetary', 'Monetary_log']


@app.callback(
    Output('slides-container', 'children'),
    [Input('slide-slider', 'value')]
)
def update_slide(value):
    """Return the content for the slide selected on the slider.

    Position 0 shows ``missing_values_plot.png``. Positions 1-6 render the binned
    frequency plot of the corresponding RFM feature via ``generate_plot``, using the
    module-level ``rfm_df_pd``. Any other value shows a 'Slide not found' heading.
    """
    if value == 0:
        return _png_as_img('missing_values_plot.png')
    if value in range(1, len(_SLIDE_FEATURES) + 1):
        feature = _SLIDE_FEATURES[value - 1]
        return _png_as_img(generate_plot(rfm_df_pd, feature))
    return html.H2('Slide not found')

def generate_plot(df, feature):
    """Render the binned-frequency bar chart of ``df[feature]`` and save it as a PNG.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that contains the column ``feature``.
    feature : str
        Column to cut into 4 equal-width bins.

    Returns
    -------
    str
        Relative path of the written image, ``f'{feature}_plot.png'``.

    The chart is drawn on a standalone ``matplotlib.figure.Figure`` instead of going
    through ``binf``/pyplot. ``binf`` is redefined later in this notebook, so relying
    on whichever version is current makes the written file unpredictable. pyplot's
    global figure state is also discouraged inside web-server callbacks such as Dash's.
    """
    from matplotlib.figure import Figure

    plot_path = f'{feature}_plot.png'
    frequency = pd.cut(df[feature], bins=4).value_counts().sort_index(ascending=False)

    fig = Figure(figsize=(10, 6), dpi=100)
    ax = fig.subplots()
    frequency.plot(kind='bar', ax=ax)
    ax.set_xlabel('Bins')
    ax.set_ylabel('Frequency')
    ax.set_title(f'Frequency of Data Points in Each Bin: {feature}')
    # Tilted tick labels keep the interval strings from overlapping.
    ax.tick_params(axis='x', labelrotation=45)
    fig.tight_layout()
    fig.savefig(plot_path, dpi=100, bbox_inches='tight')
    return plot_path

def binf(df, feature):
    """Save a bar chart of how many values of ``df[feature]`` fall in each of 4 equal-width bins.

    Bins are ordered from highest to lowest interval. The chart is written to
    ``f'{feature}_plot.png'`` in the working directory, and the figure is closed
    afterwards, so nothing is displayed inline.
    """
    counts_per_bin = (
        pd.cut(df[feature], bins=4)
        .value_counts()
        .sort_index(ascending=False)
    )

    plt.figure(figsize=(10, 6), dpi=100)
    counts_per_bin.plot(kind='bar')
    plt.xlabel('Bins')
    plt.ylabel('Frequency')
    plt.title(f'Frequency of Data Points in Each Bin: {feature}')
    # Tilted labels keep the interval strings from overlapping; tight_layout keeps them in frame.
    plt.xticks(rotation=45)
    plt.tight_layout()

    plt.savefig(f'{feature}_plot.png', dpi=100, bbox_inches='tight')
    plt.close()

# Start the Dash development server.
# NOTE(review): inside Jupyter `__name__ == '__main__'` is True, so the server starts as
# soon as this cell runs — before `rfm_df_pd` (needed by slides 2-7) and
# 'missing_values_plot.png' (slide 1, written by `check`) exist further down the notebook.
# Consider moving this cell to the end. Also confirm that the installed Dash version still
# provides `run_server` (newer releases use `app.run`).
if __name__ == '__main__':
    app.run_server(debug=True)


In [None]:
# Show the working directory: the relative PNG paths written by the plotting helpers and read by the Dash app resolve against it.
!pwd

In [20]:
# Record the library versions this notebook was run with (the output is kept below).
print("PySpark version:", pyspark.__version__)
print("Pandas version:", pd.__version__)
print("NumPy version:", np.__version__)
#print("Matplotlib version:", matplotlib.__version__)

# `matplotlib` itself is never imported (only matplotlib.pyplot as plt), so reach it through plt.
print("Matplotlib version:", plt.matplotlib.__version__)
print("Seaborn version", sns.__version__)
# NOTE: this reports the `python` found on PATH, which may differ from the kernel's interpreter.
!python --version


PySpark version: 3.3.0
Pandas version: 2.2.2
NumPy version: 1.24.3
Matplotlib version: 3.9.1
Seaborn version 0.13.2
Python 3.11.8


### PySpark home directory 

In [16]:
# Location of the Spark installation. Read it from the SPARK_HOME environment variable so
# the notebook is portable. The literal is only a placeholder fallback to replace locally.
import os

spark_home = os.environ.get("SPARK_HOME", "spark_home_directory")

# findspark points this Python process at the Spark installation (adds its pyspark to sys.path).
# NOTE(review): pyspark is already imported in the first cell; findspark is normally initialised before that import.
findspark.init(spark_home)

### Initialize SparkSession


In [None]:
from pyspark.sql import SparkSession

# One shared session for the whole notebook. The LEGACY time-parser policy is what let the
# CSV reader parse the dataset's d/M/yy dates with the custom schema (see the load cells).
spark = (
    SparkSession.builder
    .appName("HadoopSparkIntegration")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [18]:
# Display the active SparkSession to confirm it started.
spark

## Testing the integration of Hadoop and Spark

In [21]:
# Write a short test string to a local text file for the HDFS round-trip check below.
text_data = "Hello, Hadoop and Spark integration!"
with open("temp_H.txt", "w") as handle:
    print(text_data, end="", file=handle)


In [None]:
import subprocess

# Copy the local test file into HDFS.
# - `-mkdir -p` creates /test if needed and is a no-op when it already exists.
# - `-put -f` overwrites an existing copy, so the cell can be re-run.
# - `check=True` raises CalledProcessError when an hdfs command fails, instead of
#   silently continuing to the read-back cells.
subprocess.run(["hdfs", "dfs", "-mkdir", "-p", "/test"], check=True)
subprocess.run(["hdfs", "dfs", "-put", "-f", "temp_H.txt", "/test/hadoop_spark_test_H.txt"], check=True)


In [None]:
# List /test in HDFS to confirm the uploaded file is there.
subprocess.run(["hdfs", "dfs", "-ls", "/test"])


In [24]:
# Read the file back from HDFS and print it to confirm the round-trip worked.
cat_command = ["hdfs", "dfs", "-cat", "/test/hadoop_spark_test_H.txt"]
result = subprocess.run(cat_command, capture_output=True)
print(result.stdout.decode())





## Uploading data in hdfs

In [None]:
# List files in the root of HDFS (also confirms the HDFS client can reach the cluster)
!hdfs dfs -ls /


### Creating a new directory in HDFS called project 

In [None]:
# Creating a new directory in hdfs for the project

!hdfs dfs -mkdir /project


In [None]:
# List files in the root of HDFS to confirm /project was created
!hdfs dfs -ls /

In [None]:
# Upload the CSV file from local system to the 'project' directory in HDFS
!hdfs dfs -put local_csv_path /project/testing_csv.csv

In [None]:
# List the contents of the 'project' directory in HDFS to confirm the file is there
!hdfs dfs -ls /project

### The commented-out code below shows the issues I ran into with `inferSchema`, which led me to design a custom schema for this data. 

In [30]:
# from pyspark.sql import SparkSession

# # Initialize SparkSession
# spark = SparkSession.builder \
#     .appName('Read CSV from HDFS Project Directory') \
#     .getOrCreate()

# # Read the CSV file from the new directory in HDFS with the fully specified URI
# # Ensure to include options for handling headers and schema inference
# df = spark.read.option("header", "true").option("inferSchema", "true") \
#     .csv("hdfs://localhost:9000/project/testing_csv.csv")

# # Show the first few rows to ensure it's loaded correctly
# df.show(20)


In [31]:
# df.printSchema()

In [32]:
## I will now try importing with my defined schema 

In [33]:
## This schema has incorrect data schema

# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

# # Define the schema correctly
# custom_schema = StructType([
#     StructField("TransactionID", StringType(), True),  # Changed to StringType
#     StructField("CustomerID", StringType(), True),
#     StructField("CustomerDOB", DateType(), True),
#     StructField("CustGender", StringType(), True),
#     StructField("CustLocation", StringType(), True),
#     StructField("CustAccountBalance", DoubleType(), True),
#     StructField("TransactionDate", DateType(), True),
#     StructField("TransactionTime", IntegerType(), True),  # Assuming it's in HHMMSS format
#     StructField("TransactionAmount (INR)", DoubleType(), True),
# ])

# # Initialize SparkSession
# spark = SparkSession.builder \
#     .appName('Read CSV from HDFS Project Directory') \
#     .getOrCreate()

# # Read the CSV file using the schema defined and correct date format
# df = spark.read.format("csv") \
#     .option("header", "true") \
#     .option("dateFormat", "d/M/yy") \  
#     .schema(custom_schema) \
#     .load("hdfs://localhost:9000/project/testing_csv.csv")

# # Show the first few rows to ensure it's loaded correctly
# df.show()

# # Print the schema to verify the types are as expected
# df.printSchema()

In [34]:
# '''In this code chunk I have imported using my custom schema and initially faced issues with the date format and then
# used config("spark.sql.legacy.timeParserPolicy", "LEGACY") to help parse the two columns CustomerDOB and TransactionDate in the yyyy-MM-dd format'''

# #Custom Schema
# custom_schema = StructType([
#     StructField("TransactionID", StringType(), True),  
#     StructField("CustomerID", StringType(), True),
#     StructField("CustomerDOB", DateType(), True),
#     StructField("CustGender", StringType(), True),
#     StructField("CustLocation", StringType(), True),
#     StructField("CustAccountBalance", DoubleType(), True),
#     StructField("TransactionDate", DateType(), True),
#     StructField("TransactionTime", IntegerType(), True), 
#     StructField("TransactionAmount (INR)", DoubleType(), True),
# ])

# spark = SparkSession.builder \
#     .appName('Read CSV from HDFS Project Directory') \
#     .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
#     .getOrCreate()

# df = spark.read.format("csv") \
#     .option("header", "true") \
#     .option("dateFormat", "d/M/yy") \
#     .schema(custom_schema) \
#     .load("hdfs://localhost:9000/project/testing_csv.csv")

# df.show(20)

# df.printSchema()


In [None]:
# List files in the root of HDFS before uploading the larger sample
!hdfs dfs -ls /

### Checking whether 100,000 records can be loaded into HDFS

In [None]:
## Loading the 100,000 records into hdfs and testing it for our project

# Upload the CSV file from local system to the 'project' directory in HDFS
!hdfs dfs -put file_path_to/testing_hund.csv /project/testing_hund.csv

In [None]:
# List the contents of the 'project' directory in HDFS to confirm testing_hund.csv is there
!hdfs dfs -ls /project

In [None]:
## Using the same configuration and schema to import the 100,000 records.
#
# Reading with the custom schema initially failed on the date columns. The session's
# spark.sql.legacy.timeParserPolicy=LEGACY setting lets CustomerDOB and TransactionDate
# be parsed from their d/M/yy strings into DateType (displayed as yyyy-MM-dd).

# Custom schema for the hundred-thousand-record sample (every field nullable)
hund_fields = [
    ("TransactionID", StringType()),
    ("CustomerID", StringType()),
    ("CustomerDOB", DateType()),
    ("CustGender", StringType()),
    ("CustLocation", StringType()),
    ("CustAccountBalance", DoubleType()),
    ("TransactionDate", DateType()),
    ("TransactionTime", IntegerType()),
    ("TransactionAmount (INR)", DoubleType()),
]
custom_schema_hund = StructType([StructField(name, dtype, True) for name, dtype in hund_fields])

df_hund = (
    spark.read.format("csv")
    .option("header", "true")
    .option("dateFormat", "d/M/yy")
    .schema(custom_schema_hund)
    .load("hdfs://localhost:9000/project/testing_hund.csv")
)

# df_hund.show(25)  # uncomment to preview rows

df_hund.printSchema()

## Uploading full data in hdfs

In [None]:
# List files in the root of HDFS before uploading the full dataset
!hdfs dfs -ls /

In [None]:
## uploading our full data which contains 1m+ records to hdfs

# Upload the CSV file from local system to the 'project' directory in HDFS
!hdfs dfs -put file_path_to/full_data.csv /project/full_data.csv

In [None]:
# List the contents of the 'project' directory in HDFS to confirm full_data.csv is there
!hdfs dfs -ls /project

### Import the project data (bank_transactions.csv) from HDFS and read it using our custom schema

In [None]:
## Using the same configuration and schema for all the records.
#
# Reading with the custom schema initially failed on the date columns. The session's
# spark.sql.legacy.timeParserPolicy=LEGACY setting lets CustomerDOB and TransactionDate
# be parsed from their d/M/yy strings into DateType (displayed as yyyy-MM-dd).

# Custom schema for the full dataset (every field nullable)
full_fields = [
    ("TransactionID", StringType()),
    ("CustomerID", StringType()),
    ("CustomerDOB", DateType()),
    ("CustGender", StringType()),
    ("CustLocation", StringType()),
    ("CustAccountBalance", DoubleType()),
    ("TransactionDate", DateType()),
    ("TransactionTime", IntegerType()),
    ("TransactionAmount (INR)", DoubleType()),
]
custom_schema_full = StructType([StructField(name, dtype, True) for name, dtype in full_fields])

df_full = (
    spark.read.format("csv")
    .option("header", "true")
    .option("dateFormat", "d/M/yy")
    .schema(custom_schema_full)
    .load("hdfs://localhost:9000/project/full_data.csv")
)

df_full.show(25, truncate=False)

df_full.printSchema()

In [None]:
# Total number of rows (transactions) in the full dataset.
print(df_full.count())

## Exploratory Data Analysis

In [None]:
# A quick look at the DataFrame (first 20 rows) before renaming columns

df_full.show()

In [None]:
# Shorter column names for the exploratory analysis:
#   CustomerDOB -> DOB, CustGender -> Gender, CustLocation -> Location,
#   CustAccountBalance -> AccountBalance, TransactionDate -> Date,
#   TransactionTime -> Time, TransactionAmount (INR) -> Amount
eda_renames = {
    "CustomerDOB": "DOB",
    "CustGender": "Gender",
    "CustLocation": "Location",
    "CustAccountBalance": "AccountBalance",
    "TransactionDate": "Date",
    "TransactionTime": "Time",
    "TransactionAmount (INR)": "Amount",
}

data_eda = df_full
for old_name, new_name in eda_renames.items():
    data_eda = data_eda.withColumnRenamed(old_name, new_name)

# Show the result to verify the changes
data_eda.show()

In [None]:
# Shape of the renamed DataFrame: number of rows and number of columns
n_rows, n_cols = data_eda.count(), len(data_eda.columns)
print("Number of rows:", n_rows)
print("Number of columns:", n_cols)

In [None]:
# def check(data_eda):
#     data = []
#     columns = data_eda.columns
#     for c in columns:
#         dtypes = str(data_eda.schema[c].dataType)
#         nunique = data_eda.select(c).distinct().count()
#         sum_null = data_eda.filter(col(c).isNull()).count()
#         data.append((c, dtypes, nunique, sum_null))

#     schema = StructType([
#         StructField("column", StringType(), True),
#         StructField("dtypes", StringType(), True),
#         StructField("nunique", IntegerType(), True),
#         StructField("sum_null", IntegerType(), True)
#     ])

#     df_check = spark.createDataFrame(data, schema)

#     # Convert the Spark DataFrame to Pandas for plotting
#     df_check_pandas = df_check.toPandas()

#     # Filter columns with missing values
#     df_check_pandas_filtered = df_check_pandas[df_check_pandas["sum_null"] > 0]

#     # Set the plotting style
#     sns.set(style="whitegrid")

#     # Plot missing values
#     plt.figure(figsize=(10, 6))
#     sns.barplot(x="sum_null", y="column", data=df_check_pandas_filtered)
#     plt.xlabel("Number of Missing Values", fontsize=14)
#     plt.ylabel("Columns", fontsize=14)
#     plt.title("Missing Values per Column", fontsize=16)
    
#     plt.show()

#     # Display the missing values table
#     print("Missing Values Table:")
#     df_check.show()

# # Call the function to check missing values
# check(data_eda)


In [None]:
import os


def check(data_eda):
    """Profile the columns of a Spark DataFrame and save a missing-values bar chart.

    For every column, the data type, the number of distinct values
    (``distinct().count()``, so null counts as one value) and the number of nulls are
    gathered into a small Spark DataFrame. Columns with at least one null are drawn as
    a horizontal bar chart. The chart is saved as ``missing_values_plot.png`` in the
    current working directory (the file the Dash app's first slide reads), and the
    figure is closed rather than shown.

    Parameters
    ----------
    data_eda : pyspark.sql.DataFrame
        The DataFrame to profile.

    Returns
    -------
    str
        Absolute path of the saved PNG.
    """
    # LongType: counts can exceed IntegerType's 32-bit range on large inputs.
    from pyspark.sql.types import LongType

    columns = data_eda.columns

    # All null counts in one Spark job instead of one filter().count() job per column.
    # Aliases use the column position, so names with spaces or dots cannot clash.
    null_row = data_eda.agg(*[
        spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"nulls_{i}")
        for i, c in enumerate(columns)
    ]).first() if columns else None

    data = []
    for i, c in enumerate(columns):
        dtypes = str(data_eda.schema[c].dataType)
        nunique = data_eda.select(c).distinct().count()
        # sum() over an empty DataFrame is null; report that as 0 missing values.
        sum_null = null_row[f"nulls_{i}"] or 0
        data.append((c, dtypes, nunique, sum_null))

    schema = StructType([
        StructField("column", StringType(), True),
        StructField("dtypes", StringType(), True),
        StructField("nunique", LongType(), True),
        StructField("sum_null", LongType(), True)
    ])
    df_check = spark.createDataFrame(data, schema)

    # The summary has one row per column, so it is cheap to bring to pandas for plotting.
    df_check_pandas = df_check.toPandas()
    df_check_pandas_filtered = df_check_pandas[df_check_pandas["sum_null"] > 0]

    # Set the plotting style
    sns.set(style="whitegrid")

    # Draw on an explicit figure, so exactly this figure is saved and then closed.
    fig, ax = plt.subplots(figsize=(10, 6))
    if df_check_pandas_filtered.empty:
        # Nothing to plot: still write an image, so the Dash slide has something to show.
        ax.text(0.5, 0.5, "No missing values", ha="center", va="center",
                fontsize=16, transform=ax.transAxes)
    else:
        sns.barplot(x="sum_null", y="column", data=df_check_pandas_filtered, ax=ax)
    ax.set_xlabel("Number of Missing Values", fontsize=14)
    ax.set_ylabel("Columns", fontsize=14)
    ax.set_title("Missing Values per Column", fontsize=16)

    plot_path = os.path.join(os.getcwd(), "missing_values_plot.png")
    fig.savefig(plot_path)
    plt.close(fig)

    return plot_path

## Data Preprocessing using Spark

In [None]:
## For visualization we have created a utility module

import sys
# Directory that directly contains 'Utility_Folder', which in turn contains 'utility.py'.
UTILITY_PARENT_DIR = 'path_to_Utility_Folder'
# Only append once: re-running this cell would otherwise add a duplicate sys.path entry each time.
if UTILITY_PARENT_DIR not in sys.path:
    sys.path.append(UTILITY_PARENT_DIR)

from Utility_Folder.utility import kelbow, clust_plot, plot_data, outlier

In [None]:
# Print the schema of the loaded DataFrame.

df_full.printSchema()

In [None]:
# Preview the first 5 rows of the loaded data.
df_full.show(5)

In [None]:
# Rename the raw columns for preprocessing:
#   CustomerDOB -> DOB, CustGender -> Gender, CustLocation -> Location,
#   CustAccountBalance -> AccountBalance, TransactionDate -> Date,
#   TransactionTime -> Time, TransactionAmount (INR) -> Amount
column_renames = [
    ("CustomerDOB", "DOB"),
    ("CustGender", "Gender"),
    ("CustLocation", "Location"),
    ("CustAccountBalance", "AccountBalance"),
    ("TransactionDate", "Date"),
    ("TransactionTime", "Time"),
    ("TransactionAmount (INR)", "Amount"),
]

data = df_full
for source_col, target_col in column_renames:
    data = data.withColumnRenamed(source_col, target_col)

# Show the result to verify the changes
data.show()

In [None]:
from pyspark.sql.functions import col, isnan
from pyspark.sql.types import DoubleType, FloatType, IntegerType

# Build one predicate that is True for any row with a null in any column, or a NaN in a
# numeric column. Then keep only the rows where it is False.
_NAN_CHECKED_TYPES = (IntegerType, DoubleType, FloatType)

condition = None
for column in data.columns:
    missing = col(column).isNull()
    # Test the DataType object, not its string form: str(DoubleType()) differs between
    # PySpark releases (e.g. 'DoubleType' vs 'DoubleType()'), which would silently skip the NaN check.
    if isinstance(data.schema[column].dataType, _NAN_CHECKED_TYPES):
        missing = missing | isnan(col(column))
    condition = missing if condition is None else condition | missing

# Apply the condition to filter the DataFrame (no columns means nothing to filter)
filtered_df = data.filter(~condition) if condition is not None else data

In [None]:
# Keep only the rows with a strictly positive transaction time and amount.
# NOTE(review): assuming Time is an HHMMSS integer (as the schema comments suggest),
# `Time > 0` also drops transactions recorded at exactly 00:00:00. Confirm that is intended.
filteredn_df = filtered_df.where((col('Time') > 0) & (col('Amount') > 0))

In [None]:
# Number and share (%) of transactions per location
total_transactions = filteredn_df.count()
per_location = filteredn_df.groupBy('Location')
location_counts = per_location.agg(count('TransactionID').alias('transaction_count'))
location_percentages = location_counts.withColumn(
    'percentage',
    (col('transaction_count') / total_transactions) * 100,
)

In [None]:
# Locations ordered by their share of transactions, and the number of distinct locations
ordered_location_percentages = location_percentages.orderBy(col('percentage').desc())
distinct_locations = (
    ordered_location_percentages
    .select('Location')
    .distinct()
)
distinct_locations.count()

In [None]:
# Keep the 40 locations with the largest share of transactions, and show them.

top_forty_location_percentages = ordered_location_percentages.limit(40)
top_forty_location_percentages.show()

# How many transactions, and what share of all transactions, these 40 locations cover.
sums_top_forty = top_forty_location_percentages.agg(
    sum_agg(col('transaction_count')).alias('sum_transaction_count'),
    sum_agg(col('percentage')).alias('sum_percentage')
)
# Backward-compatible alias: the original name said "twenty" although it covers 40 locations.
sums_top_twenty = sums_top_forty
# Spark DataFrames are lazy; without an action this coverage figure was never computed or shown.
sums_top_forty.show()


In [None]:
# Restrict the transactions to the 40 busiest locations.
# Filter `filteredn_df` (the positive Time/Amount subset the location ranking was built from),
# so the RFM features are computed on the same rows that were used to pick the locations.
# NOTE(review): the original filtered `filtered_df`, which still contains non-positive
# Time/Amount rows. Revert if including them was intentional.
cust_locations_list = [row['Location'] for row in top_forty_location_percentages.select('Location').distinct().collect()]
df_f = filteredn_df.where(col('Location').isin(cust_locations_list))

In [None]:
# Preview the transactions restricted to the top-40 locations.
df_f.show(5)

In [None]:
# Calculate recency, frequency and monetary value.
# Reference date for recency: days are counted back from 22 Oct 2016.
latest_date = date(2016, 10, 22)


def calculate_recency(transaction_date):
    """Return the number of whole days between ``transaction_date`` and ``latest_date``."""
    elapsed = latest_date - transaction_date
    return elapsed.days

from pyspark.sql.functions import lit

# Register the Python UDF (kept so any existing references to it keep working).
calculate_recency_udf = udf(calculate_recency, IntegerType())

# Group by CustomerID and aggregate Recency / Frequency / Monetary.
# Recency uses Spark's built-in datediff(latest_date, last transaction date), which gives the
# same day count as calculate_recency. It runs in the JVM (no Python worker round-trip) and
# returns null instead of raising on a null date.
rfm_df = df_f.groupBy("CustomerID").agg(
    datediff(lit(latest_date), spark_max("Date")).alias("Recency"),
    count("TransactionID").alias("Frequency"),
    spark_sum("Amount").alias("Monetary")
)

In [None]:
# Collect the per-customer RFM table to the driver as a pandas DataFrame for plotting and transformation.
rfm_df_pd = rfm_df.toPandas()

In [None]:
# Plot the raw Recency, Frequency and Monetary distributions (plot_data comes from the utility module).

plot_data(rfm_df_pd,'Recency')
plot_data(rfm_df_pd,'Frequency')
plot_data(rfm_df_pd,'Monetary')

In [None]:
# Normalise the skewed RFM features: Box-Cox for Recency (shifted by 1, because Box-Cox
# needs strictly positive input and Recency can be 0), log(1 + x) for Monetary and Frequency.
recency_shifted = rfm_df_pd['Recency'] + 1
rfm_df_pd['Recency_Boxcox'], parameters = stat.boxcox(recency_shifted)
rfm_df_pd['Monetary_log'] = np.log1p(rfm_df_pd['Monetary'])
rfm_df_pd['Frequency_log'] = np.log1p(rfm_df_pd['Frequency'])

In [None]:
# Distributions of the transformed features, for comparison with the raw plots above.
plot_data(rfm_df_pd,'Recency_Boxcox')
plot_data(rfm_df_pd,'Frequency_log')
plot_data(rfm_df_pd,'Monetary_log')

In [None]:
# Summary statistics before outlier handling.
# NOTE(review): 644016 is presumably the customer count seen when this was written — confirm.
rfm_df_pd.describe()#644016

In [None]:
# Outlier detection: apply the utility `outlier` to each raw RFM column in turn,
# passing each result into the next call.
for rfm_column in ['Recency', 'Frequency', 'Monetary']:
    rfm_df_pd = outlier(rfm_df_pd, rfm_column)

In [None]:
# Summary statistics after outlier handling.
rfm_df_pd.describe()

In [None]:
# Plot the frequency of data points in each of 4 equal-width bins of a feature.
#
# NOTE: this redefines `binf` from the Dash cell near the top of the notebook. From here on,
# every caller of `binf` uses this version, so it also writes the same `f'{feature}_plot.png'`
# file as that earlier definition, in addition to displaying the chart inline.

def binf(df, feature):
    """Display (and save to ``f'{feature}_plot.png'``) a bar chart of bin frequencies.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that contains the column ``feature``.
    feature : str
        Column to cut into 4 equal-width bins; bars are ordered from the highest interval down.
    """
    binned_data = pd.cut(df[feature], bins=4)
    sorted_frequency = binned_data.value_counts().sort_index(ascending=False)

    # A fresh figure per call, so bars are never drawn onto axes left open by an earlier cell.
    fig, ax = plt.subplots(figsize=(10, 6), dpi=100)
    sorted_frequency.plot(kind='bar', ax=ax)
    ax.set_xlabel('Bins')
    ax.set_ylabel('Frequency')
    ax.set_title(f'Frequency of Data Points in Each Bin:{feature}')
    ax.tick_params(axis='x', labelrotation=45)
    fig.tight_layout()
    fig.savefig(f'{feature}_plot.png', dpi=100, bbox_inches='tight')
    plt.show()
    plt.close(fig)
for binned_feature in ('Recency', 'Recency_Boxcox', 'Frequency', 'Frequency_log', 'Monetary', 'Monetary_log'):
    binf(rfm_df_pd, binned_feature)

In [None]:
# Back to Spark: assemble the three transformed RFM columns into one vector column and
# scale each component to unit standard deviation (no mean-centering).
df = spark.createDataFrame(rfm_df_pd)

assembler = VectorAssembler(
    inputCols=['Recency_Boxcox', 'Frequency_log', 'Monetary_log'],
    outputCol='features',
)
scaler = StandardScaler(
    inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=False
)

df_assembled = assembler.transform(df)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

In [None]:
# Bring the scaled feature vectors to the driver as a 2-D NumPy array (one row per customer)
# for the elbow analysis below.
vectors = df_scaled.select("scaledFeatures").rdd.map(lambda r: r.scaledFeatures.toArray())
local_vectors = vectors.collect()
numpy_array = np.array(local_vectors)

In [None]:
# Finding the ideal number of clusters (kelbow comes from the utility module; presumably an elbow plot — see utility.py).

kelbow(numpy_array)  

In [None]:
# K-Means with the number of clusters chosen in the step above.
k = 4
kmeans = KMeans(
    featuresCol='scaledFeatures',
    k=k,
    seed=1,              # fixed seed so the clustering is reproducible
    initMode='k-means||',
    maxIter=1000,
)
model = kmeans.fit(df_scaled)
predictions = model.transform(df_scaled)

# Silhouette score (squared Euclidean distance) as a measure of cluster separation.
evaluator = ClusteringEvaluator(
    featuresCol='scaledFeatures',
    metricName='silhouette',
    distanceMeasure='squaredEuclidean',
)
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette:.2f}")

In [None]:
# Prints the DataFrame's repr (column names and types), not its rows.
print(predictions)

In [None]:
# The DataFrame of customers with their predicted cluster, renamed to 'Cluster'.

df_predict = predictions.withColumnRenamed('prediction', 'Cluster')
# show() prints the rows itself and returns None; wrapping it in print() only added a stray "None".
df_predict.show()

In [None]:
# Collect the clustered customers to pandas and build one frame for the visualisations below.

dfp_p = df_predict.toPandas()

# Take the scaled features from the same collected rows as the cluster labels, so each
# customer's values and label are guaranteed to line up. `numpy_array` was collected by a
# separate Spark job, and Spark does not guarantee the same row order across jobs.
scaled = np.stack(dfp_p['scaledFeatures'].map(lambda v: v.toArray()).tolist())
f1 = scaled[:, 0]
f2 = scaled[:, 1]
f3 = scaled[:, 2]
f4 = dfp_p['Recency_Boxcox']
f5 = dfp_p['Frequency_log']
f6 = dfp_p['Monetary_log']
label_c = dfp_p['Cluster']
# NOTE: 'Recency', 'Frequency' and 'Monetary' here hold the *scaled* transformed features
# (the K-Means inputs), not the raw RFM values.
df = pd.DataFrame({
    'Recency': f1,
    'Frequency': f2,
    'Monetary': f3,
    'Recency_Boxcox': f4,
    'Frequency_log': f5,
    'Monetary_log': f6,
    'Clusters': label_c,
})
print(df.head(10))

In [None]:
# Summary statistics of the visualisation frame (scaled features, transformed features, cluster labels).
df.describe()

In [None]:
# Customer segmentation on the scaled Recency and Monetary features, one colour per cluster.

fig, ax = plt.subplots(figsize=(10, 6))
ax.set_title('Customer Segmentation based on Recency and Monetary')
clusters = df['Clusters'].unique()
for cluster in clusters:
    subset = df[df['Clusters'] == cluster]
    # No `cmap`: scatter only uses a colormap when `c` holds numeric values. With one call per
    # cluster, each call takes the next colour from the property cycle; passing cmap alone is
    # ignored and triggers a "No data for colormapping" warning.
    ax.scatter(subset['Recency'], subset['Monetary'], label=cluster, s=50)
ax.set_xlabel('Recency')
ax.set_ylabel('Monetary')
ax.legend(title='Cluster')
plt.show()

In [None]:
# Per-cluster mean of the untransformed Recency, Frequency and Monetary columns, plus the row count of each cluster.
cluster_summary = df_predict.groupBy('Cluster').agg({'Recency': 'mean', 'Frequency': 'mean', 'Monetary': 'mean', 'Cluster': 'count'})
cluster_summary.show()

In [None]:
# Write one CSV per cluster, named fname=<cluster>.csv (fname=0.csv, fname=1.csv, ...),
# keeping only CustomerID, Recency, Monetary and Cluster.
clusters = dfp_p['Cluster'].unique()
cols = ['CustomerID', 'Recency', 'Monetary', 'Cluster']
dfp_p = dfp_p[cols]
for cluster_id, cluster_rows in dfp_p.groupby('Cluster'):
    cluster_rows.to_csv(f"fname={cluster_id}.csv", index=False)

In [None]:
# Group df_predict by cluster, computing the mean of Recency_Boxcox and Monetary_log and the
# number of rows in each cluster. Then bin the transformed features with binf for further analysis.

cluster_summary = df_predict.groupBy('Cluster').agg({'Recency_Boxcox': 'mean','Monetary_log': 'mean', 'Cluster': 'count'})
cluster_summary.show()

# NOTE(review): binf runs on rfm_df_pd (all customers together), not per cluster.
binf(rfm_df_pd,'Recency_Boxcox')
binf(rfm_df_pd,'Monetary_log')

In [None]:
# First 10 rows of dfp_p, which the export cell above reduced to CustomerID, Recency, Monetary and Cluster.
print(dfp_p.head(10))