In [51]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import matplotlib.ticker as ticker
import seaborn as sns
import ast
import os
from sktime.transformations.series.detrend import Detrender
from sktime.transformations.series.detrend import Deseasonalizer
from statsmodels.tsa.stattools import adfuller, kpss

In [None]:
# Load the raw artists and tracks tables from the tabular dataset folder.
artists_dataset, tracks_dataset = (
    pd.read_csv('dataset/tabular/artists.csv'),
    pd.read_csv('dataset/tabular/tracks.csv'),
)

# Artists Dataset

Drop rows that contain NaN values, then drop duplicated rows from the artists dataset.

In [None]:
# Remove rows with any missing value first, then remove exact duplicate rows.
artists_dataset = artists_dataset.dropna().drop_duplicates()

In [None]:
# Artists sharing both 'name' and 'genres' are treated as duplicates.
# Sorting by descending popularity first means the row that survives
# drop_duplicates (the first occurrence) is the top-ranked one.
artists_by_popularity = artists_dataset.sort_values(by='popularity', ascending=False)
artists_dataset = artists_by_popularity.drop_duplicates(subset=['name', 'genres'], keep='first')

# Tracks Dataset

## Aggregate all rows that share the same 'id' into a single row, collecting their unique genres in a list. For each 'id', only the row with the highest popularity is kept, and its genre is replaced by the aggregated list.


In [None]:
# For every track id, collect the distinct genres it is listed under.
# SeriesGroupBy.unique() drops repeated genres (e.g. from fully duplicated
# rows) and keeps the order of first appearance; map(list) turns each
# resulting array into a plain Python list.
merged_df = tracks_dataset.groupby('id')['genre'].unique().map(list)
# The left merge attaches the aggregated list to every row of that id; the
# per-row genre becomes 'genre_x' and the aggregated list becomes 'genre_y'.
df_merged = pd.merge(tracks_dataset, merged_df, on='id', how='left')

# Find the indices of the rows with the highest popularity within each group (ID)
indices_to_keep = df_merged.groupby('id')['popularity'].idxmax()
# Keep only those rows and expose the aggregated list as the 'genre' column.
# Chaining (instead of rename(..., inplace=True)) avoids mutating a frame
# that was just sliced out of df_merged.
tracks_dataset = (
    df_merged.loc[indices_to_keep]
    .drop(columns=['genre_x'])
    .rename(columns={'genre_y': 'genre'})
)
# Display the resulting dataframe
tracks_dataset

## Handling songs with duplicated names

In [None]:
# Rows with identical 'name' and 'artists' describe the same song.
# Order by descending popularity so that drop_duplicates, which keeps the
# first occurrence, retains the top-ranked row of each song.
tracks_by_popularity = tracks_dataset.sort_values(by='popularity', ascending=False)
tracks_dataset = tracks_by_popularity.drop_duplicates(subset=['name', 'artists'], keep='first')
# Show the deduplicated dataframe
tracks_dataset

## Creating 3 new columns: 'year', 'month', 'day' from the 'album_release_date' column

In [None]:
# Extract year, month, and day from 'album_release_date'.
# Dates given only as YYYY or YYYY-MM get a real missing value (NaN/None) for
# the absent parts, and rows without a release date are missing in all three
# columns, instead of carrying the placeholder strings 'nan' / 'NaN'.

release_dates = tracks_dataset['album_release_date']
# Cast to str so the string operations are safe whatever dtype pandas
# inferred, but keep missing dates missing rather than the text 'nan'.
release_dates = release_dates.astype(str).where(release_dates.notna())
tracks_dataset['album_release_date'] = release_dates

# Split once per row. reindex guarantees exactly three part columns even when
# no date in the data has a month or a day (and ignores any extra parts).
date_parts = release_dates.str.split('-', expand=True).reindex(columns=range(3))
tracks_dataset['year'] = date_parts[0]
tracks_dataset['month'] = date_parts[1]
tracks_dataset['day'] = date_parts[2]

# Display the source column next to the new columns to verify the split
tracks_dataset[['album_release_date', 'year', 'month', 'day']]

## Dropping columns track_number, disc_number, album_type, album_total_tracks, album_release_date_precision, album_release_date

In [None]:
# Remove the track/album bookkeeping columns together with the two raw
# release-date columns ('album_release_date_precision', 'album_release_date').
columns_to_drop = [
    'track_number',
    'disc_number',
    'album_type',
    'album_total_tracks',
    'album_release_date_precision',
    'album_release_date',
]
tracks_dataset = tracks_dataset.drop(columns=columns_to_drop)

In [None]:
# Write both cleaned tables to CSV, without the index column.
for _frame, _csv_path in (
    (artists_dataset, 'dataset/tabular/artists_cleaned.csv'),
    (tracks_dataset, 'dataset/tabular/tracks_cleaned.csv'),
):
    _frame.to_csv(_csv_path, index=False)

# Time Series Dataset

LOADING THE DATASET

In [48]:
def load_npy(filename):
    """Load and return the array stored in the ``.npy`` file at ``filename``.

    Thin wrapper around ``np.load``; the value it returns is passed through
    unchanged.
    """
    loaded = np.load(filename)
    return loaded


dir_path = 'dataset/time_series/'
len_threshold = 1280  # every series is truncated or padded to this many samples
X, y, ids = [], [], []

