Import libraries

In [6]:
import os
import re
import uuid
import json
import numpy as np
from typing import List
from copy import deepcopy
from dotenv import load_dotenv
import matplotlib.pyplot as plt
from pathlib import Path
from collections import defaultdict
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from typing import Optional, Any, defaultdict
from langchain.chat_models import init_chat_model
from pydantic import BaseModel, Field, field_validator
from langchain.output_parsers import PydanticOutputParser
from pydantic_settings import BaseSettings, SettingsConfigDict
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser

#### Load environment variables

In [7]:
def load_env():
	"""Load variables from the local ``.env`` file into the process environment.

	Values from the file override variables that are already set. When the
	file is missing, an error line is printed and nothing is loaded; no
	exception is raised.
	"""
	env_file_path = ".env"

	# Guard clause: report the missing file and stop.
	if not os.path.exists(env_file_path):
		print(f"Error: .env file not found at {env_file_path}")
		return

	load_dotenv(env_file_path, override=True)
	print("Loaded environment variables")

load_env()

class ModelSettings(BaseSettings):
	"""Parameters for one model entry.

	Elsewhere in this file the fields are dumped with ``exclude_none=True``
	and passed as keyword arguments to ``init_chat_model``.
	"""
	# Model identifier; required (no default).
	model: str
	# Sampling temperature; defaults to 0.
	temperature: Optional[float] = 0
	# Token limit; None by default, in which case it is dropped by the
	# exclude_none dump mentioned above.
	max_tokens: Optional[int] = None

class ProcessModelSettings(BaseSettings):
	"""Group of per-model settings, one ``ModelSettings`` per named model.

	All five entries are required because none has a default.
	"""
	# NOTE(review): no use of `embedding` is visible in this part of the file.
	embedding: ModelSettings
	# The four chat models below are the ones the paraphrase generator in this
	# file tries (o1 first, then gpt4o, deepseek and phi4 as fallbacks).
	gpt4o: ModelSettings
	phi4: ModelSettings
	deepseek: ModelSettings
	o1: ModelSettings



class Settings(BaseSettings):
	"""Application settings read from the environment and the local ``.env`` file.

	Nested models are addressed with a double-underscore delimiter
	(presumably variables such as ``PROCESSMODEL__O1__MODEL`` - confirm
	against the actual ``.env`` file).
	"""
	model_config = SettingsConfigDict(env_file=".env", env_nested_delimiter="__")

	# Azure OpenAI credentials and endpoint.
	# NOTE(review): these are annotated Optional but have no default, so
	# pydantic still requires each of them to be present (a value of None is
	# allowed, omission is not). Add "= None" if they are meant to be optional.
	azure_openai_api_key: Optional[str]
	azure_openai_endpoint: Optional[str]
	openai_api_version: Optional[str]

	# Settings for the "gpt4_vision_no_filters" deployment (same required-but-nullable caveat).
	openai_api_deployment_name_gpt4_vision_no_filters: Optional[str]
	openai_vision_api_version_gpt4_vision_no_filters: Optional[str]
	openai_api_key_gpt4_vision_no_filters: Optional[str]
	openai_base_url_gpt4_vision_no_filters: Optional[str]

	# Per-model parameters, grouped under the "processmodel" prefix.
	processmodel: ProcessModelSettings
	

settings = Settings()


Loaded environment variables


#### Extract information headers from the text

In [8]:
def extract_chapter_title(content):
	"""Return the text of the first level-1 Markdown heading in *content*.

	Lines are scanned top to bottom. Only headings that start with a single
	``#`` count; ``##`` and deeper headings are skipped so a section header
	that appears before the title is never returned as the chapter name.

	Args:
		content: Full Markdown text of a chapter.

	Returns:
		The heading text with surrounding whitespace removed, or an empty
		string when the text contains no level-1 heading.
	"""
	# (?!#) rejects "##"/"###" lines; without it "## Criteria" would match and
	# yield "# Criteria" as the title.
	pattern = r"^#(?!#)\s*(.+)$"
	for line in content.split("\n"):
		match = re.match(pattern, line)
		if match:
			return match.group(1).strip()
	return ""

