In [21]:
from collections import defaultdict
from typing import List, Set, Dict
import math
import copy

In [22]:
class Item:
    def __init__(self, name):
        self.name = name[0].upper()

    def __repr__(self):
        return f"Item({self.name})"

    def __eq__(self, other):
        if isinstance(other, Item):
            return self.name == other.name
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.name)

In [23]:
class Itemset:
    def __init__(self, items=""):
        items = "".join(set(items.upper()))
        self.items = "".join(sorted(items))

    def __repr__(self):
        return f"Itemset({self.items})"

    def __eq__(self, other):
        if isinstance(other, Itemset):
            return self.items == other.items
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.items)

    def __contains__(self, item):
        if isinstance(item, Item):
            return item.name in self.items
        else:
            raise ValueError()

    def __iter__(self):
        for item in self.items:
            yield Item(item)

    def __le__(self, other):
        if isinstance(other, Itemset):
            return set(self.items) <= set(other.items)
        elif isinstance(other, Transaction):
            return self <= other.itemset()
        else:
            raise ValueError()

    def union(self, other):
        if isinstance(other, Item):
            self.items = Itemset(self.items + other.name).items
        elif isinstance(other, Itemset):
            self.items = Itemset(self.items + other.items).items
        else:
            raise ValueError()

In [24]:
class Transaction:
    def __init__(self, d={}):
        self.__d = {}
        if isinstance(d, dict):
            for item, value in d.items():
                if isinstance(item, Item) and isinstance(value, int):
                    self.__d[item] = value
                else:
                    raise ValueError()
        else:
            raise ValueError()

    def __repr__(self):
        items_repr = ', '.join(f'{repr(item)}: {count}' for item, count in self.__d.items())
        return f"Transaction({items_repr})"

    def __getitem__(self, item):
        if isinstance(item, Item):
            return self.__d[item]
        else:
            raise ValueError()

    def __setitem__(self, item, value):
        if isinstance(item, Item) and isinstance(value, int):
            self.__d[item] = value
        else:
            raise ValueError()

    def __iter__(self):
        return iter(self.__d.items())

    def __contains__(self, item):
        if isinstance(item, Item):
            return item in self.__d
        else:
            raise ValueError()

    def __le__(self, other):
        if isinstance(other, Transaction):
            return self.itemset() <= other.itemset()
        else:
            raise ValueError()

    def items(self):
        return list(self.__d.keys())

    def itemset(self):
        return Itemset("".join(item.name for item in self.__d.keys()))

