In [2]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython
import pandas as pd
import pickle

import os
os.chdir("F:\M2\Interpretable-TimeSeries-Classification")

## Importing data

In [57]:
dataloc  = "Data/genres_original/"

genres = os.listdir(dataloc)

musiclist = []
genrelist = []
srlist = []

musictrain = []
genretrain = []
srtrain = []
musictest = []
genretest = []
srtest = []

print("Reading songs")
for genre in genres:
    for i, music in enumerate(os.listdir(dataloc + genre)):
        try:
            music_time_series, sr = librosa.load(dataloc + genre + "/" + music)
        except Exception as e:
            print(f"Error loading {music}: {e}")
            continue
        srlist.append(sr)
        musiclist.append(music_time_series)
        genrelist.append(genre)

        if i<25: # 25-75 test train split
            musictest.append(music_time_series)
            genretest.append(genre)
            srtest.append(sr)
        else:
            musictrain.append(music_time_series)
            genretrain.append(genre)
            srtrain.append(sr)

print("Creating dataframe")
fulldata = pd.DataFrame({'Music': musiclist, 'Genre': genrelist, "SampleRate": srlist})

print("Saving dataframe")
fulldata.to_csv("Data/fulldata.csv", index=False)

print("Saving train and test data")
train = pd.DataFrame({"Music" : musictrain, "Genre" : genretrain, "SampleRate": srtrain})
test = pd.DataFrame({"Music" : musictest, "Genre" : genretest, "SampleRate": srtest})

with open("Data/train.pkl", "wb") as f:
    pickle.dump(train, f)

with open("Data/test.pkl", "wb") as f:
    pickle.dump(test, f)

Reading songs


  music_time_series, sr = librosa.load(dataloc + genre + "/" + music)


Error loading jazz.00054.wav: 
Creating dataframe
Saving dataframe
Saving train and test data


In [3]:
with open("Data/train.pkl", "rb") as f:
    train = pickle.load(f)

with open("Data/test.pkl", "rb") as f:
    test = pickle.load(f)

## SAX Implementation

In [33]:
alphabet = np.array(["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"])

def SAX(time_series, w, alpha):
    time_series = np.array(time_series)
    time_series = (time_series - np.mean(time_series)) / np.std(time_series)

    # PAA
    PAA=[]
    for word_index in range(w):
        ceiling = min(len(time_series), int((word_index+1)*len(time_series)/w))
        PAA.append(np.mean(time_series[int(word_index*len(time_series)/w):ceiling]))
    
    lookup = np.quantile(time_series, np.arange(1,alpha)*(1/alpha) )
    
    sax = np.digitize(PAA, lookup)

    return list(sax)

def SAX_window(time_series, w, alpha, l):
    L = len(time_series)
    sax_sentence = []
    previous_word = np.full(w,-1)

    for i in range(0, L, l):
        print(i)
        ceiling = min(L, (i+1)*l)
        sax_word = SAX(time_series[i : ceiling], w, alpha)

        npsaxword = np.array(sax_word)
        if np.sum(npsaxword - previous_word) == 0: # Remove repeated words for reasons
            continue
        
        if i > 0:
            sax_sentence.append(" ")
        sax_sentence.extend(sax_word)
        if ceiling == L: break # Do not take <L sized sequences into account
        previous_word = sax_word

    phrase = ""
    for letter in sax_sentence:
        if isinstance(letter, np.int64):
            phrase += alphabet[letter]
        else :
            phrase += letter
    return phrase

In [57]:
print(np.arange(0,100,10))

[ 0 10 20 30 40 50 60 70 80 90]


In [34]:
nptrainmusic = train["Music"].to_numpy()

testmusic = nptrainmusic[19]

# Pre-process eliminating noise
#testmusic = librosa.effects.preemphasis(testmusic)

sentence = SAX_window(testmusic, w = 16, alpha = 4, l = int(.2*len(testmusic)+1 ))
print(sentence)

0
132359
cbccbbcbccbbccbb cccccccccccccccc


## SFA Implementation

In [20]:
alpha = 4
w , halfw= 4, 2

DFT_base_length4 = np.empty((len(nptrainmusic), w))

for i, music in enumerate(nptrainmusic):
    fft = np.fft.fft(music)
    halfw_real = np.real(fft[:halfw])
    halfw_complex = np.imag(fft[:halfw])
    
    DFT_base_length4[i] = np.array([halfw_real[0], halfw_complex[0], halfw_real[1], halfw_complex[1]]) # Shitty inefficient, can be made with np.apply_along_axis

q =  np.arange(1,alpha)*(1/alpha)

SFA_breakpoints = np.quantile(DFT_base_length4, q, axis=0)


