In [2]:
from collections import defaultdict
from openai import OpenAI
import replicate
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import AuthenticationException, ConnectionError
from transformers import T5ForConditionalGeneration, T5Tokenizer
from sentence_transformers import SentenceTransformer
import chromadb
import os
from dotenv import load_dotenv

load_dotenv()

from utils import search_podcasts
from ingest import create_index, download_podcast, transcribe_podcast, encode_podcast, index_podcast
from rag import rag, search

# Setup

In [3]:
def update_session(**kwargs):
    """Write every keyword argument into the module-level ``session_state``.

    Existing keys are overwritten; keys that are not passed are left alone.
    """
    session_state.update(kwargs)

def text_input(input_text):
    """Show ``input_text`` as a console prompt and return the line that was typed."""
    answer = input(input_text)
    return answer

def choose_podcast_option(episode_option):
    """Record which podcast episode the pipeline should work on.

    ``episode_option_selected`` is reset to False first and only set back to
    True once the chosen option has everything it needs:

    * ``"1. Try a sample"`` -- nothing else is requested.
    * ``"2. Provide the iTunes URL ..."`` -- prompts for the URL and stores it
      as ``episode_url``.
    * ``"3. Provide a name of a podcast ..."`` -- prompts for a search term,
      calls ``search_podcasts`` and lets the user pick one result by number;
      stores ``found_podcasts`` and ``selected_index``.

    Any other value leaves ``episode_option_selected`` False.
    """
    update_session(episode_option_selected=False)
    if episode_option == "1. Try a sample":
        update_session(episode_option_selected=True, episode_option=episode_option)
    elif episode_option == "2. Provide the iTunes URL for a specific podcast episode":
        episode_url = text_input("Enter the iTunes URL of the episode you want:")
        update_session(episode_option_selected=True, episode_option=episode_option, episode_url=episode_url)
    elif episode_option == "3. Provide a name of a podcast to explore its most recent episode":
        term = text_input("Enter a search term for podcasts:")
        if term == '':
            # Same as before: an empty term is ignored without a message.
            return
        try:
            found_podcasts = search_podcasts(term)
            if found_podcasts['status'] == 'Fail':
                raise ValueError("podcast search failed")
            podcasts = found_podcasts['podcasts']
            podcast_names = [f"{podcast['collectionName']} by {podcast['artistName']}" for podcast in podcasts]
        except Exception:
            print("Please enter a valid search term.")
            return
        if not podcast_names:
            print("Please enter a valid search term.")
            return

        # The previous code called an undefined `selectbox`; the resulting
        # NameError was swallowed by the except above, so this option could
        # never succeed. A numbered console prompt replaces it.
        for number, name in enumerate(podcast_names, start=1):
            print(f"{number}. {name}")
        choice = text_input("Select a podcast:")
        try:
            selected_index = int(choice) - 1
        except (TypeError, ValueError):
            selected_index = -1
        if not 0 <= selected_index < len(podcast_names):
            print("Please enter a valid podcast number.")
            return
        update_session(episode_option_selected=True, episode_option=episode_option, found_podcasts=podcasts, selected_index=selected_index)

def choose_encoder(sentence_encoder):
    """Select the sentence encoder and record it in the session state.

    ``sentence_encoder_selected`` is reset to False first, then:

    * ``"1. T5"`` -- loads ``sentence-transformers/sentence-t5-base`` and
      stores the model as ``encoder``.
    * ``"2. OpenAI"`` -- takes the API key from the ``OPENAI_API_KEY``
      environment variable, or asks for it with a hidden prompt when the
      variable is unset or empty; on success stores the client as
      ``embedding_client`` together with ``embedding_model``.

    Any other value, or a missing or rejected key, leaves the flag False.
    """
    update_session(sentence_encoder_selected=False)
    if sentence_encoder == "1. T5":
        encoder = SentenceTransformer("sentence-transformers/sentence-t5-base")
        update_session(sentence_encoder_selected=True, sentence_encoder=sentence_encoder, encoder=encoder)
    elif sentence_encoder == "2. OpenAI":
        import getpass  # only this branch needs a hidden prompt

        embedding_model = "text-embedding-3-large"
        # The old call passed key=/type= keywords to text_input(), which only
        # accepts the prompt text, so this branch always raised TypeError.
        openai_api_key = os.getenv('OPENAI_API_KEY') or getpass.getpass("OpenAI API Key: ")
        if not openai_api_key:
            print("No OpenAI API key provided. Please provide a valid API token.")
            return
        try:
            oa_embedding_client = OpenAI(api_key=openai_api_key)
            # Listing models is used as a probe so a bad key fails here.
            oa_embedding_client.models.list()
        except Exception:
            print("Invalid API key. Please provide a valid API token.")
            return
        update_session(sentence_encoder_selected=True, sentence_encoder=sentence_encoder, embedding_client=oa_embedding_client, embedding_model=embedding_model)

