In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Cluster Analysis").config('spark.sql.shuffle.partitions','20').getOrCreate()

In [4]:
import findspark
findspark.init()

In [5]:
customerDF=spark.read.csv('../Dataset/CC GENERAL.csv', header=True, inferSchema=True)
customerDF.count()

8950

In [6]:
customerDF.printSchema()

root
 |-- CUST_ID: string (nullable = true)
 |-- BALANCE: double (nullable = true)
 |-- BALANCE_FREQUENCY: double (nullable = true)
 |-- PURCHASES: double (nullable = true)
 |-- ONEOFF_PURCHASES: double (nullable = true)
 |-- INSTALLMENTS_PURCHASES: double (nullable = true)
 |-- CASH_ADVANCE: double (nullable = true)
 |-- PURCHASES_FREQUENCY: double (nullable = true)
 |-- ONEOFF_PURCHASES_FREQUENCY: double (nullable = true)
 |-- PURCHASES_INSTALLMENTS_FREQUENCY: double (nullable = true)
 |-- CASH_ADVANCE_FREQUENCY: double (nullable = true)
 |-- CASH_ADVANCE_TRX: integer (nullable = true)
 |-- PURCHASES_TRX: integer (nullable = true)
 |-- CREDIT_LIMIT: double (nullable = true)
 |-- PAYMENTS: double (nullable = true)
 |-- MINIMUM_PAYMENTS: double (nullable = true)
 |-- PRC_FULL_PAYMENT: double (nullable = true)
 |-- TENURE: integer (nullable = true)



In [7]:
customerDF.show(5,truncate=False)

+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+
|CUST_ID|BALANCE    |BALANCE_FREQUENCY|PURCHASES|ONEOFF_PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|PURCHASES_FREQUENCY|ONEOFF_PURCHASES_FREQUENCY|PURCHASES_INSTALLMENTS_FREQUENCY|CASH_ADVANCE_FREQUENCY|CASH_ADVANCE_TRX|PURCHASES_TRX|CREDIT_LIMIT|PAYMENTS   |MINIMUM_PAYMENTS|PRC_FULL_PAYMENT|TENURE|
+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+
|C10001 |40.900749  |0.818182         |95.4     |0.0             |95.4                  |0.

In [8]:
customerDF.rdd.getNumPartitions()

1

In [9]:
customerDF = customerDF.repartition(20)

In [10]:
customerDF.rdd.getNumPartitions()

20

In [11]:
%%timeit
customerDF.rdd.glom().map(len).collect()

22.8 s ± 210 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Null Values

In [12]:
from pyspark.sql.functions import col,isnan,when,count
customerDF.select([count(when(col(c).contains('None') | col(c).contains('NULL') | (col(c) == '' ) | col(c).isNull() |  isnan(c), c )).alias(c) for c in customerDF.columns]).show()

+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+
|CUST_ID|BALANCE|BALANCE_FREQUENCY|PURCHASES|ONEOFF_PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|PURCHASES_FREQUENCY|ONEOFF_PURCHASES_FREQUENCY|PURCHASES_INSTALLMENTS_FREQUENCY|CASH_ADVANCE_FREQUENCY|CASH_ADVANCE_TRX|PURCHASES_TRX|CREDIT_LIMIT|PAYMENTS|MINIMUM_PAYMENTS|PRC_FULL_PAYMENT|TENURE|
+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+
|      0|      0|                0|        0|               0|                     0|           0|              

In [13]:
from pyspark.sql.functions import col,isnan,when,count,mean
customerDF.select([count(when(col(c).contains('None') | col(c).contains('NULL') | (col(c) == '' ) | col(c).isNull() |  isnan(c), c )).alias(c) for c in ['CREDIT_LIMIT','MINIMUM_PAYMENTS']]).show()


a=customerDF.select(mean(customerDF['CREDIT_LIMIT']))

a.show()

+------------+----------------+
|CREDIT_LIMIT|MINIMUM_PAYMENTS|
+------------+----------------+
|           1|             313|
+------------+----------------+

+-----------------+
|avg(CREDIT_LIMIT)|
+-----------------+
|4494.449450364621|
+-----------------+



In [14]:
customerDF.select([(1 - (count(c) / count('*'))).alias(c + '_') for c in ('CREDIT_LIMIT','MINIMUM_PAYMENTS')]).show(truncate=False)