In [54]:
def SFA(time_series, alpha=alpha, w=w, lookup=SFA_breakpoints):
    fft = np.fft.fft(time_series)
    halfw = w//2
    halfw_real = np.real(fft[:halfw])
    halfw_complex = np.imag(fft[:halfw])
    coefs = np.array([halfw_real[0], halfw_complex[0], halfw_real[1], halfw_complex[1]])

    sfa=np.empty(w, dtype=np.int64)
    for i, coef in enumerate(coefs):
        sfa[i] = np.digitize(coef, lookup[:,i]) * (i+1) # *(i+1) to make sure same indices for different coefficients don't get the same symbol

    return sfa

def SFA_window(time_series, l, alpha=alpha, w=w, lookup=SFA_breakpoints):
    L = len(time_series)
    sfa_sentence = []
    previous_word = np.full(w,-1)

    for i in range(0, L, l): # Non-overlapping windows for this one
        ceiling = min(L, (i+1)*l) 
        sfa_word = SFA(time_series[i : ceiling], w, alpha, lookup=lookup)

        npsfaword = np.array(sfa_word)
        if np.sum(npsfaword - previous_word) == 0: # Remove repeated words for reasons
            continue
        
        if i > 0:
            sfa_sentence.append(" ")
        sfa_sentence.extend(sfa_word)
        previous_word = sfa_word

    phrase = ""
    for letter in sfa_sentence:
        if isinstance(letter, np.int64):
            phrase += alphabet[letter]
        elif letter == " ":
            phrase += letter
        else:
            raise ValueError("error, got letter", letter)
    return phrase

In [55]:
selected_index = 161
selected_music = nptrainmusic[selected_index]

print(SFA_window(selected_music, l=int(.2*len(selected_music+1))))

dcde degi dade dede dege


## Stuff

In [63]:
# ChatGPT code, not even sure what this is

from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

# Function to convert time series to SAX representation
def time_series_to_sax(time_series, w, alpha):
    return ' '.join(map(str, SAX_window(time_series, w, alpha, int(.2*len(time_series)+1))))

# Convert the training and test data to SAX representation
train_sax = [time_series_to_sax(ts, w=16, alpha=4) for ts in train['Music']]
train_labels = train['Genre']
test_sax = [time_series_to_sax(ts, w=16, alpha=4) for ts in test['Music']]

# Filter out empty SAX representations and corresponding labels
train_sax, train_labels = zip(*[(sax, label) for sax, label in zip(train_sax, train['Genre']) if sax.strip()])
test_sax, test_labels = zip(*[(sax, label) for sax, label in zip(test_sax, test['Genre']) if sax.strip()])

# Create a pipeline with CountVectorizer and LogisticRegression
seql_pipeline = make_pipeline(
    CountVectorizer(),
    LogisticRegression(max_iter=1000)
)

# Train the model
print("Training the model")
seql_pipeline.fit(train_sax, train_labels)

# Predict on the test set
test_predictions = seql_pipeline.predict(test_sax)

# Evaluate the model
accuracy = (test_predictions == test_labels).mean()
print(f"Test Accuracy: {accuracy:.2f}")

Training the model


ValueError: empty vocabulary; perhaps the documents only contain stop words

In [None]:
train_sax = np.array([SAX_window(ts, w=16, alpha=4, l = int(0.2*len(ts))+1 ) for ts in train['Music']])
train_labels = np.array(train['Genre'])

test_sax = np.array([SAX_window(ts, w=16, alpha=4, l = int(0.2*len(ts))+1 ) for ts in test['Music']])
test_labels = np.array(test['Genre'])

