Import data

In [5]:
import csv
import io

import requests

In [6]:
# Source: TidyTuesday 2020-01-21 Spotify songs dataset (raw CSV hosted on GitHub).
URL = 'https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2020/2020-01-21/spotify_songs.csv'

# The timeout keeps this cell from hanging indefinitely if the host never answers;
# raise_for_status() turns an HTTP error code into an exception instead of letting
# an error page be parsed as CSV below.
response = requests.get(URL, timeout=30)
response.raise_for_status()

# Columns kept from the raw file; every other column is dropped.
columns = ['track_id', 'track_name', 'track_artist', 'playlist_genre', 'playlist_subgenre', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms']

# Hand the csv module the payload as a single text stream (newline='' as the csv
# documentation recommends) instead of pre-splitting it with str.splitlines():
# splitlines() also breaks on characters such as \x0b, \x0c or \u2028 and throws
# away line breaks inside quoted fields, either of which can corrupt rows.
reader = csv.DictReader(io.StringIO(response.text, newline=''))

# One dict per track, restricted to the selected columns (all values are still strings).
data = [{k: v for k, v in row.items() if k in columns} for row in reader]

Data Preprocessing

In [7]:
# One-hot encode each track's (playlist_genre, playlist_subgenre) pair into a nested
# `label` dict, then drop the two original string columns.

# Distinct values of each column.
playlist_genre = set(row['playlist_genre'] for row in data)
playlist_subgenre = set(row['playlist_subgenre'] for row in data)

# Only pairs that actually occur in the data can ever hold a 1, so build columns for
# those pairs directly instead of creating every genre x subgenre combination and
# deleting the all-zero ones afterwards.
# Sorting fixes the column order: iteration order of a set of strings changes between
# kernel sessions (string hash randomization), which would otherwise reshuffle the
# label columns on every fresh run.
observed_pairs = sorted({(row['playlist_genre'], row['playlist_subgenre']) for row in data})
label_columns = {pair: f'{pair[0]}_{pair[1]}' for pair in observed_pairs}

for row in data:
    own_pair = (row['playlist_genre'], row['playlist_subgenre'])
    # Exactly one entry is 1: the column matching this row's own pair.
    row['label'] = {name: int(pair == own_pair) for pair, name in label_columns.items()}

    # remove the original playlist_genre and playlist_subgenre columns
    del row['playlist_genre']
    del row['playlist_subgenre']

print(data[0])
print(f'Number of subgenres: {len(data[0]["label"])}')

{'track_id': '6f807x0ima9a1j3VPbc7VN', 'track_name': "I Don't Care (with Justin Bieber) - Loud Luxury Remix", 'track_artist': 'Ed Sheeran', 'danceability': '0.748', 'energy': '0.916', 'key': '6', 'loudness': '-2.634', 'mode': '1', 'speechiness': '0.0583', 'acousticness': '0.102', 'instrumentalness': '0', 'liveness': '0.0653', 'valence': '0.518', 'tempo': '122.036', 'duration_ms': '194754', 'label': {'r&b_neo soul': 0, 'r&b_urban contemporary': 0, 'r&b_hip pop': 0, 'r&b_new jack swing': 0, 'latin_latin hip hop': 0, 'latin_reggaeton': 0, 'latin_tropical': 0, 'latin_latin pop': 0, 'edm_progressive electro house': 0, 'edm_electro house': 0, 'edm_big room': 0, 'edm_pop edm': 0, 'pop_indie poptimism': 0, 'pop_post-teen pop': 0, 'pop_dance pop': 1, 'pop_electropop': 0, 'rock_album rock': 0, 'rock_classic rock': 0, 'rock_permanent wave': 0, 'rock_hard rock': 0, 'rap_trap': 0, 'rap_hip hop': 0, 'rap_southern hip hop': 0, 'rap_gangster rap': 0}}
Number of subgenres: 24


In [8]:
# One-hot encode the `key` and `mode` columns as flat `key_<value>` / `mode_<value>`
# entries on every row, then drop the two original columns.
key = set(row['key'] for row in data)
mode = set(row['mode'] for row in data)

# All key_* columns are added before any mode_* column.
for prefix, values in (('key', key), ('mode', mode)):
    # Iterate in a fixed order: set iteration order for strings varies between kernel
    # sessions, which would otherwise reshuffle these columns from run to run.
    # Ordering by (length, text) puts non-negative integer strings in numeric order
    # ('2' before '10') without having to parse them.
    for value in sorted(values, key=lambda v: (len(str(v)), str(v))):
        column_name = f'{prefix}_{value}'
        for row in data:
            row[column_name] = int(row[prefix] == value)

# remove the original key and mode columns
for row in data:
    del row['key']
    del row['mode']

Feature engineering: derived features and normalization of continuous song properties

In [9]:
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import pandas as pd
import numpy as np

# NOTE: this cell rewrites `data` in place (string -> number -> standardized value).
# Re-running it without re-running the earlier cells would derive features from, and
# re-scale, values that have already been standardized.

# Continuous columns loaded from the CSV; their values are still strings at this point.
continuous_features = [
    'danceability', 'energy', 'loudness', 'speechiness', 'acousticness',
    'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms'
]

# Cast every base column to a number: duration_ms becomes an int, the rest floats.
for track in data:
    for column in continuous_features:
        to_number = int if column == 'duration_ms' else float
        track[column] = to_number(track[column])


def _add_engineered_features(track):
    """Add the derived feature columns to one track dict, in place.

    Expects the base continuous columns of ``track`` to already be numeric.
    New keys are inserted in a fixed order (interactions, time-based, logs,
    ratios, polynomials, complex interactions).
    """
    dance = track['danceability']
    energy = track['energy']
    loud = track['loudness']
    speech = track['speechiness']
    acoustic = track['acousticness']
    instrumental = track['instrumentalness']
    live = track['liveness']
    valence = track['valence']
    tempo = track['tempo']
    duration_ms = track['duration_ms']

    duration_min = duration_ms / 60000
    # Small constant added to every denominator so a zero value cannot divide by zero.
    eps = 1e-6

    track.update({
        # Basic interactions
        'energy_dance': energy * dance,
        'tempo_energy': tempo * energy,
        'acoustic_instrument': acoustic * instrumental,
        'valence_energy': valence * energy,
        # Time-based features
        'duration_min': duration_min,
        'loudness_per_min': loud / (duration_min + eps),
        # Log transforms; log_loudness is taken on the magnitude and keeps the sign
        'log_duration': np.log1p(duration_ms),
        'log_loudness': np.log1p(abs(loud)) * np.sign(loud),
        'log_instrumentalness': np.log1p(instrumental),
        # Ratios and differences
        'energy_loudness_ratio': energy / (abs(loud) + eps),
        'valence_minus_energy': valence - energy,
        'dance_tempo_ratio': dance / (tempo + eps),
        'acoustic_instrument_ratio': acoustic / (instrumental + eps),
        'speech_to_instrument_ratio': speech / (instrumental + eps),
        # Polynomial features (tempo is divided by 200 before squaring)
        'energy_squared': energy ** 2,
        'tempo_squared': (tempo / 200.0) ** 2,
        'valence_squared': valence ** 2,
        'danceability_squared': dance ** 2,
        # Complex interactions (tempo divided by 200, loudness divided by -60)
        'dance_energy_tempo': dance * energy * (tempo / 200.0),
        'valence_energy_loudness': valence * energy * (loud / -60.0),
        'acoustic_speech_live': acoustic * speech * live,
    })


# --- Feature Engineering ---
for track in data:
    _add_engineered_features(track)

# Register the derived columns, in the same order they were added to each track.
continuous_features.extend([
    # Basic interactions
    'energy_dance', 'tempo_energy', 'acoustic_instrument', 'valence_energy',
    'duration_min', 'loudness_per_min',
    # Log transforms
    'log_duration', 'log_loudness', 'log_instrumentalness',
    # Ratios and differences
    'energy_loudness_ratio', 'valence_minus_energy', 'dance_tempo_ratio',
    'acoustic_instrument_ratio', 'speech_to_instrument_ratio',
    # Polynomial features
    'energy_squared', 'tempo_squared', 'valence_squared', 'danceability_squared',
    # Complex interactions
    'dance_energy_tempo', 'valence_energy_loudness', 'acoustic_speech_live'
])

# Collect base + derived columns of every track into one DataFrame.
feature_rows = [
    {feat: track[feat] for feat in continuous_features}
    for track in data
]
features_df = pd.DataFrame(feature_rows)

# Standardize every column with scikit-learn's StandardScaler.
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features_df)

