#### Preprocessing Logic

In [0]:
## Requirements File
def get_df_requirements(folder_input, filename_requirements):
  """Download the requirements workbook and return it as a cleansed DataFrame.

  The workbook is copied from the workbench into ``folder_input`` (created if
  it does not exist) and read with the openpyxl engine. Rows whose
  'RICEFW Name' is missing, not a string, or shorter than 4 characters are
  dropped; all remaining NaN cells are replaced by ''.

  Two columns are added:
    * 'Requirement Cleansed'    - the name alone when name and description are
      equal (ignoring case and surrounding whitespace), otherwise
      '<name>: <description>'.
    * 'Requirement Generalized' - initialised with 'RICEFW Name'.

  Args:
    folder_input: local folder path; concatenated directly with the file name,
      so it is expected to end with a path separator.
    filename_requirements: name of the Excel file to fetch and read.

  Returns:
    The cleansed requirements DataFrame with a fresh 0..n-1 index.
  """
  if not os.path.exists(folder_input):
    os.makedirs(folder_input)
  db_ws.CopyFileFromWorkbench(DatabricksFolder="file:"+folder_input, filename=filename_requirements)
  df_requirements = pd.read_excel(folder_input+filename_requirements, engine='openpyxl')

  # remove faulty entries
  # Empty Excel cells arrive as NaN (a float), on which len() raises TypeError,
  # so test the type explicitly and treat anything that is not a string as faulty.
  has_valid_name = df_requirements["RICEFW Name"].map(
      lambda name: isinstance(name, str) and len(name) >= 4).astype(bool)
  df_requirements = df_requirements[has_valid_name].reset_index(drop=True)

  df_requirements_clean = df_requirements.fillna('')
  conditions = [
      (df_requirements_clean['RICEFW Name'].str.strip().str.lower() == df_requirements_clean['RICEFW Description'].str.strip().str.lower())]
  choices = [df_requirements_clean['RICEFW Name']]
  name_and_description = df_requirements_clean['RICEFW Name'].str.strip()+': '+df_requirements_clean['RICEFW Description'].str.strip()
  df_requirements_clean['Requirement Cleansed'] = np.select(conditions, choices, name_and_description)
  df_requirements_clean['Requirement Generalized'] = df_requirements_clean['RICEFW Name'] # default, if generalization is not run
  return df_requirements_clean
  
## Mapping File
def get_df_mapping(folder_mapping, filename_mapping):
  """Fetch the requirement-type -> template mapping workbook as a DataFrame.

  The file is copied from the workbench into ``folder_mapping`` (created when
  absent) and read with pandas. Rows without a 'Requirement Type' are dropped.
  """
  folder_missing = not os.path.exists(folder_mapping)
  if folder_missing:
    os.makedirs(folder_mapping)

  db_ws.CopyFileFromWorkbench(DatabricksFolder="file:"+folder_mapping, filename=filename_mapping)

  mapping_path = folder_mapping+filename_mapping
  raw_mapping = pd.read_excel(mapping_path)
  # keep only rows that actually name a requirement type
  return raw_mapping.dropna(subset=['Requirement Type'])

## Compile result table & control table for batch processing
def get_df_control(df_requirements):
  """Create the control table used to track batch-processing progress.

  Returns a copy of ``df_requirements`` extended with empty lookup/template/
  outfile columns, an empty 'LastChunk_contenttype' and 'LastChunk_index' = -1.
  The input frame is left untouched.
  """
  blank_columns = (
    "Transcripts Lookup",
    "FSD Lookup",
    "BPD Lookup",
    "Images Lookup",
    "Template",
    "Outfile",
    "LastChunk_contenttype",
  )
  control = df_requirements.copy()
  for column_name in blank_columns:
    control[column_name] = ""
  # -1 marks "no chunk processed yet"
  control["LastChunk_index"] = -1
  return control

def get_df_results(df_requirements):
  """Create the results table: requirement ID and name plus an empty 'JSON' column.

  The returned frame is independent of ``df_requirements``.
  """
  id_and_name = df_requirements.loc[:, ["RICEFW ID", "RICEFW Name"]].copy()
  id_and_name["JSON"] = ""
  return id_and_name

def get_df_errors():
  """Return an empty error table with the columns written when a chunk fails."""
  error_columns = [
    'RICEFW ID',
    'RICEFW Name',
    'LastChunk_contenttype',
    'LastChunk_index',
    'response',
  ]
  return pd.DataFrame(columns=error_columns)

def get_df_validation():
  """Return an empty validation table (one row per prompt/response pair is appended later)."""
  validation_columns = [
    "RICEFW ID",
    "RICEFW Name",
    "Chunk Numb",
    "Prompt",
    "Response",
    "Cos Sim Score",
    "GPT suggestions",
  ]
  return pd.DataFrame(columns=validation_columns)


#### Helper functions

