In [None]:
import clip
import torch
import numpy
import pandas
import PIL.Image
from umap import UMAP
from tqdm import tqdm
from src import config
from sklearn.manifold import TSNE


def calculate_clip_embeddings(dataset):

    model, preprocess = clip.load("ViT-B/32", device="cpu")
    clip_embeddings = []
    for image_name in tqdm(dataset["image_path"]):
        image = PIL.Image.open(image_name)
        image_input = preprocess(image).unsqueeze(0).to("cpu")
        with torch.no_grad():
            clip_embeddings.append(model.encode_image(image_input).cpu().numpy())
    clip_embeddings = numpy.concatenate(clip_embeddings, axis=0)

    return clip_embeddings


def calculate_umap(clip_embeddings):

    umap_embeddings = UMAP(metric="cosine", n_components=2).fit_transform(
        clip_embeddings
    )
    umap_x, umap_y = umap_embeddings[:, 0], umap_embeddings[:, 1]

    return umap_x, umap_y


def calculate_tsne(clip_embeddings):

    tsne_embeddings = TSNE(metric="cosine", n_components=2).fit_transform(
        clip_embeddings
    )
    tsne_x, tsne_y = tsne_embeddings[:, 0], tsne_embeddings[:, 1]

    return tsne_x, tsne_y


def generate_projection_data():

    dataset = pandas.read_csv(config.DATASET_PATH)
    dataset_sample = (
        dataset.sample(n=config.DATASET_SAMPLE_SIZE, random_state=1)
        if config.DATASET_SAMPLE_SIZE
        else dataset
    )
    print("Calculating clip embeddings")
    clip_embeddings = calculate_clip_embeddings(dataset_sample)
    umap_x, umap_y = calculate_umap(clip_embeddings)
    print("Calculating umap")
    tsne_x, tsne_y = calculate_tsne(clip_embeddings)
    print("Calculating tsne")
    augmented_dataset = dataset_sample.assign(
        umap_x=umap_x, umap_y=umap_y, tsne_x=tsne_x, tsne_y=tsne_y
    )
    augmented_dataset.to_csv(config.AUGMENTED_DATASET_PATH, index=False)
    print("Saving augmented dataset to", config.AUGMENTED_DATASET_PATH)


if __name__ == "__main__":
    generate_projection_data()


In [4]:
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipd
import librosa
import librosa.display
import os
import json
from src import config
from scipy.special import softmax

ModuleNotFoundError: No module named 'src'

