In [7]:
import sys
import os

root_path = os.path.abspath('..')
if root_path not in sys.path:
    sys.path.insert(0, root_path)
    
functions_path = os.path.abspath('../Functions')
data_path = os.path.abspath('../Data')
algorithms_path = os.path.abspath('../Algorithms')

sys.path.append(functions_path)
sys.path.append(data_path)
sys.path.append(algorithms_path)

import numpy as np
import pandas as pd
import ruptures as rpt
import matplotlib.pyplot as plt
import time
from stumpy.floss import _rea
import random
import stumpy
from Functions.metrics import f_measure, covering
from Functions.true_cps import read_true_change_points
from Functions.read_cps import read_number_of_change_points
from Functions.evaluate_floss import evaluate_floss
from Functions.evaluate_binseg import evaluate_binseg
from Algorithms.floss import floss
from tssb.utils import load_time_series_segmentation_datasets
from tssb.evaluation import covering
from claspy.segmentation import BinaryClaSPSegmentation

# FLOSS 

In [None]:
dataset_name = 'CricketsX'
routine = 'Routine1'
subject = 'Subject1'
sensor = 'Accelerometer'
sample_rate = 100

# Load the dataset
data_path = f'../Data/{dataset_name}.txt'
data = np.loadtxt(data_path)

# Read true change points
desc_path = '../Data/TrueCPS/desc.txt'
true_change_points = read_true_change_points(desc_path, dataset_name)

# Apply FLOSS and evaluate its performance
_, _, found_cps, f1_score, covering_score, _ = evaluate_floss(
    dataset_name, routine, subject, sensor, sample_rate,
    true_change_points, [], data
)
# Visualization
plt.figure(figsize=(15, 6))  # Adjust the figure size to make it wider
plt.plot(data, label='Time Series Data')

# Draw vertical lines for each detected change point
for cp in found_cps:
    plt.axvline(x=cp, color='red', linestyle='--', linewidth=1, label='Detected Change Point' if cp == found_cps[0] else "")

# Draw vertical lines for each true change point
for cp in true_change_points:
    plt.axvline(x=cp, color='green', linestyle='-', linewidth=1, label='True Change Point' if cp == true_change_points[0] else "")

plt.xlabel('Time')
plt.ylabel('Value')
plt.title(f'Time Series Data of {dataset_name} with Detected and True Change Points')
plt.legend()
plt.show()

# Print the performance metrics
print(f"Covering Score: {covering_score}")

In [8]:
routine = 'Routine1'
subject = 'Subject1'
sensor = 'Accelerometer'
sample_rate = 100

dataset_names = ['CricketX', 'CricketY', 'CricketZ', 
                 'InlineSkate', 'UWaveGestureLibraryAll', 
                 'UWaveGestureLibraryX', 'UWaveGestureLibraryY', 'UWaveGestureLibraryZ'
                 ]

# Path to the description files
desc_path = '../Data/TrueCPS/desc.txt'

# Parameters for binseg function
num_of_cps = read_number_of_change_points(desc_path)

# To store the coverage scores for computing the average
coverage_scores = []
coverage_scores_tssb = []

# Iterate through each dataset
for dataset_name in dataset_names:
    
    n_cps = num_of_cps.get(dataset_name, 1)
    # Path to the current dataset
    data_path = f'../Data/{dataset_name}.txt'

    # Load the dataset
    data = np.loadtxt(data_path)

    # Read true change points
    true_change_points = read_true_change_points(desc_path, dataset_name)

    _, _, found_cps, f1_score, covering_score, _ = evaluate_floss(
        dataset_name, routine, subject, sensor, sample_rate,
        true_change_points, [], data)
    
    score = covering({0: true_change_points}, found_cps, data.shape[0])
    
    # Add the current covering score to the list
    coverage_scores.append(covering_score)
    coverage_scores_tssb.append(score)
    
    # Print the results for current dataset
    print(f"Dataset: {dataset_name}")
    print(f"Coverage Score: {covering_score}")

# Compute and print the average coverage score
average_coverage_score = sum(coverage_scores) / len(coverage_scores)
average_coverage_score_tssb = sum(coverage_scores_tssb) / len(coverage_scores_tssb)
print(f"\nAverage Coverage Score: {average_coverage_score:.6f}")
print(f"Average Coverage Score (TSSB): {average_coverage_score_tssb:.6f}")

