# Ex 3a: TIMESERIES PATTERN TREE
## Build a rule-based partitioning tree

## Dataset import

In [14]:
import pandas as pd
import random

## load the datasets
dataset_path = "UCRArchive_2018/MedicalImages/MedicalImages_TRAIN.tsv"
dataset = pd.read_csv(dataset_path, sep="\t", header=None)
dataset.head(3)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,90,91,92,93,94,95,96,97,98,99
0,10,-0.680044,-0.15254,0.463497,1.125701,1.784473,2.387815,2.88659,3.239594,3.417875,...,-0.669457,-0.669649,-0.667227,-0.663867,-0.659006,-0.653538,-0.648224,-0.643643,-0.640175,-0.637993
1,4,2.232027,2.863343,3.229689,3.284083,3.025244,2.488196,1.742625,0.890124,0.062662,...,-0.697279,-0.702275,-0.707983,-0.714786,-0.722466,-0.730555,-0.737959,-0.743156,-0.744424,-0.7412
2,10,2.746582,2.727073,2.688213,2.630574,2.554928,2.46224,2.353654,2.230476,2.094162,...,-0.474689,-0.403397,-0.331573,-0.260273,-0.190542,-0.123397,-0.059818,-0.000729,0.053012,0.100628


## Node class definition

In [15]:
## class Node
class Node:
    def __init__(self):
        self.rule = None # rule to split the data
        self.association_rules = None
        self.class_label = None
        self.information_gain = 0.0
        self.true_branch = None # the next node if the rule is true
        self.false_branch = None # the next node if the rule is false
        self.dataset_length = 0
        self.ts = None

## Support function

example of checking a pattern:

transaction = [1.0, 3.5, 2.1, 5.0, 4.2]                   
pattern = [(3.0, True, 1, None), (2.0, False, 2, None)]        
pattern_matched(pattern, transaction)

Result:                                          
At index 1, check if value 3.5 > 3.0 → ✅ True                 
At index 2, check if value 2.1 < 2.0 → ❌ False                
Result: False (pattern does not match transaction)

In [16]:
def calculate_support(dataset, pattern): #how frequent the pattern is in the dataset
    dataset_list = dataset.iloc[:, 1:].values.tolist()
    count = 0
    for transaction in dataset_list:
        if pattern_matched(pattern, transaction):
            count += 1
    return count / len(dataset)

#Check if a pattern is present in the given time series transaction
def pattern_matched(sequence, transaction):
    for value, is_greater, index, _ in sequence: #index is the position of the pattern in the transaction
        if index >= len(transaction):
            return False
        if is_greater:
            if not transaction[index] > value:
                return False
        else:
            if not transaction[index] < value:
                return False
    return True

## Association rules extraction
### Find the best partial sequence (rule) that meets a given confidence threshold.
the new function returns the association rule before the one that gives a greater or equal confidence than the threshold.

In [17]:
def extract_association_rules(dataset, pts, confidence_threshold): #pts -> best candidate pattern from information gain
    sequence_support = calculate_support(dataset, pts)
    best_pts = None
    for i in range(1, len(pts)):
        antecedent = pts[:i]
        antecedent_support = calculate_support(dataset, antecedent)
        # Confidence measures how likely the full sequence (pts) appears given that the antecedent appeared.
        confidence = sequence_support / antecedent_support if antecedent_support > 0 else 0

        if confidence >= confidence_threshold:
            break

        best_pts = (pts, confidence, i) #i = split index

    return best_pts if best_pts else None # Return None if no such index is found

## Candidates generation

In [None]:
# time series sample function ensuring that the sampled time series is not already in the indexes list
def random_ts_sample(dataset, indexes):
    sampled = dataset.sample()
    while sampled.index[0] in indexes:
        sampled = dataset.sample()

    return sampled, sampled.index[0] + 1

