In [1]:
from pyspark import StorageLevel
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import ArrayType, StringType

from tag_recommender.utils.text import to_snake_case_boosted

In [2]:
spark = (
    SparkSession.builder.appName("FrequentPatternsSpark")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.sql.shuffle.partitions", "500")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/18 00:03:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load data from CSV (adjust the file path as necessary)
file_path = "../data/processed/train.parquet"
# The train dataset already contains the root_tags as an array form.
# The tags column is also in an array form.
df = spark.read.parquet(file_path, header=True, inferSchema=True)

In [4]:
# Show the schema of the DataFrame
df.printSchema()

root
 |-- type: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- is_reblog: double (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- root_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- root_tags_count: long (nullable = true)
 |-- tags_count: long (nullable = true)
 |-- lang_type: string (nullable = true)
 |-- type_bucket: string (nullable = true)
 |-- root_tags_popularity: long (nullable = true)
 |-- tags_popularity: long (nullable = true)
 |-- total_popularity: long (nullable = true)
 |-- root_tags_count_bucket: string (nullable = true)
 |-- tags_count_bucket: string (nullable = true)



In [5]:
# Show the first 5 rows of the DataFrame
df.show(truncate=True, n=10)

+-------+-----+---------+--------------------+--------------------+---------------+----------+---------+-----------+--------------------+---------------+----------------+----------------------+-----------------+
|   type| lang|is_reblog|                tags|           root_tags|root_tags_count|tags_count|lang_type|type_bucket|root_tags_popularity|tags_popularity|total_popularity|root_tags_count_bucket|tags_count_bucket|
+-------+-----+---------+--------------------+--------------------+---------------+----------+---------+-----------+--------------------+---------------+----------------+----------------------+-----------------+
|  photo|it_IT|      1.0|[Tbb, gregor, Art...|[I'm artblocked s...|              8|         5|    other|      photo|                3861|           3125|            6986|           (3.0, 10.0]|       (2.0, 5.0]|
|  photo|en_US|      1.0|[doctor who, bill...|[dwedit, doctor w...|             10|         5|       en|      photo|               13260|           6202

In [6]:
# Register UDF for splitting tags
@udf(ArrayType(StringType()))
def preprocess_tags(tags):
    return [to_snake_case_boosted(tag) for tag in tags]


# Remove empty strings from the arrays
@udf(ArrayType(StringType()))
def remove_empty(arr):
    return [i for i in arr if i]


# UDF to remove duplicates from each tag array
@udf(ArrayType(StringType()))
def remove_duplicates(arr):
    return sorted(set(arr))

In [7]:
# Apply the split and snake_case conversion function to both 'root_tags' and 'tags' columns
df = df.withColumn("root_tags_array", preprocess_tags(col("root_tags")))
df = df.withColumn("tags_array", preprocess_tags(col("tags")))

In [8]:
# Remove empty lists that are like ['']
df = df.withColumn("root_tags_array", remove_empty(col("root_tags_array")))
df = df.withColumn("tags_array", remove_empty(col("tags_array")))

In [9]:
df.show(truncate=True, n=5)

+-----+-----+---------+--------------------+--------------------+---------------+----------+---------+-----------+--------------------+---------------+----------------+----------------------+-----------------+--------------------+--------------------+
| type| lang|is_reblog|                tags|           root_tags|root_tags_count|tags_count|lang_type|type_bucket|root_tags_popularity|tags_popularity|total_popularity|root_tags_count_bucket|tags_count_bucket|     root_tags_array|          tags_array|
+-----+-----+---------+--------------------+--------------------+---------------+----------+---------+-----------+--------------------+---------------+----------------+----------------------+-----------------+--------------------+--------------------+
|photo|it_IT|      1.0|[Tbb, gregor, Art...|[I'm artblocked s...|              8|         5|    other|      photo|                3861|           3125|            6986|           (3.0, 10.0]|       (2.0, 5.0]|[im_artblocked_so...|[tbb, gregor, 

In [10]:
# Step 5: Join all normalized lists into one unified column

# This method performs a SQL-style set union of the rows from both DataFrame objects,
# with no automatic deduplication of elements.
df_unified = df.selectExpr("root_tags_array as tag_arrays").union(
    df.selectExpr("tags_array as tag_arrays")
)

In [11]:
# Show the unified dataset
df_unified.show(truncate=True, n=10)

+--------------------+
|          tag_arrays|
+--------------------+
|[im_artblocked_so...|
|[dwedit, doctor_w...|
|[aftersun, charlo...|
|[sofia_bulgaria_s...|
|[kenny_omega, aew...|
|[s_classes_that_i...|
|                  []|
|                  []|
|[mother_earth_oas...|
|[ask_meme, ask_me...|
+--------------------+
only showing top 10 rows



In [12]:
df_unified.printSchema()

root
 |-- tag_arrays: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [13]:
# Remove empty arrays from the tag_arrays column
df_unified = df_unified.filter(size(col("tag_arrays")) > 0)

In [14]:
df_unified.show(truncate=True, n=10)

+--------------------+
|          tag_arrays|
+--------------------+
|[im_artblocked_so...|
|[dwedit, doctor_w...|
|[aftersun, charlo...|
|[sofia_bulgaria_s...|
|[kenny_omega, aew...|
|[s_classes_that_i...|
|[mother_earth_oas...|
|[ask_meme, ask_me...|
|[phineas_and_ferb...|
|[we_best_love_fig...|
+--------------------+
only showing top 10 rows



In [15]:
# remove duplicate tags that were created during normalization
df_unified = df_unified.withColumn("tag_arrays", remove_duplicates(col("tag_arrays")))

In [16]:
# Show the unified dataset
df_unified.show(truncate=False, n=10)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tag_arrays                                                                                                                                                                                                                              |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[clone_commando_gregor, im_artblocked_so_even_simple_sketches_are_a_struggle_atm_but_at_least_i_finished_this, lornart, spoilers, tbb_spoilers, the_bad_batch, the_bad_batch_fanart, the_bad_batch_spoilers]                            |
|[billie_piper, david_tennant, doctor_who, dwedit, mine_dw, 

In [17]:
# Calculate the length of df_unified
n_baskets = df_unified.count()

# Print the result
print(f"Number of rows in df_unified: {n_baskets}")



Number of rows in df_unified: 2332809


                                                                                

In [18]:
# Repartition the dataset into more partitions
df_unified = df_unified.repartition(500)

In [19]:
# Persist with MEMORY_AND_DISK
df_unified.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[tag_arrays: array<string>]

In [20]:
support = 250
# Use the unified tags for frequent pattern mining
min_support = support / n_baskets  # Adjust based on dataset size
min_confidence = 0.5  # Adjust based on desired association rules

print(f"Min support threshold: {support}. As a percentage: {min_support}")

Min support threshold: 250. As a percentage: 0.00010716693908502582


In [21]:
fp_growth = FPGrowth(
    itemsCol="tag_arrays", minSupport=min_support, minConfidence=min_confidence
)
model = fp_growth.fit(df_unified)

24/10/18 00:05:31 WARN FPGrowth: Input data is not cached.                      
                                                                                

In [22]:
# Extract frequent itemsets
frequent_itemsets = model.freqItemsets
frequent_itemsets.show(truncate=False, n=10)



+----------------------------------------+----+
|items                                   |freq|
+----------------------------------------+----+
|[colored_pencil]                        |254 |
|[fatigue]                               |282 |
|[biology]                               |579 |
|[webtoon]                               |740 |
|[webtoon, comic]                        |454 |
|[webtoon, cottagecore]                  |274 |
|[webtoon, cottagecore, comic]           |272 |
|[webtoon, cottagecore, long_post]       |272 |
|[webtoon, cottagecore, long_post, comic]|272 |
|[webtoon, cottagecore, comics]          |274 |
+----------------------------------------+----+
only showing top 10 rows



                                                                                

In [23]:
# Generate association rules
association_rules = model.associationRules
association_rules.show(truncate=False)



+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------+------------------+------------------+---------------------+
|antecedent                                                                                                                                                         |consequent                                                            |confidence        |lift              |support              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------+------------------+------------------+---------------------+
|[hboedit, hbo_max, tuserpolly, hbo_the_last_of_us, tlouedit, joel_miller, pedro_pascal]                     

24/10/18 00:07:34 WARN Executor: Managed memory leak detected; size = 17493958 bytes, task 0.0 in stage 31.0 (TID 4068)
                                                                                

In [24]:
# Sort rules by confidence
sorted_rules = association_rules.orderBy(col("confidence").desc())
sorted_rules.show(truncate=False, n=10)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+----------+------------------+---------------------+
|antecedent                                                                                                                                                                                         |consequent       |confidence|lift              |support              |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+----------+------------------+---------------------+
|[miim, guldum, komikresim, guldumnet, keps, monte, 9gag, komik, reels, discover, kesfet, caps, instagram]                                                                                          

                                                                                

In [25]:
# Filter rules by lift
filtered_rules = sorted_rules.filter(col("lift") >= 1.0)
filtered_rules.show(truncate=False, n=10)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+----------+------------------+---------------------+
|antecedent                                                                                                                                                                                         |consequent       |confidence|lift              |support              |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+----------+------------------+---------------------+
|[inspiring_quotes, prose, relationship_quotes, life_quotes, motivation, spilled_ink, poem, lit, words, quotes, poetry, aesthetic]                                                                  

                                                                                

In [26]:
# Count the number of association rules with lift >= 1.0
n_rules = filtered_rules.count()
n_rules

                                                                                

42458419

In [27]:
# Calculate the actual support (frequency) of the association rules
filtered_rules = filtered_rules.withColumn(
    "support_count",
    col("support") * n_baskets,
)

In [28]:
# show the schema
filtered_rules.printSchema()

root
 |-- antecedent: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- consequent: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- confidence: double (nullable = false)
 |-- lift: double (nullable = true)
 |-- support: double (nullable = false)
 |-- support_count: double (nullable = false)



In [29]:
# calculate the antecedent and consequent sizes
filtered_rules = filtered_rules.withColumn(
    "antecedent_size", size(col("antecedent"))
).withColumn("consequent_size", size(col("consequent")))

In [30]:
# Group by the antecedent size and count the number of rules
filtered_rules.groupBy("antecedent_size").count().show()



+---------------+-------+
|antecedent_size|  count|
+---------------+-------+
|             12|3730727|
|              1|   5925|
|              6|2543144|
|              3| 143147|
|              2|  38255|
|             18|   1178|
|              4| 453822|
|              8|6345510|
|             13|1953756|
|             11|5754180|
|             17|  10944|
|              7|4434362|
|             14| 813975|
|              9|7471980|
|             10|7239068|
|             16|  63954|
|              5|1190864|
|             15| 263568|
|             19|     60|
+---------------+-------+



                                                                                

In [31]:
# Keep only rules with at most 4 antecedents
filtered_rules = filtered_rules.filter(col("antecedent_size") <= 4)

In [32]:
# Count the number of rules after filtering
filtered_rules.count()

                                                                                

641149

In [33]:
# Sort the antecedents and consequents arrays alphabetically for better readability
filtered_rules = filtered_rules.withColumn(
    "antecedent", udf(sorted, ArrayType(StringType()))(col("antecedent"))
).withColumn("consequent", udf(sorted, ArrayType(StringType()))(col("consequent")))

In [34]:
# Show the updated DataFrame
filtered_rules.show(truncate=False, n=10)



+---------------------------------+-----------+----------+------------------+---------------------+-------------+---------------+---------------+
|antecedent                       |consequent |confidence|lift              |support              |support_count|antecedent_size|consequent_size|
+---------------------------------+-----------+----------+------------------+---------------------+-------------+---------------+---------------+
|[keps, kesfet, komikresim, reels]|[guldumnet]|1.0       |9077.077821011673 |1.1016761337940654E-4|257.0        |4              |1              |
|[keps, kesfet, komikresim, reels]|[komik]    |1.0       |8803.052830188679 |1.1016761337940654E-4|257.0        |4              |1              |
|[keps, kesfet, komikresim, reels]|[monte]    |1.0       |9041.89534883721  |1.1016761337940654E-4|257.0        |4              |1              |
|[keps, kesfet, komikresim, reels]|[komikli]  |1.0       |8937.965517241379 |1.1016761337940654E-4|257.0        |4          

                                                                                

In [35]:
# Save the association rules to a CSV file
output_path = "../artifacts/models/association_rules_spark.csv"
filtered_rules.toPandas().to_csv(output_path, index=False)

                                                                                

In [36]:
# Stop the Spark session after transformation
spark.stop()