In [25]:
class Database:
    def __init__(self, transaction_table={}, external_utility_table={}):
        self.transaction_table = {}
        self.external_utility_table = {}

        if isinstance(transaction_table, dict):
            for tid, transaction in transaction_table.items():
                if isinstance(tid, str) and isinstance(transaction, Transaction):
                    self.transaction_table[tid] = transaction
                else:
                    raise ValueError()
        else:
            raise ValueError()

        if isinstance(external_utility_table, dict):
            for item, external_utility in external_utility_table.items():
                if isinstance(item, Item) and isinstance(external_utility, int):
                    self.external_utility_table[item] = external_utility
                else:
                    raise ValueError()
        else:
            raise ValueError()

    def __repr__(self):
        transaction_entries = '\n    '.join(f'{repr(tid)}: {repr(transaction)}' for tid, transaction in self.transaction_table.items())
        external_utility_entries = '\n    '.join(f'{repr(item)}: {external_utility}' for item, external_utility in self.external_utility_table.items())

        return (f"Database(\n"
                f"  transaction_table={{\n    {transaction_entries}\n  }},\n"
                f"  external_utility_table={{\n    {external_utility_entries}\n  }}\n"
                f")")

    def __getitem__(self, tid):
        if isinstance(tid, str):
            return self.transaction_table[tid]
        else:
            raise ValueError()

    def __setitem__(self, tid, transaction):
        if isinstance(tid, str) and isinstance(transaction, Transaction):
            self.transaction_table[tid] = transaction
        else:
            raise ValueError()

    def __iter__(self):
        return iter(self.transaction_table.items())

    def tids(self):
        return list(self.transaction_table.keys())

    # Definition 1: Internal utility
    def q(self, item, tid):
        return self[tid][item]

    # Definition 2: External utility
    def p(self, item):
        return self.external_utility_table[item]

    def u(self, *args):
        # Definition 3: Utility of an item in a transaction
        if len(args) == 2 and isinstance(args[0], Item) and isinstance(args[1], str):
            item, tid = args
            return self.q(item, tid) * self.p(item)

        # Definition 4: Utility of an itemset in a transaction
        elif len(args) == 2 and isinstance(args[0], Itemset) and isinstance(args[1], str):
            itemset, tid = args
            if itemset <= self[tid]:
                return sum(self.q(item, tid) * self.p(item) for item in itemset)
            else:
                raise ValueError("Itemset not found in transaction")

        # Definition 5: Utility of an itemset in the database
        elif len(args) == 1 and isinstance(args[0], Itemset):
            itemset = args[0]
            s = 0
            for tid, transaction in self.transaction_table.items():
                if itemset <= transaction:
                    s += sum(self.q(item, tid) * self.p(item) for item in itemset)
            return s

        else:
            raise ValueError()

    # Definition 6: Transaction utility
    def tu(self, tid):
        return sum(self.q(item, tid) * self.p(item) for item in self[tid].items())

    # Definition 7: Total utility of the database
    def TU(self):
        return sum(self.tu(tid) for tid in self.transaction_table)

In [26]:
class ULElem:
    def __init__(self, TID=None, tns=None, item_utility=None):
        self.TID = TID
        self.tns = tns
        self.item_utility = item_utility

    def __repr__(self):
        return f"ULElem(TID={repr(self.TID)}, tns={repr(self.tns)}, item_utility={repr(self.item_utility)})"

In [27]:
class UTList:
    def __init__(self, item_name: Item):
        self.item_name = item_name
        self.SINS = 0
        self.sum_utility = 0
        self.ULElems = []

    def __repr__(self):
        ulelems_repr = ',\n    '.join(repr(ulelem) for ulelem in self.ULElems)
        return (f"UTList(\n"
                f"  item_name={repr(self.item_name)},\n"
                f"  SINS={self.SINS},\n"
                f"  sum_utility={self.sum_utility},\n"
                f"  ULElems=[\n    {ulelems_repr}\n  ]\n"
                f")")

def SINS(sensitive_item: Item, NS: Set[Itemset]) -> int:
    SINS = sum(1 for itemset in NS if sensitive_item in itemset)
    return SINS

def tns(NS: Set[Itemset], transaction: Transaction) -> float:
    NSI_num = sum(1 for itemset in NS if itemset <= transaction)
    tns = 1 / (1 + NSI_num)
    return tns

In [28]:
class UTLDic:
    def __init__(self, D: Database, S: Set[Itemset], NS: Set[Itemset], delta: int):
        self.D = D
        self.S = S
        self.NS = NS
        self.delta = delta

        self.__UTLDic = self.__construct_UTLDic()

    def __construct_UTLDic(self) -> Dict[Item, UTList]:
        # Initialize SItem
        SItem = Itemset()

        # Get union of all sensitive high-utility itemsets
        for itemset in self.S:
            SItem.union(itemset)

        # Initialize UTLDic
        utldic = {}
        for item in SItem:
            # Initialize UTList for each item
            UTL = UTList(item)

            # Calculate SINS for each item
            UTL.SINS = SINS(item, self.NS)

            utldic[item] = UTL

        # Scan database D
        for tid, transaction in self.D:
            # Get set of sensitive items in the transaction
            SI = set()
            SI.update(*[itemset for itemset in self.S if itemset <= transaction])

            # Construct UTList for each sensitive item
            for item in SI:
                ULE = ULElem()
                ULE.TID = tid
                ULE.item_utility = self.D.u(item, tid)
                ULE.tns = tns(self.NS, transaction)
                utldic[item].ULElems.append(ULE)
                utldic[item].sum_utility += ULE.item_utility

        return utldic

    def __repr__(self):
        utldic_repr = ',\n  '.join(f'{repr(item)}: {repr(utlist)}' for item, utlist in self.__UTLDic.items())
        return f"UTLDic(\n  {utldic_repr}\n)"

    def __getitem__(self, item):
        if isinstance(item, Item):
            return self.__UTLDic[item]
        else:
            raise ValueError()

    def __setitem__(self, item, utlist):
        if isinstance(item, Item) and isinstance(utlist, UTList):
            self.__UTLDic[item] = utlist
        else:
            raise ValueError()

    def L(self, itemset: Itemset):
        return set.intersection(*[set(elem.TID for elem in self[item].ULElems) for item in itemset])