# generate candidates rules meeting the min_support threshold
def generate_candidates(dataset, min_support):

    # initialize list of indexes
    I = list()

    # initialize list of candidate rules
    candidates = list()

    # initialize list of patterns
    pts = list()

    # generate candidates rules until the min_support threshold is met
    while len(pts) <= 2 or calculate_support(dataset, pts) >= min_support:

        if (len(pts) >= len(dataset.columns) - 1):
            break

        # sample random time series
        random_ts, ts_index = random_ts_sample(dataset, I)
        random_ts_class = random_ts[0].values[0]

        # first item is the class label, so we skip it
        ts_as_list = random_ts.values.tolist()[0][1:]

        # sample random index and add it to I
        random_index = random.randint(0, len(ts_as_list) - 1)
        while random_index in I: #ensure that the index is not already in the list
            random_index = random.randint(0, len(ts_as_list) - 1)
        I.append(random_index)

        # sample another random time series
        random_ts_prime, ts_prime_index = random_ts_sample(dataset, I)
        while ts_prime_index == ts_index: #check if is the same
            random_ts_prime, ts_prime_index = random_ts_sample(dataset, I)

        random_ts_prime_as_list = random_ts_prime.values.tolist()[0][1:]

        # generate a candidate pattern (value, is_greater, index, class), the class item is useful later to get the class label for the node
        candidate = (ts_as_list[random_index],
                    #It helps in identifying meaningful splits that separate data points effectively.
                    #By comparing two real samples, the algorithm ensures that the rule is based on actual variations in the dataset.
                    random_ts_prime_as_list[random_index] >= ts_as_list[random_index],
                    random_index, #feature index
                    random_ts_class)  #class label

        candidates.append(candidate)
        pts = sorted(candidates, key=lambda x: x[2]) #sort on feature index


    # remove last element because it does not satisfy the min_support condition
    return pts[:-1]

PTS now obtained can be used for:
1) Rule Evaluation: These candidate patterns are potential rules that could be evaluated for their usefulness in classification or clustering.
2) Tree Building: In the context of decision trees or other tree-based models, the patterns could be used as splitting rules at each node.

## Information gain functions
It measures how much the split reduces uncertainty, with higher information gain indicating a better split.

In [19]:
import math
from collections import Counter

def calculate_entropy(labels): # entropy is used to determine how mixed or pure the labels are in a dataset.
    label_counts = Counter(labels)
    total_count = len(labels)
    entropy = 0.0

    for count in label_counts.values():
        probability = count / total_count
        if probability > 0:
            entropy -= probability * math.log2(probability)

    return entropy

# Information gain tells how well a particular feature splits a set of labels into more homogeneous subsets.
# It's used to decide the best feature for splitting data at each step in building a decision tree.
def calculate_information_gain(parent_labels, true_labels, false_labels):
    parent_entropy = calculate_entropy(parent_labels) #before split
    true_entropy = calculate_entropy(true_labels) #labels on true branch after split
    false_entropy = calculate_entropy(false_labels) #label on false branch after split

    total_count = len(parent_labels)
    true_weight = len(true_labels) / total_count
    false_weight = len(false_labels) / total_count

    info_gain = parent_entropy - (true_weight * true_entropy + false_weight * false_entropy)
    return info_gain

## Dataset split functions 
Branches that satisfy the rules and branches that don't.        
rule or rules = pts

In [None]:
def matches_rule(ts, rule): #if a given time series (ts) matches a set of conditions defined by a rule
    #Condition True in rule means that the value should be less than the value in the rule
    for (value, condition, index, _) in rule:
        if index >= len(ts):
            return False
        if condition == True and ts[index] < value:
            return False
        elif condition == False and ts[index] > value:
            return True
    return True

def dataset_split(dataset, rules): #se la rule è rispettata manda il time series in true_subset altrimenti in false_subset
    dataset_list = dataset.iloc[:, :].values.tolist()
    false_subset = []
    true_subset = []

    for ts in dataset_list:
        if matches_rule(ts[1:], rules):
            true_subset.append(ts)
        else:
            false_subset.append(ts)

    return false_subset, true_subset

## New tree generation
this version generates **n** candidates, where **n** is user-inserted, then for each candidate takes only the one that has the best information gain.
If there is no best candidate or information gain higher the  the min_gain, the algorithm is done 

