In [1]:
#Data filtering/conversion to json for readibility and efficieny. Ran on Kaggle
!git clone https://github.com/abachaa/MedQuAD.git


Cloning into 'MedQuAD'...
remote: Enumerating objects: 11310, done.[K
remote: Counting objects: 100% (18/18), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 11310 (delta 8), reused 8 (delta 3), pack-reused 11292[K
Receiving objects: 100% (11310/11310), 11.01 MiB | 23.88 MiB/s, done.
Resolving deltas: 100% (6806/6806), done.


In [2]:
#extract data and convert it into json
import os
import xml.etree.ElementTree as ET
import json
def extract_data_from_xml(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()
    data = []
    qa_pairs = root.find('QAPairs')
    if qa_pairs is not None:
        for qa in qa_pairs.findall('QAPair'):
            question = qa.find('Question')
            answer = qa.find('Answer')
            
            # Extract question and answer data only if both are present
            if question is not None and answer is not None:
                question_text = question.text if question.text else ""
                answer_text = answer.text if answer.text else ""
                
                if question_text and answer_text:  # Ensure both question and answer are non-empty
                    data.append({
                        "question": question_text,
                        "answer": answer_text
                    })
    return data
base_directory = "/kaggle/working/MedQuAD"
all_data = {"data": []}
for folder_name in os.listdir(base_directory):
    folder_path = os.path.join(base_directory, folder_name)
    if os.path.isdir(folder_path):
        for file_name in os.listdir(folder_path):
            if file_name.endswith('.xml'):
                file_path = os.path.join(folder_path, file_name)
                file_data = extract_data_from_xml(file_path)
                all_data["data"].extend(file_data)

# Convert the data to JSON format
output_json = json.dumps(all_data, indent=4)
output_file_path = "medquad_onlyqna.json"
with open(output_file_path, 'w') as output_file:
    output_file.write(output_json)

print(f"Data has been extracted and saved to {output_file_path}")


Data has been extracted and saved to medquad_onlyqna.json


In [7]:
#split the data into multiple chunks
import json
import os
def split_json_file(input_file, output_dir, chunk_size):
    with open(input_file, 'r') as f:
        data = json.load(f)
    qa_pairs = data.get("data", [])

    # Determine the number of chunks needed
    num_chunks = (len(qa_pairs) + chunk_size - 1) // chunk_size
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i in range(num_chunks):
        chunk_data = qa_pairs[i * chunk_size:(i + 1) * chunk_size]
        chunk_file_path = os.path.join(output_dir, f"medquad_chunk_{i + 1}.json")
        with open(chunk_file_path, 'w') as chunk_file:
            json.dump({"data": chunk_data}, chunk_file, indent=4)

    print(f"Data has been split into {num_chunks} chunks and saved in {output_dir}")
input_file = "/kaggle/working/medquad_onlyqna.json"
output_dir = "500_chunks"
chunk_size = 500  # 500 for multiple chunks which would make it easier while indexing

# Split the JSON file into chunks
split_json_file(input_file, output_dir, chunk_size)


Data has been split into 33 chunks and saved in 500_chunks


In [5]:
#check if data is stored properly in json
import json
with open('/kaggle/working/750_chunks/medquad_chunk_13.json', 'r') as file:
    data = json.load(file)
if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list) and len(data['data']) > 0:
    first_entry = data['data'][0]
    print(first_entry)
else:
    print("The 'data' key is missing, or it is not a list or it is empty.")

{'question': 'What are the symptoms of Endocarditis ?', 'answer': "Infective endocarditis (IE) can cause a range of signs and symptoms that can vary from person to person. Signs and symptoms also can vary over time in the same person.\n                \nSigns and symptoms differ depending on whether you have an underlying heart problem, the type of germ causing the infection, and whether you have acute or subacute IE.\n                \nSigns and symptoms of IE may include:\n                \nFlu-like symptoms, such as fever, chills, fatigue (tiredness), aching muscles and joints, night sweats, and headaches.\n                \nShortness of breath or a cough that won't go away.\n                \nA new heart murmur or a change in an existing heart murmur.\n                \nSkin changes such as:   - Overall paleness.   - Small, painful, red or purplish bumps under the skin on the fingers or toes.   - Small, dark, painless flat spots on the palms of the hands or the soles of the feet.  

Saving final zip file containing files stored in json, for usage with Atlas Vector Store and Google embeddings done on load.py and then final Streamlit implementation with gemini-flash in extract.py

In [8]:
import os
import zipfile

def zip_dir(dir_path, zip_path):
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Walk through the directory
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, dir_path)
                zipf.write(file_path, arcname)
dir_to_zip = '/kaggle/working/500_chunks'
zip_file_path = '/kaggle/working/500_chunks.zip'
zip_dir(dir_to_zip, zip_file_path)

print(f"Directory '{dir_to_zip}' has been compressed to '{zip_file_path}'.")


Directory '/kaggle/working/500_chunks' has been compressed to '/kaggle/working/500_chunks.zip'.


In [10]:
import os
import zipfile

def zip_dir(dir_path, zip_path):
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Walk through the directory
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, dir_path)
                zipf.write(file_path, arcname)
dir_to_zip = '/kaggle/working/1k_chunks'
zip_file_path = '/kaggle/working/1k_chunks.zip'
zip_dir(dir_to_zip, zip_file_path)

print(f"Directory '{dir_to_zip}' has been compressed to '{zip_file_path}'.")


Directory '/kaggle/working/1k_chunks' has been compressed to '/kaggle/working/1k_chunks.zip'.
