<a href="https://colab.research.google.com/github/MerakchiHibaa/FINAL_FUZZ2025/blob/main/Final_FUZZ_2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# FTH requirements :

In [2]:
!pip install scikit-fuzzy

import networkx as nx
import pandas as pd
import multiprocessing as mp
import matplotlib.pyplot as plt
from numba import prange
from typing import List
import skfuzzy as fuzz
from scipy import signal
import numpy as np
from functools import lru_cache
from scipy.spatial.distance import squareform
from scipy.cluster import hierarchy
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
import time

Collecting scikit-fuzzy
  Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl.metadata (2.6 kB)
Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl (920 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/920.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/920.8 kB[0m [31m2.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━[0m [32m614.4/920.8 kB[0m [31m9.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m920.8/920.8 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scikit-fuzzy
Successfully installed scikit-fuzzy-0.5.0


In [4]:
# Get the ontology

!wget https://gist.githubusercontent.com/MerakchiHibaa/09142b59721546554c5550a5c85703d8/raw/b953af93d867f7d937c4e2a838696801caa584ae/Fuzz_activity.txt -q --show-progress



In [5]:
# Local file name of the ontology downloaded by the wget cell.
path_onto = "Fuzz_activity.txt"

# Parse the adjacency-list file into a directed graph (DiGraph) holding the ontology.
ontology_emd = nx.read_adjlist(path_onto, create_using=nx.DiGraph) # creates a directed graph (DiGraph) representation of the ontology data.



In [6]:
# Wu-Palmer similarity
@lru_cache(maxsize=None)
def _load_ontology(path: str):
    """Read the adjacency-list ontology at ``path`` into a directed graph, once per path."""
    return nx.read_adjlist(path, create_using=nx.DiGraph)


@lru_cache(maxsize=100000)
def wu_palmer_fth(x: str, y: str, path: str, rootnode="All") -> float:
    """
    Wu-Palmer similarity between two ontology concepts.

    Computed as ``2 * depth(lca(x, y)) / (depth(x) + depth(y))`` where the
    depth of a node is the shortest-path length from ``rootnode`` to it.

    :param x:        First concept (node label in the ontology)
    :param y:        Second concept (node label in the ontology)
    :param path:     Path of the adjacency-list file describing the ontology
    :param rootnode: Label of the node depths are measured from
    :return:         Similarity value; 1.0 when both concepts are the root itself
    """
    # The graph is parsed once per path instead of on every cache miss of this
    # function. Results were already memoised per (x, y, path), so edits made to
    # the file after the first call were never picked up reliably anyway.
    ontologie = _load_ontology(path)
    common_ancestor = nx.lowest_common_ancestor(ontologie, x, y)
    depth_ancestor = nx.shortest_path_length(ontologie, rootnode, common_ancestor)
    depth_total = (nx.shortest_path_length(ontologie, rootnode, x)
                   + nx.shortest_path_length(ontologie, rootnode, y))
    if depth_total == 0:
        # Both depths are zero, i.e. x and y are the root node: identical
        # concepts, so return full similarity instead of dividing by zero.
        return 1.0
    return (2.0 * depth_ancestor) / depth_total



# Define the used similarity
def sim_FTH(x: str, y: str) -> float:
    """
    Similarity used by the FTH measure: Wu-Palmer similarity of two activities,
    evaluated on the ontology file referenced by ``path_onto``.

    :param x: First activity label
    :param y: Second activity label
    :return:  Value returned by ``wu_palmer_fth`` for the pair
    """
    similarity = wu_palmer_fth(x, y, path_onto)
    return similarity



In [7]:
from typing import List, TypeVar

T = TypeVar('T')  # generic element type, so a sequence can hold activities of any type
# https://dev.to/decorator_factory/typevars-explained-hmo


class Temporal_seq:
    """
    A temporal sequence: a list of activities paired with a list of durations.

    acts:  list of elements of type T (any type).
    times: list of numbers, intended to hold one duration per activity.
    """

    def __init__(self, acts: List[T], times: List[float]) -> None:
        # Both lists are stored by reference, not copied.
        self.times = times
        self.acts = acts

In [8]:
class Edit_FTH:
    """
    An edit operation: an activity with a duration and a start time, evaluated
    against a temporal sequence.

    x:      the activity
    delta:  the duration
    t_edit: the starting point
    S_i:    the sequence
    """

    def __init__(self, x: str, delta: float, t_edit: float, S_i: Temporal_seq):
        self.S_i = S_i
        self.t_edit = t_edit
        self.delta = delta
        self.x = x


In [25]:
def sim_fth_e(e: Edit_FTH, t: float) -> float: #  for t in I (starts with 0)
    """
    Similarity between the edited activity and whatever the sequence is doing at time t.

    :param e:     Edit operation
    :param t:     Time t
    :return:      ``sim_FTH`` between ``e.x`` and the activity of ``e.S_i`` whose
                  half-open interval [start, end) contains t; 0.0 when t falls
                  outside every interval
    """
    durations = e.S_i.times
    # Plain ``range``: ``numba.prange`` only has a meaning inside a jitted function.
    interval_start = 0.0
    for i in range(len(durations)):
        # End of activity i is the sum of the first i + 1 durations. It is kept as the
        # start of activity i + 1, so each prefix sum is computed once instead of twice.
        interval_end = np.sum(durations[:i + 1])
        if interval_start <= t < interval_end:
            return sim_FTH(e.S_i.acts[i], e.x)
        interval_start = interval_end
    return 0.0

## FTH :

In [14]:
def cost_gamma_fth(e: Edit_FTH, interval_step = 1, time_window = 240) -> float:
  """
  Cost of an edit operation, computed as one minus the best context-weighted match.

  :param e:             Edit operation (activity, duration, start time, target sequence)
  :param interval_step: Step used to sample the time axis of the target sequence
  :param time_window:   Width of the rising and falling slopes of the context function
  :return:              1 - max of the normalised convolution (presumably in [0, 1] - TODO confirm)
  """
  edit_start = e.t_edit
  edit_end = e.t_edit + e.delta

  # Sampled time axis covering the whole duration of the target sequence.
  timeline = np.arange(0, np.sum(e.S_i.times), interval_step)

  # Context function: trapezoid equal to 1 over the edit's own time span and
  # sloping down to 0 over `time_window` on each side.
  context = fuzz.trapmf(timeline, [edit_start - time_window, edit_start,
                                   edit_end, edit_end + time_window])

  # Similarity at each sampled time, weighted by the context function.
  weighted_sim = [context[k] * sim_fth_e(e, t) for k, t in enumerate(timeline)]

  # Gate sampled over [0, delta) with the same step as the time axis.
  gate_axis = np.arange(0, e.delta, interval_step)
  gate = fuzz.trapmf(gate_axis, [0, 0, e.delta, e.delta])

  # Convolution divided by the edit duration expressed in steps.
  steps_in_edit = e.delta / interval_step
  smoothed = signal.convolve(weighted_sim, gate, mode='same') / steps_in_edit
  return 1 - np.max(smoothed)

def cost_delta_fth(e: Edit_FTH, interval_step = 1, time_window = 240) -> float:
  """
  Duration-weighted cost of an edit operation.

  :param e:             Edit operation
  :param interval_step: Passed through to ``cost_gamma_fth``
  :param time_window:   Passed through to ``cost_gamma_fth``
  :return:              ``cost_gamma_fth`` of the edit multiplied by the edit's duration
  """
  unit_cost = cost_gamma_fth(e, interval_step, time_window)
  return e.delta * unit_cost

# The function that calculates the one-sided distance between two trajectories
def one_sided_dist_fth(S1: Temporal_seq, S2: Temporal_seq, f = cost_delta_fth, interval_step = 1, time_window = 240) -> float:
  """
  One-sided distance from S1 to S2: the summed cost of every activity of S1
  treated as an edit operation against S2.

  :param S1:            Sequence whose activities are turned into edit operations
  :param S2:            Sequence the edits are evaluated against
  :param f:             Cost function applied to each edit (delta or gamma variant)
  :param interval_step: Passed through to ``f``
  :param time_window:   Passed through to ``f``
  :return:              Sum of ``f`` over all activities of S1 (0 for an empty S1)
  """
  total_cost = 0  # named so the builtin ``sum`` is not shadowed

  # Plain ``range``: ``numba.prange`` only has a meaning inside a jitted function.
  for i in range(len(S1.acts)):
    # Start time of activity i = total duration of the activities before it.
    start_time = np.sum(S1.times[:i])
    e = Edit_FTH(S1.acts[i], S1.times[i], start_time, S2)
    total_cost += f(e, interval_step, time_window)
  return total_cost


In [15]:
# Calculation of the FTH distance
def dist_fth(S1: Temporal_seq, S2: Temporal_seq, f = cost_delta_fth, interval_step = 1, time_window = 1, agg = max)-> float:
  """
  FTH distance between two sequences: both one-sided distances combined with ``agg``.

  :param S1:            First sequence
  :param S2:            Second sequence
  :param f:             Cost function applied to each edit operation
  :param interval_step: Passed through to ``one_sided_dist_fth``
  :param time_window:   Passed through to ``one_sided_dist_fth``
  :param agg:           Aggregator called with the two one-sided distances (max by default)
  :return:              ``agg(d(S1 -> S2), d(S2 -> S1))``
  """
  # NOTE(review): time_window defaults to 1 here but to 240 in the cost
  # functions and in one_sided_dist_fth - confirm this is intended.
  forward = one_sided_dist_fth(S1, S2, f, interval_step, time_window)
  backward = one_sided_dist_fth(S2, S1, f, interval_step, time_window)
  return agg(forward, backward)


In [9]:
# Function that calculates the t-length of a given sequence
def sum_duration(sequence: Temporal_seq) -> float:
    """
    Total duration (t-length) of a sequence.

    :param sequence: Temporal sequence
    :return:         Sum of the values in ``sequence.times``
    """
    durations = sequence.times
    return sum(durations)

In [11]:
# Function that decodes the activities to be used when we show the results
def decode_activities(acts: List[str]) -> List[str]:
    """
    Translate activity codes into readable activity names.

    :param acts: Activity codes, each one of "1" to "7"
    :return:     New list with the corresponding names, in the same order
    :raises KeyError: if a code is not in the mapping
    """
    names_by_code = {
        "1": "Bike",
        "2": "Walk",
        "3": "Bus",
        "4": "Work",
        "5": "Shopping",
        "6": "Read",
        "7": "Swim",
    }
    decoded = []
    for code in acts:
        decoded.append(names_by_code[code])
    return decoded

# Example: decode the activity codes of a three-activity sequence
s_h = Temporal_seq(['1', '4', '5'], [30, 5, 12])
decoded_activities = decode_activities(s_h.acts)
print(decoded_activities)

['Bike', 'Work', 'Shopping']


### Relative FTH

In [27]:
# DEFINE A RELATIVE TEMPORAL SEQUENCE FROM A TEMPORAL SEQUENCE T
import copy
def relative_temporal_seq(T: Temporal_seq) -> Temporal_seq:
  """
  Build the relative version of a temporal sequence.

  :param T: Temporal sequence (left unmodified; the function works on a deep copy)
  :return:  New sequence with the same activities and each duration expressed
            as a percentage of the total duration
  """
  snapshot = copy.deepcopy(T)
  total_duration = sum_duration(snapshot)
  percentages = []
  for duration in snapshot.times:
    percentages.append(duration / total_duration * 100)
  return Temporal_seq(snapshot.acts, percentages)

In [29]:
# DEFINE A ROUNDED RELATIVE TEMPORAL SEQUENCE FROM A TEMPORAL SEQUENCE T (BY ROUNDING THE RELATIVE DURATIONS)
def rounded_relative_temporal_seq(T: Temporal_seq) -> Temporal_seq:
    """
    Build a relative sequence whose durations are integers summing to 1000.

    Each duration is scaled to per-mille of the total and truncated. The units
    lost by truncation are then handed out one at a time, starting with the
    activity that has the largest fractional remainder.

    :param T: Temporal sequence (left unmodified; the function works on a deep copy)
    :return:  New sequence with the same activities and integer relative durations
    """
    snapshot = copy.deepcopy(T)
    total_duration = sum(snapshot.times)
    scaled = [duration / total_duration * 1000 for duration in snapshot.times]

    units = [int(value) for value in scaled]              # truncated integer parts
    fractions = [value - int(value) for value in scaled]  # what truncation dropped

    # Activity indices ordered by decreasing remainder (ties keep their original order).
    by_remainder = sorted(range(len(units)), key=lambda k: fractions[k], reverse=True)

    # Hand out the missing units one by one, wrapping around the ranking if needed.
    missing = 1000 - sum(units)
    for step in range(missing):
        target = by_remainder[step % len(by_remainder)]
        units[target] += 1

    return Temporal_seq(snapshot.acts, units)



In [39]:
# FUNCTION THAT CALCULATES THE FTH_DELTA DISTANCE BETWEEN TWO RELATIVE SEQUENCES

def FTH_relative_fuzz(sequence, pattern, f=cost_delta_fth):
    """
    Compute, print and return the FTH distance between the relative forms of two sequences.

    Both inputs are converted with ``rounded_relative_temporal_seq`` (integer
    durations summing to 1000). The distance is computed with an interval step
    of 1, a time window of 500 and ``max`` as aggregator, then divided by 1000.

    :param sequence: Sequence to compare (its decoded activities and original durations are printed)
    :param pattern:  Reference sequence
    :param f:        Cost function applied to each edit operation
    :return:         The distance that is printed (previously the function returned None)
    """
    # No defensive copies needed here: rounded_relative_temporal_seq works on its
    # own deep copy, and the inputs are only read below.
    relative_pattern = rounded_relative_temporal_seq(pattern)
    relative_sequence = rounded_relative_temporal_seq(sequence)

    # Calculate the distance
    distance = dist_fth(relative_pattern, relative_sequence, f, 1, 500, max) / 1000

    # Decode and display the result
    decoded_sequence = decode_activities(sequence.acts)
    print("Distance:", distance, " - Sequence:", decoded_sequence, sequence.times)

    return distance


## Tests

In [32]:
# Activity codes (same mapping as decode_activities):
# 1 => Bike
# 2 => Walk
# 3 => Bus
# 4 => Work
# 5 => Shopping
# 6 => Read
# 7 => Swim

# Define the sequences
S = Temporal_seq(
    ['2', '4', '2', '5', '2'],  # Activities: Walk, Work, Walk, Shopping, Walk
    [60, 240, 60, 120, 60]      # Durations in minutes: 1h, 4h, 1h, 2h, 1h
)

S1 = Temporal_seq(
    ['2', '5', '2', '4', '2'],  # Activities: Walk, Shopping, Walk, Work, Walk
    [30, 90, 30, 240, 30]       # Durations in minutes: 30min, 1h30, 30min, 4h, 30min
)

S2 = Temporal_seq(
    ['3', '4', '3'],            # Activities: Bus, Work, Bus
    [30, 420, 30]               # Durations in minutes: 30min, 7h, 30min
)

S3 = Temporal_seq(
    ['1', '4', '1'] ,  # Activities: Bike, Work, Bike
    [30, 300, 30]      # Durations in minutes: 30min, 5h, 30min
)

S4 = Temporal_seq(
    ['2', '4', '2', '5', '2'] ,  # Activities: Walk, Work, Walk, Shopping, Walk
    [65, 245, 65, 145, 80]      # Durations in minutes: 1h05, 4h05, 1h05, 2h25, 1h20
)
# Show the raw activity codes and durations of every sequence
print("Sequence S:", S.acts , S.times)
print("Sequence S1:", S1.acts , S1.times)
print("Sequence S2:", S2.acts , S2.times)
print("Sequence S3:", S3.acts , S3.times)
print("Sequence S4:", S4.acts , S4.times)


Sequence S: ['2', '4', '2', '5', '2'] [60, 240, 60, 120, 60]
Sequence S1: ['2', '5', '2', '4', '2'] [30, 90, 30, 240, 30]
Sequence S2: ['3', '4', '3'] [30, 420, 30]
Sequence S3: ['1', '4', '1'] [30, 300, 30]
Sequence S4: ['2', '4', '2', '5', '2'] [65, 245, 65, 145, 80]


In [33]:
# Sequences compared against the pattern S (S itself is included)
seq_example = [S, S1, S2, S3, S4]

#### Relative FTH (with elimination)

In [40]:
# Compare every example sequence with the pattern S using the relative FTH_delta distance.
print('______________________________ FTH_delta relative __________________________________')

# Show the pattern with readable activity names.
decoded_main = decode_activities(S.acts)
print( "Pattern :", decoded_main, S.times)
print('__________________________________________________________________________________')

for rel_seq in seq_example :
  # Print the sequence, then let FTH_relative_fuzz print its distance to S.
  decoded_seq = decode_activities(rel_seq.acts)
  print( "Sequence:", decoded_seq, rel_seq.times)
  FTH_relative_fuzz(rel_seq, S , f = cost_delta_fth)

  print('__________________________________________________________________________________')



______________________________ FTH_delta relative __________________________________
Pattern : ['Walk', 'Work', 'Walk', 'Shopping', 'Walk'] [60, 240, 60, 120, 60]
__________________________________________________________________________________
Sequence: ['Walk', 'Work', 'Walk', 'Shopping', 'Walk'] [60, 240, 60, 120, 60]
Distance: 0.0  - Sequence: ['Walk', 'Work', 'Walk', 'Shopping', 'Walk'] [60, 240, 60, 120, 60]
__________________________________________________________________________________
Sequence: ['Walk', 'Shopping', 'Walk', 'Work', 'Walk'] [30, 90, 30, 240, 30]
Distance: 0.4205980000000001  - Sequence: ['Walk', 'Shopping', 'Walk', 'Work', 'Walk'] [30, 90, 30, 240, 30]
__________________________________________________________________________________
Sequence: ['Bus', 'Work', 'Bus'] [30, 420, 30]
Distance: 0.5133333333333334  - Sequence: ['Bus', 'Work', 'Bus'] [30, 420, 30]
__________________________________________________________________________________
Sequence: ['Bike', '

In [41]:
# From FTH_Delta to Sim_Delta
def similarity_fth(fth_distance):
    """
    Turn an FTH distance into a similarity score.

    Parameters:
        fth_distance (float): The FTH distance between two sequences.
    Returns:
        float: One minus the distance.
    """
    similarity_score = 1 - fth_distance
    return similarity_score

# Example: convert a distance of 0.4 into a similarity score
fth_dist = 0.4


similarity = similarity_fth(fth_dist)
print(f"FTH Similarity: {similarity}")


FTH Similarity: 0.6


In [42]:
# Calculate the t-length comparability index

def membership_function(x, b, w):
    """
    Triangular membership value μ(x) centred on b with half-width w.

    Parameters:
        x (float): The input value.
        b (float): The center of the range (where μ reaches 1).
        w (float): Distance from the center to each end of the range.

    Returns:
        float: 0 at or outside [b - w, b + w], rising linearly to 1 at b and
        falling linearly back to 0 afterwards. None if no branch matches
        (e.g. x is NaN).
    """
    lower = b - w
    upper = b + w

    # At or beyond either end of the support.
    if x <= lower or x >= upper:
        return 0
    # Rising edge, center included.
    if lower < x <= b:
        return (x - lower) / w
    # Falling edge.
    if b < x < upper:
        return (upper - x) / w
    return None

# Example usage: evaluate the membership function at four inputs
x1 = 7  # Input value
x2 = 8  # Input value
x3 = 5  # Input value (equal to b - w, so its membership is 0)
x4 = 10  # Input value

b = 9.0  # Center of the range
w = 4.0  # Half-width: membership is 0 at or beyond b - w and b + w

mu1 = membership_function(x1, b, w)
mu2 = membership_function(x2, b, w)
mu3 = membership_function(x3, b, w)
mu4 = membership_function(x4, b, w)
print(f"Membership value 1: {mu1}")
print(f"Membership value 2: {mu2}")
print(f"Membership value 3: {mu3}")
print(f"Membership value 4: {mu4}")


Membership value 1: 0.5
Membership value 2: 0.75
Membership value 3: 0
Membership value 4: 0.75
