# Setup

In [None]:
!pip install -q -U langchain-text-splitters langchain-community langgraph langchain-openai langchain-chroma pymupdf
!pip install langchain_pinecone
!pip install pinecone-client


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m30.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m148.7/148.7 kB[0m [31m14.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.5/54.5 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.0/20.0 MB[0m [31m74.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.1/611.1 kB[0m [31m44.7 MB/s[0m eta [36m0:00:

In [None]:
import os

# Use API keys that are already present in the environment; otherwise prompt
# for them without echoing. This keeps secrets out of the saved notebook and
# avoids overwriting a correctly configured key with an empty string.
from getpass import getpass

for _key_name in ("PINECONE_API_KEY", "OPENAI_API_KEY"):
  if not os.environ.get(_key_name):
    os.environ[_key_name] = getpass(f"{_key_name}: ")

## Load Pinecone Index

In [None]:
from pinecone import Pinecone

# Create the Pinecone client from the API key stored in the environment.
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
# Handle to the index used by this notebook (the same index name is passed to
# PineconeVectorStore.from_documents when the chunks are uploaded).
index_pc = pc.Index("guy-index-3-large")

In [None]:
# Verify the index holds no vectors; if it does, delete them all so that the
# upload further down starts from an empty index.
# NOTE(review): delete_all=True is called without a namespace argument —
# presumably it targets the default namespace; confirm if namespaces are used.
stats = index_pc.describe_index_stats()
if stats["total_vector_count"] > 0:
  index_pc.delete(delete_all=True) # empty index

## Load LLM

In [None]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

## Load Embedding Model

In [None]:
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# Helper Functions

## MetaData Handling

### Date handling functions

In [None]:
from datetime import date, datetime, timedelta

def string_date_to_epoch_time(date_string: str) -> int:
  '''
  Translate a string date to epoch time (seconds since 1970-01-01 00:00 UTC).

  The date is interpreted as midnight UTC, so the result is identical on every
  machine and round-trips through epoch_time_to_date (which decodes in UTC).
  e.g: "2023-09-20" --> 1695168000

  Input
  ------
  date_string: str, format YYYY-MM-DD

  return
  ------
  epoch_time: int

  Raises
  ------
  ValueError: if date_string does not match YYYY-MM-DD
  '''
  date_format = "%Y-%m-%d"
  dt_object = datetime.strptime(date_string, date_format)
  # A naive datetime.timestamp() is computed in the machine's local time zone;
  # subtracting the epoch explicitly keeps the value time-zone independent.
  epoch_time = int((dt_object - datetime(1970, 1, 1)).total_seconds())
  return epoch_time

def epoch_time_to_date(epoch_time: int) -> str:
  '''
  Translate epoch time (seconds since 1970-01-01 00:00 UTC) to a string date.
  The calendar date is taken in UTC.

  Input
  ------
  epoch_time: int (a float holding a whole number of seconds is accepted too)

  return
  ------
  date_string: str, format YYYY-MM-DD
  '''
  # datetime.utcfromtimestamp is deprecated since Python 3.12; adding the
  # offset to the epoch yields the same naive UTC datetime.
  dt_object = datetime(1970, 1, 1) + timedelta(seconds=epoch_time)
  date_string = dt_object.strftime("%Y-%m-%d")
  return date_string

def get_last_week_range():
  '''
  Compute the first and last day of the previous week, relative to the
  current local date. Weeks run from Sunday to Saturday.

  return
  ------
  start_of_last_week: date (a Sunday)
  end_of_last_week: date (the following Saturday)
  '''
  current_day = datetime.now().date()
  # weekday() is Monday=0 ... Sunday=6; shift so that Sunday counts as day 0
  days_since_sunday = (current_day.weekday() + 1) % 7
  this_week_sunday = current_day - timedelta(days=days_since_sunday)
  previous_sunday = this_week_sunday - timedelta(days=7)
  previous_saturday = this_week_sunday - timedelta(days=1)
  return previous_sunday, previous_saturday

def get_most_recent_date(month, day):
  '''
  For a given month-day pair, gets the last time this date occurred
  (today included).

  Input
  ------
  month: int
  day: int

  return
  ------
  most_recent_date: date
  - the latest year-month-day that is not in the future

  Raises
  ------
  ValueError: if month/day is not a valid calendar date in any year
  '''
  today = date.today()
  # February 29 does not exist every year, so walk back year by year.
  # Consecutive leap years are at most 8 years apart (e.g. 1896 -> 1904).
  for year in range(today.year, today.year - 9, -1):
    try:
      candidate = date(year, month, day)
    except ValueError:
      continue  # date does not exist in this year
    if candidate <= today:
      return candidate
  raise ValueError(f"month={month}, day={day} is not a valid calendar date")

### MetaData extraction

In [None]:
def extract_metadata_from_df(file_name, df_metadata):
  '''
  Builds the metadata dict of one file from the metadata table.
  The first row whose 'file_name' column equals file_name is used.

  return
  -------
  - Dict: {
      'source_type': lecture / recitation (str)
      'number': number (int)
      'date': date in epoch time (int)
      'is_summary': True / False (bool)
    }
  '''
  matching_rows = df_metadata[df_metadata['file_name'] == file_name]
  first_match = matching_rows.iloc[0]
  return {
      "source_type": first_match['source_type'],
      "number": int(first_match['number']),
      "date": string_date_to_epoch_time(first_match['date']),  # stored as epoch time
      "is_summary": bool(first_match['is_summary']),
  }


def unpack_metadata_from_result(result):
  '''
  Converts, in place, two metadata fields of a retrieved result:
  - "date": epoch time --> date string (YYYY-MM-DD)
  - "number": --> int
  '''
  metadata = result.metadata
  metadata["date"] = epoch_time_to_date(metadata["date"])
  metadata["number"] = int(metadata["number"])

## Query Analyzer

In [None]:
def get_list_of_filter_criteria(filter: dict):
    '''
    For a given Filter returned from query_constructor
    Returns a list of all the top-level filter criteria.

    The returned list / dicts are the objects stored inside the filter (not
    copies), so callers may mutate them to update the filter in place.

    examples of filters and lists:
    {}
      --> []
    {'filter': {'number': {'$lt': 5}}}
      --> [{'number': {'$lt': 5}}]
    {'filter': {'$and': [{'is_summary': {'$eq': True}}, {'number': {'$in': [5, 8]}}]}}
      --> [{'is_summary': {'$eq': True}}, {'number': {'$in': [5, 8]}}]

    Anything that is not a dict holding a non-empty 'filter' dict --> []
    '''
    if not isinstance(filter, dict):
        return []
    # .get() avoids a KeyError for dicts that carry no 'filter' entry
    criteria_dict = filter.get('filter')
    if not isinstance(criteria_dict, dict) or not criteria_dict:
        return []
    for operator in ('$and', '$or'):
        if operator in criteria_dict:
            return criteria_dict[operator]
    return [criteria_dict]  # only one criteria


def get_field_criteria(filter, field: str = 'date'):
    """
    Collects the top-level criteria of the filter that refer to `field`.
    Returns a list of dicts (the same field may appear in more than one
    criterion, e.g. a date range), or [] when the field is not in the filter.

    examples:
    - {}, 'date'
     --> []
    - {'filter': {'date': {'$gt': {'date': '2023-09-20', 'type': 'date'}}}}, 'date'
     --> [{'date': {'$gt': {'date': '2023-09-20', 'type': 'date'}}}]
    - {'filter': {'$and': [{'source_type': {'$in': ['recitation']}}, {'date': {'$lt': {'date': '2023-09-20', 'type': 'date'}}}]}}, 'source_type'
     --> [{'source_type': {'$in': ['recitation']}}]

    Args
    -----
    - filter (dict)
    - field: str = 'date' (can be 'number' / 'source_type' / 'is_summary')

    """
    matching_criteria = []
    for criterion in get_list_of_filter_criteria(filter):
        if field in criterion:
            matching_criteria.append(criterion)
    return matching_criteria


def update_date_in_filter_to_epoch(filter: dict):
    """
    Check if 'date' exists in the filter (within logical operators or standalone).
    Updates the filter in place, replacing every date value with epoch time (int).

    Accepted date values: a 'YYYY-MM-DD' string, a dict holding the string under
    'date', an epoch number, or a list of any of these. A one-element list is
    unwrapped unless the comparator is '$in' / '$nin' (which need a list).

    examples of dict:
    - {}
     --> {}
    - {'filter': {'date': {'$gt': [{'date': '2023-09-20', 'type': 'date'}]}}}
     --> {'filter': {'date': {'$gt': 1695168000}}}
    - {'filter': {'$and': [{'source_type': {'$in': ['recitation']}}, {'date': {'$lt': {'date': '2023-09-20', 'type': 'date'}}}]}}
     --> {'filter': {'$and': [{'source_type': {'$in': ['recitation']}}, {'date': {'$lt': 1695168000}}]}}

    Args
    -----
    - filter (dict): the filter part of a translated query (changed in place).
    """
    def _to_epoch(raw_value):
        # Already converted (e.g. the function was applied twice)
        if isinstance(raw_value, int):
            return raw_value
        if isinstance(raw_value, float):
            return int(raw_value)
        # Structured date value: {'date': '2023-09-20', 'type': 'date'}
        if not isinstance(raw_value, str):
            raw_value = raw_value['date']
        return string_date_to_epoch_time(raw_value)

    for date_criteria in get_field_criteria(filter, field='date'):
        field_inner_dict = date_criteria['date']  # e.g. {'$lt': {'date': '2023-09-20', 'type': 'date'}}
        # Iterate over a snapshot because the dict is updated inside the loop
        for comparator, value in list(field_inner_dict.items()):
            if isinstance(value, (list, tuple)):
                converted = [_to_epoch(item) for item in value]
                if comparator not in ('$in', '$nin') and len(converted) == 1:
                    converted = converted[0]
            else:
                converted = _to_epoch(value)
            field_inner_dict[comparator] = converted


def update_field_criteria_to_filter(filter: dict, field: str='is_summary', comparator: str = '$eq', val=True):
    """
    Sets the criterion {field: {comparator: val}} in the filter, in place.
    If the field already exists (once) at the top level of the filter:
      update it to {comparator: val}
    Else:
      add the criterion so that it must hold together with the existing ones
      (a top-level '$or' filter is wrapped in a new '$and').
    If the field appears twice or more, an error is printed and nothing changes.

    examples of dict (added to 'is_summary' = True)
    - {}
     --> {'filter': {'is_summary': {'$eq': True}}}

    - {'filter': {'date': {'$gt': 1695168000}}}
     --> {'filter': {'$and': [{'is_summary': {'$eq': True}}, {'date': {'$gt': 1695168000}}]}}

    - {'filter': {'$and': [{'date': {'$gt': 1695168000}}, {'is_summary': {'$eq': False}}]}}
     --> {'filter': {'$and': [{'date': {'$gt': 1695168000}}, {'is_summary': {'$eq': True}}]}}

    - {'filter': {'$or': [{'number': {'$eq': 1}}, {'number': {'$eq': 2}}]}}  (field = 'date')
     --> {'filter': {'$and': [{'date': ...}, {'$or': [{'number': {'$eq': 1}}, {'number': {'$eq': 2}}]}]}}
    """
    field_criteria_list = get_field_criteria(filter, field)
    if len(field_criteria_list) >= 2:
        print("Error - cannot update fields with 2 criteria")
        return
    if len(field_criteria_list) == 1:  # field already exists in filter
        field_criteria_list[0][field] = {comparator: val}  # update as desired
        return

    # field does not exist in filter
    new_criterion = {field: {comparator: val}}
    criteria_list = get_list_of_filter_criteria(filter)
    if len(criteria_list) == 0:  # filter is currently empty
        filter['filter'] = new_criterion
    elif len(criteria_list) == 1:
        filter['filter'] = {'$and': [new_criterion, criteria_list[0]]}
    elif '$or' in filter['filter'] and '$and' not in filter['filter']:
        # Appending to an '$or' list would widen the filter instead of
        # narrowing it, so AND the new criterion with the whole '$or'.
        filter['filter'] = {'$and': [new_criterion, filter['filter']]}
    else:
        criteria_list.append(new_criterion)  # extend the existing '$and' list


def get_lecture_numbers_as_list(filter: dict, min_number: int = 1, max_number: int = 12):
  '''
  Expands the 'number' criteria of the filter into an explicit list of
  class numbers. Always returns a list.

  examples (with the default range 1..12):
    - no 'number' criteria
        return []
    - {'number': {'$lt': 5}}
        return [1,2,3,4]
    - {'number': {'$in': [5]}}
        return [5]
    - {'number': {'$in': [5,6,7,8]}}
        return [5,6,7,8]
    - {'number': {'$eq': 5}}
        return [5]
    - {'number': {'$gte': 3}} and {'number': {'$lte': 5}} (inside '$and')
        return [3,4,5]

  Several 'number' criteria are intersected, unless they sit directly under
  a top-level '$or', in which case they are united.
  Only the first comparator of each criterion is read.
  Values given explicitly ('$in' / '$eq') are returned as given, even when
  outside the min_number..max_number range.

  Input
  ------
  filter: dict
  min_number: int - number of the first class (default 1)
  max_number: int - number of the last class (default 12)
  '''
  number_criteria_list = get_field_criteria(filter, 'number')
  if len(number_criteria_list) == 0:  # if this field does not exist
    return []

  def _numbers_for(comparator, value):
    # Expand a single {comparator: value} pair into a list of numbers
    if comparator == '$in':  # {'$in': [5,6,7,8]}
      return list(value)
    if comparator == '$eq':  # {'$eq': 5}
      return [value]
    if comparator == '$nin':
      return [num for num in range(min_number, max_number + 1) if num not in value]
    if comparator == '$ne':
      return [num for num in range(min_number, max_number + 1) if num != value]
    if comparator == '$lt':  # {'$lt': 5}
      return list(range(min_number, value))
    if comparator == '$lte':  # {'$lte': 5}
      return list(range(min_number, value + 1))
    if comparator == '$gt':
      return list(range(value + 1, max_number + 1))
    if comparator == '$gte':
      return list(range(value, max_number + 1))
    return []  # unsupported comparator

  numbers_per_criterion = []
  for number_criteria in number_criteria_list:
    comparator, value = next(iter(number_criteria['number'].items()))
    numbers_per_criterion.append(_numbers_for(comparator, value))

  # A non-empty criteria list implies the filter holds a 'filter' dict
  top_level = filter['filter']
  unite = '$or' in top_level and '$and' not in top_level

  num_list = numbers_per_criterion[0]
  for numbers in numbers_per_criterion[1:]:
    if unite:
      num_list = num_list + [num for num in numbers if num not in num_list]
    else:
      num_list = [num for num in num_list if num in numbers]
  return num_list


def choose_k_by_translated_query(translated_query: tuple) -> int:
  '''
  Chooses the desired k (number of documents to retrieve) by the translated_query:
  - translated_query[0] is the query
  - translated_query[1] is the filter

  Rules
  ------
  - more than one class number in the filter: 5 per class, at most 15
  - exactly one class number and a summary is requested: 7
  - otherwise: 5
  '''
  default_k = 5
  single_summary_k = 7
  k_per_number = 5
  max_k = 15

  filter = translated_query[1]
  # get how many numbers in filter (the helper may yield None on unsupported filters)
  number_lst = get_lecture_numbers_as_list(filter) or []
  count_numbers = len(number_lst)

  is_summary_list = get_field_criteria(filter, field='is_summary')
  is_summary_flag = False
  if len(is_summary_list) == 1:
    # .get() keeps comparators other than '$eq' from raising a KeyError
    is_summary_flag = bool(is_summary_list[0]['is_summary'].get('$eq', False))

  if count_numbers > 1:
    return min(max_k, count_numbers * k_per_number)
  if count_numbers == 1 and is_summary_flag:  # one lecture to summarize
    return single_summary_k
  return default_k


def get_adapted_query_by_translated_query(translated_query: tuple, adapted_query: str) -> str:
  '''
  Uses the filter.
  When the content query is empty (a single space), adds a prefix to
  adapted_query that names the sources the retrieved documents come from
  (requested dates and / or class numbers).
  Otherwise adapted_query is returned unchanged.

  Input
  ------
  translated_query: tuple (content query, filter)
  adapted_query: str

  return
  ------
  adapted_query: str
  '''
  if translated_query[0] != " ":
    return adapted_query

  # The helper may yield None on unsupported filters; treat that as "no numbers"
  classes_list = get_lecture_numbers_as_list(translated_query[1]) or []
  is_query_include_date = len(get_field_criteria(translated_query[1], field='date')) > 0
  is_query_include_number = len(classes_list) > 0
  if is_query_include_date and is_query_include_number:
    adapted_query = f"The materials are from the requested dates and from classes {classes_list}. " + adapted_query
  elif is_query_include_date:
    adapted_query = "The materials are from the requested dates. " + adapted_query
  elif is_query_include_number:
    adapted_query = f"The materials are from classes {classes_list}. " + adapted_query

  return adapted_query

def update_filter_by_translated_query(translated_query: tuple):
  '''
  When the content query is empty (a single space), sets "is_summary" in the
  filter:
  * True by default
  * False when the filter asks for recitations, since there are currently
    no summaries for recitations

  Changes filter inplace
  '''
  if translated_query[0] != " ":
    return
  filter_kwargs = translated_query[1]
  source_type_criteria = get_field_criteria(filter_kwargs, field='source_type')
  asks_for_recitation = "recitation" in str(source_type_criteria)
  update_field_criteria_to_filter(filter_kwargs, field='is_summary', val=not asks_for_recitation)

## Translation and Retrieval

## Printing Functions

In [None]:
import copy

def print_documents_metadata_with_str_date(docs: list):
  '''
  Prints the metadata of every Document in docs, one line per Document,
  showing the date as a date string instead of epoch time.
  The Documents themselves are not changed (a copy of the metadata is printed).
  '''
  for position, doc in enumerate(docs, start=1):
      printable_metadata = copy.deepcopy(doc.metadata)
      printable_metadata['date'] = epoch_time_to_date(printable_metadata['date'])
      print(f"Document {position} Metadata: {printable_metadata}")

def print_documents_metadata(docs: list):
  '''
  Prints the metadata of every Document in docs, one line per Document.
  The metadata is printed as stored (no date conversion).

  The Document.metadata is a dict:
  {'source':file_path (str),
    'source_type': source_type (str),
    'number': number (int),
    'date': date,
    'is_summary': is_summary (bool)
   }
  '''
  for position, doc in enumerate(docs, start=1):
      print("Document {} Metadata: {}".format(position, doc.metadata))


def print_k_chunks(all_splits: list, k: int = 5):
  '''
  Prints a preview of the first k chunks in all_splits:
  the metadata and the first 100 characters of the content of each chunk.
  '''
  first_chunks = all_splits[:k]
  for position, chunk in enumerate(first_chunks, start=1):
      content_preview = chunk.page_content[:100]
      print(f"Chunk {position}:")
      print(f"Document Metadata: {chunk.metadata}")
      print(f"Page Content: {content_preview} [...]\n")


def print_query_results(results, print_embedding_flag=True):
  '''
  Prints every "match" of the results returned by index.query():
  vector id, score, (optionally) the embedding values, and the metadata.
  results["matches"] is a list of dicts.
  '''
  matches = results["matches"]
  for match in matches:
      print("Vector ID:", match['id'])
      print("score:", match['score'])
      if print_embedding_flag:
        print("Embedding:", match['values'])
      print("Metadata:", match['metadata'])


def print_retrieved_documents(results):
  '''
  Prints the id, metadata and the first 100 characters of the content of
  every Document returned by "similarity_search".

  * if the similarity search was "with_score", results is a list of
    tuples(Document, score) and the score is printed as well.
    The kind of list is decided by its first item.
  '''
  def _print_document(doc):
    print(f"Doc id:\t{doc.id}")
    print(f"Doc Metadata: {doc.metadata}")
    print(f"Doc Content:\n {doc.page_content[:100]}\n")

  # tuples (Document, score) or just Documents?
  with_scores = bool(results) and isinstance(results[0], tuple)

  for item in results:
    if with_scores:
      doc, score = item
      print("\n")
      print(f"Similarity Score:\t{score}")  # Inspect similarity score
    else:
      doc = item
      print("\n")
    _print_document(doc)


def print_parsed_query(user_query: str, query_constructor):
  '''
  Runs the query_constructor on the user query, prints the parsed query
  and returns it.
  '''
  constructor_output = query_constructor.invoke(user_query)
  print(constructor_output)
  return constructor_output


def print_translated_query(translated_query):
  '''
  Prints both parts of the translated query (after Constructor + Translator):
  the content query (index 0) and the filter (index 1).
  '''
  print("Content Query: {}".format(translated_query[0]))
  print("Filter: {}".format(translated_query[1]))


## String Formatting

In [None]:
def get_filter_string(filter: dict, print_flag=False):
  '''
  Formats the filter as a readable string: one line per top-level criterion,
  or "No filter" when the filter holds no criteria.
  Prints the string as well when print_flag is True.

  filter examples:
  {},
  {'filter': {'number': {'$lt': 5}}},
  {'filter': {'number': {'$in': [5]}}},
  {'filter': {'date': {'$eq': 1695168000}}},
  {'filter': {'source_type': {'$in': ['lecture']}}},
  {'filter': {'$and': [{'source_type': {'$in': ['recitation']}}, {'number': {'$in': [5, 6]}}]}},
  {'filter': {'$and': [{'source_type': {'$in': ['recitation']}}, {'date': {'$lt': 1695168000}}]}},
  {'filter': {'$and': [{'is_summary': {'$eq': True}}, {'number': {'$in': [5, 8]}}]}}
  '''
  criteria_list = get_list_of_filter_criteria(filter)
  if criteria_list:
    criteria_lines = [f"{get_criteria_string(criteria)}\n" for criteria in criteria_list]
    filter_string = "".join(criteria_lines)
  else:
    filter_string = "No filter"

  if print_flag:
    print(filter_string)
  return filter_string


def get_criteria_string(criteria: dict, print_flag=False):
  '''
  Formats one filter criterion as a readable string:
  " * <field>\t<comparison> <value>"
  One line is produced per (field, comparator) pair, so a criterion holding
  several comparators yields several lines. An empty criterion yields "".
  List values are joined with " / ".
  Values of the 'date' field are expected in epoch time and are shown as
  date strings.
  Prints the string as well when print_flag is True.

  criteria examples:
  {'number': {'$lt': 5}},
  {'number': {'$in': [6, 8]}},
  {'date': {'$eq': 1695168000}},
  {'source_type': {'$in': ['recitation']}},
  '''
  comparator_labels = {
      '$in': '=',
      '$eq': '=',
      '$nin': '!=',
      '$ne': '!=',
      '$lt': 'before (not including)',
      '$gt': 'after (not including)',
      '$lte': 'before (including)',
      '$gte': 'after (including)',
  }
  criteria_lines = []
  for field, value_definition in criteria.items():
    for comparator, value in value_definition.items():
      compare_str = comparator_labels.get(comparator, '[un recognised comparator]')
      values = value if isinstance(value, list) else [value]
      if field == 'date':
        # dates are stored as epoch time; show them as YYYY-MM-DD
        values = [epoch_time_to_date(item) for item in values]
      value_string = ' / '.join(map(str, values))
      criteria_lines.append(f" * {field}\t{compare_str} {value_string}")

  criteria_string = "\n".join(criteria_lines)
  if print_flag:
    print(criteria_string)
  return criteria_string


def get_context_string_from_retrieved_docs(docs_list: list):
  '''
  Formats the retrieved docs as one context string. After the heading
  "Excerpts of:", every doc contributes a header line followed by its content:
  lecture 6, 2024-02-05:
  {content}
  summary of lecture 6, 2024-02-05:
  {content}
  recitation 3, 2024-01-04:
  {content}
  '''
  parts = ['Excerpts of:\n\n']
  for doc in docs_list:
    metadata = doc.metadata
    source_type = metadata['source_type']
    number = int(metadata['number'])
    date_string = epoch_time_to_date(metadata['date'])
    summary_prefix = "summary of " if metadata['is_summary'] else ""
    parts.append(f"{summary_prefix}{source_type} {number}, {date_string}:\n{doc.page_content}\n\n")
  return "".join(parts)

## Data structure description


**vector_store.similarity_search_with_score**

returns a *list* of *tuples(Documents, score)*
```
[
  (Document(
      id='08a76c37-242f-440d-a18c-ab2dfc2d3cf3',
      metadata={
          'date': datetime.date(2024, 2, 5),
          'number': 6,
          'source': '/content/materials/texts/lecture_6a_en.txt',
          'source_type': 'recitation'},
      page_content="others.\nThere are a lot of thing......"),
   0.385573089),
  (Document(...),
   0.385573089)
  ...
]

```



# Load Files to Vector Store (Pinecone)

In [None]:
from langchain import hub
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict
from langchain_pinecone import PineconeVectorStore

## Load Files and MetaData

**upload files**

1. *hardware_materials_with_summaries.zip* - Archive containing all the course materials (lecture texts, recitations and summaries)
2. *metadata_table.csv* - Metadata table of all files

In [None]:
!unzip /content/hardware_materials_with_summaries.zip

Archive:  /content/hardware_materials_with_summaries.zip
   creating: __MACOSX/
   creating: __MACOSX/materials/
  inflating: __MACOSX/materials/._.DS_Store  
   creating: __MACOSX/materials/subtitles/
  inflating: __MACOSX/materials/subtitles/._.DS_Store  
   creating: __MACOSX/materials/texts/
  inflating: __MACOSX/materials/texts/._.DS_Store  
  inflating: __MACOSX/materials/texts/._lecture_10_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_11a_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_11b_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_12a_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_12b_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_1a_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_1b_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_2a_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_2b_en.txt  
  inflating: __MACOSX/materials/texts/._lecture_3a_en.txt  
  inflating: __MACOSX/materials/texts/._lecture

In [None]:
import pandas as pd

# path to the CSV file containing the metadata
file_path = '/content/metadata_table.csv'

# Load the CSV into a pandas DataFrame
df_metadata = pd.read_csv(file_path)

In [None]:
# Load all the *.txt* files (with the metadata) as a list of Documents
PATH_TO_TEXTS = '/content/materials/texts'
PATH_TO_RECITATIONS = '/content/materials/recitations'
PATH_TO_SUMMARIES  = '/content/materials/summaries'


text_files = [f for f in os.listdir(PATH_TO_TEXTS) if f.endswith('.txt')]
recitation_files = [f for f in os.listdir(PATH_TO_RECITATIONS) if f.endswith('.txt')]
summary_files = [f for f in os.listdir(PATH_TO_SUMMARIES) if f.endswith('.txt')]

all_files_read = text_files + recitation_files + summary_files

## verify that all read files are in the meta_data table
files_on_disk = set(all_files_read)
files_in_table = set(df_metadata['file_name'])
if files_on_disk == files_in_table:
    print("OK - Loaded files are the same as the csv metadata table (duplicates ignored).")
else:
    print("Warning - Loaded files are not the same as the csv metadata table")
    # Name the differences: a file without a metadata row makes the metadata
    # lookup fail later on, so it should be fixed before loading the files.
    print(f" * files without a metadata row: {sorted(files_on_disk - files_in_table, key=str)}")
    print(f" * metadata rows without a file: {sorted(files_in_table - files_on_disk, key=str)}")

OK - Loaded files are the same as the csv metadata table (duplicates ignored).


In [None]:
docs = []
loaders = []

## read files and meta data
def read_files_and_metadata(path: str, files_list: list):
  '''
  Loads every file of files_list (located in the directory `path`) with a
  TextLoader, adds the file's metadata from the metadata table to each loaded
  Document, and appends the Documents to the global `docs` list.

  Input
  ------
  path: str - directory holding the files
  files_list: list - file names inside that directory
  '''
  for f in files_list:
      file_docs = TextLoader(f"{path}/{f}").load()
      if file_docs:
        # The metadata depends on the file only, so look it up once per file
        metadata = extract_metadata_from_df(f, df_metadata) # helper function
        for doc in file_docs:
          doc.metadata.update(metadata)
      docs.extend(file_docs)

read_files_and_metadata(PATH_TO_TEXTS, text_files)
read_files_and_metadata(PATH_TO_RECITATIONS, recitation_files)
read_files_and_metadata(PATH_TO_SUMMARIES, summary_files)

## Add documents to the PINECONE vector_store

In [None]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

In [None]:
# Index chunks into vector DB: the chunks in all_splits are passed together
# with the embedding model and the name of the target Pinecone index.
# NOTE(review): re-running this cell presumably uploads the chunks again
# (duplicate vectors) unless the index is emptied first — confirm before re-running.
vector_store = PineconeVectorStore.from_documents(
    all_splits,
    embeddings,
    index_name="guy-index-3-large")

In [None]:
### check the vector_store:
stats = index_pc.describe_index_stats()
print(stats) # total with recitation and summaries 951

{'dimension': 3072,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 951}},
 'total_vector_count': 951}


# Build Query Translation and Retrieval


In [None]:
!pip install lark

Collecting lark
  Downloading lark-1.2.2-py3-none-any.whl.metadata (1.8 kB)
Downloading lark-1.2.2-py3-none-any.whl (111 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/111.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━[0m [32m71.7/111.0 kB[0m [31m2.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m111.0/111.0 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lark
Successfully installed lark-1.2.2


In [None]:
from langchain_core.structured_query import StructuredQuery, Operation, Comparator, Comparison
from langchain.chains.query_constructor.base import AttributeInfo, get_query_constructor_prompt, StructuredQueryOutputParser
from langchain.retrievers.self_query.pinecone import PineconeTranslator
from lark import Lark

## Define Query Constructor and Translator


**Define parameters to build specialized query constructor:**

1) document_content_description

2) allowed_comparators

3) examples of parsing user queries (few shot learning)

4) metadata fields definition

In [None]:
curr_date = date.today()

document_content_description = "University course materials. The materials are transcripts and notes that contain all the material taught by the staff."

# Define allowed comparators list.
# The query-constructor prompt lists these names as the comparison functions
# the LLM may use (e.g. eq("number", 3)), so they must be LangChain
# `Comparator` members and not Pinecone operators such as "$eq": the
# translator converts them to the Pinecone form later. Logical operators
# (and / or) are configured separately through `allowed_operators`.
allowed_comparators = [
    Comparator.EQ,  # Equal to (number, string, boolean)
    Comparator.NE,  # Not equal to (number, string, boolean)
    Comparator.GT,  # Greater than (number)
    Comparator.GTE,  # Greater than or equal to (number)
    Comparator.LT,  # Less than (number)
    Comparator.LTE,  # Less than or equal to (number)
    Comparator.IN,  # In array (string or number)
    Comparator.NIN,  # Not in array (string or number)
]

# Desired MetaData fields to detect
metadata_field_info = [
    AttributeInfo(
        name="source_type",
        description="Type of source: lecture (lesson given by the professor) / recitation (by the Teaching Assistant)", # / lecture_notes / presentation",
        type="string",
    ),
    AttributeInfo(
        name="number",
        description="Number of lesson source (1 being first). Meaning the order of the classes as they were taught",
        type="int",
    ),
    AttributeInfo(
        name="date",
        description="Date of original class, saved as string YYYY-MM-DD",
        type="string",
    ),
    AttributeInfo(
        name="is_summary",
        description="Summary of the lesson, containing the topics talked about. Whenever a user asks about topics, what we talked or learned about, what was in or about, summary of a lecture or a lesson, is_summary should be True in the filter to ensure summaries are retrieved.",
        type="bool",
    )
]

# Examples for few-shot learning
# Reference dates for the date-based examples. "Last week" is computed once so
# that both ends of the range come from the same call.
last_week_start, last_week_end = get_last_week_range()

# Examples for few-shot learning: (user question, expected structured query).
# An empty "query" means the question is answered from the filter alone.
examples = [
    (
        "Can you give me some examples of using flip-flops in adders that we talked about in lecture 3?",
        {
            "query": "examples of using flip-flops in adders",
            "filter": 'and(in("source_type", ["lecture"]),in("number", [3]))',
        },
    ),
    (
        "What kinds of architecture did we learn about in the recitations?",
        {
            "query": "kinds of architecture",
            "filter": 'in("source_type", ["recitation"])',
        },
    ),
    (
        "Can you give me some examples of using flip-flops in adders that we talked about in lecture 3 or 5?",
        {
            "query": "examples of using flip-flops in adders",
            "filter": 'and(in("source_type", ["lecture"]),in("number", [3, 5]))',
        },
    ),
    (
        "What did we learn about Half-Adders until week 5?",
        {
            "query": "Half-Adders",
            "filter": 'lte("number", 5)',
        },
    ),
    (
        "What did we learn about Half-Adders before september 20, 2023?",
        {
            "query": "Half-Adders",
            "filter": 'lt("date", "2023-09-20")',
        },
    ),
    (
        "What did we learn about Half-Adders after september 20, 2023?",
        {
            "query": "Half-Adders",
            "filter": 'gt("date", "2023-09-20")',
        },
    ),
    (
        # A date without a year refers to the last time that date occurred
        "What did we learn about Half-Adders in 20.10?",
        {
            "query": "Half-Adders",
            "filter": f'eq("date", "{get_most_recent_date(10, 20)}")',
        },
    ),
    (
        "Summarize lecture 8",
        {
            "query": "",
            "filter": 'and(in("source_type", ["lecture"]), eq("is_summary", True) ,in("number", [8]))',
        },
    ),
    (
        "What did we talk about in lecture 8",
        {
            "query": "",
            "filter": 'and(in("source_type", ["lecture"]), eq("is_summary", True) ,in("number", [8]))',
        },
    ),
    (
        # "/" instead of "\t": a backslash followed by "t" is a TAB escape
        "What are the main topics discussed/talked in lecture eight",
        {
            "query": "",
            "filter": 'and(in("source_type", ["lecture"]), eq("is_summary", True) ,in("number", [8]))',
        },
    ),
    (
        "What did we prove in lecture eight",
        {
            "query": "proofs",
            "filter": 'and(in("source_type", ["lecture"]), eq("is_summary", True) ,in("number", [8]))',
        },
    ),
    (
        "What did we learned in the lecture yesterday",
        {
            "query": "",
            "filter": f'and(in("source_type", ["lecture"]), eq("is_summary", True) ,eq("date", "{curr_date-timedelta(days=1)}"))',
        },
    ),
    (
        "What did we learned in the lecture last week",
        {
            "query": "",
            "filter": f'and(in("source_type", ["lecture"]), eq("is_summary", True) ,gte("date", "{last_week_start}"), lte("date", "{last_week_end}"))',
        },
    )
]

In [None]:
# Create constructor prompt using all the defined parameters:
# the content description, the metadata fields, the allowed comparators
# and the few-shot examples.
constructor_prompt = get_query_constructor_prompt(
    document_content_description,
    metadata_field_info,
    allowed_comparators=allowed_comparators,
    examples=examples,
)

In [None]:
# Create query constructor: a chain that feeds the constructor prompt to the
# LLM and passes the LLM output through the structured-query output parser.
output_parser = StructuredQueryOutputParser.from_components()
query_constructor = constructor_prompt | llm | output_parser # llm previously defined

In [None]:
# Create an instance of PineconeTranslator
pc_translator = PineconeTranslator()

## Translation and Retrieval Functions

In [None]:
def translate_user_query(user_query, query_constructor, pc_translator, print_flag = False):
  '''
  "Translates" the user query into a tuple (content query, filter):
  1. the query_constructor parses the user query
  2. the pc_translator converts the parsed query to the Pinecone-compatible
     format
  3. dates inside the filter are changed from (str) to epoch time (int)

  The translated query is printed as well when print_flag is True.
  '''
  structured_query = query_constructor.invoke(user_query)
  translated_query = pc_translator.visit_structured_query(structured_query)

  # the filter is updated in place
  update_date_in_filter_to_epoch(translated_query[1])

  if print_flag:
    print_translated_query(translated_query)

  return translated_query


def retrieve_docs_by_translated_query(translated_query, k = 5, print_flag = False):
  '''
  Runs a similarity search on the vector_store for the translated_query
  (content query, filter) and returns the retrieved Documents.
  The search is filtered by the 'filter' entry of the filter part when there
  is one, o.w. no filter is used (None).
  k is the number of documents to retrieve (default 5).
  The retrieved Documents are printed as well when print_flag is True.
  '''
  content_query = translated_query[0]
  filter_kwargs = translated_query[1]
  if filter_kwargs:
    filter_criteria = filter_kwargs.get("filter", None)
  else:
    filter_criteria = None

  retrieved_docs = vector_store.similarity_search(
      query=content_query,
      filter=filter_criteria,
      k=k,
  )

  if print_flag:
    print_retrieved_documents(retrieved_docs)

  return retrieved_docs


# Build RAG Pipeline

## Inner RAG prompt

In [None]:
# Build the RAG prompt locally. The template was previously pulled from the
# LangChain hub ("rlm/rag-prompt") and then overwritten, which needed network
# access only to discard the downloaded text.
from langchain_core.prompts import ChatPromptTemplate

RAG_PROMPT_TEMPLATE = "You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, or the answer does not appear in the context, just say that you don't know or the answer is not in the context. Keep the answer concise.\nQuestion: {question} \nContext: {context} \nAnswer:"
prompt = ChatPromptTemplate.from_template(RAG_PROMPT_TEMPLATE)
print(prompt.messages[0].prompt.template)

You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, or the answer does not appear in the context, just say that you don't know or the answer is not in the context. Keep the answer concise.
Question: {question} 
Context: {context} 
Answer:




## Graph Pipeline

In [None]:
# Define state for application
class State(TypedDict):
    """State dictionary shared by the nodes of the RAG graph (retrieve -> generate)."""
    # False when retrieve() found no documents; generate() then returns without calling the LLM
    valid_flag: bool
    # The user's original question (the graph input)
    question: str
    # The question as adapted in retrieve(); this is what generate() sends to the prompt
    adapted_question: str
    # Documents returned by retrieve()
    context: List[Document]
    # Final answer: the LLM response, or the "could not be found" message set by retrieve()
    answer: str


# Define application steps
def retrieve(state: State):
  '''
  Graph node: translates the user's question, adapts the query / filter / k,
  and retrieves the matching documents.

  Returns a partial state update with the keys "valid_flag", "adapted_question",
  "context" and "answer". When no document is retrieved, "valid_flag" is False and
  "answer" holds a "could not be found" message (with the filter string), so the
  generation step is skipped; otherwise "answer" is an empty string.
  '''
  user_question = state["question"]

  # 1: translate the user's question into (content query, search kwargs)
  translated_query = translate_user_query(user_question, query_constructor, pc_translator)

  ### 2: Query Analyzer - use the translated query to adapt the retrieval parameters
  adapted_query = get_adapted_query_by_translated_query(translated_query, user_question)  # query
  update_filter_by_translated_query(translated_query)                                     # filter
  k = choose_k_by_translated_query(translated_query)                                      # adaptive k

  print_translated_query(translated_query) # print?

  ### 3: retrieve with the updated parameters
  retrieved_docs = retrieve_docs_by_translated_query(translated_query, k)

  # If no docs met the criteria, report it and flag the state so generation is skipped
  found_documents = len(retrieved_docs) != 0
  if found_documents:
    answer_temp = ""
  else:
    answer_temp = f'Sorry, your requested materials could not be found\n{get_filter_string(translated_query[1])}'

  # Partial state update consumed by the next node
  return {
    "valid_flag": found_documents,
    "adapted_question": adapted_query,
    "context": retrieved_docs,
    "answer": answer_temp
  }


def generate(state: State):
    """
    Graph node: answers the adapted question from the retrieved context.

    When state["valid_flag"] is falsy the node returns None (no state update), which
    leaves the answer written by retrieve() in place. Otherwise the retrieved
    documents are formatted into one context string (together with their metadata),
    the RAG prompt is filled in, and the LLM response text is returned under "answer".
    """
    if state["valid_flag"]:
        chat_model = ChatOpenAI(model="gpt-4o-mini")
        context_text = get_context_string_from_retrieved_docs(state["context"])
        prompt_input = {"question": state["adapted_question"], "context": context_text}
        llm_reply = chat_model.invoke(prompt.invoke(prompt_input))
        return {"answer": llm_reply.content}

    # invalid retrieval: skip generation
    return None


# Build and compile the application graph:
#   - register retrieve and generate as a sequence, in that order
#   - make "retrieve" the entry node by connecting it to START
#   - compile into the runnable `graph` that is invoked with {"question": ...}
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

## Response formatting utils

In [None]:
import re
import json

def get_srt_file(srt_file_path):
    """
    Read a subtitle file and return its lines (line endings kept).

    Returns an empty list when the file does not exist.
    """
    try:
        handle = open(srt_file_path, "r", encoding="utf-8")
    except FileNotFoundError:
        return []
    with handle:
        return handle.readlines()

def srt_to_url(srt_file):
    """
    Load the "srt_to_link.json" file that sits in the same directory as srt_file.

    Returns the parsed JSON content, or an empty dict when that file does not exist.
    """
    mapping_path = os.path.join(os.path.dirname(srt_file), "srt_to_link.json")
    try:
        with open(mapping_path, "r", encoding="utf-8") as handle:
            mapping = json.load(handle)
    except FileNotFoundError:
        mapping = {}
    return mapping

def extract_first_sentence(input_string):
    """
    Return the first line of input_string after dropping any leading digits
    and the newlines that follow them.
    """
    without_prefix = re.sub(r"^\d*\n*", "", input_string)
    first_line, _, _ = without_prefix.partition("\n")
    return first_line

def clean_sentence(sentence):
    """
    Normalize a sentence for comparison: lowercase it, remove every character
    that is neither a word character nor whitespace, and trim the ends.
    """
    lowered = sentence.lower()
    without_punctuation = re.sub(r'[^\w\s]', '', lowered)
    return without_punctuation.strip()

def get_timestamp_from_srt(sentence, srt, threshold=0.9):
    """
    Find the subtitle line that matches `sentence` and return the timestamp that precedes it.

    A line matches when, after cleaning both sides with clean_sentence(), the sentence
    is contained in the line or their token similarity is at least `threshold`.

    Args:
        sentence: text to look for.
        srt: list of subtitle-file lines (as returned by get_srt_file).
        threshold: minimal token similarity for a fuzzy match.

    Returns:
        The part before the first comma of the line that precedes the matched line
        (presumably the SRT timing line "HH:MM:SS,mmm --> ..." - confirm against the
        subtitle files), or the string "NaN" when nothing matches.
    """
    cleaned_sentence = clean_sentence(sentence)

    # An empty sentence is a substring of every line, so it would "match" the first
    # non-empty line of the file; treat it as not found instead.
    if not cleaned_sentence:
        return "NaN"

    # Start at index 1: line 0 has no preceding line, and srt[0 - 1] would silently
    # wrap around to the LAST line of the file.
    for i in range(1, len(srt)):
        cleaned_line = clean_sentence(srt[i])

        if len(cleaned_line) > 0 and (
                cleaned_sentence in cleaned_line or token_similarity(cleaned_sentence, cleaned_line) >= threshold):
            return srt[i - 1].strip().split(",")[0]
    return "NaN"



def remove_leading_zeros_timestamp(timestamp):
    """
    Drop one leading "00:" field from a timestamp (e.g. "00:20:23" -> "20:23").

    Timestamps that do not start with "00:" are returned unchanged.
    """
    zero_field = "00:"
    return timestamp[len(zero_field):] if timestamp.startswith(zero_field) else timestamp


def convert_timestamp_to_elapsed_seconds(timestamp: str) -> int:
    """
    Convert an "M:S" or "HH:MM:SS" timestamp into a number of seconds.

    Raises:
        ValueError: when a field is not an integer, or when the timestamp does not
            have exactly two or three ":"-separated fields.
    """
    fields = [int(field) for field in timestamp.split(":")]
    if len(fields) not in (2, 3):
        raise ValueError("Invalid timestamp format")

    # Base-60 accumulation: (h * 60 + m) * 60 + s
    total_seconds = 0
    for value in fields:
        total_seconds = total_seconds * 60 + value
    return total_seconds

def token_similarity(sentence1, sentence2):
    """
    Jaccard similarity between the whitespace-separated token sets of two sentences.

    Returns:
        |tokens1 & tokens2| / |tokens1 | tokens2| as a float in [0, 1].
        When neither sentence contains a token the union is empty; 0.0 is returned
        in that case (instead of raising ZeroDivisionError) so that two empty
        strings never count as a match.
    """
    tokens1 = set(sentence1.split())
    tokens2 = set(sentence2.split())
    all_tokens = tokens1 | tokens2
    if not all_tokens:
        return 0.0
    return len(tokens1 & tokens2) / len(all_tokens)

def generate_link(url, timestamp):
    """
    Build a link to `url` that starts at `timestamp`.

    Args:
        url: base URL; "&t=<seconds>" is appended to it as-is.
        timestamp: "M:S" / "HH:MM:SS" string, or None to link to the start (t=0).

    Returns:
        The URL with the elapsed seconds appended.

    Raises:
        ValueError: propagated from convert_timestamp_to_elapsed_seconds for a
            timestamp string that cannot be parsed.
    """
    if timestamp is None:
        # Use 0 seconds directly: the converter expects a string and would fail on an int
        elapsed_seconds = 0
    else:
        elapsed_seconds = convert_timestamp_to_elapsed_seconds(timestamp)
    return f"{url}&t={elapsed_seconds}"

def generate_link_text(lecture_name, first_timestamp):
    """
    Build the display text of a lecture link: the lecture name followed by the
    timestamp with its leading "00:" removed, e.g. "Lecture 6b 20:23".
    """
    short_timestamp = remove_leading_zeros_timestamp(first_timestamp)
    return f"{lecture_name} {short_timestamp}"

def decompose_ref(ref):
  '''
  Builds the human-readable reference line for one retrieved document.

  The result depends on the document's metadata 'source':
    - source contains 'summary' or 'Recitation' -> the file name
    - source ends with '.pdf'                   -> "<file name> - Page : <page>"
    - otherwise (lecture transcript)            -> "<lecture name> <timestamp> Link: <url>&t=<seconds>"

  For transcripts the subtitle file is derived from the source path
  ('txt' -> 'srt', 'texts' -> 'subtitles'), the timestamp of the document's first
  sentence is looked up in it, and the lecture name / url come from the
  srt_to_link.json file next to the subtitle file. If the lecture is not listed there,
  the file name is returned; if no usable timestamp is found, the link is returned
  without a timestamp.
  '''
  source = ref.metadata.get('source')
  file_name = source.split('/')[-1]

  # Each membership test must be spelled out: `'summary' or 'Recitation' in source`
  # is always truthy and made the two branches below unreachable.
  if 'summary' in source or 'Recitation' in source:
    return f"{file_name}"
  if source.lower().endswith('.pdf'):
    return f"{file_name} - Page : {ref.metadata['page']}"

  # Lecture transcript: locate the matching subtitle file and its link data
  srt_file_name = source.replace('txt', 'srt').replace('texts', 'subtitles')
  lecture_data = srt_to_url(srt_file_name).get(os.path.basename(srt_file_name))
  if lecture_data is None:
    # No link information for this transcript: fall back to the plain file name
    return f"{file_name}"

  srt = get_srt_file(srt_file_name)
  first_sentence = extract_first_sentence(ref.page_content)
  timestamp = get_timestamp_from_srt(first_sentence, srt).split(',')[0]

  try:
    link = generate_link(lecture_data['url'], timestamp)
  except ValueError:
    # The "NaN" not-found sentinel (or any line that is not a timestamp) cannot be
    # converted to seconds: reference the lecture without a time offset.
    return f"{lecture_data['name']} Link: {lecture_data['url']}"

  link_text = generate_link_text(lecture_data['name'], timestamp)
  return f"{link_text} Link: {link}"


def response_generator(valid_flag, query, adapted_query, source_documents, answer, to_print=True):
  '''
  Formats the result of one graph run and, when to_print is True, prints it:
  the user query, the adapted query, the answer and one reference line per
  source document.

  valid_flag is accepted for interface compatibility and is not used here.
  Returns None.
  '''
  # The references are built even when nothing is printed, so decompose_ref
  # still runs on every source document.
  references = '\n'.join(decompose_ref(document) for document in source_documents)
  if not to_print:
    return

  print(f"""
  User Query:\n'{query}' \n\n
  Adapted Query:\n'{adapted_query}' \n\n
  Answer:\n'{answer}' \n\n
  References:\n{references}"""
  )

# Querying the System

## Benchmark queries

In [None]:
## Online Tutor benchmark questions
# Maps a 1-based query number to the question text (content questions about the course material).
# NOTE(review): the question texts look like verbatim student questions and are kept as-is,
# including apparent typos (e.g. query 29) - confirm before "correcting" any of them.
original_benchmark_queries = {
    1: "Can a Full Adder (FA) be used as a Half Adder (HA), and why?",
    2: "Is it always possible to replace OR with XOR as we did in the development of the FA?",
    3: "Why is there a need to replace OR with XOR?",
    4: "Is Look Ahead Carry also dependent on the delay of each component in sequence?",
    5: "Why do we need Look Ahead Carry?",
    6: "What is the meaning of I0,...,I4 in a multiplexer?",
    7: "Why do we need a MUX at all? What are its uses?",
    8: "When you implemented a logical circuit using a MUX, how did you decide on the free variable?",
    9: "What happens if we input 1-1 into an SR-Latch?",
    10: "With a type D-Latch, it seems like the component does nothing. Why?",
    11: "What is the advantage of a JK-Latch over a T-Latch?",
    12: "Why did you change the inverters to another type of gates (NORs or NANDs)?",
    13: "Is there a difference between SR-Latch based on NORs vs NANDs?",
    14: "Which kind of SR-Latch will we use in our class?",
    15: "How does the gated SR-Latch work? / What is a gated SR-Latch?",
    16: "What is the main problem with the SR-Latch design?",
    17: "What is a D-Latch? / How is the D-Latch built?",
    18: "How do we find the Excitation Table?",
    19: "When to use the Characteristic Table and when to use the Excitation Table?",
    20: "What is a JK-Latch?",
    21: "What are the differences between (R-1)'s complement and R's complement?",
    22: "Can you explain to me deeply about gray code and what is the purpose of it?",
    23: "Can you explain why NOR is functionally complete?",
    24: "What should I do when I have a sum and I have a 'carry' in the leftmost digits, that is, I have no one to transfer the 'carry' to? For example: 1+1 in the leftmost digits sum.",
    25: "Hi, is NAND a complete operating system?",
    26: "How to convert fractions to a decimal base?",
    27: "How to convert hexadecimal base to decimal?",
    28: "What is Boolean algebra?",
    29: "What is the meaning of 'the aritmetica' of SM doesn't work?",
    30: "What is minterm?",
    31: "Can you show me the proof that f(x,y,z)=x'y'z' + xz + yz is functionally complete?",
    32: "How to subtract in base 8?",
    33: "How to perform subtraction in the 1's complement method?",
    34: "I need to add and subtract pairs of numbers in base 8, without converting to decimal. How can I do it?"
}

## Our metadata-related benchmark questions
# Maps a 1-based query number to a question that refers to metadata such as a
# lecture / recitation number, a date, or a time within a lecture.
# NOTE(review): the question texts are kept verbatim, including apparent typos
# (e.g. "finish state machine" in query 13) - confirm whether they are intentional.
project_queries_dict = {
    1: "What did we prove in lecture 5?",
    2: "Please summarize the 5th class",
    3: "Make a detailed summary of class 5",
    4: "Make a detailed summary of classes 6 to 8",
    5: "What is the topic of lecture 8?",
    6: "What is the topic of class 9?",
    7: "What is the topic of lesson 9?",
    8: "What is the topic of lecture 9?",
    9: "What was class 10 about?",
    10: "What was lesson 10 about?",
    11: "What was lecture 10 about?",
    12: "What did we learn in recitation 4?",
    13: "In what class we learned about finish state machine?",
    14: "In what class did we talk about T flip-flop?",
    15: "What did we learn about ROM memory in lecture 7?",
    16: "What did we learn about ROM memory in lecture 11?",
    17: "What did we learn about Object-oriented programming in lecture 11?",
    18: "Summarize the last 15 minutes of lecture 1",
    19: "In lecture 6, part a, at time 13:33, the teacher said you can exchange the Or gate in a Xor gate. Explain why.",
    20: "Explain what the teacher said in lecture 6, part a, at time 13:33.",
    21: "What we learned in lecture in May 2, 2020?",
    22: "What we learned in lecture in February 19, 2024?",
    23: "What we learned in lecture in 18.3?",
    24: "What did we learn in the lecture last week?",
    25: "What did we learn in the recitation last week?",
    26: "In lecture 11 we learned about MOS transistor, how does it combine with what we learned throughout the course?",
    27: "In lesson 2 we saw the problem of two different representations of zero. In lesson 6, we talked about implementing adders, and did not address the dual representation problem. Explain.",
    28: "Explain what we did learn about timing, include only things we learned up until lecture 8 (included).",
    29: "Explain what we did learn about flip-flops, include only things we learned up until lecture 7 (included).",
    30: "Explain what we did learn about flip-flops, include only things we learned up until lecture 8 (included).",
    31: "In lecture 4 and in recitation 4 we learned about undefined states in the Karnaugh Map does not matter, explain."
}

## Ask a question

In [None]:
# define which query to answer
q_num = 29
query = project_queries_dict[q_num]
# query = original_benchmark_queries[q_num]

# retrieve() prints the translated query while the graph runs
print("printing translated query:")
response_new = graph.invoke({"question": query})

print("\nprinting query and answer:")
# Pass the state fields by name: unpacking response_new.values() positionally would bind
# the arguments according to the key order of the returned dict.
response_generator(
    valid_flag=response_new["valid_flag"],
    query=response_new["question"],
    adapted_query=response_new["adapted_question"],
    source_documents=response_new["context"],
    answer=response_new["answer"],
)

printing translated query:
Content Query: flip-flops
Filter: {'filter': {'number': {'$lte': 7}}}


# Save Results to file

Run all the selected queries and write the results to a CSV file.

In [None]:
import os
import io
import sys
import csv
from datetime import datetime

# Output location and naming of the results file
folder_name = "Results"
version = "final"
# Optional custom suffix appended to the file name (empty by default)
costum = ""

# Create the output folder if it does not exist yet
os.makedirs(folder_name, exist_ok=True)

# Get current time formatted as YYYYMMDD_HH-MM (no seconds)
curr_time = datetime.now().strftime("%Y%m%d_%H-%M")
file_name = f"{folder_name}/results_version_{version}_{curr_time}_{costum}.csv"

# One dict per tested query; filled by the loop below and written out as CSV rows
data = []

def create_retreived_docs_data_for_excel(response=None):
  '''
  Builds the text of the "Retrieved Docs" cell for one graph result.

  response - the dict returned by graph.invoke(); its "context" entry is the list of
             retrieved documents. When omitted, the notebook-level `response_new`
             is used, which keeps existing no-argument calls working.

  Returns a header line with the number of documents followed by one numbered line
  per document (source file and chunk id).
  '''
  if response is None:
    response = response_new

  documents = response["context"]
  cell_lines = [f"Total of {len(documents)} retrieved files:\n"]
  cell_lines.extend(
      f"{index}. file = {document.metadata['source']}, chunk_id = {document.id}.\n"
      for index, document in enumerate(documents, start=1)
  )
  return "".join(cell_lines)

def run_invoke_and_save_output_print(query):
  '''
  Runs the graph on `query` while capturing everything it prints.

  Returns a tuple (response, captured_output): the dict returned by graph.invoke()
  and the text that was written to stdout during the call. stdout is restored
  afterwards, also when the graph raises.
  '''
  # Local import so the function does not depend on another cell's imports
  from contextlib import redirect_stdout

  with io.StringIO() as output_capture:
    # redirect_stdout swaps sys.stdout and puts it back on exit, error or not
    with redirect_stdout(output_capture):
      response = graph.invoke({"question": query})
    # Read the buffer before the StringIO is closed by the outer `with`
    captured_output = output_capture.getvalue()

  return response, captured_output

def get_translation(captured_output: str):
  '''
  Normalizes the output captured during a graph run for the "Translation" column.

  Every line of captured_output is kept and terminated with a single "\\n"
  (so "\\r\\n" endings become "\\n" and the last line always ends with a newline).
  An empty input gives an empty string.
  '''
  # All lines are kept on purpose: the previous "Translated" prefix filter was computed
  # but never used, so the returned text has always been the full captured output.
  return "".join(f"{line}\n" for line in captured_output.splitlines())

#### define which answers to test and save
q_to_test = [15, 17]
#### if you want to test all questions, uncomment the next line:
# q_to_test = range(1,len(project_queries_dict)+1)
for q_num in q_to_test:
  query = project_queries_dict[q_num]
  print(f"Generating answer for query {q_num}: {query}")
  response_new, captured_output = run_invoke_and_save_output_print(query)
  # Builds the references without printing. The state fields are passed by name so the
  # call does not depend on the key order of the returned dict.
  response_generator(
      valid_flag=response_new["valid_flag"],
      query=response_new["question"],
      adapted_query=response_new["adapted_question"],
      source_documents=response_new["context"],
      answer=response_new["answer"],
      to_print=False,
  )
  # One CSV row per query
  data.append({
      "Query Number": q_num,
      "Original Query": query,
      "Translation": get_translation(captured_output),
      "Retrieved Docs": create_retreived_docs_data_for_excel(),
      "Adapted Query": response_new['adapted_question'],
      "Answer": response_new['answer'],
  })
  print("-----------------------------")

# Write the collected rows to the results CSV: a header line followed by one row per
# tested query. The field names must match the keys of the dicts appended to `data`.
with open(file_name, mode='w', newline='', encoding='utf-8') as file:
  writer = csv.DictWriter(file, fieldnames=["Query Number", "Original Query", "Translation", "Retrieved Docs", "Adapted Query", "Answer"])
  writer.writeheader()
  writer.writerows(data)

print(f"file {file_name} was created")

Generating answer for query 15: What did we learn about ROM memory in lecture 7?
-----------------------------
Generating answer for query 17: What did we learn about Object-oriented programming in lecture 11?
-----------------------------
file Results/results_version_5-3_20250119_12-26_partial.csv was created


For convenience, convert the CSV file to an Excel file.

In [None]:
!pip install pandas openpyxl



In [None]:
import pandas as pd

# Same path as the CSV results file, with the extension swapped to .xlsx
xlsx_file = file_name.replace(".csv", ".xlsx")

# NOTE: this rebinds `data` (previously the list of result rows) to a DataFrame
data = pd.read_csv(file_name)

# index=False keeps the DataFrame index out of the sheet
data.to_excel(xlsx_file, index=False, engine='openpyxl')

print(f"Converted {file_name} to {xlsx_file}")

Converted Results/results_version_5-3_20250119_12-26_partial.csv to Results/results_version_5-3_20250119_12-26_partial.xlsx
