# installation

In [1]:
! pip install -q pyspark

[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[K     |████████████████████████████████| 198 kB 46.4 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
! tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [4]:
! pip install -q findspark

In [5]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

In [6]:
# First Method
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .getOrCreate()

sc = spark.sparkContext

In [7]:
spark.version

'3.2.1'

# Q5

##### Dataset

In [8]:
!wget "http://snap.stanford.edu/class/cs246-data/browsing.txt"

--2022-04-12 15:05:28--  http://snap.stanford.edu/class/cs246-data/browsing.txt
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3458517 (3.3M) [text/plain]
Saving to: ‘browsing.txt’


2022-04-12 15:05:31 (1.46 MB/s) - ‘browsing.txt’ saved [3458517/3458517]



In [9]:
spark = SparkSession(sc)

In [112]:
sc_data = sc.textFile("browsing.txt")
sc_data.collect()

['FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 ',
 'GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ',
 'ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ',
 'ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ',
 'ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 ',
 'ELE17451 GRO73461 DAI22896 SNA99873 FRO18919 DAI50921 SNA80192 GRO75578 ',
 'ELE17451 ELE59935 FRO18919 ELE23393 SNA80192 SNA85662 SNA91554 DAI22177 ',
 'ELE17451 SNA69641 FRO18919 SNA90258 ELE28573 ELE11375 DAI14125 FRO78087 ',
 'ELE17451 GRO73461 DAI22896 SNA80192 SNA85662 SNA90258 DAI46755 FRO81176 ELE66810 DAI49199 DAI91535 GRO94758 ELE94711 DAI22177 ',
 'ELE17451 SNA69641 DAI91535 GRO94758 GRO99222 FRO76833 FRO81176 SNA80192 DAI54690 ELE37798 GRO56989 ',
 'ELE17451 GRO73461 DAI22896 GRO99222 SNA47306 GRO36567 ELE82555 SNA17715 SNA94781 DAI87514 GRO48282 GRO12935 SNA55952 DAI93692 DAI92253 FRO82427 ELE26917 DAI22177 ',
 'GRO9922

# First Pass of A-Priori Algorithm

here we count the number of each item's occurence in the baskets.

In [99]:
import re
# Map
def map_item_to_one(line):
  results = []
  line = re.sub(' +', ' ', str(line))
  line = line.split()
  for item in line:
    results.append((item, 1))
  return results
mapped_to_one = sc_data.flatMap(map_item_to_one)

#Reduce
single_item_count_rdd = mapped_to_one.reduceByKey(lambda freq1, freq2: freq1+freq2)

supports = single_item_count_rdd.map(lambda item: item[1]) # returns only support of each item

In [100]:
min_support = 1000
freq_items_counts = single_item_count_rdd.filter(lambda item: item[1] >= min_support)

In [102]:
freq_items_counts.collect()[:10]

[('ELE17451', 3875),
 ('ELE26917', 2292),
 ('GRO73461', 3602),
 ('DAI22896', 1219),
 ('SNA99873', 2083),
 ('FRO78087', 1531),
 ('ELE59935', 1311),
 ('DAI22177', 1627),
 ('ELE66810', 1697),
 ('GRO94758', 1489)]

# Pass 2

In [103]:
freq_items = freq_items_counts.map(lambda item: item[0])
def filter_repeatitive(itemset):
  if itemset[0] == itemset[1]:
    return False
  if itemset[0] > itemset[1]:
    return False
  return True
combined_freq_items = freq_items.cartesian(freq_items)
combined_freq_items = combined_freq_items.filter(filter_repeatitive)

In [107]:
('ELE17451', 'ELE26917') in combined_freq_items.collect()

True

In [119]:
def map_combined_to_one(line):
  results = []
  line = re.sub(' +', ' ', str(line))
  line = line.split()
  for item1 in line:
    for item2 in line:
      if (item1, item2) in combined_freq_items_list:
        results.append(((item1, item2), 1))
  return results
combined_freq_items_list = combined_freq_items.collect()
combined_freq_items_mapped = sc_data.flatMap(map_combined_to_one)
reduced = combined_freq_items_mapped.reduceByKey(lambda freq1, freq2: freq1+freq2)

In [121]:
frequent_pairs = reduced.collect()

In [123]:
print("most frequent pairs(items which are usually bought together): ")
reduced.filter(lambda item: item[1]>400).collect()

most frequent pairs(items which are usually bought together): 


[(('ELE17451', 'GRO73461'), 580),
 (('ELE17451', 'GRO30386'), 468),
 (('ELE17451', 'GRO59710'), 408),
 (('DAI62779', 'FRO78087'), 482),
 (('DAI62779', 'ELE17451'), 1592),
 (('DAI62779', 'SNA55762'), 593),
 (('DAI62779', 'ELE26917'), 650),
 (('DAI62779', 'SNA99873'), 406),
 (('DAI62779', 'SNA45677'), 604),
 (('DAI62779', 'SNA93860'), 537),
 (('DAI62779', 'FRO40251'), 1070),
 (('ELE17451', 'FRO40251'), 697),
 (('FRO40251', 'GRO85051'), 1213),
 (('DAI62779', 'GRO73461'), 1139),
 (('DAI62779', 'GRO30386'), 709),
 (('DAI62779', 'ELE32164'), 832),
 (('ELE17451', 'ELE32164'), 511),
 (('ELE32164', 'GRO73461'), 486),
 (('DAI43223', 'ELE32164'), 711),
 (('FRO40251', 'GRO73461'), 882),
 (('GRO73461', 'SNA80324'), 562),
 (('DAI75645', 'GRO73461'), 712),
 (('DAI75645', 'FRO40251'), 1254),
 (('DAI62779', 'SNA80324'), 923),
 (('ELE17451', 'SNA80324'), 597),
 (('FRO40251', 'SNA80324'), 1412),
 (('GRO85051', 'SNA80324'), 471),
 (('GRO38814', 'GRO73461'), 427),
 (('DAI75645', 'SNA80324'), 1130),
 (('DAI

In [124]:
print("most frequent items:")
freq_items_counts.collect()

most frequent items:


[('ELE17451', 3875),
 ('ELE26917', 2292),
 ('GRO73461', 3602),
 ('DAI22896', 1219),
 ('SNA99873', 2083),
 ('FRO78087', 1531),
 ('ELE59935', 1311),
 ('DAI22177', 1627),
 ('ELE66810', 1697),
 ('GRO94758', 1489),
 ('SNA55952', 1094),
 ('FRO32293', 1702),
 ('GRO30386', 1840),
 ('DAI35347', 1060),
 ('SNA93860', 1407),
 ('SNA72163', 1090),
 ('FRO31317', 2330),
 ('GRO15017', 1275),
 ('GRO59710', 2004),
 ('DAI63921', 1773),
 ('ELE66600', 1713),
 ('DAI91290', 1138),
 ('ELE14480', 1147),
 ('DAI62779', 6667),
 ('SNA55762', 1646),
 ('ELE91337', 1289),
 ('SNA45677', 2455),
 ('FRO40251', 3881),
 ('GRO85051', 1214),
 ('GRO44993', 1193),
 ('GRO61133', 1321),
 ('GRO81087', 1220),
 ('ELE32164', 2851),
 ('DAI88807', 1316),
 ('GRO21487', 2115),
 ('DAI43223', 1290),
 ('SNA80324', 3044),
 ('DAI75645', 2736),
 ('GRO38814', 1352),
 ('ELE99737', 1516),
 ('SNA96271', 1295),
 ('FRO35904', 1436),
 ('FRO85978', 1918),
 ('FRO53271', 1420),
 ('SNA90094', 1390),
 ('ELE74009', 1816),
 ('GRO56726', 1784),
 ('ELE34057',