# Parameters

In [None]:
# Set to True to collect and display the intermediate frames in the cells below.
show_intermediate_results: bool = False

# Preparations

## Imports

In [None]:
import polars as pl

# Lazily scan the three preprocessed inputs; nothing is read until a query is collected.
playlists, playlist_tracks, tracks = (
    pl.scan_parquet(path)
    for path in (
        '../processed_data/data_playlist_metadata.parquet',
        '../processed_data/data_playlist_songs.parquet',
        '../processed_data/data_song_metadata.parquet',
    )
)

## Source Data

In [None]:
None if not show_intermediate_results else playlists.collect()

In [None]:
None if not show_intermediate_results else playlist_tracks.collect()

In [None]:
None if not show_intermediate_results else tracks.collect()

# Analysis

## Tokenization

In [None]:
def tokenize(expr: pl.Expr) -> pl.Expr:
    """Lower-case a string expression and split it into a list of words.

    Every whitespace character (space, tab, newline, ...) acts as a separator.
    Consecutive separators still produce empty strings in the resulting list,
    exactly like a plain ``str.split(' ')``; ``tokenize_unique`` drops them.
    """
    # `str.split` only splits on a literal, so map every whitespace character
    # to a plain space first; otherwise tabs/newlines would glue words together.
    return expr.str.to_lowercase().str.replace_all(r'\s', ' ').str.split(' ')

def tokenize_unique(expr: pl.Expr) -> pl.Expr:
    """Return the non-empty, de-duplicated tokens of ``expr``.

    Tokens keep the order of their first occurrence in the original string.
    """
    return tokenize(expr)\
        .list.filter(pl.element().ne(''))\
        .list.unique(maintain_order=True)

## Keyword statistics

Step 1: Tokenize the playlist names by splitting on whitespace.

We currently turn every word into its own separate keyword term.
As a later optimization, it might make sense to treat words that frequently
occur together (e.g. `late night`) as a single term to make the output more useful.

In [None]:
# One row per playlist: id, original name, and its de-duplicated lower-case terms.
playlists_tokenized = playlists.select(
    'playlist.id',
    'playlist.name',
    unique_terms=pl.col('playlist.name').pipe(tokenize_unique),
)

None if not show_intermediate_results else playlists_tokenized.collect(engine='streaming')

Step 2: Aggregate over playlist terms

In [None]:
# One row per (playlist, term) pair; the list column becomes a scalar `term` column.
exploded_playlists_tokenized = (
    playlists_tokenized
    .explode('unique_terms')
    .rename({'unique_terms': 'term'})
)

None if not show_intermediate_results else exploded_playlists_tokenized.limit(100).collect(engine='streaming')

In [None]:
# Number of playlists whose name contains each term. Terms are unique per
# playlist, so each input row is one (playlist, term) pair.
# Playlists whose name yields no terms (empty or null name) explode to a null
# `term`; drop those rows so they don't form a bogus group with count 0.
tokens = exploded_playlists_tokenized\
    .filter(pl.col('term').is_not_null())\
    .group_by('term')\
    .agg(pl.len().alias('playlist_count'))\
    .sort('playlist_count', descending=True)

Review query plan for potential performance/memory problems:

In [None]:
# Render the optimized physical plan the streaming engine would execute.
tokens.show_graph(engine='streaming', optimized=True, plan_stage='physical')

In [None]:
# Terms that occur in at least 100 playlists.
tokens.filter(pl.col('playlist_count') >= 100).collect(engine='streaming')

In [None]:
# Write to CSV
# tokens.filter(pl.col('playlist_count').ge(20)).sink_csv('playlist_keywords.csv', engine='streaming')

The following discoveries were made when manually reviewing the CSV data:

- Also split on & remove common punctuation (`(`, `)`, `[`, `]`,`:`, `#` etc.)
- Remove certain common words that do not provide any information:
  - on
  - by
  - with
  - at
  - and
  - a
  - I
  - ...
