# Music channels reporting
------------------------

Illustrated report on the contribution of YouTube music channels to my “musical assets” over the last few years (from 2021 to today).

## Introduction

The aim of this notebook is simple: to summarize and illustrate how much each music channel contributes to my daily
listening. The long-format listening process has been automated for almost 3 years at the time of writing
(January 7th, 2025), and the single-track listening process has been automated, with metrics collection, since
January 1st, 2024. It was time to add a new process, one that lets me adjust my daily listening sources.

In other words, this notebook is a new way of deciding which music channels to follow and which to drop. Channel
selection is based on the following playlists:

* [🔂 Re-listening](https://www.youtube.com/playlist?list=PLOMUdQFdS-XP8fi89uBQ5P01DN_9tGJHu) (Private): A playlist of all the re-listens I need to do.
* [💙 2021 by dyl_m](https://www.youtube.com/playlist?list=PLOMUdQFdS-XMaC0KBG2EN8lnwkdShXSk9)
* [💙 2022 by dyl_m](https://www.youtube.com/playlist?list=PLOMUdQFdS-XMikLg-T7EuUAnCPdGbTAAJ)
* [💙 2023 by dyl_m](https://www.youtube.com/playlist?list=PLOMUdQFdS-XPGM7HHAX9hVb5lVEKSPnK3)
* [💙 2024 by dyl_m](https://www.youtube.com/playlist?list=PLOMUdQFdS-XNqUpFzE89aHgwn0wrBidyG)
* [💙 2025 by dyl_m](https://www.youtube.com/playlist?list=PLOMUdQFdS-XMfbJk0XdreFpdm2CRCGC-e)

## Dependencies

In [None]:
import datetime as dt
import numpy as np
import pandas as pd
import json
import plotly.express as px
import pyyoutube as pyt
import sys

import src.youtube as s_yt

from sklearn.preprocessing import StandardScaler

In [None]:
SERVICE = s_yt.create_service_local(log=False)  # Create the YouTube API client prior to any API request

In [None]:
with open('../data/pocket_tube.json', 'r', encoding='utf-8') as j_file:
    local_db = json.load(j_file)['MUSIQUE']

## Functions

In [None]:
def get_playlists_content(service: pyt.Client, playlist_id: str) -> list:
    """Get the videos in a YouTube playlist
    :param service: a Python YouTube Client
    :param playlist_id: a YouTube playlist ID
    :return p_items: playlist items (videos) as a list.
    """
    p_items = []
    next_page_token = None
    date_format = '%Y-%m-%dT%H:%M:%S%z'

    while True:
        try:
            request = service.playlistItems.list(part=['snippet', 'contentDetails'],
                                                 playlist_id=playlist_id,
                                                 max_results=50,
                                                 pageToken=next_page_token)  # Request playlist's items

            # Keep necessary data
            p_items += [{'video_id': item.contentDetails.videoId,
                         'video_title': item.snippet.title,
                         'release_date': dt.datetime.strptime(item.contentDetails.videoPublishedAt, date_format) if
                         item.contentDetails.videoPublishedAt else None,
                         'channel_id': item.snippet.videoOwnerChannelId,
                         'channel_name': item.snippet.videoOwnerChannelTitle,
                         'playlist_id': playlist_id} for item in request.items]

            next_page_token = request.nextPageToken
            if next_page_token is None:
                break

        except pyt.error.PyYouTubeException as error:
            print(f'{error.status_code}: {error.message}')
            sys.exit()

    return p_items
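
The token-based pagination loop used in `get_playlists_content` can be tried offline. The sketch below is only an illustration: `PAGES`, `fake_list` and `collect_all` are hypothetical names that stand in for the real API call, and no request is sent to YouTube.

```python
from types import SimpleNamespace

# Hypothetical stand-in for a paginated API: each page token maps to (items, next token).
PAGES = {None: (['a', 'b'], 'p2'), 'p2': (['c', 'd'], 'p3'), 'p3': (['e'], None)}


def fake_list(page_token=None):
    """Return one page of results, mimicking a response with items and nextPageToken."""
    items, next_token = PAGES[page_token]
    return SimpleNamespace(items=items, nextPageToken=next_token)


def collect_all(list_fn):
    """Follow nextPageToken until it is None, accumulating every item."""
    collected, token = [], None
    while True:
        response = list_fn(page_token=token)
        collected += response.items
        token = response.nextPageToken
        if token is None:
            break
    return collected


all_items = collect_all(fake_list)
print(all_items)
```

The loop stops on the first response without a next-page token, which is the same exit condition as in the function above.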

In [None]:
def get_channels(service: pyt.Client, channel_list: list) -> list:
    """Get YouTube channels basic information
    :param service: a Python YouTube Client
    :param channel_list: list of YouTube channel IDs
    :return information: a list of dictionaries with channel names and IDs, sorted by channel name.
    """
    information = []

    # Split task in chunks of size 50 to request on a maximum of 50 channels at each iteration.
    channels_chunks = [channel_list[i:i + 50] for i in range(0, len(channel_list), 50)]

    for chunk in channels_chunks:
        try:
            # Request channels
            request = service.channels.list(part=['snippet'], channel_id=chunk, max_results=50).items

            # Extract channel names and their IDs.
            information += [{'channel_name': an_item.snippet.title, 'channel_id': an_item.id} for an_item in request]

        except pyt.error.PyYouTubeException as error:
            print(f'{error.status_code}: {error.message}')
            sys.exit()

    # Sort by channel name alphabetical order
    information = sorted(information, key=lambda dic: dic['channel_name'].lower())

    return information
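
As a quick check of the chunking step in `get_channels`, here is a minimal sketch on made-up data. The `chunked` helper and the fake channel IDs are assumptions introduced for illustration only.

```python
def chunked(seq, size=50):
    """Split a list into consecutive slices of at most `size` elements."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]


# 120 made-up channel IDs (not real YouTube IDs)
fake_ids = [f'UC{n:03d}' for n in range(120)]

chunks = chunked(fake_ids)
chunk_sizes = [len(chunk) for chunk in chunks]
print(chunk_sizes)
```

With 120 IDs, this gives two full chunks of 50 and a final chunk of 20, so three requests would be needed.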

## Reporting

### Data collection

In [None]:
# Playlist IDs
ids = {'music_2021': 'PLOMUdQFdS-XMaC0KBG2EN8lnwkdShXSk9',
       'music_2022': 'PLOMUdQFdS-XMikLg-T7EuUAnCPdGbTAAJ',
       'music_2023': 'PLOMUdQFdS-XPGM7HHAX9hVb5lVEKSPnK3',
       'music_2024': 'PLOMUdQFdS-XNqUpFzE89aHgwn0wrBidyG',
       'music_2025': 'PLOMUdQFdS-XMfbJk0XdreFpdm2CRCGC-e',
       're_listening': 'PLOMUdQFdS-XP8fi89uBQ5P01DN_9tGJHu'}

rev_ids = {value: key for key, value in ids.items()}  # Reversed dict. for labeling

In [None]:
music_2021 = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['music_2021']))
music_2022 = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['music_2022']))
music_2023 = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['music_2023']))
music_2024 = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['music_2024']))
music_2025 = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['music_2025']))
re_listening = pd.DataFrame(get_playlists_content(SERVICE, playlist_id=ids['re_listening']))

# All data
data = pd.concat([music_2021, music_2022, music_2023, music_2024, music_2025, re_listening]). \
    sort_values(['release_date', 'video_id'], ascending=False, ignore_index=True).dropna()

# Without re-listening
selection = pd.concat([music_2021, music_2022, music_2023, music_2024, music_2025]). \
    sort_values(['release_date', 'video_id'], ascending=False, ignore_index=True).dropna()

data.replace({'playlist_id': rev_ids}, inplace=True)
selection.replace({'playlist_id': rev_ids}, inplace=True)

In [None]:
data

In [None]:
selection

### Release Dates Distribution

In [None]:
release_date_his = px.histogram(data, 'release_date', title='Videos Release Dates Distribution',
                                labels={'release_date': 'Release Date'})
release_date_his.show()

In [None]:
sel_release_date_his = px.histogram(selection, 'release_date',
                                    title='Videos Release Dates Distribution (without re-listening)',
                                    labels={'release_date': 'Release Date'})
sel_release_date_his.show()

### Count of videos by channel
#### Channel Database

In [None]:
channel_from_pl = data[['channel_id', 'channel_name']] \
    .drop_duplicates() \
    .sort_values('channel_name', ignore_index=True)

channel_from_local = pd.DataFrame(get_channels(SERVICE, local_db))
channel_from_local

#### Video count
##### All playlists

In [None]:
chan_count = data.groupby('channel_id')['video_id'].count()

chan_names = data[['channel_id', 'channel_name']] \
    .drop_duplicates() \
    .sort_values('channel_name', ignore_index=True)

chan_count = pd.DataFrame(chan_count) \
    .merge(chan_names, how='left', on='channel_id')[['channel_id', 'channel_name', 'video_id']] \
    .rename(columns={'video_id': 'n_videos'}) \
    .sort_values('n_videos', ascending=False, ignore_index=True)

chan_count

##### Without re-listening

In [None]:
sel_chan_count = selection.groupby('channel_id')['video_id'].count()

sel_chan_count = pd.DataFrame(sel_chan_count) \
    .merge(chan_names, how='left', on='channel_id')[['channel_id', 'channel_name', 'video_id']] \
    .rename(columns={'video_id': 'n_videos_fil'}) \
    .sort_values('n_videos_fil', ascending=False, ignore_index=True)

sel_chan_count

#### Weight based on Release Date
##### All playlists

In [None]:
scaler = StandardScaler()
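# Weight = exp(z-score of the release timestamp), so recent videos weigh more than older ones.
# 'int64' is spelled out since a plain 'int' can be narrower than 64 bits on some platforms.
data['date_weight'] = np.exp(scaler.fit_transform(data.release_date \
                                                  .astype('int64') \
                                                  .to_numpy() \
                                                  .reshape(-1, 1)))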

data_w = data.groupby('channel_id')['date_weight'].sum()

data_w = pd.DataFrame(data_w) \
    .merge(chan_names, how='left', on='channel_id')[['channel_id', 'channel_name', 'date_weight']] \
    .sort_values('date_weight', ascending=False, ignore_index=True)

data_w
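
To make the weighting above easier to read, here is a minimal standard-library sketch on three made-up release dates. It assumes the same idea as the cell above: standardize the timestamps (`StandardScaler` divides by the population standard deviation), then take the exponential. The dates and variable names are illustrative only.

```python
import datetime as dt
import math
import statistics

# Three made-up release dates, evenly spaced by 10 days
dates = [dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc),
         dt.datetime(2024, 1, 11, tzinfo=dt.timezone.utc),
         dt.datetime(2024, 1, 21, tzinfo=dt.timezone.utc)]

stamps = [d.timestamp() for d in dates]
mean = statistics.fmean(stamps)
std = statistics.pstdev(stamps)  # population standard deviation (ddof = 0)

# exp(z-score): 1.0 for a video released at the mean date, above 1 if more recent, below 1 if older
weights = [math.exp((s - mean) / std) for s in stamps]
print(weights)
```

The weights come out at roughly 0.29, 1.0 and 3.40. Because the exponential grows quickly, a channel's summed weight is dominated by its most recent videos.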

##### Without re-listening

In [None]:
# Same weighting as above, refitted on the selection only ('int64' keeps the cast explicit).
selection['date_weight'] = np.exp(scaler.fit_transform(selection.release_date \
                                                       .astype('int64') \
                                                       .to_numpy() \
                                                       .reshape(-1, 1)))

selection_w = selection.groupby('channel_id')['date_weight'].sum()

selection_w = pd.DataFrame(selection_w) \
    .merge(chan_names, how='left', on='channel_id')[['channel_id', 'channel_name', 'date_weight']] \
    .rename(columns={'date_weight': 'date_weight_fil'}) \
    .sort_values('date_weight_fil', ascending=False, ignore_index=True)

selection_w

### All metrics and classification

In [None]:
metrics = chan_count.merge(sel_chan_count, how='left', on=['channel_id', 'channel_name']) \
    .merge(data_w, how='left', on=['channel_id', 'channel_name']) \
    .merge(selection_w, how='left', on=['channel_id', 'channel_name']) \
    .fillna(0) \
    .sort_values(['n_videos', 'n_videos_fil', 'date_weight', 'date_weight_fil'], ascending=False, ignore_index=True)

metrics

#### To add to Favorites

In [None]:
nv_95 = float(metrics.n_videos.quantile(0.95))
nvf_95 = float(metrics.n_videos_fil.quantile(0.95))
dw_95 = float(metrics.date_weight.quantile(0.95))
dwf_95 = float(metrics.date_weight_fil.quantile(0.95))

favorites = metrics.loc[(metrics['n_videos'] >= nv_95) &
                        (metrics['n_videos_fil'] >= nvf_95) &
                        (metrics['date_weight'] >= dw_95) &
                        (metrics['date_weight_fil'] >= dwf_95), :]

print(favorites[['channel_name', 'channel_id']])
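
The filter above keeps a channel only if it reaches the threshold on every metric at once, so the result can be smaller than the top 5% of any single metric. The sketch below shows that effect on made-up numbers, using the 75th percentile and two metrics to keep it short. The `quantile` helper is a hypothetical pure-Python version of linear-interpolation quantiles (the default interpolation in pandas).

```python
def quantile(values, q):
    """Quantile with linear interpolation between the two closest ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q
    low = int(pos)
    high = min(low + 1, len(ordered) - 1)
    return ordered[low] + (ordered[high] - ordered[low]) * (pos - low)


# Made-up channels: name -> (number of videos, summed date weight)
channels = {'A': (10, 9.0), 'B': (8, 0.5), 'C': (2, 7.5), 'D': (1, 0.2), 'E': (1, 0.1)}

counts = [count for count, _ in channels.values()]
weights = [weight for _, weight in channels.values()]
c_75, w_75 = quantile(counts, 0.75), quantile(weights, 0.75)

# A channel is kept only if it passes both thresholds
selected = [name for name, (count, weight) in channels.items()
            if count >= c_75 and weight >= w_75]
print(selected)
```

Here `B` passes on video count only and `C` passes on date weight only, so `A` is the single channel kept.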

#### To add to DB

In [None]:
nv_75 = float(metrics.n_videos.quantile(0.75))
nvf_75 = float(metrics.n_videos_fil.quantile(0.75))
dw_75 = float(metrics.date_weight.quantile(0.75))
dwf_75 = float(metrics.date_weight_fil.quantile(0.75))

not_following = metrics.loc[~metrics.channel_id.isin(channel_from_local.channel_id), :]

to_follow = not_following.loc[(not_following['n_videos'] > nv_75) &
                              (not_following['n_videos_fil'] > nvf_75) &
                              (not_following['date_weight'] > dw_75) &
                              (not_following['date_weight_fil'] > dwf_75), :]

print(to_follow[['channel_name', 'channel_id']])

#### Uninteresting channels, to delete from DB

In [None]:
dw_25 = float(metrics.date_weight.quantile(0.25))
dwf_25 = float(metrics.date_weight_fil.quantile(0.25))

listed = metrics.loc[metrics.channel_id.isin(channel_from_local.channel_id), :]
uninteresting = listed.loc[(listed['date_weight'] <= dw_25) &
                           (listed['date_weight_fil'] <= dwf_25), :]

print(uninteresting[['channel_name', 'channel_id']])

#### Unlisted in playlists, to delete from DB (?)

In [None]:
unlisted = channel_from_local.loc[~channel_from_local.channel_id.isin(metrics.channel_id), :]
print(unlisted)