In [None]:
# ----------------------------------------------------------------------------------------
# PROGRAM: Spotify First Deliverable
# ----------------------------------------------------------------------------------------
# Description: Cleans the Spotify artists and tracks CSV datasets with pandas and loads
# the cleaned data into PostgreSQL tables through SQLAlchemy.
# ----------------------------------------------------------------------------------------
# Authors: Juanita Caballero Villalobos - Daniel Hamilton Smith
# Version: 1.0
# [18.02.2023]
# ----------------------------------------------------------------------------------------

# Print the version of the `python` executable found on the shell PATH.
!python --version

In [None]:
# !pip install chardet
!pip install pylev

In [None]:
# ----------------------------------------------------------------------------------------
# IMPORTAR MODULOS
# ----------------------------------------------------------------------------------------

import numpy as np
import pandas as pd
pd.set_option('mode.chained_assignment', None)


# Datasets

## utils

In [None]:
def load_df_detect_encoding(filename):
    """Print chardet's encoding guess for the raw bytes of ``filename``.

    Despite the name, nothing is loaded into a DataFrame and nothing is
    returned: the detection result is only printed.

    Args:
        filename: Path of the file to inspect; it is opened in binary mode
            and read completely.
    """
    # Imported lazily so the notebook only needs chardet when this helper is used.
    import chardet

    with open(filename, 'rb') as raw_file:
        raw_bytes = raw_file.read()
    guess = chardet.detect(raw_bytes)
    print(guess)

def replace_nan_by_median(df, column):
    """Replace the missing values of one column with that column's median.

    The DataFrame is modified in place and a short report is printed.

    Args:
        df: DataFrame to clean.
        column: Name of the (numeric) column whose NaNs are imputed.

    Returns:
        None.
    """
    missing = int(df[column].isna().sum())
    if missing > 0:
        median = df[column].median()
        print("changed", missing, 'to', median)
        # Fill only the requested column. Calling fillna on the whole frame
        # would write this column's median into the NaNs of every other column.
        df[column] = df[column].fillna(median)
    else:
        print('none changed')
        
def clean_and_check_in_range(df, column, min_v, max_v, column_type=None):
    """Impute, optionally cast, and clip one numeric column of ``df`` in place.

    Steps: missing values are replaced by the column median, the column is
    cast to ``column_type`` when one is given, and every value is clipped to
    the closed interval ``[min_v, max_v]``. The resulting min and max are
    printed and asserted to lie inside the interval.

    Args:
        df: DataFrame to clean (modified in place).
        column: Name of the column to process.
        min_v: Lowest allowed value.
        max_v: Highest allowed value.
        column_type: Optional dtype to cast the column to before clipping.

    Returns:
        The cleaned column, i.e. ``df[column]``.

    Raises:
        AssertionError: If a value is still outside the interval after clipping.
    """
    replace_nan_by_median(df, column)

    if column_type:
        df[column] = df[column].astype(column_type)

    # Open question from the authors: instead of clipping, out-of-range values
    # could be marked as null and handled by a separate process.
    # Assign the clipped result back explicitly: an in-place clip on the
    # temporary `df[column]` is not guaranteed to reach `df` (it does not
    # under pandas copy-on-write).
    df[column] = df[column].clip(min_v, max_v)

    lowest, highest = df[column].min(), df[column].max()
    print('min', lowest, 'max', highest)
    assert max_v >= highest
    assert min_v <= lowest

    return df[column]

def clean_str_to_list(x):
    """Turn the text form of a list (e.g. ``"['a', 'b']"``) into a list of strings.

    The value is converted with ``str``, every double quote, single quote and
    square bracket is removed, and the remainder is split on commas (a comma
    followed by one space counts as a plain comma).

    Args:
        x: Any value; it is stringified first.

    Returns:
        list[str]: The comma-separated pieces. An empty list text yields ``['']``.
    """
    text = str(x)
    for unwanted in ('"', "'", '[', ']'):
        text = text.replace(unwanted, '')
    return text.replace(', ', ',').split(',')

## Artists

In [None]:
# Load the artists dataset; the relative path is resolved against the working directory.
artists_df = pd.read_csv('artists_mod.csv')
artists_df

In [None]:
artists_df.dtypes

