In [None]:
import timeit
import random
from collections import defaultdict
from typing import List, Dict, Tuple, Set
from itertools import chain
from collections import OrderedDict

start = timeit.default_timer()
class WNTreeNode:
    def __init__(self, item: str, weight: float, pre: int = -1, pos: int = -1):
        self.item = item
        self.children = {}
        self.pre = pre
        self.pos = pos
        self.weight = weight

    def get_children(self):
        return self.children

    def get_weight(self):
        return self.weight

    def get_name(self):
        return self.item

    def add_child(self, child_item, child_weight):
        self.children[child_item] = WNTreeNode(child_item, child_weight)

    def update_weight(self, transaction_weight):
        self.weight += transaction_weight

    def assign_pre_pos(self):
        count_pre = [0]
        count_pos = [0]

        self._dfs_assign_pre_pos(count_pre, count_pos)

    def _dfs_assign_pre_pos(self, count_pre, count_pos):
        self.pre = count_pre[0]
        count_pre[0] += 1

        for child in self.children.values():
            child._dfs_assign_pre_pos(count_pre, count_pos)

        self.pos = count_pos[0]
        count_pos[0] += 1

    def calculate_all_wl(self) -> Dict[str, List['WNTreeNode']]:
        item_to_wl = defaultdict(list)
        self.traverse_and_calculate_wl(item_to_wl)
        return dict(item_to_wl)

    def traverse_and_calculate_wl(self, item_to_wl: Dict[str, List['WNTreeNode']]):
        if self is None:
            return

        item_name = self.get_name()
        item_to_wl[item_name].append(self)

        for child in self.get_children().values():
            child.traverse_and_calculate_wl(item_to_wl)



In [None]:
def generate_weighted_data(num_transactions: int, num_items: int) -> List[List[Tuple[str, float]]]:
    """
    Generates a list of weighted transactions.

    Args:
    - num_transactions: The number of transactions to generate.
    - num_items: The maximum number of items per transaction.

    Returns:
    - List of weighted transactions where each transaction is represented as a list of tuples containing
      the item and its corresponding weight.
    """
    weighted_data = []
    item_weights = {}

    random.seed()  # Initialize the random number generator

    for _ in range(num_transactions):
        transaction = []
        used_items = set()

        while len(transaction) < random.randint(2, num_items):
            item = chr(random.randint(65, 75))  # Generate items from A to K

            if item not in used_items:
                used_items.add(item)

                if item not in item_weights:
                    # Assign a permanent weight for the item within the range of 0.1 to 0.9
                    random_weight_int = random.randint(1, 9)  # Random integer between 1 and 9
                    item_weight = random_weight_int / 10.0  # Convert to double and scale to range [0.1, 0.9]
                    item_weights[item] = item_weight

                # Check if item already exists in the transaction
                if (item, item_weights[item]) not in transaction:
                    transaction.append((item, item_weights[item]))

        weighted_data.append(transaction)
    return weighted_data

min_ws=0.3

#weighted_data=(generate_weighted_data(5, 5))
#print(weighted_data)

In [None]:
#Simulate weighted transaction in the article
item_weights_2={'A':0.8,'B':0.3,'C':0.1,'D':0.9,'E':0.2,'F':0.5}
weighted_data=[[('A', 0), ('C', 0), ('D', 0), ('E', 0)],
 [('B', 0), ('C',0 ), ('E', 0)],
 [('A', 0),('B', 0),('C', 0)],
 [('A', 0),('C', 0),('E', 0),('F', 0)],
 [('A', 0),('C', 0),('D', 0),('E', 0),('F', 0)],
 [('B', 0),('C', 0),('D', 0),('E', 0),('F', 0)],
 [('A', 0),('C', 0),('D', 0)]]

for sublist in weighted_data:
    for item, weight in sublist:
        if item in item_weights_2:
            sublist[sublist.index((item, weight))] = (item, item_weights_2[item])

print(weighted_data)

