## Thuật toán TKN

In [25]:
"""
Hàm tìm PHN(Positive, Hybrid, Negative):
Input:
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  List(String): P (Items with positive utility)
  List(String): H (Items with both positive and negative utility)
  List(String): N (Items with negative utility)
"""
def find_PHN(dataset):
  P = []
  H = []
  N = []

  for transaction, values in dataset.items():
    items = values[0]
    quantities = values[1]
    profits = values[2]
    for item in items:
      idx = items.index(item)
      profit = profits[idx]

      if profit >= 0:
        if item not in P:
          P.append(item)
      elif profit < 0:
        if item not in N:
          N.append(item)

  H = list(set(P) & set(N))
  P = list(set(P) - set(H))
  N = list(set(N) - set(H))

  return P, H, N

In [26]:
"""
Hàm tính PTWU(Positive transaction-weighted utility) cho tất cả các item:
Input:
  Dictionary: Dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  Dictionary: PTWU = {Item(String) : PTWU(Int/Double)}
"""
def calculate_PTWU_1_itemsets(dataset):
  PTU = {}
  PTWU = {}

  for transaction, values in dataset.items():
    items = values[0]
    quantities = values[1]
    profits = values[2]
    PTU_value = 0
    for item in items:
      idx = items.index(item)
      quantity = quantities[idx]
      profit = profits[idx]
      if profit > 0:
        PTU_value += quantity * profit
    PTU[transaction] = PTU_value

  for transaction, values in dataset.items():
    items = values[0]
    quantities = values[1]
    profits = values[2]
    for item in items:
      idx = items.index(item)
      quantity = quantities[idx]
      profit = profits[idx]
      if item not in PTWU:
        PTWU[item] = PTU[transaction]
      else:
        PTWU[item] += PTU[transaction]

  return PTWU

In [27]:
"""
Hàm tính PRIU(Positive transaction-weighted utility) cho các Item không thuộc N:
Input:
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Ouput:
  Dictionary: PRIU_list = {Item(String): PRIU(Int/Double)}
"""
def calculate_PRIU_list(dataset):
  PRIU_list = {}

  for transaction, values in dataset.items():
    items = values[0]
    quantities = values[1]
    profits = values[2]
    for i in range(len(items)):
      item = items[i]
      quantity = quantities[i]
      profit = profits[i]
      if profit > 0:
        if item in PRIU_list:
          PRIU_list[item] += quantity * profit
        else:
          PRIU_list[item] = quantity * profit

  return PRIU_list

In [28]:
"""Dùng PRIU strategy để nâng minU:
Input:
  Dictionary: PRIU_list = {Item(String): PRIU(Int/Double)}
  Int: k
  Int/Double: minU
Output:
  Int/Double: minU (new)
"""
def PRIU_strategy(PRIU_list, k, minU):
    # Sắp xếp PRIU_list theo giá trị giảm dần
    sorted_PRIU_list = sorted(PRIU_list.items(), key=lambda item: item[1], reverse=True)

    # Trả về giá trị PRIU tại vị trí k-1
    if k <= len(sorted_PRIU_list):
        return sorted_PRIU_list[k - 1][1]
    else:
        return minU

In [29]:
"""
Tạo list processing_order dùng để sắp xếp item theo định nghĩa:
Input:
  List(String): P (Items with only positive utility)
  List(String): H (Items with both positive and negative utility)
  List(String): N (Items with only negative utility)
  Dictionary: PTWU = {Item(String) : PTWU(Int/Double)}
Output:
  List(String): processing_order
"""
def create_processing_order(P, H, N, PTWU):
  processing_order = []

  sorted_P = sorted(P, key=lambda item: PTWU.get(item, float('inf')))
  sorted_H = sorted(H, key=lambda item: PTWU.get(item, float('inf')))
  sorted_N = sorted(N, key=lambda item: PTWU.get(item, float('inf')))

  processing_order = sorted_P + sorted_H + sorted_N

  return processing_order

In [30]:
def sort_dataset(dataset, processing_order):
  sorted_dataset = {}
  for transaction, values in dataset.items():
    items = values[0]
    quantities = values[1]
    profits = values[2]
    sorted_items = []
    sorted_quantities = []
    sorted_profits = []

    for item in processing_order:
      if item in items:
        idx = items.index(item)
        sorted_items.append(item)
        sorted_quantities.append(quantities[idx])
        sorted_profits.append(profits[idx])

    sorted_dataset[transaction] = [sorted_items, sorted_quantities, sorted_profits]
  return sorted_dataset

