In [47]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

# Reading CSV file from /sparkdata folder
csv_path = "/sparkdata/bank_transactions.csv"  # This will read any CSV file in the /sparkdata folder
df = spark.read.csv(csv_path, header=True, inferSchema=True)  # Assuming the CSV has a header

# Show the DataFrame
df.show()

# If you want to perform any action on DataFrame
# For instance, to get the count of rows:
print("Number of rows:", df.count())

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|                   25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|                27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|                  459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|         866503.21|         2/8/16|         142714|                 2060.0|
|           T5|  C9031234|    24/3

In [48]:
from pyspark.sql.functions import col, sum

# Count the number of null values in each column
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|            0|         0|          0|      1100|         151|              2369|              0|              0|                      0|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+



In [49]:
from pyspark.sql.functions import mean, median

# Impute numerical columns with mean or median
mean_balance = df.select(mean("CustAccountBalance")).first()[0]
df = df.fillna({"CustAccountBalance": mean_balance})
print(mean_balance)

115403.54005622343


In [50]:
# Impute categorical columns with mode
from pyspark.sql.functions import col

mode_gender = df.groupBy("CustGender").count().orderBy("count", ascending=False).first()[0]
df = df.fillna({"CustGender": mode_gender})
print(mode_gender)

mode_location = df.groupBy("CustLocation").count().orderBy("count", ascending=False).first()[0]
df = df.fillna({"CustLocation": mode_location})
print(mode_location)

M
MUMBAI


In [53]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df.groupBy('CustGender').agg(countDistinct('CustomerID').alias('Gender count')).show()
df = df.filter(df["CustGender"] != "T")

+----------+------------+
|CustGender|Gender count|
+----------+------------+
|         F|      268662|
|         T|           1|
|         M|      675583|
+----------+------------+



In [54]:
df.groupBy('CustGender').agg(countDistinct('CustomerID').alias('Gender count')).orderBy(desc('Gender count')).show()

+----------+------------+
|CustGender|Gender count|
+----------+------------+
|         M|      675583|
|         F|      268662|
+----------+------------+



In [57]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df = df.withColumn('date', to_timestamp("TransactionDate", 'M/d/yy'))
df.select(max("date")).show()

+-------------------+
|          max(date)|
+-------------------+
|2016-12-09 00:00:00|
+-------------------+



In [58]:
df.select(min("date")).show()

+-------------------+
|          min(date)|
+-------------------+
|2016-01-08 00:00:00|
+-------------------+



In [59]:
df.show(2,0)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|date               |
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-------------------+
|T1           |C5841053  |10/1/94    |F         |JAMSHEDPUR  |17819.05          |2/8/16         |143207         |25.0                   |2016-02-08 00:00:00|
|T2           |C2142763  |4/4/57     |M         |JHAJJAR     |2270.69           |2/8/16         |141858         |27999.0                |2016-02-08 00:00:00|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-------------------+
only showing top 2 rows



In [69]:
df = df.withColumn("from_date", lit("2016-01-08 00:00:00"))
df = df.withColumn('from_date',to_timestamp("from_date", 'yy/MM/dd HH:mm:ss'))

df2 = df.withColumn('from_date',to_timestamp(col('from_date'))).withColumn('recency',col("date").cast("long") - col('from_date').cast("long"))

In [70]:
df2 = df2.join(df2.groupBy('CustomerID').agg(max('recency').alias('recency')),on='recency',how='leftsemi')

In [73]:
df2.show(5,0)

+----------+-------+-------------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+----+---------+-----------+
|CustomerID|recency|TransactionID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|date|from_date|max_recency|
+----------+-------+-------------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+----+---------+-----------+
+----------+-------+-------------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+----+---------+-----------+



In [72]:
# Group by CustomerID and calculate max recency
max_recency = df2.groupBy('CustomerID').agg(max('recency').alias('max_recency'))

# Join to get the max recency per CustomerID
df2 = df2.join(max_recency, on='CustomerID', how='left')


In [83]:
# Step 1: Fix 'from_date' timestamp conversion
df = df.withColumn("from_date", lit("2016-01-08"))
df = df.withColumn('from_date', to_timestamp("from_date", 'yyyy-MM-dd'))

# Step 2: Ensure 'date' and 'from_date' are timestamps and calculate 'recency'
df2 = df.withColumn('from_date', to_timestamp(col('from_date'))).withColumn('date', to_timestamp(col('date'))) \
        .withColumn('recency', (col("date").cast("long") - col('from_date').cast("long")))

df2 = df2.join(df2.groupBy('CustomerID').agg(max('recency').alias('recency')),on='recency',how='leftsemi')

# Show the result
df2.show(3,0)


+-------+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-------------------+-------------------+
|recency|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|date               |from_date          |
+-------+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+-------------------+-------------------+
|2678400|T1           |C5841053  |10/1/94    |F         |JAMSHEDPUR  |17819.05          |2/8/16         |143207         |25.0                   |2016-02-08 00:00:00|2016-01-08 00:00:00|
|2678400|T2           |C2142763  |4/4/57     |M         |JHAJJAR     |2270.69           |2/8/16         |141858         |27999.0                |2016-02-08 00:00:00|2016-01-08 00:00:00|
|2678400|T3           |C4417068  |26/11/96   |F         |MUMBAI      |

In [84]:
df2.printSchema()

