
# Assignment 2 - PageRank, Frequent Items
### Mark Stephens 
## Exercise 1 - PageRank with MapReduce
The algorithm is run on the Google Web Graph 2002, available at
http://snap.stanford.edu/data/web-Google.html 

Install pyspark and mount Google Drive

In [None]:
!pip install pyspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=2dcbc3ee8d2386e14f7e98e2b305014db756b40683505be9f47deacc1e8b8622
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

Mount the Google Drive to Google Colab.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Initialise the Spark Session and Spark Context

In [None]:
# Create SparkSession and SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder\
                  .master('local[*]')\
                  .appName('PageRank')\
                  .getOrCreate()
sc = spark.sparkContext

In [None]:
input_test_PATH = '/content/drive/MyDrive/Colab Notebooks/PangeRank test.txt'
input_PATH = '/content/drive/MyDrive/Colab Notebooks/web-Google.txt'
GoogleWebGraph = sc.textFile(input_PATH)

Define the functions that combine each node with its successors. The *mapKeys* function maps each edge to a key-value pair of the form (node, ([successor_node], 1)). The *createCombiner* function returns the first value seen for a node key unchanged, and this value becomes the initial combiner for *combineByKey*. The *mergeValue* function appends a further successor to the [successor_nodes] list of the combiner and adds 1 to its count, and the *mergeCombiner* function concatenates the lists and adds the counts of combiners that were built on different partitions. The end result is, for each node, the list of its successors and its out-degree.

In [None]:
# Defining createCombiner, mergeValue and mergeCombiner functions
def mapKeys(tpl):
   return (tpl[0],([tpl[1]],1))

def createCombiner(value):
  return value
    
def mergeValue(combiner, new_value): 
  return (combiner[0] + new_value[0],combiner[1]+1)
    
def mergeCombiner(combiner1, combiner2): 
  return (combiner1[0] + combiner2[0],combiner1[1] + combiner2[1])
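
To make the roles of the three functions concrete, the sketch below emulates the combine step in plain Python on a hypothetical toy edge list split into two partitions. The helper `combine_partition` and the snake_case function names are illustrative assumptions and not part of the PySpark API; the real *combineByKey* performs the same steps in a distributed fashion.

```python
def create_combiner(value):
    # the first value seen for a key in a partition becomes the combiner
    return value

def merge_value(combiner, new_value):
    # fold another value from the same partition into the combiner
    return (combiner[0] + new_value[0], combiner[1] + 1)

def merge_combiners(c1, c2):
    # merge the combiners that were built on two different partitions
    return (c1[0] + c2[0], c1[1] + c2[1])

def combine_partition(pairs):
    """Apply create_combiner / merge_value to the (key, value) pairs of one partition."""
    combined = {}
    for key, value in pairs:
        if key in combined:
            combined[key] = merge_value(combined[key], value)
        else:
            combined[key] = create_combiner(value)
    return combined

# Hypothetical toy edges, already mapped to (node, ([successor], 1)) and split into two partitions
partition_1 = [(0, ([1], 1)), (0, ([2], 1)), (1, ([2], 1))]
partition_2 = [(0, ([3], 1)), (2, ([0], 1))]

result = combine_partition(partition_1)
for key, combiner in combine_partition(partition_2).items():
    result[key] = merge_combiners(result[key], combiner) if key in result else combiner

print(result)  # {0: ([1, 2, 3], 3), 1: ([2], 1), 2: ([0], 1)}
```

Node 0 appears in both partitions, so its final entry is produced by the merge of two combiners, while nodes 1 and 2 only pass through the per-partition step.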
 

The network text file is processed by parsing each line and separating the elements at the tab delimiter, forming an RDD. The first four lines of the RDD are filtered out because they are header text rather than data, and the remaining strings are converted to integers. The elements of the RDD are then combined to form a sparse representation of the transition matrix of the network. A PageRank RDD is initialised with a rank of $1/n$ for each node, where $n$ is the number of nodes, and this vector is kept in memory for use in the subsequent calculations. Finally, the damping factor, the taxation term, the maximum number of iterations and the convergence tolerance are defined.

In [None]:
# Preprocessing and initialising vectors and parameters
#transform the adjacency list text file to split at the occurrences of a tab
data = GoogleWebGraph.map(lambda line: line.split("\t"))

# filter the top lines from the resilient distributed dataset (RDD)
rdd = data.zipWithIndex().filter(lambda a:a[1]>3).map(lambda a: a[0])

#Identify the number of nodes in the network from the text in line 3 of the datafile
n = int(data.zipWithIndex().filter(lambda a:a[1]==2).map(lambda a: a[0]).collect()[0][0].split(' ')[2])

# map the elements to tuples and convert from strings to integers.
rdd = rdd.map(lambda x: tuple(map(int,x)))

# map to a key value pair for the first element and form a list with the second element,
#achieve a sparse representation of the transition matrix
sparserepM = rdd.map(mapKeys).combineByKey(createCombiner,mergeValue,mergeCombiner)

# define the initial rank value per node key as the reciprocal of the number of nodes for the whole set.
# Each page has equal scores to begin with. 
vector_zero = sparserepM.map(lambda x: (x[0],1/n))

# Set the damping factor and taxation rate
damping = 0.80
tax = (1-damping)/n

# define the maximum number of iterations and convergence tolerance
max_iterations = 30
tolerance = 10**(-7)
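
As a possible alternative to selecting the header by line index, the following plain-Python sketch parses a file of the same shape by treating every line that starts with `#` as a header and reading the node count from the `Nodes:` field. The sample lines and the helper `parse_graph` are hypothetical, and the sketch assumes that all header lines begin with `#`.

```python
import re

# Hypothetical lines laid out like the header and body of an edge-list file
sample_lines = [
    "# Directed graph: example.txt",
    "# Example web graph",
    "# Nodes: 4 Edges: 5",
    "# FromNodeId\tToNodeId",
    "0\t1", "0\t2", "1\t2", "2\t0", "3\t2",
]

def parse_graph(lines):
    """Return (number of nodes, list of integer edges) from header and data lines."""
    node_count = None
    edges = []
    for line in lines:
        if line.startswith("#"):
            # header line: look for the node count, otherwise skip it
            match = re.search(r"Nodes:\s*(\d+)", line)
            if match:
                node_count = int(match.group(1))
            continue
        source, target = line.split("\t")
        edges.append((int(source), int(target)))
    return node_count, edges

n_nodes, edges = parse_graph(sample_lines)
print(n_nodes, edges)
```

Filtering on the `#` prefix does not depend on the header having exactly four lines, which is the assumption made by the index-based filter.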

The transition matrix of a web page network is very sparse, so it is better represented by the out-degree of each node and the list of its successors. The *combineByKey()* function is used to build this sparse representation, and the first rows are shown below:


In [None]:
sparserepM.map(lambda x: (x[0],x[1][0],x[1][1])).toDF(schema = ['node','neighbours','out-degree']).show(5)

+------+--------------------+----------+
|  node|          neighbours|out-degree|
+------+--------------------+----------+
|     0|[11342, 824020, 8...|         4|
| 11342|[0, 27469, 38716,...|        14|
|824020|[0, 91807, 322178...|        11|
|203402|[1, 53051, 164684...|        30|
|223236|[8517, 14456, 515...|        24|
+------+--------------------+----------+
only showing top 5 rows



Each non-zero element of the transition matrix, as given by the sparse representation, is mapped to a key-value tuple of the form (node, (successor, probability)), where the probability is the reciprocal of the out-degree of the node. The function *map_matrix()* takes one tuple of the sparse representation of the transition matrix and returns a list of such tuples, one per successor.

In [None]:
def map_matrix(sparse_tuple):
  """
  Take a tuple of the sparse representation of the transition matrix, (node, ([successors], out_degree)),
  and map it to a list of tuples of the form (node, (successor, probability)).
  """
  node = sparse_tuple[0]
  successors = sparse_tuple[1][0]
  probability = 1/sparse_tuple[1][1]
  result=[]
  #loop through the list of successors for each node key in the passed tuple
  for successor in successors:
    result.append((node,(successor,probability)))
  return result
  
# apply the function to each element in the sparse representation RDD to give a flattened list of tuples.
#Cache the resulting RDD for efficient processing as the matrix is used in subsequent transformations.
matrix = sparserepM.flatMap(lambda x: map_matrix(x)).cache()

### PageRank Recursion
An iteration of the PageRank algorithm begins with the current estimate of the PageRank vector, $\mathbf{v}_{k-1}$, and computes the next estimate, $\mathbf{v}_{k}$, via the recursive rule:

$\mathbf{v}_{k} = \beta M \mathbf{v}_{k-1} + (1-\beta)\frac{\mathbf{e}}{n}$

where $\beta$ is the damping factor, $\mathbf{e}$ is a vector of ones and $n$ is the number of nodes.

A matrix-vector multiplication is performed between **M** and $\mathbf{v}_{k-1}$. This is implemented in the function *matrix_vector()*, which takes a tuple of the form (node, ((successor, probability), rank)). This tuple is formed by joining each element of the matrix, **M**, with the corresponding element of the vector, $\mathbf{v}_{k-1}$, on the *node* key. The function returns the contribution that the node makes to the PageRank score of its successor as a one-element list of tuples of the form (successor, score_interval), where *score_interval* is the portion of the rank score passed from the node to the successor.

The scalar multiplication by $\beta$ and the vector addition of $(1-\beta)\frac{\mathbf{e}}{n}$ are performed element-wise using *mapValues()*.
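
Written per node, the recursion reads as follows, where $d_j$ denotes the out-degree of node $j$ and the sum runs over the predecessors of node $i$. The worked values assume a hypothetical 3-node graph with edges 0→1, 0→2, 1→2 and 2→0, $\beta = 0.8$ and a uniform starting vector of $1/3$; they are an illustration only and are not taken from the Google Web Graph.

```latex
\begin{aligned}
v_k(i) &= \beta \sum_{j \,:\, j \to i} \frac{v_{k-1}(j)}{d_j} + \frac{1-\beta}{n} \\[4pt]
v_1(0) &= 0.8 \cdot \frac{1/3}{1} + \frac{0.2}{3} = \frac{1}{3} \\
v_1(1) &= 0.8 \cdot \frac{1/3}{2} + \frac{0.2}{3} = \frac{1}{5} \\
v_1(2) &= 0.8 \left( \frac{1/3}{2} + \frac{1/3}{1} \right) + \frac{0.2}{3} = \frac{7}{15}
\end{aligned}
```

The three values sum to 1, as expected for a graph in which every node has at least one successor.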

In [None]:
def matrix_vector(joined_tuple):
  ''' 
  Performs the element-wise product of a matrix element and the vector element that share the same 'node' key.
  argument:
        tuple: (j, (Mij, vj)), where Mij is of the form (i, mij), i is a successor of node j and vj is the rank of node j
  returns:
        a one-element list [(i, mij*vj)]
  '''
  matrix_element = joined_tuple[1][0]
  rank_score = joined_tuple[1][1]
  result = []
  # Unpack the matrix element tuple
  successor, probability = matrix_element
  #perform piecewise multiplication to obtain the portion of the rank score contributed to the successor by the node.
  result.append((successor, rank_score*probability))  
  return result

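The following function calculates the squared differences between two vectors. It takes one partition of a joined RDD, whose elements have the form (node, (new_rank, previous_rank)), and returns a numpy array of the squared differences. Summing these values over all partitions and taking the square root gives the L2 norm of the difference, which is used to test convergence of the iteration procedure of the PageRank algorithm.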

In [None]:
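import numpy as np

def calculate_L2_norm(tpl):
  # tpl iterates over the (node, (new_rank, previous_rank)) elements of one partition
  array_vector = np.array([(x[1][0]-x[1][1])**2 for x in tpl])
  return array_vector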
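
The same convergence measure can be checked by hand on a small example. The sketch below is a plain-Python analogue that uses dictionaries in place of RDDs; the function `l2_distance` and the two vectors are hypothetical. Only keys present in both vectors are compared, which mirrors the inner join used before the norm is taken.

```python
import math

def l2_distance(new_vector, old_vector):
    """L2 norm of the difference between two rank vectors stored as dicts."""
    shared = new_vector.keys() & old_vector.keys()  # inner join on the node key
    return math.sqrt(sum((new_vector[k] - old_vector[k]) ** 2 for k in shared))

# Hypothetical rank vectors for three nodes
old = {0: 0.25, 1: 0.25, 2: 0.50}
new = {0: 0.25, 1: 0.28, 2: 0.46}

distance = l2_distance(new, old)
print(distance)  # approximately 0.05, i.e. sqrt(0.03**2 + 0.04**2)
```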

The main program consists of an iterative loop that evaluates and updates the PageRank vector, as follows:
1. Define the initial vector $\mathbf{v}_{0}$ as the vector object to be updated.
2. Join the transition matrix RDD, $M$, with the vector RDD, $\mathbf{v}$, and cache the result.
3. Perform the matrix-vector multiplication as a map job of element-wise multiplications, then add up all elements of the same row in the reduce job.
4. Map the values of the matrix-vector product to multiply by $\beta$ and add the taxation amount to each element.
5. Test for convergence by calculating the L2 norm of the difference between the new vector RDD and the previous vector RDD and comparing it against the tolerance.
6. Assign the newly calculated PageRank estimate to the vector object and repeat the process until the convergence test is satisfied or the specified maximum number of iterations is reached.
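
The steps above can be sketched without Spark on a hypothetical 3-node graph. This is a minimal single-machine illustration under the assumption that every node has at least one predecessor and one successor; the graph, the variable names and the larger iteration cap are chosen for the example only.

```python
from collections import defaultdict

# Hypothetical graph: node -> list of successors
graph = {0: [1, 2], 1: [2], 2: [0]}
n = len(graph)
damping = 0.8
tax = (1 - damping) / n
tolerance = 1e-7
max_iterations = 100

# non-zero entries of M as (node, successor, probability) triples
matrix = [(node, succ, 1 / len(succs)) for node, succs in graph.items() for succ in succs]

vector = {node: 1 / n for node in graph}           # step 1: initial vector
for iteration in range(1, max_iterations + 1):
    contributions = defaultdict(float)
    for node, succ, probability in matrix:         # steps 2-3: match on node, multiply, sum per successor
        contributions[succ] += vector[node] * probability
    new_vector = {node: damping * total + tax      # step 4: scale by beta and add the tax
                  for node, total in contributions.items()}
    l2 = sum((new_vector[k] - vector[k]) ** 2 for k in new_vector) ** 0.5  # step 5
    vector = new_vector                            # step 6
    if l2 < tolerance:
        break

print(iteration, vector)
```

In this toy graph node 2 ends with the highest rank because it receives links from both other nodes, and the ranks continue to sum to 1 because no node is a dead end.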

In [None]:
# *********The Main Program***********
import math, numpy as np

if __name__ == "__main__": 
  prev_vector = None
  i = 0
  #Loop through the number of iterations
  while i < max_iterations:
    i += 1
    if prev_vector is None:
      #assign the vector v to the initial vector v0 on the first iteration and save in memory to use later.
      prev_vector = vector_zero.persist()
    # join the matrix elements with corresponding vector elements according to node key forming (node,(Mij,vj))
    matrixvectorjoin = matrix.join(prev_vector).cache()
    # produce an RDD 'score_interval' that contains the contribution of each source node to the rank score of each of its successor nodes.
    score_interval = matrixvectorjoin.flatMap(lambda joined_tuple: matrix_vector(joined_tuple))
    # The score intervals for each successor nodes are added together via the reducer task.
    # This completes the Matrix Vector multiplication.
    score_contributions = score_interval.reduceByKey(lambda x, y: x + y)
    # The vector_new estimate for each node is achieved by multiplying by the damping and adding on the tax amount
    vector_new = score_contributions.mapValues(lambda x: damping*x +  tax)
    # save the result of the new rdd vector in the memory to use it further in testing convergence.
    vector_new.cache()  
    # convergence between the new updated page rank vector and the previous is tested using L2 norm of the two vectors and comparing to the tolerance
    l2_norm = vector_new.join(prev_vector).mapPartitions(calculate_L2_norm).sum()**0.5
   
    #Check for convergence
    if l2_norm < tolerance:
      print(f"The algorithm converged on the {i} iteration with a L2 norm value of {l2_norm}.")
      break
    # remove the previous rdd from memory
    prev_vector.unpersist()
    # update the rdd vector object and save in memory to use later. 
    prev_vector = vector_new.persist()


The algorithm converged on the 25 iteration with a L2 norm value of 7.960493110838393e-08.


Having performed the recursive calculation of the PageRank vector, the elements of the vector RDD are mapped to tab-delimited output lines. The mapping is applied to each partition of the RDD in parallel. The RDD is then materialised as a text file and saved to Google Drive.

In [None]:
# Produce the output in the format <node><TAB><Page Rank>, employing mapPartitions to apply the lambda function to each partition
output = vector_new.sortByKey().mapPartitions(lambda partitions: ("%s\t%s" %(x[0],"{:.4E}".format(x[1])) for x in partitions))

#headers for the output file are defined
header_rdd = sc.parallelize(["nodes  pagerank"])

#The output page rank for each node and the headers for the output file are combined using union.
output_with_headers = header_rdd.union(output)

# define the directory where the partition files are located
dir_path = "/content/drive/MyDrive/Colab Notebooks/pagerankoutput"

# write the RDD output to the output directory, coalesced into a single partition file.
output_with_headers.coalesce(1,True).saveAsTextFile(dir_path)
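
The line format used for the output can be checked in isolation. The helper `format_rank_line` below is a hypothetical stand-alone version of the formatting expression, applied to the node and score from the first row of the results table.

```python
def format_rank_line(node, rank):
    """Return '<node><TAB><rank>' with the rank in scientific notation to 4 decimal places."""
    return "%s\t%s" % (node, "{:.4E}".format(rank))

line = format_rank_line(558791, 1.7488e-05)
print(line)  # 558791<TAB>1.7488E-05
```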

The top 10 nodes by PageRank score are given below. The PageRank vector RDD is first sorted in descending order of score, the scores are formatted in scientific notation so that they are easier to read, and the RDD is converted to a dataframe, of which only the top 10 rows are shown.

In [None]:
# Once the iteration has finished, the top nodes by page rank score are reported. The rdd is sorted by page rank value.
vector_new = vector_new.sortBy(lambda x: x[1], ascending = False)
# the page rank values are formatted and the top 10 results are given as a dataframe.
vector_new.map(lambda x: (x[0],"{:.4E}".format(x[1]))).toDF(schema = ['node','page rank']).show(10) 

+------+----------+
|  node| page rank|
+------+----------+
|558791|1.7488E-05|
| 41909|1.5670E-05|
|425770|1.5507E-05|
| 32163|1.5172E-05|
|828963|1.4194E-05|
|504140|1.3961E-05|
|751384|1.2749E-05|
|486980|1.2295E-05|
|597621|1.2115E-05|
|605856|1.1904E-05|
+------+----------+
only showing top 10 rows



The output file named *part-00000* is a plain text file without an extension. It is renamed with the extension .txt so that it can be opened directly by any text file reader application.

In [None]:
# import the os and shutil libraries
import os
import shutil

# loop over all files in the directory and rename them with the .txt extension
for filename in os.listdir(dir_path):
    if filename.startswith('part-'):
        os.rename(os.path.join(dir_path, filename), os.path.join(dir_path, filename + '.txt'))

spark.stop()

***
## Exercise 2 - Frequent Itemsets

## Simple, randomized A-Priori algorithm 

The simple randomised A-priori algorithm is implemented and tested on datasets from the [frequent itemset mining dataset](http://fimi.uantwerpen.be/data/) repository. The data is preprocessed by separating the items at whitespace. A random sample of the data is taken, with the dataset shuffled beforehand. The support threshold, $s$, is defined as a proportion of the total number of transactions and is reduced to $0.9s$ for the sample of transactions. Lowering the threshold in this way allows almost all itemsets with support of at least $s$ in the whole dataset to be identified in the sample.
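
The effect of the reduced threshold on the required count can be worked through with a few lines of arithmetic. The function `sample_support_count` and the numbers below are hypothetical and serve only to show how the proportions translate into counts.

```python
def sample_support_count(total_transactions, sample_fraction, support, safety=0.9):
    """Minimum count an itemset needs in the sample to be kept as a candidate."""
    sample_size = round(total_transactions * sample_fraction)
    return safety * support * sample_size

# Hypothetical dataset: 10,000 transactions, 10% sample, support threshold s = 5%
count = sample_support_count(10_000, 0.1, 0.05)
print(count)  # about 45 occurrences in the 1,000 sampled transactions
```

Without the factor of 0.9 the sample threshold would be 50 occurrences, so an itemset that is just frequent in the whole dataset but slightly under-represented in the sample would be missed.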

The *get_frequent_itemsets()* function determines the list of itemsets that are frequent as defined by the given support threshold, $s$.

The *generate_candidate_itemsets()* function returns a list of all combinations of itemsets of a given size $k$.

The *prune_candidates()* function reduces the candidate itemset list based on the apriori principle.

The *sample_transactions()* function takes a random sample of the whole dataset for a given sample size proportion.

In [None]:
from itertools import combinations
import random

def get_frequent_itemsets(transactions, itemsets, min_support):
  """
  Determine the frequent items and their support
  arguments:
  transactions: list[list] A list of lists of transactions
  itemsets: [list] A list of candidate itemsets
  min_support: float, The minimum support for frequent items
  """
  frequent_itemsets = []
  
  itemset_counts={}
  # number of transactions
  T = len(transactions)

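  for itemset in itemsets:
    itemset_counts[itemset] = 0
    # a singleton is a bare string; wrap it so that set() does not split it into characters
    items = {itemset} if isinstance(itemset, str) else set(itemset)

    for transaction in transactions:
      if items.issubset(transaction):
        itemset_counts[itemset] += 1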

  # sort the dictionary of itemset and support count  by support count value in descending order.
  itemset_counts = dict(sorted(itemset_counts.items(), key=lambda x:x[1], reverse=True ))
  
  #Add the itemsets that meet the support threshold to the list of frequent itemsets
  for itemset,count in itemset_counts.items():
    if count >= min_support*T:
      frequent_itemsets.append(itemset)
  
  return frequent_itemsets

def generate_candidate_itemsets(itemsets,k):
  """
  Generates all possible candidate itemsets of size k from the given itemsets, finding possible combinations.
  """
  candidates = list(combinations(itemsets,k))
  return sorted(candidates)

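def prune_candidates(candidates,frequent_itemsets):
  """
  Prunes candidate itemsets based on the apriori principle.
  arguments:
  candidates: list of candidate itemsets
  frequent_itemsets:   list of frequent itemsets of the previous size
  """
  # a singleton is a bare string; wrap it so that set() does not split it into characters
  list_of_set = [{x} if isinstance(x, str) else set(x) for x in frequent_itemsets]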
  pruned_candidates = []
  for candidate in candidates:
    is_pruned = False
    # check if all subsets of candidate are frequent
    for e in candidate:
      subset = set(candidate).difference(set([e]))
          
      if subset not in list_of_set:
        is_pruned = True
        break
    if not is_pruned:
      pruned_candidates.append(candidate)
  return pruned_candidates

def sample_transactions(transactions, sample_probability,randomise):
  '''
  Take a random sample of the transactions with sampling proportion p. For m transactions the
  sample size is m*p transactions. If the list of transactions is already randomised, the first m*p transactions
  are selected instead of sampling from the whole dataset.
  arguments:
  transactions: List[list].  A list of transactions.
  sample_probability: Float.  A sampling proportion, p.
  randomise: Bool.  Whether the sample should be drawn at random.
  '''
  m = len(transactions)
  sample_size = round(m * sample_probability)
  if randomise:
    sample = random.sample(transactions, sample_size)
  else:
    sample = transactions[:sample_size]
  return sample
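
The pruning rule can be illustrated on a small example. The sketch below is a self-contained variant that represents every itemset as a tuple and checks each candidate against a set of frozensets; the function `prune` and the toy itemsets are hypothetical and are not the notebook's own implementation.

```python
from itertools import combinations

def prune(candidates, frequent_smaller):
    """Keep a size-k candidate only if all of its (k-1)-subsets are frequent."""
    frequent = {frozenset(itemset) for itemset in frequent_smaller}
    kept = []
    for candidate in candidates:
        subsets = combinations(candidate, len(candidate) - 1)
        if all(frozenset(subset) in frequent for subset in subsets):
            kept.append(candidate)
    return kept

# Hypothetical frequent pairs; ('b', 'c') and ('b', 'd') are not frequent
frequent_pairs = [("a", "b"), ("a", "c"), ("a", "d"), ("c", "d")]
candidates = list(combinations(["a", "b", "c", "d"], 3))

pruned = prune(candidates, frequent_pairs)
print(pruned)  # [('a', 'c', 'd')]
```

Three of the four candidate triples contain an infrequent pair and are discarded before any transaction is scanned, which is where the saving comes from.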


The *apriori_algorithm()* function first forms a list of the distinct items by scanning all transactions of the dataset. The support count of each item is found, and the frequent items, those whose support count is at least the threshold, are retained in a list of frequent itemsets of size 1. From this list, candidate itemsets of size 2 are formed by listing the possible combinations. The candidate itemsets of size 2 are pruned using the apriori principle: an itemset cannot be frequent if any of its subsets is not frequent. The pruned candidate itemsets are then counted over all transactions and are retained as frequent itemsets if they meet the support count threshold. This process is repeated for itemsets of increasing size k until no frequent itemsets of the current size are found.

In [None]:
def apriori_algorithm(transactions, min_support):
    """
    Returns all frequent itemsets in the given transactions using the Apriori algorithm.
    transactions: List of sets where each set represents a transaction
    min_support: Minimum support threshold for an itemset to be considered frequent as a proportion of the total number of transactions.
    """
    itemsets = []
    # Find the unique items present in the transactions
    for transaction in transactions:
      for item in transaction:
        if item not in itemsets:
          itemsets.append(item)
    
    # Sort the itemsets in ascending order
    itemsets = sorted(itemsets)
    
    # Initialize the frequent itemsets
    frequent_itemsets = []
    
    # Get the frequent itemsets of size 1
    frequent_itemsets.append(sorted(get_frequent_itemsets(transactions, itemsets, min_support)))
 
    k = 2
    while len(frequent_itemsets[k-2]) > 0:
      # Generate candidate itemsets of size k
      candidate_itemsets = generate_candidate_itemsets(frequent_itemsets[0], k)
      
      # prune the candidate itemsets
      pruned_candidate_itemsets = prune_candidates(candidate_itemsets, frequent_itemsets[k-2])
     
      # Get the frequent itemsets of size k
      frequent_itemsets_k = get_frequent_itemsets(transactions, pruned_candidate_itemsets, min_support)
      
      if len(frequent_itemsets_k) > 0:
        frequent_itemsets.append(frequent_itemsets_k) 
        k += 1
      else:
        break
    return frequent_itemsets
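
A compact level-wise run on a handful of transactions shows how the procedure terminates. The function `tiny_apriori` below is a hypothetical, self-contained condensation of the same idea that uses tuples for all itemsets and an absolute count as the threshold; the baskets are made up for the example.

```python
from itertools import combinations

def tiny_apriori(transactions, min_count):
    """Return a list of levels; level k-1 holds the frequent itemsets of size k."""
    transactions = [frozenset(t) for t in transactions]
    items = sorted({item for t in transactions for item in t})
    levels = []
    current = [(item,) for item in items]   # candidates of size 1
    k = 1
    while current:
        counts = {c: sum(1 for t in transactions if set(c).issubset(t)) for c in current}
        frequent = [c for c in current if counts[c] >= min_count]
        if not frequent:
            break
        levels.append(frequent)
        k += 1
        frequent_set = set(frequent)
        singles = sorted({item for c in frequent for item in c})
        # generate size-k candidates and prune those with an infrequent (k-1)-subset
        current = [c for c in combinations(singles, k)
                   if all(s in frequent_set for s in combinations(c, k - 1))]
    return levels

baskets = [["bread", "milk"], ["bread", "butter", "milk"], ["butter", "milk"],
           ["bread", "butter"], ["bread", "butter", "milk"]]
levels = tiny_apriori(baskets, min_count=3)
print(levels)
```

All three items and all three pairs reach the count of 3, but the triple appears in only two baskets, so the loop stops after the second level.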

The *apriori algorithm* is performed on a simple random sample of the whole set of transactions, for multiple datasets and for various sample sizes. Having obtained all frequent itemsets up to size k from a given sample, one pass is made through the whole dataset and the support of each of these itemsets is determined. The itemsets whose support is at least the threshold are retained, while those that are frequent in the sample but not in the whole dataset are *false positives* and are removed.

The Apriori algorithm is repeated across 7 different datasets for 3 different sample sizes of 2%, 5% and 10%. The support threshold is also set either at 1% or 5% according to the particular dataset. A list of the top 10 frequent itemsets per itemset size is produced as the output for each dataset and for each sample. The number of false positives that were removed is also stated, as well as the computation time. The run recorded in this notebook is restricted to the connect and mushroom datasets, with sample sizes of 10% and 20% and support thresholds of 20% and 7% respectively.
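
The verification pass over the whole dataset can be sketched as follows. The function `split_false_positives`, the transactions and the sample result are hypothetical; itemsets are represented as tuples throughout.

```python
def split_false_positives(sample_frequent, transactions, support):
    """Separate itemsets that are frequent in the whole dataset from false positives."""
    threshold = support * len(transactions)
    confirmed, false_positives = [], []
    for itemset in sample_frequent:
        count = sum(1 for t in transactions if set(itemset).issubset(t))
        if count >= threshold:
            confirmed.append(itemset)
        else:
            false_positives.append(itemset)
    return confirmed, false_positives

# Hypothetical full dataset and itemsets reported as frequent by a sample
transactions = [["a", "b"], ["a", "c"], ["a", "b", "c"], ["b"], ["a"], ["c"]]
sample_frequent = [("a",), ("b",), ("a", "b"), ("b", "c")]

confirmed, false_positives = split_false_positives(sample_frequent, transactions, support=0.4)
print(confirmed)        # [('a',), ('b',)]
print(false_positives)  # [('a', 'b'), ('b', 'c')]
```

This pass removes false positives only. Itemsets that are frequent in the whole dataset but were missed by the sample (false negatives) are not recovered by it.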

In [None]:
# import the builtin time module
import time

files = [#'T40I10D100K.dat.txt',
         #'T10I4D100K.dat.txt',
         #'chess.dat.txt',
         'connect.dat.txt',
         'mushroom.dat.txt']
         #'pumsb.dat.txt',
         #'pumsb_star.dat.txt']

####### MAIN PROGRAM ###########
if __name__ == "__main__":

  #loop through all files
  for i,f in enumerate(files):
    # open the datafile
    with open('/content/drive/MyDrive/Colab Notebooks/apriori algorithm/' + f,encoding='utf8') as dataFile:
    # preprocess the transaction data
      transactions = []
      for line in dataFile.read().split('\n'):
        transactions.append(line.split())
              
    new_line = '\n'
    print(f'The dataset is: {f}{new_line}')

    #sample sizes of the transactions
    sample_sizes = [0.1,0.2]#[0.02,0.05,0.1]
    for prob in sample_sizes:
      # Grab Current Time Before Running the Code
      start = time.time()
    
      sample = sample_transactions(transactions, prob,randomise = True)

      # define the support thresholds as proportions of total transactions for each of the 7 datasets.
      support_threshold_proportions = [0.2, 0.07] #[0.01, 0.01, 0.7, 0.2, 0.07, 0.01, 0.01]
      
      # lower the threshold to 0.9 * s in the sample so that almost all itemsets
      # that have support at least s in the whole dataset are identified
      sample_support = 0.9 * support_threshold_proportions[i]

      #Perform the A-priori Algorithm and collect frequent itemsets of all itemset sizes for the sample support.
      frequent_itemsets_sample = apriori_algorithm(sample,sample_support)
            
      # Identifying False Positives by passing through the whole set of transactions and counting the occurrences
      # of the frequent itemsets found in the sample for the support of s.
      false_positives = set()
      
      frequent_itemsets_filtered = {}
      for j,frequent_itemset in enumerate(frequent_itemsets_sample):
        # Have the list of frequent itemsets from the whole dataset returned with support counts.
        pop_frequent_itemset = get_frequent_itemsets(transactions,
                                                     frequent_itemset,
                                                     support_threshold_proportions[i])
        
        false_positives.update(set(frequent_itemset).difference(set(pop_frequent_itemset)))
        
        # Itemsets found to be frequent in the sample but not frequent in the whole dataset are removed.
        # pop_frequent_itemset holds exactly the sample itemsets that are also frequent in the whole dataset,
        # already sorted by descending support, so it is kept as is to preserve the ranking for the top 10 output.
        frequent_itemsets_filtered[j+1] = list(pop_frequent_itemset)
      # Grab Current Time After Running the Code
      end = time.time()

      #Subtract Start Time from The End Time
      total_time = end - start  
      # The top 10 most frequent itemsets for each size are given in the output
      new_line = '\n'
      tab = '\t'
      print(f'Sample size is: {prob} and support ratio threshold of {support_threshold_proportions[i]}')

      for k,v in frequent_itemsets_filtered.items():
        if len(v) > 0:
          print(f'frequent itemsets of size {k} :{v[:5]}{new_line}{tab}{tab}{tab}{v[5:10]}')
      print(f'The number of False positives removed from frequent itemset are: {len(false_positives)}')
      print(f'The computation time was: {round(total_time,2)}{new_line}') 


The dataset is: connect.dat.txt

Sample size is: 0.1 and support ratio threshold of 0.2
frequent itemsets of size 1 :['114', '51', '17', '14', '47']
			['5', '74', '33', '15', '111']
frequent itemsets of size 2 :[('1', '4'), ('4', '7'), ('1', '5'), ('1', '7')]
			[]
frequent itemsets of size 3 :[('1', '4', '7')]
			[]
The number of False positives removed from frequent itemset are: 4
The computation time was: 8.21

Sample size is: 0.2 and support ratio threshold of 0.2
frequent itemsets of size 1 :['114', '51', '17', '14', '47']
			['5', '74', '33', '15', '111']
frequent itemsets of size 2 :[('1', '4'), ('4', '7'), ('1', '5'), ('1', '7')]
			[]
frequent itemsets of size 3 :[('1', '4', '7')]
			[]
The number of False positives removed from frequent itemset are: 0
The computation time was: 10.98

The dataset is: mushroom.dat.txt

Sample size is: 0.1 and support ratio threshold of 0.07
frequent itemsets of size 1 :['69', '22', '93', '91', '2']
			['116', '17', '13', '9', '113']
frequent i

### The Algorithm of Savasere, Omiecinski, and Navathe
This is an improvement on the simple randomised algorithm that avoids both the false negatives and the false positives that result from mining frequent itemsets on only a sample of the whole dataset. As the cost of an exact result, the SON algorithm makes two passes over the whole dataset. This is computationally achievable by partitioning the dataset, as in a distributed file system, and running the A-priori algorithm in parallel on each partition. The process is implemented in MapReduce and consists of two map and reduce phases.

In [None]:
# Create SparkSession and SparkContext
import findspark
from pyspark.sql import SparkSession
findspark.init()
spark = SparkSession.builder\
                  .master('local[*]')\
                  .appName('Frequent Itemset SON algorithm')\
                  .getOrCreate()
sc = spark.sparkContext

The *first_Mapper()* function performs the Apriori algorithm on the allocated chunk of the dataset and returns a list of candidate frequent itemsets. This is performed for each chunk, or partition, of the dataset.

The *second_Mapper()* function takes the combined list of candidate frequent itemsets from all partitions and determines the support count of each candidate within its own partition. A list of the candidate frequent itemsets and their support counts is returned.

In [None]:
def first_Mapper(partitions, min_support):
  """
  Performs the first map job by running the A-priori algorithm on each partition separately and returns
  the candidate frequent itemsets as (itemset, 1) pairs.
  """
  transactions = list(partitions)
  # Perform local A-priori algorithm on chunks
  candidate_itemsets_apriori = apriori_algorithm(transactions, min_support)
  candidate_itemsets_list = [(x,1) for sublist in candidate_itemsets_apriori for x in sublist]
  return iter(candidate_itemsets_list)

def second_Mapper(partitions,itemsets):
  """
  Determine the support count of each candidate itemset within one chunk.
  arguments:
  partitions: an iterator over the transactions in a chunk
  itemsets: a broadcast variable holding the list of candidate itemsets
  """
  itemset_counts={}
  transactions = list(partitions)
  for itemset in itemsets.value:
    itemset_counts[itemset] = 0
    # a singleton is a bare string; wrap it so that set() does not split it into characters
    items = {itemset} if isinstance(itemset, str) else set(itemset)
    for transaction in transactions:
      if items.issubset(transaction):
        itemset_counts[itemset] += 1
    
  itemset_counts_iter = [(k,v) for k,v in itemset_counts.items()]
  return iter(itemset_counts_iter)

def length(element):
  '''
  Determines the length of the itemset: returning a 1 for a singleton,
  2 for a doubleton and a 3 for a tripleton.
  '''
  if type(element) == str:
    return len([element])
  return len(element)

The SON algorithm consists of two phases of MapReduce jobs. For each dataset and for varying sample sizes, the SON algorithm is performed and the top 10 frequent itemsets of each itemset size are produced as the output. The number of partitions is determined from ${1\over p}$, where $p$ is the sample size proportion. For instance, a sample size of 10% results in 10 partitions of the dataset.

In the first phase, the *first_Mapper()* function is mapped across all partitions and the reducer job adds up all the elements of the same key. The count is then dropped and the remaining list of candidate frequent itemsets is materialised.

In the second phase, the *second_Mapper()* function is mapped across all partitions, taking the list of candidate frequent itemsets as an argument. The support counts of the candidate itemsets are summed across all partitions. The frequent itemsets are the candidates whose total support count is at least the minimum support threshold, and all other candidates are filtered out. The frequent itemsets are sorted in descending order of support count and materialised into a list. The output shows the top 10 frequent itemsets per size for each dataset and for each sample size.
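
The two phases can be sketched on a single machine with lists standing in for partitions. This is a simplified illustration under stated assumptions: the local mining step enumerates itemsets of up to two items by brute force instead of running A-priori, and the names `local_frequent`, `count_candidates` and `son`, as well as the toy chunks, are hypothetical.

```python
from collections import Counter
from itertools import combinations

def local_frequent(chunk, support, max_size=2):
    """Phase 1 map: itemsets (up to max_size items) that are frequent within one chunk."""
    threshold = support * len(chunk)
    counts = Counter()
    for transaction in chunk:
        items = sorted(set(transaction))
        for size in range(1, max_size + 1):
            counts.update(combinations(items, size))
    return {itemset for itemset, count in counts.items() if count >= threshold}

def count_candidates(chunk, candidates):
    """Phase 2 map: support count of every candidate within one chunk."""
    return Counter({c: sum(1 for t in chunk if set(c).issubset(t)) for c in candidates})

def son(chunks, support):
    total = sum(len(chunk) for chunk in chunks)
    # phase 1 reduce: the union of the locally frequent itemsets forms the candidates
    candidates = set().union(*(local_frequent(chunk, support) for chunk in chunks))
    # phase 2 reduce: add up the per-chunk counts of each candidate
    totals = Counter()
    for chunk in chunks:
        totals.update(count_candidates(chunk, candidates))
    return {c: count for c, count in totals.items() if count >= support * total}

# Hypothetical dataset split into two chunks of three transactions each
chunks = [
    [["a", "b"], ["a", "b", "c"], ["a"]],
    [["b", "c"], ["a", "b"], ["c"]],
]
frequent = son(chunks, support=0.5)
print(frequent)
```

The item c is frequent only in the second chunk, yet it is still counted over the whole dataset in the second phase, which is why the method produces no false negatives: an itemset that is frequent overall must be frequent in at least one chunk.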

In [None]:
import math
import time
files = [#'T40I10D100K.dat.txt',
         #'T10I4D100K.dat.txt',
         #'chess.dat.txt',
         'connect.dat.txt',
         'mushroom.dat.txt']#,
         #'pumsb.dat.txt',
         #'pumsb_star.dat.txt']

####### MAIN PROGRAM ###########
if __name__ == "__main__":
  #loop through all files
  for i,f in enumerate(files):
    #load the dataset
    data = sc.textFile('/content/drive/MyDrive/Colab Notebooks/apriori algorithm/' + f)

    # split the rdd elements by whitespace and have elements of items in list per transaction 
    rdd = data.map(lambda line: line.split()).cache()
      
    # define the support thresholds as proportions of total transactions for each of the 7 datasets.
    support_threshold_proportions = [0.2, 0.07] #[0.01, 0.01, 0.7, 0.2, 0.07, 0.01, 0.01]

    new_line = '\n'
    print(f'The dataset is: {f}{new_line}')
  
    #sample sizes of the transactions
    partition_proportion =[0.1,0.2] # [0.02,0.05,0.1]
    for proportion in partition_proportion:
      
      # Grab Current Time Before Running the Code
      start = time.time()

      # Define the number of chunks to partition the dataset based on the sample size.
      chunk_number = int(1/proportion)
      chunks = rdd.repartition(chunk_number).persist()

      #total number of transactions
      total = chunks.count()

      #First Map Reduce job
      # Apply the A-priori algorithm on each chunk and identify the candidate itemsets that are frequent in one or more chunks
      chunk_frequent_itemsets = chunks.mapPartitions(lambda chunk: first_Mapper(chunk,support_threshold_proportions[i]))
      # Combine frequent itemsets from chunks to form candidate itemsets
      candidate_itemsets = chunk_frequent_itemsets.reduceByKey(lambda x,y: x+y).map(lambda x:x[0]).cache()
      candidate_itemsets = candidate_itemsets.collect()
        
      #broadcast the itemsets data to each worker in distribution environment.
      candidate_itemsets = sc.broadcast(candidate_itemsets)
      # candidate_itemsets.value to retrieve the data list.

      # Second Map Reduce job
      #The Map tasks for the second Map function take all the  candidate itemsets and a chunk of the input data file.
      # Each Map task counts the number of occurrences of each of the candidate itemsets among the transaction in the chunk of the dataset.
      # The output is a set of key-value pairs (C,v) where C is the candidate itemset and v is the support.
      itemsets_support = chunks.mapPartitions(lambda chunk: second_Mapper(chunk,candidate_itemsets))

      # Add the key value pairs and filter itemsets based on support.
      frequent_itemset = itemsets_support.reduceByKey(lambda x, y: x + y)\
      .filter(lambda x: x[1] >= math.ceil(support_threshold_proportions[i]*total))

      #Sort the frequent item list based on support counts and cache the rdd ready to materialise.
      frequent_itemset_sorted = frequent_itemset.sortBy(lambda x: x[1], ascending = False).cache()

      #materialise the frequent itemset list.
      frequent_itemset_sorted = frequent_itemset_sorted.collect()
      
      #unpersist the original partitioned rdd of transactions from main memory.
      chunks.unpersist()

      # Organise the list of frequent itemsets into a dictionary of itemsets of varying sizes.
      freq_list ={(j+1):[e[0] for e in frequent_itemset_sorted if length(e[0]) == (j+1)] for j in range(3)}

      # Grab the current time after running the code
      end = time.time()

      #Subtract Start Time from The End Time
      total_time = end - start  

      # The top 10 most frequent itemsets of each size are given in the output
      new_line = '\n'
      tab = '\t'
      print(f'Sample size is {proportion} with support ratio threshold of {support_threshold_proportions[i]}')
      
      # Loop through key and value of the frequent itemset dictionary and print results.
      for k,v in freq_list.items():
        if len(v) > 0:
          print(f'frequent itemsets of size {k} :{v[:5]}{new_line}{tab}{tab}{tab}{v[5:10]}')
      print(f'The computation time was: {round(total_time,2)}{new_line}') 

The dataset is: connect.dat.txt

Sample size is 0.1 with support ratio threshold of 0.2
frequent itemsets of size 1 :['1', '111', '11', '77', '7']
			['71', '117', '17', '4', '74']
frequent itemsets of size 2 :[('1', '7'), ('4', '7'), ('1', '4'), ('1', '5')]
			[]
frequent itemsets of size 3 :[('1', '4', '7')]
			[]
The computation time was: 28.63

Sample size is 0.2 with support ratio threshold of 0.2
frequent itemsets of size 1 :['1', '11', '111', '7', '77']
			['117', '17', '71', '47', '44']
frequent itemsets of size 2 :[('1', '7'), ('4', '7'), ('1', '4'), ('1', '5')]
			[]
frequent itemsets of size 3 :[('1', '4', '7')]
			[]
The computation time was: 24.75

The dataset is: mushroom.dat.txt

Sample size is 0.1 with support ratio threshold of 0.07
frequent itemsets of size 1 :['2', '22', '1', '111', '11']
			['33', '3', '6', '66', '99']
frequent itemsets of size 2 :[('2', '3'), ('1', '3'), ('2', '6'), ('1', '6'), ('1', '9')]
			[('2', '9'), ('3', '9'), ('6', '9'), ('1', '7')]
frequen

Two challenges arose in implementing the above algorithms: retaining the support count in the output of the Simple Randomised Apriori algorithm, and avoiding the computationally expensive materialisation of RDDs in the SON algorithm.

For the simple randomised algorithm, the frequent itemsets identified from the sample were counted in the whole dataset in order to identify false positives and remove them from the list. In doing so, the support count was not kept as the value alongside the frequent itemset key in a key-value tuple, because Python set operations were used to find the difference between the sets of frequent itemsets from the sample and from the whole dataset. As a result, the output lists of frequent itemsets of each size were not sorted by support count.

For the SON algorithm, computationally expensive materialisation of RDDs had to be tolerated on two occasions. First, the candidate itemset RDD obtained from the first MapReduce job had to be materialised to a list so that occurrences of each candidate itemset could be counted in each partition of the dataset. Second, the RDD produced by the second MapReduce job was materialised to output the final frequent itemsets for the whole dataset.
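The loss of support counts described for the simple randomised algorithm could plausibly be avoided by filtering a dictionary of counts rather than taking a set difference. The sketch below is plain Python, not the notebook's Spark code. The function names `count_support` and `verify_candidates` and the toy transactions are hypothetical, chosen only for illustration.

```python
import math
from collections import Counter

def count_support(transactions, candidates):
    """Count how many transactions contain each candidate itemset."""
    counts = Counter()
    for transaction in transactions:
        items = set(transaction)
        for candidate in candidates:
            # A candidate is supported when all of its items are in the transaction.
            if set(candidate) <= items:
                counts[candidate] += 1
    return counts

def verify_candidates(transactions, candidates, support_ratio):
    """Drop false positives but keep (itemset, support) pairs, sorted by support."""
    min_support = math.ceil(support_ratio * len(transactions))
    counts = count_support(transactions, candidates)
    frequent = {c: n for c, n in counts.items() if n >= min_support}
    return sorted(frequent.items(), key=lambda kv: kv[1], reverse=True)

# Toy data, made up for illustration (not taken from the datasets above).
transactions = [['1', '4', '7'], ['1', '7'], ['4', '7'], ['1', '5'], ['1', '4', '7']]
candidates = [('1', '7'), ('4', '7'), ('1', '5'), ('5', '7')]

result = verify_candidates(transactions, candidates, 0.5)
print(result)
```

Because the counts stay attached to the itemsets throughout, the false positives (`('1', '5')` and `('5', '7')` in this toy case) are removed and the survivors can still be ordered by support.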


***
## Exercise 3: Hierarchical Clustering

Clusters are merged according to the minimum distance between them. Each cluster is represented by its centroid, and the distance is computed between the centroids of every pair of clusters. The two clusters whose centroids are closest merge to form a new cluster. The $distance()$ function is the Euclidean distance, which for two one-dimensional values reduces to the absolute value of their difference.  
The first part of the algorithm initialises each point as its own cluster and forms a list of these clusters. The algorithm then repeatedly merges clusters until the stopping criterion is met, which in this scenario is when only a single cluster remains. In each iteration, the distance between centroids is found for every pair of clusters, and the two clusters with the smallest distance merge.
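A single merge step of the procedure described above can be sketched as follows. This is a minimal illustration using only the standard library. The helper `centroid_distance` and the five sample points are assumptions made for the example, not the notebook's own implementation.

```python
def centroid_distance(c1, c2):
    """Absolute difference between the centroids (means) of two 1-D clusters."""
    return abs(sum(c1) / len(c1) - sum(c2) / len(c2))

# Hypothetical sample points; each point starts as its own cluster.
points = [1, 4, 11, 16, 25]
clusters = [[p] for p in points]

# Find the pair of cluster indices with the smallest centroid distance.
pairs = [(i, j) for i in range(len(clusters)) for j in range(i + 1, len(clusters))]
i, j = min(pairs, key=lambda ij: centroid_distance(clusters[ij[0]], clusters[ij[1]]))

# Merge the closest pair and keep every other cluster unchanged.
merged = clusters[i] + clusters[j]
remaining = [c for k, c in enumerate(clusters) if k not in (i, j)] + [merged]

print(merged)     # [1, 4]
print(remaining)  # [[11], [16], [25], [1, 4]]
```

The points 1 and 4 are closest (distance 3), so they merge first and the new cluster has centroid 2.5. Repeating this step until one cluster remains gives the full hierarchy.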

In [None]:
import numpy as np
# define a function to compute the distance between two clusters
def distance1(c1, c2):
  '''
  Finds the distance between two clusters from their respective centroids.
  '''
  return np.abs(np.mean(c1) - np.mean(c2))

# Stopping Criteria of specified number of clusters, threshold cluster distance
# and single cluster.
def cluster_number(min_dist,clusters,threshold):
  '''
  Fixed number of clusters. This criterion specifies a fixed number of clusters
  that the algorithm should produce.
  '''
  number = len(clusters)
  if number <= threshold:
    return True
  return False

def threshold_distance(min_dist,clusters,threshold):
  '''
  This criterion specifies a threshold distance above which the clusters should 
  not be merged. Once all remaining pairwise distances exceed this threshold,
  the algorithm stops.
  '''
  if min_dist > threshold:
    return True
  return False

def single_cluster(min_dist,clusters,threshold = 1):
  '''
  This is the default stopping criterion of obtaining a single cluster. It never
  stops early, so merging continues until only one cluster remains.
  '''
  return False

def hierarchical_clustrering(data,distance,stopping_criterion,threshold):
  '''
  Performs hierarchical clustering with given distance metric, stopping criterion and threshold.
  '''
  # initialize the clusters
  clusters = [[x] for x in data]
  print(clusters)
  iteration = 0
  # calculate distance between all clusters and identify the minimum distance
  while len(clusters) > 1:
    iteration += 1
    min_dist = np.inf
    for i in range(len(clusters)):
        for j in range(i+1, len(clusters)):
            d = distance(clusters[i], clusters[j])
            
            # identify the clusters that have the smallest distance.
            if d < min_dist:
              c1, c2 = clusters[i], clusters[j]
              min_dist = d
    
    # Stopping Criterion
    if stopping_criterion(min_dist,clusters,threshold):
      break

    # Merging clusters to form new cluster.
    clusters.remove(c1)
    clusters.remove(c2)
    clusters.append(c1 + c2)
    
    #print the resulting clusters and their centroids.
    for i,cluster in enumerate(clusters):
      centroid = np.mean(cluster)
      if i == 0:
        print(f'Iteration number: {iteration} ')
      print(f'Cluster {i+1}: {cluster} with centroid of {centroid}')


Hierarchical clustering is performed on a one-dimensional array of values. The default distance metric, the absolute difference between centroids, is used, and the outcome of the algorithm is compared across three stopping criteria: a specific number of clusters, a threshold distance and a single cluster.

In [None]:
# define the data points
data = np.array([1,4,11,16,25,36,49,64,81])

# stopping criteria with which to run the hierarchical clustering algorithm, one at a time
stopping_criteria = [cluster_number,threshold_distance,single_cluster]

# the corresponding threshold hyperparameter for each of the above stopping criteria
thresholds = [2, 25, 1] 

new_line = '\n'
# Perform hierarchical clustering
for i,stopping_criterion in enumerate(stopping_criteria):
  print(f'{new_line}***Hierarchical Clustering for stopping criterion of {stopping_criterion.__name__}\
   with threshold of {thresholds[i]}{new_line}***')
  hierarchical_clustrering(data,distance1,stopping_criterion, thresholds[i])



***Hierarchical Clustering for stopping criterion of cluster_number  with threshold of 2
***
[[1], [4], [11], [16], [25], [36], [49], [64], [81]]
Iteration number: 1 
Cluster 1: [11] with centroid of 11.0
Cluster 2: [16] with centroid of 16.0
Cluster 3: [25] with centroid of 25.0
Cluster 4: [36] with centroid of 36.0
Cluster 5: [49] with centroid of 49.0
Cluster 6: [64] with centroid of 64.0
Cluster 7: [81] with centroid of 81.0
Cluster 8: [1, 4] with centroid of 2.5
Iteration number: 2 
Cluster 1: [25] with centroid of 25.0
Cluster 2: [36] with centroid of 36.0
Cluster 3: [49] with centroid of 49.0
Cluster 4: [64] with centroid of 64.0
Cluster 5: [81] with centroid of 81.0
Cluster 6: [1, 4] with centroid of 2.5
Cluster 7: [11, 16] with centroid of 13.5
Iteration number: 3 
Cluster 1: [49] with centroid of 49.0
Cluster 2: [64] with centroid of 64.0
Cluster 3: [81] with centroid of 81.0
Cluster 4: [1, 4] with centroid of 2.5
Cluster 5: [11, 16] with centroid of 13.5
Cluster 6: [25, 36]

Instead of measuring the distance between clusters from their centroids, the distance can be taken as the minimum distance between any two points, one from each cluster (known as single linkage). The implication for hierarchical clustering is shown below:

In [None]:
def distance2(c1, c2):
  '''
  Finds the distances between all points in two clusters and returns the minimum 
  as the distance between the two clusters.
  '''
  distances = []
  # Find the distances between each point in cluster 1 and every other point in cluster 2
  for p1 in c1:
    for p2 in c2:
      d = np.abs(p1-p2)
      distances.append(d)
  
  # determine the minimum of the distances as the distance between the two clusters.
  d_min = min(distances)
  return d_min

The hierarchical clustering algorithm is performed again using the second distance metric. The outcomes are compared across the different stopping criteria.

In [None]:
# define the data points
data = np.array([1,4,11,16,25,36,49,64,81])

# stopping criteria with which to run the hierarchical clustering algorithm, one at a time
stopping_criteria = [cluster_number,threshold_distance,single_cluster]

# the corresponding threshold hyperparameter for each of the above stopping criteria
thresholds = [2, 15, 1] 

new_line = '\n'
# Perform hierarchical clustering
for i,stopping_criterion in enumerate(stopping_criteria):
  print(f'{new_line}***Hierarchical Clustering for stopping criterion of {stopping_criterion.__name__}\
   with threshold of {thresholds[i]}{new_line}***')
  # The alternate distance metric is used in the Hierarchical Clustering.
  hierarchical_clustrering(data,distance2,stopping_criterion, thresholds[i])


***Hierarchical Clustering for stopping criterion of cluster_number   with threshold of 2
***
[[1], [4], [11], [16], [25], [36], [49], [64], [81]]
Iteration number: 1 
Cluster 1: [11] with centroid of 11.0
Cluster 2: [16] with centroid of 16.0
Cluster 3: [25] with centroid of 25.0
Cluster 4: [36] with centroid of 36.0
Cluster 5: [49] with centroid of 49.0
Cluster 6: [64] with centroid of 64.0
Cluster 7: [81] with centroid of 81.0
Cluster 8: [1, 4] with centroid of 2.5
Iteration number: 2 
Cluster 1: [25] with centroid of 25.0
Cluster 2: [36] with centroid of 36.0
Cluster 3: [49] with centroid of 49.0
Cluster 4: [64] with centroid of 64.0
Cluster 5: [81] with centroid of 81.0
Cluster 6: [1, 4] with centroid of 2.5
Cluster 7: [11, 16] with centroid of 13.5
Iteration number: 3 
Cluster 1: [25] with centroid of 25.0
Cluster 2: [36] with centroid of 36.0
Cluster 3: [49] with centroid of 49.0
Cluster 4: [64] with centroid of 64.0
Cluster 5: [81] with centroid of 81.0
Cluster 6: [1, 4, 11, 1

Changing the distance metric to the minimum distance between any pair of points drawn from the two clusters results in a hierarchical clustering process that merges a cluster with its nearest neighbouring point. This process continues until the stopping criterion is satisfied and produces different outcomes from hierarchical clustering under the centroid-based distance metric. In particular, under the threshold distance stopping criterion, distance metric 2 requires a lower distance threshold than distance metric 1 to achieve two distinct clusters.
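The effect on the threshold can be seen by evaluating both metrics on the same pair of clusters. The sketch below uses standalone helpers, `centroid_distance` and `single_link_distance`, written for this illustration rather than taken from the cells above. The two clusters are built from values in the data array and are chosen as an example.

```python
def centroid_distance(c1, c2):
    """Distance metric 1: absolute difference between cluster centroids."""
    return abs(sum(c1) / len(c1) - sum(c2) / len(c2))

def single_link_distance(c1, c2):
    """Distance metric 2: smallest distance between any cross-cluster pair of points."""
    return min(abs(p1 - p2) for p1 in c1 for p2 in c2)

# Illustrative clusters drawn from the data points used above.
left = [1, 4, 11, 16]
right = [25]

d_centroid = centroid_distance(left, right)   # |8 - 25| = 17
d_single = single_link_distance(left, right)  # |16 - 25| = 9

print(d_centroid, d_single)  # 17.0 9
```

The minimum-point distance is never larger than the centroid distance for these one-dimensional, non-overlapping clusters. A threshold that halts merging under metric 1 may therefore still allow a merge under metric 2, so metric 2 needs a lower threshold to stop at the same point.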