# Initializing Spark

In [1]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder.appName("Simple data mining with Synthetic Financial Dataset").getOrCreate()
    

In [2]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x00000208441EB4A8>


# Loading Dataset 

In [2]:
df = spark.read.csv("D:\Kuliah\Smt6\Big Data\Tugas\data.csv", header=True, inferSchema=True)

In [4]:
#Show Dataset

df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [5]:
#Count how many rows in dataset

df.count()

541909

In [6]:
#Show Dataset Schema

df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

In [3]:
#Look at the grouping of invoice with stock and description

data = df[['InvoiceNo','StockCode','Description']]

In [6]:
data.show()

+---------+---------+--------------------+
|InvoiceNo|StockCode|         Description|
+---------+---------+--------------------+
|   536365|   85123A|WHITE HANGING HEA...|
|   536365|    71053| WHITE METAL LANTERN|
|   536365|   84406B|CREAM CUPID HEART...|
|   536365|   84029G|KNITTED UNION FLA...|
|   536365|   84029E|RED WOOLLY HOTTIE...|
|   536365|    22752|SET 7 BABUSHKA NE...|
|   536365|    21730|GLASS STAR FROSTE...|
|   536366|    22633|HAND WARMER UNION...|
|   536366|    22632|HAND WARMER RED P...|
|   536367|    84879|ASSORTED COLOUR B...|
|   536367|    22745|POPPY'S PLAYHOUSE...|
|   536367|    22748|POPPY'S PLAYHOUSE...|
|   536367|    22749|FELTCRAFT PRINCES...|
|   536367|    22310|IVORY KNITTED MUG...|
|   536367|    84969|BOX OF 6 ASSORTED...|
|   536367|    22623|BOX OF VINTAGE JI...|
|   536367|    22622|BOX OF VINTAGE AL...|
|   536367|    21754|HOME BUILDING BLO...|
|   536367|    21755|LOVE BUILDING BLO...|
|   536367|    21777|RECIPE BOX WITH M...|
+---------+

# Group Data by Invoice Number

In [4]:
from pyspark.sql.functions import collect_list
new_data = data.groupby("InvoiceNo").agg(collect_list('StockCode').alias('StockCode'))

In [7]:
new_data.show()

