<a href="https://colab.research.google.com/github/DevDope/Abracadabra/blob/main/End_Abracadabra_Gradio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Environment setup for this notebook. The order matters: packages are removed
# before their pinned replacements are installed.

# Remove any installed torch/torchvision/torchaudio, then install a pinned torch build.
!pip uninstall torch torchvision torchaudio -y
!pip install torch==2.4.1
# chromadb provides the persistent client used in the next cell.
!pip install chromadb
!pip install tqdm
# Replace whatever pydantic is installed with the pinned 1.10.9 release
# (presumably required by the farm-haystack / gradio versions below -- TODO confirm).
!pip uninstall pydantic -y
!pip install pydantic==1.10.9
!pip install gradio==3.41.2
!pip install farm-haystack[transformers] transformers torch
# Upgrade httpx/httpcore last (the reason is not evident from this notebook -- TODO confirm).
!pip install -U httpx httpcore

In [None]:
from google.colab import drive
import chromadb


# Mount Google Drive so the Chroma database stored there can be opened.
drive.mount('/content/drive')

# Directory (inside the mounted drive) holding the persisted Chroma database.
chromadb_path = '/content/drive/My Drive/chroma_db'

db = chromadb.PersistentClient(path=chromadb_path)

# Open the collection once (it is created on first use). `collection` is kept as
# an alias of the same object so code using either name keeps working, without
# issuing a second, redundant lookup against the database.
chroma_collection = db.get_or_create_collection("music_recommendation")
collection = chroma_collection
print(f"Colección 'music_recommendation' cargada: {chroma_collection}")

In [None]:
import os
from getpass import getpass

from huggingface_hub import login

# Do not store an access token in the notebook source: read it from the
# HF_TOKEN environment variable, or ask for it with a hidden prompt so it never
# ends up in the saved file or in the cell output.
hf_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face access token: ")
login(token=hf_token)


In [None]:
from google.colab import drive
from chromadb import PersistentClient
from transformers import AutoModelForCausalLM, AutoTokenizer
from IPython.display import Markdown, display
import torch

# The rest of the notebook moves the language model to "cuda" and enables
# `use_gpu` for the retrievers, so stop early with an actionable message when no
# GPU is attached (the exception type is unchanged).
if not torch.cuda.is_available():
    raise ValueError(
        "No CUDA GPU detected. In Colab choose Runtime > Change runtime type > GPU, "
        "then run the notebook again."
    )
device = "cuda"


In [None]:
# Pipeline definition written to disk below as /content/pipeline.yaml.
# It declares one in-memory document store shared by two retrievers (a DPR
# retriever and a sentence-transformers embedding retriever) and a single
# pipeline, "context_retrieval", that chains Query -> Retriever -> PopularSongRetriever.
# The string is written verbatim, so any change inside it changes the pipeline.
yaml_content = """
version: "1.0"
components:
  - name: DocumentStore
    type: InMemoryDocumentStore
    params:
      similarity: "cosine"

  - name: Retriever
    type: DensePassageRetriever
    params:
      document_store: DocumentStore
      query_embedding_model: "facebook/dpr-question_encoder-single-nq-base"
      passage_embedding_model: "facebook/dpr-ctx_encoder-single-nq-base"
      use_gpu: True

  - name: PopularSongRetriever
    type: EmbeddingRetriever
    params:
      document_store: DocumentStore
      embedding_model: "sentence-transformers/all-MiniLM-L6-v2"
      pooling_strategy: "mean"
      use_gpu: True
      top_k: 10

pipelines:
  - name: context_retrieval
    nodes:
      - name: Retriever
        inputs: [Query]
      - name: PopularSongRetriever
        inputs: [Retriever]




"""


# Overwrite the file on every run so the YAML on disk always matches the string above.
with open("/content/pipeline.yaml", "w", encoding="utf-8") as file:
    file.write(yaml_content)

print("Archivo 'pipeline.yaml' actualizado y creado con éxito.")



Archivo 'pipeline.yaml' actualizado y creado con éxito.


In [None]:
import time
import re
import torch
import random
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from haystack.pipelines import Pipeline
from haystack.document_stores import InMemoryDocumentStore
from haystack.nodes import DensePassageRetriever
from sklearn.feature_extraction.text import CountVectorizer


# Chat model used for both playlist generation and the explanation step.
model_name = "meta-llama/Llama-2-7b-chat-hf"

