In [12]:
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder

# 加载数据
data = pd.read_csv('Market_Basket_Optimisation.csv')

In [13]:
# 数据预处理
# 创建一个空列表records，用于存储每个事务中的商品
records = []
# 遍历数据集中的每一行
records = data.apply(lambda row: row.dropna().astype(str).tolist(), axis=1).tolist()


# 使用TransactionEncoder将事务数据转换为适合Apriori算法处理的布尔矩阵格式
te = TransactionEncoder()
te_ary = te.fit(records).transform(records)
df = pd.DataFrame(te_ary, columns=te.columns_)

In [14]:
from mlxtend.frequent_patterns import apriori, association_rules

# Apriori算法生成频繁项集
# 使用apriori函数，设置最小支持度为0.05，use_colnames=True表示使用DataFrame的列名作为项集的名称
frequent_itemsets_apriori = apriori(df, min_support=0.05, use_colnames=True)

# 提取关联规则
# 使用association_rules函数从频繁项集中提取关联规则，设置最小置信度为0.5
rules_apriori = association_rules(frequent_itemsets_apriori, metric="confidence", min_threshold=0.5)

# 筛选高提升度规则
# 筛选出提升度大于2.0的关联规则
high_lift_rules_apriori = rules_apriori[rules_apriori["lift"] > 2.0]

# 打印频繁项集和关联规则
print("Apriori频繁项集：")
print(frequent_itemsets_apriori)
print("\nApriori关联规则：")
print(rules_apriori)
print("\nApriori高提升度关联规则：")
print(high_lift_rules_apriori)

Apriori频繁项集：
     support                    itemsets
0   0.087200                   (burgers)
1   0.081067                      (cake)
2   0.060000                   (chicken)
3   0.163867                 (chocolate)
4   0.080400                   (cookies)
5   0.051067               (cooking oil)
6   0.179733                      (eggs)
7   0.079333                  (escalope)
8   0.170933              (french fries)
9   0.063200           (frozen smoothie)
10  0.095333         (frozen vegetables)
11  0.052400             (grated cheese)
12  0.132000                 (green tea)
13  0.098267               (ground beef)
14  0.076400            (low fat yogurt)
15  0.129600                      (milk)
16  0.238267             (mineral water)
17  0.065733                 (olive oil)
18  0.095067                  (pancakes)
19  0.071333                    (shrimp)
20  0.050533                      (soup)
21  0.174133                 (spaghetti)
22  0.068400                  (tomatoes)
23 

In [15]:
# 可视化部分添加判断逻辑，防止空值导致报错
if not rules_apriori.empty:
    # 相关性分析 - 提升度热力图
    rules_apriori_pivot = rules_apriori.pivot(index='antecedents', columns='consequents', values='lift')

    import matplotlib.pyplot as plt
    import seaborn as sns

    # 创建图形
    plt.figure(figsize=(12, 8))
    # 绘制热力图
    sns.heatmap(rules_apriori_pivot.fillna(0), annot=True, cmap='YlGnBu')
    plt.title('Apriori Lift Heatmap')
    plt.show()
else:
    print("没有关联规则可供绘制热力图，请调整参数或使用更低的 min_threshold。")


没有关联规则可供绘制热力图，请调整参数或使用更低的 min_threshold。


In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import time  # 确保导入 time 模块
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession

# 尝试获取当前活跃的 session
try:
    spark = SparkSession.getActiveSession()
    if spark is not None:
        print("正在关闭已有的 SparkSession...")
        spark.stop()
except Exception as e:
    print("无活跃的 SparkSession，无需关闭。")
    pass

# 创建新的 SparkSession
spark = SparkSession.builder.appName("FPGrowth").getOrCreate()
print("新的 SparkSession 已创建。")

# 检查是否已有活跃的 SparkSession
spark = SparkSession.builder.appName("FPGrowth").getOrCreate()
print("当前 SparkSession 状态：Active" if spark.sparkContext.isStopped is False else "已停止")

