In [None]:
!pip install torch
!pip install sentence-transformers
!pip install transformers
!pip install numpy
!pip install pandas
!pip install scikit-learn
!pip install pinecone
!pip install pinecone-client
!pip install langchain


In [27]:
import csv
import math
import os
import time
from getpass import getpass

import numpy as np
import pandas as pd
import torch
from openai import OpenAI
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForSequenceClassification


In [28]:
# Never hardcode the key in the notebook. Use the value already present in the
# environment, and only prompt for it (without echoing) when it is missing, so
# a real key exported in the shell is not overwritten by a placeholder.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

client = OpenAI(
    api_key=os.environ["OPENAI_API_KEY"],
)

In [29]:
from pinecone import Pinecone, ServerlessSpec, Index

# Initialize Pinecone.
# The key comes from the environment (PINECONE_API_KEY); we only prompt for it
# when it is not set so that no secret is ever stored in the notebook.
pinecone_api_key = os.environ.get("PINECONE_API_KEY") or getpass("Pinecone API key: ")
pc = Pinecone(api_key=pinecone_api_key)

index_name = "air"

# Check if the index exists; create it if it doesn't
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=768,  # Dimension of your embeddings
        metric="cosine",  # Similarity metric
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    # A freshly created index is not usable immediately; block until Pinecone
    # reports it as ready so the first upsert/query does not fail.
    while not pc.describe_index(index_name).status["ready"]:
        time.sleep(1)

index = pc.Index(index_name)

print("Pinecone setup complete!")


Pinecone setup complete!


In [30]:
def load_lyrics_dataset(file_path):
    """Load the lyrics CSV and drop rows that cannot be indexed.

    The file is read with 'latin1' encoding and malformed lines are skipped.
    Rows with a missing 'Artist', 'Song' or 'Lyrics' value are removed, as are
    rows whose lyrics contain fewer than three whitespace-separated words.

    Args:
        file_path: Path of a CSV file with 'Artist', 'Song' and 'Lyrics' columns.

    Returns:
        The filtered DataFrame (original row labels and all original columns
        are kept), or None if the file could not be read.
    """
    try:
        # Attempt to read the file with 'latin1' encoding
        df = pd.read_csv(file_path, encoding='latin1', on_bad_lines='skip')
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"Failed to read file: {e}")
        return None

    # Remove rows with missing values in key columns
    df = df.dropna(subset=['Artist', 'Song', 'Lyrics'])

    # Filter with a standalone mask instead of a temporary column: this does
    # not mutate the frame and cannot overwrite/drop a dataset column that
    # happens to be called 'lyrics_word_count'.
    word_counts = df['Lyrics'].apply(lambda x: len(str(x).split()))
    return df[word_counts >= 3]



In [31]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_lyrics(lyrics, chunk_size=100, overlap=50):
    """Split one lyrics string into chunks with LangChain's recursive splitter.

    Args:
        lyrics: The text to split.
        chunk_size: Forwarded to the splitter as ``chunk_size``.
        overlap: Forwarded to the splitter as ``chunk_overlap``.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_text`` returns for
        ``lyrics`` (a list of chunk strings).
    """
    splitter_options = {"chunk_size": chunk_size, "chunk_overlap": overlap}
    lyric_splitter = RecursiveCharacterTextSplitter(**splitter_options)
    pieces = lyric_splitter.split_text(lyrics)
    return pieces


