In [1]:
import os
import time
import psutil


from PAMI.highUtilityPattern.parallel import abstract as _ab


def binarySearch(arr, x):
    l = 0
    r = len(arr) - 1
    while l <= r:
        mid = l + (r - l) // 2
        if arr[mid] == x:
            return mid
        elif arr[mid] < x:
            l = mid + 1
        else:
            r = mid - 1
    return -1

class efimParallel(_ab._utilityPatterns):

    def __init__(self, iFile, minUtil, sep="\t", threads=1):
        super().__init__(iFile, minUtil, sep)
        self.inputFile = iFile
        self.minUtil = minUtil
        self.sep = sep
        self.Patterns = {}
        self.rename = {}
        self.threads = threads

    # Read input file
    def _read_file(self):

        file_data = []
        twu = {}

        with open(self.inputFile, 'r') as f:

            for line in f:
                line = line.strip().split(":")
                
                # Parse and process the line
                line = [x.split(self.sep) for x in line]
                weight = int(line[1][0])

                # Update file data with the parsed items
                file_data.append([line[0], [int(x) for x in line[2]]])

                for k in line[0]:
                    if k not in twu:
                        twu[k] = 0
                    twu[k] += weight

        # Filter TWU dictionary based on minUtil (minimum utility threshold)
        twu = {k: v for k, v in twu.items() if v >= self.minUtil}

        # Sort TWU items by utility
        twu = {k: v for k, v in sorted(twu.items(), key=lambda item: item[1], reverse=True)}

        strToInt = {}
        t = len(twu)
        for k in twu.keys():
            strToInt[k] = t
            self.rename[t] = k
            t -= 1

        # Filter and sort transactions
        subtree = {}
        filtered_transactions = {}
        for col in file_data:
            zipped = zip(col[0], col[1])
            transaction = [(strToInt[x], y) for x, y in zipped if x in strToInt]
            if len(transaction) > 0:
                transaction = sorted(transaction, key=lambda x: x[0])
                
                key = [x[0] for x in transaction]
                val = [x[1] for x in transaction]
                
                fs = frozenset(key)
                
                if fs not in filtered_transactions:
                    filtered_transactions[fs] = [key, val, 0]
                else:
                    for i in range(len(val)):
                        filtered_transactions[fs][1][i] += val[i]

                subUtil = sum([x[1] for x in transaction])
                temp = 0

                for i in range(len(transaction)):
                    item = key[i]
                    if item not in subtree:
                        subtree[item] = subUtil - temp
                    else:
                        subtree[item] += subUtil - temp
                    temp += val[i]

        primary = [key for key in subtree.keys() if subtree[key] >= self.minUtil]
        
        return filtered_transactions, primary

    def _search(self, prefix, fileData, primary):
        
        for item in primary:
            n_fd = {}
            local_util = {}
            
            for key, value in fileData.items():
                if item in key:
                    index = binarySearch(value[0], item) + 1
                    
                    fs = frozenset(value[0][index:])
                    
                    if fs not in n_fd:
                        n_fd[fs] = [value[0][index:], value[1][index:], value[2] + value[1][index - 1]]
                    else:
                        for i in range(len(value[1][index:])):
                            n_fd[fs][1][i] += value[1][index + i]
                        n_fd[fs][2] += value[2] + value[1][index - 1]
                    
                    tran_total = sum(value[1][index:]) + value[2] + value[1][index - 1]
                    
                    for value in value[0][index:]:
                        if value not in local_util:
                            local_util[value] = 0
                        local_util[value] += tran_total
            
            total = sum([x[2] for x in n_fd.values()])
            if total >= self.minUtil:
                pattern = "\t".join([self.rename[x] for x in prefix + [item]])
                self.Patterns[pattern] = total
                
            n_fd2 = {}
            subtree_util = {}
            
            for key, value in n_fd.items():                
                keys = []
                vals = []
                # print("Value:", value)
                for i in range(len(value[0])):
                    if local_util[value[0][i]] >= self.minUtil:
                        keys.append(value[0][i])
                        vals.append(value[1][i])
                
                if len(keys) == 0:
                    continue
                        
                total = sum(vals) + value[2]
                temp = 0
                for i in range(len(keys)):
                    if keys[i] not in subtree_util:
                        subtree_util[keys[i]] = 0
                    subtree_util[keys[i]] += total - temp
                    temp += vals[i]
                
                fs = frozenset(keys)
                if fs not in n_fd2:
                    n_fd2[fs] = [keys, vals, value[2]]
                else:
                    for i in range(len(vals)):
                        # n_fd2[fs][2][i] += vals[i]
                        n_fd2[fs][1][i] += vals[i]
                    n_fd2[fs][2] += value[2]
                            
            
            primary = [key for key in subtree_util.keys() if subtree_util[key] >= self.minUtil]
                            
            if len(primary) > 0:
                self._search(prefix + [item], n_fd2, primary)

    def startMine(self):
        self.mine()

    def mine(self):
        ps = psutil.Process(os.getpid())

        self.start = time.time()

        fileData, primary = self._read_file()

        self._search([], fileData, primary)
        
        self.memoryRSS = ps.memory_info().rss
        self.memoryUSS = ps.memory_full_info().uss

        end = time.time()
        self.runtime = end - self.start

    def save(self, outFile):
        self.oFile = outFile
        writer = open(self.oFile, 'w+')
        for x, y in self._finalPatterns.items():
            patternsAndSupport = x.strip() + ":" + str(y)
            writer.write("%s \n" % patternsAndSupport)
    
    def getPatternsAsDataFrame(self):
        dataFrame = {}
        data = []
        for a, b in self._finalPatterns.items():
            data.append([a.replace('\t', ' '), b])
            dataFrame = _ab._pd.DataFrame(data, columns=['Patterns', 'Utility'])

        return dataFrame

    def getPatterns(self):
        return self.Patterns

    def getRuntime(self):
        return self.runtime

    def getMemoryRSS(self):
        return self.memoryRSS

    def getMemoryUSS(self):
        return self.memoryUSS


    def printResults(self):
        print("Total number of High Utility Patterns:", len(self.getPatterns()))
        print("Total Memory in USS:", self.getMemoryUSS())
        print("Total Memory in RSS", self.getMemoryRSS())
        print("Total ExecutionTime in seconds:", self.getRuntime())