In [3]:
# class that uses the librosa library to analyze the key that an mp3 is in
# arguments:
#     waveform: an mp3 file loaded by librosa, ideally separated out from any percussive sources
#     sr: sampling rate of the mp3, which can be obtained when the file is read with librosa
#     tstart and tend: the range in seconds of the file to be analyzed; default to the beginning and end of file if not specified
class Tonal_Fragment(object):
    def __init__(self, waveform, sr, tstart=None, tend=None):
        self.waveform = waveform
        self.sr = sr
        self.tstart = tstart
        self.tend = tend
        
        if self.tstart is not None:
            self.tstart = librosa.time_to_samples(self.tstart, sr=self.sr)
        if self.tend is not None:
            self.tend = librosa.time_to_samples(self.tend, sr=self.sr)
        self.y_segment = self.waveform[self.tstart:self.tend]
        self.chromograph = librosa.feature.chroma_cqt(y=self.y_segment, sr=self.sr, bins_per_octave=24)
        
        # chroma_vals is the amount of each pitch class present in this time interval
        self.chroma_vals = []
        for i in range(12):
            self.chroma_vals.append(np.sum(self.chromograph[i]))
        pitches = ['C','C#','D','D#','E','F','F#','G','G#','A','A#','B']
        # dictionary relating pitch names to the associated intensity in the song
        self.keyfreqs = {pitches[i]: self.chroma_vals[i] for i in range(12)} 
        
        keys = [pitches[i] + ' major' for i in range(12)] + [pitches[i] + ' minor' for i in range(12)]

        # use of the Krumhansl-Schmuckler key-finding algorithm, which compares the chroma
        # data above to typical profiles of major and minor keys:
        maj_profile = [6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88]
        min_profile = [6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17]

        # finds correlations between the amount of each pitch class in the time interval and the above profiles,
        # starting on each of the 12 pitches. then creates dict of the musical keys (major/minor) to the correlation
        self.min_key_corrs = []
        self.maj_key_corrs = []
        for i in range(12):
            key_test = [self.keyfreqs.get(pitches[(i + m)%12]) for m in range(12)]
            # correlation coefficients (strengths of correlation for each key)
            self.maj_key_corrs.append(round(np.corrcoef(maj_profile, key_test)[1,0], 3))
            self.min_key_corrs.append(round(np.corrcoef(min_profile, key_test)[1,0], 3))

        # names of all major and minor keys
        self.key_dict = {**{keys[i]: self.maj_key_corrs[i] for i in range(12)}, 
                         **{keys[i+12]: self.min_key_corrs[i] for i in range(12)}}
        
        # this attribute represents the key determined by the algorithm
        self.key = max(self.key_dict, key=self.key_dict.get)
        self.bestcorr = max(self.key_dict.values())
        
        # this attribute represents the second-best key determined by the algorithm,
        # if the correlation is close to that of the actual key determined
        self.altkey = None
        self.altbestcorr = None

        for key, corr in self.key_dict.items():
            if corr > self.bestcorr*0.9 and corr != self.bestcorr:
                self.altkey = key
                self.altbestcorr = corr
                
    # prints the relative prominence of each pitch class            
    def print_chroma(self):
        self.chroma_max = max(self.chroma_vals)
        for key, chrom in self.keyfreqs.items():
            print(key, '\t', f'{chrom/self.chroma_max:5.3f}')
                
    # prints the correlation coefficients associated with each major/minor key
    def corr_table(self):
        for key, corr in self.key_dict.items():
            print(key, '\t', f'{corr:6.3f}')
    
    # printout of the key determined by the algorithm; if another key is close, that key is mentioned
    def print_key(self):
        print("likely key: ", max(self.key_dict, key=self.key_dict.get), ", correlation: ", self.bestcorr, sep='')
        if self.altkey is not None:
                print("also possible: ", self.altkey, ", correlation: ", self.altbestcorr, sep='')
    
    # prints a chromagram of the file, showing the intensity of each pitch class over time
    def chromagram(self, title=None):
        C = librosa.feature.chroma_cqt(y=self.waveform, sr=sr, bins_per_octave=24)
        plt.figure(figsize=(12,4))
        librosa.display.specshow(C, sr=sr, x_axis='time', y_axis='chroma', vmin=0, vmax=1)
        if title is None:
            plt.title('Chromagram')
        else:
            plt.title(title)
        plt.colorbar()
        plt.tight_layout()

        plt.show()

