In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session that every later cell relies on.
session_builder = SparkSession.builder.appName('sample')
spark = session_builder.getOrCreate()

# Configure a CSV reader with the header/inferSchema options enabled,
# then load the credit-card data set from the working directory.
csv_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
data1 = csv_reader.load("ccdata.csv")

# Print the resulting column names and types for a quick sanity check.
data1.printSchema()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/25 13:39:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- CUST_ID: string (nullable = true)
 |-- BALANCE: double (nullable = true)
 |-- BALANCE_FREQUENCY: double (nullable = true)
 |-- PURCHASES: double (nullable = true)
 |-- ONEOFF_PURCHASES: double (nullable = true)
 |-- INSTALLMENTS_PURCHASES: double (nullable = true)
 |-- CASH_ADVANCE: double (nullable = true)
 |-- PURCHASES_FREQUENCY: double (nullable = true)
 |-- ONEOFF_PURCHASES_FREQUENCY: double (nullable = true)
 |-- PURCHASES_INSTALLMENTS_FREQUENCY: double (nullable = true)
 |-- CASH_ADVANCE_FREQUENCY: double (nullable = true)
 |-- CASH_ADVANCE_TRX: integer (nullable = true)
 |-- PURCHASES_TRX: integer (nullable = true)
 |-- CREDIT_LIMIT: double (nullable = true)
 |-- PAYMENTS: double (nullable = true)
 |-- MINIMUM_PAYMENTS: double (nullable = true)
 |-- PRC_FULL_PAYMENT: double (nullable = true)
 |-- TENURE: integer (nullable = true)



                                                                                

In [3]:
# Keep only the rows that have no null in any column.
data2 = data1.dropna()

In [4]:
from pyspark.ml.feature import VectorAssembler

# Numeric columns that are packed into the model's feature vector
# (everything in the schema except the CUST_ID identifier).
feature_columns = [
    'BALANCE',
    'BALANCE_FREQUENCY',
    'PURCHASES',
    'ONEOFF_PURCHASES',
    'INSTALLMENTS_PURCHASES',
    'CASH_ADVANCE',
    'PURCHASES_FREQUENCY',
    'ONEOFF_PURCHASES_FREQUENCY',
    'PURCHASES_INSTALLMENTS_FREQUENCY',
    'CASH_ADVANCE_FREQUENCY',
    'CASH_ADVANCE_TRX',
    'PURCHASES_TRX',
    'CREDIT_LIMIT',
    'PAYMENTS',
    'MINIMUM_PAYMENTS',
    'PRC_FULL_PAYMENT',
    'TENURE',
]

# Combine those columns into a single vector column named 'features'.
assemble = VectorAssembler(inputCols=feature_columns, outputCol='features')

data3 = assemble.transform(data2)

# Preview the first two assembled rows.
data3.show(2)

22/11/25 13:49:33 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+--------------------+
|CUST_ID|    BALANCE|BALANCE_FREQUENCY|PURCHASES|ONEOFF_PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|PURCHASES_FREQUENCY|ONEOFF_PURCHASES_FREQUENCY|PURCHASES_INSTALLMENTS_FREQUENCY|CASH_ADVANCE_FREQUENCY|CASH_ADVANCE_TRX|PURCHASES_TRX|CREDIT_LIMIT|   PAYMENTS|MINIMUM_PAYMENTS|PRC_FULL_PAYMENT|TENURE|            features|
+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+-----------------------

In [5]:
from pyspark.ml.feature import StandardScaler

# Scaler reading the assembled 'features' vector and writing 'standardized';
# no other options are passed, so the scaler's defaults apply.
scale = StandardScaler(
    inputCol='features',
    outputCol='standardized',
)

# `scaled_data1` is the fitted scaler model; `scaled_data2` is data3 with the
# extra 'standardized' column appended.
scaled_data1 = scale.fit(data3)
scaled_data2 = scaled_data1.transform(data3)