def extract_sections_list(content):
	"""Split Markdown *content* into its level-2 (``## ``) sections.

	Each section runs from its ``## `` heading up to the next ``## `` heading
	or the end of the text, so deeper headings (``###``) stay inside the body
	of the section that contains them.

	Returns:
		A list of ``(header, body)`` tuples in document order, both stripped
		of surrounding whitespace. Text before the first ``## `` heading is
		not included.
	"""
	section_regex = r'(^## (?!#).*?(?=^## (?!#)|\Z))'
	collected = []
	for chunk in re.findall(section_regex, content, flags=re.MULTILINE | re.DOTALL):
		rows = chunk.strip().splitlines()
		if not rows:
			continue
		heading_row, *body_rows = rows
		# Drop the leading "## " marker from the heading line.
		collected.append((heading_row[3:].strip(), "\n".join(body_rows).strip()))
	return collected


def RAG_extract_criteria_advices(text):
	"""Pair each criteria line in *text* with the advice blocks it refers to.

	Two structures are read from the same text:

	* advice blocks, introduced by ``### <n>. <title>`` or
	  ``### Advice <n>. <title>`` and running to the next such heading or the
	  end of the text;
	* criteria lines of the form ``- <Critical|Urgent|Normal> | <criteria> | <codes>``
	  where ``<codes>`` is a dot-separated list of advice numbers.

	A numeric code that has no matching advice block is ignored. A
	non-numeric code is kept as a "no specific advice" note. Criteria that
	end up without any advice, or whose text contains one of the two
	catch-all phrases, are dropped.

	Returns:
		A list of dicts with keys ``severity``, ``criteria`` and
		``advice_list``, in the order the criteria lines appear.

	Raises:
		ValueError: If a criteria line has an empty advice code (including a
			line with no code column at all). The criteria and the full text
			are printed first.
	"""
	advice_regex = r"###\s*(?:Advice\s*)?(\d+)\.\s*([^\n]+)\s*([\s\S]+?)(?=###\s*(?:Advice\s*)?\d+\.|\Z)"
	advice_by_number = {}
	for number, title, body in re.findall(advice_regex, text):
		number = number.strip()
		advice_by_number[int(number)] = f"\nAdvice {number}. {title.strip()}\n" + body.strip()

	criteria_regex = r"^-\s*(Critical|Urgent|Normal)\s*\|\s*([^|\n]+)(?:\|\s*([^|\n]+))?$"
	results = []
	for severity, criteria, advice_codes in re.findall(criteria_regex, text, flags=re.MULTILINE):
		severity = severity.strip()
		criteria = criteria.strip()

		collected = []
		for raw_code in advice_codes.split("."):
			code = raw_code.strip()

			if code == "":
				print(f"Empty advice code found in criteria: {criteria}")
				print(f"text: {text}")
				raise ValueError("Advice code is empty")

			try:
				number = int(code)
			except ValueError:
				# Free-text code: keep it as a note instead of an advice block.
				collected.append(f"- No specific emergency advices are given, note: ({code}).")
				continue

			if number in advice_by_number:
				collected.append(advice_by_number[number])

		# Remove useless criteria/advices
		lowered = criteria.lower()
		is_catch_all = "other, not urgent" in lowered or "other symptoms related to this page" in lowered
		if is_catch_all or not collected:
			continue

		results.append({
			"severity": severity,
			"criteria": criteria,
			"advice_list": collected
		})

	return results



def TRAIN_get_criteria_advices(text):
	"""Pair each criteria line in *text* with its advice blocks, leniently.

	Advice blocks start with ``### <n>. <title>`` or ``### Advice <n>. <title>``.
	Criteria lines look like ``- <Critical|Urgent|Normal> | <criteria> | <codes>``
	with ``<codes>`` a dot-separated list of advice numbers.

	Empty codes, non-numeric codes and numbers without a matching advice
	block are all skipped without error. Criteria left with no advice, or
	whose text contains one of the two catch-all phrases, are dropped.

	Returns:
		A list of dicts with keys ``severity``, ``criteria`` and
		``advice_list``, in the order the criteria lines appear.
	"""
	advice_regex = r"###\s*(?:Advice\s*)?(\d+)\.\s*([^\n]+)\s*([\s\S]+?)(?=###\s*(?:Advice\s*)?\d+\.|\Z)"
	# Later blocks with the same number replace earlier ones.
	advice_lookup = {
		int(num.strip()): f"\nAdvice {num.strip()}. {title.strip()}\n" + body.strip()
		for num, title, body in re.findall(advice_regex, text)
	}

	criteria_regex = r"^-\s*(Critical|Urgent|Normal)\s*\|\s*([^|\n]+)(?:\|\s*([^|\n]+))?$"
	kept = []
	for severity, criteria, advice_codes in re.findall(criteria_regex, text, flags=re.MULTILINE):
		criteria = criteria.strip()

		resolved = []
		for token in advice_codes.split("."):
			token = token.strip()
			if not token:
				continue
			try:
				number = int(token)
			except ValueError:
				continue
			if number in advice_lookup:
				resolved.append(advice_lookup[number])

		# Remove useless criteria/advices
		if not resolved:
			continue
		lowered = criteria.lower()
		if "other, not urgent" in lowered or "other symptoms related to this page" in lowered:
			continue

		kept.append({
			"severity": severity.strip(),
			"criteria": criteria,
			"advice_list": resolved
		})

	return kept