In [29]:
def hide_sensitive_high_utility_itemsets(utldic: UTLDic) -> UTLDic:
        sanitized_utldic = copy.deepcopy(utldic)

        # Sort S in descending order of u(Si) (Si in S)
        S_sorted = sorted(sanitized_utldic.S, key=lambda x: sanitized_utldic.D.u(x), reverse=True)

        for Si in S_sorted:
            # Sort Si in ascending order of SINS(item) (item in Si)
            Si_sorted = sorted(Si, key=lambda x: sanitized_utldic[x].SINS)

            # Calculate l = L(Si) according to Definition 19
            l = sanitized_utldic.L(Si)

            # Calculate target utility to be reduced
            target_util = sanitized_utldic.D.u(Si) - sanitized_utldic.delta + 1

            while target_util > 0:
                for item in Si_sorted:
                    # Sort ULElems of UTlist order by tns desc and utility asc
                    sanitized_utldic[item].ULElems.sort(key=lambda x: (-x.tns, x.item_utility))

                    for elem in sanitized_utldic[item].ULElems:
                        if (elem.TID in l) and (target_util > 0):
                            reduced_utility = 0

                            if elem.item_utility <= target_util:
                                target_util -= elem.item_utility
                                reduced_utility = elem.item_utility
                                elem.item_utility = 0
                            else:
                                count = sanitized_utldic.D.q(item, elem.TID) - math.ceil(target_util / sanitized_utldic.D.p(item))
                                reduced_utility = elem.item_utility - count * sanitized_utldic.D.p(item)
                                elem.item_utility = count * sanitized_utldic.D.p(item)
                                target_util = 0

                            # update utldic[item].sum_utility
                            sanitized_utldic[item].sum_utility -= reduced_utility

        return sanitized_utldic

In [30]:
transaction_table = {
    'T1': Transaction({Item('C'): 7, Item('D'): 1, Item('E'): 1}),
    'T2': Transaction({Item('A'): 1, Item('C'): 2, Item('E'): 2}),
    'T3': Transaction({Item('B'): 6, Item('C'): 4, Item('D'): 3, Item('E'): 7}),
    'T4': Transaction({Item('B'): 5, Item('C'): 3, Item('D'): 9}),
    'T5': Transaction({Item('A'): 3, Item('C'): 10, Item('D'): 3}),
    'T6': Transaction({Item('C'): 5, Item('E'): 9}),
    'T7': Transaction({Item('A'): 6, Item('C'): 9, Item('D'): 2, Item('E'): 5}),
    'T8': Transaction({Item('A'): 1, Item('B'): 6, Item('C'): 2, Item('D'): 5, Item('E'): 3})
}

external_utility_table = {Item('A'): 9, Item('B'): 11, Item('C'): 4, Item('D'): 6, Item('E'): 7}

D = Database(transaction_table, external_utility_table)


HUIs = {Itemset('ACDE'), Itemset('BCDE'), Itemset('BC'),
        Itemset('ACD'), Itemset('BCE'), Itemset('CDE'),
        Itemset('BE'), Itemset('BD'), Itemset('CE'),
        Itemset('BDE'), Itemset('BCD'), Itemset('CD')}

