This short notebook contains three functions: (1) dynamic_time_warping -> calculates the alignment cost (not average cost) between two lists; (2) calculate_distances -> calculates the alignment cost of an entire numpy array of series data and returns a numpy array of distances between series; (3) optics -> takes in a numpy array of distances and calculates the cluster assignment between corresponding points.
For more information, see:
(a) https://towardsdatascience.com/understanding-optics-and-implementation-with-python-143572abdfb6
(b) https://www.youtube.com/@HermanKamperML/videos

##

In [None]:
import numpy as np

def dynamic_time_warping(ts1, ts2):
    # Calculate the Euclidean distance between two points in the time series
    def euclidean_distance(point1, point2):
        return np.sqrt((point1 - point2)**2)

    # Initialize the cost matrix with large values
    cost_matrix = np.full((len(ts1), len(ts2)), np.inf)

    # Set the initial cost to 0
    cost_matrix[0, 0] = euclidean_distance(ts1[0], ts2[0])

    # Fill the first row of the cost matrix
    for i in range(1, len(ts2)):
        cost_matrix[0, i] = cost_matrix[0, i - 1] + euclidean_distance(ts1[0], ts2[i])

    # Fill the first column of the cost matrix
    for i in range(1, len(ts1)):
        cost_matrix[i, 0] = cost_matrix[i - 1, 0] + euclidean_distance(ts1[i], ts2[0])

    # Fill the rest of the cost matrix
    for i in range(1, len(ts1)):
        for j in range(1, len(ts2)):
            cost_matrix[i, j] = euclidean_distance(ts1[i], ts2[j]) + min(
                cost_matrix[i - 1, j],      # Insertion
                cost_matrix[i, j - 1],      # Deletion
                cost_matrix[i - 1, j - 1]   # Match
            )

    # Return the alignment cost (DTW distance)
    return cost_matrix[-1, -1]

# Example usage
time_series1 = np.array([1, 2, 3, 4, 5])
time_series2 = np.array([2, 3, 4, 5, 6])

alignment_cost = dynamic_time_warping(time_series1, time_series2)
print("Alignment Cost (DTW Distance):", alignment_cost)


In [None]:
import numpy as np

def calculate_distances(series_data):
    num_series = len(series_data)
    distances = np.zeros((num_series, num_series))

    for i in range(num_series):
        for j in range(i + 1, num_series):
            distance = dynamic_time_warping(series_data[i].tolist(), series_data[j].tolist())
            distances[i, j] = distance
            distances[j, i] = distance  # Distance matrix is symmetric

    return distances

In [None]:
import numpy as np

def optics(distances, epsilon, min_pts):
    num_points = len(distances)
    reachability_distances = np.full(num_points, np.inf)
    processed_points = set()
    cluster_id = 0
    cluster_assignments = np.full(num_points, -1)

    def update_seeds(current_point, neighbors, seeds, reachability_distances, epsilon):
        for neighbor in neighbors:
            if neighbor not in processed_points:
                new_reachability_distance = max(distances[current_point, neighbor], reachability_distances[current_point])

                if new_reachability_distance < reachability_distances[neighbor]:
                    reachability_distances[neighbor] = new_reachability_distance

                    if new_reachability_distance <= epsilon and neighbor not in seeds:
                        seeds.append(neighbor)

    for point in range(num_points):
        if point not in processed_points:
            processed_points.add(point)
            neighbors = np.where(distances[point] <= epsilon)[0]

            if len(neighbors) >= min_pts:
                cluster_id += 1
                cluster_assignments[point] = cluster_id
                seeds = [neighbor for neighbor in neighbors if point != neighbor]

                update_seeds(point, neighbors, seeds, reachability_distances, epsilon)

                while seeds:
                    current_seed = seeds.pop(0)
                    current_seed_neighbors = np.where(distances[current_seed] <= epsilon)[0]

                    if len(current_seed_neighbors) >= min_pts:
                        cluster_assignments[current_seed] = cluster_id
                        update_seeds(current_seed, current_seed_neighbors, seeds, reachability_distances, epsilon)

    return cluster_assignments


In [None]:
### Application:

series_data = np.random.rand(11, 10) 

distances = calculate_distances(series_data)

epsilon = 2  # Adjust as needed -> some datasets need values as low as 0.5
min_pts = 2    # Adjust as needed

cluster_assignments = optics(distances, epsilon, min_pts)
print("Cluster Assignments:", cluster_assignments)