### Extract criteria and advices

In [10]:
class ExtractData():
	"""Extract criteria and scenario records from one chapter file and save them as JSONL.

	Typical use: construct with the chapter path, call ``extract_data()`` to
	fill ``categorized_data``, then ``save_data()`` to write
	``Data/Json/Unique/Chap_<stem>.jsonl``.

	Attributes:
		file_path: Path of the chapter file, as given.
		base_name: File name without directory and extension.
		formatted_data: List reset by ``save_data``; not otherwise filled here.
		categorized_data: Dict with the keys "Criteria", "Scenario" and
			"Other", each holding a list of record dicts.
	"""

	def __init__(self, file_path):
		self.file_path = file_path
		self.base_name = Path(file_path).stem
		self.formatted_data = []
		self.categorized_data = {
			"Criteria": [],
			"Scenario": [],
			"Other": []
		}

	@staticmethod
	def _subsection_body(title, section_text):
		"""Return the stripped text under ``### <title>``, or None when that heading is absent.

		The body ends at the next line that starts with ``#`` or at the end of the text.
		"""
		found = re.search(rf"### {title}\s*(.*?)(?=\n###|\n##|\n#|$)", section_text, re.DOTALL)
		return found.group(1).strip() if found else None

	def extract_critiera(self, chapter, section_text):
		"""Append one "Criteria" record per criterion found in *section_text*.

		The (misspelled) method name is kept because it is the existing public name.
		The advice blocks of a criterion are joined with newlines.
		"""
		for entry in RAG_extract_criteria_advices(section_text):
			self.categorized_data["Criteria"].append({
				"criteria": entry["criteria"],
				"advices": "\n".join(entry["advice_list"]),
				"chapter": chapter,
			})

	def extract_scenario(self, chapter, section_text):
		"""Append "Scenario" records built from the SCENARIO / IF YES / IF NO subsections.

		The scenario text is lower-cased, its lines are joined with ", ",
		every "-" is removed and whitespace is collapsed. One record is added
		for each of the IF YES and IF NO subsections that has content.

		Raises:
			ValueError: If there is no ``### SCENARIO`` subsection or it is
				empty after normalisation.
		"""
		if_yes_content = self._subsection_body("IF YES", section_text)
		if_no_content = self._subsection_body("IF NO", section_text)

		# A missing heading yields None; treat it like an empty scenario so the
		# intended error below is raised instead of an AttributeError on None.
		raw_scenario = self._subsection_body("SCENARIO", section_text) or ""
		# NOTE(review): replace("-", "") also strips hyphens inside words, not only bullet markers.
		scenario_content = " ".join(raw_scenario.lower().replace("\n", ", ").replace("-", "").split())

		if not scenario_content:
			raise ValueError("No scenario content found")

		if if_yes_content:
			self.categorized_data["Scenario"].append({"criteria": scenario_content + " (IF YES)", "advices": if_yes_content, "chapter": chapter})

		if if_no_content:
			self.categorized_data["Scenario"].append({"criteria": scenario_content + " (IF NO)", "advices": if_no_content, "chapter": chapter})

	def extract_other(self, chapter, header, section_text):
		"""Append an "Other" record holding the header and section text (no advices)."""
		self.categorized_data["Other"].append({"criteria": f"{header}\n{section_text}", "chapter": chapter})

	def extract_data(self):
		"""Read the chapter file (UTF-8) and dispatch its ``##`` sections by header.

		Headers are compared case-insensitively: "criteria" goes to
		``extract_critiera`` and "emergency response" to ``extract_scenario``.
		All other sections are ignored.
		"""
		chapter_text = Path(self.file_path).read_text(encoding="utf-8")
		chapter = extract_chapter_title(chapter_text).strip()

		for header, body in extract_sections_list(chapter_text):
			header = header.strip().lower()
			body = body.strip()
			if header == "criteria":
				self.extract_critiera(chapter, body)
			elif header == "emergency response":
				self.extract_scenario(chapter, body)

	def save_data(self, categorized_data=None):
		"""Write every record as one JSON line to ``Data/Json/Unique/Chap_<base_name>.jsonl``.

		Args:
			categorized_data: Optional replacement for ``self.categorized_data``;
				when truthy it is stored on the instance before writing.

		Each line has the shape ``{"entry": [user_message, assistant_message]}``.
		The user message carries the criteria text, a fresh UUID and the
		category; the assistant message carries the chapter and the advices
		(None when the record has none). The output directory must already exist.
		"""
		self.categorized_data = categorized_data if categorized_data else self.categorized_data
		self.formatted_data = []

		# Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters, and the
		# platform default encoding is not guaranteed to be able to encode them.
		with open(f"Data/Json/Unique/Chap_{self.base_name}.jsonl", "w", encoding="utf-8") as f_out:
			for category, extractions in self.categorized_data.items():
				for entry in extractions:
					unique_id = str(uuid.uuid4())
					transformed_entry = [
						{"role": "user", "content": entry["criteria"], "uuid": unique_id, "category": category},
						{
							"role": "assistant",
							"content": entry["chapter"],
							"advices": entry.get("advices", None)
						},
					]
					f_out.write(
						json.dumps({"entry": transformed_entry}, ensure_ascii=False) + "\n"
					)