In [21]:
def rpts_tree(Ts, labels, n_candidates, max_height=float('inf'), MIN_SUPPORT=None, min_samples=1, depth=0):
    min_gain = 0.01 #minimum information gain to make the split
    CONFIDENCE_THRESHOLD = 0.7 #minimum confidence threshold for association rule extraction

    if depth >= max_height or len(Ts) < min_samples:
        return None

    # create a new node
    v = Node()
    v.ts = Ts

    # get the most common label
    v.class_label = Counter(labels).most_common(1)[0][0]
    if not any(label == v.class_label for label in labels): # If all labels are the same, there's no need to split further
        return v

    # generate candidates
    all_candidates = []
    for _ in range(n_candidates):
        pts = generate_candidates(dataset, MIN_SUPPORT)
        all_candidates.append(pts)

    best_candidate = None
    best_info_gain = -1
    best_true_subset = None
    best_false_subset = None

    # find the best candidate
    for pts in all_candidates:

        true_subset, false_subset = dataset_split(Ts, pts)
        if not true_subset or not false_subset:
            continue

        labels_true = [ts[0] for ts in true_subset]
        labels_false = [ts[0] for ts in false_subset]

        ig = calculate_information_gain(labels, labels_true, labels_false)

        if ig > best_info_gain: #candidate with the highest information gain is selected as best one
            best_info_gain = ig
            best_candidate = pts
            best_true_subset = true_subset
            best_false_subset = false_subset

    if best_candidate is None or best_info_gain < min_gain: #no split improves the purity of the dataset
        print("No split improves the purity of the dataset")
        return v

    v.rule = best_candidate
    v.information_gain = best_info_gain
    v.dataset_length = len(Ts) # number of time series in each node -> how the dataset is split

    association_rules = extract_association_rules(dataset, best_candidate, CONFIDENCE_THRESHOLD)

    if association_rules is not None:
        pattern, confidence, split_index = association_rules #pattern = pts
        # add association rule to node
        v.association_rules = "Antecedent:" + str(pattern[:split_index]) + "  ==> Consequent:" + str(pattern[split_index:]) + " Confidence:" + str(round(confidence, 2)) + " Split Index:" + str(split_index)
        final_candidates = pattern[:split_index] + pattern[split_index:]
    else:
        final_candidates = pts

    if not final_candidates:
        print("No final candidates")
        return v

    v.rule = final_candidates
    print(f"Depth: {depth}, Rule: {v.rule if v.association_rules is None else  v.association_rules}, Information Gain: {v.information_gain:.4f}, Class Label: {v.class_label}")
    v.dataset_length = len(Ts)

    # recursively build and split the tree
    v.true_branch = rpts_tree(pd.DataFrame(best_true_subset), labels_true, n_candidates, max_height, MIN_SUPPORT, min_samples, depth + 1)
    v.false_branch = rpts_tree(pd.DataFrame(best_false_subset), labels_false, n_candidates,  max_height, MIN_SUPPORT, min_samples, depth + 1)

    return v

## Tree printing
Recursive function to print the structure of the decision tree starting from the root. For each branch we print informations about the child nodes 

In [22]:
def print_tree(node, depth=0):
    if node is None:
        return

    # Check and skip nodes where information gain is 0
    if hasattr(node, 'information_gain') and node.information_gain == 0.0:
        return

    print("\nDepth: ", depth)
    indent = "  " * depth
    print(f"{indent}Node class: {node.class_label}")
    print(f"{indent}Dataset Length: {node.dataset_length}")

    if hasattr(node, 'rule') and node.rule:
        print(f"{indent}Rule Length: {len(node.rule)}")

    if hasattr(node, 'information_gain'):
        # Check if information_gain is not None before printing
        if node.information_gain is not None:
            print(f"{indent}Information Gain: {node.information_gain:.4f}")
        else:
            print(f"{indent}Information Gain: None")

    if node.true_branch or node.false_branch:
        # Print true branch if it exists and has significant information gain
        if node.true_branch and (not hasattr(node.true_branch, 'information_gain') or node.true_branch.information_gain != 0.0):
            print(f"{indent}True Branch:")
            print_tree(node.true_branch, depth + 1)

        # Print false branch if it exists and has significant information gain
        if node.false_branch and (not hasattr(node.false_branch, 'information_gain') or node.false_branch.information_gain != 0.0):
            print(f"{indent}False Branch:")
            print_tree(node.false_branch, depth + 1)


## Tree informations
This function is used to visualize the time series data for a particular node in the decision tree.
It provides a visual representation of the time-series data, where each series is colored according to its class, making it easier to identify patterns or splits based on class.
The HTML file output allows for interactive exploration of the time series. 
Each file .html visualizes a different subset of time series at the different levels.                                   
node root -> all time-series and classes, last nodes -> maybe one class only


In [23]:
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd

