In [1]:
class ShortTimePeriodTKHUPI:
    def __init__(self, file_path: str, max_period: int, k: int, threshold: float):
        """
        Initializes the Dataset object with necessary parameters.

        Parameters:
        ----------
        file_path : str
            The path to the input dataset file.
            
        max_period : int
            The maximum allowable period for itemsets. Itemsets with a period greater than this value are pruned.
            
        threshold : float
            The threshold for calculating the minimum expected utility. 
            Itemsets with expected utility below this value are considered low utility.
            
        k : int
            The number of top itemsets to retrieve based on the expected utility.
        """
        self.file_path = file_path
        self.max_period = max_period
        self.threshold = threshold
        self.min_util = 0
        self.k = k
        self.data = []
        self.total_rows = 0

    def read_data(self):
        try:
            with open(self.file_path, 'r') as file:
                for line in file:
                    parsed_row = self.parse_row(line.strip())
                    if parsed_row:
                        self.data.append(parsed_row)
                        self.total_rows += 1
        except FileNotFoundError:
            print(f"Error: File '{self.file_path}' not found.")
        except Exception as e:
            print(f"An error occurred: {e}")

    def parse_row(self, row):
        try:
            parts = row.split(":")
            products = parts[0].split()
            total_utility = int(parts[1])
            utilities = list(map(int, parts[2].split()))
            timestamp = int(parts[3])

            return {
                "products": products,
                "total_utility": total_utility,
                "utilities": utilities,
                "timestamp": timestamp
            }
        except (IndexError, ValueError):
            return None
        
    def filter_transactions_by_time(self, start_time, end_time):
        """Filters transactions based on a given start and end timestamp."""
        filtered_data = []
        for row in self.data:
            if start_time <= row["timestamp"] <= end_time:
                filtered_data.append(row)
        self.data = filtered_data
        self.total_rows = len(filtered_data)
    
    def find_occurrences(self, itemset):
        """Finds all periods and utilities where the itemset occurs."""
        # Hàm này chưa tối ưu
        occurrences = []
        for index, row in enumerate(self.data):
            utilities = []
            for item in itemset:
                if item in row["products"]:
                    i = row["products"].index(item)
                    utilities.append(row["utilities"][i])
                else:
                    utilities.clear()
                    break
            if utilities:
                occurrences.append((index+1, utilities, row["total_utility"]))
        return occurrences
    
    def calculate_db_utility(self):
        """Calculates utility of dataset."""
        return sum(row["total_utility"] for row in self.data)

    def calculate_max_period(self, indices):
        """Calculates the Maximum Period (M.P(X)) from a list of itemset's periods."""
        if len(indices) < 2:
            return float('inf')  # Infinite period if there's only one occurrence
      
        periods = []
        for i in range(len(indices) - 1):
            if not periods:
                periods.append(indices[i] - 0)
            periods.append(indices[i+1] - indices[i])
        return max(periods) if periods else 0

    def calculate_probability(self, occurrences):
        """Calculates the probability of an itemset."""
        return len(occurrences) / self.total_rows

    def calculate_expected_utility(self, occurrences):
        """Calculates the expected utility of an itemset based on its occurrences."""
        if not occurrences:
            return 0
        
        total_expected_util = 0
        for _, utilities, total_util in occurrences:
            util = sum(u for u in utilities)
       
            probability = util / total_util
            expected_util = probability * util
          
            total_expected_util += expected_util
        return total_expected_util            
    
    def dfs_generate_itemsets(self, transactions, current_itemset, index, all_itemsets, seen_itemsets):
        """
        Recursive DFS function to generate itemsets within the short-time period dataset.
        Avoids duplicate itemsets using a set to track seen combinations.
        """
        
        if current_itemset:
            """Calculate occurrences and validate the current itemset."""
            occurrences = self.find_occurrences(current_itemset)
            if occurrences:
                period = [t[0] for t in occurrences]
                mp_x = self.calculate_max_period(period)
                if mp_x > self.max_period:
                    print('Remove explore search from this itemset:')
                    print(current_itemset)
                    print("Because M.P(X): " + str(mp_x) + " > max_per: " + str(self.max_period))
                    print('---------------------------------------------------------------------------')
                    return # không mở rộng không gian tìm kiếm từ current_itemset, quay lại lần đệ quy trước
                
                expected_util = self.calculate_expected_utility(occurrences)
                if expected_util < self.min_util:
                    print('Remove explore search from this itemset:')
                    print(current_itemset)
                    print("Because EU(X): " + str(expected_util) + " < min_util: " + str(self.min_util))
                    print('---------------------------------------------------------------------------')
                    return
                
            # all_itemsets.append(current_itemset[:])
            # Sort the current itemset to ensure unique combinations (e.g., ['a', 'b'] == ['b', 'a'])
            sorted_itemset = tuple(sorted(current_itemset))
            if sorted_itemset not in seen_itemsets:
                all_itemsets.append(sorted_itemset)
                seen_itemsets.add(sorted_itemset)

        for i in range(index, len(transactions)):
            next_items = transactions[i]["products"]
            for item in next_items:
                if item not in current_itemset:  # Avoid duplicates in the itemset
                    current_itemset.append(item)
                    self.dfs_generate_itemsets(transactions, current_itemset, i + 1, all_itemsets, seen_itemsets)
                    current_itemset.pop()  # Backtrack to try other combinations


    def generate_itemsets_with_dfs(self):
        """
        Wrapper function to generate all unique itemsets from the short-time dataset using DFS.
        """
        all_itemsets = []
        transactions = self.data
        seen_itemsets = set()  # To track already seen itemsets
        self.dfs_generate_itemsets(transactions, [], 0, all_itemsets, seen_itemsets)
        
        return all_itemsets
    
    def find_top_k_stp_hupi(self):
        all_itemsets = self.generate_itemsets_with_dfs()
    
        results = []
        for itemset in all_itemsets:
            occurrences = self.find_occurrences(itemset)
            expected_utility = self.calculate_expected_utility(occurrences)
            period = [t[0] for t in occurrences]
            mp_x = self.calculate_max_period(period)
            
            
            if mp_x <= self.max_period and expected_utility >= self.min_util:
                results.append({
                    "itemset": itemset,
                    "max_period": mp_x,
                    "expected_utility": expected_utility
                })
                
        sorted_results = sorted(results, key=lambda x: x["expected_utility"], reverse=True)
        return sorted_results[:self.k]
    
    def run(self):
        self.read_data()
        db_util = self.calculate_db_utility()
        self.min_util = self.threshold * db_util
        print("Transactions utility: " + str(db_util))
        print("Minimum utility: " + str(self.min_util))
        print("Total rows: " + str(self.total_rows))
        print('---------------------------------------------------------------------------')
        top_k_STP_HUPI = self.find_top_k_stp_hupi()
        print('---------------------------------------------------------------------------')
        print("Top-" + str(self.k) + " ST-TKHUPI:")
        for itemset in top_k_STP_HUPI:
            print(f"Itemset: {itemset['itemset']}, "
                  f"Expected Utility: {itemset['expected_utility']:.2f}, " 
                  f"Max Period: {itemset['max_period']}")
 