+---------------------+-------------------+
|CREDIT_LIMIT_        |MINIMUM_PAYMENTS_  |
+---------------------+-------------------+
|1.1173184357538002E-4|0.03497206703910616|
+---------------------+-------------------+



In [15]:
from pyspark.sql.functions import mean
means = customerDF.select([mean(c).alias(c) for c in ('CREDIT_LIMIT','MINIMUM_PAYMENTS')]).drop('income').toPandas().to_dict('records')[0]


In [16]:
means

{'CREDIT_LIMIT': 4494.449450364622, 'MINIMUM_PAYMENTS': 864.2065423050828}

In [17]:
customerDF = customerDF.fillna(means)

In [18]:
customerDF.select([count(when(col(c).contains('None') | col(c).contains('NULL') | (col(c) == '' ) | col(c).isNull() |  isnan(c), c )).alias(c) for c in customerDF.columns]).show()

+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+
|CUST_ID|BALANCE|BALANCE_FREQUENCY|PURCHASES|ONEOFF_PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|PURCHASES_FREQUENCY|ONEOFF_PURCHASES_FREQUENCY|PURCHASES_INSTALLMENTS_FREQUENCY|CASH_ADVANCE_FREQUENCY|CASH_ADVANCE_TRX|PURCHASES_TRX|CREDIT_LIMIT|PAYMENTS|MINIMUM_PAYMENTS|PRC_FULL_PAYMENT|TENURE|
+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+
|      0|      0|                0|        0|               0|                     0|           0|              

In [19]:
cols = [c for c in customerDF.columns if c !='CUST_ID']
bounds = {}
for col in cols:
    quantiles = customerDF.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR , quantiles[1] + 1.5 * IQR]

In [20]:
bounds

{'BALANCE': [-2381.2763274999998, 4272.0555644999995],
 'BALANCE_FREQUENCY': [0.5833325, 1.2500005],
 'PURCHASES': [-1366.18, 2336.0600000000004],
 'ONEOFF_PURCHASES': [-672.0, 1120.0],
 'INSTALLMENTS_PURCHASES': [-593.58, 989.3000000000001],
 'CASH_ADVANCE': [-1351.7784434999999, 2252.9640725],
 'PURCHASES_FREQUENCY': [-1.166668, 2.166668],
 'ONEOFF_PURCHASES_FREQUENCY': [-0.375, 0.625],
 'PURCHASES_INSTALLMENTS_FREQUENCY': [-1.0000005, 1.6666675],
 'CASH_ADVANCE_FREQUENCY': [-0.2500005, 0.4166675],
 'CASH_ADVANCE_TRX': [-4.5, 7.5],
 'PURCHASES_TRX': [-18.5, 33.5],
 'CREDIT_LIMIT': [-5250.0, 12750.0],
 'PAYMENTS': [-1578.9243295, 3616.4625705],
 'MINIMUM_PAYMENTS': [-762.4654229999999, 1718.0445289999998],
 'PRC_FULL_PAYMENT': [-0.1363635, 0.2272725],
 'TENURE': [12.0, 12.0]}

In [21]:
outliers = customerDF.select(['CUST_ID']+[((customerDF[c] < bounds[c][0]) | (customerDF[c] > bounds[c][1]) ).alias(c + '_o') for c in customerDF.columns if c !='CUST_ID'])
outliers.show()

+-------+---------+-------------------+-----------+------------------+------------------------+--------------+---------------------+----------------------------+----------------------------------+------------------------+------------------+---------------+--------------+----------+------------------+------------------+--------+
|CUST_ID|BALANCE_o|BALANCE_FREQUENCY_o|PURCHASES_o|ONEOFF_PURCHASES_o|INSTALLMENTS_PURCHASES_o|CASH_ADVANCE_o|PURCHASES_FREQUENCY_o|ONEOFF_PURCHASES_FREQUENCY_o|PURCHASES_INSTALLMENTS_FREQUENCY_o|CASH_ADVANCE_FREQUENCY_o|CASH_ADVANCE_TRX_o|PURCHASES_TRX_o|CREDIT_LIMIT_o|PAYMENTS_o|MINIMUM_PAYMENTS_o|PRC_FULL_PAYMENT_o|TENURE_o|
+-------+---------+-------------------+-----------+------------------+------------------------+--------------+---------------------+----------------------------+----------------------------------+------------------------+------------------+---------------+--------------+----------+------------------+------------------+--------+
| C12499| 