In [32]:
def preprocess_and_store_embeddings(data, index, chunk_size=100, overlap=50, batch_size=100):
    """Chunk every song's lyrics, embed the chunks and upsert them into Pinecone.

    Each chunk is stored under the id ``"<row label>-<chunk number>"`` with the
    metadata keys 'Song', 'Artist' and 'Lyrics' (the chunk text).

    Args:
        data: DataFrame with 'Lyrics', 'Song' and 'Artist' columns, or None
            (e.g. when loading the dataset failed).
        index: Pinecone index handle; only its ``upsert`` method is used.
        chunk_size: ``chunk_size`` passed to the LangChain text splitter.
        overlap: ``chunk_overlap`` passed to the LangChain text splitter.
        batch_size: Number of chunks embedded and upserted per request.

    Returns:
        None. Progress is reported with ``print``.
    """
    # A failed load hands us None and an empty frame has nothing to embed:
    # bail out before the (slow) model load instead of crashing on .iterrows().
    if data is None or len(data) == 0:
        print("No lyrics to embed; nothing was stored in Pinecone.")
        return

    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)

    # Flatten the dataset into (vector_id, chunk, song, artist) records.
    rows = []
    for idx, row in data.iterrows():
        lyrics = str(row['Lyrics'])  # Ensure lyrics is a string
        if lyrics.lower() == 'nan':  # stringified missing value
            continue
        rows.extend(
            (f"{idx}-{i}", chunk, row['Song'], row['Artist'])
            for i, chunk in enumerate(splitter.split_text(lyrics))
        )

    if not rows:
        print("No lyrics to embed; nothing was stored in Pinecone.")
        return

    # Load the model only once we know there is something to embed.
    bi_encoder = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
    total_batches = (len(rows) + batch_size - 1) // batch_size  # ceil division

    for batch_number, start in enumerate(range(0, len(rows), batch_size), start=1):
        batch = rows[start:start + batch_size]

        # Embed the chunk texts of this batch in one call.
        embeddings = bi_encoder.encode(
            [chunk for _, chunk, _, _ in batch], convert_to_tensor=False
        )

        # Pair every record with its embedding in the (id, values, metadata)
        # tuple form passed to upsert.
        vectors = [
            (vector_id, embedding.tolist(), {"Song": song, "Artist": artist, "Lyrics": chunk})
            for (vector_id, chunk, song, artist), embedding in zip(batch, embeddings)
        ]

        # Upsert the batch to Pinecone
        index.upsert(vectors)
        print(f"Upserted batch {batch_number}/{total_batches}")

    print("Embeddings stored in Pinecone!")


In [33]:
class BiEncoder:
    """Thin wrapper around a SentenceTransformer model used to embed text."""

    def __init__(self, model_name='sentence-transformers/all-mpnet-base-v2'):
        """Load the sentence-transformers model identified by ``model_name``."""
        self.model = SentenceTransformer(model_name)

    def encode_texts(self, texts):
        """Embed ``texts`` with the wrapped model.

        The model's ``encode`` is called with tensor output and the progress
        bar enabled; its result is returned unchanged.
        """
        encode_options = {"convert_to_tensor": True, "show_progress_bar": True}
        embeddings = self.model.encode(texts, **encode_options)
        return embeddings


In [34]:
def _extract_refined_query(output):
    """Return the text inside the last <start_refined_query>...<end_refined_query> pair, or ''."""
    start_tag = "<start_refined_query>"
    end_tag = "<end_refined_query>"

    # Use the *last* start tag so echoed few-shot examples are skipped.
    _, found_start, tail = output.rpartition(start_tag)
    if not found_start:
        return ""
    refined, found_end, _ = tail.partition(end_tag)
    if not found_end:
        return ""
    return refined.strip()


