In [1]:
import os

import numpy as np
import pandas as pd
from pymongo import MongoClient, ReplaceOne, InsertOne
import pymongo

In [2]:
# Locate a .env file and load its variables into the process environment
# (presumably the source of MONGO_CONN read in the next cell -- confirm).
# The displayed cell output is load_dotenv's return value.
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())

True

In [3]:
# Connection string comes from the environment; os.getenv returns None when
# MONGO_CONN is unset.
# NOTE(review): confirm that passing None to MongoClient (instead of failing
# fast) is acceptable here.
MONGO_CONN = os.getenv("MONGO_CONN")

# Driver-side handles to the "spotifyData" database and its two feature collections.
mongo = MongoClient(MONGO_CONN)
mongo_spotify_data = mongo["spotifyData"]
mongo_spotify_track_features = mongo_spotify_data["trackFeatures"]
mongo_spotify_artist_features = mongo_spotify_data["artistFeatures"]

In [4]:
# Collect the distinct track_spid values already stored in Mongo by grouping on
# the field and reading each group's _id from the aggregation cursor.
try:
    stored_tids = [
        doc.get("_id")
        for doc in mongo_spotify_track_features.aggregate(
            [{"$group": {"_id": "$track_spid"}}]
        )
    ]
except pymongo.errors.PyMongoError as exc:
    # Best-effort fallback kept from the original cell, but limited to driver /
    # server errors and made visible: with an empty list every track is treated
    # as "not stored yet" and will be upserted again further down.
    print(f"Could not read stored track ids from Mongo ({exc!r}); assuming none are stored.")
    stored_tids = []

In [5]:
len(stored_tids)

773828

In [6]:
def chunk(data, n):
    """Split a sliceable sequence into consecutive pieces of at most ``n`` items.

    Args:
        data: Any object supporting ``len()`` and slicing (list, tuple, str, ...).
        n: Maximum size of each piece; must be a positive integer.

    Returns:
        A list of slices of ``data`` in their original order. The last piece may
        be shorter than ``n``; empty input yields an empty list.

    Raises:
        ValueError: If ``n`` is zero or negative. A negative step would make
            ``range`` empty and silently drop every element.
    """
    if n <= 0:
        raise ValueError(f"chunk size must be a positive integer, got {n!r}")
    return [data[start : start + n] for start in range(0, len(data), n)]

In [7]:
#TODO: remove mongo from this to prevent a new connection per worker?
#TODO: multithreading inside of this mapped partition so each worker can efficiently process the chunk ops?
def upsert_mongo_data(df, id_col, db_name="spotifyData", coll_name="trackFeatures", batch_size=1000):
    """Upsert every row of ``df`` into a Mongo collection, keyed on ``id_col``.

    Written to be mapped over dask partitions, so it opens its own client from
    the ``MONGO_CONN`` environment variable and closes it again before
    returning.

    Args:
        df: pandas DataFrame (one partition) whose rows become documents.
        id_col: Column whose value identifies the document to replace/insert.
        db_name: Target database name (defaults to the original hard-coded one).
        coll_name: Target collection name (defaults to the original hard-coded one).
        batch_size: Number of operations sent per ``bulk_write`` call.

    Returns:
        ``True`` once all batches have been written (also for an empty frame).

    Raises:
        KeyError: If ``id_col`` is not a column of ``df``.
        Any error raised by the Mongo driver while writing is propagated.
    """
    # Work on a copy: the fills below must not leak into the caller's partition.
    records_df = df.copy()

    # Missing datetimes are replaced by the Unix epoch before writing.
    epoch = pd.to_datetime('1970-01-01')
    for col in records_df.select_dtypes(include=['datetime64']).columns:
        records_df[col] = records_df[col].fillna(epoch)

    # Missing values in "string"-dtype columns are stored as the literal "<NA>".
    for col in records_df.select_dtypes(include=["string"]).columns:
        records_df[col] = records_df[col].fillna("<NA>")

    ops_list = [
        ReplaceOne({id_col: record[id_col]}, record, upsert=True)
        for record in records_df.to_dict("records")
    ]
    if not ops_list:
        # Nothing to write, so there is no need to open a connection at all.
        return True

    mongo = MongoClient(os.getenv("MONGO_CONN"))
    try:
        db_coll = mongo[db_name][coll_name]
        for ops in chunk(ops_list, batch_size):
            # Unordered: one failing operation does not stop the rest of the batch.
            db_coll.bulk_write(ops, ordered=False)
    finally:
        # One client is created per call (i.e. per partition); always release it.
        mongo.close()
    return True