In [None]:
# obj = efimParallel("/home/tarun/cuEFIM/datasets/accidents_utility_spmf.txt", 15000000, " ")
obj = efimParallel("/home/tarun/testing/test.txt", 40, " ")
obj.mine()
obj.printResults()

FileNotFoundError: [Errno 2] No such file or directory: 'test.txt'

In [8]:
for k,v in obj.getPatterns().items():
    print(k,":", v)
pami = [v for k,v in obj.getPatterns().items()]

1	5	3 : 31
4	3 : 34
4	2 : 44
4	2	5 : 53
4	2	5	3 : 60
4	2	3 : 51
4	5 : 33
4	5	3 : 40
2	5 : 36
2	5	3 : 45
2	3 : 33
5	3 : 33
6	1	4	2	5	3 : 30


In [5]:
my = [31,33,36,45,44,51,53,60,34,33,40,33]

In [6]:
print(sorted(pami))
print(sorted(my))

[30, 31, 33, 33, 33, 34, 36, 40, 44, 45, 51, 53, 60]
[31, 33, 33, 33, 34, 36, 40, 44, 45, 51, 53, 60]


In [None]:
6	1	4	2	5	3 : 30

In [10]:
import os
import mmap
import time
import psutil
from joblib import Parallel, delayed
from deprecated import deprecated


from PAMI.highUtilityPattern.parallel import abstract as _ab