In [None]:
# Keep only rows that have at least (number of columns / 2) non-null values.
artists_df.dropna(thresh=len(artists_df.columns)/2, inplace=True)

In [None]:
artists_df

### id

In [None]:
# Authors' note: there are no duplicated ids.
# The filter lists every row whose id already appeared in an earlier row.
artists_df[artists_df['id'].duplicated()]

### name

In [None]:
# Authors' note: there are no empty names.
# (This expression is not the last one in the cell, so its result is not displayed.)
artists_df[artists_df['name'] == ""]

# Stringify every name and strip single quotes from it.
artists_df['name'] = artists_df['name'].apply(lambda x: str(x).replace("'", ""))

# artists_df['name']

### id

In [None]:
# Artist ids are identifiers, so store them as strings.
artists_df['id'] = artists_df['id'].astype(str)

### followers

In [None]:
print(artists_df['followers'].unique())

# Impute missing follower counts with the median.
replace_nan_by_median(artists_df, 'followers')
# Disabled alternative: only keep rows that don't have null in the followers column
# artists_df = artists_df[~artists_df['followers'].isnull()]

# this column should be composed only of ints
artists_df['followers'] = artists_df['followers'].astype(int)

# follower counts must not be negative
assert 0 <= min(artists_df['followers'].unique())

### name

In [None]:
# Make sure the name column holds strings.
artists_df['name'] = artists_df['name'].astype(str)

### popularity

In [None]:
# Impute missing popularity values with the median.
replace_nan_by_median(artists_df, 'popularity')

# this column should be composed only of ints
artists_df['popularity'] = artists_df['popularity'].astype(int)

# Only the lower bound is verified here; popularity must not be negative.
assert 0 <= min(artists_df['popularity'])

### genres

In [None]:
# Parse the text form of each artist's genres into a Python list of strings.
# Authors' note: copying into a separate DataFrame was tried first; adding a column was faster.
artists_df['genres_list'] = artists_df['genres'].apply(clean_str_to_list)
artists_df['genres_list']

In [None]:
import pylev

# algunos están en español, otros en ingles 
# algunos parecen ser generos, otros instrumentos

def flatten_list_remove_empty(x):
    """Split every string of ``x`` on single spaces and return the unique words.

    Empty pieces are dropped and each word is kept once, in order of first
    appearance.

    Args:
        x: Iterable of strings (e.g. genre names that may contain spaces).

    Returns:
        list[str]: The distinct non-empty words.
    """
    # A dict keeps insertion order, which gives an order-preserving de-duplication.
    seen = {}
    for phrase in x:
        for word in phrase.split(' '):
            if word:
                seen.setdefault(word, None)
    return list(seen)
        
# Break multi-word genres into unique single words, one list per artist.
artists_df['flat_genres_list'] = artists_df['genres_list'].apply(flatten_list_remove_empty)
artists_df['flat_genres_list']

In [None]:
artists_df.dtypes

## Tracks

In [None]:
# load_df_detect_encoding('tracks_mod.csv')
# Malformed lines are skipped and reported with a warning (on_bad_lines='warn');
# the file is decoded as latin-1.
tracks_df = pd.read_csv('tracks_mod.csv', on_bad_lines='warn', encoding= 'latin-1')

tracks_df.dtypes

In [None]:
tracks_df.dtypes

In [None]:
# Show the rows that have no artist ids before filtering.
print(tracks_df[tracks_df['id_artists'].isna()])
# Keep only rows that have at least (number of columns / 2) non-null values.
tracks_df.dropna(thresh=len(tracks_df.columns)/2, inplace=True)

### Popularity

In [None]:
# DOCS: The popularity of the album. 
# The value will be between 0 and 100, with 100 being the most popular.

# Print the distinct values before and after cleaning to make the effect visible.
print(tracks_df['popularity'].unique())
# Impute NaNs, cast to int and clip to [0, 100].
tracks_df['popularity'] = clean_and_check_in_range(tracks_df, 'popularity', 0, 100, int)

print(tracks_df['popularity'].unique())

### duration_ms

In [None]:
# DOCS: The duration of the track in milliseconds.

# Impute missing durations BEFORE casting: astype(int) raises on NaN, and once the
# column is int there is nothing left to impute.
replace_nan_by_median(tracks_df, 'duration_ms')
tracks_df['duration_ms'] = tracks_df['duration_ms'].astype(int)
tracks_df['duration_ms']

