In [None]:
import pandas as pd
import json
import os
import numpy as np
import difflib
import re
import sys

sys.path.append('../..')

from tqdm import tqdm
from collections import deque, namedtuple

from IPython.display import display

# Listening log

In [None]:
# Input locations: the JSON listening log and the directory of per-track CSV files.
DATA = './source/music.json'
TRACKS = './source/tracks/'

# Canonical artist name -> alias spellings that must be rewritten to it.
_ARTIST_ALIASES = {
    'Tarja': (
        'Tarja Turunen',
    ),
    'Two Steps from Hell': (
        'Thomas Bergersen',
        'Томас Бергерсен',
        'Two Steps from Hell & Thomas Bergersen',
        'Two Steps From Hell',
        'Two Steps From Hell, Thomas Bergersen',
        'Two Steps From Hell, Nick Phoenix',
    ),
    'Amaranthe': (
        'Tribe',
    ),
    'Andreas Waldetoft': (
        'Андреас Вальдетофт',
        'Andreas Waltedoft',
        'Paradox Interactive',
        'Meyer',
    ),
    'Sabaton': (
        'Saxon',
    ),
    'Visions of Atlantis': (
        'Visions Of Atlantis',
    ),
    "Leaves' Eyes": (
        'Leaves´ Eyes',
        "Leaves ' Eyes",
    ),
}

# Flat alias -> canonical lookup consumed by fix_artist().
FIX_ARTISTS = {
    alias: canonical
    for canonical, aliases in _ARTIST_ALIASES.items()
    for alias in aliases
}

# Cleaned title -> replacement title, consumed by fix_title().
FIX_TITLES = {'inf': 'Infinity'}

def fix_artist(artist):
    """Map a known alias to its canonical artist name.

    Values that have no entry in ``FIX_ARTISTS`` are returned unchanged.
    """
    try:
        return FIX_ARTISTS[artist]
    except KeyError:
        return artist

def fix_title(title):
    """Normalise a raw title.

    Removes every non-empty parenthesised group (with one optional leading
    space), removes the text "Listened to", trims surrounding whitespace and
    finally applies the ``FIX_TITLES`` replacements.
    """
    without_parens = re.sub(r" ?\([^)]+\)", "", title)
    cleaned = re.sub(r"Listened to", "", without_parens).strip()
    if cleaned in FIX_TITLES:
        return FIX_TITLES[cleaned]
    return cleaned

def check_title(title):
    """Return True unless the title mentions "Skipped" or "Google Play Music"."""
    for marker in ("Skipped", "Google Play Music"):
        if marker in title:
            return False
    return True

# Decode explicitly as UTF-8: without `encoding=` Python falls back to the
# platform's locale encoding, which can mis-decode (or fail on) non-ASCII text
# such as the Cyrillic artist names this notebook has to handle.
with open(DATA, 'r', encoding='utf-8') as f:
    data = json.load(f)

In [None]:
# Build a DataFrame from the parsed JSON; the bare `df` expression renders it
# as the cell output.
df = pd.DataFrame(data)
df

In [None]:
# Rebuild the frame from the parsed JSON instead of mutating the current `df`.
# The previous version overwrote `df` after dropping `description`, so running
# the cell a second time failed with a KeyError; this version is idempotent.
df = pd.DataFrame(data)

# The artist comes from the `description` field; titles are normalised in place.
df['artist'] = df['description'].apply(fix_artist)
df['title'] = df['title'].apply(fix_title)

# Keep only rows accepted by check_title(), restricted to the columns used
# downstream, with a fresh 0..n-1 index.
is_kept = [check_title(t) for t in df['title']]
df = df.loc[is_kept, ['artist', 'title', 'time']].reset_index(drop=True)
df

# Tracks data

In [None]:
# os.listdir() returns entries in arbitrary order. The row position in `df_t`
# is later used as the track id, so sort the file names to make those ids
# reproducible between runs and machines.
files = sorted(
    f for f in os.listdir(TRACKS)
    if os.path.isfile(os.path.join(TRACKS, f))
)

tracks = []