- Unify `90's`/`90s` etc.
- Unify `bday`/`birthday`/`b-day` etc.
- Check correlations between consecutive words

## Keyword <=> Song correlations

Ignore tokens that appear only a few times to reduce the size of the `join`/`group_by`.

In [None]:
# For each playlist_count value up to 5: how many terms occur in exactly that
# many playlists (the column is named `max_playlist_count`, but values are exact counts).
(
    tokens
    .filter(pl.col('playlist_count') <= 5)
    .group_by(pl.col('playlist_count').alias('max_playlist_count'))
    .agg(pl.col('playlist_count').count().alias('num_terms'))
    .sort('max_playlist_count')
    .collect(engine='streaming')
)

In [None]:
# Eagerly collect the terms that appear in at least 5 playlists.
relevant_tokens = (
    tokens
    .filter(pl.col('playlist_count') >= 5)
    .collect(engine='streaming')
)

# Restrict playlist terms to the relevant ones, attach them to every track of the
# playlist, and count the joined rows per (track, term).
track_keywords = (
    playlist_tracks
    .join(
        exploded_playlists_tokenized.join(relevant_tokens.lazy(), how='semi', on='term'),
        how='inner',
        on='playlist.id',
    )
    .group_by('track.id', 'term')
    .agg(pl.col('term').count().alias('playlist_count'))
)

None if not show_intermediate_results else track_keywords.limit(50).collect(engine='streaming')

In [None]:
# Plan inspection only: top-30 terms per track by playlist_count.
# NOTE(review): the global pre-sort is not needed for the per-group `sort_by`;
# presumably kept here to see how it affects the streaming plan.
q = (
    track_keywords
    .sort('track.id', 'playlist_count', 'term')
    .group_by('track.id')
    .agg(pl.col('term').sort_by('playlist_count', descending=True).head(30))
)

q.show_graph(engine='streaming', optimized=True, plan_stage='physical')

In [None]:
# Number of rows in `track_keywords`, i.e. distinct (track.id, term) pairs.
# `pl.len()` counts rows directly; `LazyFrame.count()` would instead compute a
# non-null count for every column.
q = track_keywords.select(pl.len())

q.collect() if show_intermediate_results else None

In [None]:
import os

# Materialise `track_keywords` once so later cells read it from disk instead of
# recomputing the join/group_by.
# Sink to a scratch file first and move it into place afterwards: when this cell
# is re-run, `track_keywords` already *is* a scan of `temp_file`, and sinking
# directly into the file being read would clobber the query's own input.
temp_file = 'temp_track_keywords.parquet'
track_keywords.sink_parquet(f'{temp_file}.partial')
os.replace(f'{temp_file}.partial', temp_file)
track_keywords = pl.scan_parquet(temp_file)

In [None]:
# Persist `track_keywords` sorted by track.id and continue from the on-disk copy.
temp_file = 'temp_track_keywords_by_track_id.parquet'
track_keywords.sort('track.id').sink_parquet(temp_file)
track_keywords_by_track_id = pl.scan_parquet(temp_file)

In [None]:
# Small sample (rows 20-39 of `tracks`) of the per-track aggregation:
# - term:            up to 20 terms by descending playlist_count; ties are broken
#                    alphabetically so the cut-off at 20 is deterministic
#                    (`sort_by` does not otherwise keep the order of equal keys)
# - playlist_counts: the matching counts, descending
# - playlist_count:  sum of those top-20 counts
track_keywords_grouped_by_track_id = track_keywords_by_track_id\
    .join(tracks.slice(20, 20).select('track.id').cache(), how='semi', on='track.id')\
    .group_by('track.id')\
    .agg(pl.col('term').sort_by('playlist_count', 'term', descending=[True, False]).head(20),
         pl.col('playlist_count').sort(descending=True).head(20).alias('playlist_counts'),
         pl.col('playlist_count').sort(descending=True).head(20).sum())\
    .join(tracks.select('track.id', 'track.name', 'track.artists'), how='inner', on='track.id')