class efimParallel(_ab._utilityPatterns):
    """
    :Description:   EFIM is one of the fastest algorithm to mine High Utility ItemSets from transactional databases.
    
    :Reference:     Zida, S., Fournier-Viger, P., Lin, J.CW. et al. EFIM: a fast and memory efficient algorithm for
                     high-utility itemset mining. Knowl Inf Syst 51, 595â€“625 (2017). https://doi.org/10.1007/s10115-016-0986-0

    :param  iFile: str :
                   Name of the Input file to mine complete set of High Utility patterns
    :param  oFile: str :
                   Name of the output file to store complete set of High Utility patterns
    :param minUtil: int :
                   The user given minUtil value.
    :param maxMemory: int
                   Maximum memory used by this program for running
    :param  sep: str :
                   This variable is used to distinguish items from one another in a transaction. The default seperator is tab space. However, the users can override their default separator.


    :Attributes:

        inputFile (str):
            The input file path.
        minUtil (int):
            The minimum utility threshold.
        sep (str):
            The separator used in the input file.
        threads (int):
            The number of threads to use.
        Patterns (dict):
            A dictionary containing the discovered patterns.
        rename (dict):
            A dictionary containing the mapping between the item IDs and their names.
        runtime (float):
            The runtime of the algorithm in seconds.
        memoryRSS (int):
            The Resident Set Size (RSS) memory usage of the algorithm in bytes.
        memoryUSS (int):
            The Unique Set Size (USS) memory usage of the algorithm in bytes.

    :Methods:

        read_file():
            Read the input file and return the filtered transactions, primary items, and secondary items.
        binarySearch(arr, item):
            Perform a binary search on the given array to find the given item.
        project(beta, file_data, secondary):
            Project the given beta itemset on the given database.
        search(collections):
            Search for high utility itemsets in the given collections.
        mine():
            Start the EFIM algorithm.
        savePatterns(outputFile):
            Save the patterns discovered by the algorithm to an output file.
        getPatterns():
            Get the patterns discovered by the algorithm.
        getRuntime():
            Get the runtime of the algorithm.
        getMemoryRSS():
            Get the Resident Set Size (RSS) memory usage of the algorithm.
        getMemoryUSS():
            Get the Unique Set Size (USS) memory usage of the algorithm.
        printResults():
            Print the results of the algorithm.

    """

    def __init__(self, iFile, minUtil, sep="\t", threads=1):
        super().__init__(iFile, minUtil, sep)
        self.inputFile = iFile
        self.minUtil = minUtil
        self.sep = sep
        self.Patterns = {}
        self.rename = {}
        self.threads = threads

    # Read input file
    def _read_file(self):
        """
        Read the input file and return the filtered transactions, primary items, and secondary items.

        :return:

            filtered_transactions (dict): A dictionary containing the filtered transactions.
            primary (set): A set containing the primary items.
            secondary (set): A set containing the secondary items.
        """


        file_data = []
        twu = {}

        with open(self.inputFile, 'r') as f:
            fd = mmap.mmap(f.fileno(), 0, prot=mmap.PROT_READ)

            for line in iter(fd.readline, b""):
                line = line.decode('utf-8').strip().split(":")
                
                # Parse and process the line
                line = [x.split(self.sep) for x in line]
                weight = int(line[1][0])

                # Update file data with the parsed items
                file_data.append([line[0], [int(x) for x in line[2]]])

                for k in line[0]:
                    if k not in twu:
                        twu[k] = weight
                    else:
                        twu[k] += weight

        # Filter TWU dictionary based on minUtil (minimum utility threshold)
        twu = {k: v for k, v in twu.items() if v >= self.minUtil}

        # Sort TWU items by utility
        twu = {k: v for k, v in sorted(twu.items(), key=lambda item: item[1], reverse=True)}

        strToInt = {}
        t = len(twu)
        for k in twu.keys():
            strToInt[k] = t
            self.rename[t] = k
            t -= 1

        secondary = set(self.rename.keys())

        # Filter and sort transactions
        subtree = {}
        filtered_transactions = {}
        for col in file_data:
            zipped = zip(col[0], col[1])
            transaction = [(strToInt[x], y) for x, y in zipped if x in strToInt]
            transaction = sorted(transaction, key=lambda x: x[0])
            if len(transaction) > 0:
                val = [x[1] for x in transaction]
                key = [x[0] for x in transaction]
                
                fs = frozenset(key)

                if fs not in filtered_transactions:
                    filtered_transactions[fs] = [key, val, 0]
                else:
                    filtered_transactions[fs][1] = [x + y for x, y in zip(filtered_transactions[fs][1], val)]

                subUtil = sum([x[1] for x in transaction])
                temp = 0

                for i in range(len(transaction)):
                    item = key[i]
                    if item not in subtree:
                        subtree[item] = subUtil - temp
                    else:
                        subtree[item] += subUtil - temp
                    temp += val[i]

        primary = [key for key in subtree.keys() if subtree[key] >= self.minUtil]

        return filtered_transactions, primary, secondary
    

    def _binarySearch(self, arr, item):
        """
        Do a binary search on the given array to find the given item.

        :param arr: The array to search in

        :type arr: list

        :param item: The item to search for

        :type item: int

        :return:

            mid (int):
                The index of the item if found, -1 otherwise.

        """

        low = 0
        high = len(arr) - 1
        mid = 0

        while low <= high:
            mid = (high + low) // 2
            if arr[mid] < item:
                low = mid + 1
            elif arr[mid] > item:
                high = mid - 1
            else:
                return mid

        return -1

    def _project(self, beta, file_data, secondary):
        """
        Project the given beta itemset on the given database.

        :param beta: The beta itemset to project

        :type beta: list

        :param file_data: The database to project on

        :type file_data: dict

        :param secondary: The set of secondary items

        :type secondary: set

        :return:

            projected_db (dict): The projected database.
            local_utils (dict): The local utilities of the projected database.
            subtree_utils (dict): The subtree utilities of the projected database.
            utility (int): The utility of the projected database.

        """


        projected_db = {}
        local_utils = {}
        subtree_utils = {}
        utility = 0     

        added = set()

        item = beta[-1]

        temp = [v for k, v in file_data.items() if item in k]
        start = time.time()

        for v in temp:
            index = self._binarySearch(v[0], item)

            curr = v[1][index] + v[2]
            utility += curr

            newKey = []
            newVal = []

            for i in range(index+1, len(v[0])):
                if v[0][i] in secondary:
                    newKey.append(v[0][i])
                    newVal.append(v[1][i])

            if len(newKey) == 0:
                continue

            s = sum(newVal)
            temp = 0

            for i in range(len(newKey)):
                if newKey[i] in added:
                    local_utils[newKey[i]] += s + curr
                    subtree_utils[newKey[i]] += s + curr - temp
                else:
                    local_utils[newKey[i]] = s + curr
                    subtree_utils[newKey[i]] = s + curr - temp
                    added.add(newKey[i])
                
                temp += newVal[i]
            
            fs = frozenset(newKey)

            if fs not in projected_db:
                projected_db[fs] = [newKey, newVal, curr]
            else:
                projected_db[fs][1] = [x + y for x, y in zip(projected_db[fs][1], newVal)]
                projected_db[fs][2] += curr

        nprimary = [key for key in subtree_utils.keys() if subtree_utils[key] >= self.minUtil]
        print(subtree_utils)
        nsecondary = set([key for key in local_utils.keys() if local_utils[key] >= self.minUtil])
        
        # print(subtree_utils)
        # print(local_utils)

        return beta, projected_db, nsecondary, nprimary, utility
    

    def _search(self, collections):

        """
        Search for frequent patterns in the given collections.

        :param collections: The collections to search in

        :type collections: list
        """

        if (self.threads > 1):
            with Parallel(n_jobs=self.threads) as parallel:
                while len(collections) > 0:
                    new_collections = []

                    # print("Num of tasks:", sum(len(collections[i][2]) for i in range(len(collections))))
                    results = parallel(delayed(self._project)(collections[i][0] + [collections[i][2][j]], collections[i][1], collections[i][3]) for i in range(len(collections)) for j in range(len(collections[i][2])))

                    for i in range(len(results)):
                        beta, projected_db, secondary, primary, utility = results[i]
                        if utility >= self.minUtil:
                            pattern = "\t".join([self.rename[x] for x in beta])
                            # self.Patterns[tuple(beta)] = utility
                            self.Patterns[pattern] = utility
                        if len(primary) > 0:
                            new_collections.append([beta, projected_db, primary, secondary])
                    
                    collections = new_collections

        else:
            while len(collections) > 0:
                new_collections = []
                for i in range(len(collections)):
                    for j in range(len(collections[i][2])):
                        beta, projected_db, secondary, primary, utility = self._project(collections[i][0] + [collections[i][2][j]], collections[i][1], collections[i][3])
                        print(beta, utility)
                        print(projected_db)
                        # print(secondary)
                        print(primary)
                        print()
                        if utility >= self.minUtil:
                            # pattern = "\t".join([self.rename[x] for x in beta])
                            pattern = "\t".join([str(x) for x in beta])
                            # self.Patterns[tuple(beta)] = utility
                            self.Patterns[pattern] = utility
                        if len(primary) > 0:
                            new_collections.append([beta, projected_db, primary, secondary])

                collections = new_collections


    @deprecated("It is recommended to use 'mine()' instead of 'mine()' for mining process. Starting from January 2025, 'mine()' will be completely terminated.")
    def startMine(self):
        """
        Start the EFIM algorithm.
        """

        self.mine()

    def mine(self):
        """
        Start the EFIM algorithm.
        """

        ps = psutil.Process(os.getpid())

        self.start = time.time()

        fileData, primary, secondary = self._read_file()

        collection = [[[], fileData, primary, secondary]]

        self._search(collection)

        self.memoryRSS = ps.memory_info().rss
        self.memoryUSS = ps.memory_full_info().uss

        end = time.time()
        self.runtime = end - self.start

    def save(self, outFile):
        """
        Complete set of frequent patterns will be loaded in to an output file
        :param outFile: name of the output file
        :type outFile: csv file
        """
        self.oFile = outFile
        writer = open(self.oFile, 'w+')
        for x, y in self._finalPatterns.items():
            patternsAndSupport = x.strip() + ":" + str(y)
            writer.write("%s \n" % patternsAndSupport)
    
    def getPatternsAsDataFrame(self):
        """
        Storing final patterns in a dataframe
        :return: returning patterns in a dataframe
        :rtype: pd.DataFrame
        """
        dataFrame = {}
        data = []
        for a, b in self._finalPatterns.items():
            data.append([a.replace('\t', ' '), b])
            dataFrame = _ab._pd.DataFrame(data, columns=['Patterns', 'Utility'])

        return dataFrame

    def getPatterns(self):
        """
        Get the patterns discovered by the algorithm.

        :return: A dictionary containing the discovered patterns.

        :rtype: dict
        """
        return self.Patterns

    def getRuntime(self):
        """
        Get the runtime of the algorithm.

        :return: The runtime in seconds.

        :rtype: float
        """
        return self.runtime

    def getMemoryRSS(self):
        """
        Get the Resident Set Size (RSS) memory usage of the algorithm.

        :return: The RSS memory usage in bytes.

        :rtype: int
        """
        return self.memoryRSS

    def getMemoryUSS(self):
        """
        Get the Unique Set Size (USS) memory usage of the algorithm.

        :return: The USS memory usage in bytes.

        :rtype: int
        """
        return self.memoryUSS


    def printResults(self):
        """
        This function is used to print the results
        """
        print("Total number of High Utility Patterns:", len(self.getPatterns()))
        print("Total Memory in USS:", self.getMemoryUSS())
        print("Total Memory in RSS", self.getMemoryRSS())
        print("Total ExecutionTime in seconds:", self.getRuntime())