def refine_query_with_chatgpt(query, model="gpt-4o"):
    """Ask the chat model to extract and correct the lyric part of a user query.

    The prompt instructs the model to answer in a tagged format; only the
    content of the ``refined_query`` tags is used.

    Args:
        query: Raw user input (may contain typos and non-lyric words).
        model: Name of the OpenAI chat model to call.

    Returns:
        The refined lyric text. If the model reply is empty or does not contain
        a parsable refined query, the original ``query`` is returned so callers
        never end up embedding an empty string.
    """
    # Few-shot examples to guide the model
    example_message = (
        "You are a helpful assistant tasked with extracting and correcting song lyrics "
        "from user input and identifying the author if mentioned. You should return only the "
        "lyric portion of the query and refine it for correctness. Format your output as follows:\n"
        "query: <start_query>original_query<end_query>\n"
        "refined_query: <start_refined_query>refined_lyric<end_refined_query>\n"
        "author: <start_author>author_name<end_author>\n"
        "If the author is not mentioned, return 'NOT_MENTIONED' for the author.\n\n"
        "Examples:\n"
        "1. User input: \"What's the song from Rihanna that goes like 'how about a round of applause'\"\n"
        "   Output:\n"
        "   query: <start_query>how about a round of applause<end_query>\n"
        "   refined_query: <start_refined_query>How about a round of applause<end_refined_query>\n"
        "   author: <start_author>Rihanna<end_author>\n\n"
        "2. User input: \"Twinkle twinkle litl star how I wondr wht u ar\"\n"
        "   Output:\n"
        "   query: <start_query>Twinkle twinkle litl star how I wondr wht u ar<end_query>\n"
        "   refined_query: <start_refined_query>Twinkle twinkle little star how I wonder what you are<end_refined_query>\n"
        "   author: <start_author>NOT_MENTIONED<end_author>\n\n"
        "3. User input: \"song about a broken heart\"\n"
        "   Output:\n"
        "   query: <start_query>a broken heart<end_query>\n"
        "   refined_query: <start_refined_query>A broken heart<end_refined_query>\n"
        "   author: <start_author>NOT_MENTIONED<end_author>\n\n"
        "Now process this input:\n"
        f"User input: {query}"
    )

    # Call the OpenAI API
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are an assistant tasked with extracting and correcting song lyrics from user input and identifying the author if mentioned."},
            {"role": "user", "content": example_message},
        ],
        model=model,
    )

    # The message content can be None (e.g. a refusal); treat that as empty.
    content = chat_completion.choices[0].message.content
    output = (content or "").strip()

    refined_query = _extract_refined_query(output)
    if not refined_query:
        # Fall back to the raw input rather than returning an empty string.
        print("Could not parse a refined query from the model output; using the original query.")
        refined_query = query

    print("Refined query:\n", refined_query)
    return refined_query


In [35]:
def song_retrieval_pipeline(query, index, bi_encoder_model='sentence-transformers/all-mpnet-base-v2',
                            cross_encoder_model='cross-encoder/ms-marco-MiniLM-L-6-v2',
                            bi_encoder_k=100, cross_encoder_k=10):
    """Retrieve songs for a query: vector search first, cross-encoder rerank second.

    Steps:
      1. Refine the query with ``refine_query_with_chatgpt`` and embed it.
      2. Fetch the ``bi_encoder_k`` nearest chunks from Pinecone and keep the
         first (best-scoring) chunk of every (song, artist) pair.
      3. Rerank the best ``cross_encoder_k`` candidates with the cross-encoder,
         scoring the *original* query against each lyrics chunk.

    Args:
        query: Raw user query.
        index: Pinecone index handle; only its ``query`` method is used.
        bi_encoder_model: sentence-transformers model name for the query embedding.
        cross_encoder_model: Cross-encoder model name used for reranking.
        bi_encoder_k: Number of chunks requested from Pinecone.
        cross_encoder_k: Number of unique songs passed to the cross-encoder.

    Returns:
        List of candidate dicts ordered by descending cross-encoder score. Each
        dict has 'track_name', 'artist_name', 'lyrics_chunk', 'score' (the
        Pinecone similarity) and 'rerank_score' (the cross-encoder score).
        An empty list is returned when Pinecone yields no matches.
    """
    from sentence_transformers import CrossEncoder

    # Bi-Encoder
    bi_encoder = BiEncoder(bi_encoder_model)

    refined_query = refine_query_with_chatgpt(query)
    query_embedding = bi_encoder.encode_texts([refined_query])[0].tolist()

    # Query Pinecone
    results = index.query(vector=query_embedding, top_k=bi_encoder_k, include_metadata=True)

    # Initial retrieval with duplicate removal: several chunks of one song can
    # match, so only the first chunk seen per (song, artist) is kept.
    candidates = []
    seen_songs = set()
    for match in results["matches"]:
        metadata = match["metadata"]
        song_key = (metadata["Song"], metadata["Artist"])
        if song_key in seen_songs:
            continue
        seen_songs.add(song_key)
        candidates.append({
            "track_name": metadata["Song"],
            "artist_name": metadata["Artist"],
            "lyrics_chunk": metadata["Lyrics"],
            "score": match.score
        })

    print("Top 10 results before reranking:")
    for candidate in candidates[:10]:
        print(f"Track: {candidate['track_name']}, Artist: {candidate['artist_name']}, Score: {candidate['score']}")

    candidates = sorted(candidates, key=lambda c: c['score'], reverse=True)[:cross_encoder_k]
    if not candidates:
        # Nothing to rerank; skip loading the cross-encoder altogether.
        print("\nNo candidates retrieved; nothing to rerank.")
        return []

    # Cross-Encoder (model name passed positionally so the call does not
    # depend on the keyword's name in the installed library version).
    cross_encoder = CrossEncoder(cross_encoder_model)
    candidate_pairs = [(query, candidate["lyrics_chunk"]) for candidate in candidates]
    scores = cross_encoder.predict(candidate_pairs, batch_size=16)

    # Keep the cross-encoder score on each candidate so the reranked order can
    # be inspected; 'score' remains the Pinecone similarity.
    for candidate, rerank_score in zip(candidates, scores):
        candidate["rerank_score"] = float(rerank_score)

    # Re-ranking (stable sort: ties keep their retrieval order)
    re_ranked_songs = sorted(candidates, key=lambda c: c["rerank_score"], reverse=True)

    print("\nTop 10 results after reranking:")
    for song in re_ranked_songs[:10]:
        print(
            f"Track: {song['track_name']}, Artist: {song['artist_name']}, "
            f"Score: {song['score']}, Rerank score: {song['rerank_score']}"
        )

    return re_ranked_songs