In [8]:
import dask.dataframe as dd
import dask.distributed as d_dist
from dask.dataframe.core import repartition
from dask.distributed import Client

In [9]:
# Reuse the distributed client that already exists in this kernel so re-running
# the cell does not start a second local cluster; otherwise create a default one.
try:
    client = d_dist.get_client()
except ValueError:
    # get_client() raises ValueError when no global client has been created yet.
    client = Client()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 55432 instead


In [10]:
# NOTE(review): `catalog` is not defined anywhere in this notebook; judging by the
# kedro.io.data_catalog log line below it is presumably injected by a Kedro-managed
# kernel -- confirm before relying on Restart & Run All.
mpd_track_features = catalog.load("mpd_track_features")

2021-08-25 13:44:08,548 - kedro.io.data_catalog - INFO - Loading data from `mpd_track_features` (ParquetDataSet)...


In [11]:
mpd_track_features.head(1)

Unnamed: 0,track_danceability,track_energy,track_key,track_loudness,track_mode,track_speechiness,track_acousticness,track_instrumentalness,track_liveness,track_valence,track_tempo,track_type,track_spid,track_uri,track_track_href,track_analysis_url,track_duration_ms,track_time_signature,time_pulled
0,0.458,0.409,0,-13.886,1,0.0317,0.733,0.916,0.23,0.249,167.352,audio_features,7daKhXFKpT9Zw95fCNDMxG,spotify:track:7daKhXFKpT9Zw95fCNDMxG,https://api.spotify.com/v1/tracks/7daKhXFKpT9Z...,https://api.spotify.com/v1/audio-analysis/7daK...,348333,4,2021-08-23T21:47:42.002035+00:00


In [12]:
mpd_track_features["track_spid"].count().compute()

2262191

In [13]:
# Parse `time_pulled` into datetimes partition by partition; values that cannot
# be parsed become NaT because of errors="coerce".
# NOTE(review): fillna runs *before* the conversion, so the epoch default is put
# into the still-unparsed column and anything coerced to NaT afterwards stays NaT.
# Confirm whether the fill was meant to come after pd.to_datetime.
mpd_track_features["time_pulled"] = (
    mpd_track_features["time_pulled"]
    .fillna(pd.to_datetime('1970-01-01'))
    .map_partitions(
        pd.to_datetime, errors="coerce"
    )
)

In [14]:
mpd_track_features.head(1)

Unnamed: 0,track_danceability,track_energy,track_key,track_loudness,track_mode,track_speechiness,track_acousticness,track_instrumentalness,track_liveness,track_valence,track_tempo,track_type,track_spid,track_uri,track_track_href,track_analysis_url,track_duration_ms,track_time_signature,time_pulled
0,0.458,0.409,0,-13.886,1,0.0317,0.733,0.916,0.23,0.249,167.352,audio_features,7daKhXFKpT9Zw95fCNDMxG,spotify:track:7daKhXFKpT9Zw95fCNDMxG,https://api.spotify.com/v1/tracks/7daKhXFKpT9Z...,https://api.spotify.com/v1/audio-analysis/7daK...,348333,4,2021-08-23 21:47:42.002035+00:00


In [15]:
mpd_track_features.dtypes

track_danceability               float64
track_energy                     float64
track_key                          int64
track_loudness                   float64
track_mode                         int64
track_speechiness                float64
track_acousticness               float64
track_instrumentalness           float64
track_liveness                   float64
track_valence                    float64
track_tempo                      float64
track_type                        string
track_spid                        string
track_uri                         string
track_track_href                  string
track_analysis_url                string
track_duration_ms                  int64
track_time_signature               int64
time_pulled               datetime64[ns]
dtype: object