### explicit

In [None]:
# DOCS: Whether or not the track has explicit lyrics ( true = yes it does; false = no it does not OR unknown).
print(tracks_df['explicit'].unique())
# Map the -1 entries to 0 (the DOCS above fold "unknown" into false). A single .loc
# assignment is used because chained indexing (df[col][mask] = ...) may write to a
# temporary copy and leave the DataFrame unchanged.
tracks_df.loc[tracks_df['explicit'] == -1, 'explicit'] = 0
print(tracks_df['explicit'].unique())

### artist id_artist

In [None]:
def clean_artists(artists, ids):
    """Pair each artist name with the artist id found at the same position.

    Both arguments are parsed with ``clean_str_to_list``; pairing stops at the
    end of the shorter of the two parsed lists.

    Args:
        artists: Text form of the list of artist names.
        ids: Text form of the list of artist ids.

    Returns:
        list[tuple[str, str]]: ``(name, id)`` pairs.
    """
    names = clean_str_to_list(artists)
    artist_ids = clean_str_to_list(ids)
    return [(name, artist_id) for name, artist_id in zip(names, artist_ids)]

# Build, for every track, the list of (artist name, artist id) pairs.
# if 'artists' in tracks_df.columns and 'id_artists' in tracks_df.columns:
tracks_df['artists_list'] = tracks_df.apply(lambda x: clean_artists(x.artists, x.id_artists), axis=1)
# The source columns are kept; dropping them is left disabled:
# tracks_df.drop(columns=['artists', 'id_artists'], inplace=True)
tracks_df

tracks_df['artists_list']

### release date

In [None]:
from datetime import datetime
import re

def check_format(x, format):
    """Parse ``x`` with ``datetime.strptime`` and return None when it does not match.

    Args:
        x: Text to parse.
        format: ``strptime`` format string.

    Returns:
        datetime | None: The parsed value, or None if ``x`` does not match
        ``format`` or is not a string.
    """
    try:
        return datetime.strptime(x, format)
    except (ValueError, TypeError):
        # ValueError: the text does not match the format.
        # TypeError: x is not a string.
        # Anything else is a real problem and is allowed to propagate.
        return None

def fix_date(datestr):
    """Normalise a release date given with year, year-month or full-date precision.

    Values that are already datetimes are returned unchanged. Otherwise every
    character except digits and '-' is removed and the result is parsed as
    ``%Y-%m-%d``, then ``%Y-%m``, then ``%Y``.

    Args:
        datestr: Raw release date value.

    Returns:
        datetime | None: The parsed date, or None when no format matches.
    """
    if isinstance(datestr, datetime):
        return datestr

    cleaned = "".join(ch for ch in str(datestr) if ch.isdigit() or ch == '-')
    # Try the most precise format first.
    for date_format in ("%Y-%m-%d", "%Y-%m", "%Y"):
        parsed = check_format(cleaned, date_format)
        if parsed:
            return parsed
    return None

# Parse every release date; values that match no supported format become None.
tracks_df['release_date'] = tracks_df["release_date"].apply(fix_date)
tracks_df['release_date']

### danceability

In [None]:
# DOCS: Danceability describes how suitable a track is for dancing based on a combination of musical elements including tempo, rhythm stability, beat strength, and overall regularity. 
# A value of 0.0 is least danceable and 1.0 is most danceable.

# Clean the column against the documented range [0, 1].
tracks_df['danceability'] = clean_and_check_in_range(tracks_df, 'danceability', 0, 1)

tracks_df['danceability']

### energy

In [None]:
# DOCS: Energy is a measure from 0.0 to 1.0 and represents a perceptual measure of intensity and activity.

# Clean the column against the documented range [0, 1].
tracks_df['energy'] = clean_and_check_in_range(tracks_df, 'energy', 0, 1)

tracks_df['energy']

### key

In [None]:
# DOCS: The key the track is in. Integers map to pitches using standard Pitch Class notation. E.g. 0 = C, 1 = C♯/D♭, 2 = D, and so on. If no key was detected, the value is -1.
# Valid range: -1 <= key <= 11