# Show raw and standardized vectors side by side for the first ten rows.
scaled_data2.select("features", "standardized").show(n=10)

                                                                                

+--------------------+--------------------+
|            features|        standardized|
+--------------------+--------------------+
|[40.900749,0.8181...|[0.01951770812869...|
|(17,[0,1,5,9,10,1...|(17,[0,1,5,9,10,1...|
|[2495.148862,1.0,...|[1.19067714936384...|
|[817.714335,1.0,1...|[0.39021069573032...|
|[1809.828751,1.0,...|[0.86364455880605...|
|[627.260806,1.0,7...|[0.29932687374696...|
|[1823.652743,1.0,...|[0.87024132408850...|
|[1014.926473,1.0,...|[0.48431970456340...|
|[152.225975,0.545...|[0.07264175405825...|
|[1293.124939,1.0,...|[0.61707513310675...|
+--------------------+--------------------+
only showing top 10 rows



In [7]:
from pyspark.ml.clustering import KMeans

# Cluster the standardized feature vectors into k=3 groups.
# An explicit seed is passed so the centroid initialisation, and therefore the
# cluster ids shown below, can be reproduced after a kernel restart.
kmeans = KMeans(featuresCol='standardized', k=3, seed=42)
kmeansfit = kmeans.fit(scaled_data2)

# Keep only the customer id and the cluster it was assigned to.
kmeanstf = kmeansfit.transform(scaled_data2).select("CUST_ID", "prediction")

# `rows` is read by the following cell, so the full result is still collected
# to the driver here; only the first three rows are printed.
rows = kmeanstf.collect()
print(rows[:3])

                                                                                

[Row(CUST_ID='C10001', prediction=2), Row(CUST_ID='C10002', prediction=2), Row(CUST_ID='C10003', prediction=0)]


In [8]:
# `kmeanstf` is already a Spark DataFrame holding exactly the CUST_ID and
# prediction columns, so reuse it instead of rebuilding a DataFrame from rows
# that were collected to the driver (avoids a driver round-trip and schema
# re-inference).
dfpred = kmeanstf
dfpred.show()

[Stage 110:>                                                        (0 + 1) / 1]

+-------+----------+
|CUST_ID|prediction|
+-------+----------+
| C10001|         2|
| C10002|         2|
| C10003|         0|
| C10005|         2|
| C10006|         0|
| C10007|         1|
| C10008|         0|
| C10009|         2|
| C10010|         2|
| C10011|         0|
| C10012|         2|
| C10013|         0|
| C10014|         0|
| C10015|         2|
| C10016|         2|
| C10017|         2|
| C10018|         0|
| C10019|         0|
| C10020|         0|
| C10021|         0|
+-------+----------+
only showing top 20 rows



                                                                                

In [9]:
# Attach the original and derived feature columns to each customer's cluster id.
# The join starts from `kmeanstf` (CUST_ID + prediction) rather than from
# `dfpred` itself, so re-running this cell gives the same frame instead of
# joining an already-joined frame a second time.
dfpred = kmeanstf.join(scaled_data2, "CUST_ID")
dfpred.show()

[Stage 111:>                                                        (0 + 1) / 1]

22/11/25 14:00:00 WARN TransportClientFactory: DNS resolution succeed for server1.example.com/192.168.1.11:42309 took 11049 ms


                                                                                

+-------+----------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+--------------------+--------------------+
|CUST_ID|prediction|    BALANCE|BALANCE_FREQUENCY|PURCHASES|ONEOFF_PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|PURCHASES_FREQUENCY|ONEOFF_PURCHASES_FREQUENCY|PURCHASES_INSTALLMENTS_FREQUENCY|CASH_ADVANCE_FREQUENCY|CASH_ADVANCE_TRX|PURCHASES_TRX|CREDIT_LIMIT|   PAYMENTS|MINIMUM_PAYMENTS|PRC_FULL_PAYMENT|TENURE|            features|        standardized|
+-------+----------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------