In [1]:

import pandas as pd
from utils.api_utils import AuthenticateSpotify
from etl.extract.extract import extract_data
from utils.transform_utils import (
    get_track_data,
    get_tracks_data,
    check_in_list,
    genre_dict
)

In [2]:
tracks = pd.read_csv('test_input.csv')

In [3]:
import os

### format_column_names

In [4]:
# remove spaces and change to lower case
tracks.columns = tracks.columns.str.lower().str.replace(' ', '_')

# remove special characters
tracks.columns = tracks.columns.str.replace('(', '').str.replace(')', '')

# rename columns
tracks = tracks.rename(columns={'track_uri': 'track_id',
                                'artist_uris': 'artist_id',
                                'album_uri': 'album_id',
                                'album_artist_uris': 'album_artist_id',
                                'album_release_date': 'album_year'})

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("format_column_names_data.csv")
tracks.to_csv(filepath, index=False)

In [5]:
# Remove user API data
tracks = tracks.drop(columns=['added_at', 'added_by'])

# Remove unnecessary columns
tracks = tracks.drop(columns=['track_preview_url', 'album_genres',
                                'copyrights', 'label'])

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("drop_columns_data.csv")
tracks.to_csv(filepath, index=False)

In [10]:
# remove duplicates
tracks = tracks.drop_duplicates()

# standardise date format
tracks['album_year'] = pd.to_datetime(tracks['album_year'],
                                        errors='coerce').dt.year

# drop rows with invalid dates
tracks = tracks.dropna(subset=['album_year'])

tracks = tracks.dropna(subset=['popularity'])

# Convert the year to an integer
tracks['album_year'] = tracks['album_year'].astype(int)

tracks['explicit'] = tracks['explicit'].map({'True': True, 'False': False})
with pd.option_context("future.no_silent_downcasting", True):
    tracks['explicit'] = tracks['explicit'].fillna(False).astype(bool)

tracks['popularity'] = tracks['popularity'].astype(int)

tracks['disc_number'] = tracks['disc_number'].astype(int)

tracks['track_number'] = tracks['track_number'].astype(int)

tracks['track_duration_ms'] = tracks['track_duration_ms'].astype(int)

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("clean_tracks_data.csv")
tracks.to_csv(filepath, index=False)

In [6]:
# Convert Spotify URIs to IDs
tracks['track_id'] = tracks['track_id'].str.replace('spotify:track:',
                                                    '', regex=False)

tracks['album_id'] = tracks['album_id'].str.replace('spotify:album:', '',
                                                    regex=False)
tracks['album_artist_id'] = tracks['album_artist_id'].str.replace(
                                    'spotify:artist:', '', regex=False
                                                                    )
# format artist_id
tracks['artist_id'] = tracks['artist_id'].str.replace('spotify:artist:',
                                                        '', regex=False)

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("convert_uris_to_ids_data.csv")
tracks.to_csv(filepath, index=False)

In [7]:
# Remove rows with missing values
tracks = tracks.dropna(subset=['track_id', 'track_name',
                                'artist_id', 'artist_names',
                                'album_year', 'album_id',
                                'album_name', 'album_artist_id',
                                'album_artist_names', 'album_image_url'])

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("remove_missing_values_data.csv")
tracks.to_csv(filepath, index=False)

In [8]:
dataframe = tracks.copy()

updated_df = dataframe.copy()

token = AuthenticateSpotify()
# process the DataFrame in batches to optimise API calls
batch_size = 50

for i in range(0, len(dataframe), batch_size):

    # select batch of track_id
    end_index = min(i + batch_size, len(dataframe))
    tracks_ids = dataframe[i:end_index]['track_id'].tolist()

    try:
        response = get_tracks_data(token, tracks_ids)

        # select popularity and a second id for comformation
        temp_isrc = [item.get('external_ids')['isrc'] for
                        item in response['tracks']]
        temp_pop = [item['popularity'] for item in response['tracks']]

        # Create a mapping of ISRC to popularity
        isrc_pop_map = dict(zip(temp_isrc, temp_pop))

        # Update the popularity column in df2 only if ISRC matches
        for idx in dataframe[i:end_index].index:
            try:
                isrc_value = dataframe.loc[idx, 'isrc']
                if isrc_value in isrc_pop_map:
                    updated_df.loc[idx, 'popularity'] = isrc_pop_map[
                            isrc_value
                        ]
                else:
                    # Set as null if ISRC doesn't match
                    updated_df.loc[idx, 'popularity'] = pd.NA
            except Exception as row_error:

                print(f"Error processing row {idx}: {row_error}")
                # Set as null for problematic rows
                updated_df.loc[idx, 'popularity'] = pd.NA
    except Exception as batch_error:
        # if a batch fails, move to row by row updation for that batch

        for j in range(i, end_index):
            try:
                # Attempt to process each track IDs individually
                track_id = dataframe.loc[j, 'track_id']
                a = get_track_data(token, track_id)

                if 'isrc' in a.get('external_ids'):
                    isrc_value = a.get('external_ids')['isrc']

                    if isrc_value == updated_df.loc[j, 'isrc']:
                        updated_df.loc[j, 'popularity'] = a['popularity']
                else:
                    updated_df.loc[j, 'popularity'] = None
                    print(
                            "ISRC not found for track ID {j}".format(
                            j=j
                            )
                        )
                    print(
                            "IRSC: {irsc}".format(
                                irsc=updated_df.loc[j, 'isrc']
                                )
                            )
            except Exception as e:
                # if the row updation fail print error and continue
                print(
                        "Error processing track ID {j}, irsc {irsc}".format(
                        j=j, irsc=updated_df.loc[j, 'isrc']
                        )
                    )
                print(
                    "Error: {e}".format(e=e)
                )
                continue

        # prin tth ebatch that failed and the IRSC ids in it
        print(f"Error processing batch {i}-{end_index}: {batch_error}")
        print(f"Track IDs in batch: {tracks_ids}")
        continue