# No column_type is passed, so the column is not cast to an integer dtype here.
tracks_df['key'] = clean_and_check_in_range(tracks_df, 'key', -1, 11)

tracks_df['key']

### Loudness

In [None]:
# DOCS: Values typically range between -60 and 0 db.

# Clean the column against [-60, 0]; the DOCS call this range "typical", so louder
# or quieter outliers are handled according to it as well.
tracks_df['loudness'] = clean_and_check_in_range(tracks_df, 'loudness', -60, 0)

tracks_df['loudness']

### mode

In [None]:
# DOCS: Mode indicates the modality (major or minor) of a track, the type of scale from which its melodic content is derived. 
# Major is represented by 1 and minor is 0.

print(tracks_df['mode'].unique())

def mode_type_from_mode(x):
    """Translate the numeric mode flag into its label.

    Args:
        x: Mode flag; 0 means minor, any other value is treated as major.

    Returns:
        str: ``"minor"`` or ``"major"``.
    """
    # The label used to read "mayor", a misspelling of "major".
    return "minor" if x == 0 else "major"

# Normalise the flag: 0 stays 0, every other value (including NaN) becomes 1.
tracks_df['mode'] = tracks_df['mode'].apply(lambda x: 0 if x == 0 else 1)
# Derive the readable label from the normalised flag.
tracks_df['mode_type'] = tracks_df['mode'].apply(mode_type_from_mode)

print(tracks_df['mode'].unique())

tracks_df['mode_type']

### speechiness

In [None]:
# DOCS: Speechiness detects the presence of spoken words in a track. 
# The more exclusively speech-like the recording (e.g. talk show, audio book, poetry), 
# the closer to 1.0 the attribute value. 
# Values above 0.66 describe tracks that are probably made entirely of spoken words. 
# Values between 0.33 and 0.66 describe tracks that may contain both music and speech, 
# either in sections or layered, including such cases as rap music. 
# Values below 0.33 most likely represent music and other non-speech-like tracks.

def check_track_type_by_speechiness(x):
    """Classify a track from its speechiness score.

    Thresholds follow the DOCS comment above: below 0.33 is music, above 0.66
    is speech, and the values in between (both bounds included) are mixed.

    Args:
        x: Speechiness score, expected in [0, 1].

    Returns:
        str: ``'music'``, ``'mixed'`` or ``'speech'``; ``''`` when ``x`` is NaN.
    """
    if x < 0.33:
        return 'music'
    if x <= 0.66:
        return 'mixed'
    if x > 0.66:
        return 'speech'
    # Only NaN fails all three comparisons.
    return ''

# Clean the score against [0, 1] first, so the classifier sees the cleaned values.
tracks_df['speechiness'] = clean_and_check_in_range(tracks_df, 'speechiness', 0, 1)

tracks_df['track_type'] = tracks_df['speechiness'].apply(check_track_type_by_speechiness)
tracks_df['track_type']

### acousticness

In [None]:
# DOCS: A confidence measure from 0.0 to 1.0 of whether the track is acoustic. 
# 1.0 represents high confidence the track is acoustic.

tracks_df['acousticness'] = clean_and_check_in_range(tracks_df, 'acousticness', 0, 1)

tracks_df['acousticness']

### instrumentalness

In [None]:
# Predicts whether a track contains no vocals. 
# "Ooh" and "aah" sounds are treated as instrumental in this context. 
# Rap or spoken word tracks are clearly "vocal". 
# The closer the instrumentalness value is to 1.0, 
# the greater likelihood the track contains no vocal content. 
# Values above 0.5 are intended to represent instrumental tracks, 
# but confidence is higher as the value approaches 1.0.

def check_track_type_by_instrumentalness(x):
    """Label a track as instrumental or vocal from its instrumentalness score.

    Args:
        x: Instrumentalness score.

    Returns:
        str: ``'instrumental'`` when the score is above 0.5, else ``'vocal'``.
    """
    if x > 0.5:
        return 'instrumental'
    return 'vocal'

# Clean the score against [0, 1], then derive the instrumental/vocal label from it.
tracks_df['instrumentalness'] = clean_and_check_in_range(tracks_df, 'instrumentalness', 0, 1)

tracks_df['instrumental_type'] = tracks_df['instrumentalness'].apply(check_track_type_by_instrumentalness)
tracks_df['instrumental_type']