In [12]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display
import os
import config
# class that uses the librosa library to analyze the key that an mp3 is in
# arguments:
#     waveform: an mp3 file loaded by librosa, ideally separated out from any percussive sources
#     sr: sampling rate of the mp3, which can be obtained when the file is read with librosa
#     tstart and tend: the range in seconds of the file to be analyzed; default to the beginning and end of file if not specified
class Tonal_Fragment(object):
    def __init__(self, waveform, sr, tstart=None, tend=None):
        self.waveform = waveform
        self.sr = sr
        self.tstart = tstart
        self.tend = tend
        
        if self.tstart is not None:
            self.tstart = librosa.time_to_samples(self.tstart, sr=self.sr)
        if self.tend is not None:
            self.tend = librosa.time_to_samples(self.tend, sr=self.sr)
        self.y_segment = self.waveform[self.tstart:self.tend]
        self.chromograph = librosa.feature.chroma_cqt(y=self.y_segment, sr=self.sr, bins_per_octave=24)
        
        # chroma_vals is the amount of each pitch class present in this time interval
        self.chroma_vals = []
        for i in range(12):
            self.chroma_vals.append(np.sum(self.chromograph[i]))
        pitches = ['C','C#','D','D#','E','F','F#','G','G#','A','A#','B']
        # dictionary relating pitch names to the associated intensity in the song
        self.keyfreqs = {pitches[i]: self.chroma_vals[i] for i in range(12)} 
        
        keys = [pitches[i] + ' major' for i in range(12)] + [pitches[i] + ' minor' for i in range(12)]

        # use of the Krumhansl-Schmuckler key-finding algorithm, which compares the chroma
        # data above to typical profiles of major and minor keys:
        maj_profile = [6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88]
        min_profile = [6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17]

        # finds correlations between the amount of each pitch class in the time interval and the above profiles,
        # starting on each of the 12 pitches. then creates dict of the musical keys (major/minor) to the correlation
        self.min_key_corrs = []
        self.maj_key_corrs = []
        for i in range(12):
            key_test = [self.keyfreqs.get(pitches[(i + m)%12]) for m in range(12)]
            # correlation coefficients (strengths of correlation for each key)
            self.maj_key_corrs.append(round(np.corrcoef(maj_profile, key_test)[1,0], 3))
            self.min_key_corrs.append(round(np.corrcoef(min_profile, key_test)[1,0], 3))

        # names of all major and minor keys
        self.key_dict = {**{keys[i]: self.maj_key_corrs[i] for i in range(12)}, 
                         **{keys[i+12]: self.min_key_corrs[i] for i in range(12)}}
        
        # this attribute represents the key determined by the algorithm
        self.key = max(self.key_dict, key=self.key_dict.get)
        self.bestcorr = max(self.key_dict.values())
        
        # this attribute represents the second-best key determined by the algorithm,
        # if the correlation is close to that of the actual key determined
        self.altkey = None
        self.altbestcorr = None

        for key, corr in self.key_dict.items():
            if corr > self.bestcorr*0.9 and corr != self.bestcorr:
                self.altkey = key
                self.altbestcorr = corr
                
    # prints the relative prominence of each pitch class            
    def print_chroma(self):
        self.chroma_max = max(self.chroma_vals)
        for key, chrom in self.keyfreqs.items():
            print(key, '\t', f'{chrom/self.chroma_max:5.3f}')
                
    # prints the correlation coefficients associated with each major/minor key
    def corr_table(self):
        for key, corr in self.key_dict.items():
            print(key, '\t', f'{corr:6.3f}')
    
    # printout of the key determined by the algorithm; if another key is close, that key is mentioned
    def print_key(self):
        print("likely key: ", max(self.key_dict, key=self.key_dict.get), ", correlation: ", self.bestcorr, sep='')
        if self.altkey is not None:
                print("also possible: ", self.altkey, ", correlation: ", self.altbestcorr, sep='')
    
    # prints a chromagram of the file, showing the intensity of each pitch class over time
    def chromagram(self, title=None):
        C = librosa.feature.chroma_cqt(y=self.waveform, sr=sr, bins_per_octave=24)
        plt.figure(figsize=(12,4))
        librosa.display.specshow(C, sr=sr, x_axis='time', y_axis='chroma', vmin=0, vmax=1)
        if title is None:
            plt.title('Chromagram')
        else:
            plt.title(title)
        plt.colorbar()
        plt.tight_layout()
        plt.show()

top_3_keys = {}
print(config.GTZAN_DIR)

for root, dirs, files in os.walk(config.GTZAN_GENRES_DIR):
    print("entering here")
    for file in files:
        if file.endswith('.wav'):
            audio_path = os.path.join(root, file)
            try:
                y, sr = librosa.load(audio_path)
                y_harmonic, y_percussive = librosa.effects.hpss(y)
                tf = Tonal_Fragment(y_harmonic, sr)  # Assuming Tonal_Fragment and its properties are defined correctly elsewhere
                
                # Calculate total frequency for probability calculation
                # Original correlation values
                correlation_values = np.array(list(tf.key_dict.values()))

                # # Normalize correlation values to be positive
                abs_correlations = np.abs(correlation_values)
                #exp_correlations = np.exp(correlation_values)

                #probabilities = softmax(correlation_values)
                
                # Convert adjusted values to probabilities
                probabilities = abs_correlations / np.sum(abs_correlations)
                #probabilities = exp_correlations / np.sum(exp_correlations)

                # convert to percentage
                probabilities = probabilities * 100

                # Print the probabilities
                #print(probabilities)
                prob = sorted(probabilities, reverse=True)
                
                # Get top keys sorted by their frequencies
                top_keys = sorted(tf.key_dict, key=tf.key_dict.get, reverse=True)[:3]
                
                # Create a nested dictionary for each file
                file_dict = {}
                for i in range(3):
                    # key_with_freq = f"{key}_{tf.key_dict[key]}"
                    # print(key_with_freq)
                    # probability = tf.key_dict[key] / total_freq
                    # file_dict[key_with_freq] = probability
                    # print(key)
                    #print(tf.key_dict[top_keys[i]])
                    # probability = tf.key_dict[key] / total_freq
                    print(top_keys[i], round(prob[i],2))
                    file_dict[top_keys[i]] = round(prob[i],2)
                
                top_3_keys[file] = file_dict
                
            except Exception as e:
                print(f"Error processing {audio_path}: {e}")
                continue