# Write the standardized values back onto the track dicts, overwriting the raw ones.
final_feature_names = list(features_df.columns)
for track, scaled_row in zip(data, scaled_features):
    track.update(zip(final_feature_names, scaled_row))

In [10]:
import csv

# Helper that collapses nested dictionaries into a single level
def flatten_dict(d, parent_key='', sep='_'):
    """Return a one-level copy of ``d`` with nested keys joined by ``sep``.

    Args:
        d: Dictionary to flatten; values that are dicts are flattened recursively.
        parent_key: Prefix placed in front of every key (used by the recursion).
            When it is empty, top-level keys are kept unchanged.
        sep: Separator inserted between a prefix and a key.

    Returns:
        A new dict mapping the joined key paths to the non-dict values. If two
        paths produce the same key, the value encountered last wins.
    """
    flat = {}
    for name, value in d.items():
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if isinstance(value, dict):
            # Nested mapping: merge its flattened entries under the current path.
            flat.update(flatten_dict(value, path, sep=sep))
        else:
            flat[path] = value
    return flat

# Flatten every track record so the nested `label` dict becomes `label_<name>` columns
flattened_data = [flatten_dict(item) for item in data]

print(type(flattened_data[0]))

# The CSV header is taken from the keys of the first flattened record
fieldnames = flattened_data[0].keys()