In [36]:
# Main program: run one example query through the pipeline and list the hits.
query = "songs about broken heart"

results = song_retrieval_pipeline(query, index)
print("Top retrieved songs:")
# Ranks are 1-based in the printed list.
for rank, song in enumerate(results, start=1):
    print(f"{rank}. {song['track_name']} by {song['artist_name']}")
    print(f"Lyrics Chunk: {song['lyrics_chunk']}\n")




Refined query:
 Broken heart


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Top 10 results before reranking:
Track: the heart wants what it wants, Artist: selena gomez, Score: 0.464003891
Track: damaged, Artist: danity kane, Score: 0.458360195
Track: cross my broken heart, Artist: the jets, Score: 0.455789953
Track: take a bow, Artist: madonna, Score: 0.44819954
Track: second chance, Artist: 38 special, Score: 0.447755396
Track: me and my broken heart, Artist: rixton, Score: 0.444622606
Track: bad blood, Artist: taylor swift featuring kendrick lamar, Score: 0.429715604
Track: bad time, Artist: grand funk, Score: 0.426480383
Track: stuck like glue, Artist: sugarland, Score: 0.42581436
Track: brokenhearted, Artist: brandy featuring wanya morris, Score: 0.425740361

Top 10 results after reranking:
Track: cross my broken heart, Artist: the jets, Score: 0.455789953
Track: me and my broken heart, Artist: rixton, Score: 0.444622606
Track: take a bow, Artist: madonna, Score: 0.44819954
Track: brokenhearted, Artist: brandy featuring wanya morris, Score: 0.425740361
Tra

In [None]:
# use this to populate the db IF it is empty
def populate_db(dataset_path):
    """Load the lyrics dataset and store its chunk embeddings in Pinecone.

    Args:
        dataset_path: Path of the lyrics CSV file.

    Returns:
        None. A message is printed when the file is missing or unreadable, and
        nothing is written to the index in those cases.
    """
    if not os.path.exists(dataset_path):
        print("Dataset not found. Download the dataset to insert it into the db.")
        return

    data = load_lyrics_dataset(dataset_path)
    if data is None:
        # load_lyrics_dataset already printed the reason; do not pass None on
        # to the embedding step or claim that data was saved.
        print("Dataset could not be loaded. Nothing was inserted into the db.")
        return

    preprocess_and_store_embeddings(data, index, chunk_size=100, overlap=50)
    print("Data saved in db.")

# Path of the lyrics CSV, relative to the notebook's working directory.
# populate_db prints a notice and returns without doing anything if the file
# does not exist.
dataset_path = "dataset.csv"
populate_db(dataset_path)