for file in tqdm(files):
    df_f = pd.read_csv(os.path.join(TRACKS, file))
    # Only the row labelled 0 of each CSV is used (one track per file).
    obj = df_f.to_dict(orient='index')[0]
    tracks.append(obj)

df_t = pd.DataFrame(tracks)
df_t.head()

In [None]:
import html

# Every text column is HTML-unescaped (after str() coercion); titles and
# artists additionally go through the same normalisers as the listening log.
for column, normalise in (('Title', fix_title), ('Album', None), ('Artist', fix_artist)):
    unescaped = df_t[column].apply(lambda raw: html.unescape(str(raw)))
    df_t[column] = unescaped if normalise is None else unescaped.apply(normalise)

# Columns are renamed by position, then the two unused ones are discarded.
df_t.columns = ['title', 'album', 'artist', 'duration', 'rating', 'play_count', 'removed']
df_t = df_t.drop(columns=['rating', 'removed'])
df_t.head()

# Join listening log with tracks

In [None]:
# Index the library rows by title; several rows may share one title, so each
# title maps to a list of row tuples (in frame order).
tracks_by_name = {}

for track in df_t.itertuples(index=True):
    tracks_by_name.setdefault(track.title, []).append(track)

# Distinct titles, in first-seen order, used as the fuzzy-matching corpus.
track_names = list(tracks_by_name)

In [None]:
def get_tracks(title, cutoff=0.6, fallback_cutoff=0.3):
    """Find the library tracks whose title best matches ``title``.

    Strategies are tried in order and the first hit wins:

    1. exact key lookup in ``tracks_by_name``;
    2. for titles longer than 5 characters, library titles that contain
       ``title`` as a substring (ranked with difflib when more than two);
    3. for titles mentioning "Instrumental", the same search on the title
       with that word and all hyphens removed;
    4. fuzzy matching against every library title at ``cutoff`` and then,
       if that is stricter than ``fallback_cutoff``, once more at
       ``fallback_cutoff``.

    Args:
        title: Title to look up.
        cutoff: ``difflib.get_close_matches`` threshold of the first fuzzy pass.
        fallback_cutoff: Looser threshold of the final fuzzy pass.

    Returns:
        The list stored in ``tracks_by_name`` for the selected title, or
        ``None`` when nothing matches.
    """
    # 1. Exact title.
    if title in tracks_by_name:
        return tracks_by_name[title]

    # 2. Library titles containing the listened title.
    if len(title) > 5:
        containing = [name for name in track_names if title in name]
        if len(containing) > 2:
            # Was misspelled `get_close_mathces`, which raised AttributeError.
            ranked = difflib.get_close_matches(title, containing, cutoff=0.4)
            if ranked:
                return tracks_by_name[ranked[0]]
        elif containing:
            return tracks_by_name[containing[0]]

    # 3. Retry without the "Instrumental" marker. This used to be an `elif` of
    #    the length check above and therefore only ran for titles of at most
    #    5 characters, which can never contain the 12-character marker.
    if "Instrumental" in title:
        stripped = title.replace('Instrumental', '').replace('-', '').strip()
        if stripped:
            found = get_tracks(stripped, cutoff, fallback_cutoff)
            if found is not None:
                return found

    # 4. Fuzzy match, strict pass first. The loose pass runs at most once; the
    #    old code re-entered itself with cutoff=0.3 forever when nothing
    #    matched, ending in RecursionError instead of returning None.
    thresholds = [cutoff]
    if fallback_cutoff < cutoff:
        thresholds.append(fallback_cutoff)
    for threshold in thresholds:
        ranked = difflib.get_close_matches(title, track_names, cutoff=threshold)
        if ranked:
            return tracks_by_name[ranked[0]]
    return None