def choose_transcription_method(transcription_method, session_state):
    """Select how the episode is transcribed and record it in the session.

    Args:
        transcription_method: ``"1. Replicate"`` or ``"2. Local transcription"``.
        session_state: Mapping that is *read* for ``episode_option``. Writes go
            through ``update_session``.

    Behaviour:
        * No ``episode_option`` in ``session_state``: nothing happens.
        * Sample episode: no method is needed; the stage is marked selected.
        * Replicate: the token is read from ``REPLICATE_API_KEY``; the client is
          stored as ``transcription_client`` if a ``models.list()`` probe works.
        * Local transcription: only the choice is recorded.
    """
    if not session_state.get('episode_option', False):
        return
    if session_state['episode_option'] == "1. Try a sample":
        print("The sample podcast doesn't require a transcription method.")
        update_session(transcription_method_selected=True)
        return

    update_session(transcription_method_selected=False)
    if transcription_method == "1. Replicate":
        replicate_api_key = os.getenv('REPLICATE_API_KEY')
        # os.getenv returns None (not '') for an unset variable, so the old
        # `!= ''` comparison never caught a missing key.
        if not replicate_api_key:
            print("REPLICATE_API_KEY is not set. Please provide a valid API token.")
            return
        try:
            replicate_client = replicate.Client(api_token=replicate_api_key)
            # Probe call so that a rejected token fails here.
            replicate_client.models.list()
        except Exception:
            # `except Exception` (not bare) lets KeyboardInterrupt through.
            print("Invalid API key. Please provide a valid API token.")
            return
        update_session(transcription_method_selected=True, transcription_method=transcription_method, transcription_client=replicate_client)
    elif transcription_method == "2. Local transcription":
        update_session(transcription_method_selected=True, transcription_method=transcription_method)

def choose_vector_db(vector_db):
    """Select the vector store, create its index and record both in the session.

    Always sets ``index_name`` to ``"podcast-transcriber"`` and resets
    ``vector_db_selected`` to False first. For a recognised ``vector_db`` the
    index is created with ``create_index(**session_state)`` and
    ``vector_db_selected`` / ``index_created`` are set to True.

    * ``"1. Minsearch"`` -- no client is needed.
    * ``"2. Elasticsearch"`` -- needs ``ES_API_KEY`` and ``ES_CLOUD_ID`` in the
      environment; connection problems are reported, not raised.
    * ``"3. ChromaDB"`` -- uses a persistent client at ``./chroma_db``.
    """
    update_session(index_name="podcast-transcriber", vector_db_selected=False)
    if vector_db == "1. Minsearch":
        update_session(vector_db=vector_db)
        update_session(index=create_index(**session_state))
        update_session(vector_db_selected=True, index_created=True)
        print(f"Index {session_state['index'].index_name} was created successfully.")
    elif vector_db == "2. Elasticsearch":
        elasticsearch_api_key = os.getenv('ES_API_KEY')
        elasticsearch_cloud_id = os.getenv('ES_CLOUD_ID')
        # os.getenv returns None (not '') for an unset variable, so the old
        # `!= ''` comparison never caught missing credentials.
        if not elasticsearch_api_key or not elasticsearch_cloud_id:
            print("ES_API_KEY and ES_CLOUD_ID must both be set to use Elasticsearch.")
            return
        try:
            es_client = Elasticsearch(cloud_id=elasticsearch_cloud_id, api_key=elasticsearch_api_key)
            # Probe call so that bad credentials or an unreachable cluster fail here.
            es_client.cluster.health()
            update_session(vector_db=vector_db, vector_db_client=es_client)
            update_session(index=create_index(**session_state))
            update_session(vector_db_selected=True, index_created=True)
            # The first key of the stored index mapping is reported as the index name.
            print(f"Index {[k for k,v in session_state['index'].items()][0]} was created successfully.")
        except AuthenticationException:
            print("Invalid API key or Cloud ID. Please provide a valid tokens.")
        except ConnectionError:
            print("Connection error. Could not connect to the cluster.")
        except Exception as e:
            print(f"An error occurred: {e}")
    elif vector_db == "3. ChromaDB":
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        update_session(vector_db=vector_db, vector_db_client=chroma_client)
        update_session(index=create_index(**session_state))
        update_session(vector_db_selected=True, index_created=True)
        # NOTE(review): this prints the first listed collection, which is only
        # the new one if the store holds a single collection -- confirm.
        print(f"Index {session_state['vector_db_client'].list_collections()[0].name} was created successfully.")