# os.listdir returns entries in arbitrary order; sorting makes X, y and ids
# come out in the same order on every run and every machine.
for file in sorted(os.listdir(dir_path)):
    stem, extension = os.path.splitext(file)
    if extension != '.npy':
        continue

    # File names are parsed as '<track_id>_<genre>.npy'. Splitting the stem on
    # the first underscore only keeps a genre that itself contains '_' intact
    # (slicing '.npy' off the second '_'-separated piece would mangle it).
    track_id, _, genre = stem.partition('_')
    ts = load_npy(os.path.join(dir_path, file))

    if len(ts) == 0:
        # Nothing to truncate and no last observation to pad with.
        print(f"Skipping empty time series: {file}")
        continue

    if len(ts) > len_threshold:
        ts = ts[:len_threshold]
    else:
        # Fill with the last observation (an alternative would be the mean of
        # the final observations).
        pad = [ts[-1]] * (len_threshold - len(ts))
        ts = np.append(ts, pad)

    ids.append(track_id)
    y.append(genre)
    # Wrap in a list so that each sample has shape (1, len_threshold).
    X.append([ts])

X, y, ids = np.array(X), np.array(y), np.array(ids)

In [None]:
# Show the first sample and, on the next line, its genre label
print(X[0], y[0], sep='\n')

In [None]:
# Last row of the first sample (each sample was stored as a one-row list)
X[0, -1]

In [None]:
# Plot the first series against its sample index
first_series = np.transpose(X[0, -1])
ts = pd.Series(first_series)
plt.plot(ts)
plt.show()

## Performing offset translation

In [49]:
# Offset translation: shift every sample so that its mean becomes zero
for i, series in enumerate(X):
    centered = series - series.mean()
    X[i] = centered

## Performing amplitude scaling


In [50]:
# Amplitude scaling: z-normalise every sample (zero mean, unit standard deviation)
for i in range(len(X)):
    centered = X[i] - X[i].mean()
    std = X[i].std()
    # A constant series has zero standard deviation; dividing by it would fill
    # the sample with NaN/inf, so such a series is only centered (all zeros).
    X[i] = centered / std if std > 0 else centered

## Noise removal

In [None]:
w = 3  # moving-average window, in samples
# rolling(...).mean() returns a new Series and leaves X untouched, so the
# result has to be stored to have any effect. It is kept in a separate array
# so that X, which the following cells read, is not modified.
X_smoothed = X.astype(float)  # astype returns a copy
for i in range(len(X)):
    ts = pd.Series(X[i, -1].T)
    # min_periods=1 averages over the samples available at the start of the
    # series instead of producing NaN for the first w - 1 positions.
    X_smoothed[i, -1] = ts.rolling(window=w, min_periods=1).mean().to_numpy()  # Noise (smoothing)

In [None]:
# Preview a 5-sample moving average of the first series
w = 5
ts = pd.Series(X[0, -1].T)
smoothed = ts.rolling(window=w).mean()
smoothed.plot()  # Noise (smoothing)

In [None]:
# Positions of all samples labelled 'progressive-house', and the samples themselves
target_genre = 'progressive-house'
prog_house_indices = [pos for pos, label in enumerate(y) if label == target_genre]
prog_house_series = [X[pos] for pos in prog_house_indices]

if not prog_house_series:
    mean_series = None
    print("No progressive-house genre found.")
else:
    # Stack the selected samples and average them element-wise
    prog_house_series = np.array(prog_house_series)
    mean_series = prog_house_series.mean(axis=0)

In [None]:
# Mean 'progressive-house' time series, trend still included
mean_values = np.transpose(mean_series[0])
ts = pd.Series(mean_values)
plt.plot(ts)
plt.show()

In [None]:
# Time series of 'progressive-house' genre without the trend
t1 = pd.Series(mean_series[0].T)
# Only the Detrender class is imported; no `detrender` instance is created in
# the cells above, so build one here (default settings) to keep this cell
# runnable on a fresh kernel.
detrender = Detrender()
t1_detrended = detrender.fit_transform(t1)
# Plot both curves with labels so the figure can be read on its own
plt.plot(t1, label='original')
plt.plot(t1_detrended, label='detrended')
plt.legend()
plt.show()

In [52]:
## Searching for trends in the time series: stationarity tests (ADF and KPSS)
def test_stationarity(timeseries):
    print("Performing Augmented Dickey-Fuller test:")
    adf_test = adfuller(timeseries, autolag='AIC')
    print(f"ADF Statistic: {adf_test[0]}")
    print(f"p-value: {adf_test[1]}")

    print("\nPerforming KPSS test:")
    kpss_test = kpss(timeseries, regression='c', nlags='auto')
    print(f"KPSS Statistic: {kpss_test[0]}")
    print(f"p-value: {kpss_test[1]}")

# Apply the test to one randomly chosen sample. A seeded generator makes the
# pick, and therefore the printed statistics, reproducible across runs.
rng = np.random.default_rng(42)
sample_index = rng.integers(len(X))
# Each sample was stored as a one-row list, so [0] selects the series itself.
test_stationarity(X[sample_index][0])


Performing Augmented Dickey-Fuller test:
ADF Statistic: -4.730706966497688
p-value: 7.365812781901537e-05

Performing KPSS test:
KPSS Statistic: 0.4223443850625584
p-value: 0.0675239719557938