def print_ts(df, l, depth, branch):
    # Get unique classes from the DataFrame
    classes = df.iloc[:, 0].unique()

    # Define a color palette with enough colors for all unique classes
    colors = px.colors.qualitative.Plotly[:len(classes)]  # Using Plotly's default qualitative color palette
    if len(classes) > len(colors):
        colors = px.colors.qualitative.Plotly * (len(classes) // len(px.colors.qualitative.Plotly) + 1)

    fig = go.Figure()
    added_classes = []  # To track which classes have been added to the plot for legend purposes

    for idx in l:
        ts = df.iloc[idx, 1:]  # Assuming the first column is the class and we skip it for the data
        classe = df.iloc[idx, 0]  # Class is in the first column

        # Define the legend group based on class
        legend_group = f'class_{classe}'

        # Determine whether to show in legend, avoid duplicates
        show_in_legend = classe not in added_classes

        # Assign color based on class
        color_idx = list(classes).index(classe)
        color = colors[color_idx]

        # Add trace to the figure
        fig.add_trace(go.Scatter(y=ts, mode='lines',
                                 line=dict(color=color),
                                 name=f'Class {classe}',
                                 legendgroup=legend_group,
                                 showlegend=show_in_legend))

        # Remember that this class has been added
        if show_in_legend:
            added_classes.append(classe)

    fig.update_layout(title='Visualizzazione Time-Series',
                      xaxis_title='Tempo',
                      yaxis_title='Ampiezza',
                      legend_title='Class')

    # Save the figure as an HTML file
    fig.write_html(f"timeseries_output/timeseries_node{depth}_{branch}.html")

## Tree generation and visualization

* Node = split in dataset
* Edges = true or false
* Labels :
    * Class
    * IG, how much the split improves the classification
    * Dataset Length, number of ts in that node

In [24]:
from graphviz import Digraph
def visualize_tree(node, ts_list=[], graph=None, depth=0, parent_name=None, branch_label=None):

    # ts_list = []
    if graph is None:
        graph = Digraph()
        root_label = f'Class: {node.class_label}\nIG: {node.information_gain:.4f}' if node.information_gain is not None else f'Class: {node.class_label}\nIG: None\nDataset Length: {node.dataset_length}'
        graph.node(name='root', label=root_label)
        parent_name = 'root'

    #time_series = [i for i in range(node.dataset_length)]
    current_name = f'node_{depth}_{id(node)}'
    ig_label = f'{node.information_gain:.4f}' if node.information_gain is not None else 'None'
    #rule_label = f'{node.rule}' if node.rule else 'None'
    label = f'Class: {node.class_label}\nIG: {ig_label}\nDataset Length: {node.dataset_length}'

    # Only add node if information gain is not zero
    if node.information_gain is not None and node.information_gain != 0:
        graph.node(name=current_name, label=label)
        if node.ts is not None:
            ts_list.append((node.ts, branch_label))
        if parent_name:
            edge_label = branch_label if branch_label else ''
            graph.edge(parent_name, current_name, label=edge_label)
        if node.true_branch:
            visualize_tree(node.true_branch, ts_list, graph, depth + 1, current_name, branch_label='True')
        if node.false_branch:
            visualize_tree(node.false_branch, ts_list, graph, depth + 1, current_name, branch_label='False')

    return graph, ts_list

def tree_depth(node):
    if node is None:
        return 0
    return 1 + max(tree_depth(node.true_branch), tree_depth(node.false_branch))

def tree_size(node):
    if node is None:
        return 0
    return 1 + tree_size(node.true_branch) + tree_size(node.false_branch)


## MAIN

In [25]:
labels = dataset[0]

MIN_SUPPORT = 0.05
n_candidates = int(input("Enter the number of candidates: "))
tree = rpts_tree(dataset, labels, n_candidates, MIN_SUPPORT=MIN_SUPPORT, min_samples=1)
print_tree(tree)
tree_visualization, ts_list = visualize_tree(tree)
tree_visualization.render('timeseries_output/decision_tree', format='png', cleanup=True)

depth = tree_depth(tree)
size = tree_size(tree)
print()
print(f"Tree Depth: {depth}")
print(f"Tree Size: {size}")


## UNCOMMENT TO SAVE TIME SERIES HTML FILES IT MIGHT TAKE A WHILE
'''for i, (ts, branch) in enumerate(ts_list):
    depth = i
    indexes = [j for j in range(len(ts))]

    for t in ts:
        print_ts(ts, indexes, depth, branch)'''


Depth: 0, Rule: Antecedent:[(1.079826, True, 2, 10), (-0.1256249, False, 14, 10), (1.1075086, False, 31, 6), (-0.55415713, True, 53, 9), (-0.33616091, True, 63, 10), (-1.4643432, True, 67, 10)]  ==> Consequent:[(-0.43613979, True, 84, 3)] Confidence:0.47 Split Index:6, Information Gain: 0.1386, Class Label: 10
No split improves the purity of the dataset
Depth: 1, Rule: Antecedent:[(0.76450792, False, 1, 10), (-0.5372983, True, 37, 10), (1.5995113, False, 39, 10), (-0.23802175, True, 41, 9), (-0.32217333, True, 52, 10), (-0.14567268, False, 65, 10), (0.17492992, False, 73, 10)]  ==> Consequent:[(-0.66121751, True, 88, 4)] Confidence:0.22 Split Index:7, Information Gain: 0.8724, Class Label: 10.0
Depth: 2, Rule: Antecedent:[(-0.59995319, True, 6, 9), (1.5355213, False, 16, 9), (1.1382731, False, 20, 10), (0.57431135, False, 23, 3), (1.0347243, False, 24, 10), (-0.16172463, True, 28, 10)]  ==> Consequent:[(-0.5132882, True, 74, 3)] Confidence:0.46 Split Index:6, Information Gain: 1.9603, 

'for i, (ts, branch) in enumerate(ts_list):\n    depth = i\n    indexes = [j for j in range(len(ts))]\n\n    for t in ts:\n        print_ts(ts, indexes, depth, branch)'

## Project 3b: SEQUENCE BOOSTING

Boosting logic → We find the best subsequence to separate the classes                                                   
Exhaustive search → The function explores all possible subsequences

In [44]:
import numpy as np
import random
from math import log, exp
from tqdm import tqdm
from collections import Counter


Z_dataset = []
labels = []
alphabet = set() ## not used

with open("dataset_boosting/question.txt") as f:
    test_data = f.readlines()
    for line in test_data:
        (label, data) = line.split(" ", 1)
        data_array = [int(x) for x in data.split()]
        Z_dataset.append(((1 if int(label[0]) == 1 else -1), data_array)) ## convert 0 to -1, doesn't matter but on the pdf it is -1
# save datas like (-1, [6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 5])
for label, data in Z_dataset:
    if label not in labels:
        labels.append(label)
    for value in data:
        alphabet.add(value)

alphabet = sorted(list(alphabet))
if len(labels) != 2:
    raise ValueError("Only binary classification is supported")

count = Counter([label for label, _ in Z_dataset])

print("Count of labels: ", count)
print("Z Dataset: ", Z_dataset)
print("Len Z Dataset: ", len(Z_dataset))
print("Labels: ", labels)
print("Alphabet: ", alphabet)

shuffled_Z = Z_dataset.copy()
random.shuffle(shuffled_Z)

print("Shuffled Z Dataset: ", shuffled_Z)

Count of labels:  Counter({-1: 896, 1: 835})
Z Dataset:  [(-1, [1, 2, 3, 4, 5]), (-1, [7, 8, 9, 10, 11, 12, 13, 14, 15, 5]), (-1, [17, 18, 19, 20, 21, 22, 5]), (-1, [7, 23, 24, 25, 26, 27, 28, 29, 30, 5]), (-1, [7, 31, 32, 33, 34, 35, 11, 36, 5]), (-1, [37, 18, 38, 39, 40, 5]), (-1, [7, 41, 11, 18, 42, 43, 44, 5]), (-1, [7, 45, 46, 47, 48, 17, 18, 49, 50, 51, 5]), (-1, [7, 52, 20, 18, 53, 54, 55, 34, 35, 5]), (-1, [7, 56, 34, 57, 11, 18, 58, 59, 5]), (-1, [61, 62, 1, 63, 64, 65, 66, 18, 67, 68, 5]), (-1, [37, 69, 70, 71, 5]), (-1, [37, 72, 73, 74, 75, 76, 5]), (-1, [77, 78, 79, 26, 80, 81, 82, 83, 5]), (-1, [84, 79, 85, 86, 87, 88, 89, 90, 24, 91, 92, 93, 94, 18, 95, 5]), (-1, [37, 18, 96, 97, 86, 98, 5]), (-1, [77, 99, 85, 100, 90, 101, 26, 102, 103, 104, 5, 105, 106, 5, 5]), (-1, [7, 107, 108, 109, 86, 90, 110, 5]), (-1, [7, 111, 9, 112, 11, 18, 113, 114, 5]), (-1, [7, 115, 34, 35, 11, 26, 116, 117, 118, 5]), (-1, [17, 18, 119, 120, 121, 122, 5]), (-1, [123, 62, 37, 124, 125, 126, 12

## find the best subsequence of numbers that minimizes classification error based on weighted samples.

In [45]:
def contains_subsequence_(sequence, subsequence): #check if a sequence contains a subsequence
    sub_len = len(subsequence)
    for i in range(len(sequence) - sub_len + 1): #window of size sub_len
        if sequence[i:i + sub_len] == subsequence:
            return True

    return False

def compute_error(dataset, labels, weights, subsequence, potential_label): #how wrong a specific subsequence is when used for classification.
    error = 0
    for i, (seq, l) in enumerate(zip(dataset, labels)):
        seq = seq[1]
        contains_subsequence = contains_subsequence_(seq, subsequence)
        if (contains_subsequence and l != potential_label) or (not contains_subsequence and l == potential_label):
            #error increase if the sequence contains the subsequence, but the label is not the expected potential_label.
            #error increase if the sequence does not contain the subsequence, but the label should be potential_label.
            error += weights[i]
    return error

def find_best_sequence(Z, weights):
    best_sequence = None
    best_sequence_label = None
    best_error = float('inf')
    labels = [label for label, _ in Z]

    for i in tqdm(range(len(Z))):#for each sequence in the dataset
        sequence = Z[i][1]
        label = labels[i]

        for j in range(len(sequence)):
            for k in range(j + 1, len(sequence) + 1):
                subsequence = sequence[j:k]
                for potential_label in set(labels):
                    error = compute_error(Z, labels, weights, subsequence, potential_label)# test the error with each subsequence
                    if error < best_error: #Update best subsequence if we find a lower error
                        best_error = error
                        best_sequence = subsequence
                        best_sequence_label = potential_label

    return best_sequence, best_sequence_label, best_error

In "classify" we have that each weak classifier "(seq,label)" contributes to the final score

In [46]:
# Boosting relies on reweighting misclassified samples, making the classifier focus more on hard-to-classify sequences in the next iteration.
def update_weights(dataset, labels, weights, best_sequence, best_label, alpha): #update weights for every sequence
    updated_weights = np.copy(weights)
    for i, (seq, l) in enumerate(zip(dataset, labels)):
        seq = seq[1]
        contains_subsequence = contains_subsequence_(seq, best_sequence)

        if(contains_subsequence and l == best_label) or (not contains_subsequence and l != best_label):
            updated_weights[i] *= np.exp(alpha) # Increases weight for misclassified samples
        else:
            updated_weights[i] *= np.exp(-alpha) # Decreases weight for correctly classified samples

    return updated_weights / np.sum(updated_weights) # Normalize to make them sum to 1


def classify(sequence, rules, alphas):
    score = 0
    for alpha, (seq, label) in zip(alphas, rules):#each rule contributes to the final score
        if contains_subsequence_(sequence, seq):
            score += alpha if label == 1 else -alpha
        else:
            score += -alpha if label == 1 else alpha

    return np.sign(score)# +1 o -1

#Iterates through the dataset and compares the classifier’s prediction to the true label.
def compute_classifier_error(dataset, classifier, rules, alphas):
    error_count = 0
    for label, sequence in dataset:
        if classifier(sequence, rules, alphas) != label:
            pred = classifier(sequence)
            print(f"Classifier prediction: {pred}, true label: {label}")
            error_count += 1
    return error_count / len(dataset) #error rate

## This function implements AdaBoost-style boosting for sequence classification
### It iteratively selects the best weak classifier (a subsequence-based rule) and updates the weights to focus more on misclassified sequences. The process continues until a stopping criterion is met.

In [None]:

def sequence_boosting(dataset, delta):

    N = len(dataset)
    weights = np.ones(N) / N
    alphas = [] # importance of each weak classifier
    rules = [] # Stores best sequences and corresponding labels
    labels = [label for label, _ in dataset]
    classifier_errors = []
    iteration_info = []

    t = 1

    while True:
        random.shuffle(dataset)
        print(f"Iteration: {t}")
        best_sequence, best_label, error = find_best_sequence(dataset, weights) #finds the subsequence that best separates the classes.

        if best_sequence is None or error >= 0.5: #if error greater then 0.5 the classifier is like a random guesser
            break

        alpha_t = 0.6 * log((1 - error) / (error + 1e-5)) #small error-> high alpha, high error -> low alpha

        print(f"Best sequence: {best_sequence}, best label: {best_label}, error: {error:.4f}, alpha: {alpha_t:.4f}")
        alphas.append(alpha_t)
        rules.append((best_sequence, best_label)) # adding a weak classifier at each step of while loop

        #Increases weights for misclassified samples. Decreases weights for correctly classified samples.
        weights = update_weights(dataset, labels, weights, best_sequence, best_label, alpha_t)

        error_count = 0
        predictions = []

        #it classifies every sequence in the dataset and compares the classifier’s prediction to the true label.
        for label, sequence in dataset:
            if classify(sequence, rules, alphas) != label:
                pred = classify(sequence, rules, alphas)
                predictions.append(pred)
                error_count += 1

        overall_classifier_error = error_count / len(dataset)
        classifier_errors.append(overall_classifier_error)
        print(f"Classifier error: {overall_classifier_error}")

        iteration_info.append({
            "iteration": t,
            "alpha": alpha_t,
            "error": error,
            "classifier_error": overall_classifier_error,
            "weights": weights.copy(),
            "best_sequence": best_sequence,
            "best_label": best_label,
            "predictions": predictions
        })

        #THE SYSTEM STOPS IF ERROR IS SMALL OR REACH 50 ITERATIONS
        if overall_classifier_error < delta or overall_classifier_error == 0:
            print("Classifier error below threshold.") if overall_classifier_error < delta else print("Classifier error is zero.")
            break

        if t > 50:
            print("Reached maximum number of iterations.")
            break

        t += 1

    final_classifier = lambda seq: classify(seq, rules, alphas)

    '''final_predictions = [classify(seq, rules, alphas) for _, seq in dataset]
    y_true = labels
    y_pred = final_predictions
    #Confusion matrix and accuracy using scikit-learn
    cm = confusion_matrix(y_true, y_pred)
    accuracy = accuracy_score(y_true, y_pred)'''
    #returns the boosted classifier, best sequences/rules, etc.
    return final_classifier, rules, alphas, classifier_errors, iteration_info#, cm, accuracy

In [87]:
#Count label occurrences in the first 100 sequences
count_shuffled_reduced = Counter([label for label, _ in shuffled_Z[600:700]])
print("Count of labels: ", count_shuffled_reduced)

Count of labels:  Counter({-1: 52, 1: 48})


In [None]:
classifier, rules, alphas, errors, iteration_info = sequence_boosting(shuffled_Z[600:700], delta=0.20)

Iteration: 1


100%|██████████| 100/100 [00:01<00:00, 67.05it/s]


Best sequence: [7], best label: -1, error: 0.3500, alpha: 0.3714
Classifier error: 0.35
Iteration: 2


100%|██████████| 100/100 [00:01<00:00, 69.60it/s]


Best sequence: [1146], best label: 1, error: 0.3483, alpha: 0.3760
Classifier error: 0.36
Iteration: 3


100%|██████████| 100/100 [00:01<00:00, 70.02it/s]


Best sequence: [1146], best label: 1, error: 0.3312, alpha: 0.4217
Classifier error: 0.36
Iteration: 4


100%|██████████| 100/100 [00:01<00:00, 69.61it/s]


Best sequence: [7], best label: -1, error: 0.3153, alpha: 0.4654
Classifier error: 0.35
Iteration: 5


100%|██████████| 100/100 [00:01<00:00, 69.07it/s]


Best sequence: [7], best label: -1, error: 0.3653, alpha: 0.3315
Classifier error: 0.35
Iteration: 6


100%|██████████| 100/100 [00:01<00:00, 69.68it/s]


Best sequence: [7], best label: -1, error: 0.3516, alpha: 0.3673
Classifier error: 0.35
Iteration: 7


100%|██████████| 100/100 [00:01<00:00, 69.77it/s]


Best sequence: [1146], best label: 1, error: 0.3109, alpha: 0.4775
Classifier error: 0.35
Iteration: 8


100%|██████████| 100/100 [00:01<00:00, 66.33it/s]


Best sequence: [7], best label: -1, error: 0.3250, alpha: 0.4386
Classifier error: 0.35
Iteration: 9


100%|██████████| 100/100 [00:01<00:00, 69.34it/s]


Best sequence: [220], best label: 1, error: 0.2873, alpha: 0.5451
Classifier error: 0.35
Iteration: 10


100%|██████████| 100/100 [00:01<00:00, 71.44it/s]


Best sequence: [7], best label: -1, error: 0.2463, alpha: 0.6711
Classifier error: 0.35
Iteration: 11


100%|██████████| 100/100 [00:01<00:00, 69.74it/s]


Best sequence: [1, 18], best label: -1, error: 0.3074, alpha: 0.4873
Classifier error: 0.35
Iteration: 12


100%|██████████| 100/100 [00:01<00:00, 67.31it/s]


Best sequence: [1146], best label: 1, error: 0.2312, alpha: 0.7210
Classifier error: 0.34
Iteration: 13


100%|██████████| 100/100 [00:01<00:00, 68.04it/s]


Best sequence: [37], best label: -1, error: 0.3366, alpha: 0.4070
Classifier error: 0.35
Iteration: 14


100%|██████████| 100/100 [00:01<00:00, 69.68it/s]


Best sequence: [1739], best label: 1, error: 0.2583, alpha: 0.6330
Classifier error: 0.3
Iteration: 15


100%|██████████| 100/100 [00:01<00:00, 69.03it/s]


Best sequence: [26], best label: -1, error: 0.2747, alpha: 0.5826
Classifier error: 0.31
Iteration: 16


100%|██████████| 100/100 [00:01<00:00, 68.61it/s]


Best sequence: [1146], best label: 1, error: 0.2481, alpha: 0.6651
Classifier error: 0.28
Iteration: 17


100%|██████████| 100/100 [00:01<00:00, 68.17it/s]


Best sequence: [1146], best label: 1, error: 0.2154, alpha: 0.7756
Classifier error: 0.29
Iteration: 18


100%|██████████| 100/100 [00:01<00:00, 68.24it/s]


Best sequence: [1146], best label: 1, error: 0.2086, alpha: 0.8000
Classifier error: 0.35
Iteration: 19


100%|██████████| 100/100 [00:01<00:00, 67.20it/s]


Best sequence: [20], best label: -1, error: 0.1775, alpha: 0.9198
Classifier error: 0.28
Iteration: 20


100%|██████████| 100/100 [00:01<00:00, 70.42it/s]

Best sequence: [79], best label: -1, error: 0.1482, alpha: 1.0494
Classifier error: 0.18
Classifier error below threshold.





In [77]:
count = 0
for label, sequence in shuffled_Z[600:700]:
    prediction = classifier(sequence)
    if prediction == label:
        count += 1

print(f"Number of correct predictions: {count} out of {len(shuffled_Z[600:700])}")
print(f"Percentage of correct: {(count * 100) / len(shuffled_Z[600:700]):.2f}%")

Number of correct predictions: 82 out of 100
Percentage of correct: 82.00%


## Weights interpretation
Higher weights indicate data points that are difficult to classify for the previous ensembled classifier, this means that the new classifier needs to give more attention to those weights.

In [None]:
import pandas as pd
import plotly.express as px

def plot_weights(iteration_info):
    # Create a DataFrame to hold the data for all iterations
    data = []
    for info in iteration_info:
        for idx, weight in enumerate(info['weights']):
            data.append({'Data Point': idx, 'Weight': weight, 'Iteration': f"Iteration {info['iteration']}"})

    # Convert the list of dictionaries to a pandas DataFrame
    df = pd.DataFrame(data)

    # Create the line plot using Plotly Express
    fig = px.line(df, x='Data Point', y='Weight', color='Iteration',
                  labels={'Data Point': 'Data Point', 'Weight': 'Weights', 'Iteration': 'Iterations'},
                  title='Weights Distribution Over Iterations')

    # Show the plot
    fig.show()
    fig.write_html(f"boosting_output/weights.html")

plot_weights(iteration_info)

## Alphas interpretation

alpha should be low and then while the error the decrease, alpha increase until it becomes a plateu

In [79]:
def plot_alphas(iteration_info):
    # Extract iteration numbers and alpha values
    data = [{'Iteration': info['iteration'], 'Alpha': info['alpha']} for info in iteration_info]

    # Convert to a DataFrame
    df = pd.DataFrame(data)

    # Create the line plot using Plotly Express
    fig = px.line(df, x='Iteration', y='Alpha',
                  labels={'Iteration': 'Iteration', 'Alpha': 'Alpha'},
                  title='Alpha Values Over Iterations',
                  markers=True) # Ensures markers are shown on the line

    # Show the plot
    fig.show()
    fig.write_html(f"boosting_output/alphas.html")


plot_alphas(iteration_info)

In [80]:
def plot_classifier_errors(classifier_errors):
    # Create a DataFrame with iterations and corresponding classifier errors
    data = {
        'Iteration': list(range(1, len(classifier_errors) + 1)),
        'Classifier Error': classifier_errors
    }

    df = pd.DataFrame(data)

    # Create the line plot with markers using Plotly Express
    fig = px.line(df, x='Iteration', y='Classifier Error',
                  labels={'Iteration': 'Iteration', 'Classifier Error': 'Classifier Error'},
                  title='Classifier Errors Over Iterations',
                  markers=True) # Adds markers to the plot

    # Show the plot
    fig.show()
    fig.write_html(f"boosting_output/error.html")

plot_classifier_errors(errors)