# track_keywords_grouped_by_track_id.show_graph(plan_stage='physical', engine='streaming', optimized=True)
track_keywords_grouped_by_track_id.collect()

Using `group_by` to aggregate a column into a list is currently not supported by Polars' `streaming` engine.
To avoid crashing with an OOM, we sequentially process batches of `track.id`s instead:

In [None]:
import math


def process_track_keywords_batch(tracks_batch: pl.LazyFrame, top_n: int = 20) -> pl.LazyFrame:
    """Aggregate the top keywords for the tracks contained in ``tracks_batch``.

    Reads the global ``track_keywords_by_track_id`` frame, keeps only rows whose
    ``track.id`` occurs in ``tracks_batch``, and produces one row per track with:

    - ``term``: up to ``top_n`` terms by descending ``playlist_count``; ties are
      broken alphabetically so the selection is deterministic,
    - ``playlist_counts``: the matching counts, descending,
    - ``playlist_count``: the sum of those top-``top_n`` counts,

    joined with ``track.name`` and ``track.artists`` from ``tracks_batch``.
    """
    return track_keywords_by_track_id\
        .join(tracks_batch, how='semi', on='track.id')\
        .group_by('track.id')\
        .agg(pl.col('term').sort_by('playlist_count', 'term', descending=[True, False]).head(top_n),
             pl.col('playlist_count').sort(descending=True).head(top_n).alias('playlist_counts'),
             pl.col('playlist_count').sort(descending=True).head(top_n).sum())\
        .join(tracks_batch.select('track.id', 'track.name', 'track.artists'), how='inner', on='track.id')


def _merge_sorted_batches(frames: list[pl.LazyFrame], key: str) -> pl.LazyFrame:
    """Merge non-empty ``frames``, each sorted by ``key``, into one sorted frame.

    Merges pairwise as a balanced tree, so the query plan is nested only
    ~log2(len(frames)) levels deep instead of one level per batch.
    """
    if len(frames) == 1:
        return frames[0]
    middle = len(frames) // 2
    left = _merge_sorted_batches(frames[:middle], key)
    right = _merge_sorted_batches(frames[middle:], key)
    return left.merge_sorted(right, key)


def process_track_keywords_in_batches(batch_size: int = 5000) -> None:
    """Aggregate keywords per track in batches and merge them into one file.

    List aggregation in ``group_by`` is not supported by the streaming engine,
    so ``tracks`` is processed ``batch_size`` rows at a time. Each batch is
    sorted by ``track.id`` and written to ``temp_batch_{i}.parquet``; the batch
    files are then merge-sorted into ``temp_keywords_by_track.parquet``.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    row_count = tracks.select(pl.len()).collect().item()
    # Run at least one (possibly empty) batch so the merged output file always
    # exists with the correct schema, even when `tracks` is empty.
    batch_count = max(1, math.ceil(row_count / batch_size))  # larger batches are faster but risk OOM

    print(f"Processing {row_count:,} tracks in {batch_count:,} batches of {batch_size:,} items...")

    batch_paths = [f'temp_batch_{batch_index}.parquet' for batch_index in range(batch_count)]
    for batch_index, batch_path in enumerate(batch_paths):
        print(f"Processing batch {batch_index + 1:,}/{batch_count:,}")
        process_track_keywords_batch(tracks.slice(batch_index * batch_size, batch_size))\
            .sort('track.id')\
            .sink_parquet(batch_path)

    print("Merging batches...")

    merged = _merge_sorted_batches([pl.scan_parquet(path) for path in batch_paths], 'track.id')
    merged.sink_parquet('temp_keywords_by_track.parquet')

    print("Done.")


process_track_keywords_in_batches()

In [None]:
# Load the merged per-track keyword summary and preview its first 1000 rows.
keywords_by_track = pl.scan_parquet('temp_keywords_by_track.parquet')

keywords_by_track.head(1000).collect()