[[('A', 0.8), ('C', 0.1), ('D', 0.9), ('E', 0.2)], [('B', 0.3), ('C', 0.1), ('E', 0.2)], [('A', 0.8), ('B', 0.3), ('C', 0.1)], [('A', 0.8), ('C', 0.1), ('E', 0.2), ('F', 0.5)], [('A', 0.8), ('C', 0.1), ('D', 0.9), ('E', 0.2), ('F', 0.5)], [('B', 0.3), ('C', 0.1), ('D', 0.9), ('E', 0.2), ('F', 0.5)], [('A', 0.8), ('C', 0.1), ('D', 0.9)]]


In [None]:
def calculate_transaction_weights(weighted_data: List[List[object]]) -> List[float]:
    """
    Calculates transaction weights based on the weighted data provided.

    Args:
    - weighted_data: A list of lists containing tuples or pairs of objects representing items and their weights.

    Returns:
    - List[float]: A list containing calculated transaction weights for each transaction in the input data.
    """
    transaction_weights = []

    for transaction in weighted_data:
        transaction_weight = 0.0
        item_count = 0

        for item_weight_pair in transaction:
            # Unpack the item and weight from the pair
            item, weight = item_weight_pair

            transaction_weight += weight
            item_count += 1

        # Normalize the transaction weight by averaging over item count
        transaction_weight = round((transaction_weight / item_count) * 10000.0) / 10000.0
        transaction_weights.append(transaction_weight)

    return transaction_weights
transaction_weights=calculate_transaction_weights(weighted_data)
print(transaction_weights)
sum_transaction_weights=sum(transaction_weights)
print(sum_transaction_weights)

[0.5, 0.2, 0.4, 0.4, 0.5, 0.4, 0.6]
3.0


In [None]:
def calculate_and_sort_item_supports(weighted_data: List[List[object]], transaction_weights: List[float]) -> List[Tuple[str, float]]:
    """
    Calculates and sorts item supports based on the weighted data and transaction weights provided.

    Args:
    - weighted_data: A list of lists containing tuples or pairs of objects representing items and their weights.
    - transaction_weights: A list containing the weights of transactions corresponding to the weighted data.

    Returns:
    - List[Tuple[str, float]]: A list of tuples containing item names and their calculated supports, sorted in descending order of support.
    """
    item_supports = defaultdict(float)

    for transaction, transaction_weight in zip(weighted_data, transaction_weights):
        for item, item_weight in transaction:
            if item not in item_supports:
                item_supports[item] = 0

            item_supports[item] += transaction_weight

    for item in item_supports:
        item_supports[item] /= sum(transaction_weights)
        item_supports[item] = round(item_supports[item],2)
    sorted_item_supports = sorted(item_supports.items(), key=lambda x: x[1], reverse=True)
    return sorted_item_supports

sorted_item_supports = calculate_and_sort_item_supports(weighted_data, transaction_weights)
print(sorted_item_supports)

[('C', 1.0), ('A', 0.8), ('D', 0.67), ('E', 0.67), ('F', 0.43), ('B', 0.33)]


In [None]:
def remove_items_by_weight_threshold(weighted_data: List[List[Tuple[str, float]]],
                                     sorted_item_supports: List[Tuple[str, float]],
                                     min_ws: float) -> List[List[Tuple[str, float]]]:
    """
    Removes items from the weighted data based on a minimum weight threshold.

    Args:
    - weighted_data: A list of lists containing tuples representing items and their corresponding weights.
    - sorted_item_supports: A list of tuples containing sorted item supports in descending order.
    - min_ws: The minimum weight threshold for including items in the updated data.

    Returns:
    - List[List[Tuple[str, float]]]: Updated weighted data with items meeting the weight threshold.
    """
    updated_weighted_data = []

    for transaction in weighted_data:
        updated_transaction = []

        for item_weight_pair in transaction:
            item, weight = item_weight_pair  # Unpack the item and weight from the pair

            # Find the item in sorted_item_supports
            should_include_item = any(entry[0] == item and entry[1] >= min_ws for entry in sorted_item_supports)

            if should_include_item:
                updated_transaction.append(item_weight_pair)

        updated_weighted_data.append(updated_transaction)

    return updated_weighted_data