# `token=True` reuses the credentials stored by `huggingface_hub.login`; it is the
# replacement for the deprecated `use_auth_token` argument.
# `trust_remote_code` is deliberately not enabled: Llama-2 is implemented inside
# transformers itself, so there is no reason to allow code from the model
# repository to be executed.
tokenizer = AutoTokenizer.from_pretrained(model_name, token=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    token=True,
).to("cuda")


pipeline = Pipeline.load_from_yaml(path="/content/pipeline.yaml", pipeline_name="context_retrieval")

# Reuse the retriever and document store that the YAML pipeline already built.
# Creating a fresh InMemoryDocumentStore / DensePassageRetriever here would load
# the DPR encoders a second time and update embeddings in a store the pipeline
# never queries.
retriever = pipeline.get_node("Retriever")
document_store = retriever.document_store
# NOTE(review): nothing in this notebook writes documents into this store (the
# Chroma collection opened earlier is never copied over), so this call has no
# documents to embed until an ingestion step is added -- confirm intended data flow.
document_store.update_embeddings(retriever)

torch.cuda.empty_cache()
song_history = set()

# Genre vocabulary searched for in the user's prompt. Every entry appears exactly
# once; the first entries keep their original order because callers may rely on
# the order of the matches.
GENRES = [
    'hip-hop', 'pop', 'rap', 'rock', 'indie', 'folk', 'electronic', 'soul',
    'classic rock', 'rnb', 'alternative', 'math rock', 'j-pop', 'country', 'indie rock',
    'hip hop', 'alternative rock', 'metal', 'jazz', 'indie pop', 'reggae', 'punk',
    'metalcore', 'post-hardcore', 'christian', 'worship', 'experimental', 'hard rock',
    'punk rock', 'trap', 'pop punk', 'hardcore', 'progressive rock', 'dance', 'emo',
    'new wave', 'acoustic', 'funk', 'blues', 'heavy metal', 'psychedelic', 'post-punk',
    'soundtrack', 'cloud rap', 'lo-fi', 'progressive metal', 'dub', 'dancehall', 'dream pop',
    'synthpop', 'comedy', 'death metal', 'thrash metal', 'alt-country', 'house', 'pop rock',
    'psychedelic rock', 'ambient', 'grunge', 'shoegaze', 'industrial', 'black metal',
    'screamo', 'emo rap', 'nu metal', 'garage rock', 'power metal', 'gospel', 'electropop',
    'deathcore', 'chillout', 'grime', 'britpop', 'trip-hop', 'melodic death metal', 'dubstep',
    'disco', 'doom metal', 'swing', 'k-pop', 'chillwave', 'trance', 'techno',
    'drum and bass', 'electro', 'reggaeton', 'classical', 'latin'
]
# Emotion vocabulary searched for in the user's prompt. "surprise" is the correct
# spelling; the original misspelling "suprise" is kept as well so that anything
# already relying on that exact label keeps matching.
EMOTIONS = ["joy", "love", "calm", "mystic", "serene", "angry", "surprise", "suprise"]

def extract_keywords_from_prompt(prompt, top_n=10):
    """Return up to `top_n` keywords from `prompt`, ignoring English stop words.

    Args:
        prompt: Free-text description typed by the user.
        top_n: Maximum number of keywords to keep (passed to `CountVectorizer`
            as `max_features`).

    Returns:
        The vectorizer's feature names (an array of strings), or an empty list
        when the prompt contains no usable word.
    """
    # A blank prompt has no vocabulary at all; answer directly instead of
    # letting the vectorizer fail.
    if not prompt or not prompt.strip():
        return []

    vectorizer = CountVectorizer(max_features=top_n, stop_words='english')
    try:
        # Only the learned vocabulary is needed, so fit without keeping the matrix.
        vectorizer.fit([prompt])
    except ValueError:
        # Raised when every token was removed as a stop word ("empty vocabulary").
        return []
    return vectorizer.get_feature_names_out()

def extract_filters(prompt):
    """Find which known emotions and genres are mentioned in `prompt`.

    Args:
        prompt: Free-text description typed by the user.

    Returns:
        A dict with two lists:
        - "emotion": entries of `EMOTIONS` that occur anywhere in the lowercased
          prompt (substring match, so "joyful" still yields "joy").
        - "genre": entries of `GENRES` that occur as whole terms, ordered from
          the longest (most specific) to the shortest, without duplicates.
    """
    prompt_lower = prompt.lower()

    emotion_filter = [word for word in EMOTIONS if word in prompt_lower]

    # Genres must match as whole terms: a plain substring test reports "rap" for
    # "trap", "emo" for "emotional" and "pop" for "popular". The lookarounds are
    # used instead of \b so multi-word and hyphenated names behave the same way.
    mentioned_genres = [
        genre
        for genre in dict.fromkeys(GENRES)
        if re.search(r"(?<!\w)" + re.escape(genre) + r"(?!\w)", prompt_lower)
    ]
    # Callers take the first entry as "the" genre, so put the most specific match
    # first ("indie rock" before "rock"). The sort is stable, which keeps the
    # vocabulary order between names of equal length.
    genre_filter = sorted(mentioned_genres, key=len, reverse=True)

    return {"emotion": emotion_filter, "genre": genre_filter}