#### Initialize and generate QA

In [11]:
# Run the extraction over every file in Data/LabelWork, in sorted path order,
# writing one JSONL file per chapter and reporting progress after each.
file_paths = sorted("Data/LabelWork/" + file for file in os.listdir("Data/LabelWork"))
for i, file_path in enumerate(file_paths):
	data_object = ExtractData(file_path=file_path)
	data_object.extract_data()
	data_object.save_data()
	print(f"Processed {i+1}/{len(file_paths)} files.")

Processed 1/41 files.
Processed 2/41 files.
Processed 3/41 files.
Processed 4/41 files.
Processed 5/41 files.
Processed 6/41 files.
Processed 7/41 files.
Processed 8/41 files.
Processed 9/41 files.
Processed 10/41 files.
Processed 11/41 files.
Processed 12/41 files.
Processed 13/41 files.
Processed 14/41 files.
Processed 15/41 files.
Processed 16/41 files.
Processed 17/41 files.
Processed 18/41 files.
Processed 19/41 files.
Processed 20/41 files.
Processed 21/41 files.
Processed 22/41 files.
Processed 23/41 files.
Processed 24/41 files.
Processed 25/41 files.
Processed 26/41 files.
Processed 27/41 files.
Processed 28/41 files.
Processed 29/41 files.
Processed 30/41 files.
Processed 31/41 files.
Processed 32/41 files.
Processed 33/41 files.
Processed 34/41 files.
Processed 35/41 files.
Processed 36/41 files.
Processed 37/41 files.
Processed 38/41 files.
Processed 39/41 files.
Processed 40/41 files.
Processed 41/41 files.


### Generate paraphrases

In [23]:
# Parser that turns the model's JSON (optionally fenced) reply into a dict.
json_parser = JsonOutputParser()


# Number of paraphrases requested per criterion; replies with any other count are rejected.
Paraphrase_Count = 12