In [22]:
outliers.printSchema()

root
 |-- CUST_ID: string (nullable = true)
 |-- BALANCE_o: boolean (nullable = true)
 |-- BALANCE_FREQUENCY_o: boolean (nullable = true)
 |-- PURCHASES_o: boolean (nullable = true)
 |-- ONEOFF_PURCHASES_o: boolean (nullable = true)
 |-- INSTALLMENTS_PURCHASES_o: boolean (nullable = true)
 |-- CASH_ADVANCE_o: boolean (nullable = true)
 |-- PURCHASES_FREQUENCY_o: boolean (nullable = true)
 |-- ONEOFF_PURCHASES_FREQUENCY_o: boolean (nullable = true)
 |-- PURCHASES_INSTALLMENTS_FREQUENCY_o: boolean (nullable = true)
 |-- CASH_ADVANCE_FREQUENCY_o: boolean (nullable = true)
 |-- CASH_ADVANCE_TRX_o: boolean (nullable = true)
 |-- PURCHASES_TRX_o: boolean (nullable = true)
 |-- CREDIT_LIMIT_o: boolean (nullable = false)
 |-- PAYMENTS_o: boolean (nullable = true)
 |-- MINIMUM_PAYMENTS_o: boolean (nullable = false)
 |-- PRC_FULL_PAYMENT_o: boolean (nullable = true)
 |-- TENURE_o: boolean (nullable = true)



In [23]:
outliers.select('CUST_ID').where(" or ".join([ c  for c in outliers.columns if c!='CUST_ID'])).count()

6452

In [24]:
from pyspark.ml.feature import VectorAssembler

assembler=VectorAssembler(inputCols=[c for c in customerDF.columns if c!='CUST_ID'], outputCol='features')

In [25]:
assembleDF = assembler.transform(customerDF)

In [26]:
assembleDF.rdd.getNumPartitions()

20