# Lowercase terms that disqualify a song when found in its title or lyrics.
PROHIBITED_KEYWORDS = ["violence", "explicit", "inappropriate_word1", "inappropriate_word2"]


def _screened_field(doc, key):
    """Return field `key` of `doc` as lowercase text, or '' when it is missing.

    Accepts dict-style records as well as objects that carry their text in
    `.content` and their other fields in a `.meta` dict.
    """
    if hasattr(doc, 'get'):
        value = doc.get(key)
    elif key == 'text':
        value = getattr(doc, 'content', None)
    else:
        # assumes song metadata was stored under `.meta` -- TODO confirm against ingestion
        value = (getattr(doc, 'meta', None) or {}).get(key)
    return '' if value is None else str(value).lower()


def filter_prohibited_content(documents):
    """Drop documents whose title or lyrics contain a prohibited keyword.

    Args:
        documents: Iterable of song records (dicts with 'song' / 'text' keys, or
            objects exposing `.meta` and `.content`).

    Returns:
        A new list with the accepted documents, in their original order. The
        input objects themselves are returned unchanged.
    """
    filtered_docs = []
    for doc in documents:
        title = _screened_field(doc, 'song')
        text = _screened_field(doc, 'text')

        # Missing or None fields count as empty text, so they never trigger a match.
        if not any(keyword in title or keyword in text for keyword in PROHIBITED_KEYWORDS):
            filtered_docs.append(doc)
    return filtered_docs

def _song_field(doc, key, default=''):
    """Read `key` from a retrieved song, whatever shape the record has.

    Dict-style records are read with `.get`; other objects are expected to carry
    their text in `.content` and the remaining fields in a `.meta` dict.
    """
    if hasattr(doc, 'get'):
        value = doc.get(key)
    elif key == 'text':
        value = getattr(doc, 'content', None)
    else:
        # assumes song/artist/genre were stored under `.meta` -- TODO confirm against ingestion
        value = (getattr(doc, 'meta', None) or {}).get(key)
    return default if value is None else str(value)


def search_documents(query, genre_filter, top_k=50):
    """Retrieve song texts for `query`, ranked by how well they match it.

    Args:
        query: The user's free-text request.
        genre_filter: A single genre name, or None when the request names no
            genre (no genre restriction is applied in that case).
        top_k: Maximum number of song texts to return.

    Returns:
        A list of song text strings, at most one song per artist. Songs taken
        from the "popular" run come first, then songs whose title matches a
        prompt keyword, then songs whose lyrics match; ties are in random order.
    """
    lyric_keywords = extract_keywords_from_prompt(query)
    wanted_genre = genre_filter.lower() if genre_filter else None

    # Only send a metadata filter when a genre was actually requested.
    retriever_params = {"top_k": 100}
    if genre_filter:
        retriever_params["filters"] = {"genre": genre_filter}

    start_time = time.time()
    results = pipeline.run(query=query, params={"Retriever": retriever_params})
    popular_results = pipeline.run(query=query, params={"PopularSongRetriever": {"top_k": 100}})
    end_time = time.time()
    print(f"Búsqueda de documentos completada en {end_time - start_time:.2f} segundos.")

    def genre_matches(doc):
        # Without a requested genre every song qualifies.
        return wanted_genre is None or wanted_genre in _song_field(doc, 'genre').lower()

    filtered_results = filter_prohibited_content(results['documents'])
    unique_artists = set()
    final_results = []

    for doc in filtered_results:
        title = _song_field(doc, 'song').lower()
        text = _song_field(doc, 'text').lower()
        artist = _song_field(doc, 'artist').lower()

        if artist in unique_artists or not genre_matches(doc):
            continue

        # A keyword hit in the title outranks a hit in the lyrics; songs with no
        # hit at all are dropped.
        if any(keyword in title for keyword in lyric_keywords):
            priority = 2
        elif any(keyword in text for keyword in lyric_keywords):
            priority = 1
        else:
            continue
        final_results.append((doc, priority))
        unique_artists.add(artist)

    # Sample up to three songs from the second run. Shuffle a copy so the
    # pipeline's own result list is left untouched.
    # NOTE(review): these documents are not passed through
    # filter_prohibited_content -- confirm whether that is intended.
    popular_docs = list(popular_results['documents'])
    random.shuffle(popular_docs)
    for pos in [0, 2, 9]:
        if pos < len(popular_docs):
            popular_doc = popular_docs[pos]
            artist = _song_field(popular_doc, 'artist').lower()
            if artist not in unique_artists and genre_matches(popular_doc):
                final_results.append((popular_doc, 3))
                unique_artists.add(artist)

    # Shuffle first, then sort: the sort is stable, so the priorities decide the
    # order and the shuffle only randomizes songs of equal priority. (Shuffling
    # after sorting would discard the ranking entirely.)
    random.shuffle(final_results)
    final_results.sort(key=lambda x: x[1], reverse=True)

    return [_song_field(doc, 'text') for doc, _ in final_results[:top_k]]

