In [None]:
import matplotlib.pyplot as plt
plt.rcParams['font.family'] = 'Microsoft YaHei'  

from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from glob import glob
import pandas as pd
import dask
import dask.dataframe as dd
from dask import delayed
import re
import pandas as pd
import jieba
from pyspark.sql import functions as F
from pyspark.sql import Window

import math
from pyspark.ml.fpm import FPGrowth


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [12]:
from pyspark.sql import SparkSession

# Print the Spark web UI address for monitoring jobs. The SparkContext is taken
# from the active session (getOrCreate returns it if one already exists) instead
# of relying on a kernel-injected `sc` global, so this cell also works after a
# kernel restart.
print(SparkSession.builder.getOrCreate().sparkContext.uiWebUrl)

http://midway3-0141.rcc.local:4041


# Spark-based FP-growth preprocessing

### Data and Computing Environment

I used Spark on Midway3 to read a cleaned, segmented news corpus (People’s Daily) stored in Parquet. Parquet was chosen because its columnar storage and built-in statistics significantly reduce I/O overhead, facilitating large-scale distributed processing.

- Field Selection and Type Standardization: Select only the `年份` (year) and `文本内容_清洗` (cleaned text content) columns, rename them to `year` and `content_cleaned`, and cast `year` to int so that later grouping and slicing use a consistent key type.

- Tokenization into Item Arrays: Use `F.split(..., r"\s+")` to split the cleaned text column `content_cleaned` on whitespace into the intermediate token array `items_raw`. The upstream cleaning process has already removed punctuation, special characters, and stop words.

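- Per-Document Token Deduplication: Use `F.array_distinct` on `items_raw` to produce `items`, converting each article from a bag of tokens (with repeats) into a set of unique tokens. This "frequency representation" to "set representation" step is required because Spark's FP-growth rejects transactions that contain duplicate items.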

- Filtering Empty Transactions: Filter for `size(items) > 0` to avoid empty sets interfering with the model and statistics.

- Sample Checks: Use `.show()` to inspect a few transactions, both unsorted and sorted by year, to confirm that the year column and token arrays look correct before slicing by year.

- Set year = 1947, filter items for that year and use `.cache()`. Then use `.count()` to trigger a calculation to obtain the number of transactions for that year, which will serve as the training base and scale reference for subsequent FP-growth.

In [2]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession (or create one) so this cell does not depend on
# a `spark` global injected by the kernel.
spark = SparkSession.builder.getOrCreate()

# Directory of Parquet files holding the cleaned, whitespace-segmented corpus.
base = "cleaned_segmented_parquet"
df = spark.read.parquet(base)

# Keep only the two columns used downstream, renamed to ASCII identifiers:
#   年份 (year)                      -> year, cast to int for filtering/grouping
#   文本内容_清洗 (cleaned text content) -> content_cleaned
df = df.select(
    F.col("年份").cast("int").alias("year"),
    F.col("文本内容_清洗").alias("content_cleaned")
)

                                                                                

In [3]:
# Turn each article into an FP-growth transaction: split the cleaned,
# whitespace-segmented text into tokens, drop empty tokens, then deduplicate
# (Spark's FPGrowth rejects transactions that contain duplicate items).
df_items = (df
    .withColumn(
        "items_raw",
        # Splitting text that has leading/trailing whitespace, or is an empty
        # string, yields "" tokens; remove them so "" never becomes an item and
        # an empty article never becomes the 1-item transaction [""].
        F.array_remove(F.split(F.col("content_cleaned"), r"\s+"), "")
    )
    .withColumn("items", F.array_distinct(F.col("items_raw")))
    .drop("items_raw")
    # Drop empty transactions. For null text, size() returns -1 or null
    # (depending on Spark configuration); both are rejected by this predicate.
    .filter(F.size("items") > 0))
df_items.select("year", "items").show(5, truncate=False)

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [None]:
# Peek at the earliest-year transactions to sanity-check the year column
# (`sort` is the same operation as `orderBy`; ascending by default).
(df_items
    .sort("year")
    .show(5, truncate=False))

                                                                                

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Slice one year of transactions, keeping only the basket column that
# FP-growth consumes. The slice is cached because later cells re-scan it.
year = 1947
df_y = (df_items
        .where(F.col("year") == year)
        .select("items")
        .cache())
# count() is an action: it materialises the cache and reports the basket count.
print("Transactions in", year, "=", df_y.count())



Transactions in 1947 = 10773


                                                                                

### Initial Data Profiling and Support Threshold Setup for 1947 Data

- **Transaction Profiling and Length Distribution**: In this step, I first examined the basic structure of the 1947 dataset used for FP-growth. Each article was treated as a single transaction consisting of unique tokens. In total, 10,773 transactions (articles) were identified. I then calculated the number of unique items per transaction to understand the overall distribution of transaction lengths. The results show that transaction sizes vary substantially: the minimum is 1 item, the median (p50) is 146 items, the 90th percentile is 312, and the maximum is 1,093 items, with an average of approximately 172.87 items per transaction. To better understand these extremes, I also inspected the five longest transactions, all exceeding 900 unique items. This step confirms that while most articles fall within a reasonable range, a small number of very long documents exist and may significantly affect the efficiency of FP-growth if left unfiltered, which provides the basis for deciding whether to delete excessively long or abnormally short transactions in the next stage.

- **Item Document Frequency and Vocabulary Profiling**: In the second step, I calculated the document frequency of each unique item to understand which terms appear most frequently across articles. Since each transaction already contains distinct tokens, each item was counted at most once per document. A total of 159,483 unique items were detected in the 1947 corpus, indicating a very large and diverse vocabulary. I then ranked items by their document frequency and examined the top 20. The most frequent tokens include highly common words such as “中” (6,387 articles), “上” (6,000), “人” (5,426), “后” (5,324), and “不” (5,137), many of which are single-character structural or functional words rather than meaningful content terms, which suggests the presence of quasi-stopwords that may dominate association patterns if not removed.

- **Suggested Support Thresholds and High-Frequency Item Detection**: To prepare for FP-growth, I converted several commonly used relative support values into absolute document counts based on the 1947 dataset. With 10,773 articles in total, a support of 0.5% corresponds to 54 documents, 1% to 108 documents, and 5% to 539 documents. These values provide a practical reference for setting minSupport. I also identified items that appear in at least 30% of all documents (≥ 3,232 articles). A total of 15 such items were found, mostly single-character high-frequency words like “中”, “上”, “人”, and “不”. These terms may dominate the association results and will be considered for removal in the data-cleaning stage before running FP-growth.

In [None]:
# ---------- STEP 1: basic counts & transaction-length distribution ----------
# df_y was cached by the previous cell, so this count is cheap.
n_tx = df_y.count()
print(f"[1947] Number of transactions (articles): {n_tx:,}")

df_len = df_y.withColumn("items_len", F.size(F.col("items")))

# Basic stats. Spark's `percentile` is exact and interpolates between ranks,
# which is why upper percentiles can be non-integer (e.g. p99 = 583.28).
len_stats = (df_len.agg(
    F.min("items_len").alias("min_len"),
    F.expr("percentile(items_len, 0.5)").alias("p50_len"),
    F.expr("percentile(items_len, 0.9)").alias("p90_len"),
    F.expr("percentile(items_len, 0.95)").alias("p95_len"),
    F.expr("percentile(items_len, 0.99)").alias("p99_len"),
    F.max("items_len").alias("max_len"),
    F.avg("items_len").alias("avg_len")
).collect()[0])

print("[1947] Transaction length stats (unique items per doc):")
print(f"  min={len_stats['min_len']}, p50={len_stats['p50_len']}, "
      f"p90={len_stats['p90_len']}, p95={len_stats['p95_len']}, "
      f"p99={len_stats['p99_len']}, max={len_stats['max_len']}, "
      f"avg={len_stats['avg_len']:.2f}")

# Inspect a few very long transactions (top-5)
print("\n[1947] Example of longest transactions (top-5 by items_len):")
(df_len.orderBy(F.desc("items_len"))
      .select("items_len", "items")
      .show(5, truncate=80))

# ---------- STEP 2: item document frequency (top terms) ----------
# Each item counted once per document because df_y.items are already distinct.
df_item_freq = (df_y
    .select(F.explode(F.col("items")).alias("item"))
    .groupBy("item").agg(F.count(F.lit(1)).alias("doc_freq"))
    .orderBy(F.desc("doc_freq"))
    # Reused by show(), count() and the high-frequency filter below; cache it
    # so the explode + shuffle runs once instead of once per action.
    .cache()
)

print("\n[1947] Top 20 items by document frequency:")
df_item_freq.show(20, truncate=False)

vocab_size = df_item_freq.count()
print(f"[1947] Vocabulary size (unique items): {vocab_size:,}")

# ---------- STEP 3: suggest minSupport thresholds for FP-growth ----------
# Common relative supports, converted to absolute document counts (rounded up,
# at least 1) so they can be compared with doc_freq directly.
ratios = [0.001, 0.005, 0.01, 0.02, 0.05]
abs_supports = [(r, max(1, math.ceil(r * n_tx))) for r in ratios]

print("\n[1947] Suggested minSupport thresholds (ratio -> absolute doc count):")
for r, a in abs_supports:
    print(f"  {r:.3%} -> {a} docs")

# ---------- flag extremely frequent items ----------
# Items appearing in >= 30% of docs are likely near-stopwords that would
# dominate association rules.
high_freq_cut = 0.30
high_freq_abs = max(1, math.ceil(high_freq_cut * n_tx))
high_freq_items = (df_item_freq
                   .filter(F.col("doc_freq") >= high_freq_abs)
                   .orderBy(F.desc("doc_freq")))
count_high = high_freq_items.count()
print(f"\n[1947] Items appearing in >= {high_freq_cut:.0%} of docs (>= {high_freq_abs} docs): {count_high}")
if count_high > 0:
    high_freq_items.show(min(20, count_high), truncate=False)


[1947] Number of transactions (articles): 10,773
[1947] Transaction length stats (unique items per doc):
  min=1, p50=146.0, p90=312.0, p95=390.0, p99=583.2800000000007, max=1093, avg=172.87

[1947] Example of longest transactions (top-5 by items_len):
+---------+--------------------------------------------------------------------------------------------------------------------------+
|items_len|                                                                                                                     items|
+---------+--------------------------------------------------------------------------------------------------------------------------+
|     1093|    [不, 真实, 新闻, 客里空, 揭露, 晋绥, 日报, 编辑部, 女, 游击队, 长, 李桂芳, 谈起, 编辑, 工作, 中, 极, 缺点, 稿件, 上...|
|     1022|[法国社会党, 政策, 齐生, 拉马, 社会党, 内阁, 掌权, 十个月, 终于, 法国, 工人阶级, 罢工, 怒潮, 下台, 五月, 四日, 第奉, 美...|
|     1020|[九月, 十八日, 联合国大会, 上, 维辛斯基, 演说, 全文, 续完, 战争贩子, 美国, 培育, 新, 战争, 宣传, 地方, 倡导者, 不, 势力...|
|      937| [人民, 副刊, 蒋军, 必败, 活报剧, 边府, 俱乐部, 集体创作, 周方, 执笔, 第二

- Based on the initial profiling, some transactions were extremely short or unusually long, which could negatively affect FP-growth by introducing noise or causing excessive itemset combinations. To stabilize the transaction size, I first filtered out documents with fewer than 20 items or more than the 99th percentile (583 items). This reduced the dataset from 10,773 to 10,621 documents.

- Next, to reduce the dominance of meaningless co-occurrences, I targeted very common single-character tokens that appeared in at least 30 percent of the trimmed documents (≥ 3,187 documents). Eleven such items were identified and removed from all transactions. After this step, no transactions became empty, and the dataset size remained 10,621.

- I then recalculated the transaction length distribution to ensure the dataset remained consistent. The median basket size was 141 items, the average was 163, and the maximum dropped to 573. This confirms that basket sizes are now more balanced and suitable for frequent pattern mining.

- Finally, using the cleaned dataset size, I recalculated suggested minSupport values for FP-growth. For 10,621 documents, support ratios of 0.5, 1, 2, and 5 percent correspond to 54, 107, 213, and 532 documents respectively. These values will be used as reference points in the next step when running FP-growth.

In [None]:
# -------------------------------
# STEP 1 & 2: keep MIN_ITEMS <= items_len <= floor(p99 of items_len)
#             (for 1947 this is 20 <= items_len <= 583)
# -------------------------------
MIN_ITEMS = 20        # shorter articles carry too little content to co-occur
MAX_LEN_PCTL = 0.99   # articles longer than this length percentile are dropped

df_y_len = df_y.withColumn("items_len", F.size("items"))

before_cnt = df_y_len.count()
assert before_cnt > 0, "df_y is empty; check the selected year"
print(f"[1947] Before length filter: {before_cnt:,} docs")

# Derive the upper bound from the data instead of hard-coding it. Lengths are
# integers, so comparing against floor(p99) keeps exactly the rows with
# items_len <= p99 (for 1947, p99 = 583.28 -> 583).
p99_len = df_y_len.agg(F.expr(f"percentile(items_len, {MAX_LEN_PCTL})")).first()[0]
max_items = int(math.floor(p99_len))

df_y_trim = (df_y_len
             .filter((F.col("items_len") >= MIN_ITEMS) & (F.col("items_len") <= max_items))
             .drop("items_len"))

after_trim_cnt = df_y_trim.count()
print(f"[1947] After length filter ({MIN_ITEMS}..{max_items}): {after_trim_cnt:,} docs "
      f"({after_trim_cnt/before_cnt:.1%} kept)")

# -------------------------------
# STEP 3: remove high-frequency 1-char items (appear in >=30% of docs)
# -------------------------------
# The threshold is computed on the trimmed set, not on the full year.
thr_ratio = 0.30
thr_abs = max(1, math.ceil(thr_ratio * after_trim_cnt))

df_1char_freq = (df_y_trim
    .select(F.explode("items").alias("item"))
    .filter(F.length("item") == 1)
    .groupBy("item").agg(F.count(F.lit(1)).alias("doc_freq"))
    .orderBy(F.desc("doc_freq"))
)

hf_1char = [row["item"] for row in df_1char_freq.filter(F.col("doc_freq") >= thr_abs).collect()]
print(f"[1947] High-frequency 1-char items (>= {thr_ratio:.0%} of docs, >= {thr_abs} docs): {len(hf_1char)}")
print(hf_1char[:50])  # preview first 50

# Remove them from each transaction (array_except also deduplicates, which is
# a no-op here because items are already distinct).
if len(hf_1char) > 0:
    stop_arr = F.array(*[F.lit(x) for x in hf_1char])
    df_y_clean = df_y_trim.withColumn("items", F.array_except(F.col("items"), stop_arr))
else:
    df_y_clean = df_y_trim

# Drop any transaction that became empty after removal (rare but safe), and
# cache: later cells build on this set.
df_y_clean = df_y_clean.filter(F.size("items") > 0).cache()
clean_cnt = df_y_clean.count()

print(f"[1947] After removing HF 1-char items: {clean_cnt:,} docs "
      f"({clean_cnt/after_trim_cnt:.1%} of trimmed)")

# -------------------------------
# Quick post-clean sanity stats
# -------------------------------
len_stats = (df_y_clean.withColumn("items_len", F.size("items"))
    .agg(
        F.min("items_len").alias("min_len"),
        F.expr("percentile(items_len, 0.5)").alias("p50_len"),
        F.expr("percentile(items_len, 0.9)").alias("p90_len"),
        F.expr("percentile(items_len, 0.95)").alias("p95_len"),
        F.expr("percentile(items_len, 0.99)").alias("p99_len"),
        F.max("items_len").alias("max_len"),
        F.avg("items_len").alias("avg_len")
    ).collect()[0])

print("[1947] Post-clean transaction length stats:")
print(f"  min={len_stats['min_len']}, p50={len_stats['p50_len']}, "
      f"p90={len_stats['p90_len']}, p95={len_stats['p95_len']}, "
      f"p99={len_stats['p99_len']}, max={len_stats['max_len']}, "
      f"avg={len_stats['avg_len']:.2f}")

# Show a small sample after cleaning
print("\n[1947] Sample transactions after cleaning:")
df_y_clean.select("items").show(5, truncate=80)

# Re-suggest minSupport thresholds on the CLEAN set
ratios = [0.005, 0.01, 0.02, 0.05]
print("\n[1947] Suggested minSupport on CLEAN set (ratio -> absolute docs):")
for r in ratios:
    a = max(1, math.ceil(r * clean_cnt))
    print(f"  {r:.1%} -> {a} docs")

[1947] Before length filter: 10,773 docs
[1947] After length filter (20..583): 10,621 docs (98.6% kept)
[1947] High-frequency 1-char items (>= 30% of docs, >= 3187 docs): 11
['中', '上', '人', '后', '不', '都', '已', '说', '时', '下', '大']




[1947] After removing HF 1-char items: 10,621 docs (100.0% of trimmed)
[1947] Post-clean transaction length stats:
  min=19, p50=141.0, p90=295.0, p95=362.0, p99=500.0, max=573, avg=163.29

[1947] Sample transactions after cleaning:
+------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                   items|
+------------------------------------------------------------------------------------------------------------------------+
|[我军, 炮兵, 大显神威, 配合, 步兵, 解决, 刘竞, 此次, 苏北, 歼灭战, 新四军, 前线, 记者, 刘亮, 报导, 十七日, 晚, 反击, ...|
|[中央局, 边府, 军区, 新年, 慰贺, 民主, 建国, 军, 西北, 联军, 全体, 将士, 边区, 政府, 司令部, 政治部, 元旦, 联衔, 致电...|
|[山东, 军区, 一年, 自卫, 作战, 共歼, 蒋军, 十二, 万余, 今日, 解放军, 建军, 九, 周年, 纪念日, 司令部, 顷, 公布, 一九四...|
|  [投降, 消灭, 蒋军, 九十六, 师师, 部, 被歼, 萧里, 前线, 记者, 报导, 圩, 宿迁, 北, 歼灭, 六九, 十七, 日夜, 新四军, 包...|
|  [新年, 试谈, 副刊, 群众, 林曦, 元旦, 照例, 一篇, 献辞, 空话, 无益, 谈谈, 关系, 一个, 中国, 

                                                                                

### Custom stops, FP-Growth run, and rule post-processing

Following the cleaning in Step 3, I added a small custom stop list [‘网’, ‘数’, ‘马’, ‘克’, ‘公众’, ‘号’, ‘微信’] to remove residual non-content tokens and re-cached the cleaned transactions. I then ran FP-Growth on this set with minSupport = 0.02 and minConfidence = 0.30 to obtain frequent itemsets and association rules.

To make lifts interpretable, I restricted the analysis to rules with a single-item consequent and computed lift as confidence divided by the consequent’s singleton support. I also required |antecedent| ≥ 2 to avoid trivial rules, then joined back the antecedent’s absolute support and the estimated A∪C counts to show each rule's scale.

Although FP-Growth successfully generated high-confidence and high-lift association rules, the results also revealed a clear limitation: many of the antecedents and consequents are dominated by single-character tokens (e.g., “身”, “翻”, “版”, “玉”). These characters tend to be grammatically functional or semantically ambiguous in Chinese rather than meaningful content words. Several appear to be fragments of multi-character words that the segmenter split apart (e.g., “翻”/“身” from “翻身”, “玉”/“茭” from “玉茭”), which inflates their mutual lift.

In [None]:
# ---------- A) add custom stopwords and rebuild the clean df ----------
# Residual non-content tokens left after the 1-char frequency cleanup.
custom_stop = ['网','数','马','克','公众','号','微信']
stop_arr = F.array(*[F.lit(x) for x in custom_stop])

# start from last clean set
df_y_clean2 = (df_y_clean
               .withColumn("items", F.array_except("items", stop_arr))
               .filter(F.size("items") > 0)
               .cache())
n_tx2 = df_y_clean2.count()
print(f"[CLEAN+] Transactions after custom-stop removal: {n_tx2:,}")

# ---------- B) run FP-Growth ----------
min_sup = 0.02    # fraction of transactions an itemset must appear in
min_conf = 0.30   # minimum rule confidence P(C | A)

print(f"[FP] Params: minSupport={min_sup}, minConfidence={min_conf}")
fp = FPGrowth(itemsCol="items", minSupport=min_sup, minConfidence=min_conf)
model = fp.fit(df_y_clean2)

freq = model.freqItemsets.cache()
rules = model.associationRules.cache()

# ---------- C) singleton supports and LIFT for rules with single-item consequent ----------
# support_c_pct is a fraction in [0, 1] (freq / n_tx2), not a percentage.
singleton_support = (freq
    .filter(F.size("items") == 1)
    .select(
        F.col("items")[0].alias("item"),
        (F.col("freq") / F.lit(n_tx2)).alias("support_c_pct")
    )
    .cache()
)

rules_single = (rules
    .filter(F.size("consequent") == 1)
    .withColumn("c_item", F.col("consequent")[0])
    .join(singleton_support, on=(F.col("c_item") == F.col("item")), how="left")
    .drop("item")
    # lift = P(C | A) / P(C). NOTE(review): recent Spark versions already return
    # a `lift` column from associationRules; if present, withColumn replaces it
    # with this explicit formula.
    .withColumn("lift", F.col("confidence") / F.col("support_c_pct"))
    # Order-independent antecedent key, used to join antecedent support below.
    .withColumn("ante_key", F.concat_ws("§", F.array_sort(F.col("antecedent"))))
    .cache()
)

print("\n[Rules] Top 20 by lift (single-item consequent) AFTER custom stops:")
(rules_single
 .orderBy(F.desc("lift"))
 .select("antecedent","consequent","confidence","support_c_pct","lift")
 .show(20, truncate=False))

# ---------- D) require |antecedent| >= 2  ----------
rules_single_k2 = rules_single.filter(F.size("antecedent") >= 2).cache()
print("\n[Rules] Top 20 by lift with |antecedent|>=2:")
(rules_single_k2
 .orderBy(F.desc("lift"))
 .select("antecedent","consequent","confidence","support_c_pct","lift")
 .show(20, truncate=False))

# ---------- E) join antecedent absolute support safely ----------
# Key frequent itemsets the same way as ante_key (sorted, "§"-joined).
ante_support = (freq
    .withColumn("key", F.concat_ws("§", F.array_sort("items")))
    .select("key", F.col("freq").alias("ante_freq_abs"))
    .cache()
)

rules_with_counts = (rules_single_k2
    .join(ante_support, rules_single_k2["ante_key"] == ante_support["key"], "left")
    .drop("key")
    # |A ∪ C| = confidence * |A|. Round before casting: the double product can
    # land just below the true integer (e.g. (1/49)*49 == 0.9999999999999999),
    # which a bare cast("long") would truncate down by one.
    .withColumn("union_est_abs",
                F.round(F.col("confidence") * F.col("ante_freq_abs")).cast("long"))
    .withColumn("union_est_pct", F.col("union_est_abs") / F.lit(n_tx2))
    .cache()
)

print("\n[Rules] With antecedent abs support (top 10 by lift):")
(rules_with_counts
 .orderBy(F.desc("lift"))
 .select("antecedent","consequent","ante_freq_abs","union_est_abs","union_est_pct","confidence","lift")
 .show(10, truncate=False))


                                                                                

[CLEAN+] Transactions after custom-stop removal: 10,621
[FP] Params: minSupport=0.02, minConfidence=0.3
25/11/02 14:27:43 WARN FPGrowth: Input data is not cached.


                                                                                


[Rules] Top 20 by lift (single-item consequent) AFTER custom stops:


                                                                                

+----------------+----------+------------------+--------------------+------------------+
|antecedent      |consequent|confidence        |support_c_pct       |lift              |
+----------------+----------+------------------+--------------------+------------------+
|[版]            |[专栏]    |0.8764940239043825|0.021278599001977216|41.191340831364805|
|[专栏]          |[版]      |0.9734513274336283|0.02363242632520478 |41.191340831364805|
|[茭]            |[玉]      |0.7574850299401198|0.030223142830241974|25.063079448579476|
|[玉]            |[茭]      |0.7881619937694704|0.03144713303832031 |25.063079448579476|
|[缴获, 长]      |[短枪]    |0.7508771929824561|0.03643724696356275 |20.607407407407408|
|[翻, 翻身, 群众]|[身]      |0.7829181494661922|0.03860276810093211 |20.281399184098603|
|[翻, 翻身]      |[身]      |0.775811209439528 |0.03860276810093211 |20.09729476940787 |
|[翻, 领导]      |[身]      |0.7733812949640287|0.03860276810093211 |20.034348131251097|
|[身, 翻身, 群众]|[翻]      |0.9016393442622951|0.046135

                                                                                

### Remove single-character tokens, rerun FP-Growth, and re-rank rules

To address the earlier issue that many antecedents/consequents were single characters with little semantic value, I filtered each transaction to keep only tokens with length ≥ 2, then re-ran FP-Growth on the cleaned set (minSupport = 0.01, minConfidence = 0.30).

After filtering out single-character tokens and re-running FP-Growth, the association rules became clearer and more meaningful semantically. One example is [“全半”, “生产”, “组织”] → [“劳力”] with confidence = 1.0 and lift ≈ 10.7. Note that “全半” is most likely a fragment of the phrase “全半劳力” (full and half laborers, i.e., fully and partially able-bodied workers), so the perfect confidence partly reflects the segmenter splitting that compound rather than an independent co-occurrence. Even so, the rule shows that articles discussing “production” (生产) and “organization” (组织) together consistently involve “labor” (劳力). This aligns closely with the historical context of 1947, when the Chinese Communist Party was actively promoting land reform and agricultural production in liberated areas. Newspapers frequently emphasized mobilizing labor, organizing peasants into production teams, and increasing grain output to support the ongoing civil war effort.

In [None]:
# ===================== Filter to length>=2 tokens, then FP-Growth =====================
custom_stop = ['网','数','马','克','公众','号','微信']
stop_arr = F.array(*[F.lit(x) for x in custom_stop])

# 1) Start from the Step-3 clean set. The custom stops are re-applied below, so
#    this yields the same transactions as starting from df_y_clean2, without
#    depending on which cells happen to have run in the current kernel.
base_df = df_y_clean

# 2) Remove custom stopwords and ALL single-char tokens; keep only length>=2 tokens
df_y_clean3 = (base_df
    .withColumn("items", F.array_except("items", stop_arr))                # remove custom stops
    .withColumn("items", F.expr("filter(items, x -> length(x) >= 2)"))     # keep tokens with len>=2
    .filter(F.size("items") > 0)
    .cache()
)

n_tx3 = df_y_clean3.count()
print(f"[CLEAN len>=2] Transactions: {n_tx3:,}")

# Quick sanity stats
len_stats = (df_y_clean3.withColumn("items_len", F.size("items"))
    .agg(F.min("items_len").alias("min"),
         F.expr("percentile(items_len, 0.5)").alias("p50"),
         F.expr("percentile(items_len, 0.9)").alias("p90"),
         F.max("items_len").alias("max"),
         F.avg("items_len").alias("avg"))
    .collect()[0])
print(f"[CLEAN len>=2] items per doc -> min={len_stats['min']}, p50={len_stats['p50']}, "
      f"p90={len_stats['p90']}, max={len_stats['max']}, avg={len_stats['avg']:.2f}")

# 3) Run FP-Growth (lower support than before: removing 1-char tokens thins
#    out the baskets)
min_sup = 0.01    # fraction of transactions an itemset must appear in
min_conf = 0.30   # minimum rule confidence P(C | A)
print(f"[FP] Params: minSupport={min_sup}, minConfidence={min_conf}")

fp = FPGrowth(itemsCol="items", minSupport=min_sup, minConfidence=min_conf)
model = fp.fit(df_y_clean3)

freq  = model.freqItemsets.cache()
rules = model.associationRules.cache()

print("\n[FP] Frequent itemsets (top 20 by freq):")
freq.orderBy(F.desc("freq")).show(20, truncate=False)

# 4) Compute singleton supports & LIFT for rules with single-item consequent.
#    support_c_pct is a fraction in [0, 1] (freq / n_tx3), not a percentage.
singleton_support = (freq
    .filter(F.size("items") == 1)
    .select(F.col("items")[0].alias("item"),
            (F.col("freq")/F.lit(n_tx3)).alias("support_c_pct"))
    .cache()
)

rules_single = (rules
    .filter(F.size("consequent") == 1)
    .withColumn("c_item", F.col("consequent")[0])
    .join(singleton_support, F.col("c_item") == F.col("item"), "left")
    .drop("item")
    # lift = P(C | A) / P(C)
    .withColumn("lift", F.col("confidence") / F.col("support_c_pct"))
    # Order-independent antecedent key for joining antecedent support later.
    .withColumn("ante_key", F.concat_ws("§", F.array_sort(F.col("antecedent"))))
    .cache()
)

print("\n[Rules] Top 20 by confidence (single-item consequent, with LIFT):")
(rules_single
 .orderBy(F.desc("confidence"), F.desc("lift"))
 .select("antecedent","consequent","confidence","support_c_pct","lift")
 .show(20, truncate=False))

print("\n[Rules] Top 20 by LIFT (single-item consequent):")
(rules_single
 .orderBy(F.desc("lift"))
 .select("antecedent","consequent","confidence","support_c_pct","lift")
 .show(20, truncate=False))

# Only keep readable rules with |antecedent|>=2
rules_single_k2 = rules_single.filter(F.size("antecedent") >= 2)
print("\n[Rules] |antecedent|>=2, Top 20 by LIFT:")
(rules_single_k2
 .orderBy(F.desc("lift"))
 .select("antecedent","consequent","confidence","support_c_pct","lift")
 .show(20, truncate=False))
# =============================================================================


                                                                                

[CLEAN len>=2] Transactions: 10,621
[CLEAN len>=2] items per doc -> min=10, p50=118.0, p90=249.0, max=518, avg=137.30
[FP] Params: minSupport=0.01, minConfidence=0.3
25/11/02 14:42:55 WARN FPGrowth: Input data is not cached.


                                                                                


[FP] Frequent itemsets (top 20 by freq):


                                                                                

+------------+----+
|items       |freq|
+------------+----+
|[群众]      |4715|
|[一个]      |3812|
|[组织]      |3355|
|[运动]      |3218|
|[人民]      |3093|
|[工作]      |2998|
|[领导]      |2817|
|[生产]      |2602|
|[干部]      |2571|
|[提出]      |2558|
|[参加]      |2510|
|[农民]      |2471|
|[政府]      |2408|
|[斗争]      |2305|
|[组织, 群众]|2250|
|[翻身]      |2193|
|[运动, 群众]|2174|
|[计划]      |2122|
|[领导, 群众]|2072|
|[同志]      |2042|
+------------+----+
only showing top 20 rows


[Rules] Top 20 by confidence (single-item consequent, with LIFT):


                                                                                

+------------------------------+----------+----------+-------------------+------------------+
|antecedent                    |consequent|confidence|support_c_pct      |lift              |
+------------------------------+----------+----------+-------------------+------------------+
|[全半, 生产, 组织]            |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 领导, 组织]            |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 生产]                  |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 领导]                  |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 妇女]                  |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 互助, 组织]            |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 妇女, 组织]            |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[全半, 互助]                  |[劳力]    |1.0       |0.09377648055738631|10.663654618473895|
|[打通, 讨论, 提出, 运动, 群众]|

In [None]:
# ========== Targeted consequents: 民主 / 群众 / 人民 ==========
# For each consequent of interest (民主 "democracy", 群众 "the masses",
# 人民 "the people"), list rules A -> {target} from the most recent FP-Growth
# run, with absolute-support floors to suppress rules backed by few documents.

# 1) Recompute antecedent absolute support, keyed by the sorted, "§"-joined
#    itemset so it matches a rule's ante_key regardless of item order.
ante_support = (freq
    .withColumn("key", F.concat_ws("§", F.array_sort("items")))
    .select("key", F.col("freq").alias("ante_freq_abs"))
    .cache()
)

# 2) Helper: materialize rules_single with a stable 'ante_key' (rebuilt here so
#    this cell does not depend on how rules_single was constructed).
rules_single_keyed = (rules_single
    .withColumn("ante_key", F.concat_ws("§", F.array_sort(F.col("antecedent"))))
    .cache()
)

targets = ["民主", "群众", "人民"]  # consequents of interest
min_items_in_antecedent = 2        # require |A| >= 2 for readability
min_ante_abs = 50                  # require antecedent to appear in >= 50 docs (tune)
min_union_abs = 30                 # require estimated |A ∪ C| >= 30 docs (tune)
topk = 20

show_cols = ["antecedent", "consequent", "ante_freq_abs", "union_est_abs",
             "union_est_pct", "confidence", "support_c_pct", "lift"]

for tgt in targets:
    print(f"\n================ Target Consequent: {tgt} ================")

    # 3) Rules whose single consequent equals the target, with |A| above the floor
    rs_tgt = (rules_single_keyed
        .filter((F.size("consequent") == 1) & (F.col("consequent")[0] == tgt))
        .filter(F.size("antecedent") >= min_items_in_antecedent)
        # join antecedent absolute support
        .join(ante_support, rules_single_keyed["ante_key"] == ante_support["key"], "left")
        .drop("key")
        # |A ∪ C| = confidence * |A|. Round before casting: the double product can
        # land just below the true integer (e.g. (1/49)*49 == 0.9999999999999999),
        # which a bare cast("long") would truncate down by one.
        .withColumn("union_est_abs",
                    F.round(F.col("confidence") * F.col("ante_freq_abs")).cast("long"))
        .withColumn("union_est_pct", F.col("union_est_abs") / F.lit(n_tx3))
    )

    # 4) Apply practical support floors to avoid spurious pairs. Cached because
    #    it feeds the two show() calls below.
    rs_tgt_filt = (rs_tgt
        .filter(F.col("ante_freq_abs") >= F.lit(min_ante_abs))
        .filter(F.col("union_est_abs") >= F.lit(min_union_abs))
        .cache()
    )

    # 5) Show Top-K by lift and by confidence
    print(f"[{tgt}] Top {topk} by LIFT (|A|>={min_items_in_antecedent}, A>= {min_ante_abs} docs, A∪C>= {min_union_abs} docs):")
    (rs_tgt_filt
        .orderBy(F.desc("lift"), F.desc("confidence"))
        .select(*show_cols)
        .show(topk, truncate=False)
    )

    print(f"[{tgt}] Top {topk} by CONFIDENCE (with same floors):")
    (rs_tgt_filt
        .orderBy(F.desc("confidence"), F.desc("lift"))
        .select(*show_cols)
        .show(topk, truncate=False)
    )

    # Release this target's cached rows before the next iteration so cached
    # blocks do not accumulate across targets.
    rs_tgt_filt.unpersist()

25/11/02 14:57:51 WARN CacheManager: Asked to cache already cached data.
25/11/02 14:57:51 WARN CacheManager: Asked to cache already cached data.

25/11/02 14:57:51 WARN CacheManager: Asked to cache already cached data.
25/11/02 14:57:51 WARN CacheManager: Asked to cache already cached data.
[民主] Top 20 by LIFT (|A|>=2, A>= 50 docs, A∪C>= 30 docs):
+--------------------------+----------+-------------+-------------+--------------------+------------------+-------------------+-----------------+
|antecedent                |consequent|ante_freq_abs|union_est_abs|union_est_pct       |confidence        |support_c_pct      |lift             |
+--------------------------+----------+-------------+-------------+--------------------+------------------+-------------------+-----------------+
|[联军, 收复]              |[民主]    |112          |112          |0.010545146408059505|1.0               |0.17766688635721684|5.628510863804982|
|[联军, 我军]              |[民主]    |108          |108          |0.01016853