### liveness

In [None]:
# DOCS: Detects the presence of an audience in the recording. 
# Higher liveness values represent an increased probability that the track 
# was performed live. A value above 0.8 provides strong likelihood that the track is live.

def check_track_type_by_liveness(x):
    """Label a track as live or recorded from its liveness score.

    Args:
        x: Liveness score.

    Returns:
        str: ``'live'`` when the score is above 0.8, else ``'recorded'``.
    """
    is_live = x > 0.8
    return 'live' if is_live else 'recorded'

# Clean the score against [0, 1] before classifying.
tracks_df['liveness'] = clean_and_check_in_range(tracks_df, 'liveness', 0, 1)

# Use the liveness classifier (0.8 threshold, 'live'/'recorded'). The instrumentalness
# classifier was applied here before, which filled live_type with 'instrumental'/'vocal'.
tracks_df['live_type'] = tracks_df['liveness'].apply(check_track_type_by_liveness)
tracks_df['live_type']

### valence

In [None]:
# DOCS: A measure from 0.0 to 1.0 describing the musical positiveness conveyed by a track. 
# Tracks with high valence sound more positive (e.g. happy, cheerful, euphoric), 
# while tracks with low valence sound more negative (e.g. sad, depressed, angry).

def check_track_type_by_valence(x):
    """Label a track as positive or negative from its valence score.

    Args:
        x: Valence score.

    Returns:
        str: ``'positive'`` when the score is above 0.5, else ``'negative'``.
    """
    return 'positive' if x > 0.5 else 'negative'

# Clean the score against [0, 1], then derive the positive/negative label from it.
tracks_df['valence'] = clean_and_check_in_range(tracks_df, 'valence', 0, 1)

tracks_df['valence_type'] = tracks_df['valence'].apply(check_track_type_by_valence)
tracks_df['valence_type']

### tempo

In [None]:
# The overall estimated tempo of a track in beats per minute (BPM). 
# In musical terminology, tempo is the speed or pace of a given piece and 
# derives directly from the average beat duration.
# FLOAT

# Only the dtype is enforced here; no range check or NaN imputation is applied to tempo.
tracks_df['tempo'] = tracks_df['tempo'].astype(float)

tracks_df['tempo']

### time_signature

In [None]:
# DOCS: An estimated time signature. The time signature (meter) is a 
# notational convention to specify how many beats are in each bar (or measure). 
# The time signature ranges from 3 to 7 indicating time signatures of "3/4", to "7/4".

# Print the distinct values before and after cleaning to make the effect visible.
print(tracks_df['time_signature'].unique())
# Impute NaNs, cast to int and clip to the documented range [3, 7].
tracks_df['time_signature'] = clean_and_check_in_range(tracks_df, 'time_signature', 3, 7, int)

print(tracks_df['time_signature'].unique())

tracks_df['time_signature']

# EDA

In [None]:
"""
genres_list = []
    
for a in artists_df.nlargest(100, 'popularity')['flat_genres_list']: # .apply(aaaa)
    for b in a:
        if b not in genres_list:
            genres_list += [b]
genres_list.sort()
genres_list
"""

In [None]:
""" 
for a in artists_df['flat_genres_list']: # .apply(aaaa)
    for b in a:
        if b not in genres_list:
            genres_list += [b]

genres_list.sort()
genres_counter = list(np.full_like(genres_list, 0, dtype=int))
# print(genres_list, genres_counter)

def loquesea(fgl, popu):
    for i in range(len(genres_)):
        if genres_[i] in fgl:
            genres_counter[i] += int(popu)

artists_df[artists_df['flat_genres_list'].apply(comp_genres)].apply(lambda x: loquesea(x.flat_genres_list, x.popularity), axis=1)

ll = list(zip(genres_list, genres_counter))


print(ll[:10])

sorted(ll, key=lambda x: x[1])

print(ll[0:10])

fig, ax = plt.subplots()

ax.set_ylabel('popularity')
ax.set_xlabel('genre')
# ax.set_title('Fruit supply by kind and color')

genres_x = []
genres_y = []
for i in range(10):
    genres_x += ll[0]
    genres_y += ll[1]

ax.bar(genres_x, genres_y, color=np.random.rand(len(genres_x),3))
"""