weighted_data= remove_items_by_weight_threshold(weighted_data,sorted_item_supports,min_ws)
print(weighted_data)

[[('A', 0.8), ('C', 0.1), ('D', 0.9), ('E', 0.2)], [('B', 0.3), ('C', 0.1), ('E', 0.2)], [('A', 0.8), ('B', 0.3), ('C', 0.1)], [('A', 0.8), ('C', 0.1), ('E', 0.2), ('F', 0.5)], [('A', 0.8), ('C', 0.1), ('D', 0.9), ('E', 0.2), ('F', 0.5)], [('B', 0.3), ('C', 0.1), ('D', 0.9), ('E', 0.2), ('F', 0.5)], [('A', 0.8), ('C', 0.1), ('D', 0.9)]]


In [None]:
#sort the items in each transaction in descending order based on its ws
item_name_to_index = {item: i for i, (item, _) in enumerate(sorted_item_supports)}

sorted_transactions_and_weights = []
for transaction_index, transaction in enumerate(weighted_data):
    sorted_transaction_items = sorted(transaction, key=lambda x: sorted_item_supports[item_name_to_index[x[0]]][1], reverse=True)
    transaction_weight = transaction_weights[transaction_index]

    sorted_transaction_items = [item for item, weight in sorted_transaction_items]

    sorted_transactions_and_weights.append((sorted_transaction_items, transaction_weight))

print(sorted_transactions_and_weights)

[(['C', 'A', 'D', 'E'], 0.5), (['C', 'E', 'B'], 0.2), (['C', 'A', 'B'], 0.4), (['C', 'A', 'E', 'F'], 0.4), (['C', 'A', 'D', 'E', 'F'], 0.5), (['C', 'D', 'E', 'F', 'B'], 0.4), (['C', 'A', 'D'], 0.6)]


In [None]:
def build_wn_tree(sorted_transactions_and_weights: List[Tuple[List[str], float]], min_ws: float) -> WNTreeNode:
    """
    Builds a Weighted node Tree (WN-Tree) from sorted transactions and weights.

    Args:
    - sorted_transactions_and_weights: A list of tuples containing a list of strings (items) and their corresponding float weights.
    - min_ws: Minimum weight threshold.

    Returns:
    - WNTreeNode: The root node of the generated WN-Tree.
    """
    wn_tree = WNTreeNode("", 0.0)
    for transaction in sorted_transactions_and_weights:
        insert_tree(transaction, wn_tree)
    return wn_tree



In [None]:
def print_wn_tree(root: WNTreeNode, prefix: str = "", is_tail: bool = True) -> None:
    """
    Recursively prints the structure of a Weighted N-ary Tree (WN-Tree).

    Args:
    - root: The root node of the WN-Tree.
    - prefix: A string used for spacing and visualization.
    - is_tail: Indicates whether the current node is a leaf node.

    Returns:
    - None: Prints the WN-Tree structure.
    """
    print(prefix + ("" if is_tail else "│ ") + root.get_name() + f" (Pre: {root.pre}, Pos: {root.pos}, Weight: {root.get_weight()})")
    children = list(root.get_children().values())
    for i, child in enumerate(children[:-1]):
        print_wn_tree(child, prefix + ("" if is_tail else "│ ") + "   ", False)
    if children:
        print_wn_tree(children[-1], prefix + ("" if is_tail else "│ ") + "   ", True)

In [None]:
def insert_tree(transaction: tuple, tree: WNTreeNode) -> None:
    """
    Inserts a transaction into the WNTree, updating weights and creating new nodes as needed.

    Args:
    - transaction: A tuple containing a list of items and the corresponding transaction weight.
    - tree: The WNTreeNode root of the tree.
    """
    current_node = tree
    item_pair, transaction_weight = transaction

    for item in item_pair:
        found = False

        for child_key, child_node in current_node.children.items():
            if item == child_key:
                found = True
                current_node = child_node
                current_node.weight += transaction_weight
                break

        if not found:
            new_child = WNTreeNode(item, transaction_weight)
            current_node.children[item] = new_child
            current_node = new_child