def generate_response(prompt, retrieved_songs, max_new_tokens=800):
    """Ask the language model for a playlist, using retrieved songs as context.

    Args:
        prompt: Instructions for the model (the user's request plus formatting rules).
        retrieved_songs: Context songs. Each item may be a plain lyrics string or
            a dict with 'title' (or 'song'), 'artist', 'genre', 'emotion' and
            'text' keys.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The text "Playlist:" followed by the numbered list produced by the
        model. The prompt and the context are not part of the returned text, so
        numbered context lines can never be mistaken for playlist entries.
    """
    def describe(position, song):
        # Plain strings carry only lyrics, so there is no title/artist to show.
        if isinstance(song, str):
            return f"{position}. Lyric snippet: \"{song[:100]}...\""
        title = song.get('title') or song.get('song') or 'Unknown title'
        artist = song.get('artist') or 'Unknown artist'
        lyrics = str(song.get('text') or '')
        return (
            f"{position}. '{title}' by {artist} - A {song.get('genre', 'genre')} song with a {song.get('emotion', 'emotion')} feel. "
            f"Lyric snippet: \"{lyrics[:100]}...\""
        )

    context = "\n".join(describe(i + 1, song) for i, song in enumerate(retrieved_songs))

    final_prompt = (
        f"{prompt}\n\n"
        f"Here is a playlist that captures the ambiance you described, with popular and themed selections:\n\n"
        f"{context}\n\n"
        f"Playlist:\n1."
    )

    # NOTE(review): truncation=True with max_length=2048 cuts an over-long prompt;
    # confirm which end the tokenizer truncates, since losing the tail would drop
    # the "Playlist:" cue.
    inputs = tokenizer(final_prompt, return_tensors="pt", truncation=True, max_length=2048).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.eos_token_id
    )

    # The output sequence starts with the prompt tokens; decode only what was
    # generated after them.
    prompt_length = inputs["input_ids"].shape[-1]
    completion = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

    # Re-attach the cue the prompt ended with, so the result reads as a complete
    # numbered list that starts at entry 1.
    return f"Playlist:\n1. {completion.strip()}"

def clean_response(response):
    """Extract the ten playlist entries from the model's raw text.

    Lines are considered only after a line containing "Playlist:". An entry is a
    line that starts (after optional indentation) with a digit, contains a '.'
    and has a ' - ' separator between title and artist. Quote characters are
    removed from the entries.

    Args:
        response: Raw text produced by the generation step.

    Returns:
        The first ten entries joined with newlines, or an error message starting
        with "Error:" when fewer than ten entries were found.
    """
    cleaned_lines = []
    start_collecting = False

    for line in response.split("\n"):
        if "Playlist:" in line:
            start_collecting = True
            continue
        # Test the first visible character, so indented entries such as
        # "  3. Song - Artist" are recognised too.
        if start_collecting and line.strip() and line.lstrip()[0].isdigit() and '.' in line:
            line = line.replace("'", "").replace('"', "")
            song_info = line.split(' - ')
            if len(song_info) >= 2:
                cleaned_lines.append(line.strip())
        if len(cleaned_lines) == 10:
            break
    if len(cleaned_lines) != 10:
        return "Error: The generated playlist does not contain exactly 10 songs."
    return "\n".join(cleaned_lines)