In [None]:
"""
genres_list = ['rock', 'pop', 'hip', 'rap', 'k-pop', 'reggaeton', 'trap', 'electro']
genres_list.sort()
genres_counter = list(np.full_like(genres_list, 0, dtype=int))
print(genres_list, genres_counter)

def comp(list1, list2):
    for val in list1:
        if val in list2:
            return True
    return False

def comp_genres(x):
    return comp(x, genres_list)


def loquesea(fgl, popu):
    for i in range(len(genres_)):
        if genres_[i] in fgl:
            genres_counter[i] += int(popu)

artists_df[artists_df['flat_genres_list'].apply(comp_genres)].apply(lambda x: loquesea(x.flat_genres_list, x.popularity), axis=1)

print(genres_counter)

fig, ax = plt.subplots()

ax.set_ylabel('popularity')
ax.set_xlabel('genre')
# ax.set_title('Fruit supply by kind and color')

ax.bar(genres_list, genres_counter, color=np.random.rand(len(genres_list),3))
""" 

In [None]:
"""
top10_genere = artists_df.nlargest(10, 'popularity')
top10_genere
"""

# Postgres

## Connection

In [None]:
!pip install psycopg2-binary
!pip install sqlalchemy

## DB

In [None]:
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import insert

import os

# Take the connection URL from the DATABASE_URL environment variable so credentials do
# not have to live in the notebook. The previous hard-coded URL is kept as the fallback
# so existing local setups keep working unchanged.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+psycopg2://postgresgd:Password@localhost:5432/postgresgd",
)
engine = create_engine(DATABASE_URL)
# `engine.fast_executemany = True` was removed: fast_executemany is a create_engine()
# option of the mssql+pyodbc dialect, and setting it as an attribute on a psycopg2
# engine has no effect.

In [None]:
from typing import List
from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, DateTime

import datetime

class Base(DeclarativeBase):
    """Declarative base class shared by all ORM models in this notebook."""
    pass

class Artists(Base):
    """ORM mapping for the ``artists`` table: one row per artist."""
    __tablename__ = "artists"
                
    # Artist identifier (string primary key).
    id: Mapped[str] = mapped_column(primary_key=True)
    followers: Mapped[int]
    name: Mapped[str]  # = mapped_column(String(100))
    popularity: Mapped[int]
    

class Genres(Base):
    """ORM mapping for the ``genres`` table: one row per distinct genre name."""
    __tablename__ = "genres"
    
    # Integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True) 
    # Genre names are unique, so inserting an existing name fails.
    name: Mapped[str] = mapped_column(unique=True)# = mapped_column(String(30))

class ArtistGenres(Base):
    """Association table linking an artist to one of its genres.

    The pair (artist_id, genre_id) is the composite primary key.
    """
    __tablename__ = "artist_genres"

    artist_id: Mapped[str] = mapped_column(ForeignKey("artists.id"), primary_key=True)
    genre_id: Mapped[int] = mapped_column(ForeignKey("genres.id"), primary_key=True)

    # The annotation must name the mapped classes exactly as they are defined
    # (`Artists`, `Genres`); "Artist"/"Genre" cannot be resolved and make mapper
    # configuration fail.
    # NOTE(review): cascade="all, delete" on the many-to-one side propagates deletes
    # from the link row to the artist/genre - confirm that this is intended.
    artist: Mapped["Artists"] = relationship(cascade="all, delete")
    genre: Mapped["Genres"] = relationship(cascade="all, delete")
    
class Tracks(Base):
    """ORM mapping for the ``tracks`` table: one row per track.

    Holds the cleaned audio features plus the labels derived from them
    (the ``*_type`` columns).
    """
    __tablename__ = "tracks"
    
    # Track identifier (string primary key).
    id: Mapped[str] = mapped_column(primary_key=True)
    name: Mapped[str] # = mapped_column(String(100))
    popularity: Mapped[int]
    duration_ms: Mapped[int]
    explicit: Mapped[int]
    # The `artists` / `id_artists` DataFrame columns are not mapped here; track-artist
    # links are stored in the artist_tracks association table.
    release_date = Column(DateTime, default=datetime.datetime.utcnow)
    danceability: Mapped[float]
    energy: Mapped[float]
    # The cleaning cell does not cast `key` to int, hence the float mapping.
    key: Mapped[float]
    loudness: Mapped[float]
    mode: Mapped[int]
    
    speechiness: Mapped[float]
    acousticness: Mapped[float]
    instrumentalness: Mapped[float]
    liveness: Mapped[float]
    valence: Mapped[float]
    tempo: Mapped[float]
    time_signature: Mapped[int]
    
    # Labels derived from the numeric features during cleaning.
    mode_type: Mapped[str]
    track_type: Mapped[str]
    instrumental_type: Mapped[str]
    live_type: Mapped[str]
    valence_type: Mapped[str]
    