# Prompt asking for keyword-style paraphrases of one criterion.
# Inputs: Chapter, Situation, Paraphrase_Count.
# The response-format example is valid JSON (no trailing comma) so the model is
# not shown a shape that a strict JSON parser would reject.
expanded_criteria_queries = PromptTemplate(
    template="""
### SYSTEM:
You are a medical language model working strictly within a clinical and emergency dispatch context. 
The following inputs are part of a dataset used for emergency medical protocols. It may contain references to trauma, injuries, or sensitive conditions, but these are strictly medical and not graphic, explicit, or abusive.
Given a chapter and a situation, your task is to generate medically accurate, varied and diverse keyword-style paraphrases that express the same idea using different terminology and phrasing.

### Goals:
- Capture **diverse wording** and structure without changing meaning.
- Include **all symptoms**, **demographic indicators** (e.g., adult, child, infant, teen), and **context** (e.g., trauma, CPR-trained, location/environment).
- The **chapter** should be reflected **implicitly** through phrasing, not stated directly.
- This diversity improves **model generalization** during training, so prioritize **variety and uniqueness** in phrasing.

### Style:
- Use **short fragments**, separated by commas or keywords.
- Avoid full sentences.
- Each paraphrase must retain the **intent, medical significance, and critical details**.

### INSTRUCTIONS:
- Ensure each paraphrase is unique in terms and structure and wording to help the model **generalize** better.
- Each paraphrase must retain critical medical meaning while varying language significantly.
- Avoid just reordering words; use synonyms, alternate symptom phrasing, and alternate demographic framing.

Chapter: "{Chapter}"
Situation: "{Situation}"

Response Format:
```json
{{
  "Paraphrases": []
}}
```

### IMPORTANT:
- Return a JSON object with exactly one key: "Paraphrases" containing {Paraphrase_Count} UNIQUE, keyword-based paraphrases that express the same medical situation with diverse wording, capturing all symptoms, demographics, and context from the input chapter and situation.
- Do not use latex or markdown formatting in your answer.
- Do not follow the example verbatim; use it as a guide and apply the same logic to the new chapter and situation.
""",
    input_variables=["Chapter", "Situation", "Paraphrase_Count"],
)