# 创建或获取 SparkSession
spark = SparkSession.builder.appName("FPGrowth").getOrCreate()

# 假设 records 已经定义
data_spark = spark.createDataFrame([(record,) for record in records], ["items"])

# 初始化 FP-Growth 模型
fp_growth = FPGrowth(itemsCol="items", minSupport=0.05, minConfidence=0.5)

# 记录开始时间
start_time_fp = time.time()

# 拟合模型
model_fp = fp_growth.fit(data_spark)

# 记录结束时间
end_time_fp = time.time()

# 提取结果
frequent_itemsets_fp = model_fp.freqItemsets
association_rules_fp = model_fp.associationRules

# 打印结果
print("FP - Growth频繁项集：")
frequent_itemsets_fp.show()
print("\nFP - Growth关联规则：")
association_rules_fp.show()

# 性能对比分析
start_time_ap = time.time()
frequent_itemsets_apriori = apriori(df, min_support=0.05, use_colnames=True)
rules_apriori = association_rules(frequent_itemsets_apriori, metric="confidence", min_threshold=0.5)
end_time_ap = time.time()

print(f"Apriori算法运行时间: {end_time_ap - start_time_ap} 秒")
print(f"FP - Growth算法运行时间: {end_time_fp - start_time_fp} 秒")  # 这里不再报错

# 停止 SparkSession
spark.stop()


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Unknown Source)
	at org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2525)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2522)
	at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2599)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:88)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


In [18]:
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori, association_rules
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import time

# -----------------------------
# 数据加载与预处理（Apriori）
# -----------------------------
data = pd.read_csv('Market_Basket_Optimisation.csv')
records = data.apply(lambda row: row.dropna().astype(str).tolist(), axis=1).tolist()

te = TransactionEncoder()
te_ary = te.fit(records).transform(records)
df = pd.DataFrame(te_ary, columns=te.columns_)

# -----------------------------
# Apriori 算法执行
# -----------------------------
frequent_itemsets_apriori = apriori(df, min_support=0.05, use_colnames=True)
rules_apriori = association_rules(frequent_itemsets_apriori, metric="confidence", min_threshold=0.3)
high_lift_rules_apriori = rules_apriori[rules_apriori["lift"] > 2.0]

# -----------------------------
# Spark 初始化（安全方式）
# -----------------------------
# 先检查是否有活跃的 SparkSession 并关闭
try:
    existing_spark = SparkSession.getActiveSession()
    if existing_spark is not None:
        print("检测到已有活跃的 SparkSession，正在关闭...")
        existing_spark.stop()
except:
    pass

# 创建新的 SparkSession
spark = SparkSession.builder.appName("FPGrowth").getOrCreate()
print("SparkSession 已创建。")

# 转换为 Spark DataFrame
data_spark = spark.createDataFrame([(record,) for record in records], ["items"])

# FP-Growth 模型训练
fp_growth = FPGrowth(itemsCol="items", minSupport=0.05, minConfidence=0.3)
start_time_fp = time.time()
model_fp = fp_growth.fit(data_spark)
end_time_fp = time.time()

# 提取频繁项集和关联规则
frequent_itemsets_fp = model_fp.freqItemsets
association_rules_fp = model_fp.associationRules

# 打印结果
print("FP - Growth频繁项集：")
frequent_itemsets_fp.show()
print("\nFP - Growth关联规则：")
association_rules_fp.show()

# 性能对比
start_time_ap = time.time()
frequent_itemsets_apriori = apriori(df, min_support=0.05, use_colnames=True)
rules_apriori = association_rules(frequent_itemsets_apriori, metric="confidence", min_threshold=0.3)
end_time_ap = time.time()

print(f"Apriori算法运行时间: {end_time_ap - start_time_ap} 秒")
print(f"FP - Growth算法运行时间: {end_time_fp - start_time_fp} 秒")

# 停止 SparkSession
spark.stop()


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Unknown Source)
	at org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2525)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2522)
	at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2599)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:88)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