def choose_llm(llm_option):
    """Select the answer-generation model and record it in the session state.

    ``llm_option_selected`` is reset to False first, then:

    * ``"1. GPT-4o"`` -- reuses the OpenAI client stored as
      ``embedding_client`` when the sentence encoder is ``"2. OpenAI"`` and
      that client exists; otherwise builds a client from ``OPENAI_API_KEY``.
      The client is stored as ``llm_client``.
    * ``"2. FLAN-5"`` -- loads ``google/flan-t5-large`` and its tokenizer and
      stores them as ``llm_client`` / ``llm_tokenizer``.

    Any other value, or a missing or rejected key, leaves the flag False.
    """
    update_session(llm_option_selected=False)
    if llm_option == "1. GPT-4o":
        oa_client = None
        if session_state['sentence_encoder'] == "2. OpenAI":
            # .get(): the encoder may be configured without having been
            # initialised; previously that raised KeyError here.
            oa_client = session_state.get('embedding_client')
        if oa_client is None:
            openai_api_key = os.getenv('OPENAI_API_KEY')
            # os.getenv returns None (not '') for an unset variable, so the
            # old `!= ''` comparison never caught a missing key.
            if not openai_api_key:
                print("OPENAI_API_KEY is not set. Please provide a valid API token.")
                return
            try:
                oa_client = OpenAI(api_key=openai_api_key)
                # Probe call so that a rejected key fails here.
                oa_client.models.list()
            except Exception:
                # `except Exception` (not bare) lets KeyboardInterrupt through.
                print("Invalid API key. Please provide a valid API token.")
                return
        update_session(llm_option_selected=True, llm_option=llm_option, llm_client=oa_client)

    elif llm_option == "2. FLAN-5":
        model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large")
        tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
        update_session(llm_option_selected=True, llm_option=llm_option, llm_client=model, llm_tokenizer=tokenizer)

# Main

In [4]:
# Initial choice for every pipeline stage. No default_factory is supplied, so
# this defaultdict behaves like a plain dict: reading a missing key with []
# raises KeyError instead of creating an entry.
session_state = defaultdict(
    None,
    {
        "episode_option": "1. Try a sample",
        "sentence_encoder": "1. T5",
        "transcription_method": "1. Replicate",
        "vector_db": "1. Minsearch",
        "llm_option": "1. GPT-4o",
    },
)

In [5]:
# Apply the configured episode source; option "1. Try a sample" needs no further input.
choose_podcast_option(session_state['episode_option'])
# Presumably an example episode URL to paste when option 2 (iTunes URL) is used -- TODO confirm:
# https://podcasts.apple.com/us/podcast/what-if-the-russian-revolution-hadnt-been-bolshevik/id1682047968?i=1000668755545

In [6]:
# Load the configured sentence encoder and store it in session_state.
choose_encoder(session_state['sentence_encoder'])



In [7]:
# session_state is passed explicitly because this function reads episode_option from its argument.
choose_transcription_method(session_state['transcription_method'], session_state)

The sample podcast doesn't require a transcription method.


In [8]:
# Create the index for the configured vector store and store it in session_state['index'].
choose_vector_db(session_state['vector_db'])

Index podcast-transcriber was created successfully.


In [9]:
# Set up the configured LLM client and store it in session_state['llm_client'].
choose_llm(session_state['llm_option'])

In [10]:
# Download stage: fetch the episode and record the outcome in session_state.
episode_details = download_podcast(**session_state)
if episode_details['status'] != 'Success':
    # Failure: show the message and flag the stage as not completed.
    print(episode_details['status_message'])
    update_session(podcast_downloaded=False)
else:
    # Success: show the message and keep the details for the later stages.
    print(episode_details['status_message'])
    update_session(episode_details=episode_details, podcast_downloaded=True)