root
 |-- recency: long (nullable = true)
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = false)
 |-- CustLocation: string (nullable = false)
 |-- CustAccountBalance: double (nullable = false)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: integer (nullable = true)
 |-- TransactionAmount (INR): double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- from_date: timestamp (nullable = true)



In [85]:
df_freq = df2.groupBy('CustomerID').agg(count('TransactionDate').alias('frequency'))

In [89]:
df_freq.show(5,0)

+----------+---------+
|CustomerID|frequency|
+----------+---------+
|C5634040  |1        |
|C5228983  |1        |
|C7910322  |1        |
|C8385586  |1        |
|C8718115  |1        |
+----------+---------+
only showing top 5 rows



In [90]:
df3 = df2.join(df_freq,on='CustomerID',how='inner')

In [91]:
df3.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- recency: long (nullable = true)
 |-- TransactionID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = false)
 |-- CustLocation: string (nullable = false)
 |-- CustAccountBalance: double (nullable = false)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: integer (nullable = true)
 |-- TransactionAmount (INR): double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- from_date: timestamp (nullable = true)
 |-- frequency: long (nullable = false)



In [93]:
m_val = df3
m_val = m_val.groupBy('CustomerID').agg(sum('TransactionAmount (INR)').alias('monetary_value'))

In [94]:
m_val.show(3,0)

+----------+--------------+
|CustomerID|monetary_value|
+----------+--------------+
|C1010011  |356.0         |
|C1010014  |1455.0        |
|C1010031  |1864.0        |
+----------+--------------+
only showing top 3 rows



In [95]:
finaldf = m_val.join(df3,on='CustomerID',how='inner')

In [96]:
finaldf = finaldf.select(['recency','frequency','monetary_value','CustomerID']).distinct()

In [101]:
finaldf.show(10,0)

+--------+---------+--------------+----------+
|recency |frequency|monetary_value|CustomerID|
+--------+---------+--------------+----------+
|21081600|1        |356.0         |C1010011  |
|0       |2        |1455.0        |C1010014  |
|15724800|2        |1455.0        |C1010014  |
|5184000 |2        |1864.0        |C1010031  |
|7862400 |2        |1864.0        |C1010031  |
|0       |1        |50.0          |C1010035  |
|21081600|1        |19680.0       |C1010037  |
|15811200|1        |100.0         |C1010038  |
|2764800 |1        |915.0         |C1010039  |
|13132800|3        |16917.0       |C1010041  |
+--------+---------+--------------+----------+
only showing top 10 rows



In [98]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

assemble=VectorAssembler(inputCols=[
    'recency','frequency','monetary_value'
], outputCol='features')

assembled_data=assemble.transform(finaldf)

scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data)

In [99]:
data_scale_output.select('standardized').show(2,truncate=False)

+--------------------------------------------------------+
|standardized                                            |
+--------------------------------------------------------+
|[2.33160487164955,2.327981431296426,0.05303108387332255]|
|[0.0,4.655962862592852,0.21674221077439412]             |
+--------------------------------------------------------+
only showing top 2 rows



In [100]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np

cost = np.zeros(10)

evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',metricName='silhouette', distanceMeasure='squaredEuclidean')

for i in range(2,10):
    KMeans_algo=KMeans(featuresCol='standardized', k=i)
    KMeans_fit=KMeans_algo.fit(data_scale_output)
    output=KMeans_fit.transform(data_scale_output)
    cost[i] = KMeans_fit.summary.trainingCost

Py4JJavaError: An error occurred while calling o1444.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 341.0 failed 1 times, most recent failure: Lost task 0.0 in stage 341.0 (TID 224) (85d74fc87a38 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.numRecords()" because "this.inMemSorter" is null
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:478)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:968)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1601)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1528)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1592)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1293)
	at org.apache.spark.rdd.RDD.$anonfun$takeSample$1(RDD.scala:626)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:615)
	at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:400)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithmWithWeight(KMeans.scala:273)
	at org.apache.spark.mllib.clustering.KMeans.runWithWeight(KMeans.scala:231)
	at org.apache.spark.ml.clustering.KMeans.trainWithRow(KMeans.scala:446)
	at org.apache.spark.ml.clustering.KMeans.$anonfun$fit$1(KMeans.scala:382)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:371)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:478)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.hashAgg_doAggregateWithKeys_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage19.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:968)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1601)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1528)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1592)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [None]:
import pandas as pd
import pylab as pl
df_cost = pd.DataFrame(cost[2:])
df_cost.columns = ["cost"]
new_col = range(2,10)
df_cost.insert(0, 'cluster', new_col)
pl.plot(df_cost.cluster, df_cost.cost)
pl.xlabel('Number of Clusters')
pl.ylabel('Score')
pl.title('Elbow Curve')
pl.show()

In [None]:
KMeans_algo=KMeans(featuresCol='standardized', k=4)
KMeans_fit=KMeans_algo.fit(data_scale_output)

In [None]:
preds=KMeans_fit.transform(data_scale_output)

preds.show(5,0)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

df_viz = preds.select('recency','frequency','monetary_value','prediction')
df_viz = df_viz.toPandas()
avg_df = df_viz.groupby(['prediction'], as_index=False).mean()

list1 = ['recency','frequency','monetary_value']

for i in list1:
    sns.barplot(x='prediction',y=str(i),data=avg_df)
    plt.show()

In [102]:
import os

# Get the current notebook directory
notebook_path = os.getcwd()

print(f"The notebook is located in: {notebook_path}")


The notebook is located in: /home/jovyan