+---------+--------------------+
|InvoiceNo|           StockCode|
+---------+--------------------+
|   536596|[21624, 22900, 22...|
|   536938|[22386, 85099C, 2...|
|   537252|             [22197]|
|   537691|[22791, 22171, 82...|
|   538041|             [22145]|
|   538184|[22585, 21481, 22...|
|   538517|[22491, 21232, 21...|
|   538879|[84819, 22150, 21...|
|   539275|[22909, 22423, 22...|
|   539630|[21484, 85099B, 2...|
|   540499|[21868, 22697, 22...|
|   540540|[21877, 21868, 21...|
|   540976|[22394, 21890, 22...|
|   541432|[21485, 22457, 84...|
|   541518|[21880, 21881, 21...|
|   541783|[22423, 22854, 22...|
|   542026|[21754, 82600, 22...|
|   542375|[21731, 22367, 22...|
|  C540850|             [21231]|
|   543641|[85123A, 21833, 2...|
+---------+--------------------+
only showing top 20 rows



In [5]:
#Remove duplicates data using udf function 
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

remove_dupes = udf(lambda row: list(set(row)), ArrayType(StringType()))
new_data = new_data.withColumn("remove_dupes", remove_dupes("StockCode"))

In [9]:
new_data.show()

+---------+--------------------+--------------------+
|InvoiceNo|           StockCode|        remove_dupes|
+---------+--------------------+--------------------+
|   536596|[21624, 22900, 22...|[84926A, 21624, 2...|
|   536938|[22386, 85099C, 2...|[21479, 84997B, 2...|
|   537252|             [22197]|             [22197]|
|   537691|[22791, 22171, 82...|[22505, 22791, 82...|
|   538041|             [22145]|             [22145]|
|   538184|[22585, 21481, 22...|[22492, 22561, 48...|
|   538517|[22491, 21232, 21...|[22197, 22844, 22...|
|   538879|[84819, 22150, 21...|[22130, 22555, 84...|
|   539275|[22909, 22423, 22...|[22423, 21914, 22...|
|   539630|[21484, 85099B, 2...|[22988, 84347, 22...|
|   540499|[21868, 22697, 22...|[21755, 84978, 22...|
|   540540|[21877, 21868, 21...|[22555, 22551, 22...|
|   540976|[22394, 21890, 22...|[22207, 21110, 84...|
|   541432|[21485, 22457, 84...|[22113, 22457, 21...|
|   541518|[21880, 21881, 21...|[20724, 21982, 20...|
|   541783|[22423, 22854, 22

# FP Tree Algorithm

In [6]:
from pyspark.ml.fpm import FPGrowth

## Test model using a large support and confindence

In [7]:
fpGrowth = FPGrowth(itemsCol="remove_dupes", minSupport=0.05, minConfidence=0.08)
model = fpGrowth.fit(new_data)

In [15]:
model.freqItemsets.show()

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
+--------+----+



In [17]:
model.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



# Test model using a medium support and confindence

In [16]:
fpGrowth2 = FPGrowth(itemsCol="remove_dupes", minSupport=0.04, minConfidence=0.06)
model2 = fpGrowth2.fit(new_data)

In [18]:
model2.freqItemsets.show()

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
| [22457]|1266|
|  [POST]|1254|
| [23203]|1249|
| [22386]|1231|
| [22960]|1220|
| [22469]|1214|
| [21931]|1201|
| [22411]|1187|
| [22961]|1174|
+--------+----+
only showing top 20 rows



In [19]:
model2.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



# Test model using a small support and confindence

In [8]:
fpGrowth3 = FPGrowth(itemsCol="remove_dupes", minSupport=0.02, minConfidence=0.03)
model3 = fpGrowth3.fit(new_data)

In [9]:
model3.freqItemsets.show()

+---------------+----+
|          items|freq|
+---------------+----+
|       [85123A]|2246|
|        [22423]|2172|
|       [85099B]|2135|
|        [47566]|1706|
|        [20725]|1608|
|[20725, 85099B]| 588|
|        [84879]|1468|
|        [22720]|1462|
|        [22197]|1442|
|        [21212]|1334|
|        [22383]|1306|
| [22383, 20725]| 663|
|        [20727]|1295|
| [20727, 20725]| 648|
| [20727, 22383]| 587|
|        [22457]|1266|
|         [POST]|1254|
|        [23203]|1249|
|[23203, 85099B]| 582|
|        [22386]|1231|
+---------------+----+
only showing top 20 rows



In [10]:
model3.associationRules.show()

+----------+----------+-------------------+
|antecedent|consequent|         confidence|
+----------+----------+-------------------+
|   [22699]|   [22423]|0.47946428571428573|
|   [22699]|   [22697]|                0.7|
|   [22699]|   [22698]| 0.5482142857142858|
|   [22386]|  [85099B]| 0.6766856214459789|
|   [22386]|   [21931]| 0.4207961007311129|
|   [20727]|   [20725]| 0.5003861003861004|
|   [20727]|   [22383]| 0.4532818532818533|
|   [20727]|   [20728]| 0.4061776061776062|
|   [20727]|   [22384]| 0.4223938223938224|
|   [22382]|   [20725]| 0.4811965811965812|
|   [22382]|   [22383]|0.45897435897435895|
|   [20725]|  [85099B]| 0.3656716417910448|
|   [20725]|   [22383]| 0.4123134328358209|
|   [20725]|   [20727]|0.40298507462686567|
|   [20725]|   [20728]|0.34950248756218905|
|   [20725]|   [22382]|0.35012437810945274|
|   [20725]|   [22384]| 0.3812189054726368|
|   [20725]|   [20726]| 0.3308457711442786|
|   [22384]|   [20725]| 0.5522522522522523|
|   [22384]|   [20727]| 0.492792

In [12]:
model3.transform(new_data).show()

+---------+--------------------+--------------------+--------------------+
|InvoiceNo|           StockCode|        remove_dupes|          prediction|
+---------+--------------------+--------------------+--------------------+
|   536596|[21624, 22900, 22...|[84926A, 21624, 2...|                  []|
|   536938|[22386, 85099C, 2...|[21479, 84997B, 2...|     [85099B, 22411]|
|   537252|             [22197]|             [22197]|                  []|
|   537691|[22791, 22171, 82...|[22505, 22791, 82...|                  []|
|   538041|             [22145]|             [22145]|                  []|
|   538184|[22585, 21481, 22...|[22492, 22561, 48...|                  []|
|   538517|[22491, 21232, 21...|[22197, 22844, 22...|                  []|
|   538879|[84819, 22150, 21...|[22130, 22555, 84...|                  []|
|   539275|[22909, 22423, 22...|[22423, 21914, 22...|      [22699, 22697]|
|   539630|[21484, 85099B, 2...|[22988, 84347, 22...|[20725, 23203, 22...|
|   540499|[21868, 22697,

In [15]:
#Get example of items and put it into Data Frame
compare = spark.createDataFrame([
    ('0',['22423']),
    ('1', ['20725', '21212'])
],['ID','remove_dupes'])

In [16]:
model3.transform(compare).show()

+---+--------------+--------------------+
| ID|  remove_dupes|          prediction|
+---+--------------+--------------------+
|  0|       [22423]|      [22699, 22697]|
|  1|[20725, 21212]|[85099B, 22383, 2...|
+---+--------------+--------------------+



In [18]:
data.createOrReplaceTempView("ecommerce")

In [23]:
# Find the Description for comparison ID 0
query = spark.sql("SELECT DISTINCT StockCode, Description \
                FROM ecommerce \
                WHERE (StockCode = '22423' OR StockCode = '22699' OR StockCode = '22697') ORDER BY StockCode")
query.show(truncate = False)

+---------+--------------------------------+
|StockCode|Description                     |
+---------+--------------------------------+
|22423    |faulty                          |
|22423    |damages                         |
|22423    |REGENCY CAKESTAND 3 TIER        |
|22697    |GREEN REGENCY TEACUP AND SAUCER |
|22699    |ROSES REGENCY TEACUP AND SAUCER |
+---------+--------------------------------+



In [24]:
# Find the Description for comparison ID 1
query = spark.sql("SELECT DISTINCT StockCode, Description \
                FROM ecommerce \
                WHERE (StockCode = '20725' OR StockCode = '21212' OR StockCode = '85099B' OR StockCOde = '22383') \
                ORDER BY StockCode")
query.show(truncate = False)

+---------+-------------------------------+
|StockCode|Description                    |
+---------+-------------------------------+
|20725    |LUNCH BAG RED RETROSPOT        |
|20725    |LUNCH BAG RED SPOTTY           |
|21212    |PACK OF 72 RETROSPOT CAKE CASES|
|22383    |LUNCH BAG SUKI  DESIGN         |
|22383    |LUNCH BAG SUKI DESIGN          |
|85099B   |JUMBO BAG RED RETROSPOT        |
+---------+-------------------------------+



# Referrence

Remove Multiple Items : https://stackoverflow.com/questions/54185710/remove-duplicates-from-pyspark-array-column 