## **Project - 1(Sindhu_Sheri)**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 45.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=f7bba69ebad35a67c2945318c30b389d9fc6d4477eb5b02eff8c5ed505fb7e20
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [203]:
import sys
from pyspark import SparkContext, SparkConf
import itertools
from collections import Counter
from itertools import combinations

In [213]:
# creating Spark context with necessary configuration
sparkcontext = SparkContext("local","PySpark Aprori example")

In [None]:
file = "drive/My Drive/bigdata_assignments/browsing.txt"

In [200]:
MIN_SUPPORT = 85
MIN_CONFIDENCE = 0.9

In [215]:
fileRDD = sparkcontext.textFile(file)
itemset = fileRDD.map(lambda line: sorted([str(item) for item in line.strip().split(' ')]))

In [217]:
# Broadcast the itemset to all executors. Covert to frozenset as it can be hashed and can be used to lookup in O(1).
broadcasted_itemset = sparkcontext.broadcast(itemset.map(lambda x: frozenset(x)).collect())

In [218]:
# Generates next candidate given previous frequent item set and k
def generate_next_candidate(prev_frequent_k, k):
    next_candidate = []
    # Create next candidate set by joinining previous fequent set with itself
    for index, left in enumerate(prev_frequent_k) :
      for right in prev_frequent_k[index + 1:] :
        # If first k-2 elements are same, join
        if len(left.intersection(right)) == k - 2 :
          next_candidate.append(left | right)
    return next_candidate

In [219]:
# Generates frequent item set
# sparkContext - spark context
# candidate_k - candidate set
# broadcasted_itemset - broadcasted itemset
# supprot - minimum support required
def generate_frequent_k(sparkContext, candidate_k, brodcasted_item_transaction_index_map, support):
    def get_frequent(candidate):
        current_transactions = None
        for item in candidate :
          if current_transactions == None:
            # If current_transactions is None (i.e this is the first loop), initialise current_transactions to set of transation indexes for this item.
            current_transactions = brodcasted_item_transaction_index_map.value[item]
          else:
            # Take intersection between current_transactions and indexes of transactions where item is present.
            # If I1 is present in T1, T2, T5 and I2 is present in T2, T4, T6. 
            # To efficiently find out all transactions where (I1, I2) both are present we can take the intersection of both the transaction_indexes. This gives us (T2) for the example. 
            current_transactions = current_transactions.intersection(brodcasted_item_transaction_index_map.value[item])
        # Once we have all the indexes in which the candidate is present, support can be calculated using length of the transaction indexes.
        candidate_support = len(current_transactions)
        if candidate_support >= support:
            # Return only if candidate_support is greater than the minimum support required.
            return frozenset(candidate), candidate_support

    # From the candidate set, take only the frequent itemsets
    frequent_k = sparkContext.parallelize(candidate_k).map(get_frequent).filter(lambda x: x).collect()
    return frequent_k

In [221]:
item_transaction_index_map = {}

# Returns a list of tuples where each tuple is of form (item, transaction_set). transaction_set is set of indexes of transactions where the item is present.
def get_item_transaction_index(transaction_with_index):
  transaction, transaction_index = transaction_with_index
  curr_item_transaction_index_map = {}
  for item in transaction:
    if item in curr_item_transaction_index_map :
      curr_item_transaction_index_map[item].add(transaction_index)
    else :
      curr_item_transaction_index_map[item] = set([transaction_index])
  return [(item, transactions_set) for item, transactions_set in curr_item_transaction_index_map.items()]

# brodcasted_item_transaction_index_map is a dictionary mapping each item to set of transation indexes it is present in. 
brodcasted_item_transaction_index_map = sparkcontext.broadcast(dict(itemset.zipWithIndex() # zipWithIndex gives (itemset, index). itemset is transaction.
                                                                            # Call get_item_transaction_index to get item_transaction_index map for each transation in form of list of (item, transaction_set).
                                                                            # flatMap converts to convert list of lists to a single list
                                                                           .flatMap(get_item_transaction_index) 
                                                                            # We have list of (item, transaction_set) from previous step. reduce using key (item) to get final list of (item, transaction_set) considering all transations
                                                                           .reduceByKey(lambda transationIndex1, transationIndex2 : 
                                                                                         transationIndex1 | transationIndex2)
                                                                           .collect()))

In [223]:
frequent_set_support = dict()