wn_tree = build_wn_tree(sorted_transactions_and_weights, min_ws)
wn_tree.assign_pre_pos()

# Printing the WNTree
print("WNTree:")
print_wn_tree(wn_tree, "", True)

WNTree:
 (Pre: 0, Pos: 14, Weight: 0.0)
   C (Pre: 1, Pos: 13, Weight: 3.0)
      │ A (Pre: 2, Pos: 6, Weight: 2.4)
      │    │ D (Pre: 3, Pos: 2, Weight: 1.6)
      │    │    E (Pre: 4, Pos: 1, Weight: 1.0)
      │    │       F (Pre: 5, Pos: 0, Weight: 0.5)
      │    │ B (Pre: 6, Pos: 3, Weight: 0.4)
      │    E (Pre: 7, Pos: 5, Weight: 0.4)
      │       F (Pre: 8, Pos: 4, Weight: 0.4)
      │ E (Pre: 9, Pos: 8, Weight: 0.2)
      │    B (Pre: 10, Pos: 7, Weight: 0.2)
      D (Pre: 11, Pos: 12, Weight: 0.4)
         E (Pre: 12, Pos: 11, Weight: 0.4)
            F (Pre: 13, Pos: 10, Weight: 0.4)
               B (Pre: 14, Pos: 9, Weight: 0.4)


In [None]:
# Print WL for each item
all_wl = wn_tree.calculate_all_wl()
del all_wl['']
sorted_item=[]
for item_name, item_wl in all_wl.items():
    print(f"WL({item_name}):")
    sorted_item.append(item_name)
    for node in item_wl:
        print(f"({node.pre}, {node.pos}, {node.weight})")
    print()
print(sorted_item)

WL(C):
(1, 13, 3.0)

WL(A):
(2, 6, 2.4)

WL(D):
(3, 2, 1.6)
(11, 12, 0.4)

WL(E):
(4, 1, 1.0)
(7, 5, 0.4)
(9, 8, 0.2)
(12, 11, 0.4)

WL(F):
(5, 0, 0.5)
(8, 4, 0.4)
(13, 10, 0.4)

WL(B):
(6, 3, 0.4)
(10, 7, 0.2)
(14, 9, 0.4)

['C', 'A', 'D', 'E', 'F', 'B']


In [None]:
def union(s1: str, s2: str) -> str:
    # Convert list of tuples to a dictionary with order preserved
    predict_dict = {item: i for i, item in enumerate(sorted_item)}

    combined = s2 + s1

    # Sort the combined string based on the dictionary value
    unique_chars = set(combined)
    sorted_combined = ''.join(sorted(unique_chars, key=predict_dict.get, reverse=True))
    return sorted_combined


In [None]:
def wl_intersection(wl1: List[WNTreeNode], wl2: List[WNTreeNode]) -> List[WNTreeNode]:
    """
    Computes the intersection of two lists of WNTreeNode objects based on specified conditions.

    Args:
    - wl1: List of WNTreeNode objects (sorted by pre)
    - wl2: List of WNTreeNode objects (sorted by pre)

    Returns:
    - List of WNTreeNode objects representing the intersection of wl1 and wl2
    """
    wl3 = []
    s = 0
    i = 0
    j = 0
    m = len(wl1)
    n = len(wl2)

    while i < m and j < n:
        pre1i = wl1[i].pre
        pos1i = wl1[i].pos
        w1i = wl1[i].weight

        pre2j = wl2[j].pre
        pos2j = wl2[j].pos

        if pre2j < pre1i:
            if pos2j > pos1i:
                if s > 0 and pre2j == wl3[s - 1].pre:
                    wl3[s - 1].weight += w1i
                else:
                    s += 1
                    newNode = WNTreeNode(union(wl1[i].item,wl2[j].item ), w1i,pre2j, pos2j)
                    wl3.append(newNode)
                i += 1
            else:
                j += 1
        else:
            i += 1
    return wl3