In [0]:
def lookup_in_index(index_type,
                    index,
                    req_search_term,
                    req_search_count,
                    df_control,
                    requirement_id,
                    log_id_doc):
  """Look up a requirement in one reference index, caching the result in the control table.

  Args:
    index_type: which kind of lookup to perform; one of 'transcript(s)',
      'fsd(s)', 'bpd(s)' or 'image(s)' (case-insensitive).
    index: a DocumentIndexWhoosh, RetrievalQA, CombinedIndex or
      DocumentIndexACS_v2 instance, or None to skip the lookup.
    req_search_term: text used as the search query.
    req_search_count: number of hits (``k``) requested from CombinedIndex and
      DocumentIndexACS_v2; ignored by the other index types.
    df_control: control table. Its '<type> Lookup' cell for ``requirement_id``
      is read as a cache and written after a fresh lookup.
    requirement_id: row label of the requirement in ``df_control``.
    log_id_doc: prefix used in log messages.

  Returns:
    The lookup result (fresh or cached), or None when ``index`` is None.

  Raises:
    ValueError: ``index_type`` is not one of the supported values.
    Exception: ``index`` is of an unsupported class.
  """
  # index_type -> (name used in log messages, control-table column caching the result)
  lookup_targets = {
    "transcript": ("transcripts", "Transcripts Lookup"),
    "transcripts": ("transcripts", "Transcripts Lookup"),
    "fsd": ("FSDs", "FSD Lookup"),
    "fsds": ("FSDs", "FSD Lookup"),
    "bpd": ("BPDs", "BPD Lookup"),
    "bpds": ("BPDs", "BPD Lookup"),
    "image": ("images", "Images Lookup"),
    "images": ("images", "Images Lookup"),
  }
  try:
    index_type_str, index_type_column_str = lookup_targets[index_type.lower()]
  except KeyError:
    # Previously an unknown type left the locals unbound and surfaced later as UnboundLocalError.
    raise ValueError(f"Unsupported index_type '{index_type}'. Expected one of: {sorted(lookup_targets)}") from None

  if index is None:
    log(f"[{log_id_doc}] No index for {index_type_str} provided. Skipping the lookup step.")
    return None

  # Restartability: reuse a result already stored in the control table
  if df_control.loc[requirement_id, index_type_column_str] != "":
    log(f"[{log_id_doc}] {index_type_str} lookup results already set in Control Table. Using this vaule and skipping to next step.")
    return df_control.loc[requirement_id, index_type_column_str]

  if isinstance(index, DocumentIndexWhoosh):
    ## Whoosh approach
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via Whoosh ... ")
    lookup_result = index.search_and_summarize(req_search_term)
    df_control.loc[requirement_id, index_type_column_str] = lookup_result
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via Whoosh ... done.")
  elif isinstance(index, RetrievalQA):
    ## Langchain (FAISS) approach
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via FAISS ... ")
    prompt = f"""What information can be leveraged to build the below requirement? Also list the names of the meetings/documents you took the information from. Requirement: \n\n'{req_search_term}'"""
    lookup_result = index.run(prompt)
    df_control.loc[requirement_id, index_type_column_str] = lookup_result
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via FAISS ... done.")
  elif isinstance(index, CombinedIndex):
    ## CombinedIndex approach
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via the combined Whoosh, FAISS & GPT approach ... ")
    lookup_result = index.search_and_summarize(req_search_term, k=req_search_count)
    df_control.loc[requirement_id, index_type_column_str] = lookup_result
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via the combined Whoosh, FAISS & GPT approach ... done")
  elif isinstance(index, DocumentIndexACS_v2):
    ## ACS approach
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via the ACS approach ... ")
    lookup_result = index.search_and_summarize(req_search_term, k=req_search_count, mode="hybrid")
    df_control.loc[requirement_id, index_type_column_str] = lookup_result
    log(f"[{log_id_doc}] Looking up requirement in the {index_type_str} index via the ACS approach ... done")
  ### TODO - Section for langchain ACS indexes
  # elif isinstance(index, AzureSearchRetriever?):
  #   log start, query the index using hybrid mode (filter on correct group_id),
  #   store the result in df_control, log done - same pattern as the branches above
  ############
  else:
    log(f"[{log_id_doc}] {index_type_str} index is of unsupported type: '{str(type(index))}'. Please use either whoosh (DocumentIndexWhoosh), langchain (RetrievalQA), ACS (DocumentIndexACS_v2) or the custom combined index (CombinedIndex)")
    raise Exception(f"Unsupported {index_type_str} index type")

  return lookup_result

def backup(df, backup_path="tmp_SAP_Control_Table_Backup.csv"):
  """Persist ``df`` as a CSV file at ``backup_path`` (index column included)."""
  df.to_csv(path_or_buf=backup_path)

def retrieve(backup_path="tmp_SAP_Control_Table_Backup.csv"):
  """Load a table previously written by ``backup`` and return it with NaNs replaced by ''.

  ``backup`` writes the DataFrame index as the first CSV column. That column is
  read back as the index here; without this, every backup/retrieve round trip
  added a spurious 'Unnamed: 0' data column.

  Args:
    backup_path: path of the CSV file to read.

  Returns:
    The restored DataFrame; empty cells are '' rather than NaN.
  """
  restored = pd.read_csv(backup_path, index_col=0)
  # drop the header-less index name so the frame matches what was backed up
  restored.index.name = None
  return restored.fillna("")

#### Core Logic (generate content for docs)

##### Control level loop

In [0]:
#########################################
#      Core Logic (batch processing)    #
#########################################