print(top_3_keys)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
/Users/datoapanta/Desktop/mma_2024/src/dataset/data/gtzan
{}


In [14]:
os.walk(config.GTZAN_GENRES_DIR)

<generator object walk at 0x7fbc78638f90>

In [15]:
for root, dirs, files in os.walk(config.GTZAN_GENRES_DIR):
    print("entering here")

In [16]:
import os

if os.path.isdir(config.GTZAN_GENRES_DIR):
    print(f"{config.GTZAN_GENRES_DIR} exists and is a directory.")
else:
    print(f"{config.GTZAN_GENRES_DIR} does not exist or is not a directory.")

/Users/datoapanta/Desktop/mma_2024/src/dataset/data/gtzan/genres does not exist or is not a directory.


In [18]:
print(config.DATA_DIR)
print(config.DATASET_DIR)

/Users/datoapanta/Desktop/mma_2024/src/dataset/data
/Users/datoapanta/Desktop/mma_2024/src/dataset


In [21]:
from pathlib import Path
print(Path(__file__).parent.parent.parent)

NameError: name '__file__' is not defined

In [36]:
import numpy as np
import librosa
import os

class Tonal_Fragment:
    def __init__(self, waveform, sr, tstart=None, tend=None):
        self.waveform = waveform
        self.sr = sr
        self.tstart = librosa.time_to_samples(tstart, sr=sr) if tstart is not None else None
        self.tend = librosa.time_to_samples(tend, sr=sr) if tend is not None else None
        self.y_segment = self.waveform[self.tstart:self.tend]
        self.chromograph = librosa.feature.chroma_cqt(y=self.y_segment, sr=self.sr, bins_per_octave=24)
        
        self.chroma_vals = [np.sum(self.chromograph[i]) for i in range(12)]
        pitches = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
        self.keyfreqs = {pitches[i]: self.chroma_vals[i] for i in range(12)}
        
        keys = [pitches[i] + ' major' for i in range(12)] + [pitches[i] + ' minor' for i in range(12)]
        
        maj_profile = [6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88]
        min_profile = [6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17]
        
        self.maj_key_corrs = [round(np.corrcoef(maj_profile, [self.keyfreqs[pitches[(i + m) % 12]] for m in range(12)])[1, 0], 3) for i in range(12)]
        self.min_key_corrs = [round(np.corrcoef(min_profile, [self.keyfreqs[pitches[(i + m) % 12]] for m in range(12)])[1, 0], 3) for i in range(12)]
        
        self.key_dict = {**{keys[i]: self.maj_key_corrs[i] for i in range(12)},
                         **{keys[i + 12]: self.min_key_corrs[i] for i in range(12)}}
        
        self.key = max(self.key_dict, key=self.key_dict.get)
        self.bestcorr = max(self.key_dict.values())
        
        self.altkey = None
        self.altbestcorr = None
        for key, corr in self.key_dict.items():
            if corr > self.bestcorr * 0.9 and corr != self.bestcorr:
                self.altkey = key
                self.altbestcorr = corr