class ArtistsTracks(Base):
    """Association table linking an artist to a track.

    The pair (artist_id, track_id) is the composite primary key.
    """
    __tablename__ = "artist_tracks"

    artist_id: Mapped[str] = mapped_column(ForeignKey("artists.id"), primary_key=True)
    # tracks.id is a string column, so the referencing column must be a string too;
    # an integer column cannot hold the track ids and does not match the key it references.
    track_id: Mapped[str] = mapped_column(ForeignKey("tracks.id"), primary_key=True)

    # The annotation must name the mapped classes exactly as they are defined
    # (`Artists`, `Tracks`); "Artist"/"Track" cannot be resolved and make mapper
    # configuration fail.
    # NOTE(review): cascade="all, delete" on the many-to-one side propagates deletes
    # from the link row to the artist/track - confirm that this is intended.
    artist: Mapped["Artists"] = relationship(cascade="all, delete")
    track: Mapped["Tracks"] = relationship(cascade="all, delete")

In [None]:
# Reset the schema. Association tables go first because they reference the others.
# checkfirst=True skips tables that do not exist, so the cell also works on a fresh
# database (a plain drop() fails there).
ArtistGenres.__table__.drop(engine, checkfirst=True)
ArtistsTracks.__table__.drop(engine, checkfirst=True)
Tracks.__table__.drop(engine, checkfirst=True)
Genres.__table__.drop(engine, checkfirst=True)
Artists.__table__.drop(engine, checkfirst=True)

In [None]:
# Create the tables; each association table is created after the tables it references.
Artists.__table__.create(engine)
Genres.__table__.create(engine)
ArtistGenres.__table__.create(engine)
Tracks.__table__.create(engine)
ArtistsTracks.__table__.create(engine)

In [None]:
# Append the cleaned artist columns to the artists table, 1000 rows per multi-row INSERT.
# Smaller trial run, left disabled:
# artists_df[["id","followers","name","popularity"]].head(10000).to_sql(
artists_df[["id","followers","name","popularity"]].to_sql(
    name='artists', 
    con=engine, 
    # if_exists= 'replace', 
    # 'append' inserts into the table created above instead of replacing it.
    if_exists='append',
    method='multi',
    index=False,
    chunksize=1000
)

In [None]:
# Append the cleaned track columns to the tracks table, 1000 rows per multi-row INSERT.
# The helper columns artists, id_artists and artists_list are not part of the selection.
tracks_df[[
    "id",
    "name",
    "popularity",
    "duration_ms",
    "explicit",
    "release_date",
    "danceability",
    "energy",
    "key",
    "loudness",
    "mode",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
    "time_signature",
    "mode_type",
    "track_type",
    "instrumental_type",
    "live_type",
    "valence_type"
# Smaller trial run, left disabled:
# ]].head(10000).to_sql(
]].to_sql(
    name='tracks', 
    con=engine, 
    # if_exists= 'replace', 
    # 'append' inserts into the table created above instead of replacing it.
    if_exists='append',
    method='multi',
    index=False,
    chunksize=1000
)

In [None]:
from sqlalchemy import insert

