# Curate Tech Talks from Events

We're now going to use the power of an LLM with a custom prompt to build our curated corpus.

Here are the principles behind how this is going to work:

1. We load our IDs of meetups stored in Parquet
2. For each ID:
    1. We run a query against our event corpus with the metadata filter "doc.id = 'ID'"; this limits the data to just that event
    2. We use a custom prompt which:
        * Looks at who was speaking, and what they were speaking about
        * Identifies if this was a paid or free event
        * Outputs the result into a JSON structure which we can leverage
    3. With our JSON in hand, we can now create our curated corpus of tech talks with speaker information.

In [None]:
## Standard Imports and Logging


In [None]:
from vectara_client.core import Factory
from vectara_client.admin import CorpusBuilder
import pandas as pd
import duckdb
import pyarrow as pa
import logging
import json

# Root logging configuration: timestamped records at INFO level.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%H:%M:%S %z',
    format='%(asctime)s:%(name)-35s %(levelname)s:%(message)s',
)

# Only let warnings and above through from the "OAuthUtil" logger.
logging.getLogger("OAuthUtil").setLevel(logging.WARNING)

# Logger used by the rest of this notebook.
logger = logging.getLogger(__name__)

In [None]:
# Build the Vectara client through the Factory (no explicit configuration is
# passed here) and keep a handle on its corpus manager, which is used further
# down to create the corpus.
client = Factory().build()
manager = client.corpus_manager

In [None]:
# Copy the session events from the Parquet file into a DuckDB table, then
# read every row back as a list of plain dicts (one dict per event).
con = duckdb.connect()
con.execute(
    "CREATE TABLE session_events AS SELECT * FROM '../output/session_events.parquet';"
)

events = (
    con.execute("SELECT * FROM session_events;")
    .fetchdf()
    .to_dict('records')
)

# The sessions were stored as a JSON string: decode them under the
# "sessions" key and drop the serialized column.
for event in events:
    event['sessions'] = json.loads(event['sessions_json'])
    event.pop('sessions_json')


In [None]:
# Walk every event and collect the tech sessions that carry a title, a
# description and a "speakers" entry. Each collected session is tagged with
# its event id, its index inside the event, and a "speakers_valid" flag that
# is True only when there is at least one speaker and every speaker has a
# name, bio and company.
valid_sessions = []
events_by_id = {}

for event in events:
    event_id = event["id"]
    # Index every event by id, whether or not it yields any session.
    events_by_id[event_id] = event

    if "sessions" not in event:
        logger.debug(f"event [{event_id}] is not valid, missing \"sessions\"")
        continue

    sessions = event["sessions"]
    if "tech-sessions" not in sessions:
        logger.debug(f"sessions in event [{event_id}] is not valid, missing \"tech-sessions\"")
        continue

    logger.debug(f"Event [{event_id}] is valid")
    tech_sessions = sessions["tech-sessions"]
    if len(tech_sessions) == 0:
        logger.debug(f"tech-sessions is empty in event [{event_id}]")
        continue

    for session_index, tech_session in enumerate(tech_sessions):
        # A session missing any of these keys is skipped entirely; only the
        # first missing key (in this order) is reported.
        missing_key = next(
            (key for key in ("title", "description", "speakers") if key not in tech_session),
            None,
        )
        if missing_key is not None:
            logger.debug(f"In event [{event_id}], session [{session_index}], {missing_key} is missing")
            continue

        # Sessions with unusable speaker data are still kept, just flagged.
        speakers = tech_session["speakers"]
        if len(speakers) == 0:
            logger.debug(f"In event [{event_id}], session [{session_index}], speakers is empty")
            speakers_valid = False
        else:
            speakers_valid = True
            for speaker_index, speaker in enumerate(speakers):
                missing_field = next(
                    (field for field in ("name", "bio", "company") if field not in speaker),
                    None,
                )
                if missing_field is not None:
                    logger.debug(f"In event [{event_id}], session [{session_index}], speaker [{speaker_index}] {missing_field} is missing")
                    speakers_valid = False
                    # One incomplete speaker invalidates the whole list.
                    break
            if speakers_valid:
                logger.debug(f"In event [{event_id}], session [{session_index}], speaker [{speaker_index}], Valid Speaker info")
        tech_session["speakers_valid"] = speakers_valid

        logger.debug(f"In event [{event_id}], found valid session [{session_index}]")
        tech_session["event_id"] = event_id
        tech_session["session_index"] = session_index
        valid_sessions.append(tech_session)
     

In [None]:
# Turn every valid session into a Vectara document: the session text goes into
# a single section, event-level facts go into the metadata, and the attendee
# count is also exposed as the "going" custom dimension.
vectara_documents = []