def extract_keys(data):
    top_3_keys = {}

    for index, row in data.iterrows():
        audio_path = row['file_path']
        try:
            y, sr = librosa.load(audio_path)
            y_harmonic, _ = librosa.effects.hpss(y)
            tf = Tonal_Fragment(y_harmonic, sr)
            
            correlation_values = np.array(list(tf.key_dict.values()))
            abs_correlations = np.abs(correlation_values)
            probabilities = abs_correlations / np.sum(abs_correlations)
            probabilities = probabilities * 100
            prob = sorted(probabilities, reverse=True)
            
            top_keys = sorted(tf.key_dict, key=tf.key_dict.get, reverse=True)[:3]
            file_dict = {top_keys[i]: round(prob[i], 2) for i in range(3)}
            
            top_3_keys[row['filename']] = file_dict
            
        except Exception as e:
            print(f"Error processing {audio_path}: {e}")
            continue
    
    data['keys'] = data['filename'].map(lambda x: top_3_keys.get(x, {}))
    return data


In [37]:
import pandas as pd
columns_to_keep = ['filename', 'label', 'tempo']
data = pd.read_csv("/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/features_30_sec.csv", usecols=columns_to_keep) 

file_paths = {}
# save in a new column the filepath from each song
for root, dirs, files in os.walk("/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/genres"):
    for file in files:
        if file.endswith('.wav'):
            audio_path = os.path.join(root, file)
            file_paths[file] = audio_path

data['file_path'] = data['filename'].map(file_paths)

# reorder columns
data = data[['filename', 'label', 'file_path', 'tempo']]
data = extract_keys(data)

  y, sr = librosa.load(audio_path)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Error processing /Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/genres/jazz/jazz.00054.wav: 


In [38]:
data.to_csv("/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/metadata.csv", index=False)

In [51]:
import numpy as np
# Function to calculate loudness
def calculate_loudness(file_path):
    try:
        y, sr = librosa.load(file_path)
        S = librosa.feature.melspectrogram(y=y, sr=sr)
        S_dB = librosa.power_to_db(S, ref=np.max)
        loudness = S_dB.mean()
        rounded_loudness = round(float(loudness), 2)
        # print(f"Loudness of {file_path} is {rounded_loudness} dB")
        return rounded_loudness
    except Exception as e:
        return None

# Calculate loudness for each file and add it to the DataFrame
data['loudness'] = data['file_path'].apply(calculate_loudness)

  y, sr = librosa.load(file_path)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


In [52]:
data.to_csv("/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/metadata_.csv", index=False)

Song recognition failed for pop.00072.wav.
Song recognition failed for pop.00048.wav.
Song recognition failed for pop.00006.wav.
Song recognition failed for pop.00014.wav.
Song recognition failed for pop.00017.wav.
Song recognition failed for metal.00009.wav.
Song recognition failed for metal.00020.wav.
Song recognition failed for metal.00085.wav.
Song recognition failed for metal.00074.wav.
Song recognition failed for metal.00011.wav.
Song recognition failed for metal.00039.wav.
Song recognition failed for disco.00085.wav.
Song recognition failed for disco.00084.wav.
Song recognition failed for disco.00069.wav.
Song recognition failed for disco.00080.wav.
Song recognition failed for disco.00081.wav.
Song recognition failed for disco.00030.wav.
Song recognition failed for disco.00010.wav.
Song recognition failed for disco.00004.wav.
Song recognition failed for disco.00012.wav.
Song recognition failed for disco.00006.wav.
Song recognition failed for disco.00071.wav.
Song recognition fai

In [70]:
import os
import requests
import json
import pandas as pd
from PIL import Image
from io import BytesIO
import base64
import time

# Last.fm credentials
lastfm_api_key = '6f9bf6923b90e8c77408f7d6e20f5292'

# Spotify credentials (You need to set up a Spotify Developer account and get the API credentials)
spotify_client_id = '667b4c424f0041489f7327d660e49dc9'
spotify_client_secret = '41760d74bc3d48cebb520e904ed9df3c'

KNOWN_PLACEHOLDER_URLS = [
    'https://lastfm.freetls.fastly.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png'
]

def get_spotify_token():
    url = 'https://accounts.spotify.com/api/token'
    headers = {
        'Authorization': 'Basic ' + base64.b64encode(f'{spotify_client_id}:{spotify_client_secret}'.encode()).decode('utf-8')
    }
    data = {
        'grant_type': 'client_credentials'
    }
    response = requests.post(url, headers=headers, data=data)
    token_info = response.json()
    return token_info['access_token']