In [None]:
# Function to calculate weighted support
def calculate_weighted_support(nodes: List[WNTreeNode]) -> float:
        """
        Calculate the weighted support for a list of WNTreeNode objects.

        Args:
        - nodes (List[WNTreeNode]): A list of WNTreeNode objects.

        Returns:
        - float: Weighted support calculated as the sum of weights divided by the total transaction weights.
        """
        weight_sum = sum(node.weight for node in nodes)
        return weight_sum/sum_transaction_weights


def is_ancestor(wl_pa1: List['WNTreeNode'], wl_pa2: List['WNTreeNode']) -> bool:
    """
    Check if the WNTreeNode list wl_pa1 is an ancestor of wl_pa2.

    Args:
    - wl_pa1 (List['WNTreeNode']): List of WNTreeNode objects to check if they are ancestors.
    - wl_pa2 (List['WNTreeNode']): List of WNTreeNode objects to be checked against.

    Returns:
    - bool: True if wl_pa1 is an ancestor of wl_pa2, False otherwise.
    """
    count=0
    check=len(wl_pa1)
    ancestor_found = False
    for ci in wl_pa2:
        for cj in wl_pa1:
            if ci.pre < cj.pre and ci.pos > cj.pos:
                count += 1
    if count==check:
        ancestor_found=True
    else:
        ancestor_found=False

    return ancestor_found



In [None]:
FWCIs = []
def NFWCI(all_wl_1_item, minws):
    FWCIs = find_fwci(all_wl_1_item)

    return FWCIs

def find_fwci(calculated_wls: Dict[str, List[WNTreeNode]]) -> List[str]:
    """
    Finds frequent weighted closed itemsets (FWCIs) from calculated weighted lists (calculated_wls).

    Args:
    - calculated_wls (Dict[str, List[WNTreeNode]]): Dictionary containing keys as item names and values as corresponding WNTreeNode lists.
    - min_ws (float): Minimum weighted support threshold.

    Returns:
    - List[str]: List of frequent weighted closed itemsets.
    """
    Is = list(calculated_wls.keys())
    for i in range(len(Is) - 1, -1, -1):
        Inext = {}
        Inext_clone={}
        Xi = Is[i]
        wl_Xi = calculated_wls.get(Xi, [])
        for j in range(i - 1, -1, -1):
            Xj = Is[j]
            wl_Xj = calculated_wls.get(Xj, [])
            if (is_ancestor(wl_Xi,wl_Xj)):
                wl_Xi = wl_intersection(wl_Xi, wl_Xj)
                Xi = union(Xi,Xj)

                for Xk in Inext:
                    if union(Xk,Xj) not in Inext:
                        Inext_clone[union(Xk,Xj)]=[]
                    Inext_clone[union(Xk,Xj)]=(Inext.get(Xk,[]))
                    Inext_clone = OrderedDict((key, value) for key, value in reversed(Inext_clone.items()))

                if calculate_weighted_support(wl_Xi) == calculate_weighted_support(wl_Xj):
                    Is.remove(Xj)
                    i -= 1
                Inext.clear()
                Inext=dict(Inext_clone)
            else:
                wl_Xi_Xj = wl_intersection(wl_Xi, wl_Xj)
                if calculate_weighted_support(wl_Xi_Xj) >= min_ws and set(wl_Xi_Xj) not in FWCIs:
                    union_key = union(Xi, Xj)
                    if union_key not in Inext:
                        Inext[union_key]=(wl_Xi_Xj)
        find_fwci(Inext)
        FWCIs.append(Xi)
    return FWCIs



result_FWCIs = NFWCI(all_wl, min_ws)
print(result_FWCIs)

stop = timeit.default_timer()

print('Time: ', stop - start)

['BC', 'FEDC', 'FEAC', 'FEC', 'EDAC', 'EDC', 'EAC', 'EC', 'DAC', 'DC', 'AC', 'C']
Time:  1.063687066
