In [4]:
import findspark
findspark.init()

from pyspark import SparkContext

spark = SparkSession\
    .builder\
    .appName("spark mysql")\
    .getOrCreate()

from pyspark.mllib.fpm import FPGrowth

In [5]:


data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
rdd = sc.parallelize(data, 2)

model = FPGrowth.train(rdd, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

FreqItemset(items=['a'], freq=4)
FreqItemset(items=['c'], freq=3)
FreqItemset(items=['c', 'a'], freq=3)
FreqItemset(items=['e'], freq=2)
FreqItemset(items=['e', 'c'], freq=1)
FreqItemset(items=['e', 'c', 'a'], freq=1)
FreqItemset(items=['e', 'a'], freq=2)
FreqItemset(items=['b'], freq=2)
FreqItemset(items=['b', 'e'], freq=1)
FreqItemset(items=['b', 'e', 'a'], freq=1)
FreqItemset(items=['b', 'c'], freq=1)
FreqItemset(items=['b', 'c', 'a'], freq=1)
FreqItemset(items=['b', 'a'], freq=2)
FreqItemset(items=['d'], freq=1)
FreqItemset(items=['d', 'e'], freq=1)
FreqItemset(items=['d', 'e', 'a'], freq=1)
FreqItemset(items=['d', 'b'], freq=1)
FreqItemset(items=['d', 'b', 'e'], freq=1)
FreqItemset(items=['d', 'b', 'e', 'a'], freq=1)
FreqItemset(items=['d', 'b', 'a'], freq=1)
FreqItemset(items=['d', 'a'], freq=1)
FreqItemset(items=['f'], freq=1)
FreqItemset(items=['f', 'c'], freq=1)
FreqItemset(items=['f', 'c', 'a'], freq=1)
FreqItemset(items=['f', 'a'], freq=1)


In [6]:
from pyspark import SQLContext

sqlContext = SQLContext(sc)

#ignore this , it is just for testing
toy_products_df = sqlContext.read.csv("./amazon_co-ecommerce_sample.csv")

In [7]:
toy_products_df.show(20)

+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-------+
|                 _c0|                 _c1|                 _c2|                 _c3|                 _c4|              _c5|                 _c6|                 _c7|                 _c8|                 _c9|                _c10|                _c11|                _c12|                _c13|                _c14|            _c15|   _c16|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------

In [8]:
# reading amazon's toy products
# it is just for testing
product_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('./amazon_co-ecommerce_sample.csv')

In [9]:
product_df.select('product_name').show(10)

+--------------------+
|        product_name|
+--------------------+
|Hornby 2014 Catal...|
|                null|
|                null|
|Worth Buying For ...|
|                null|
| even if it inclu...|
|                null|
| every credit to ...|
|                null|
| so this has been...|
+--------------------+
only showing top 10 rows



In [10]:
# getting product names and removing null values
# it is just for testing
product_name = product_df.where(product_df.product_name.isNotNull()).select('product_name')


In [11]:
product_name.show(10)

+--------------------+
|        product_name|
+--------------------+
|Hornby 2014 Catal...|
|Worth Buying For ...|
| even if it inclu...|
| every credit to ...|
| so this has been...|
|"{""seller""=>[{"...|
|FunkyBuys® Large ...|
|"{""seller""=>{""...|
|CLASSIC TOY TRAIN...|
| it literally did...|
+--------------------+
only showing top 10 rows



In [12]:
# Frequent pattern mining for user clicked items/transection items
product_data_rdd = sc.textFile('./fp_product_data.txt')

In [13]:
product_data_rdd.take(5)

['82475 -1 84211 -1 86919 -1 86927 -1 86943 -1 -2',
 '56109 -1 222699 -1 -2',
 '55455 -1 -2',
 '81795 -1 81991 -1 -2',
 '55403 -1 55407 -1 55411 -1 55435 -1 55831 -1 55835 -1 55839 -1 55847 -1 55863 -1 55879 -1 55895 -1 55991 -1 56077 -1 56225 -1 56373 -1 56765 -1 222499 -1 222511 -1 222607 -1 222699 -1 -2']

In [14]:
product_ready_rdd = product_data_rdd.map(lambda x: x.split(",")).map(lambda x: x[0].split(" "))
product_ready_rdd.take(5)

[['82475',
  '-1',
  '84211',
  '-1',
  '86919',
  '-1',
  '86927',
  '-1',
  '86943',
  '-1',
  '-2'],
 ['56109', '-1', '222699', '-1', '-2'],
 ['55455', '-1', '-2'],
 ['81795', '-1', '81991', '-1', '-2'],
 ['55403',
  '-1',
  '55407',
  '-1',
  '55411',
  '-1',
  '55435',
  '-1',
  '55831',
  '-1',
  '55835',
  '-1',
  '55839',
  '-1',
  '55847',
  '-1',
  '55863',
  '-1',
  '55879',
  '-1',
  '55895',
  '-1',
  '55991',
  '-1',
  '56077',
  '-1',
  '56225',
  '-1',
  '56373',
  '-1',
  '56765',
  '-1',
  '222499',
  '-1',
  '222511',
  '-1',
  '222607',
  '-1',
  '222699',
  '-1',
  '-2']]

In [15]:
product_ready_rdd.count()

77512

In [None]:
from pyspark.mllib.fpm import FPGrowth
import numpy as np

# function for generating min Support
def find_minSupport(x, a=0.4, b=0.2, c=0.2):
    return np.finfo(np.exp([a * x + b])) + c

# training the model
unique = product_ready_rdd.map(lambda x: list(set(x))).cache()
fp_model = FPGrowth.train(unique, minSupport=0.2, numPartitions=2)
# getting frequent items sets/FP tree
result = fp_model.freqItemsets().collect()

# display top 10 sets of frequent items
i = 0
for fi in result:
    if i <= 10:
        print(fi)
        i += 1
    else:
        break

In [19]:
for fi in result:
        print(fi)

FreqItemset(items=['-1'], freq=77512)
FreqItemset(items=['-2'], freq=77512)
FreqItemset(items=['-2', '-1'], freq=77512)


In [1]:
from pyspark.ml.fpm import FPGrowth

df = spark.createDataFrame([
    (0, [1, 2, 5]),
    (1, [1, 2, 3, 5]),
    (2, [1, 2])
], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df).show()

+---------+----+
|    items|freq|
+---------+----+
|      [1]|   3|
|      [2]|   3|
|   [2, 1]|   3|
|      [5]|   2|
|   [5, 2]|   2|
|[5, 2, 1]|   2|
|   [5, 1]|   2|
+---------+----+

+----------+----------+------------------+
|antecedent|consequent|        confidence|
+----------+----------+------------------+
|    [5, 2]|       [1]|               1.0|
|       [2]|       [1]|               1.0|
|       [2]|       [5]|0.6666666666666666|
|    [2, 1]|       [5]|0.6666666666666666|
|       [5]|       [2]|               1.0|
|       [5]|       [1]|               1.0|
|    [5, 1]|       [2]|               1.0|
|       [1]|       [2]|               1.0|
|       [1]|       [5]|0.6666666666666666|
+----------+----------+------------------+

+---+------------+----------+
| id|       items|prediction|
+---+------------+----------+
|  0|   [1, 2, 5]|        []|
|  1|[1, 2, 3, 5]|        []|
|  2|      [1, 2]|       [5]|
+---+------------+----------+