Podcast Past Present Future downloaded successfully.


In [11]:
# transcribe
# Both flags are read with .get(): if the download stage never set
# 'podcast_downloaded', this stage is skipped instead of raising KeyError.
if session_state.get('podcast_downloaded', False) and not session_state.get('interaction_started', False):
    # The transcription result is merged into the existing episode details.
    session_state['episode_details'].update(transcribe_podcast(**session_state))
    update_session(podcast_transcribed=True)

In [12]:
# import json

# file_path = "episode_details_id.json"

# # Read the JSON file
# with open(file_path, 'r') as file:
#     episode_details = json.load(file)

# # Now 'episode_details' contains the contents of your JSON file
# session_state['episode_details'] = episode_details

In [13]:
# encode
# 'podcast_transcribed' is only set when transcription actually ran, so it is
# read with .get(): after a failed download this stage is skipped instead of
# raising KeyError.
if session_state.get('podcast_transcribed', False) and not session_state.get('interaction_started', False):
    # encode_podcast is only called for vector stores other than Minsearch;
    # in both cases the stage is then marked as done.
    if session_state['vector_db'] != "1. Minsearch":
        session_state['episode_details'].update(encode_podcast(**session_state))
    update_session(podcast_embedded=True)

In [14]:
# import json

# session_state['episode_details']['cos_sim'] = session_state['episode_details']['cos_sim'][0].item()

# # Assume 'data' is the dictionary or list you want to save as JSON
# data = session_state['episode_details']

# # Specify the file path where you want to save the JSON file
# file_path = 'episode_details_id.json'

# # Write the data to a JSON file
# with open(file_path, 'w') as json_file:
#     json.dump(data, json_file, indent=4)

# print(f"JSON file has been saved to {file_path}")

In [15]:
# populate index
# 'podcast_embedded' is only set when the encode stage ran, so it is read with
# .get(): a skipped earlier stage skips indexing instead of raising KeyError.
if session_state.get('podcast_embedded', False) and not session_state.get('interaction_started', False):
    index_podcast(**session_state)
    update_session(podcast_indexed=True)

In [16]:
# Sanity check: show the first entry of episode_details['chunks'].
session_state['episode_details']['chunks'][0]

{'text': ' Balancing a wellness routine and busy travel plans?',
 'timestamp': [0, 3.06],
 'id': 1}

In [17]:
# num_results travels to search() with the rest of session_state as a keyword argument.
session_state['num_results'] = 5
# `query` is reused by the rag() cell that follows.
query = 'How might the political landscape have changed if the Bolsheviks were not successful?'
search(query, **session_state)

[{'id': '13',
  'text': ' How might the history of the world have been different?'},
 {'id': '11',
  'text': ' What if not the Bolsheviks, but the left SRs, the left socialist revolutionaries'},
 {'id': '12',
  'text': ' had come out on top? How might the revolution have been different?'},
 {'id': '89',
  'text': ' how powerful were the left SRs relative to the Bolsheviks?'},
 {'id': '17',
  'text': ' So maybe you could sketch out for us just what the political landscape in Russia looked like after that revolution.'}]

In [18]:
# list() materialises the result before printing; the printed output is a list
# of short text pieces, so rag() presumably yields the answer incrementally -- TODO confirm.
result = rag(query, **session_state)
print(list(result))