# Fill the artist_tracks association table, one INSERT and one commit per (artist, track) pair.
with engine.connect() as conn:    
    def process_track_id_artists_id(track_id, artists_list):
        """Insert one artist_tracks row for every (name, id) pair in ``artists_list``.

        Each insert is committed on its own; a failing insert is rolled back and
        the loop continues with the next artist.
        """
        # print(track_id, artists_list)
        for artist in artists_list:
            
            # artist is a (name, id) tuple; only the id is stored.
            a_id = artist[1]
            try:
                stmt = insert(ArtistsTracks).values(artist_id=a_id, track_id=track_id)
                # print(stmt)
                result = conn.execute(stmt)
                conn.commit()
                # print(result)
            except Exception as e: 
                conn.rollback()
                # Errors whose text contains "is not present in table" are skipped
                # silently (presumably missing-artist foreign keys - verify against
                # the driver's message); every other error is printed.
                if "is not present in table" not in str(e): 
                    print(e)
                


    # Smaller trial run, left disabled:
    # tracks_df.head(500).apply(lambda x: process_track_id_artists_id(x.id, x.artists_list), axis=1)
    tracks_df.apply(lambda x: process_track_id_artists_id(x.id, x.artists_list), axis=1)

In [None]:
from sqlalchemy import insert

# Number of artists per genre word, filled by add_to_genres_count.
genres_count = {}
def add_to_genres_count(genres_list, artist_id):
    """Add one to the module-level ``genres_count`` entry of every genre given.

    Args:
        genres_list: Genre words of one artist.
        artist_id: Id of that artist (accepted but not used).
    """
    for genre in genres_list:
        genres_count[genre] = genres_count.get(genre, 0) + 1

artists_df.apply(lambda x: add_to_genres_count(x.flat_genres_list, x.id), axis=1)
genres_count

# Cache of genre name -> primary key of its row in the genres table.
# faster than reading the database each time, and not large enough to be a problem
# The guard must test `inserted_genres` itself. It used to test a placeholder name that
# never exists, so the cache was recreated empty on every run of the cell.
try:
    inserted_genres
except NameError:
    print("creating inserted_genres")
    inserted_genres = {}
else:
    print("inserted_genres already created")

# Fill the genres table, then the artist_genres association table.
with engine.connect() as conn:    
    for genre in genres_count.keys(): 
        try:
            stmt = insert(Genres).values(name=genre)
            # print(stmt)
            result = conn.execute(stmt)
            conn.commit()
            
            # Remember the generated primary key so the link rows can reference it.
            inserted_key = result.inserted_primary_key
            inserted_genres[genre] = inserted_key[0]
        except Exception as e: 
            # Any failure is rolled back without a message; the genre then gets no
            # new entry in inserted_genres.
            conn.rollback()

  
    def process_genres_list(artist_id, genres_list):
        """Insert one artist_genres row for every genre in ``genres_list``.

        Each insert is committed on its own; a failing insert (or a genre missing
        from ``inserted_genres``) is rolled back and the loop continues.
        """
        # print(track_id, artists_list)
        for genre in genres_list:
            try:
                stmt = insert(ArtistGenres).values(artist_id=artist_id, genre_id=inserted_genres[genre])
                result = conn.execute(stmt)
                conn.commit()
                # print(result)
            except Exception as e: 
                conn.rollback()
                # Errors whose text contains "is not present in table" are skipped
                # silently; every other error is printed.
                if "is not present in table" not in str(e): 
                    print(e)
                
    # Smaller trial run, left disabled:
    # artists_df.head(500).apply(lambda x: process_genres_list(x.id, x.flat_genres_list), axis=1)
    artists_df.apply(lambda x: process_genres_list(x.id, x.flat_genres_list), axis=1)

## load 

In [None]:
import pandas as pd

def load_sql_table(name: str, engine):
    """Read a whole database table into a DataFrame.

    Args:
        name: Name of the table to read.
        engine: Object whose ``connect()`` returns a context-managed connection
            accepted by ``pd.read_sql_table``.

    Returns:
        The DataFrame produced by ``pd.read_sql_table``.
    """
    with engine.connect() as connection:
        table_df = pd.read_sql_table(name, connection)
    return table_df
   
# Read every table back from the database to inspect what was loaded.
artistsDf = load_sql_table(Artists.__tablename__, engine)
genresDf = load_sql_table(Genres.__tablename__, engine)
tracksDf = load_sql_table(Tracks.__tablename__, engine)
artistsGenresDf = load_sql_table(ArtistGenres.__tablename__, engine)
artistsTracksDf = load_sql_table(ArtistsTracks.__tablename__, engine)

In [None]:
artistsDf

In [None]:
genresDf

In [None]:
tracksDf[tracksDf['name'].str.contains('ella')]

In [None]:
artistsGenresDf

In [None]:
artistsTracksDf