# 从 Spotify 中收集播放列表和歌曲

为歌曲推荐系统创建训练集。

登录 Spotify：用 [Spotipy](https://github.com/plamere/spotipy) 客户端

In [None]:
import json

# create an app on
#     https://developer.spotify.com/dashboard/applications
# and fill configs in "spotify-key.json":
#     { "client_id": "...", "client_secret": "..." }

# Read the Spotify application credentials from the local JSON key file.
with open("./spotify-key.json") as f:
    _spotify_key = json.load(f)

# Both values are required; a missing key raises KeyError right here.
CLIENT_ID, CLIENT_SECRET = (
    _spotify_key["client_id"],
    _spotify_key["client_secret"],
)

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

# Client-credentials auth built from the app id/secret loaded above.
_credentials = SpotifyClientCredentials(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
)

# Shared Spotify client used by every request in this notebook.
sp = spotipy.Spotify(auth_manager=_credentials)

In [None]:
import time
from spotipy import SpotifyException


def next_page(spotify: spotipy.Spotify, prev_result, tries=3, delay=0.2):
    """Fetch the page that follows ``prev_result``, retrying on API errors.

    https://spotipy.readthedocs.io/en/2.19.0/#spotipy.client.Spotify.next

    :param spotify: client whose ``next`` method is called
    :param prev_result: a paging result previously returned by the client
    :param tries: total number of attempts before giving up (at least one
        attempt is always made)
    :param delay: seconds to wait between two attempts
    :return: whatever ``spotify.next(prev_result)`` returns
    :raises SpotifyException: re-raised when the last attempt fails
    """
    attempts_left = max(tries, 1)
    while True:
        try:
            return spotify.next(prev_result)
        except SpotifyException:
            attempts_left -= 1
            if attempts_left <= 0:
                # Out of attempts: propagate immediately instead of sleeping
                # once more before failing.
                raise
            time.sleep(delay)


def find_playlists(spotify: spotipy.Spotify, w: str, limit=50, max_offset=1000):
    """Yield the playlists that match the search term ``w``, one at a time.

    https://developer.spotify.com/documentation/web-api/reference/#/operations/search

    :param spotify: spotipy.Spotify instance
    :param w: keyword to search for
    :param limit: page size passed to the search request; it only controls
        how many results are fetched per request, not how many are yielded
        in total
    :param max_offset: paging stops once this many results have been
        requested, so roughly ``max_offset`` playlists are yielded at most
        (the last page may overshoot by less than ``limit``)
    :return: generator producing one playlist entry per iteration
    :raises SpotifyException: for API errors other than HTTP 404
    """

    offset = 0  # number of results requested so far

    try:
        res = spotify.search(w, limit=limit, type="playlist")
        while res and offset < max_offset:
            # for playlist in res["playlists"]["items"]:
            #     yield playlist
            yield from res["playlists"]["items"]  # PEP 380

            # Count the page only after handing it out, and stop before
            # requesting a page that would be thrown away anyway.
            offset += limit
            if offset >= max_offset:
                return

            res = next_page(spotify, res['playlists'])

    except SpotifyException as e:
        # A 404 is treated as "no further results" rather than an error.
        if e.http_status == 404:
            return
        raise

试一下：

In [None]:
# Smoke test: print the first four search hits and stop.
# (The keyword is arbitrary -- "inside joke" happened to be playing while
# this line was written.)
i = 0
for i, pl in enumerate(find_playlists(sp, 'zutomayo'), start=1):
    print(pl, end="\n---\n")
    if i > 3:
        break

Ok，可以跑，接下来，干票大的。

我们从 1 个单词 "a" 开始，搜索包含这个单词的播放列表（每个单词最多能取回约 1000 个，见 `find_playlists` 的 `max_offset`）。
统计这些列表名称中所有单词出现的频次，然后对尚未搜索过的、出现次数最多的单词执行相同的操作。
重复，直到获取到足够的列表。

In [None]:
from collections import Counter
from gensim.utils import tokenize

# Stop crawling once this many distinct playlists have been collected.
MAX_COUNT_PLAYLISTS = 200000

word_counts = Counter({'a': 1})  # token -> occurrences in stored playlist names (seed: "a")
words_seen = set()               # search terms that were already used
playlists = {}                   # playlist id -> {'owner', 'name', 'id'}
count = 0                        # number of playlists stored so far
dupes = 0                        # search hits whose id was already stored

while len(playlists) < MAX_COUNT_PLAYLISTS:
    # Next search term: the most frequent token that has not been searched yet.
    word = next((w for w, _ in word_counts.most_common() if w not in words_seen), None)
    if word is None:
        # Every known token has been searched. Without this exit the last
        # token would be searched again and again without adding anything.
        print(f'no unseen words left, stopping with {len(playlists)} playlists')
        break

    words_seen.add(word)
    print(f'{count: 7}/{MAX_COUNT_PLAYLISTS: 7}: {count / MAX_COUNT_PLAYLISTS * 100: 2.2f}% > {word}')
    for playlist in find_playlists(sp, word):
        if not playlist:
            # Defensive: skip empty entries so the subscripts below cannot
            # fail on them.
            continue
        if playlist['id'] in playlists:
            dupes += 1
        elif playlist['name'] and playlist['owner']:
            playlists[playlist['id']] = {
                'owner': playlist['owner']['id'],
                'name': playlist['name'],
                'id': playlist['id'],
            }
            count += 1
            # Feed the playlist name back into the vocabulary that drives
            # the choice of the next search terms.
            for token in tokenize(playlist['name'], lowercase=True):
                word_counts[token] += 1

In [None]:
# Number of distinct playlists collected (the dict is keyed by playlist id).
len(playlists)

获取列表里的曲目信息：

In [None]:
def tracks_gen(spotify: spotipy.Spotify, playlist):
    """Yield the tracks of a playlist, following pagination.

    :param spotify: Spotify client instance
    :param playlist: playlist mapping as produced by find_playlists; only
        its 'id' entry is read here
    :return: generator producing one track mapping per iteration
    """
    page = spotify.playlist_items(
        playlist['id'],
        fields='items(track(id, name, artists(name, id), duration_ms)),next')

    # Stop as soon as a page is missing or carries no items.
    while page and page.get('items'):
        for entry in page['items']:
            song = entry['track']
            if song:  # entries without track data are skipped
                yield song

        try:
            page = next_page(spotify, page)
        except SpotifyException as err:
            # Client errors (4xx) end the iteration quietly; anything else
            # is passed on to the caller.
            if not 400 <= err.http_status <= 499:
                raise
            return

试一下：

In [None]:
# Smoke test: take the first collected playlist and print its first four tracks.
p = next(iter(playlists.values()))

print("playlist:", p)
print("tracks:")

i = 0
for i, t in enumerate(tracks_gen(sp, p), start=1):
    print(t)
    if i > 3:
        break

真的跑了：数据写到数据库里。

In [16]:
import os
import sqlite3
import tqdm

# Output locations: SQLite database, one-JSON-object-per-line playlist dump,
# and a text file with one line of space-separated track ids per playlist.
BASE_DIR = '/Volumes/shared/murecom/intro/spotify/'
DB_FILE = os.path.join(BASE_DIR, 'songs.db')
PLAYLISTS_FILE = os.path.join(BASE_DIR, 'playlists.ndjson')
SONGS_IDX_FILE = os.path.join(BASE_DIR, 'songs_ids.txt')

# Always start from an empty database file.
if os.path.isfile(DB_FILE):
    os.remove(DB_FILE)

conn = sqlite3.connect(DB_FILE)
c = conn.cursor()

c.execute('CREATE TABLE IF NOT EXISTS songs (id text primary key, name text, artist text)')
c.execute('CREATE INDEX IF NOT EXISTS name_idx on songs(name)')

c.execute('CREATE TABLE IF NOT EXISTS playlists (id text primary key, name text, owner text)')
# c.execute('CREATE INDEX IF NOT EXISTS name_idx on playlists(name)')

c.execute(
    'CREATE TABLE IF NOT EXISTS playlist_song (pid text, sid text, foreign key(pid) references playlists(id), foreign key(sid) references songs(id))')
# c.execute('CREATE INDEX IF NOT EXISTS playlist_idx on playlist_song(pid)')
# c.execute('CREATE INDEX IF NOT EXISTS song_idx on playlist_song(sid)')

tracks_seen = set()  # ids of songs already written to the songs table
with open(PLAYLISTS_FILE, 'w') as f_playlists, open(SONGS_IDX_FILE, 'w') as f_song_ids:
    for playlist in tqdm.tqdm(list(playlists.values())):
        f_playlists.write(json.dumps(playlist) + '\n')

        c.execute("INSERT OR REPLACE INTO playlists VALUES (?, ?, ?)",
                  (playlist["id"], playlist["name"], playlist["owner"]))

        track_ids = []
        try:
            for track in tracks_gen(sp, playlist):
                track_id = track['id']
                if not track_id:
                    continue

                if track_id not in tracks_seen:
                    # Remember the id so each song row is written only once.
                    tracks_seen.add(track_id)
                    # Tolerate tracks that come without any artist entry.
                    artists = track.get('artists') or []
                    artist_name = artists[0]['name'] if artists else None
                    c.execute("INSERT OR REPLACE INTO songs VALUES (?, ?, ?)",
                              (track_id, track['name'], artist_name))

                # The playlist -> song link is needed for every occurrence,
                # including songs already seen in an earlier playlist.
                c.execute("insert or replace into playlist_song values (?, ?)",
                          (playlist['id'], track_id))
                track_ids.append(track_id)
        except SpotifyException as e:
            # NOTE(review): the playlist line and any rows inserted so far
            # are kept, but no songs_ids line is written for this playlist,
            # so the two text files are not line-aligned after an error --
            # confirm that downstream code does not rely on alignment.
            print("error(tracks_gen): ", e)
            continue
        f_song_ids.write(' '.join(track_ids) + '\n')
        # Commit per playlist so an interrupted run keeps finished playlists.
        conn.commit()
conn.commit()

  4%|▍         | 7752/193703 [1:40:42<40:15:51,  1.28it/s] 


KeyboardInterrupt: 

In [None]:
# Release the SQLite cursor first, then the connection it belongs to.
c.close()
conn.close()