updated_df.reset_index(inplace=True)
updated_df = updated_df.dropna(subset=['popularity'])
# save the transformed data tpo csv to test against sql database
filepath = os.path.join("update_API_data.csv")
tracks.to_csv(filepath, index=False)

Request was successful
Request was successful
Request was successful
Request was successful
Request was successful


In [8]:
filepath = '../../data/clean/transformed_data.csv'
from etl.extract.extract import extract_data
new_pop_data = extract_data(file=filepath)

# Create a mapping of ISRC to popularity
new_pop_data_map = dict(zip(new_pop_data['track_id'],
                            new_pop_data['popularity']))

tracks.set_index('track_id', inplace=True)

for idx in tracks.index:
    tracks.loc[idx, 'popularity'] = new_pop_data_map.get(idx, pd.NA)

# Reset index to avoid side effects
tracks.reset_index(inplace=True)


tracks = tracks.dropna(subset=['popularity'])
# save the transformed data tpo csv to test against sql database
filepath = os.path.join("update_data_data.csv")
tracks.to_csv(filepath, index=False)

Extracted 8578 rows from c:\users\ashle\documents\github\df_capstone\etl\extract\../../data/clean/transformed_data.csv in 0.0580 seconds


In [9]:
from utils.transform_utils import genre_dict, check_in_list

# import genres dictionary
genres = genre_dict()

# add genre count column for missing value count
for key, value in genres.items():
    tracks[key] = tracks.apply(lambda x: check_in_list(x['artist_genres'],
                                                        value), axis=1)

tracks['genre_count'] = tracks[genres.keys()].sum(axis=1)
tracks = tracks.drop(columns=['artist_genres'])

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("simplify_and_expand_artist_genres_data.csv")
tracks.to_csv(filepath, index=False)

In [11]:
import pandas as pd
import os

In [None]:
from etl.transform.transform import transform_data
tracks = pd.read_csv('test_input.csv')
print(tracks.head())
try:
    tracks = transform_data(tracks, state="old")
except Exception as e:
    print(f"Error occurred: {e}")

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("test_output_data.csv")
tracks.to_csv(filepath, index=False)

                              Track URI  \
0  spotify:track:1XAZlnVtthcDZt2NI1Dtxo   
1  spotify:track:6a8GbQIlV8HBUW3c6Uk9PH   
2  spotify:track:70XtWbcVZcpaOddJftMcVi   
3  spotify:track:1NXUWyPJk5kO6DQJ5t7bDu   
4  spotify:track:72WZtWs6V7uu3aMgMmEkYe   

                                Track Name  \
0  Justified & Ancient - Stand by the Jams   
1          I Know You Want Me (Calle Ocho)   
2       From the Bottom of My Broken Heart   
3         Apeman - 2014 Remastered Version   
4       You Can't Always Get What You Want   

                           Artist URI(s)      Artist Name(s)  \
0  spotify:artist:6dYrdRlNZSKaVxYg5IrvCH             The KLF   
1  spotify:artist:0TnOYISbd1XYRBk9myaseg             Pitbull   
2  spotify:artist:26dSoYclwsYLMAKD3tpOr4      Britney Spears   
3  spotify:artist:1SQRv42e4PjEYfPhS0Tk9E           The Kinks   
4  spotify:artist:22bE4uQ6baNwSHPVcDxLCe  The Rolling Stones   

                              Album URI  \
0  spotify:album:4MC0ZjNtVP1nDD5lsLx

In [13]:
tracks = transform_data(tracks, state="old")

# save the transformed data tpo csv to test against sql database
filepath = os.path.join("test_output_data.csv")
tracks.to_csv(filepath, index=False)

KeyError: "['added_at', 'added_by'] not found in axis"

In [39]:
# save the transformed data tpo csv to test against sql database
filepath = os.path.join("test_output_data.csv")
tracks.to_csv(filepath, index=False)

In [None]:
tracks = tracks['explicit'].astype(bool)
tracks