In [1]:
!pip install langchain openai chromadb tiktoken jq

import csv
import json
import os
import random
import re
import shutil
import time

import pandas as pd
from langchain.document_loaders import JSONLoader
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
from openai import OpenAI

# Read the API key from the environment instead of pasting it into the notebook,
# so the secret is never saved or committed together with this cell.
api_key = os.environ.get("OPENAI_API_KEY", "")
if not api_key:
    print("OPENAI_API_KEY is not set; export it before running the cells that call OpenAI.")
# Re-export the key so libraries that look it up from the environment see the same value.
os.environ["OPENAI_API_KEY"] = api_key
client = OpenAI(api_key=api_key)

Collecting langchain
  Downloading langchain-0.0.345-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting openai
  Downloading openai-1.3.7-py3-none-any.whl (221 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m221.4/221.4 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting chromadb
  Downloading chromadb-0.4.18-py3-none-any.whl (502 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m502.4/502.4 kB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tiktoken
  Downloading tiktoken-0.5.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m25.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jq
  Downloading jq-1.6.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (656 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

# Input Data

In [2]:
def to_str(playlist):
    """Render a playlist dict as the plain-text block that is sent to the model.

    Args:
        playlist: dict with a 'name' key and a 'tracks' list; every track dict
            must provide 'pos', 'track_name', 'artist_name', 'keywords',
            'mood' and 'track_genre'.

    Returns:
        A string starting with the playlist name and a "Tracks:" header,
        followed by one numbered entry per track (numbering is 'pos' + 1).
    """
    header = f"Playlist Name: {playlist['name']}\n" + "Tracks:\n"
    # Each entry ends with a newline plus one space, exactly as the prompt format expects.
    entries = [
        f"{song['pos']+1}. {song['track_name']} by {song['artist_name']},Keywords: {song['keywords']},Mood: {song['mood']}, genre: {song['track_genre']}\n "
        for song in playlist['tracks']
    ]
    return header + "".join(entries)

# RAG

In [3]:
def metadata_func(record: dict, metadata: dict) -> dict:
    """Customized metadata loader passed to JSONLoader for the Chroma documents.

    Copies a fixed set of fields from the raw JSON record into the metadata
    dict, using an empty string for any field the record does not have.

    Args:
        record: one raw JSON record.
        metadata: metadata dict to extend; it is modified in place.

    Returns:
        The same `metadata` dict, with the extra fields added.
    """
    copied_fields = ("Moods", "Keywords", "artist_name", "genre", "topic", "release_date")
    for field in copied_fields:
        metadata[field] = record.get(field, "")
    return metadata

In [4]:
def initialize_db(file_path, chunk_size=1000, chunk_overlap=0, persist_directory="./chroma_db"):
    """Build a Chroma vector store from a JSON track file using OpenAI embeddings.

    Args:
        file_path: path of the JSON file; records are selected with the jq
            expression '.track_data[]'.
        chunk_size: chunk size handed to CharacterTextSplitter.
        chunk_overlap: chunk overlap handed to CharacterTextSplitter.
        persist_directory: directory handed to Chroma for persistence.

    Returns:
        The Chroma store created from the loaded documents.
    """
    # Load: the "track_name" field becomes the document content, the remaining
    # fields of interest are attached as metadata by metadata_func.
    track_documents = JSONLoader(
        file_path=file_path,
        jq_schema='.track_data[]',
        content_key="track_name",
        metadata_func=metadata_func,
    ).load()

    # Transform: split the loaded documents into chunks.
    splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = splitter.split_documents(track_documents)

    # Embed the chunks and store them in the vector database.
    return Chroma.from_documents(chunks, OpenAIEmbeddings(), persist_directory=persist_directory)

In [5]:
def retrieve(query, db, k=15):
    """Fetch the documents most similar to `query` and return their text.

    Args:
        query: text to search for (here: the generated user profile).
        db: vector store exposing `as_retriever`.
        k: number of documents requested from the similarity search.

    Returns:
        The `page_content` of the retrieved documents, one per line.
    """
    matches = db.as_retriever(
        search_type="similarity",
        search_kwargs={"k": k},
    ).get_relevant_documents(query)
    return '\n'.join(match.page_content for match in matches)

# Prompt

In [51]:
# System prompt for the first model call: turns a playlist (as rendered by to_str)
# into a five-entry profile (genres, artists, languages, contextual uses, mood).
# recommend() also uses that profile as the retrieval query.
feature_prompt = """
Analyze a user's playlist to deduce musical preferences. Given the playlist name, and a list of tracks with corresponding artist names:
1. Determine the user's top three favourite music genres based on the tracks' styles.
2. Identify the user's top three favourite artists from the playlist.
3. Ascertain the user's preferred language of music, indicating regional music preferences if any.
4. Infer the contextual use of the playlist by identifying any patterns that suggest particular events, locations, or themes (e.g., workout songs, travel music, 90s hits).
5. Assess the overall mood of the playlist. Categorize and summarize the mood based on the tone and tempo of the songs into 5 words.
Compile the findings into a profile summary that shows the user's musical tastes and the intended experience of the playlist.

Strictly follow the desired output format, and do not add any other entries except the following 5 entries:
Music genres: ...
Artists: ...
Languages: ...
Contextual uses: ...
Mood: ...
"""

# System prompt for the second model call: asks for 10 songs, one per line, in the
# '[song_name] by [artist_name]' form that extract_tracks_to_df() parses.
recommendation_prompt = """
As a music recommender system, provide tailored song recommendations based on detailed user inputs. Given:
- The user's specific music preferences, such as favorite genres and artists.
- The user's language preferences for music.
- The context or setting in which the user typically listens to the playlist (e.g., during workouts, for relaxation, while commuting).
- The user's mood preferences for songs
Recommend 10 unique, real, published songs that align with the user's musical preferences.
Only output the recommendations in a list format, with each entry following the '[song_name] by [artist_name]' structure.
"""

# GPT

In [48]:
def generate(system_prompt, user_input):
    """Send one system + user message pair to the GPT-4 chat model.

    Uses the module-level `client`.

    Args:
        system_prompt: content of the system message.
        user_input: content of the user message.

    Returns:
        The reply text with surrounding whitespace stripped, or None when the
        request (or reading the reply) raises; the error is printed.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_input},
    ]
    request = dict(
        model="gpt-4-1106-preview",
        messages=conversation,
        temperature=0,  # using greedy decoding
        max_tokens=256,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    try:
        completion = client.chat.completions.create(**request)
        reply = completion.choices[0].message.content
        return reply.strip()
    except Exception as e:
        # Best effort: report the problem and let the caller deal with None.
        print(f"An error occurred: {e}")
        return None

# Evaluate

In [8]:
def extract_track_artist(data, playlist_index):
    """Convert one playlist into a DataFrame of artist and track names.

    Args:
        data: dataset dict with a 'playlists' list; each playlist has a
            'tracks' list of dicts providing 'artist_name' and 'track_name'.
        playlist_index: position of the playlist inside data['playlists'].

    Returns:
        DataFrame with the columns 'artist_name' and 'track_name', one row per
        track in playlist order.

    Raises:
        IndexError: if `playlist_index` is out of range.
        KeyError: if a track lacks one of the two fields.
    """
    columns = ['artist_name', 'track_name']
    tracks = data['playlists'][playlist_index]['tracks']
    rows = [{column: track[column] for column in columns} for track in tracks]

    # Passing `columns` keeps both columns present for an empty playlist, so
    # callers can still select and merge on them instead of hitting a KeyError.
    return pd.DataFrame(rows, columns=columns)

In [11]:
def extract_tracks_to_df(generated_output):
    """Parse the model's recommendation list into a DataFrame.

    Every non-empty line is expected to look like '[song_name] by [artist_name]',
    optionally prefixed by a list marker ('1.', '1)', '-', '*', '•') and with the
    song name optionally wrapped in quotes. Lines that do not contain ' by '
    (for example an introductory sentence) are skipped.

    Args:
        generated_output: raw text returned by the model, or None when the
            generation failed.

    Returns:
        DataFrame with the columns 'artist_name' and 'track_name'. The columns
        are present even when nothing could be parsed.
    """
    columns = ['artist_name', 'track_name']
    quote_chars = "'“”‘’\""
    tracks_data = []

    # generate() returns None on failure; treat that like an empty answer.
    for raw_line in (generated_output or '').splitlines():
        line = raw_line.strip()
        if not line:
            continue

        # Remove a leading list marker, spaces, and potential opening quotes.
        line = re.sub(r'^(?:\d+[.)]|[-*•])\s*["“”‘’]*', '', line)

        if ' by ' not in line:
            # Not a "<track> by <artist>" entry; skip it rather than fail.
            continue

        # Split on the LAST ' by ' so titles such as "Stand by Me" stay intact.
        track_name, artist_name = line.rsplit(' by ', 1)
        track_name = track_name.strip().strip(quote_chars)
        artist_name = artist_name.strip()
        if track_name and artist_name:
            tracks_data.append({'artist_name': artist_name, 'track_name': track_name})

    return pd.DataFrame(tracks_data, columns=columns)



In [67]:
def recommend(playlist):
    """Run the full pipeline from a playlist to song recommendations.

    Steps: render the playlist as text, ask the model for a user profile
    (feature_prompt), retrieve related tracks from the module-level `db` using
    that profile as the query, then ask the model for recommendations
    (recommendation_prompt) with the retrieved tracks appended as context.

    Args:
        playlist: playlist dict in the shape accepted by to_str().

    Returns:
        The raw recommendation text, or None when either model call fails.
    """
    # get input playlist string
    input_playlist = to_str(playlist)

    # get features from gpt
    query = generate(feature_prompt, input_playlist)
    if not query:
        # generate() returns None after an error; without a profile there is
        # nothing to retrieve with or to concatenate the context to.
        return None

    # get relevant context via rag
    context = retrieve(query, db)

    # get song recommendations from gpt
    user_input = query + f"\nContext:\n{context}"
    return generate(recommendation_prompt, user_input)

In [69]:
import csv
import random

def _track_labels(df):
    """Join a track frame into one 'track by artist, track by artist, ...' string."""
    return ', '.join(df['track_name'].astype(str) + " by " + df['artist_name'].astype(str))


def _with_track_columns(df, columns):
    """Return `df`, or an empty frame with `columns` when `df` lacks any of them."""
    if set(columns).issubset(df.columns):
        return df
    return pd.DataFrame(columns=columns)


def evaluation(data, num_playlists=5, output_path='evaluation.csv', seed=None):
    """Recommend songs for randomly chosen playlists and log the hits to a CSV file.

    For every sampled playlist one row is written with the playlist name, its
    tracks, the recommended tracks, the number of recommended tracks that are
    already in the playlist (exact match on track and artist name) and those
    matching tracks.

    Args:
        data: dataset dict with a 'playlists' list.
        num_playlists: number of playlists to evaluate; capped at the number of
            playlists available.
        output_path: CSV file to write (overwritten if it exists).
        seed: optional seed for a reproducible playlist sample.
    """
    key_columns = ['track_name', 'artist_name']
    playlists = data['playlists']

    # Sample distinct, valid indices. random.randint(0, len(playlists)) includes
    # len(playlists) itself (IndexError) and can pick the same playlist twice.
    rng = random.Random(seed) if seed is not None else random
    sample_size = max(0, min(num_playlists, len(playlists)))
    sampled_indices = rng.sample(range(len(playlists)), sample_size)

    # Explicit UTF-8 so non-ASCII playlist/track names do not depend on the platform default.
    with open(output_path, 'w', newline='', encoding='utf-8') as output_file:
        writer = csv.writer(output_file)
        # Write header
        writer.writerow(['playlist_name','track_playlist', 'recommendation', 'hit_number', 'hit_track'])

        for idx in sampled_indices:
            playlist = playlists[idx]
            playlist_name = playlist['name']
            df_track = _with_track_columns(extract_track_artist(data, idx), key_columns)

            output = recommend(playlist)
            if output:
                df_output = _with_track_columns(extract_tracks_to_df(output), key_columns)
            else:
                # The recommendation step failed; record the playlist with zero hits.
                df_output = pd.DataFrame(columns=key_columns)

            # De-duplicate both sides so a repeated track counts as a single hit.
            merged_df = pd.merge(
                df_track.drop_duplicates(subset=key_columns),
                df_output.drop_duplicates(subset=key_columns),
                on=key_columns,
                how='inner',
            )

            # Write the data to the file
            writer.writerow([
                playlist_name,
                _track_labels(df_track),
                _track_labels(df_output),
                len(merged_df),
                _track_labels(merged_df),
            ])


# Main

In [18]:
# Initialize the Chroma vector store from the processed track file.
# This cell must run before recommend() is called, because recommend() reads the
# module-level `db` defined here.
# NOTE(review): initialize_db() passes a persist_directory to Chroma — confirm whether
# re-running this cell re-embeds or duplicates the stored documents.
db = initialize_db('processed_songs_2w.json')

In [None]:
# load dataset
input_file = 'processed_3000-3999.json'
with open(input_file, 'r', encoding='utf-8') as inputFile:
    data = json.load(inputFile)
# Song recommendation
evaluation(data, num_playlists=50)
# Every run writes to 'evaluation.csv'; keep a per-dataset copy so the next
# dataset's run does not wipe out these results.
shutil.copyfile('evaluation.csv', 'evaluation_3000-3999.csv')

In [None]:
# load dataset
input_file = 'processed_5000-5999.json'
with open(input_file, 'r', encoding='utf-8') as inputFile:
    data = json.load(inputFile)
evaluation(data, num_playlists=50)
# Every run writes to 'evaluation.csv'; keep a per-dataset copy so the next
# dataset's run does not wipe out these results.
shutil.copyfile('evaluation.csv', 'evaluation_5000-5999.csv')

In [None]:
# load dataset
input_file = 'processed_42000-42999.json'
with open(input_file, 'r', encoding='utf-8') as inputFile:
    data = json.load(inputFile)

evaluation(data, num_playlists=50)
# Every run writes to 'evaluation.csv'; keep a per-dataset copy so the next
# dataset's run does not wipe out these results.
shutil.copyfile('evaluation.csv', 'evaluation_42000-42999.csv')

In [None]:
# load dataset
input_file = 'processed_570000-570999.json'
with open(input_file, 'r', encoding='utf-8') as inputFile:
    data = json.load(inputFile)

evaluation(data, num_playlists=50)
# Every run writes to 'evaluation.csv'; keep a per-dataset copy so these
# results remain available if the evaluation is run again.
shutil.copyfile('evaluation.csv', 'evaluation_570000-570999.csv')