# obj = efimParallel("/home/tarun/cuEFIM/datasets/accidents_utility_spmf.txt", 15000000, " ")
# obj.mine()
# obj.printResults()

# for k,v in obj.getPatterns().items():
#     print(k,v)

In [11]:
obj = efimParallel("test.txt", 40, " ")
obj.mine()
obj.printResults()


{2: 33, 5: 28, 4: 31, 3: 13}
[1] 20
{frozenset({2, 5}): [[2, 5], [2, 1], 5], frozenset({4, 5}): [[4, 5], [6, 6], 10], frozenset({2, 3, 4, 5}): [[2, 3, 4, 5], [12, 4, 3, 1], 5]}
[]

{5: 34, 3: 60, 4: 40}
[2] 26
{frozenset({5}): [[5], [1], 2], frozenset({3, 4, 5}): [[3, 4, 5], [20, 9, 7], 24]}
[3, 4]

{4: 45, 5: 33}
[3] 24
{frozenset({4, 5}): [[4, 5], [12, 9], 24]}
[4]

{4: 60, 5: 51}
[2, 3] 44
{frozenset({4, 5}): [[4, 5], [9, 7], 44]}
[4, 5]

{5: 40}
[2, 4] 33
{frozenset({5}): [[5], [7], 33]}
[5]

{5: 45}
[3, 4] 36
{frozenset({5}): [[5], [9], 36]}
[5]

{5: 60}
[2, 3, 4] 53
{frozenset({5}): [[5], [7], 53]}
[5]

{}
[2, 3, 5] 51
{}
[]

{}
[2, 4, 5] 40
{}
[]

{}
[3, 4, 5] 45
{}
[]

{}
[2, 3, 4, 5] 60
{}
[]

Total number of High Utility Patterns: 6
Total Memory in USS: 118292480
Total Memory in RSS 120049664
Total ExecutionTime in seconds: 0.00407862663269043


In [3]:
for k,v in obj.getPatterns().items():
    print(k,v)

2	3 44
2	3	4 53
2	3	5 51
2	4	5 40
3	4	5 45
2	3	4	5 60