#@keep_trying 
def generate_output_batch(df_control,
                          df_results,
                          df_mapping,
                          df_errors,
                          df_validation,
                          input_folder,
                          index_fsd=None,
                          index_bpd=None,
                          index_transcripts=None,
                          index_images=None,
                          knowledge_base_docs=None,
                          temperature=0,
                          verbose=False,
                          validation= False,
                          backup_strict=True,
                          citation=False,
                          template_max_tokens_per_chunk=800,
                          individual_sections=False,
                          halt_on_errors = False):
  """Run one batch pass over all requirements in the control table.

  Builds the system/user prompt templates, parses the document templates from
  ``input_folder`` and calls ``process_document`` for every row of
  ``df_control``. The function is meant to be called repeatedly until it
  returns True.

  Args:
    df_control, df_results, df_mapping, df_errors, df_validation: working
      tables, passed through to ``process_document``.
    input_folder: folder handed to ``read_templates``.
    index_fsd, index_bpd, index_transcripts, index_images: optional reference
      indexes; a warning is logged for missing FSD/BPD/transcript indexes.
    knowledge_base_docs: optional knowledge base; a warning is logged if missing.
    temperature, verbose, validation, backup_strict, individual_sections:
      passed through to ``process_document``.
    citation: when truthy, the prompt asks the model to mark transcript references.
    template_max_tokens_per_chunk: passed to ``read_templates``.
    halt_on_errors: currently unused - the per-document error handling that
      consumed it is disabled, so any exception from ``process_document``
      propagates to the caller.

  Returns:
    True when every row of ``df_control`` has a non-empty 'Outfile', else False.
  """

  def all_documents_written():
    # True once no requirement is left with an empty 'Outfile'
    return "" not in df_control["Outfile"].values

  ## Prompts and context
  projectbackground = "This project is an SAP S4 transformation focusing on finance for Oklahoma Gas and Electric Company (OGE), using SAP's fit to standard approach."

  systemprompt = "You specialize in writing content for Functional Specification Documents for projects. " + projectbackground

  if citation:
    citation_prompt=', end element content with <transref> if and only when useful info is referenced from workshop transcript'
  else:
    citation_prompt=''

  prompt = """
  ### Start of Requirement Info
  Requirement: '{requirement}'
  RICEFW Type='{RICEFWType}'
  RICEFW ID='{RICEFWID}'
  RICEFW Name='{RICEFWName}'
  Requirement detailed description: 
  ```
  {requirement_detailed_description}
  ```
  {transcripts_lookup}
  {bpd_lookup}
  {fsd_lookup}
  {knowledge_base_docs}
  ### End of Requirement Info

  Response must follow rules: Do not change the schema of the JSON template in any way, never add or remove any elements in it, ensure 'placeholder_text' element is always present before each 'replacement_text' element

  Based on the requirement info above, populate detailed content for the 'replacement_text' elements in the following JSON template by including all relevant granular details that can be referenced (should be lengthy paragraphs instead of one liners, use bullet points where necessary, customize based on input requirement{citation_prompt}) by referencing instructions in the corresponding 'placeholder_text' elements (but never copy from it as is, always have your own perspective, back it up with detailed explanations and scenarios/examples), also refer to corresponding 'section_header' values for additional context, the data type of the 'replacement_text' elements must be text, your response MUST be only the updated JSON template and nothing else, ensure character escape is implemented where applicable so that json.loads() can read your response, do not change the schema of the JSON template in any way, never add or remove any elements in it, ensure 'placeholder_text' element is always present before each 'replacement_text' element:

  {jsontemplate}
  """

  ### Exit Criteria for running this func in loop
  if all_documents_written():
    log("SUCCESS - All documents have been generated. Exiting now.", color="green")
    return True

  ### Optional parameters handling
  if index_fsd is None:
    log("[Main] WARNING - Index for reference FSDs not provided. Model will ignore reference FSDs. Pass it to the main function with the 'index_fsd' argument.", color="bold")

  if index_bpd is None:
    log("[Main] WARNING - Index for BPDs not provided. Model will ignore BPDs. Pass it to the main function with the 'index_bpd' argument.", color="bold")

  if index_transcripts is None:
    log("[Main] WARNING - Index for workshop transcripts not provided. Model will ignore workshop transcripts. Pass it to the main function with the 'index_transcripts' argument.", color="bold")

  if knowledge_base_docs is None:
    log("[Main] WARNING - Knowledge Base for Input FSDs is not set. Pass it to the main function with the 'knowledge_base_docs' argument.", color="bold")

  ### Refresh template dict
  log("[Main] Parsing templates")
  templatedict = read_templates(input_folder, max_tokens_per_chunk=template_max_tokens_per_chunk)
  defaulttemplatename='OGE_Functional Specification Document Template__Approved'

  ### Iterate through all requirements in df_control
  # NOTE: per-document error handling (log the error, then re-raise if
  # halt_on_errors else continue with the next requirement) is intentionally
  # not active here; an exception in process_document aborts the whole batch.
  for index_req, row in df_control.iterrows():
    process_document( index_req,
                      row,
                      prompt,
                      systemprompt,
                      citation_prompt,
                      templatedict,
                      defaulttemplatename,
                      df_control,
                      df_results,
                      df_mapping,
                      df_errors,
                      df_validation,
                      index_fsd,
                      index_bpd,
                      index_transcripts,
                      index_images,
                      knowledge_base_docs,
                      temperature,
                      verbose,
                      validation,
                      backup_strict,
                      individual_sections)

  ### Exit Criteria for running this func in loop
  if all_documents_written():
    log("SUCCESS - All documents have been generated. Exiting now.", color="green")
    return True
  return False