In [31]:
"""
Hàm sắp xếp 1 list bất kỳ dựa trên processing_order:
Input:
  List(String): l
  List(String): processing_order = [sorted P + sorted H]
Output:
  List(String): l_sorted
"""
def sort_list(l, processing_order):
  l_sorted = []

  for item in processing_order:
    if item in l:
      l_sorted.append(item)

  return l_sorted

In [32]:
"""
Tìm ra tập Secondary(a):
Input:
  Dictionary: PTWU = {Item(String) : PTWU(Int/Double)}
  Int/Double: minU
Output:
  List(String): secondary_a (Items with PTWU >= minU)
"""
def find_secondary_a(PTWU, minU):
  secondary_a = []

  for item, value in PTWU.items():
    if value >= minU:
      secondary_a.append(item)

  return secondary_a

In [33]:
"""
Scan dataset để loại bỏ các item không nằm trong SecondaryX và xóa các transaction trống nếu có.
Đồng thời sắp xếp các item trong mỗi giao dịch theo processing_order:
Input:
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  List(String): secondary_a (Items with PTWU >= minU)
  List(String): processing_order
Output:
  Dictionary: Scanned_Dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
"""
def scan_dataset(dataset, secondary_a):
  scanned_dataset = {}

  for transaction, values in dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]

    new_items = []
    new_quantities = []
    new_profits = []

    for item in transaction_items:
      if item in secondary_a:
        idx = transaction_items.index(item)
        item_transaction_quantity = transaction_quantities[idx]
        item_transaction_profit = transaction_profits[idx]

        new_items.append(item)
        new_quantities.append(item_transaction_quantity)
        new_profits.append(item_transaction_profit)

    if len(new_items) > 0:
      scanned_dataset[transaction] = [new_items, new_quantities, new_profits]

  return scanned_dataset

In [34]:
"""
Tính PSU(Positive Sub-tree Utility) với x là các item trong secondary_a:
Input:
  List(String): a
  String: x
  List(String): processing_order
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  List(String): N (Items with negative utility)
Output:
  Int/Double: PSU
"""
def calculate_PSU(a, x, processing_order, dataset, N):
  u_a, u_x, u_i = 0, 0, 0
  PSU = 0

  for transaction, values in dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]

    if all(item in transaction_items for item in a) and x in transaction_items:
      for item in a:
        item_idx = transaction_items.index(item)
        item_quantity = transaction_quantities[item_idx]
        item_profit = transaction_profits[item_idx]
        item_utility = item_quantity * item_profit
        PSU += item_utility


      x_idx = transaction_items.index(x)
      if x in N:
        x_utility = 0
      else:
        x_utility = transaction_quantities[x_idx] * transaction_profits[x_idx]
      PSU += x_utility


      x_processing_order_index = processing_order.index(x)
      if x_processing_order_index == len(processing_order) - 1:
         PSU += 0
      else:
        for item in processing_order[x_processing_order_index + 1:]:
          if item in transaction_items:
            if item in N:
              PSU += 0
            else:
              item_idx = transaction_items.index(item)
              item_quantity = transaction_quantities[item_idx]
              item_profit = transaction_profits[item_idx]
              item_utility = item_quantity * item_profit
              PSU += item_utility

  return PSU

In [35]:
"""
Xây dựng LIUS(Leaf Itemset Utility Structure) từ các item không thuộc N trong dataset:
Input:
  List(String): processing_order
  Dictionary: scanned_dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  List(String): N (Items with negative utility)
Output:
  Dictionary: LIUS = {[Item x, Item y]: LIU}
"""
def create_LIUS(processing_order, scanned_dataset, N):
  LIUS = {}

  for transaction, values in scanned_dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]

    n = len(processing_order)
    for i in range(n - 1):
      item_i = processing_order[i]
      if item_i in N:
        continue

      for j in range(i + 1, n):
        item_j = processing_order[j]
        if item_j in N:
          continue

        if item_i in transaction_items and item_j in transaction_items:
          x_y = tuple([item_i, item_j])
          x_y_items = processing_order[i:j + 1]
          if all(item in transaction_items for item in x_y_items):
            for item_k in x_y_items:
              item_k_idx = transaction_items.index(item_k)
              item_k_quantity = transaction_quantities[item_k_idx]
              item_k_profit = transaction_profits[item_k_idx]
              item_k_utility = item_k_quantity * item_k_profit
              if item_k_utility > 0:
                if x_y in LIUS:
                  LIUS[x_y] += item_k_utility
                else:
                  LIUS[x_y] = item_k_utility

  return LIUS