Dataset: CricketX
Coverage Score: 0.861
Dataset: CricketY
Coverage Score: 0.886
Dataset: CricketZ
Coverage Score: 0.731
Dataset: InlineSkate
Coverage Score: 0.709
Dataset: UWaveGestureLibraryAll
Coverage Score: 0.97
Dataset: UWaveGestureLibraryX
Coverage Score: 0.726
Dataset: UWaveGestureLibraryY
Coverage Score: 0.747
Dataset: UWaveGestureLibraryZ
Coverage Score: 0.712

Average Coverage Score: 0.792750
Average Coverage Score (TSSB): 0.792827


# BinSeg

In [6]:
dataset_names = ['CricketX', 'CricketY', 'CricketZ', 
                 'InlineSkate', 'UWaveGestureLibraryAll', 
                 'UWaveGestureLibraryX', 'UWaveGestureLibraryY', 'UWaveGestureLibraryZ'
                 ]

# Path to the description files
desc_path = '../Data/TrueCPS/desc.txt'

# Parameters for binseg function
num_of_cps = read_number_of_change_points(desc_path)
cost_func = "rbf"

coverage_scores = []

# Iterate through each dataset
for dataset_name in dataset_names:    
    n_cps = num_of_cps.get(dataset_name, 1)
    # Path to the current dataset
    data_path = f'../Data/{dataset_name}.txt'

    # Load the dataset
    data = np.loadtxt(data_path)

    # Read true change points
    true_change_points = read_true_change_points(desc_path, dataset_name)

    # Evaluate the detected change points
    dataset, true_cps, detected_cps, f1_score, covering_score = evaluate_binseg(dataset_name, data, true_change_points, cost_func=cost_func)
    
    coverage_scores.append(covering_score)
    
    # Print the results
    print(f"Dataset: {dataset_name}")
    print(f"Coverage Score: {covering_score}")
    
# Compute and print the average coverage score
average_coverage_score = sum(coverage_scores) / len(coverage_scores)
print(f"\nAverage Coverage Score: {average_coverage_score:.6f}")

Dataset: CricketX
Coverage Score: 0.745
Dataset: CricketY
Coverage Score: 0.649
Dataset: CricketZ
Coverage Score: 0.727
Dataset: InlineSkate
Coverage Score: 0.541
Dataset: UWaveGestureLibraryAll
Coverage Score: 0.72
Dataset: UWaveGestureLibraryX
Coverage Score: 0.737
Dataset: UWaveGestureLibraryY
Coverage Score: 0.753
Dataset: UWaveGestureLibraryZ
Coverage Score: 0.878

Average Coverage Score: 0.718750


# CLASP

In [10]:
dataset_names = ['CricketX', 'CricketY', 'CricketZ', 
                 'InlineSkate', 'UWaveGestureLibraryAll', 
                 'UWaveGestureLibraryX', 'UWaveGestureLibraryY', 'UWaveGestureLibraryZ'
                 ]

# Path to the description files
desc_path = '../Data/TrueCPS/desc.txt'

# Parameters for binseg function
num_of_cps = read_number_of_change_points(desc_path)

coverage_scores = []
clasp = BinaryClaSPSegmentation()

# Iterate through each dataset
for dataset_name in dataset_names:    
    n_cps = num_of_cps.get(dataset_name, 1)
    data_path = f'../Data/{dataset_name}.txt'

    # Load the dataset
    data = np.loadtxt(data_path)

    # Read true change points
    true_change_points = read_true_change_points(desc_path, dataset_name)

    found_cps = clasp.fit_predict(data)
    # Evaluate the detected change points

    covering_score = covering({0: true_change_points}, found_cps, data.shape[0])    
    coverage_scores.append(covering_score)
    
    # Print the results
    print(f"Dataset: {dataset_name}")
    print(f"Coverage Score: {covering_score}")
    
# Compute and print the average coverage score
average_coverage_score = sum(coverage_scores) / len(coverage_scores)
print(f"\nAverage Coverage Score: {average_coverage_score:.6f}")

  start, end = pranges[idx]


Dataset: CricketX
Coverage Score: 0.9871908200852524
Dataset: CricketY
Coverage Score: 0.9820309803240417
Dataset: CricketZ
Coverage Score: 0.9854162224790733
Dataset: InlineSkate
Coverage Score: 0.35755762585566386
Dataset: UWaveGestureLibraryAll
Coverage Score: 0.9913192163577255
Dataset: UWaveGestureLibraryX
Coverage Score: 0.9859107375523489
Dataset: UWaveGestureLibraryY
Coverage Score: 0.9804345707595664
Dataset: UWaveGestureLibraryZ
Coverage Score: 0.8068538260776167

Average Coverage Score: 0.884589