In [2]:
file_path = 'datasets/test.txt'

if __name__ == "__main__":
    stp_tkhupi = ShortTimePeriodTKHUPI(file_path=file_path, max_period=10, threshold=0.01, k=3)
    stp_tkhupi.run()

    # Define the time window
    # start_date = "2023-01-01 00:00:00"
    # end_date = "2023-01-15 23:59:59"
    # start_timestamp = int(datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S").timestamp())
    # end_timestamp = int(datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S").timestamp())

    # start = 129116328
    # end = 129116364

    # Filter the dataset by time window 
    # stp_tkhupi.filter_transactions_by_time(start, end)
    # print(f"Filtered dataset size: {stp_tkhupi.total_rows}")  
     

Transactions utility: 1425
Minimum utility: 14.25
Total rows: 14
---------------------------------------------------------------------------
Remove explore search from this itemset:
['1']
Because EU(X): 10.740175007128599 < min_util: 14.25
---------------------------------------------------------------------------
Remove explore search from this itemset:
['2']
Because M.P(X): 12 > max_per: 10
---------------------------------------------------------------------------
Remove explore search from this itemset:
['3']
Because EU(X): 3.0772486474585596 < min_util: 14.25
---------------------------------------------------------------------------
Remove explore search from this itemset:
['1']
Because EU(X): 10.740175007128599 < min_util: 14.25
---------------------------------------------------------------------------
Remove explore search from this itemset:
['4']
Because M.P(X): inf > max_per: 10
---------------------------------------------------------------------------
Remove explore search