# Write one CSV row per flattened record
with open('tracks.csv', 'w', newline='', encoding='utf-8') as out_file:
    writer = csv.DictWriter(out_file, fieldnames=fieldnames)
    writer.writeheader()
    for record in flattened_data:
        writer.writerow(record)

print("Data written to 'tracks.csv'")

<class 'dict'>
Data written to 'tracks.csv'


In [12]:
import csv
from collections import Counter

# Helper that collapses nested dictionaries into a single level.
# Same helper as in the earlier export cell; this definition shadows that one.
def flatten_dict(d, parent_key='', sep='_'):
    """Return a one-level copy of ``d`` with nested keys joined by ``sep``.

    Args:
        d: Dictionary to flatten; values that are dicts are flattened recursively.
        parent_key: Prefix placed in front of every key (used by the recursion).
            When it is empty, top-level keys are kept unchanged.
        sep: Separator inserted between a prefix and a key.

    Returns:
        A new dict mapping the joined key paths to the non-dict values. If two
        paths produce the same key, the value encountered last wins.
    """
    flat = {}
    for name, value in d.items():
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if isinstance(value, dict):
            # Nested mapping: merge its flattened entries under the current path.
            flat.update(flatten_dict(value, path, sep=sep))
        else:
            flat[path] = value
    return flat

# Flatten every track record so the nested `label` dict becomes `label_<name>` columns
flattened_data = [flatten_dict(item) for item in data]

# Keep only the first record seen for each track_name; records whose track_name is
# missing or empty are skipped entirely.
# NOTE(review): the key is the title alone, so songs by different artists that share
# a title are also collapsed into one row - confirm this is intended.
unique_data = []
seen = set()
for item in flattened_data:
    track_name = item.get('track_name')
    if not track_name or track_name in seen:
        continue
    seen.add(track_name)
    unique_data.append(item)

# The CSV header is taken from the keys of the first retained record
fieldnames = unique_data[0].keys()

# Write one CSV row per retained record
with open('no_dupe_tracks.csv', 'w', newline='', encoding='utf-8') as out_file:
    writer = csv.DictWriter(out_file, fieldnames=fieldnames)
    writer.writeheader()
    for record in unique_data:
        writer.writerow(record)

print("Data written to 'no_dupe_tracks.csv' without duplicates")

Data written to 'no_dupe_tracks.csv' without duplicates