k = 1
candidate_k = itemset.flatMap(lambda x: set(x)).distinct().collect()
candidate_k = [{x} for x in candidate_k]

while k <= 4:
  frequent_k = generate_frequent_k(sparkcontext, candidate_k, brodcasted_item_transaction_index_map, MIN_SUPPORT)
 
  # Store frequent_set to support mapping in frequent_set_support. This helps us in calculating confidence in O(1).
  for frequent_set, support in frequent_k:
    frequent_set_support[frequent_set] = support

  k += 1
  if k > 4:
    break

  candidate_k = generate_next_candidate([set(item) for item in map(lambda x: x[0], frequent_k)], k)

In [224]:
# Returns confidence of association rule
def get_confidence(left, right, broadcasted_frequent_set_support):
  frequent_set_support_dict = broadcasted_frequent_set_support.value
  print(type(frequent_set_support_dict))
  return frequent_set_support_dict[frozenset(left | right)]/frequent_set_support_dict[frozenset(left)]

In [225]:
# Given frequent_set and right hand side item, returns left hand set and right hand set
# If frequent_set is I1, I2, T3 and right_item is T2 returns set(I1, I3), set(T2) - I1, I3 → T2
def get_rule(frequent_set, right_item):
  left = set(frequent_set)
  left.remove(right_item)
  return left, set([right_item])

In [226]:
# Returns list of tuple of associtaion rule and confidence whose confidence is greater than MIN_CONFIDENCE
def get_associate_rule_confidence(frequent_item_support, broadcasted_frequent_set_support):
  frequent_set, support = frequent_item_support
  rules = []
  for right_item in frequent_set:
    left, right = get_rule(frequent_set, right_item)
    confidence = get_confidence(left, right, broadcasted_frequent_set_support)
    if confidence >= MIN_CONFIDENCE:
      # Pretty print association rule and confidence
      rules.append((",".join(left) + " → " + right_item, confidence))
  return rules

In [228]:
broadcasted_frequent_set_support = sparkcontext.broadcast(frequent_set_support)

In [231]:
# Delete folder if exists
!rm -rf /content/associationRulesWithSupport

(sparkcontext.parallelize(frequent_set_support.items()) # Parallelize frequent_set_support items (frequent_set, support)
            .filter(lambda x : len(x[0]) > 1) # Only consider frequent sets of size > 1 as we need atleast 2 items for generating association rules
            .flatMap(lambda x : get_associate_rule_confidence(x, broadcasted_frequent_set_support)) # Flat map get_associate_rule_confidence
            .sortBy(lambda x : x[1], False) # Sort by confidence in descending order
            .map(lambda x : x[0] + "; Confidence=" + str(round(x[1]*100, 2)) + "%")  # Generate pretty string for saving
            .coalesce(1)  # Coalesce to one partition to get a single file output
            .saveAsTextFile("/content/associationRulesWithSupport"))

In [232]:
!cat /content/associationRulesWithSupport/part-00000

DAI93865 → FRO40251; Confidence=100.0%
GRO85051,ELE17451 → FRO40251; Confidence=100.0%
GRO85051,ELE26917 → FRO40251; Confidence=100.0%
GRO85051,GRO73461 → FRO40251; Confidence=100.0%
GRO85051,GRO94758 → FRO40251; Confidence=100.0%
GRO85051,DAI55911 → FRO40251; Confidence=100.0%
SNA18336,DAI23334 → DAI62779; Confidence=100.0%
DAI88079,DAI62779 → FRO40251; Confidence=100.0%
ELE92920,DAI23334 → DAI62779; Confidence=100.0%
SNA55762,GRO85051 → FRO40251; Confidence=100.0%
SNA45677,GRO85051 → FRO40251; Confidence=100.0%
GRO85051,GRO21487 → FRO40251; Confidence=100.0%
GRO85051,SNA80324 → FRO40251; Confidence=100.0%
GRO85051,DAI75645 → FRO40251; Confidence=100.0%
GRO38814,GRO85051 → FRO40251; Confidence=100.0%
GRO85051,DAI83948 → FRO40251; Confidence=100.0%
GRO85051,FRO53271 → FRO40251; Confidence=100.0%
GRO85051,ELE74009 → FRO40251; Confidence=100.0%
DAI31081,GRO85051 → FRO40251; Confidence=100.0%
GRO85051,ELE20847 → FRO40251; Confidence=100.0%
GRO85051,DAI85309 → FRO40251; Confidence=100.0%
F