In [36]:
"""
Xây dựng PIQU_LIU(Positive Real Item Utility - Leaf Itemset Utility):
Input:
  Dictionary: LIUS = {[Item x, Item y](String): LIU(Int/Double)}
  Int: k
Output:
  List([[Item x, Item y](String), LIU(Int/Double)]): PIQU_LIU
"""
def create_PIQU_LIU(LIUS, k):
  PIQU_LIU = []

  for pair, value in LIUS.items():
    if value not in PIQU_LIU:
      PIQU_LIU.append(value)

  PIQU_LIU.sort(reverse=True)

  if k <= len(PIQU_LIU):
    return PIQU_LIU[:k]

  return PIQU_LIU

In [37]:
"""
Chiến lược PLIU_E(Positive Leaf Itemset Utility - Exact):
Input:
  List([[Item x, Item y](String), LIU(Int/Double)]): PIQU_LIU
  Int: k
  Int/Double: minU
Output:
  Int/Double: minU (new)
"""
def PLIU_E_strategy(PIQU_LIU, k, minU):
  if k <= len(PIQU_LIU):
    new_minU = PIQU_LIU[k - 1]
    if new_minU > minU:
      return new_minU

  return minU

In [38]:
"""
Tính utility của 1 item trong toàn bộ dataset:
Input:
  String: item
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  Int/Double: utility (Tổng giá trị utility của item trong dataset)
"""
def calculate_utility(item, dataset):
  utility = 0

  for transaction, values in dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]
    if item in transaction_items:
      item_idx = transaction_items.index(item)
      item_quantity = transaction_quantities[item_idx]
      item_profit = transaction_profits[item_idx]
      item_utility = item_quantity * item_profit
      utility += item_utility

  return utility

In [39]:
"""
Tính Primary(a) từ Secondary(a) và minU:
Input:
  List(String): a
  List(String): secondary_a
  Dictionary: PSU = {[a, x](List): PSU(Int/Double)}
  Int/Double: minU
Output:
  List(String): primary_a = [Secondary(a) Items with PSU(a, item) >= minU]
"""
def calculate_primary_a(a, secondary_a, PSU, minU):
  primary_a = []

  for item in secondary_a:
    if (PSU[tuple(a + [item])] >= minU):
      primary_a.append(item)

  return primary_a

In [40]:
"""
Tính PLU(Positive Local Utility):
Input:
  List(String): a
  String: i
  List(String): processing_order
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  Int/Double: PLU(a, i)
"""
def calculate_PLU(a, i, processing_order, dataset):
  PLU = 0
  ai = a + [i]

  for transaction, values in dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]
    if all(item in transaction_items for item in ai):
      for item in transaction_items:
        if item in a or processing_order.index(item) > processing_order.index(a[-1]):
          item_idx = transaction_items.index(item)
          item_quantity = transaction_quantities[item_idx]
          item_profit = transaction_profits[item_idx]
          item_utility = item_quantity * item_profit
          if item_utility > 0:
            PLU += item_utility

  return PLU

In [41]:
"""
Tính utility của 1 itemset:
Input:
  List(String): itemset
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  Int/Double: utility
"""
def calculate_utility_itemset(itemset, dataset):
  utility = 0

  for transaction, values in dataset.items():
    transaction_items = values[0]
    transaction_quantities = values[1]
    transaction_profits = values[2]
    if all(item in transaction_items for item in itemset):
      for i in transaction_items:
        if i in itemset:
          item_idx = transaction_items.index(i)
          item_quantity = transaction_quantities[item_idx]
          item_profit = transaction_profits[item_idx]
          item_utility = item_quantity * item_profit
          utility += item_utility

  return utility

In [42]:
"""
Hàm tính topkHUIs(Top-k High Utility Itemsets):
Input:
  List(String): a
  Int/Double: prfU
  List(String): N (Items with negative utility)
  List(String): primary_a
  List(String): secondary_a
  Int/Double: minU
  List: TKHQ = []
  List(String): processing_order
  Int: k
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  Dictionary: default_dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
Output:
  List[List[String]]: TKHQ
"""
def Search(a, N, primary_a, secondary_a, minU, TKHQ, processing_order, k, dataset):
  for z in primary_a:
    if z in a:
      continue

    B = a + [z]

    U_B = calculate_utility_itemset(B, dataset)

    if U_B >= minU:
      TKHQ[tuple(B)] = U_B
      if len(TKHQ) == k:
        minU = min(TKHQ.values())

    if z in secondary_a:
      i = secondary_a.index(z)
      if i + 1 < len(secondary_a):
        w = secondary_a[i + 1]
        if U_B < minU and w in N:
          continue

    PLU_B_w = {}
    PSU_B_w = {}
    for w in secondary_a:
      if processing_order.index(w) > processing_order.index(B[-1]):
        PLU_B_w[w] = calculate_PLU(B, w, processing_order, dataset)
        PSU_B_w[w] = calculate_PSU(B, w, processing_order, dataset, N)

    primary_b = []
    for w in secondary_a:
      if w in PSU_B_w and PSU_B_w[w] >= minU:
        primary_b.append(w)

    secondary_b = []
    for w in secondary_a:
      if w in PLU_B_w and PLU_B_w[w] >= minU:
        secondary_b.append(w)

    Search(B, N, primary_b, secondary_b, minU, TKHQ, processing_order, k, dataset)