S = {Itemset('ACD'), Itemset('BC')}

NS = HUIs - S

delta = 200  # Minimum utility threshold

---

Kiểm tra độ chính xác của cột **tu** trong Table **1**

In [31]:
D.tu('T1'), D.tu('T2'), D.tu('T3'), D.tu('T4'), D.tu('T5'), D.tu('T6'), D.tu('T7'), D.tu('T8')

(41, 31, 149, 121, 85, 83, 137, 134)

Kiểm tra độ chính xác của Table **3**

In [32]:
{itemset: D.u(itemset) for itemset in HUIs}

{Itemset(BC): 223,
 Itemset(BE): 202,
 Itemset(BCD): 325,
 Itemset(CE): 305,
 Itemset(CD): 278,
 Itemset(CDE): 266,
 Itemset(BD): 289,
 Itemset(BCE): 226,
 Itemset(BCDE): 274,
 Itemset(BDE): 250,
 Itemset(ACD): 234,
 Itemset(ACDE): 205}

Kiểm tra độ chính xác của Figure **4**

In [33]:
utldic = UTLDic(D, S, NS, delta)
utldic

UTLDic(
  Item(A): UTList(
  item_name=Item(A),
  SINS=1,
  sum_utility=90,
  ULElems=[
    ULElem(TID='T5', tns=0.5, item_utility=27),
    ULElem(TID='T7', tns=0.2, item_utility=54),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=9)
  ]
),
  Item(B): UTList(
  item_name=Item(B),
  SINS=6,
  sum_utility=187,
  ULElems=[
    ULElem(TID='T3', tns=0.1, item_utility=66),
    ULElem(TID='T4', tns=0.25, item_utility=55),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=66)
  ]
),
  Item(C): UTList(
  item_name=Item(C),
  SINS=7,
  sum_utility=112,
  ULElems=[
    ULElem(TID='T3', tns=0.1, item_utility=16),
    ULElem(TID='T4', tns=0.25, item_utility=12),
    ULElem(TID='T5', tns=0.5, item_utility=40),
    ULElem(TID='T7', tns=0.2, item_utility=36),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=8)
  ]
),
  Item(D): UTList(
  item_name=Item(D),
  SINS=7,
  sum_utility=60,
  ULElems=[
    ULElem(TID='T5', tns=0.5, item_utility=18),
    ULElem(TID='T7', tns=0.2

Kiểm tra độ chính xác của Figure **6**

In [34]:
sanitized_utldic = hide_sensitive_high_utility_itemsets(utldic)
sanitized_utldic

UTLDic(
  Item(A): UTList(
  item_name=Item(A),
  SINS=1,
  sum_utility=54,
  ULElems=[
    ULElem(TID='T5', tns=0.5, item_utility=0),
    ULElem(TID='T7', tns=0.2, item_utility=45),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=9)
  ]
),
  Item(B): UTList(
  item_name=Item(B),
  SINS=6,
  sum_utility=154,
  ULElems=[
    ULElem(TID='T4', tns=0.25, item_utility=22),
    ULElem(TID='T3', tns=0.1, item_utility=66),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=66)
  ]
),
  Item(C): UTList(
  item_name=Item(C),
  SINS=7,
  sum_utility=112,
  ULElems=[
    ULElem(TID='T5', tns=0.5, item_utility=40),
    ULElem(TID='T4', tns=0.25, item_utility=12),
    ULElem(TID='T7', tns=0.2, item_utility=36),
    ULElem(TID='T3', tns=0.1, item_utility=16),
    ULElem(TID='T8', tns=0.09090909090909091, item_utility=8)
  ]
),
  Item(D): UTList(
  item_name=Item(D),
  SINS=7,
  sum_utility=60,
  ULElems=[
    ULElem(TID='T5', tns=0.5, item_utility=18),
    ULElem(TID='T7', tns=0.2,