##### Document level loop

In [0]:
def process_document( index_req,
                      row,
                      prompt,
                      systemprompt,
                      citation_prompt,
                      templatedict,
                      defaulttemplatename,
                      df_control,
                      df_results,
                      df_mapping,
                      df_errors,
                      df_validation,
                      index_fsd,
                      index_bpd,
                      index_transcripts,
                      index_images,
                      knowledge_base_docs,
                      temperature,
                      verbose,
                      validation,
                      backup_strict,
                      individual_sections):
  """Generate and write the output document for a single requirement.

  Steps: pick the template (from the control table or the mapping table),
  run the reference lookups, load or initialise the output JSON, populate every
  paragraph/table chunk, then write the .docx and copy it to the workbench.
  Progress is stored in ``df_control`` / ``df_results`` so an interrupted run
  can resume.

  Args:
    index_req: row label of the requirement in ``df_control`` / ``df_results``
      (used arithmetically for logging, so expected to be an integer).
    row: the requirement's row from ``df_control``.
    prompt, systemprompt, citation_prompt: prompt templates for the model.
    templatedict: parsed templates keyed by template name.
    defaulttemplatename: template used when the mapping has no match.
    df_control, df_results, df_mapping, df_errors, df_validation: working tables.
    index_fsd, index_bpd, index_transcripts, index_images: optional indexes;
      a lookup is skipped when its index is None.
    knowledge_base_docs, temperature, verbose, validation, backup_strict:
      passed through to the chunk processor.
    individual_sections: when true, chunks are handled by
      ``process_chunks_individual_sections`` instead of ``process_chunk``.

  Returns:
    None. On success the 'Outfile' cell of the requirement is set.
  """

  # For logging the current document number (and total document count)
  log_id_doc = f"Document {index_req+1}/{len(df_control)}"

  log(f"[{log_id_doc}] Processing Requirement : {row['RICEFW ID']} ({row['RICEFW Name']})")

  if df_control.loc[index_req, "Outfile"] != "":
    log(f"[{log_id_doc}] Files for requirement already written to workbench ('{df_control.loc[index_req, 'Outfile']}'). Skipping to next requirement.", color="green")
    return

  ## Get template for current requirement, looking it up in the mapping df
  if df_control.loc[index_req, "Template"] == "":
    calctemplatename=df_mapping.loc[df_mapping['Requirement Type'].str.strip().str.lower()==row['RICEFW Type'].strip().lower(),'Template Name']
    if len(calctemplatename)>0:
      templatename=calctemplatename.values[0]
    else:
      templatename = defaulttemplatename
    df_control.loc[index_req, "Template"] = templatename
  else:
    log(f"[{log_id_doc}] Template already set in Control Table. Using this value ('{df_control.loc[index_req, 'Template']}') and skipping to next step.")
    templatename = df_control.loc[index_req, "Template"]

  log(" checkpoint 1 ")
  ### Lookups in various indexes
  req_search_term=(row['Requirement Cleansed']+'\n\n'+row['Detailed Description']).rstrip('\n\n')
  req_search_count=10

  def run_lookup(lookup_kind, lookup_index):
    # Skip silently when no index was supplied; otherwise query (or reuse the cached result)
    if lookup_index is None:
      return None
    return lookup_in_index(lookup_kind,
                           lookup_index,
                           req_search_term,
                           req_search_count,
                           df_control,
                           index_req,
                           log_id_doc)

  ## Get references from the vector stores (transcripts, FSDs, BPDs, images)
  transcripts_lookup = run_lookup("transcripts", index_transcripts)
  fsd_lookup = run_lookup("fsd", index_fsd)
  bpd_lookup = run_lookup("bpd", index_bpd)
  images_lookup = run_lookup("images", index_images)

  ## Get latest version of the JSON output or initiate a fresh one from the corresponding template
  if df_results.loc[index_req, "JSON"] == "":
    output_dict = deepcopy(templatedict[templatename]["placeholders"])
    df_results.loc[index_req, "JSON"] = json.dumps(output_dict)
  else:
    log(f"[{log_id_doc}] Output JSON already present in Results Table -> output generation has already started. Continuing at latest chunk.")
    output_dict = json.loads(df_results.loc[index_req, "JSON"], strict=False)

  # Arguments accepted by both chunk processors. They are passed by keyword because
  # the two processors have different signatures: process_chunks_individual_sections
  # does not take df_validation, images_lookup or validation, so the former shared
  # positional call failed with a TypeError when individual_sections was set.
  chunk_args = dict(output_dict=output_dict,
                    index_req=index_req,
                    prompt=prompt,
                    systemprompt=systemprompt,
                    citation_prompt=citation_prompt,
                    templatename=templatename,
                    df_control=df_control,
                    df_results=df_results,
                    df_errors=df_errors,
                    fsd_lookup=fsd_lookup,
                    bpd_lookup=bpd_lookup,
                    transcripts_lookup=transcripts_lookup,
                    knowledge_base_docs=knowledge_base_docs,
                    temperature=temperature,
                    verbose=verbose,
                    backup_strict=backup_strict,
                    log_id_doc=log_id_doc)
  # run prompts either section by section or chunk by chunk (multiple sections at once)
  if individual_sections:
    process_func = process_chunks_individual_sections
  else:
    process_func = process_chunk
    chunk_args.update(df_validation=df_validation,
                      images_lookup=images_lookup,
                      validation=validation)

  for content_type in ["paragraphs", "tables"]:
    if output_dict[content_type]["chunks"]!=[[]]:
      for index_chunk, chunk in enumerate(output_dict[content_type]["chunks"]):
        # errors raised here propagate to the caller (no per-chunk handling)
        process_func(content_type=content_type,
                     index_chunk=index_chunk,
                     chunk=chunk,
                     **chunk_args)

  ## Write document
  docwriteback = write_back_to_document(templatedict[templatename]['docs'], output_dict, templatedict[templatename]['originalpositions'], verbose=verbose)
  # NOTE(review): str.strip(chars) removes any of the given *characters* from both
  # ends, not the literal prefix/suffix, so the template label derived here is
  # unlikely to be what was intended for non-default template names. Left
  # unchanged because the intended naming scheme is not visible here - confirm.
  out_file='OGE_FSD_'+templatename.strip('OGE_Functional Specification Document').strip('.docx').strip('Template__Approved').strip('_').strip()+'-'+row['RICEFW ID']+'_'+row['RICEFW Name'].replace('/',',')+'_'+datetime.now().strftime('%Y-%m-%d_%H_%M_%S')+'.docx'
  file_path='/tmp/SAP/'+out_file
  log(f"[{log_id_doc}] Writing file to workbench ('{file_path}') ...")
  docwriteback.save(file_path)
  db_ws.CopyFileToWorkbench(filename=out_file, DatabricksFolder="file:/tmp/SAP/")
  log(f"[{log_id_doc}] Writing file to workbench ('{file_path}') ... done", color="green")
  df_control.loc[index_req, "Outfile"] = out_file