['cccbccccbcbcccbc cccbcccccccccccc cccccccccccccccc bcbcbccbcccbcccb'
 'cbccbbcbbbbbbbbb bbbbbbbbbbccbbbc bbbbbbbbcbbbbbbb cbbbcbbbbbbbbcbb'
 'cbcbbcbcbbcbcbcb cccccccccccccccc ccbbccbbccbbbbcb cccccccccccccccb bbbcbbcbbcbcbbbb'
 'cccccccccccccccc' 'cbccbccbccbcbccb bbbbbbbbbbbbbbbb bccbbcbccbcbbbbc'
 'bbbbbbbbbbbbbbbb cccccccccccccccc' 'bbcbbbbbbbbbbbbb bbbbbbbbbbbbbbbb'
 'bbbbbbbbbbbbbbbb bbcbbbbbbcbbbcbb' 'bbbbbbbbbbbbbbbb cbbbbcbccbcbbbcb'
 'bbbbbbbbbbbbbbbb' 'bbbbbbbbbbbbbbbb' 'bbbbbbbbbbbbbbbb'
 'cccbccbcbccbcccb bbbbbbbbbbbbbbbb bbbcbcbbbbcbbbcb bbbbcbbbbbbbbbbb'
 'bbbbbbbbbbbbbbbb' 'bbbbcbbbbcbbbcbb bbbbbbbbbbbbbbbb'
 'cccccccccccccccc cbcccccbcccbcccb'
 'cccccccccccccccc cbcbcbbbcbccbccc ccbccbccbbccccbc'
 'bcccccccccbccccc bbbbbbbbbbbbbbbb cccbcccccccccccc' 'cccccccccccccccc'
 'cbccbbcbccbbccbb cccccccccccccccc'
 'bccbbbcbcbcbbcbc bbbbbbbbbbbbbbbb cbbbbbbbbbbbbbbb'
 'cccccccccccccccc bbbbbbbbbbbbbbbb bbbbcbbbbbbcbcbb'
 'bcbcbccbcbbccbbb cccccccbccbccbbc cbbbbbbbbbbbbbbb cccc

In [None]:
# ChatGPT code 

class SEQL:
    def __init__(self, C=1.0, alpha=0.5, convergence_threshold=1e-5, max_iterations=1000):
        self.C = C
        self.alpha = alpha
        self.convergence_threshold = convergence_threshold
        self.max_iterations = max_iterations
        self.beta = None
        self.best_feature = None

    def _compute_regularization_gradient(self, beta_j):
        """Elastic-net gradient."""
        return self.alpha * np.sign(beta_j) + (1 - self.alpha) * beta_j

    def _compute_loss_gradient(self, X, y, beta):
        """Compute gradients for all features."""
        margins = y * (X @ beta)
        probabilities = 1 / (1 + np.exp(margins))
        gradients = X.T @ (y * probabilities) / len(y)
        return gradients

    def _prune(self, prefix_gradient, current_best):
        """Prune if the bound indicates no improvement."""
        return np.abs(prefix_gradient) < current_best

    def _expand_features(self, sequences, feature_map):
        """Iteratively expand features from unigrams."""
        expanded_features = set(feature_map.keys())
        for seq in sequences:
            for i in range(len(seq)):
                for j in range(i + 1, len(seq) + 1):
                    feature = seq[i:j]
                    if ' ' in feature:
                        continue  # Skip features containing spaces
                    if feature not in expanded_features:
                        feature_map[feature] = len(feature_map)
                        expanded_features.add(feature)
        return feature_map

    def fit(self, sequences, y):
        """
        Fit SEQL model.
        - sequences: List of input sequences.
        - y: Binary labels (-1, 1).
        """
        # Start with unigrams
        feature_map = {char: idx for idx, char in enumerate(set(''.join(sequences)))}
        X = np.zeros((len(sequences), len(feature_map)))

        # Build initial feature matrix
        for i, seq in enumerate(sequences):
            for feature in feature_map:
                X[i, feature_map[feature]] = seq.count(feature)

        num_features = X.shape[1]
        self.beta = np.zeros(num_features)

        for iteration in range(self.max_iterations):
            best_gradient = 0
            best_feature = None

            # Traverse features to find the best gradient
            for feature, idx in feature_map.items():
                gradient = np.sum(
                    y * X[:, idx] * (1 / (1 + np.exp(y * (X @ self.beta))))
                ) / len(y)
                gradient += self.C * self._compute_regularization_gradient(self.beta[idx])

                # Update best feature
                if np.abs(gradient) > best_gradient:
                    best_gradient = np.abs(gradient)
                    best_feature = feature

            # Stopping condition
            if best_gradient < self.convergence_threshold:
                print("Converged after", iteration, "iterations.")
                break

            # Update selected feature
            idx = feature_map[best_feature]
            step_size = best_gradient  # Simplified; refine with line search if needed
            self.beta[idx] -= step_size

            # Expand feature map iteratively
            feature_map = self._expand_features(sequences, feature_map)
            new_X = np.zeros((len(sequences), len(feature_map)))
            for i, seq in enumerate(sequences):
                for feature in feature_map:
                    new_X[i, feature_map[feature]] = seq.count(feature)
            X = new_X
            self.beta = np.zeros(len(feature_map))

        self.best_feature = best_feature

    def predict(self, X):
        """Predict using the learned model."""
        return np.sign(X @ self.beta)

# Example usage
# Sequences should exclude spaces during preprocessing.
sequences = ["abc", "ab", "bc"]  # Example sequences
y = np.array([1, -1, 1])            # Labels

seql = SEQL(C=1.0, alpha=0.5)
seql.fit(train_sax, train_labels)
print("Best feature:", seql.best_feature)


UFuncTypeError: ufunc 'multiply' did not contain a loop with signature matching types (dtype('<U9'), dtype('float64')) -> None