In [16]:
import multiprocessing

# CPU count of this machine; used further down as the number of dask partitions.
num_cores = multiprocessing.cpu_count()

In [17]:
num_cores

12

In [18]:
#could probably speed up using map partitions but no need
# Materialize the whole dask frame as one pandas DataFrame, then keep only the
# rows whose track_spid is not among the ids already stored in Mongo.
mpd_track_features_df = mpd_track_features.compute()
selected_mpd_track_features_df = mpd_track_features_df[~(mpd_track_features_df["track_spid"].isin(set(stored_tids)))]

In [19]:
# Sanity check: rows still to upload + ids already stored, to compare with the
# total track count shown in the next cell.
# NOTE(review): the recorded outputs differ by one (2262190 vs 2262191) -- possibly
# a duplicated track_spid in the source data; worth confirming.
selected_mpd_track_features_df.shape[0] + len(stored_tids)

2262190

In [20]:
mpd_track_features["track_spid"].count().compute()

2262191

In [21]:
# Wrap the filtered pandas frame back into a dask DataFrame, one partition per
# CPU core, so the upsert below is mapped over the partitions.
selected_mpd_tracks = dd.from_pandas(selected_mpd_track_features_df, npartitions = num_cores)

In [None]:
# Run upsert_mongo_data once per partition of the not-yet-stored tracks; the
# result is discarded because only the writes to Mongo matter.
# NOTE(review): meta="float" does not match what upsert_mongo_data returns (True);
# confirm the intended meta.
_ = (
    selected_mpd_tracks
    .map_partitions(
        upsert_mongo_data, 
        id_col="track_spid",
        meta="float"
    )
).compute()

In [None]:
# TODO: delete the junk Mongo documents created by dask, i.e. those whose track_spid is "a" or "<NA>".

In [None]:
# TODO: Return Tuple to dict?
# TODO: Unpack kwargs?
def get_mongo_track_data(df_name, kwargs):
    """Fetch stored feature documents from Mongo and report which ids are missing.

    Args:
        df_name: ``"track_features"`` or ``"artist_features"``; selects the
            collection, the id field that is queried and the key read from
            ``kwargs``.
        kwargs: Dict holding the requested ids under ``"track_ids"`` (track
            features) or ``"artist_ids"`` (artist features).

    Returns:
        ``(found_df, missing_ids)``: ``found_df`` is a DataFrame of the matching
        documents without Mongo's ``_id`` column (an empty frame that only has
        the id column when nothing matched); ``missing_ids`` is the
        ``np.setdiff1d`` result, i.e. the sorted unique requested ids that were
        not found. Returns ``None`` for any other ``df_name``.

    Raises:
        KeyError: If the expected ids key is absent from ``kwargs``.
    """
    # NOTE(review): the original body used `mongo_spotify_tracks` /
    # `mongo_spotify_artists`, which are never defined in this notebook; the
    # collection handles that are defined here are used instead -- confirm these
    # are the intended collections.
    if df_name == "track_features":
        collection = mongo_spotify_track_features
        id_field, ids_key = "track_spid", "track_ids"
    elif df_name == "artist_features":
        collection = mongo_spotify_artist_features
        id_field, ids_key = "artist_spid", "artist_ids"
    else:
        return None

    # Materialize as a plain list so any iterable of ids can be passed to $in.
    requested_ids = list(kwargs[ids_key])
    found_df = pd.DataFrame(
        list(collection.find({id_field: {"$in": requested_ids}}))
    )

    if found_df.empty:
        # No match: there is neither an "_id" nor an id column to read from.
        found_df = pd.DataFrame(columns=[id_field])
        found_ids = []
    else:
        found_df = found_df.drop(columns=["_id"], errors="ignore")
        found_ids = found_df[id_field].unique().tolist()

    missing_ids = np.setdiff1d(requested_ids, found_ids)
    return found_df, missing_ids