##### Chunk level loop

In [0]:
import tiktoken
encoding = tiktoken.encoding_for_model("gpt-4")
def num_tokens_from_messages(messages, model="gpt-4"):
    """Return the number of tokens used by a prompt string or a list of chat messages.

    Args:
        messages: either a single string (its encoded length is counted), or a
            list of chat-message dicts with string values (e.g. role/content/
            name), counted with the per-message and per-name overhead of the model.
        model: model name used to select the tiktoken encoding and overheads.
            Unversioned 'gpt-3.5-turbo*' / 'gpt-4*' names are counted as their
            '-0613' versions (a warning is printed).

    Returns:
        Token count including the 3 tokens that prime the assistant reply.

    Raises:
        NotImplementedError: the model is not one of the supported families.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        print("Warning: model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")
    if model in {
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
        }:
        tokens_per_message = 3
        tokens_per_name = 1
    elif model == "gpt-3.5-turbo-0301":
        tokens_per_message = 4  # every message follows <|start|>{role/name}\n{content}<|end|>\n
        tokens_per_name = -1  # if there's a name, the role is omitted
    elif "gpt-3.5-turbo" in model:
        print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
        return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
    elif "gpt-4" in model:
        print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
        return num_tokens_from_messages(messages, model="gpt-4-0613")
    else:
        raise NotImplementedError(
            f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
        )

    if isinstance(messages, str):
        # plain prompt text: count the encoded text only (existing callers rely on this)
        num_tokens = len(encoding.encode(messages))
    else:
        # list of chat messages: add the per-message / per-name overhead
        num_tokens = 0
        for message in messages:
            num_tokens += tokens_per_message
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":
                    num_tokens += tokens_per_name

    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens

In [0]:
def _chunk_already_populated(last_content_type, last_index, content_type, index_chunk):
  """Return True when the progress pointer shows this chunk was already written."""
  # paragraphs are processed before tables: a pointer at 'tables' means all paragraphs are done
  if content_type == "paragraphs" and last_content_type == "tables":
    return True
  # same content type: everything up to and including the pointer index is done
  return last_content_type == content_type and last_index >= index_chunk


def _record_chunk_error(df_errors, df_control, index_req, content_type, index_chunk, response):
  """Append the failed response for this chunk to the error table."""
  df_errors.loc[len(df_errors.index)] = {
    'RICEFW ID': df_control.loc[index_req, "RICEFW ID"],
    'RICEFW Name': df_control.loc[index_req, "RICEFW Name"],
    'LastChunk_contenttype': content_type,
    'LastChunk_index': index_chunk,
    'response': response}


def process_chunk(content_type,
                  index_chunk,
                  chunk,
                  output_dict,
                  index_req,
                  #row,
                  prompt,
                  systemprompt,
                  citation_prompt,
                  templatename,
                  df_control,
                  df_results,
                  df_errors,
                  df_validation,
                  fsd_lookup,
                  bpd_lookup,
                  transcripts_lookup,
                  images_lookup,
                  knowledge_base_docs,
                  temperature,
                  verbose,
                  validation,
                  backup_strict,
                  log_id_doc):
  """Populate one template chunk (several sections at once) via the model.

  Fills the prompt template for the requirement, sends it to ``gpt35``, parses
  the JSON response (asking the model once to repair it if parsing fails),
  checks that every element carries 'section_header', 'placeholder_text' and
  'replacement_text', and stores the result in ``output_dict``, ``df_results``
  and the progress pointer of ``df_control``.

  Chunks the progress pointer marks as done are skipped, so an interrupted run
  can simply be restarted.

  Args:
    content_type: 'paragraphs' or 'tables'.
    index_chunk: position of ``chunk`` within ``output_dict[content_type]['chunks']``.
    chunk: JSON-serialisable template chunk sent to the model.
    output_dict: output structure of the document; updated in place.
    index_req: row label of the requirement in the working tables.
    prompt, systemprompt, citation_prompt: prompt templates / fragments.
    templatename: key into ``knowledge_base_docs``.
    df_control, df_results, df_errors, df_validation: working tables, updated in place.
    fsd_lookup, bpd_lookup, transcripts_lookup: lookup texts or None.
    images_lookup: accepted for signature compatibility; not used here.
    knowledge_base_docs: optional examples keyed by template name.
    temperature: passed to the model call.
    verbose: print prompt and failed responses.
    validation: when true, log prompt and response to ``df_validation``.
    backup_strict: when true, back up ``df_control`` after the chunk is written.
    log_id_doc: prefix for log messages.

  Raises:
    Exception: the response is not valid JSON after the repair attempt, or is
      valid JSON lacking the required structure. The response is recorded in
      ``df_errors`` before raising.
  """
  chunk_label = f"{content_type} chunk {index_chunk+1}/{len(output_dict[content_type]['chunks'])}"

  # restartability - if the run gets interrupted, it can just be restarted
  if _chunk_already_populated(df_control.loc[index_req, "LastChunk_contenttype"],
                              df_control.loc[index_req, "LastChunk_index"],
                              content_type,
                              index_chunk):
    log(f"[{log_id_doc}] {chunk_label} already populated. skipping to next chunk.")
    return

  log(f"[{log_id_doc}] Processing {chunk_label} ... ")
  curr_prompt = prompt\
    .replace("{requirement}", df_control.loc[index_req, "Requirement Cleansed"])\
    .replace("{jsontemplate}", json.dumps(chunk))\
    .replace("{RICEFWType}", df_control.loc[index_req, "RICEFW Type"])\
    .replace("{RICEFWID}", df_control.loc[index_req, "RICEFW ID"])\
    .replace("{RICEFWName}", df_control.loc[index_req, "RICEFW Name"])\
    .replace("{requirement_detailed_description}",df_control.loc[index_req, "Detailed Description"])\
    .replace("{citation_prompt}",citation_prompt)

  # Optional context sections: each placeholder is replaced by its block, or removed when absent
  if transcripts_lookup is not None:
    curr_prompt = curr_prompt.replace("{transcripts_lookup}", f"""\n\nInfo on requirement from workshop transcripts: \n```\n{transcripts_lookup}\n```\n""")
  else:
    curr_prompt = curr_prompt.replace("{transcripts_lookup}", "")

  if fsd_lookup is not None:
    curr_prompt = curr_prompt.replace("{fsd_lookup}", f"""\n\nInfo on requirement from FSDs for other projects (Use for reference only and do not copy as is, don't refer to specific people, entity or asset names): \n```\n{fsd_lookup}\n```\n""")
  else:
    curr_prompt = curr_prompt.replace("{fsd_lookup}", "")

  if bpd_lookup is not None:
    curr_prompt = curr_prompt.replace("{bpd_lookup}", f"""\n\nInfo on requirement from business process docs (Use for reference only and do not copy as is, don't refer to specific people, entity or asset names): \n```\n{bpd_lookup}\n```\n""")
  else:
    curr_prompt = curr_prompt.replace("{bpd_lookup}", "")

  if knowledge_base_docs is not None:
    kb_docs_lookup = knowledge_base_docs[templatename]["placeholders"][content_type]["chunks"][index_chunk]
    curr_prompt = curr_prompt.replace("{knowledge_base_docs}", f"""\n\nExamples of 'what good looks like' from FSDs for other projects, not directly related to this requirement (Use for reference only and do not copy as is, don't refer to specific people, entity or asset names): \n```\n{kb_docs_lookup}\n```\n""")
  else:
    curr_prompt = curr_prompt.replace("{knowledge_base_docs}", "")

  if verbose: print(curr_prompt)
  print('*'*20 + "Sending 3.5 turbo request")
  print("Token count:")
  print(num_tokens_from_messages(curr_prompt) + num_tokens_from_messages(systemprompt))

  response = gpt35(prompt=curr_prompt, context=systemprompt, temperature=temperature, max_tokens=10000, tries=3)

  if validation:
    val_row = {'RICEFW ID': df_control.loc[index_req, "RICEFW ID"],
              'RICEFW Name': df_control.loc[index_req, "RICEFW Name"],
              'Chunk Numb': index_chunk + 1,
              'Prompt': curr_prompt,
              'Response': response}
    df_validation.loc[len(df_validation.index)] = val_row

  fix_json_system_prompt='You are an expert at fixing JSONs'
  fix_json_user_prompt='''Fix the following JSON so that it can be successfully loaded by json.loads, response should be only the fixed JSON:
  {error_json}'''

  try:
    response_json = json.loads(response, strict=False)
  except Exception:  # was a bare except, which also swallowed KeyboardInterrupt/SystemExit
    print("ERROR - Error in populating requirement. GPT response could not be parsed as JSON. Asking GPT to correct response...")
    response=gpt35(prompt=fix_json_user_prompt.replace('{error_json}',response), context=fix_json_system_prompt, temperature=0, max_tokens=10000, large=True, tries=3)
    try:
      response_json = json.loads(response, strict=False)
    except Exception as err:
      # GPT returned ill-formatted JSON in its response even after the repair attempt
      log(f"[{log_id_doc}] ERROR - Error in populating requirement. GPT response could not be parsed as JSON. This event has been noted in the error table.", color="red")
      print("Traceback:")
      print(err)
      if verbose:
        print("Full GPT Prompt response:")
        print(response)
      _record_chunk_error(df_errors, df_control, index_req, content_type, index_chunk, response)
      raise Exception("Invalid JSON response") from err
  else:
    print(' response_json:')
    print(response_json)

  # check that the response is a list whose elements contain all required JSON attributes
  # (a non-list response or non-dict element is treated as incomplete rather than crashing)
  required_keys = ("section_header", "placeholder_text", "replacement_text")
  response_complete = isinstance(response_json, list) and all(
    isinstance(elem, dict) and all(key in elem for key in required_keys)
    for elem in response_json)
  if not response_complete:
    log(f"[{log_id_doc}] ERROR - Error in populating requirement. GPT response is valid JSON, but does not contain all required fields. This event has been noted in the error table.", color="red")
    if verbose:
      print("Full GPT Prompt response:")
      print(response)
    _record_chunk_error(df_errors, df_control, index_req, content_type, index_chunk, response)
    raise Exception("Valid, but incomplete JSON response")

  # the response is valid and complete - write response to result
  output_dict[content_type]["chunks"][index_chunk] = response_json

  # write back to results/control table and advance the progress pointer
  df_results.loc[index_req, "JSON"] = json.dumps(output_dict)
  df_control.loc[index_req, "LastChunk_contenttype"] = content_type
  df_control.loc[index_req, "LastChunk_index"] = index_chunk
  if backup_strict: backup(df_control)

  log(f"[{log_id_doc}] Processing {chunk_label} ... done.")

In [0]:
def process_chunks_individual_sections(content_type,
                  index_chunk,
                  chunk,
                  output_dict,
                  index_req,
                  prompt,
                  systemprompt,
                  citation_prompt,
                  templatename,
                  df_control,
                  df_results,
                  df_errors,
                  fsd_lookup,
                  bpd_lookup,
                  transcripts_lookup,
                  knowledge_base_docs,
                  temperature,
                  verbose,
                  backup_strict,
                  log_id_doc):
  """Populate one template chunk by prompting GPT once per section.

  For every section in `chunk` a prompt is built from the requirement row in
  `df_control` plus the optional lookup texts, sent to `gpt35`, and the plain
  text answer is stored as that section's "replacement_text" in `output_dict`.
  After each section the serialized `output_dict` is written to `df_results`
  and the progress pointer in `df_control` is updated.

  Args:
    content_type: Key into `output_dict` (and into the knowledge base
      placeholders) selecting the kind of content being processed.
    index_chunk: Position of `chunk` inside `output_dict[content_type]["chunks"]`.
    chunk: Sized iterable of sections; each section is read via the keys
      "placeholder_text" and "section_header".
    output_dict: Nested result structure; mutated in place.
    index_req: Row label of the current requirement in `df_control` / `df_results`.
    prompt: Not used by this function; kept so the signature stays unchanged.
    systemprompt: Passed to `gpt35` as `context`.
    citation_prompt: Substituted for "{citation_prompt}"; the current template
      does not contain that placeholder, so this normally has no effect.
    templatename: Key into `knowledge_base_docs`.
    df_control: Control table; requirement columns are read, and
      "LastChunk_contenttype" / "LastChunk_index" are written.
    df_results: Result table; column "JSON" receives `json.dumps(output_dict)`.
    df_errors: Error table; one row is appended when a response cannot be stored.
    fsd_lookup: Optional text inserted into the prompt; `None` omits the block.
    bpd_lookup: Optional text inserted into the prompt; `None` omits the block.
    transcripts_lookup: Optional text inserted into the prompt; `None` omits the block.
    knowledge_base_docs: Optional nested mapping indexed as
      `[templatename]["placeholders"][content_type]["chunks"][index_chunk]`;
      `None` omits the block.
    temperature: Passed through to `gpt35`.
    verbose: If truthy, print the full prompt (and the response on failure).
    backup_strict: If truthy, call `backup(df_control)` after every section.
    log_id_doc: Prefix used in log messages.

  Raises:
    Exception: "Invalid JSON response" when the response cannot be written to
      `output_dict`; intended to be caught by the caller's try-block, which
      then skips to the next requirement.
  """

  # raise Exception("skipped chunk processing in debug mode. Edit cmd 6 in SAP_FDD_CORE to turn off debug mode.") # DEBUG

  prompt_individual_sections = """
  ### Start of Requirement Info
  Requirement: '{requirement}'
  RICEFW Type='{RICEFWType}'
  RICEFW ID='{RICEFWID}'
  RICEFW Name='{RICEFWName}'
  Requirement detailed description: 
  ```
  {requirement_detailed_description}
  ```
  {transcripts_lookup}
  {bpd_lookup}
  {fsd_lookup}
  {knowledge_base_docs}
  ### End of Requirement Info

  Based on the requirement info above, your task is to fill the placeholder '{placeholder_text}' for section header '{section_header}' with content. Your response should be only the generated content in plain text
  
  Your generated content:
  """

  #, should be lengthy paragraphs instead of one liners, use bullet points where necessary, customize based on input requirement {citation_prompt}, always have your own perspective, back it up with detailed explanations and scenarios/examples, also refer to the section_header for additional context.

  # The optional lookup blocks do not depend on the section, so build them once.
  # A missing lookup (None) collapses to an empty string so no placeholder leaks into the prompt.
  transcripts_block = ""
  if transcripts_lookup is not None:
    transcripts_block = f"""\n\nInfo on requirement from workshop transcripts: \n```\n{transcripts_lookup}\n```\n"""

  # Previously "{bpd_lookup}" was never substituted and was sent to GPT verbatim.
  bpd_block = ""
  if bpd_lookup is not None:
    bpd_block = f"""\n\nInfo on requirement from BPDs: \n```\n{bpd_lookup}\n```\n"""

  fsd_block = ""
  if fsd_lookup is not None:
    fsd_block = f"""\n\nInfo on requirement from FSDs for other projects (Use for reference only and do not copy as is, don't refer to specific people, entity or asset names): \n```\n{fsd_lookup}\n```\n"""

  # str.replace() only accepts strings; None becomes "" instead of raising TypeError.
  citation_text = "" if citation_prompt is None else str(citation_prompt)

  # NOTE: restartability is not implemented here. Idea for later: skip a chunk when
  # df_control's "LastChunk_contenttype" / "LastChunk_index" pointer shows that it
  # (or a later chunk / content type) has already been populated, so that an
  # interrupted run can simply be restarted.

  for index_section, section in enumerate(chunk):

    progress_msg = f"[{log_id_doc}] Processing {content_type} chunk {index_chunk+1}/{len(output_dict[content_type]['chunks'])} section {index_section+1}/{len(chunk)} ... "
    log(progress_msg)

    # Cell values are wrapped in str() because str.replace() raises TypeError on
    # non-string values (e.g. a numeric RICEFW ID).
    curr_prompt = prompt_individual_sections\
      .replace("{requirement}", str(df_control.loc[index_req, "Requirement Cleansed"]))\
      .replace("{placeholder_text}", json.dumps(section["placeholder_text"]))\
      .replace("{section_header}", json.dumps(section["section_header"]))\
      .replace("{RICEFWType}", str(df_control.loc[index_req, "RICEFW Type"]))\
      .replace("{RICEFWID}", str(df_control.loc[index_req, "RICEFW ID"]))\
      .replace("{RICEFWName}", str(df_control.loc[index_req, "RICEFW Name"]))\
      .replace("{requirement_detailed_description}", str(df_control.loc[index_req, "Detailed Description"]))\
      .replace("{citation_prompt}", citation_text)

    curr_prompt = curr_prompt.replace("{transcripts_lookup}", transcripts_block)
    curr_prompt = curr_prompt.replace("{bpd_lookup}", bpd_block)
    curr_prompt = curr_prompt.replace("{fsd_lookup}", fsd_block)

    if knowledge_base_docs is not None:
      kb_docs_lookup = knowledge_base_docs[templatename]["placeholders"][content_type]["chunks"][index_chunk]
      curr_prompt = curr_prompt.replace("{knowledge_base_docs}", f"""\n\nExamples of 'what good looks like' from FSDs for other projects, not directly related to this requirement (Use for reference only and do not copy as is, don't refer to specific people, entity or asset names): \n```\n{kb_docs_lookup}\n```\n""")
    else:
      curr_prompt = curr_prompt.replace("{knowledge_base_docs}", "")

    if verbose: print(curr_prompt)

    # response = gpt4(prompt=curr_prompt, context=systemprompt, temperature=temperature, max_tokens=20000, large=True, tries=3)
    response = gpt35(prompt=curr_prompt, context=systemprompt, temperature=temperature, max_tokens=6000, tries=3)

    try:
      output_dict[content_type]["chunks"][index_chunk][index_section]["replacement_text"] = response
    except Exception as err:
      # The response is plain text, so nothing is parsed here; the only way this
      # fails is that output_dict has no slot for this content type / chunk / section.
      log(f"[{log_id_doc}] ERROR - Error in populating requirement. GPT response could not be written to the output structure. This event has been noted in the error table.", color="red")
      print("Traceback:")
      print(err)
      if verbose:
        print("Full GPT Prompt response:")
        print(response)
      # Append the response that could not be stored to the error capturing dataframe
      err_row = {'RICEFW ID': df_control.loc[index_req, "RICEFW ID"],
                  'RICEFW Name': df_control.loc[index_req, "RICEFW Name"],
                  'LastChunk_contenttype': content_type,
                  'LastChunk_index': index_chunk,
                  'response': response}
      df_errors.loc[len(df_errors.index)] = err_row
      # Exception type and message are kept unchanged for the upper-level try-block,
      # which skips to the next requirement; the original cause is chained for debugging.
      raise Exception("Invalid JSON response") from err

    # write back to control table
    df_results.loc[index_req, "JSON"] = json.dumps(output_dict)
    df_control.loc[index_req, "LastChunk_contenttype"] = content_type
    df_control.loc[index_req, "LastChunk_index"] = index_chunk
    if backup_strict: backup(df_control)

    log(progress_msg + "done.")