This notebook is intended to be run from the host machine to submit jobs to Kubernetes.

Before running this notebook, download the raw Spotify Million Playlist Challenge data from AIcrowd:
- Download the two data folders from [AIcrowd's Spotify contest page](https://www.aicrowd.com/challenges/spotify-million-playlist-dataset-challenge/dataset_files).
- Unzip them, and place them in `kube-transform/data/spotify_mpc/raw`.

In [None]:
PROJECT_NAME = 'spotify_mpc'

In [None]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.extend([
    "../../",
    "../../execution",
    "../../orchestration",
])

import os
from orchestration.spotify_mpc import orchestrate as orch
from orchestration.submit import submit_job

os.environ['PROJECT_NAME'] = PROJECT_NAME

In [None]:
# Build Local
os.environ['K8S_ENV'] = 'minikube'
os.environ['DATA_DIR'] = '/'.join(os.getcwd().split("/")[:-2] + ['data'])
! ../../build_scripts/build_local.sh

First, we'll run `standardize_data`.

This function:
- Groups the raw data into 100 batches of 10k playlists each.
- Saves each part as:
    - A thin playlist dataframe containing 1 row per playlist.
    - A track df with information about each unique track found in those playlists.

The output will be stored in the `spotify_mpc/standardized` folder in the data directory.

This function takes a list of "parts", where each part is one of the 100 batches. For this small contest, let's just standardize the first 8 batches.

`orch.standardize_data` will therefore produce a Job Config that specifies 8 tasks, one for each part that we want to standardize.

These tasks will get scheduled on our k8s cluster. If we're running this locally, some may need to wait for others to finish. If we're running on EKS, they should all execute in parallel.
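
To make the part-to-task mapping concrete, here is a minimal sketch of how 8 parts could fan out into 8 tasks. It assumes the raw data ships as 1,000-playlist slice files named `mpd.slice.START-END.json`, so that 10 consecutive slices form one 10k-playlist part. The helper `slice_files_for_part` and the task dictionaries are hypothetical illustrations, not the actual Job Config format that `orch.standardize_data` produces.

```python
# Hypothetical sketch: one task per part, each covering 10 raw slice files.
# The file naming and the task structure are assumptions for illustration.

def slice_files_for_part(part, slices_per_part=10, playlists_per_slice=1000):
    """Return the raw slice filenames that make up one 10k-playlist part."""
    files = []
    for s in range(part * slices_per_part, (part + 1) * slices_per_part):
        start = s * playlists_per_slice
        end = start + playlists_per_slice - 1
        files.append(f"mpd.slice.{start}-{end}.json")
    return files

# One independent task per part; the cluster can run these in parallel.
tasks = [{"part": p, "input_files": slice_files_for_part(p)} for p in range(8)]

print(len(tasks))
print(tasks[1]["input_files"][0])
```

Because each task only reads its own slice files, the tasks share no state, which is what lets them run side by side on a larger cluster.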

In [None]:
submit_job(
    orch.standardize_data(
        input_directory="spotify_mpc/raw/spotify_million_playlist_dataset/data",
        output_directory="spotify_mpc/standardized",
        parts=list(range(8)) # 0-5 for features, 6 for training, 7 for testing
    )
)



Next, let's create a contest from this standardized data.

The contest will consist of:
- A train directory containing 6 training files.
- A track dataframe containing all of the unique tracks present in those 60k training playlists.
- A challenge_set file containing the challenge set that we'll test ourselves against.

In [None]:
submit_job(
    orch.create_contest(
        standardized_input_directory="spotify_mpc/standardized",
        train_parts=list(range(6)),
        track_df_output_path="spotify_mpc/80KPC/track_df.parquet",
        train_output_directory="spotify_mpc/80KPC/train",
        test_part=7,
        n_test_cases=5000,
        challenge_set_output_path="spotify_mpc/80KPC/challenge_set.parquet",
    )
)

Next, let's create another challenge set to use for training.

Our approach will be:
- Use parts 0-5 (60k playlists) for feature generation.
- Use a challenge set derived from part 6 for training and validation (i.e. to train the model that maps features to the probability that a given track is on a given playlist).
- Use a challenge set derived from part 7 for evaluation against Spotify's metrics.

In [None]:
submit_job(
    orch.create_challenge_set(
        standardized_input_directory="spotify_mpc/standardized",
        test_part=6,
        track_df_path="spotify_mpc/80KPC/track_df.parquet",
        n_test_cases=10000,
        output_path="spotify_mpc/80KPC/challenge_set_training.parquet",
    )
)

Next, we'll generate co-occurrence dictionaries.

A given task will handle a single training part (10k playlists). During a task, we'll:
* Count the co-occurrences between every pair of tracks, and represent them as a nested dictionary (a sparse representation).
    * Each key will be a track ID, and each value will be a dict of track ID to co-occurrence count.
* Split the resulting dictionary into 10 shards, using a hashing algorithm to ensure that a given key will always map to the same shard.
* Do this for several co-occurrence types: total, forward (i.e. A precedes B in the playlist), etc.

Notes:
* Each task handles a fixed number of playlists (10k), regardless of the overall contest scale.
* The output will only contain top-level keys for track IDs that appear among the seed tracks of our challenge DFs. This is an optimization to reduce computation, because we won't need features for other track IDs.
* The output from a given task is split into a configurable number of shards (10), for scalable aggregation later.
* The output is not "complete": any given dictionary contains incomplete information for its keys - only the co-occurrence counts found in a subset of playlists.
* This may take a while (~12 minutes) when running locally, even for the 80KPC contest.
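
The counting and sharding steps described above can be sketched as follows. This is a simplified illustration, not the project's implementation: it covers only the "total" co-occurrence type, counts each track once per playlist (an assumption), and uses MD5 as the stable hash because Python's built-in `hash()` for strings is randomized per process. The function names are hypothetical.

```python
import hashlib
from collections import defaultdict

N_SHARDS = 10

def shard_for(track_id, n_shards=N_SHARDS):
    """Map a track ID to a shard index that is stable across processes."""
    digest = hashlib.md5(track_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_shards

def count_total_co_occurrences(playlists, seed_tracks):
    """Build seed track -> other track -> number of playlists containing both."""
    co = defaultdict(lambda: defaultdict(int))
    for tracks in playlists:
        unique = set(tracks)  # assumption: duplicates count once per playlist
        # Only seed tracks become top-level keys, mirroring the optimization above.
        for a in unique & seed_tracks:
            for b in unique:
                if a != b:
                    co[a][b] += 1
    return co

def split_into_shards(co, n_shards=N_SHARDS):
    """Distribute top-level keys across shards using the stable hash."""
    shards = [dict() for _ in range(n_shards)]
    for key, counts in co.items():
        shards[shard_for(key, n_shards)][key] = dict(counts)
    return shards

playlists = [["t1", "t2", "t3"], ["t1", "t3"], ["t2", "t4"]]
co = count_total_co_occurrences(playlists, seed_tracks={"t1", "t2"})
shards = split_into_shards(co)
print(dict(co["t1"]))
```

Since every task uses the same hash, the counts for a given track always land in the same shard index, no matter which training part produced them.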

In [None]:
submit_job(
    orch.generate_co_dicts(
        train_directory="spotify_mpc/80KPC/train",
        challenge_df_paths=[f"spotify_mpc/80KPC/challenge_set{suffix}.parquet" for suffix in ["", "_training"]],
        partial_co_dict_output_directory="spotify_mpc/80KPC/pco",
    )
)


Next, we'll aggregate the co-occurrence dictionaries.

A given task will handle a single shard (1/10 of the tracks found in our challenge DFs) for a single co-occurrence type. During a task, we'll:
* Load the co-occurrence dict for the target shard from each training part.
* Sum these counts to create a complete co-occurrence dict for the tracks in the shard.

Notes:
* Each task handles 1/10 of the total track IDs found in our challenge DF seed tracks.
* While we never hold all co-occurrence information in memory at once - or within a single file - the output from each task is complete in that, for the tracks in the shard, it contains complete co-occurrence info across the entire training set.
* This may also take a while (~12 minutes) when running locally for the 80KPC contest.
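
The reduce step for one shard amounts to summing nested count dictionaries. Here is a minimal sketch, assuming each partial has already been loaded into memory as a plain nested dict; the function name `reduce_partials` is hypothetical, and loading from and saving to storage is omitted.

```python
from collections import Counter

def reduce_partials(partials):
    """Sum partial nested count dicts (one per training part) for one shard."""
    total = {}
    for partial in partials:
        for key, counts in partial.items():
            # Counter.update adds counts rather than replacing them.
            total.setdefault(key, Counter()).update(counts)
    return {key: dict(counts) for key, counts in total.items()}

# Two partial dicts for the same shard, from two different training parts.
part_0 = {"t1": {"t2": 1, "t3": 2}}
part_1 = {"t1": {"t3": 1, "t4": 5}, "t9": {"t1": 1}}

merged = reduce_partials([part_0, part_1])
print(merged)
```

Only the partials for a single shard are in memory at any time, which keeps the memory footprint of each task bounded as the training set grows.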


In [None]:
submit_job(
    orch.reduce_co_partials(
        partial_co_dict_directory="spotify_mpc/80KPC/pco",
        co_dict_output_directory="spotify_mpc/80KPC/co",
    )
)

Now let's identify playlists whose names match an artist's name and aren't common phrases.

This will create a useful feature for our deep learning model.
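
As a rough illustration of the idea, the sketch below matches normalized playlist names against normalized artist names and skips names on a stop list. The normalization rule, the `COMMON_PHRASES` stop list, and the function names are all assumptions made for this example; the actual logic in `orch.identify_artist_playlists` may differ.

```python
# Hypothetical stop list: artist names that are also everyday playlist titles.
COMMON_PHRASES = {"chill", "love", "summer"}

def normalize(name):
    """Lowercase and collapse whitespace so that ' The  Beatles' matches 'the beatles'."""
    return " ".join(name.lower().split())

def find_artist_playlists(playlist_names, artist_names, common_phrases=COMMON_PHRASES):
    """Return pid -> matched artist name for playlists named after an artist."""
    artists = {normalize(a) for a in artist_names} - common_phrases
    matches = {}
    for pid, name in playlist_names.items():
        key = normalize(name)
        if key in artists:
            matches[pid] = key
    return matches

playlist_names = {1: "Drake", 2: "  the  beatles ", 3: "Love", 4: "road trip"}
artist_names = ["Drake", "The Beatles", "Love"]
artist_pids = find_artist_playlists(playlist_names, artist_names)
print(artist_pids)
```

Playlist 3 is excluded even though "Love" is an artist name, because a playlist called "Love" more likely describes a mood than a band.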

In [None]:
for suffix in ["", "_training"]:
    submit_job(
        orch.identify_artist_playlists(
            challenge_df_path=f"spotify_mpc/80KPC/challenge_set{suffix}.parquet",
            track_df_path="spotify_mpc/80KPC/track_df.parquet",
            output_path=f"spotify_mpc/80KPC/artist_pids{suffix}.json",
        )
    )

Next, we'll generate generic features for our deep learning model.

These features are "generic" in that they pertain to a playlist in the challenge set in general, not to a particular <playlist, candidate track> pair.

In [None]:
for suffix in ["", "_training"]:
    submit_job(
        orch.generate_generic_features_fcnn_mfe(
            challenge_df_path=f"spotify_mpc/80KPC/challenge_set{suffix}.parquet",
            track_df_path="spotify_mpc/80KPC/track_df.parquet",
            artist_playlists_path=f"spotify_mpc/80KPC/artist_pids{suffix}.json",
            co_dict_directory="spotify_mpc/80KPC/co",
            output_directory=f"spotify_mpc/80KPC/generic_features_fcnn{suffix}",
        )
    )


This next transformation does the heavy lifting. It will generate the data samples that we'll use to train our model.

It will create a task for each 1000-row subset of the challenge set. For each row, it will:
- Identify several thousand candidate tracks for the playlist using heuristics.
- Create a full set of features for each <playlist, candidate track> pair.

It will do this for both:
- the "training" challenge set, which we'll use to train a model that can map these features into the probability that a candidate track will be present on a playlist.
- the "real" challenge set, which we'll use to evaluate our model. We'll run inference on these features, and then evaluate those predictions using Spotify's evaluation metrics.

This can take around 35 minutes locally.
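
To give a feel for what one row's work looks like, here is a small sketch of one plausible candidate heuristic: rank tracks by their summed co-occurrence with the playlist's seed tracks, then derive a few features per <playlist, candidate track> pair. The heuristic, the feature names, and both functions are assumptions for illustration; the real transformation uses its own heuristics and a much larger feature set.

```python
from collections import Counter

def candidate_tracks(seed_tracks, co_dict, n_candidates=3):
    """Rank non-seed tracks by total co-occurrence with the seed tracks."""
    scores = Counter()
    for seed in seed_tracks:
        for other, count in co_dict.get(seed, {}).items():
            scores[other] += count
    for seed in seed_tracks:
        scores.pop(seed, None)  # never recommend a track already on the playlist
    return [track for track, _ in scores.most_common(n_candidates)]

def pair_features(seed_tracks, candidate, co_dict):
    """Illustrative features for one <playlist, candidate track> pair."""
    counts = [co_dict.get(s, {}).get(candidate, 0) for s in seed_tracks]
    return {
        "co_sum": sum(counts),
        "co_max": max(counts, default=0),
        "n_seeds": len(seed_tracks),
    }

co_dict = {"t1": {"t2": 1, "t3": 3, "t4": 6}, "t2": {"t1": 1, "t3": 2}}
seeds = ["t1", "t2"]
candidates = candidate_tracks(seeds, co_dict)
samples = [pair_features(seeds, c, co_dict) for c in candidates]
print(candidates)
print(samples)
```

With several thousand candidates per playlist and 1000 playlists per task, each task emits millions of rows like these, which is why this step dominates the runtime.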

In [None]:
for suffix in ["", "_training"]:
    submit_job(
        orch.generate_track_features_fcnn_mfe(
            challenge_df_path=f"spotify_mpc/80KPC/challenge_set{suffix}.parquet",
            track_df_path="spotify_mpc/80KPC/track_df.parquet",
            challenge_df_generic_features_directory=f"spotify_mpc/80KPC/generic_features_fcnn{suffix}",
            co_dict_directory="spotify_mpc/80KPC/co",
            output_directory=f"spotify_mpc/80KPC/samples_fcnn{suffix}",
        )
    )

Now that we have our training samples, we need to train our model.

For this step, open a high-memory Google Colab instance (CPU is fine for this contest) or any cloud compute instance. You can try running locally instead, but you'll need to install some dependencies first:
`pip install tensorflow pandas numpy`

If you're not running locally, zip your sample data (`samples_fcnn` and `samples_fcnn_training`) and upload it to your Google Drive (for Colab) or your compute instance.

Open the `colab/spotify-fcnn.ipynb` notebook, make sure to set `TRAINING_DIRS` and `TEST_DIRS` to point to your sample data, and run the cells. It should generate a submission parquet file, which holds the top 500 track predictions for each playlist in the challenge set.

Download that submission file, place it in your data directory, and then proceed to the next transformation.
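
For intuition about what the evaluation measures, here is a simplified, track-level sketch of two of the challenge's metrics: R-precision and recommended songs clicks. It is an illustration under stated assumptions, not the code behind `orch.evaluate_submission`: the official R-precision also gives partial credit for artist matches, which is omitted here, and the function names are hypothetical.

```python
def r_precision(predicted, holdout):
    """Fraction of held-out tracks found in the first len(holdout) predictions."""
    holdout = set(holdout)
    if not holdout:
        return 0.0
    top = predicted[: len(holdout)]
    return len(set(top) & holdout) / len(holdout)

def recommended_songs_clicks(predicted, holdout, page_size=10, max_clicks=51):
    """Number of 10-track page refreshes needed before the first relevant track."""
    holdout = set(holdout)
    for rank, track in enumerate(predicted):
        if track in holdout:
            return rank // page_size
    # No relevant track anywhere in the list of predictions.
    return max_clicks

predicted = ["a", "b", "c"]
holdout = ["b", "z"]
print(r_precision(predicted, holdout))
print(recommended_songs_clicks(predicted, holdout))
```

Higher R-precision is better, while fewer clicks is better, so the two metrics reward both overall recall near the top of the list and placing at least one good track on the first page.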

In [None]:
submit_job(
    orch.evaluate_submission(
        challenge_df_path="spotify_mpc/80KPC/challenge_set.parquet",
        track_df_path="spotify_mpc/80KPC/track_df.parquet",
        submission_directory="spotify_mpc/80KPC/submission_fcnn",
        output_directory="spotify_mpc/80KPC/evaluation_fcnn",
    )
)

# Running in EKS

In [None]:
# Build Remote
import boto3
os.environ['AWS_ACCOUNT_ID'] = boto3.client("sts").get_caller_identity()["Account"]
os.environ['K8S_ENV'] = 'eks'
os.environ['DATA_DIR'] = 's3://kube-transform-data-bucket'
! ../../build_scripts/build_eks.sh

In [None]:
# The raw data is large and can take a while to upload to S3. I'd recommend running the standardization job locally first.
# Then, upload the standardized data to S3, build the remote image, and run the rest of the jobs in EKS.

! aws s3 cp ../../data/spotify_mpc/standardized s3://kube-transform-data-bucket/spotify_mpc/standardized --recursive