['If ', 'the ', 'Bolsheviks ', 'had ', 'not ', 'been ', 'successful ', 'and ', 'the ', 'left ', 'Socialist ', 'Revolutionaries ', '(left ', 'SRs) ', 'had ', 'come ', 'out ', 'on ', 'top, ', 'the ', 'political ', 'landscape ', 'in ', 'Russia ', 'could ', 'have ', 'been ', 'markedly ', 'different. ', 'The ', 'left ', 'SRs, ', 'unlike ', 'the ', 'Bolsheviks, ', 'may ', 'have ', 'pursued ', 'a ', 'different ', 'set ', 'of ', 'policies ', 'and ', 'priorities ', 'focused ', 'more ', 'on ', 'agrarian ', 'socialism ', 'and ', 'rural ', 'reform, ', 'given ', 'their ', 'stronger ', 'base ', 'among ', 'the ', 'peasantry. ', 'This ', 'could ', 'have ', 'led ', 'to ', 'a ', 'different ', 'structure ', 'of ', 'governance, ', 'possibly ', 'with ', 'more ', 'emphasis ', 'on ', 'decentralized ', 'power ', 'and ', 'land ', 'redistribution. ', 'The ', 'influence ', 'and ', 'control ', 'over ', 'the ', 'means ', 'of ', 'production ', 'may ', 'have ', 'unfolded ', 'differently, ', 'potentially ', 'affectin

# Retrieval evaluation

In [19]:
import pandas as pd

# Load the retrieval ground truth as a list of row dicts (one per CSV row).
ground_truth = (
    pd.read_csv('sample/ground-truth-retrieval.csv')
    .to_dict(orient='records')
)

In [58]:
# Show one ground-truth record to check its fields.
ground_truth[0]

{'id': '1',
 'question': 'How can I maintain my wellness routine while traveling?'}

In [59]:
def hit_rate(relevance_total):
    """Return the share of queries whose result list contains a relevant hit.

    Args:
        relevance_total: One sequence of relevance flags per query, in result
            order; a query counts as a hit when ``True`` is among its flags.

    Returns:
        Number of queries with a hit divided by the number of queries.

    Raises:
        ZeroDivisionError: If ``relevance_total`` is empty.
    """
    queries_with_hit = sum(1 for flags in relevance_total if True in flags)
    return queries_with_hit / len(relevance_total)

def mrr(relevance_total):
    """Return the mean reciprocal rank over all queries.

    For each query the score is ``1 / rank`` of the *first* relevant result
    (ranks start at 1), or 0 when no result is relevant.

    Args:
        relevance_total: One sequence of relevance flags per query, in result
            order.

    Returns:
        The average of the per-query scores, always within ``[0, 1]``.

    Raises:
        ZeroDivisionError: If ``relevance_total`` is empty.
    """
    total_score = 0.0

    for line in relevance_total:
        for rank, is_relevant in enumerate(line, start=1):
            # Equality (not truthiness) mirrors hit_rate's `True in line` test.
            if is_relevant == True:  # noqa: E712
                total_score += 1 / rank
                # Only the first hit counts; without this break a row with
                # several hits added several terms and could score above 1.
                break

    return total_score / len(relevance_total)

In [70]:
# Module-level handle to the index stored in session_state; minsearch_search reads this global.
index = session_state['index']

def minsearch_search(query, num_results=10, boost=None):
    """Search the module-level ``index`` for ``query``.

    Args:
        query: Search text passed to ``index.search``.
        num_results: Value passed as ``num_results``. Defaults to 10, the
            value that used to be hard-coded.
        boost: Optional mapping passed as ``boost_dict``. Defaults to
            ``{'text': 3.0}``, the previous fixed value.

    Returns:
        Whatever ``index.search`` returns.
    """
    if boost is None:
        boost = {'text': 3.0}

    return index.search(
        query=query,
        boost_dict=boost,
        num_results=num_results,
    )

In [71]:
def evaluate(ground_truth, search_function):
    """Score ``search_function`` against ``ground_truth``.

    Args:
        ground_truth: Iterable of records; each has an ``'id'`` naming the
            expected document and is passed whole to ``search_function``.
        search_function: Callable that takes one record and returns an
            iterable of results, each with an ``'id'``.

    Returns:
        ``{'hit_rate': ..., 'mrr': ...}`` computed from per-query relevance
        flags. Ids are compared as strings.
    """
    def relevance_flags(record):
        expected_id = str(record['id'])
        return [str(hit['id']) == expected_id for hit in search_function(record)]

    relevance_total = [relevance_flags(record) for record in tqdm(ground_truth)]

    return {
        'hit_rate': hit_rate(relevance_total),
        'mrr': mrr(relevance_total),
    }

In [72]:
# Note: the evaluation call further down goes through minsearch_search, which
# requests 10 results itself and does not read this setting.
session_state['num_results'] = 5

In [73]:
# Show the first ground-truth record (as a one-element list) before evaluating.
ground_truth[:1]

[{'id': '1',
  'question': 'How can I maintain my wellness routine while traveling?'}]

In [74]:
from tqdm import tqdm

# Run the retrieval evaluation; the lambda passes only each record's 'question' text to minsearch_search.
evaluate(ground_truth, lambda q: minsearch_search(q['question']))

100%|██████████████████████████████████████████████████████████████████████████████████████████████| 2685/2685 [00:04<00:00, 669.73it/s]


{'hit_rate': 0.7515828677839851, 'mrr': 0.5692242322130585}