def is_placeholder_image(image):
    """Check if the image is a placeholder (generic) image."""
    placeholder_color = (229, 229, 229)  # The specific color of the star image
    white_color = (255, 255, 255)
    
    # Check several key points for the placeholder image pattern
    points_to_check = [
        (0, 0),
        (100, 100),
        (199, 199),
        (0, 199),
        (199, 0),
        (50, 50),
        (150, 150)
    ]
    
    for point in points_to_check:
        if image.getpixel(point) != placeholder_color and image.getpixel(point) != white_color:
            return False
    return True

def is_placeholder_url(url):
    """Check if the URL is a known placeholder image URL."""
    return url in KNOWN_PLACEHOLDER_URLS

def fetch_cover_image_lastfm(title, artist):
    url = f"http://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={lastfm_api_key}&artist={artist}&track={title}&format=json"
    response = requests.get(url)
    try:
        data = response.json()
        if 'track' in data and 'album' in data['track'] and 'image' in data['track']['album']:
            cover_url = data['track']['album']['image'][-1]['#text']
            print(f"Fetched URL from Last.fm: {cover_url}")
            if not is_placeholder_url(cover_url) and is_valid_image_url(cover_url):
                return cover_url.replace('64x64', '200x200')
            else:
                return fetch_album_image_lastfm(artist)
        else:
            return fetch_album_image_lastfm(artist)
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Error fetching cover image from Last.fm for '{title}' by '{artist}': {e}")
        return fetch_album_image_lastfm(artist)

def fetch_album_image_lastfm(artist):
    url = f"http://ws.audioscrobbler.com/2.0/?method=artist.getInfo&api_key={lastfm_api_key}&artist={artist}&format=json"
    response = requests.get(url)
    try:
        data = response.json()
        if 'artist' in data and 'image' in data['artist']:
            cover_url = data['artist']['image'][-1]['#text']
            print(f"Fetched album URL from Last.fm: {cover_url}")
            if not is_placeholder_url(cover_url) and is_valid_image_url(cover_url):
                return cover_url.replace('64x64', '200x200')
        return None
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Error fetching artist image from Last.fm for '{artist}': {e}")
        return None

def fetch_cover_image_spotify(title, artist, token):
    url = f"https://api.spotify.com/v1/search?q=track:{title}%20artist:{artist}&type=track&limit=1"
    headers = {
        'Authorization': f'Bearer {token}'
    }
    response = requests.get(url, headers=headers)
    try:
        data = response.json()
        if data['tracks']['items']:
            cover_url = data['tracks']['items'][0]['album']['images'][0]['url']
            print(f"Fetched URL from Spotify: {cover_url}")
            if is_valid_image_url(cover_url):
                return cover_url
            else:
                return fetch_album_image_spotify(artist, token)
        else:
            return fetch_album_image_spotify(artist, token)
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Error fetching cover image from Spotify for '{title}' by '{artist}': {e}")
        return fetch_album_image_spotify(artist, token)

def fetch_album_image_spotify(artist, token):
    url = f"https://api.spotify.com/v1/search?q=artist:{artist}&type=artist&limit=1"
    headers = {
        'Authorization': f'Bearer {token}'
    }
    response = requests.get(url, headers=headers)
    try:
        data = response.json()
        if data['artists']['items'] and 'images' in data['artists']['items'][0]:
            cover_url = data['artists']['items'][0]['images'][0]['url']
            print(f"Fetched album URL from Spotify: {cover_url}")
            if is_valid_image_url(cover_url):
                return cover_url
        return None
    except (json.JSONDecodeError, KeyError, IndexError) as e:
        print(f"Error fetching artist image from Spotify for '{artist}': {e}")
        return None

def is_valid_image_url(url):
    """Check if the image URL is valid and not a placeholder."""
    if not url:
        print("Empty URL provided.")
        return False
    response = requests.get(url)
    image = Image.open(BytesIO(response.content))
    if image.mode == 'RGBA':
        image = image.convert('RGB')
    if is_placeholder_image(image):
        print("Placeholder image detected.")
        return False
    return True