In [None]:
class GenerateParaphraseData():
	"""Generate paraphrased criteria for one chapter file and save them as JSONL.

	Typical use: construct with the chapter path, call ``extract_data()`` to
	fill ``categorized_data["Criteria"]`` with paraphrases, then
	``save_data()`` to write ``Data/Json/Paraphrase/Chap_<stem>.jsonl``.

	Attributes:
		retries: Number of attempts made with the primary (o1) model.
		file_path: Path of the chapter file, as given.
		base_name: File name without directory and extension.
		formatted_data: List reset by ``save_data``; not otherwise filled here.
		categorized_data: Dict with the single key "Criteria" holding record dicts.
	"""

	# Models tried once each, in this order, after the o1 retries are used up.
	# The flag marks models whose raw reply may contain <think>...</think>
	# blocks that must be removed before JSON parsing.
	_FALLBACK_MODELS = (("gpt4o", False), ("deepseek", True), ("phi4", False))

	def __init__(self, file_path):
		self.retries = 3
		self.file_path = file_path
		self.base_name = Path(file_path).stem
		self.formatted_data = []
		self.categorized_data = {
			"Criteria": [],
		}

	def _request_paraphrases(self, model_name, Chapter, Situation, strip_reasoning=False):
		"""Run the paraphrase prompt once on *model_name* and return the parsed "Paraphrases" value.

		Any failure (missing settings, model error, unparsable reply, missing
		key) propagates to the caller. The model object is a local, so it is
		released when this method returns.
		"""
		model_settings = getattr(settings.processmodel, model_name)
		llm = init_chat_model(**model_settings.model_dump(exclude_none=True))
		chain = expanded_criteria_queries | llm | StrOutputParser()
		response = chain.invoke({"Chapter": Chapter, "Situation": Situation, "Paraphrase_Count": Paraphrase_Count})
		if strip_reasoning:
			response = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)
		parsed = json_parser.parse(response.strip())
		return parsed["Paraphrases"]

	def expand_criteria(self, Chapter, Situation):
		"""Return ``Paraphrase_Count`` paraphrases of *Situation*, or None if every model fails.

		The o1 model is tried up to ``self.retries`` times, then gpt4o,
		deepseek and phi4 once each. A reply is accepted only when it holds
		exactly ``Paraphrase_Count`` items. Every failure is printed and the
		next attempt is made; nothing is raised.

		Args:
			Chapter: Chapter title given to the prompt.
			Situation: Criterion text to paraphrase.
		"""
		# Try with o1 model first
		for attempt in range(self.retries):
			try:
				paraphrases = self._request_paraphrases("o1", Chapter, Situation)
				if len(paraphrases) == Paraphrase_Count:
					return paraphrases
				print(f"- Warning: Less than {Paraphrase_Count} paraphrases generated. Retrying... ({attempt+1}/{self.retries})")
			except Exception as e:
				# Deliberately broad: any failure just moves on to the next attempt.
				print(f"- Error: {e}")

		for model_name, strip_reasoning in self._FALLBACK_MODELS:
			print(f"- Warning: Retrying with {model_name} model...")
			try:
				paraphrases = self._request_paraphrases(model_name, Chapter, Situation, strip_reasoning)
				if len(paraphrases) != Paraphrase_Count:
					raise Exception(f"Less than {Paraphrase_Count} paraphrases generated.")
				return paraphrases
			except Exception as e:
				print(f"- Error: {e}")

		print("- Error: All models failed to generate paraphrases.")
		return None

	def extract_critiera(self, chapter, section_text):
		"""Append one "Criteria" record per generated paraphrase of each criterion.

		All paraphrases of the same criterion share one UUID and the same
		advice text, which is prefixed with a "Relevant Chapters" header.
		Criteria for which no paraphrases could be generated are skipped (the
		failure has already been printed by ``expand_criteria``).
		The (misspelled) method name is kept because it is the existing public name.
		"""
		for entry in TRAIN_get_criteria_advices(section_text):
			unique_id = str(uuid.uuid4())
			advices = f"# Relevant Chapters:\n- {chapter}\n" + "\n".join(entry["advice_list"])
			paraphrases = self.expand_criteria(chapter, entry["criteria"])
			if not paraphrases:
				# expand_criteria returns None when every model failed.
				continue
			for criteria in paraphrases:
				self.categorized_data["Criteria"].append({"criteria": criteria,  "uuid": unique_id, "advices": advices, "chapter": chapter})

	def extract_data(self):
		"""Read the chapter file (UTF-8) and paraphrase every ``## Criteria`` section.

		Section headers are compared case-insensitively; other sections are ignored.
		"""
		chapter_text = Path(self.file_path).read_text(encoding="utf-8")
		chapter = extract_chapter_title(chapter_text).strip()
		for header, body in extract_sections_list(chapter_text):
			if header.strip().lower() == "criteria":
				self.extract_critiera(chapter, body.strip())

	def save_data(self, categorized_data=None):
		"""Write every record as one JSON line to ``Data/Json/Paraphrase/Chap_<base_name>.jsonl``.

		Args:
			categorized_data: Optional replacement for ``self.categorized_data``;
				when truthy it is stored on the instance before writing.

		Each line has the shape ``{"entry": [user_message, assistant_message]}``.
		The user message carries the paraphrase, the record's stored UUID and
		the category; the assistant message carries the chapter and the
		advices. The output directory must already exist.
		"""
		self.categorized_data = categorized_data if categorized_data else self.categorized_data
		self.formatted_data = []

		# Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters, and the
		# platform default encoding is not guaranteed to be able to encode them.
		with open(f"Data/Json/Paraphrase/Chap_{self.base_name}.jsonl", "w", encoding="utf-8") as f_out:
			for category, extractions in self.categorized_data.items():
				for entry in extractions:
					transformed_entry = [
						{"role": "user", "content": entry["criteria"], "uuid": entry["uuid"], "category": category},
						{
							"role": "assistant",
							"content": entry["chapter"],
							"advices": entry.get("advices", None)
						},
					]
					f_out.write(
						json.dumps({"entry": transformed_entry}, ensure_ascii=False) + "\n"
					)

In [28]:
# Generate and save paraphrased training data for every file in Data/LabelWork,
# in sorted path order, reporting progress after each file.
file_paths = sorted("Data/LabelWork/" + file for file in os.listdir("Data/LabelWork"))
for i, file_path in enumerate(file_paths):
	data_object = GenerateParaphraseData(file_path=file_path)
	data_object.extract_data()
	data_object.save_data()
	print(f"Processed {i+1}/{len(file_paths)} files.")

Processed 1/41 files.
Processed 2/41 files.
Processed 3/41 files.
Processed 4/41 files.
Processed 5/41 files.
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
Processed 6/41 files.
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has not provided the response due to a content filter being triggered
- Error: Azure has