def pick_track(tracks, track_listened, id_series):
    """Choose one library track among same-title candidates.

    Candidates by the listened artist are preferred; if there are none,
    candidates whose artist is the string ``'nan'`` are used instead. When
    several candidates remain and the previously matched track (last entry of
    ``id_series``) is by the listened artist, the candidate from that track's
    album wins; otherwise the first remaining candidate is returned.

    Args:
        tracks: Candidate rows (objects with ``artist`` and ``album``).
        track_listened: Listened row (object with ``artist``).
        id_series: Sequence of previously matched ``df_t`` row positions,
            with ``None`` for unmatched entries. May be empty.

    Returns:
        The chosen candidate, or ``None`` if no candidate is acceptable.
    """
    same_artists = [t for t in tracks if t.artist == track_listened.artist]
    if not same_artists:
        # Fall back to library rows whose artist is the string 'nan'.
        same_artists = [t for t in tracks if t.artist == 'nan']
        if not same_artists:
            return None
    if len(same_artists) == 1:
        return same_artists[0]

    # Several candidates: use the album of the previously matched track as a
    # tie-breaker. An empty history used to raise an uncaught IndexError on
    # the very first ambiguous track; it is now treated as "no previous track".
    previous_id = id_series[-1] if id_series else None
    if previous_id is not None:
        previous_track = df_t.iloc[previous_id]
        if previous_track.artist == track_listened.artist:
            by_album = {t.album: t for t in same_artists}
            if previous_track.album in by_album:
                return by_album[previous_track.album]
    return same_artists[0]

# One entry per row of `df`, in order: the matched df_t row index, or None.
id_series = deque()
# Listened rows that could not be linked, with the candidates that were considered.
not_matched = []
NotMatched = namedtuple('NotMatched', ['track', 'candidates'])

for track_listened in tqdm(list(df.itertuples(index=True))):
    candidates = get_tracks(track_listened.title)
    track = None
    if candidates is not None:
        track = pick_track(candidates, track_listened, id_series)
    if track is None:
        id_series.append(None)
        # Both namedtuple fields are required: the title-miss case used to call
        # NotMatched(track_listened) and raised TypeError. Record an empty
        # candidate list instead.
        not_matched.append(NotMatched(track_listened, candidates or []))
        continue
    id_series.append(track.Index)

In [None]:
# Listened rows that could not be linked to a library track.
unmatched_rows = [entry.track for entry in not_matched]
df_nm = pd.DataFrame(unmatched_rows)
# To see more rows, wrap the call below in
# `with pd.option_context('display.max_rows', 500):`.
display(df_nm)

In [None]:
# Attach the matched df_t row index to every listened row. `id_series` was
# filled with one entry per row of `df`, in row order, and holds None where no
# match was found.
# NOTE(review): the None entries will probably surface as NaN in a float
# column — confirm the resulting dtype before relying on integer ids.
df['song_id'] = id_series

# Write to DB

In [None]:
# Project-local modules (presumably importable thanks to the sys.path entry
# added in the first cell — verify).
from smo_data.api import DBConn

from models import Base, AlbumSong, MusicListened

# NOTE(review): instantiating DBConn appears to set up the shared state that
# the cells below access through DBConn.engine and DBConn.get_session() —
# confirm against smo_data.api.
DBConn()

In [None]:
# Ensure the "google" schema exists, drop both tables and let the ORM metadata
# recreate them. The schema-wide variant
# `DROP SCHEMA IF EXISTS {SCHEMA} CASCADE` stays disabled.
for statement in (
    'CREATE SCHEMA IF NOT EXISTS google',
    'DROP TABLE IF EXISTS "google"."AlbumSong" CASCADE',
    'DROP TABLE IF EXISTS "google"."MusicListened" CASCADE',
):
    DBConn.engine.execute(statement)

Base.metadata.create_all(DBConn.engine)

In [None]:
with DBConn.get_session() as db:
    # Library tracks first: the frame's row index is handed to the model as `id`.
    for row in df_t.itertuples(index=True):
        fields = row._asdict()
        fields['id'] = fields.pop('Index')
        db.add(AlbumSong(**fields))
    db.commit()

    # Listening log. `song_id` holds df_t row indices, but rows without a match
    # carry a missing value (None/NaN), and pandas may have turned the whole
    # column into floats because of them. Pass a real None or int so the
    # database receives NULL or an integer id instead of NaN / 12.0.
    for row in df.itertuples(index=False):
        fields = row._asdict()
        if 'song_id' in fields:
            song_id = fields['song_id']
            fields['song_id'] = None if pd.isna(song_id) else int(song_id)
        db.add(MusicListened(**fields))

    db.commit()