for session in valid_sessions:
    event = events_by_id[session["event_id"]]

    description = session["description"]

    # Reset for every session. These lists are only filled when the speaker
    # data is valid, but the metadata below always reads them; without the
    # reset a session with invalid speakers would inherit the names of the
    # previously processed session (or whatever `speakers` last held).
    speakers = []
    companies = []

    if session["speakers_valid"]:
        # Semantic-friendly representation of each speaker, appended to the text.
        speaker_infos = []

        for speaker_index, speaker in enumerate(session["speakers"]):
            name = speaker["name"]
            company = speaker["company"]
            bio = speaker["bio"]

            speakers.append(name)
            companies.append(company)
            speaker_infos.append(f"Speaker {speaker_index+1}: {name}@{company} - {bio}.")

        speaker_fragment = "\n".join(speaker_infos)
        # With valid speakers the indexed text is enriched with the event
        # title, date and one line per speaker.
        description = f"Title:{event['title']}\nWhen: {event['event_date']}\n{description}\n{speaker_fragment}"

    metadata = {
        "event_date": event["event_date"],
        "event_year": event["event_year"],
        "event_month": event["event_month"],
        "event_type": event["event_type"],
        "is_online": event["is_online"],
        "url": event["url"],
        "speaker": speakers,
        "company": companies,
        "num_going": event["num_going"],
    }
    metadata_json = json.dumps(metadata)

    to_index = {
        # Event id plus session index identifies the session.
        "document_id": f"{event['id']}-{session['session_index']}",
        "title": session["title"],
        "metadata_json": metadata_json,
        "section": [
            {
                "text": description
            }
        ],
        "customDimension": [
            {"name": "going", "value": event["num_going"]}
        ],
    }
    vectara_documents.append(to_index)

In [None]:
# Log the first document (if there is one) to eyeball the payload.
for doc in vectara_documents[:1]:
    pretty_doc = json.dumps(doc, indent=4)
    logger.info(f"Tech session:\n{pretty_doc}")

In [None]:
from vectara_client.core import Factory
from vectara_client.admin import CorpusBuilder
from vectara_client.domain import Dimension
import logging

# Filter attributes of the corpus as (name, description, extra keyword args).
# The names match the metadata keys attached to each indexed document.
attribute_specs = [
    ("event_date", "When the event occurred in yyyy-mm-dd format", {"type": "text"}),
    ("event_year", "Which year the event occured", {}),
    ("event_month", "Which month the event occurred", {}),
    ("event_type", "Delivery format: (online or physical)", {}),
    ("is_online", "Whether this was an online event (boolean)", {"type": "boolean"}),
    ("url", "A trackback to meetups.com", {"indexed": False}),
    ("speaker", "List of presenters", {"type": "text_list"}),
    ("company", "List of organizations of the Speakers.", {"type": "text_list"}),
    ("num_going", "Count of people who are attending.", {"type": "integer"}),
]

builder = CorpusBuilder("AICamp Sessions")
builder = builder.description("This is where we put our events with their raw description")
for attr_name, attr_description, attr_options in attribute_specs:
    # Re-bind on every call so this works whether or not the builder
    # returns itself.
    builder = builder.add_attribute(attr_name, attr_description, **attr_options)
corpus = builder.build()

# Custom dimension named "going"; the documents supply its value from
# num_going. NOTE(review): meaning of the two 0.2 arguments is not visible
# here - confirm against the Dimension definition.
going_dim = Dimension("going", "How many attended", 0.2, 0.2)
corpus.customDimensions = [going_dim]

# NOTE(review): delete_existing=True presumably replaces an existing corpus
# of the same name - confirm before running against shared data.
corpus_id = manager.create_corpus(corpus, delete_existing=True)

In [None]:
class SubIndexer:
    """Holds a batch of documents and indexes them one by one into a corpus.

    Documents are queued with ``add_doc`` and sent to the indexer service by
    ``index_docs``, which is suitable as the target of a worker thread.
    """

    def __init__(self, indexer_service, corpus_id):
        """Remember the service and target corpus; start with an empty batch.

        Args:
            indexer_service: Object exposing ``index_doc(corpus_id, doc)``.
            corpus_id: Identifier of the corpus the documents are indexed into.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.indexer_service = indexer_service
        self.corpus_id = corpus_id
        self.docs = []

    def add_doc(self, doc):
        """Queue ``doc`` for a later ``index_docs`` call."""
        self.docs.append(doc)

    def index_docs(self):
        """Index every queued document, logging failures instead of raising.

        Each document is handled independently, so one failing document does
        not prevent the remaining ones in the batch from being indexed.
        """
        for doc in self.docs:
            try:
                self.indexer_service.index_doc(self.corpus_id, doc)
            except Exception as e:
                # Best effort for the lab: report the failure and carry on.
                # (A Logger is not callable, so the level method must be
                # used, and the message needs the f-prefix to include `e`.)
                self.logger.error(f"Error: {e}")

# One SubIndexer per worker thread, all sharing the same service and corpus.
thread_count = 10
sub_indexers = [
    SubIndexer(client.indexer_service, corpus_id) for _ in range(thread_count)
]

# Round-robin split: indexer k receives documents k, k + thread_count,
# k + 2 * thread_count, ... in their original order.
for offset, sub_indexer in enumerate(sub_indexers):
    for doc in vectara_documents[offset::thread_count]:
        sub_indexer.add_doc(doc)

In [None]:
from threading import Thread

# One thread per SubIndexer, each working through its own batch.
threads = [Thread(target=sub_indexer.index_docs) for sub_indexer in sub_indexers]
for thread in threads:
    thread.start()

# Block until every batch has been processed.
for index, thread in enumerate(threads):
    logger.info(f"Joining thread {index}")
    thread.join()