def query_rag_for_playlist(question):
    """Build a ten-song playlist for `question` from retrieval plus generation.

    Args:
        question: The user's free-text request.

    Returns:
        Ten "Song Title - Artist" lines joined with newlines, or the error
        message produced by `clean_response` when the model's answer could not
        be parsed into exactly ten entries.
    """
    # Use the first genre reported for the prompt, if any.
    matched_genres = extract_filters(question).get("genre", [])
    genre_filter = matched_genres[0] if matched_genres else None

    context_documents = search_documents(question, genre_filter)

    if genre_filter:
        genre_text = f"the genre '{genre_filter}'"
    else:
        genre_text = "the ambiance and emotions described"

    # The prompt intentionally does not end with a "Playlist:" cue:
    # generate_response appends its own cue after the retrieved context, and a
    # second, earlier cue would make clean_response start collecting entries
    # inside the context section.
    prompt = (
        f"{question}\n\n"
        f"Provide exactly 10 unique songs that match this setting and reflect {genre_text}. "
        f"Include both popular hits and hidden gems, with each song by a different artist. "
        f"List them as 'Song Title - Artist'."
    )

    response = generate_response(prompt, retrieved_songs=context_documents, max_new_tokens=800)
    return clean_response(response)

def generate_youtube_links(response):
    """Turn playlist lines into Markdown links to YouTube searches.

    Args:
        response: Playlist text with one "Title - Artist" entry per line. Lines
            without a ' - ' separator are ignored.

    Returns:
        One Markdown link per entry, joined with newlines ('' when no line
        qualifies). The link label keeps the entry as written; the search query
        omits the leading list number and is URL-encoded.
    """
    from urllib.parse import quote_plus

    youtube_links = []

    for line in response.split("\n"):
        if ' - ' not in line:
            continue
        song_info = line.split(" - ")
        song_title = song_info[0].strip()
        # Drop a trailing parenthetical such as "(1971)" or "(feat. X)" from the artist.
        artist_info = song_info[1].split(" (")[0].strip()

        # "3. Hello" -> "Hello": the list position is not part of the song name.
        search_title = re.sub(r"^\s*\d+\.\s+", "", song_title)

        # quote_plus encodes spaces as '+' and escapes characters such as '&',
        # '#', '?' and parentheses that would otherwise break the URL or the
        # Markdown link syntax.
        query = quote_plus(f"{search_title} {artist_info}")
        youtube_link = f"https://www.youtube.com/results?search_query={query}"
        youtube_links.append(f"[{song_title} - {artist_info}]({youtube_link})")

    return "\n".join(youtube_links)

def generate_playlist_explanation(response, genre_filter):
    """Ask the language model why the songs in `response` fit the request.

    Args:
        response: The playlist text to explain.
        genre_filter: Genre the playlist was built for, or None when the request
            named no genre.

    Returns:
        The explanation generated by the model, without the prompt that was
        sent to it.
    """
    if genre_filter:
        genre_text = f"the genre '{genre_filter}'"
    else:
        genre_text = "the emotions and themes described in the prompt"

    prompt_for_explanation = (
        f"Based on the following playlist, explain why each song was chosen for {genre_text}. "
        f"Discuss why each song is a good fit for the playlist.\n\n"
        f"Playlist:\n{response}\n\nExplanation:"
    )

    inputs = tokenizer(prompt_for_explanation, return_tensors="pt", truncation=True, max_length=2048).to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=1000,
        pad_token_id=tokenizer.eos_token_id
    )

    # The output sequence begins with the prompt tokens; decode only the newly
    # generated part so the user sees the explanation and not the instructions.
    prompt_length = inputs["input_ids"].shape[-1]
    explanation = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    return explanation.strip()

def gradio_generate_playlist(question):
    """Gradio handler: build the playlist, its YouTube links and an explanation.

    Args:
        question: Text entered by the user.

    Returns:
        A `(playlist, youtube_links_markdown, explanation)` tuple. When the
        playlist step reports an error, the message is returned as the first
        item and the other two are empty strings.
    """
    response = query_rag_for_playlist(question)

    # The playlist step signals failure with a message starting with "Error:".
    # There is nothing to link or explain then, so skip the second (long) model call.
    if response.startswith("Error:"):
        return response, "", ""

    youtube_links = generate_youtube_links(response)

    matched_genres = extract_filters(question).get("genre")
    genre_filter = matched_genres[0] if matched_genres else None
    explanation = generate_playlist_explanation(response, genre_filter)

    return response, youtube_links, explanation

# Web UI: one text input, three outputs in the order returned by
# gradio_generate_playlist (playlist text, links rendered as Markdown, explanation text).
iface = gr.Interface(
    title="Abracadabra Playlist Generator",
    description="A LLM with a RAG of 500000 songs",
    fn=gradio_generate_playlist,
    inputs="text",
    outputs=["text", "markdown", "text"],
)

# share=True asks Gradio for a shareable link in addition to the local server.
iface.launch(share=True)

