In [1]:
import sys
import time
import csv
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext


# Sum oparation for reduce by key
def sumOparator(x, y):
    return x + y


# Remove replications after cartesian oparation
def removeReplica(record):
    if isinstance(record[0], tuple):
        x1 = record[0]
        x2 = record[1]
    else:
        x1 = [record[0]]
        x2 = record[1]

    if not any(x == x2 for x in x1):
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
        return result
    else:
        return x1


# Filter items
def filterForConf(item):
    if len(item[0][0]) > len(item[1][0]):
        if not checkItemSets(item[0][0], item[1][0]):
            pass
        else:
            return item
    else:
        pass


# Check Items sets includes at least one comman item // Example command: # any(l == k for k in z for l in x )
def checkItemSets(item_1, item_2):
    if len(item_1) > len(item_2):
        return all(any(k == l for k in item_1) for l in item_2)
    else:
        return all(any(k == l for k in item_2) for l in item_1)


# Confidence calculation
def calculateConfidence(item):
    # Parent item list
    parent = set(item[0][0])

    # Child item list
    if isinstance(item[1][0], str):
        child = set([item[1][0]])
    else:
        child = set(item[1][0])
    # Parent and Child support values
    parentSupport = item[0][1]
    childSupport = item[1][1]
    # Finds the item set confidence is going to be found

    confidence = (parentSupport / childSupport)

    return list([list(child), list(parent.difference(child)), confidence])


class Apriori:

    def __init__(self, path, sc, minSupport, minConfidence):
        if not (0 <= minSupport <= 1):
            raise Exception("The minimum support should be between 0 and 1")

        if not (0 <= minConfidence <= 1):
            raise Exception("The minimum confidence should be between 0 and 1")

        # File path
        self.confidences = None
        self.path = path

        # Spark Context
        self.sc = sc

        self.raw = self.sc.textFile(self.path)
        
        self.n_samples = self.raw.count()
        self.minSupport = self.n_samples * minSupport
        self.minConfidence = minConfidence

        ## Whole Date set with frequencies
        self.lblitems = self.raw.map(lambda line: line.split(','))

        ## Whole lines in single array
        self.wlitems = self.raw.flatMap(lambda line: line.split(','))
        self.wlitems.persist()

        ## Unique frequent items in dataset
        self.uniqueItems = self.wlitems.distinct()
        self.uniqueItems.persist()

    def fit(self):
        supportRdd = self.wlitems.map(lambda item: (item, 1))
        supportRdd = supportRdd.reduceByKey(sumOparator)
        supports = supportRdd.map(lambda item: item[1])

        # Define minimum support value
        if self.minSupport == 'auto':
            minSupport = supports.min()
        else:
            minSupport = self.minSupport

        # If minimum support is 1 then replace it with 2
        minSupport = 2 if minSupport < 2 else minSupport
        
        minConfidence = self.minConfidence

        # Filter first supportRdd with minimum support
        supportRdd = supportRdd.filter(lambda item: item[1] >= minSupport)

        # Create base RDD with will be updated every iteration
        baseRdd = supportRdd.map(lambda item: ([item[0]], item[1]))
        baseRdd.persist()

        supportRdd = supportRdd.map(lambda item: item[0])
        supportRdd.persist()

        c = 2

        print("Started fitting")
        while not supportRdd.isEmpty():
            combined = supportRdd.cartesian(self.uniqueItems).coalesce(100)
            combined = combined.map(lambda item: removeReplica(item))

            combined = combined.filter(lambda item: len(item) == c)
            combined = combined.distinct()
            combined.persist()

            combined_2 = combined.cartesian(self.lblitems).coalesce(100)
            combined_2 = combined_2.filter(lambda item: all(x in item[1] for x in item[0]))

            combined_2 = combined_2.map(lambda item: item[0])
            combined_2 = combined_2.map(lambda item: (item, 1))
            combined_2 = combined_2.reduceByKey(sumOparator)
            combined_2 = combined_2.filter(lambda item: item[1] >= minSupport)

            baseRdd = baseRdd.union(combined_2)

            combined_2 = combined_2.map(lambda item: item[0])
            supportRdd = combined_2
            supportRdd.persist()
            c = c + 1

        print("Fitting final stage")
        sets = baseRdd.cartesian(baseRdd).coalesce(100)
        filtered = sets.filter(lambda item: filterForConf(item))
        confidences = filtered.map(lambda item: calculateConfidence(item))
        self.confidences = confidences
        
        rules = confidences.filter(lambda item: item[2] >= minConfidence)
        
        output_path = f"/home/ubuntu/data/apriori_output"
        !hadoop fs -rm -R {output_path}
        rules.coalesce(1).saveAsTextFile(output_path)

        return rules

    def predict(self, set, confidence):

        if not isinstance(set, list):
            raise ValueError('For prediction "set" argument should be a list')

        _confidences = self.confidences
        _filterForPredict = self._filterForPredict

        filtered = _confidences.filter(lambda item: _filterForPredict(item, set, confidence))

        return filtered

    @staticmethod
    def _filterForPredict(item, set, confidence):
        it = item[0]
        it.sort()
        set.sort()
        if it == set and item[2] >= confidence:
            return item
        else:
            pass


def run_with_percentage_of_data(path, percentage, minSupport, minConfidence):
    spark = (
        SparkSession.builder.appName("Python Spark Apriori")
        .config("spark.executor.memory", "7680M")
        .config("spark.driver.memory", "9G")
        .config("spark.executor.cores", "2")
        #mai multe la https://spark.apache.org/docs/latest/configuration.html
        .getOrCreate()
    )
    sc = spark.sparkContext
    
    # Percentage split
    lines = open(path, "r").readlines()
    lines = lines[:int(len(lines) * percentage)]
    
    #Write to temp file
    filename = 'apriori_split'
    local_path = f"./temp/{filename}.txt"
    hadoop_path = f"/home/ubuntu/data/{filename}.txt"
    
    with open(local_path, "w") as f:
        f.writelines(lines)
        
    # Upload to HDFS otherwise it won't work
    !hadoop fs -put -f {local_path} {hadoop_path}

    # Construct Apriori
    apriori = Apriori(hadoop_path, sc, minSupport=minSupport, minConfidence=minConfidence)
    
    print("Loaded data. Starting fitting!")
    supports = apriori.fit()
    
    for i in supports.take(10):
        print(i)

In [2]:
run_with_percentage_of_data("./data/msnbc.csv", 1, 0.001, 0.1)



Loaded data. Starting fitting!
Started fitting
Fitting final stage
Deleted /home/ubuntu/data/apriori_output
[['17'], ['14'], 0.10517322649069055]
[['17'], ['1'], 0.443082724487391]
[['17'], ['1', '3'], 0.11583785057742163]
[['7', '2'], ['14'], 0.1834521538307059]
[['5', '3'], ['1'], 0.5723925197609409]
[['11', '6'], ['1'], 0.5697290725946509]
[['11', '15'], ['1'], 0.5238556551223057]
[['14', '3'], ['10', '12'], 0.12226163154600458]
[['4', '8', '2'], ['1'], 0.4227793252862891]
[['10', '3', '6'], ['5'], 0.3551155115511551]
