In [27]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BDA_Project").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
spark

In [28]:
from pyspark.sql.functions import expr, array, round, size, col, array_contains
from pyspark.ml.fpm import FPGrowth, PrefixSpan

In [29]:
df = spark.read.csv('./Mall_Customers.csv', inferSchema=True, header=True)

In [30]:
df.count()

200

In [31]:
df.limit(4).toPandas()

Unnamed: 0,CustomerID,Gender,Age,Annual Income (k$),Spending Score (1-100)
0,1,Male,19,15,39
1,2,Male,21,15,81
2,3,Female,20,16,6
3,4,Female,23,16,77


In [32]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Annual Income (k$): integer (nullable = true)
 |-- Spending Score (1-100): integer (nullable = true)



In [33]:
df = df.withColumnRenamed("Annual Income (k$)", "Income")
df = df.withColumnRenamed("Spending Score (1-100)", "Spending_score")

In [34]:
df.show(5)

+----------+------+---+------+--------------+
|CustomerID|Gender|Age|Income|Spending_score|
+----------+------+---+------+--------------+
|         1|  Male| 19|    15|            39|
|         2|  Male| 21|    15|            81|
|         3|Female| 20|    16|             6|
|         4|Female| 23|    16|            77|
|         5|Female| 31|    17|            40|
+----------+------+---+------+--------------+
only showing top 5 rows



In [35]:
from pyspark.sql.functions import *

groups = df.withColumn(
    "age_group",
    expr(
        "CASE WHEN Age < 30 THEN 'Under 30' WHEN Age BETWEEN 30 AND 55 THEN '30 to 55' WHEN Age > 50 THEN '50 +' ELSE 'Other' END AS age_group"
    ),
)
print(groups.groupBy("age_group").count().show())

groups = groups.withColumn(
    "income_group",
    expr(
        "CASE WHEN income < 40 THEN 'Under 40' WHEN income BETWEEN 40 AND 70 THEN '40 - 70' WHEN income > 70 THEN '70 +' ELSE 'Other' END AS income_group"
    ),
)
print(groups.groupBy("income_group").count().show())

groups = groups.withColumn(
    "spending_group",
    expr(
        "CASE WHEN spending_score < 30 THEN 'Less than 30' WHEN spending_score BETWEEN 30 AND 60 THEN '30 -- 60' WHEN spending_score > 60 THEN '60 +' ELSE 'Other' END AS spending_group"
    ),
)
print(groups.groupBy("spending_group").count().show())

print(groups.groupBy("Gender").count().show())
# items is what spark is expecting
groups = groups.withColumn(
    "items", array("Gender", "age_group", "income_group", "spending_group")
)
groups.limit(4).toPandas()

+---------+-----+
|age_group|count|
+---------+-----+
| 30 to 55|  116|
| Under 30|   55|
|     50 +|   29|
+---------+-----+

None
+------------+-----+
|income_group|count|
+------------+-----+
|     40 - 70|   80|
|    Under 40|   46|
|        70 +|   74|
+------------+-----+

None
+--------------+-----+
|spending_group|count|
+--------------+-----+
|          60 +|   62|
|      30 -- 60|   92|
|  Less than 30|   46|
+--------------+-----+

None
+------+-----+
|Gender|count|
+------+-----+
|Female|  112|
|  Male|   88|
+------+-----+

None


Unnamed: 0,CustomerID,Gender,Age,Income,Spending_score,age_group,income_group,spending_group,items
0,1,Male,19,15,39,Under 30,Under 40,30 -- 60,"[Male, Under 30, Under 40, 30 -- 60]"
1,2,Male,21,15,81,Under 30,Under 40,60 +,"[Male, Under 30, Under 40, 60 +]"
2,3,Female,20,16,6,Under 30,Under 40,Less than 30,"[Female, Under 30, Under 40, Less than 30]"
3,4,Female,23,16,77,Under 30,Under 40,60 +,"[Female, Under 30, Under 40, 60 +]"


In [36]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.1)
model = fpGrowth.fit(groups)

In [37]:
itempopularity = model.freqItemsets
itempopularity.createOrReplaceTempView("itempopularity")
print("Top 20")
spark.sql("SELECT * FROM itempopularity ORDER BY freq desc").limit(200).toPandas()

Top 20


Unnamed: 0,items,freq
0,[30 to 55],116
1,[Female],112
2,[30 -- 60],92
3,[Male],88
4,[40 - 70],80
5,"[40 - 70, 30 -- 60]",77
6,[70 +],74
7,"[Female, 30 to 55]",72
8,[60 +],62
9,"[30 -- 60, Female]",55


In [38]:
assoc = model.associationRules
assoc.createOrReplaceTempView("assoc")
print("Top 20")
spark.sql("SELECT * FROM assoc ORDER BY confidence desc").limit(200).toPandas()

Top 20


Unnamed: 0,antecedent,consequent,confidence,lift,support
0,[40 - 70],[30 -- 60],0.9625,2.092391,0.385
1,"[40 - 70, Female]",[30 -- 60],0.957447,2.081406,0.225
2,[30 -- 60],[40 - 70],0.836957,2.092391,0.385
3,"[30 -- 60, Female]",[40 - 70],0.818182,2.045455,0.225
4,[70 +],[30 to 55],0.743243,1.281454,0.275
5,[Female],[30 to 55],0.642857,1.108374,0.36
6,[30 to 55],[Female],0.62069,1.108374,0.36
7,[30 -- 60],[Female],0.597826,1.067547,0.275
8,[40 - 70],[Female],0.5875,1.049107,0.235
9,"[40 - 70, 30 -- 60]",[Female],0.584416,1.043599,0.225