In [27]:
assembleDF.select('features').show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------+
|[37.759404,1.0,398.64,0.0,398.64,0.0,1.0,0.0,1.0,0.0,0.0,12.0,2500.0,429.594704,186.268047,1.0,12.0]                                  |
|[97.064177,0.545455,863.97,114.0,749.97,0.0,0.416667,0.083333,0.416667,0.0,0.0,9.0,7000.0,1384.018459,103.701385,0.363636,12.0]       |
|[1681.16287,0.454545,1138.85,0.0,1138.85,4044.377495,0.416667,0.0,0.333333,0.25,6.0,5.0,4500.0,3763.240294,1818.174017,0.0,12.0]      |
|(17,[0,1,5,9,10,12,13,14,16],[51.071594,0.888889,360.199098,0.555556,14.0,500.0,0.049513,67.786744,9.0])                              |
|(17,[0,1,5,9,10,12,13,14,16],[4094.43389

In [28]:
from pyspark.ml.feature import StandardScaler

In [29]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [30]:
scaledDF = scaler.fit(assembleDF).transform(assembleDF)

In [31]:
scaledDF = scaledDF.select('scaledFeatures')

In [32]:
scaledDF.show()

+--------------------+
|      scaledFeatures|
+--------------------+
|[0.01814019971188...|
|[0.04663112679558...|
|[0.80765655649670...|
|(17,[0,1,5,9,10,1...|
|(17,[0,1,5,9,10,1...|
|[0.6461502983808,...|
|(17,[0,1,5,9,10,1...|
|[0.03994571681588...|
|[3.97848727023200...|
|[0.10614474233163...|
|(17,[0,1,5,9,10,1...|
|[0.61245853190239...|
|[0.83093553025551...|
|(17,[0,1,5,9,10,1...|
|[0.34637730083107...|
|(17,[2,4,6,8,11,1...|
|[1.41441840264712...|
|(17,[0,1,5,9,10,1...|
|[0.10410873748259...|
|[4.46957224571980...|
+--------------------+
only showing top 20 rows



### K-means Clustering

distanceMeasure = Param(parent='undefined', name='distanceMeasure', doc="the distance measure. Supported options: 'euclidean' and 'cosine'.")

featuresCol = Param(parent='undefined', name='featuresCol', doc='features column name.')

initMode = Param(parent='undefined', name='initMode', doc='The initialization algorithm. This can be either "random" to choose random points as initial cluster centers, or "k-means||" to use a parallel variant of k-means++')

initSteps = Param(parent='undefined', name='initSteps', doc='The number of steps for k-means|| initialization mode. Must be > 0.')

k = Param(parent='undefined', name='k', doc='The number of clusters to create. Must be > 1.')

maxIter = Param(parent='undefined', name='maxIter', doc='max number of iterations (>= 0).')¶

params

Returns all params ordered by name. The default implementation uses dir() to get all attributes of type Param.

predictionCol = Param(parent='undefined', name='predictionCol', doc='prediction column name.')

seed = Param(parent='undefined', name='seed', doc='random seed.')

tol = Param(parent='undefined', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).')

weightCol = Param(parent='undefined', name='weightCol', doc='weight column name. If this is not set or empty, we treat all instance weights as 1.0.')

In [33]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [34]:
eval_result =[]
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures', predictionCol='prediction', metricName='silhouette', distanceMeasure='squaredEuclidean')

In [35]:
for i in range(3,20):
    kmeans = KMeans(featuresCol='scaledFeatures',
    predictionCol='prediction',
    k=i,
    initMode='k-means||',
    initSteps=5,
    tol=0.0001,
    maxIter=20,
    seed=None,
    distanceMeasure='euclidean')
    
    kmeansModel = kmeans.fit(scaledDF)
    kmeansresult = kmeansModel.transform(scaledDF)
    
    result = evaluator.evaluate(kmeansresult)
    eval_result.append(result)

In [36]:
print(eval_result)

[0.27927360343604385, 0.29316700730220563, 0.24970885636951734, 0.2810229879698895, 0.22299681513779435, 0.27400735942280785, 0.2908158035348592, 0.3018701136051864, 0.33486844980503383, 0.3319772866146268, 0.26500222147758895, 0.2986168430621672, 0.32161068285143657, 0.23667356175942067, 0.28273114452255255, 0.2719307509450091, 0.30848201894984056]


### Visualizing the silhouette scores in a plot

In [37]:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(3,20),eval_result)
ax.set_xlabel('k')
ax.set_ylabel('cost')

ModuleNotFoundError: No module named 'matplotlib'

In [None]:
centers = kmeansModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
kmeans = KMeans(featuresCol='scaledFeatures',predictionCol='prediction',k=10,initMode='k-means||',initSteps=5,tol=0.0001,
    maxIter=20,
    seed=None,
    distanceMeasure='euclidean')
    
kmeansModel = kmeans.fit(scaledDF)
kmeanCluters = kmeansModel.transform(scaledDF)

In [None]:
kmeanCluters.groupBy('prediction').count().show()

In [None]:
kmeanCluters.rdd.getNumPartitions()

In [None]:
evaResult = evaluator.evaluate(kmeanCluters)

In [None]:
print(evaResult)

## Bisecting k-means

In [None]:
from pyspark.ml.clustering import BisectingKMeans

In [None]:
bkmeans = BisectingKMeans(k=10,seed=1, featuresCol='scaledFeatures', predictionCol='prediction',maxIter=50)

In [None]:
bkmeansModel = bkmeans.fit(scaledDF)

In [None]:
bkmansClusters = bkmeansModel.transform(scaledDF)

In [None]:
bkmansClusters.show()

In [None]:
bkmansClusters.groupBy('prediction').count().show()

In [None]:
bikmeansEvalutionResult = evaluator.evaluate(bkmansClusters)

In [None]:
print(bikmeansEvalutionResult)

In [None]:
bi_eval_result=[]
for i in range(3,20):
    bkmeans = BisectingKMeans(k=i,seed=None, featuresCol='scaledFeatures', predictionCol='prediction',maxIter=50)
    
    bkmeansModel = bkmeans.fit(scaledDF)
    bkmansClusters = bkmeansModel.transform(scaledDF)
    
    result = evaluator.evaluate(bkmansClusters)
    bi_eval_result.append(result)

In [None]:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(3,20),bi_eval_result)
ax.set_xlabel('k')
ax.set_ylabel('silhouette')

## Reading Multiple Files

In [None]:
from  pyspark.sql.functions import input_file_name

txtFiles = spark.read.text('txtFiles/*', wholetext=True).withColumn("filename", input_file_name())

In [None]:
txtFiles.show(5,truncate=False)

In [None]:
txtFiles.count()