In [43]:
"""
Hàm tìm top-k High Utility Itemsets (HUIs) bằng thuật toán TKN:
Input:
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  Int: k (Số lượng top-k HUIs cần tìm)
Output:
  List[List[String]]: TKHQ (Danh sách top-k HUIs)
"""
def TKN_top_k_high_utility_itemsets(dataset, k):
  # Khởi tạo itemset rỗng:
  a = []

  # Khởi tạo tập PHN
  P, H, N = find_PHN(dataset)

  # Khởi tạo hàng đợi ưu tiên cho top-k HUIs:
  TKHQ = {}

  # Khởi tạo minU = 1:
  minU = 1

  # Tính toán PTWU cho tất cả item (1-itemsets):
  PTWU_1_itemsets = calculate_PTWU_1_itemsets(dataset)

  # PRIU_list cho các item không thuộc N:
  PRIU_list = calculate_PRIU_list(dataset)

  # Nâng minU mới từ PRIU_strategy:
  minU = PRIU_strategy(PRIU_list, k, minU)

  # Tạo processing_order là các item đã được sắp xếp theo định nghĩa:
  processing_order = create_processing_order(P, H, N, PTWU_1_itemsets)

  # Sắp xếp lại các item trong dataset theo processing_order:
  dataset = sort_dataset(dataset, processing_order)

  # Tạo tập Secondary(a):
  secondary_a = find_secondary_a(PTWU_1_itemsets, minU)

  # Sắp xếp lại Secondary(a) theo processing_order:
  secondary_a = sort_list(secondary_a, processing_order)

  # Scan dataset với Secondary(a):
  scanned_dataset = scan_dataset(dataset, secondary_a)

  # Tính PSU(a, x) với x thuộc Secondary(a):
  PSU = {}
  for x in secondary_a:
    PSU[tuple(a + [x])] = calculate_PSU(a, x, processing_order, scanned_dataset, N)

  # Tính LIUS từ các item không thuộc N trong dataset:
  LIUS = create_LIUS(processing_order, scanned_dataset, N)

  # Tạo priority queue PIQU_LIU từ LIUS và minU:
  PIQU_LIU = create_PIQU_LIU(LIUS, k)

  # Nâng giá trị của minU bằng PLIU_E strategy:
  minU = PLIU_E_strategy(PIQU_LIU, k, minU)

  # Primary(a):
  primary_a = calculate_primary_a(a, secondary_a, PSU, minU)

  # Sắp xếp lại N:
  N = sort_list(N, processing_order)

  # Tìm top-k HUIs bằng hàm Search:
  Search(a, N, primary_a, secondary_a, minU, TKHQ, processing_order, k, dataset)

  TKHQ = dict(sorted(TKHQ.items(), key=lambda item: item[1], reverse=True))
  return TKHQ

-- Hàm in kết quả thuật toán TKN và trả về thời gian chạy:

In [46]:
"""
Hàm in kiểm tra các itemsets thuộc danh sách Top-k High-Utility Itemsets (HUIs).
Input:
  Dictionary: dataset = {Tid(String): [[Items(String)], [Quantities(Int)], [Profits(Int/Double)]]}
  Int: k (Số lượng Top-k HUIs cần in ra)
Output:
  In danh sách Top-k HUIs kèm theo giá trị utility của từng itemset.
"""
import time
def TKN_print(dataset, k):
  start = time.time()
  TKHQ = TKN_top_k_high_utility_itemsets(dataset, k)
  end = time.time()

  print("- TKN Algorithm result:")
  index = 1
  for itemset, utility in TKHQ.items():
    print(f"{index}. Itemset: {'; '.join(itemset)}")
    print(f"Utility: {utility}")
    print()
    index += 1
    if index == k + 1:
      break

  TKN_time = end - start

  return TKN_time

-- Thực hiện chạy TKN trên bộ dữ liệu nhỏ:

In [47]:
# INPUT:
dataset = TKN_dataset
TopK = 5

# Gọi hàm print kết quả:
TKN_time = TKN_print(dataset, TopK)
print("-> Time of TKN Algorithm = ", TKN_time)

AttributeError: 'str' object has no attribute 'items'