def download_and_resize_cover_image(url, save_path):
    if not url:
        print(f"Invalid URL: {url}")
        return
    response = requests.get(url)
    image = Image.open(BytesIO(response.content))
    if image.mode == 'RGBA':
        image = image.convert('RGB')
    if is_placeholder_image(image):
        print(f"Placeholder image detected and skipped: {url}")
        return
    image = image.resize((200, 200))
    image.save(save_path)

def process_error_files(error_files, base_directory):
    token = get_spotify_token()
    for error_file in error_files:
        genre = os.path.basename(error_file).replace('_errors.csv', '')
        error_df = pd.read_csv(error_file)
        remaining_errors = []
        
        for _, row in error_df.iterrows():
            filename, title, artist = row['filename'], row['title'], row['artist']
            cover_url = fetch_cover_image_lastfm(title, artist)
            if not cover_url:
                cover_url = fetch_cover_image_spotify(title, artist, token)
                if not cover_url:
                    print(f"Cover image not found for '{title}' by '{artist}' from both Last.fm and Spotify. Retrying after delay.")
                    time.sleep(1)  # Delay before retrying
                    cover_url = fetch_cover_image_spotify(title, artist, token)

            if cover_url:
                try:
                    genre_dir = os.path.join(base_directory, 'images', genre)
                    os.makedirs(genre_dir, exist_ok=True)
                    save_path = os.path.join(genre_dir, f"{os.path.splitext(filename)[0]}_cover.jpg")
                    download_and_resize_cover_image(cover_url, save_path)
                    print(f"Cover image downloaded and resized for '{title}' by '{artist}' as '{save_path}'")
                except Exception as e:
                    print(f"Error downloading or resizing cover image for '{title}' by '{artist}': {e}")
                    remaining_errors.append((filename, title, artist))
            else:
                print(f"Cover image not found for '{title}' by '{artist}' from both Last.fm and Spotify.")
                remaining_errors.append((filename, title, artist))
        
        # Save remaining errors back to the CSV file
        if remaining_errors:
            remaining_error_df = pd.DataFrame(remaining_errors, columns=['filename', 'title', 'artist'])
            remaining_error_df.to_csv(error_file, index=False)
        else:
            os.remove(error_file)  # Remove the error file if all errors are resolved

if __name__ == "__main__":
    base_directory = '/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan'
    error_files = [
        os.path.join(base_directory, 'errors', 'jazz_errors.csv'),
        os.path.join(base_directory, 'errors', 'country_errors.csv'),
        os.path.join(base_directory, 'errors', 'hiphop_errors.csv'),
        os.path.join(base_directory, 'errors', 'rock_errors.csv'),
        os.path.join(base_directory, 'errors', 'classical_errors.csv'),
        os.path.join(base_directory, 'errors', 'reggae_errors.csv'),
        os.path.join(base_directory, 'errors', 'disco_errors.csv'),
        os.path.join(base_directory, 'errors', 'metal_errors.csv'),
        os.path.join(base_directory, 'errors', 'pop_errors.csv')
    ]
    process_error_files(error_files, base_directory)
    print("Processing of error files completed.")


Fetched album URL from Last.fm: https://lastfm.freetls.fastly.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png
Fetched album URL from Spotify: https://i.scdn.co/image/ab67616d0000b273c1fab9d112967a4d8957985e
Cover image downloaded and resized for 'Citizen Tain (Album Version)' by 'Branford Marsalis Trio' as '/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/images/jazz/jazz.00029_cover.jpg'
Fetched URL from Last.fm: https://lastfm.freetls.fastly.net/i/u/300x300/7a533daa11f24339c350f901de3c9d76.png
Cover image downloaded and resized for 'Sweet Lorraine (1991 Remastered)' by 'Henry "Red" Allen' as '/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/images/jazz/jazz.00070_cover.jpg'
Fetched album URL from Last.fm: https://lastfm.freetls.fastly.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png
Fetched album URL from Spotify: https://i.scdn.co/image/ab6761610000e5eb60531b02c778461b3048845b
Cover image downloaded and resized for 'For Adults Only (feat. Don Braden & Bruce Cox)' 

In [87]:
import os
import requests
from PIL import Image
from io import BytesIO

def download_image(image_url):
    """
    Download an image from a URL and return it as a PIL Image object.

    Parameters:
    - image_url (str): The URL of the image to download.

    Returns:
    - Image: The downloaded image as a PIL Image object.
    """
    response = requests.get(image_url)
    img = Image.open(BytesIO(response.content))
    return img

def convert_to_jpg_and_resize(image, save_path, size=(200, 200)):
    """
    Convert an image to JPG format, resize it, and save it.

    Parameters:
    - image (Image): The image to convert and resize.
    - save_path (str): The path to save the converted JPG file.
    - size (tuple): The desired size for the output image, default is (200, 200).
    """
    # Convert the image to RGB mode if it's not already
    if image.mode != 'RGB':
        image = image.convert('RGB')
    # Resize the image
    image = image.resize(size)
    # Save the image in JPG format
    image.save(save_path, 'JPEG')

if __name__ == "__main__":
    # Example usage
    image_url = "https://geo-media.beatport.com/image_size/500x500/2e97877a-1133-480c-b478-fcafcedb7551.jpg"

    save_path = "/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/images_missing/rock.00023_cover.jpg"
    
    # Ensure the save directory exists
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    
    # Download and save the image
    img = download_image(image_url)
    convert_to_jpg_and_resize(img, save_path)
    print(f"Image saved as {save_path}")


Image saved as /Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/images_missing/rock.00023_cover.jpg


In [90]:
title_artist = pd.read_csv("/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/title_artist.csv")

# Rename the column 'label' to 'genre'
title_artist.rename(columns={'label': 'genre'}, inplace=True)

# Count the number of distinct values per genre
genre_counts = title_artist['genre'].value_counts()

# Display the result
print(genre_counts)

genre
rock         100
blues         98
classical     97
jazz          97
pop           95
reggae        95
metal         94
hiphop        93
disco         89
country       79
Name: count, dtype: int64


In [93]:
import pandas as pd

# Load the CSV files
metadata_df = pd.read_csv('/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/metadata_.csv')
title_artist_df = pd.read_csv('/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/title_artist.csv')

# Rename columns for consistency
metadata_df.rename(columns={'label': 'genre', 'file_path': 'filepath'}, inplace=True)
title_artist_df.rename(columns={'label': 'genre'}, inplace=True)

# Merge the DataFrames on the 'filename' column
merged_df = pd.merge(metadata_df, title_artist_df[['filename', 'title', 'artist']], on='filename', how='left')

# Reorder the columns
expected_columns = ['filename', 'genre', 'title', 'artist', 'tempo', 'keys', 'loudness', 'filepath']
available_columns = [col for col in expected_columns if col in merged_df.columns]
merged_df = merged_df[available_columns]

# Save the merged DataFrame to a new CSV file
merged_df.to_csv('/Users/datoapanta/Desktop/mma_2024/dataset/data/gtzan/metadata.csv', index=False)

# Display the first few rows of the resulting DataFrame
print(merged_df.head())


          filename  genre                              title           artist  \
0  blues.00000.wav  blues  One Bourbon, One Scotch, One Beer  John Lee Hooker   
1  blues.00001.wav  blues                    I'm In The Mood  John Lee Hooker   
2  blues.00002.wav  blues          Think Twice Before You Go  John Lee Hooker   
3  blues.00003.wav  blues           I'm Bad Like Jesse James  John Lee Hooker   
4  blues.00004.wav  blues                 Walkin' The Boogie  John Lee Hooker   

        tempo                                               keys  loudness  \
0  123.046875  {'G major': 11.35, 'G minor': 9.18, 'C major':...    -44.27   
1   67.999589  {'G major': 9.67, 'G minor': 9.22, 'C major': ...    -50.30   
2  161.499023  {'E minor': 10.58, 'E major': 9.3, 'A minor': ...    -40.07   
3   63.024009  {'E minor': 9.77, 'E major': 8.45, 'B major': ...    -53.16   
4  135.999178  {'A# major': 10.19, 'A# minor': 10.09, 'D mino...    -46.26   

                                            