# PMC RAG
## Why RAG
By embracing RAG, you can unlock a range of benefits for your organization:

 * Improved decision-making: Access to the right, trustworthy information empowers better choices and strategies.
 * Enhanced customer experience: Delivering reliable answers and insights builds trust and satisfaction.
 * Reduced risk and improved compliance: Curated data sources minimize the risk of misinformation and help ensure compliance with regulations.
 * Increased efficiency: Streamlining access to information saves time and resources.
 
The beauty of RAG lies in its focus on data quality, not just data quantity. We're moving beyond the “bigger is better” mentality of massive models trained on internet data that often include misinformation and biases. RAG puts the emphasis on smaller, more valuable models that use curated, trustworthy data sources.


## The Standard RAG Workflow Powered by TileDB

1. **User:** Uploads documents, and the system converts them into vectors (numeric representations) using sentence embeddings.
2. **User:** Stores these document vectors, along with the documents and metadata, into TileDB (a smart database for storing vectors).
3. **User:** Asks a question.
4. **Embedding Model:** Processes the user's question by embedding it into a vector and sends this vector to TileDB.
5. **TileDB:** Searches through the stored document vectors and retrieves the most relevant documents.
6. **System:** Takes these relevant documents and constructs a new query for the LLM, instructing it to use these documents as context.
7. **LLM:** Uses the relevant documents as context to generate and deliver the final answer back to the user.
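To make the workflow concrete, here is a minimal, self-contained sketch of steps 1 through 6 in plain Python. It is a stand-in under stated assumptions, not the TileDB implementation: `toy_embed` is a hypothetical hashed bag-of-words function used in place of a real sentence-embedding model, and an in-memory list replaces the TileDB vector store. The final prompt is what would be sent to the LLM in step 7.

```python
import hashlib
import math
import re


def toy_embed(text: str, dim: int = 256) -> list[float]:
    """Hypothetical stand-in for a sentence-embedding model: hashed bag of words, L2-normalized."""
    vec = [0.0] * dim
    for token in re.findall(r"[a-z0-9]+", text.lower()):
        # Map each token to a stable bucket (Python's built-in hash() is salted per process)
        bucket = int(hashlib.sha256(token.encode("utf-8")).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]


def retrieve(question: str, store: list[tuple[str, list[float]]], k: int = 2) -> list[str]:
    """Steps 4-5: embed the question and return the k most similar documents."""
    q = toy_embed(question)
    # Vectors are unit length, so the dot product equals cosine similarity
    scored = sorted(store, key=lambda item: sum(a * b for a, b in zip(q, item[1])), reverse=True)
    return [doc for doc, _ in scored[:k]]


def build_prompt(question: str, context_docs: list[str]) -> str:
    """Step 6: combine the retrieved documents and the question into one prompt for the LLM."""
    context = "\n\n".join(f"[{i + 1}] {doc}" for i, doc in enumerate(context_docs))
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


# Steps 1-2: embed and store the documents (an in-memory list stands in for TileDB)
documents = [
    "BRCA1 mutations increase the risk of breast and ovarian cancer.",
    "The CFTR gene is associated with cystic fibrosis.",
    "Huntington disease is caused by a CAG repeat expansion in HTT.",
]
store = [(doc, toy_embed(doc)) for doc in documents]

# Steps 3-6: ask a question, retrieve context, and build the prompt for the LLM
question = "Which gene is linked to cystic fibrosis?"
print(build_prompt(question, retrieve(question, store, k=1)))
```

Replacing `toy_embed` with a real embedding model and the list with a TileDB vector index changes the components, not the flow.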

## The Notebook and DAG Pipelines

### Imports
The imports below support pushing our local file to the remote repository, as well as `pipeline_step()`, which builds our end-to-end pipeline and submits a DAG directly.

In [10]:
import hashlib
import math
import os
import shutil
import subprocess
import tarfile
import urllib.request
import xml.etree.ElementTree as ET

import pandas as pd
import requests
import tiledb
import tiledb.cloud.dag  # TileDB Cloud DAG API (DAG, Mode) used by pipeline_step

### Credentials
The cell below caches Git credentials so the pipeline steps can push to the repository. The first push may fail; if it does, enter your credentials and push the file manually from a terminal. Afterward, the credentials are cached and later runs should push without prompting.

In [11]:
# Run this cell before the pipeline. The first push to the repo (if needed) may fail;
# if it does, push the file manually from a terminal once.
# Afterward, Git caches the credentials in memory for one hour (3600 seconds).
#!git config --global user.email "you@example.com"
#!git config --global user.name "Your Name"
!git config --global credential.helper 'cache --timeout=3600'
# Alternative: persist credentials permanently (in plaintext, in ~/.git-credentials)
#!git config --global credential.helper store

### Upload "pipeline" Helpers
For now, this is a quick simulation of a pipeline for updating the local file. The first cell contains helper functions for our notebook pipeline (run in the notebook).

In [12]:
import os
import subprocess
import hashlib
import pandas as pd
import math

def add_and_commit_files(message: str):
    """
    Adds all files to the Git staging area, commits them with a provided message, and pushes the changes to the remote repository.
    
    :param message: Commit message to be used in the Git commit.
    """
    
    # Print the current working directory
    print(f"Working directory is {os.getcwd()}")

    # Stage all changes (new, modified, deleted) in the current Git repository
    subprocess.run(["git", "add", "-A"])

    # Commit the staged changes with the provided commit message
    subprocess.run(["git", "commit", "-m", message])

    # Push the committed changes to the current branch's configured upstream on the remote
    subprocess.run(["git", "push"])

def hash_file(file_path: str) -> str:
    """
    Computes the SHA256 hash of the contents of a file.

    :param file_path: Path to the file to be hashed.
    :return: The SHA256 hash of the file contents as a hex string.
    """
    
    # Create a new SHA256 hash object
    hasher = hashlib.sha256()
    
    # Open the file in binary read mode
    with open(file_path, 'rb') as f:
        # Read the file contents and update the hash object
        buffer = f.read()
        hasher.update(buffer)
    
    # Return the hexadecimal digest of the hash
    return hasher.hexdigest()

def estimate_resources(job_df: pd.DataFrame):
    """
    Estimates the CPU and memory usage required for processing a given DataFrame, with an additional 2 GB overhead, 
    and rounds the total memory usage up to the next whole GB.

    :param job_df: A pandas DataFrame representing job data.
    :return: A tuple containing the estimated number of CPUs per job and the total memory usage in GB (rounded up).
    """
    
    # Print the data types of each column in the DataFrame for reference
    print("Data types of the DataFrame:")
    print(job_df.dtypes)

    # Calculate the memory usage of each column in the DataFrame in MB (deep=True considers the actual memory usage)
    memory_usage_per_column = job_df.memory_usage(deep=True) / (1024 ** 2)  # Convert bytes to MB
    total_memory_usage = memory_usage_per_column.sum()  # Sum of the memory usage of all columns
    
    # Convert total memory usage to GB and add an additional 2 GB overhead for processing
    total_memory_usage_gb = total_memory_usage / 1024  # Convert MB to GB
    total_memory_usage_gb += 2  # Add 2 GB overhead for processing
    
    # Round up the total memory usage to the nearest GB
    rounded_memory_usage_gb = math.ceil(total_memory_usage_gb)
    
    # Print the memory usage for each column and the total estimated memory usage
    print("Memory usage per column (MB):")
    print(memory_usage_per_column)
    
    print(f"Total memory usage for job (GB, rounded up to the next whole GB, with overhead): {rounded_memory_usage_gb}")

    # Estimate the CPU usage per job (adjustable based on job complexity)
    cpu_per_job = 1  # Example: assuming 1 CPU per job
    
    # Return the estimated CPU count and the rounded memory usage in GB
    return cpu_per_job, rounded_memory_usage_gb
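`hash_file` reads the whole file into memory before hashing. That is fine for a small article list, but a streaming variant keeps memory flat for large inputs and produces the identical digest. The sketch below is an optional alternative, not part of the original pipeline; the name `hash_file_streaming` and the 1 MiB chunk size are illustrative choices.

```python
import hashlib


def hash_file_streaming(file_path: str, chunk_size: int = 1024 * 1024) -> str:
    """Compute the SHA-256 of a file in fixed-size chunks instead of a single read()."""
    hasher = hashlib.sha256()
    with open(file_path, "rb") as f:
        # iter() with a sentinel keeps calling f.read(chunk_size) until it returns b"" (end of file)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
```

On Python 3.11 and later, `hashlib.file_digest(f, "sha256")` does the same in one call.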


### The "Upload Pipeline" 
This next cell mimics a DevOps runner pipeline, where a user commits an updated file and the run builds a container tagged with a run hash. The goal here is to detect whether the local file has changed and, if so, update the stored hash. The next stage of the pipeline uses that hash to decide whether a run is necessary. How often to check depends on how often the file is updated and how we want to tune the quality of our LLM's outputs. In the future, this check could be triggered by a webhook on a Git update or by a change in S3 storage. The benefit of Git here is a "GitOps"-like workflow: every change to the RAG ingestion document list is tracked, so we can relate it to changes in our model outputs.

#### The Code Below in Plain English

1. **Change to Home Directory:** It starts by navigating to the user's home directory.

2. **Clone or Pull Repository:** It checks if a given repository (based on the URL) already exists locally. If it exists, it updates the repository by pulling the latest changes. If it doesn't exist, it clones the repository from the given URL.

3. **Hash a Local File:** It calculates a SHA-256 hash (a fingerprint) of the local file's contents to check whether it has been modified.

4. **Compare the Hash:** It checks whether a previously saved hash (in a hash.txt file) exists. If it does, the new hash is compared to the saved one to determine if the file has changed.

5. **Skip or Proceed:** If the file hasn't changed (i.e., the current hash matches the previous one), it skips any further action. If the file has changed, it proceeds.

6. **Update the Hash File:** It writes the new hash into a hash.txt file in the repository.

7. **Commit Changes to Git:** Finally, it navigates to the repository directory, then adds, commits, and pushes the new or modified file and updated hash to the Git repository.

In [4]:
import os
import subprocess

def handle_local_file(repo_url: str, local_file_name: str, hash_file_name: str):
    """
    Clones or pulls a Git repository, checks if a local file has changed by comparing its hash with a stored hash,
    and if the file has been modified, pushes the updated file and hash to the repository.
    
    :param repo_url: The URL of the Git repository.
    :param local_file_name: The local file whose hash will be checked for modifications.
    :param hash_file_name: The file in the repo that stores the previous hash of the local file.
    """
    
    # Step 1: Navigate to the home directory to clone or pull the repo
    home_directory = os.path.expanduser("~")  # Get the user's home directory
    os.chdir(home_directory)  # Change the working directory to the home directory
    
    # Extract the repository name from the repo URL (assumes .git format at the end)
    repo_name = repo_url.split('/')[-1].replace('.git', '')
    
    # Check if the repository already exists locally
    if os.path.exists(repo_name): 
        # If the repo exists, navigate into it and pull the latest changes
        os.chdir(repo_name)
        subprocess.run(["git", "pull"])  # Pull latest changes from the remote repo
        os.chdir("..")  # Go back to the home directory after pulling changes
    else:
        # If the repo doesn't exist, clone it from the provided URL
        subprocess.run(["git", "clone", repo_url])  # Clone the repository
    
    # Get the full path to the local repository
    local_repo_path = os.path.join(os.getcwd(), repo_name)
    print(f"Repository ready at {local_repo_path}")  # Print the path to the repo
    
    # Step 2: Hash the contents of the local file
    current_hash = hash_file(local_file_name)  # Hash the local file
    
    # Step 3: Check if the hash file exists in the repository and compare hashes
    hash_file_path = os.path.join(local_repo_path, hash_file_name)  # Path to the hash file in the repo
    
    if os.path.exists(hash_file_path):
        # If the hash file exists, read the previous hash
        with open(hash_file_path, 'r') as f:
            previous_hash = f.read().strip()  # Strip any extra whitespace
    else:
        previous_hash = ""  # If the hash file doesn't exist, assume no previous hash
    
    # Step 4: Compare the current hash with the previous hash
    if current_hash == previous_hash:
        # If the hashes match, the file hasn't changed
        print("File has not changed, skipping submission.")
        return
    else:
        # If the hashes differ, the file has changed, so proceed with updating the repo
        print("File has changed, proceeding with submission.")
    
    # Step 5: Update the hash file in the repository with the new hash
    with open(hash_file_path, 'w') as f:
        f.write(current_hash)  # Write the new hash to the file
    
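    # Step 6: Copy the local file into the repository (unless it already lives there),
    # then commit the updated file and the new hash to the Git repository
    repo_copy_path = os.path.join(local_repo_path, os.path.basename(local_file_name))
    if os.path.abspath(local_file_name) != os.path.abspath(repo_copy_path):
        shutil.copy2(local_file_name, repo_copy_path)  # Copy the updated file into the repo
    os.chdir(local_repo_path)  # Change directory to the repo
    add_and_commit_files("added local articles file to the repository")  # Stage, commit, and push the changes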
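Steps 3 through 6 of `handle_local_file` amount to a compare-and-update on a small text file. The sketch below isolates that decision so it can be checked without Git or a network connection; `should_submit` is a hypothetical helper, not part of the notebook, and it hashes bytes directly rather than reading a file from disk.

```python
import hashlib
import os


def should_submit(file_bytes: bytes, hash_file_path: str) -> bool:
    """Return True (and store the new hash) if file_bytes differs from the hash saved at hash_file_path."""
    current_hash = hashlib.sha256(file_bytes).hexdigest()

    # A missing hash file means there is no previous run, so treat the file as changed
    previous_hash = ""
    if os.path.exists(hash_file_path):
        with open(hash_file_path, "r") as f:
            previous_hash = f.read().strip()

    if current_hash == previous_hash:
        return False  # Unchanged: skip the commit and the downstream run

    with open(hash_file_path, "w") as f:
        f.write(current_hash)  # Record the new hash for the next comparison
    return True
```

Calling it twice with the same contents returns `True` and then `False`; changing the contents flips it back to `True`.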


In [None]:
# Example usage
#handle_local_file(
#    repo_url="https://github.com/TileDB-Inc/pmc-llm.git", 
#    local_file_name="rag-article-list.txt", 
#    hash_file_name="hash.txt"
#)

## Ingestion Tasks for the TileDB DAG

In [5]:
def consolidate_chunks(total_jobs, bucket_region, object_path):
    """
    Consolidates a list of chunked files into a single file in S3.

    :param total_jobs: Number of chunked files to process
    :param bucket_region: The S3 bucket region
    :param object_path: The S3 path to the files
    """
    import tiledb
    
    # Create a TileDB context with the specified S3 region
    ctx = tiledb.Ctx({"vfs.s3.region": bucket_region})
    # Initialize the TileDB VFS (Virtual File System) to interact with S3
    vfs = tiledb.VFS(ctx=ctx)
    
    # Initialize tmp_data as an empty bytes object to store the consolidated data
    tmp_data = b""  # Use bytes since the files are being read in binary mode
    
    # Loop through all the chunked files and concatenate them
    for i in range(total_jobs):
        # Construct the file path for each chunk
        file_path = f"{object_path}/file_history_{i}"
        
        # Check if the file exists in the S3 bucket
        if vfs.is_file(file_path):
            # If the file exists, open it in binary read mode and append its contents
            with vfs.open(file_path, 'rb') as f:
                tmp_data += f.read()  # Read and concatenate the binary content
        else:
            # If the file doesn't exist, print a message and continue to the next file
            print(f"No such file path {file_path}. Moving onto the next file.")
            continue
    
    # Now `tmp_data` contains the concatenated data from all chunked files
    # Define the path for the consolidated file in the S3 bucket
    consolidated_file_path = f"{object_path}/consolidated_file_history"
    
    # Open the consolidated file in binary write mode and write the consolidated data
    with vfs.open(consolidated_file_path, 'wb') as f:
        f.write(tmp_data)  # Write the concatenated binary data to the new file
    
    # Print a success message with the path of the consolidated file
    print(f"Consolidation complete. Consolidated file uploaded to: {consolidated_file_path}")
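For local testing, the same consolidation can be sketched against an ordinary directory instead of S3. This is an illustration under that assumption: plain `open` and `os.path` calls stand in for `tiledb.VFS`, the name `consolidate_chunks_local` is hypothetical, and the chunks are collected in a list and joined once rather than concatenated inside the loop.

```python
import os


def consolidate_chunks_local(total_jobs: int, directory: str) -> str:
    """Concatenate file_history_0 .. file_history_{total_jobs - 1} into consolidated_file_history."""
    parts = []
    for i in range(total_jobs):
        chunk_path = os.path.join(directory, f"file_history_{i}")
        if os.path.isfile(chunk_path):
            with open(chunk_path, "rb") as f:
                parts.append(f.read())
        else:
            # Jobs that produced no chunk are skipped, as in the original
            print(f"No such file path {chunk_path}. Moving onto the next file.")

    consolidated_path = os.path.join(directory, "consolidated_file_history")
    with open(consolidated_path, "wb") as f:
        f.write(b"".join(parts))  # One join instead of repeated bytes concatenation
    return consolidated_path
```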


In [7]:
def delete_unwanted_files(bucket_region, bucket_path):
    """
    Deletes files in an S3 bucket that are not listed in the 'consolidated_file_history' file.
    Before cleaning up, it writes a few random "Khaos" test files to confirm that unlisted files are removed,
    and it never deletes the 'consolidated_file_history' file itself.
    
    :param bucket_region: The S3 bucket region
    :param bucket_path: The S3 path to the bucket
    """
    import tiledb
    import random
    import string
    import os
    
    # Initialize TileDB context and VFS with the specified S3 region
    ctx = tiledb.Ctx({"vfs.s3.region": bucket_region})
    vfs = tiledb.VFS(ctx=ctx)
    
    # Path to the 'consolidated_file_history' file in S3
    consolidated_history_path = os.path.join(bucket_path, "consolidated_file_history")
    
    # Step 1: Load the list of valid files from 'consolidated_file_history'
    valid_files = set()  # Initialize an empty set to store valid file names
    if vfs.is_file(consolidated_history_path):
        # If 'consolidated_file_history' exists, open and read it
        with vfs.open(consolidated_history_path, 'rb') as f:
            # Read each line, decode from bytes to string, and strip newline characters
            valid_files = set(line.decode('utf-8').strip() for line in f.read().splitlines())
    else:
        # Raise an error if the 'consolidated_file_history' file is not found
        raise FileNotFoundError(f"{consolidated_history_path} not found.")
    
    # Step 2: Create a few test files (not listed in the history) that the cleanup should delete
    for _ in range(5):
        # Generate a random file name (e.g., 'Khaos_file_12345')
        random_file_name = f"Khaos_file_{''.join(random.choices(string.digits, k=5))}"
        random_file_path = os.path.join(bucket_path, random_file_name)
        # Write the test file content in binary mode
        with vfs.open(random_file_path, 'wb') as f:
            test = "I am an agent of Khaos here to ruin your RAG!!"
            f.write(test.encode('utf-8'))  # Encode the string before writing as bytes
    
    # Step 3: List all files in the S3 bucket
    all_files = []  # Initialize a list to store all file paths
    if vfs.is_dir(bucket_path):
        # If the bucket is a directory, list all files
        for file in vfs.ls(bucket_path):
            if not file.endswith('/'):  # Skip directories
                all_files.append(file)
    
    print("Begin the search!")
    
    # Step 4: Delete unwanted files
    for file_path in all_files:
        # Extract the file name from the full path
        file_name = os.path.basename(file_path)
        
        # Skip the 'consolidated_file_history' file to avoid deleting it
        if file_name == "consolidated_file_history":
            print(f"Skipping deletion of {file_name}, as it is the consolidated file.")
            continue  # Skip further checks for this file
        
        # Check if the file name is NOT in the consolidated list, delete if not
        if file_name not in valid_files:
            print(f"Deleting {file_path} because it's not in consolidated_file_history.")
            vfs.remove_file(file_path)  # Remove the file from S3
        else:
            print(f"Keeping {file_path}, it's in consolidated_file_history.")
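The keep/delete decision in Step 4 does not depend on S3, so it can be written as a pure function and tested before it is pointed at a real bucket. `files_to_delete` is a hypothetical helper, not part of the notebook. Note that the `Run launched at ...` header written by each ingestion job also ends up in the set of valid names, which is harmless because no object has that name; likewise, the per-job `file_history_{i}` chunks are not listed, so they are removed after consolidation.

```python
import posixpath


def files_to_delete(all_files: list[str], history_text: str,
                    protected: str = "consolidated_file_history") -> list[str]:
    """Return the URIs whose base name is neither listed in history_text nor the protected file."""
    valid_files = {line.strip() for line in history_text.splitlines() if line.strip()}
    doomed = []
    for uri in all_files:
        # posixpath.basename works for both s3://bucket/dir/name URIs and local POSIX paths
        name = posixpath.basename(uri)
        if name != protected and name not in valid_files:
            doomed.append(uri)
    return doomed
```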


In [None]:
def pmid_ingestion(job_df, object_directory: str, bucket_region: str, job_id: int):
    """
    Main function to handle the ingestion of articles using PubMed API and TileDB VFS.
    
    :param job_df: DataFrame containing PubMed PMIDs and gene-disease data.
    :param object_directory: Path to the S3 bucket for file storage.
    :param bucket_region: Region of the S3 bucket.
    :param job_id: ID of the job to track file history.
    """
    
    import time
    from datetime import datetime
    import os
    import urllib.request
    import xml.etree.ElementTree as ET
    import tarfile
    import requests
    import tiledb
    import glob
    
    now = datetime.now()
    ctx = tiledb.Ctx({"vfs.s3.region": bucket_region})
    vfs = tiledb.VFS(ctx=ctx)
    
    # Initialize the file history log for the job
    new_file_path = f"{object_directory}/file_history_{job_id}"
    with vfs.open(new_file_path, 'wb') as f:
        date_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"Run launched at {date_time_str}"
        f.write((log_entry + '\n').encode('utf-8'))  # VFS files opened in 'wb' mode expect bytes
    
    start_time = time.time()

    # Function to convert PMID to PMCID and handle ingestion
    def convert_pmid_to_pmcid_and_ingest(pmid):
        """
        Convert a PMID to a PMCID and download the full-text article if it is Open Access;
        otherwise fall back to downloading the abstract.
        
        :param pmid: PubMed ID for the article.
        """
        url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
        params = {
            "db": "pubmed",
            "id": pmid,
            "retmode": "json",
            "tool": "your_tool_name",
            "email": "your_email@example.com"
        }

        max_retries = 5
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                response = requests.get(url, params=params, timeout=30)
            except requests.exceptions.RequestException as e:
                print(f"Network error on attempt {attempt + 1}: {e}")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue

            if response.status_code == 429:
                print(f"Rate limit hit for PMID {pmid}, retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue

            if response.status_code != 200:
                print(f"Unexpected status {response.status_code} for PMID {pmid}, retrying...")
                time.sleep(retry_delay)
                retry_delay *= 2
                continue

            result = response.json().get("result", {}).get(str(pmid))
            if result is None:
                print(f"Invalid response for PMID {pmid}.")
                return

            # Try every PMC identifier listed for this article
            for article_id in result.get('articleids', []):
                if article_id.get('idtype') == 'pmc':
                    pmcid = article_id['value']
                    print(f"Attempting to download the article for {pmcid}")
                    if download_pmc_article(pmcid):
                        add_to_new_file(f"{pmcid}.pdf")
                        return

            # No PMCID, or the article is not downloadable: fall back to the abstract
            print(f"No downloadable PMC article, fetching abstract for PMID {pmid}")
            if fetch_abstract(pmid):
                add_to_new_file(f"{pmid}_abstract.txt")
            return

        print(f"Failed to retrieve data for PMID {pmid} after {max_retries} attempts.")

    # Download PMCID article using the OA API
    def download_pmc_article(pmcid):
        """
        Look up the article package by PMCID with the PMC OA web service and upload its PDF using TileDB VFS.
        
        :param pmcid: PubMed Central ID.
        :return: True if the PDF is available in object_directory, False otherwise.
        """
        file_name = os.path.join(object_directory, f"{pmcid}.pdf")
        if vfs.is_file(file_name):
            print(f"File {pmcid}.pdf already exists in {object_directory}")
            return True

        url = "https://www.ncbi.nlm.nih.gov/pmc/utils/oa/oa.fcgi"
        print(f"Fetching OA record for {pmcid} from {url}...")
        
        max_retries = 5
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                response = requests.get(url, params={"id": pmcid}, timeout=30)
            except requests.exceptions.RequestException as e:
                print(f"Network error on attempt {attempt + 1}: {e}")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue

            if response.status_code == 429:
                print(f"Rate limit hit for PMCID {pmcid}, retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue

            if response.status_code != 200:
                print(f"Unexpected status {response.status_code} for PMCID {pmcid}.")
                return False

            root = ET.fromstring(response.content)
            error_element = root.find('.//error')
            if error_element is not None:
                # e.g. 'idIsNotOpenAccess': the full text cannot be downloaded, so the caller falls back to the abstract
                print(f"PMCID {pmcid} is unavailable ({error_element.get('code')}); falling back to the abstract.")
                return False

            # Proceed to download and extract the article package
            print("Attempting to download and extract the PMC article")
            return download_and_extract_article(root, pmcid)

        print(f"Failed to retrieve article for PMCID {pmcid} after {max_retries} attempts.")
        return False

    def download_and_extract_article(root, pmcid):
        """
        Download the article tarball listed in the OA record, extract it locally,
        and upload the first PDF it contains using TileDB VFS.
        
        :param root: XML root element of the OA web service response.
        :param pmcid: PubMed Central ID.
        :return: True if a PDF was uploaded, False otherwise.
        """
        import shutil
        records = root.find('records')
        record = records.find(f'record[@id="{pmcid}"]') if records is not None else None
        link = record.find('link[@format="tgz"]') if record is not None else None
        if link is None:
            print(f"No tgz package found for PMCID {pmcid}.")
            return False

        tar_url = link.get('href')  # May be an ftp:// URL, which urllib supports
        tar_file_name = f"{pmcid}.tar.gz"

        print(f"Downloading {tar_url}...")
        urllib.request.urlretrieve(tar_url, tar_file_name)

        print(f"Extracting {tar_file_name}...")
        with tarfile.open(tar_file_name, 'r:gz') as tar:
            tar.extractall(pmcid)
        os.remove(tar_file_name)

        try:
            # Search the extracted directory without changing the working directory
            pdf_files = glob.glob(os.path.join(pmcid, "**", "*.pdf"), recursive=True)
            if not pdf_files:
                print(f"No PDF found in {pmcid}")
                return False

            pdf_file = pdf_files[0]
            print(f"Found PDF: {pdf_file}")

            with open(pdf_file, 'rb') as local_pdf_file:
                pdf_content = local_pdf_file.read()

            with vfs.open(os.path.join(object_directory, f"{pmcid}.pdf"), 'wb') as vfs_pdf_file:
                vfs_pdf_file.write(pdf_content)

            print(f"PDF {pdf_file} successfully written to TileDB VFS.")
            return True
        finally:
            # Remove the extracted (non-empty) directory whether or not a PDF was found
            shutil.rmtree(pmcid, ignore_errors=True)

    def fetch_abstract(pmid):
        """
        Fetch the abstract from PubMed by PMID.
        
        :param pmid: PubMed ID.
        """
        url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
        params = {'db': 'pubmed', 'id': pmid, 'retmode': 'xml'}
        
        file_name = os.path.join(object_directory, f"{pmid}_abstract.txt")
        if vfs.is_file(file_name):
            print(f"Abstract {pmid}_abstract.txt already exists.")
            return True

        print(f"Fetching abstract for PMID {pmid}...")
        max_retries = 5
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                response = requests.get(url, params=params, timeout=30)
            except requests.exceptions.RequestException as e:
                print(f"Network error on attempt {attempt + 1}: {e}")
                time.sleep(retry_delay)
                retry_delay *= 2
                continue

            if response.status_code == 200:
                root = ET.fromstring(response.content)
                # itertext() also captures text inside nested tags (e.g. <i>, <sup>), where .text may be None
                abstract_text = "\n".join("".join(abstract.itertext()) for abstract in root.findall(".//AbstractText"))

                if not abstract_text.strip():
                    print(f"No abstract available for PMID {pmid}.")
                    return False

                with vfs.open(file_name, 'wb') as file:
                    file.write(abstract_text.encode('utf-8'))

                print(f"Abstract saved to {file_name}.")
                return True

            # Back off on rate limits (429) and other non-200 responses before retrying
            print(f"Status {response.status_code} for PMID {pmid}, retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay *= 2

        print(f"Failed to retrieve abstract for PMID {pmid} after {max_retries} attempts.")
        return False

    def add_to_new_file(file_name):
        """
        Append the name of the processed file to the file history log in S3.
        
        :param file_name: The file name to append to the log.
        """
        print(f"Appending {file_name} to the file history.")
        new_file_path = f"{object_directory}/file_history_{job_id}"
        tmp_data = b""  # Use bytes for consistency

        try:
            if vfs.is_file(new_file_path):
                with vfs.open(new_file_path, 'rb') as f:
                    tmp_data = f.read()

            file_name_as_bytes = (file_name + '\n').encode('utf-8')
            tmp_data += file_name_as_bytes

            with vfs.open(new_file_path, 'wb') as f:
                f.write(tmp_data)
            print(f"Appended {file_name} to {new_file_path}")
        except Exception as e:
            print(f"Error appending to file history: {e}")

    # Process each row in the DataFrame
    # Reassign instead of using inplace=True, because job_df is a slice of the full DataFrame
    job_df = job_df.dropna(subset=['PMID Gene-disease']).reset_index(drop=True)
    total_entries = len(job_df)
    print(f"Total entries: {total_entries}")

    for index, row in job_df.iterrows():
        try:
            pmid = str(int(row["PMID Gene-disease"]))
            convert_pmid_to_pmcid_and_ingest(pmid)
        except ValueError as e:
            print(f"Error processing row {index}: {e}")
            continue

    # End timing
    total_time = time.time() - start_time
    total_time_minutes = total_time / 60
    print(f"Processed {total_entries} entries in {total_time:.2f} seconds or {total_time_minutes:.2f} minutes")
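The three nested download functions repeat the same retry loop. One way to factor it out is sketched below as a hypothetical helper (`with_backoff` is not in the notebook): the request is passed in as a callable, and a return value of `None` means "try again". The `sleep` parameter is injectable so the backoff schedule can be tested without actually waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def with_backoff(attempt_fn: Callable[[], Optional[T]], max_retries: int = 5,
                 initial_delay: float = 5.0,
                 sleep: Callable[[float], None] = time.sleep) -> Optional[T]:
    """Call attempt_fn until it returns a non-None value, doubling the delay after each failure."""
    delay = initial_delay
    for attempt in range(max_retries):
        result = attempt_fn()
        if result is not None:
            return result
        if attempt < max_retries - 1:
            sleep(delay)  # Wait before the next attempt
            delay *= 2    # Exponential backoff: 5, 10, 20, 40 seconds with the defaults
    return None
```

An `attempt_fn` wrapping `requests.get` would return `None` on a 429 or a connection error and the response otherwise, so each download function keeps only its own parsing logic.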


## Pipeline Factory Function
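The `pipeline_step` function below builds our DAG, starting with the ingestion steps. It:

1. **Syncs the repository:** Clones the Git repository into the home directory, or pulls the latest changes if it already exists.
2. **Checks for changes:** Reads the current hash from `hash.txt` (creating it from the input file if needed) and compares it with `previous_hash.txt`. If the file has not changed and `hash_check` is `True`, it stops.
3. **Commits the hash files:** Stages, commits, and pushes the updated hashes so the repository stays current.
4. **Splits the input file:** Loads the tab-separated input into a DataFrame and divides it into at most `num_jobs` chunks of roughly equal size.
5. **Builds a batch DAG:** Creates a TileDB Cloud DAG in batch mode with a `consolidate_chunks` node and a `delete_unwanted_files` node that depends on it, then submits each non-empty chunk as an ingestion job.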
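The job split uses `job_size = math.ceil(total_rows / num_jobs)`, which can produce fewer chunks than `num_jobs`: 9 rows over 4 jobs gives a chunk size of 3 and only 3 chunks, and `consolidate_chunks` then reports the missing `file_history_3`. If exactly `num_jobs` balanced chunks are preferred, a `divmod`-based split is one alternative. Both functions below are hypothetical helpers shown on plain lists for simplicity; the same slice bounds work with `df.iloc`. `split_evenly` assumes `num_jobs` does not exceed the row count, which `pipeline_step` already enforces.

```python
import math


def split_ceil(rows: list, num_jobs: int) -> list[list]:
    """The notebook's approach: a fixed chunk size of ceil(total / num_jobs)."""
    job_size = math.ceil(len(rows) / num_jobs)
    return [rows[i:i + job_size] for i in range(0, len(rows), job_size)]


def split_evenly(rows: list, num_jobs: int) -> list[list]:
    """Alternative: exactly num_jobs chunks whose sizes differ by at most one."""
    base, extra = divmod(len(rows), num_jobs)
    chunks, start = [], 0
    for j in range(num_jobs):
        size = base + (1 if j < extra else 0)  # The first `extra` chunks get one more row
        chunks.append(rows[start:start + size])
        start += size
    return chunks


rows = list(range(9))
print([len(c) for c in split_ceil(rows, 4)])    # [3, 3, 3]
print([len(c) for c in split_evenly(rows, 4)])  # [3, 2, 2, 2]
```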

In [3]:
def pipeline_step(access_credentials: str, repo_url: str, out_directory: str, bucket_path: str, in_file: str = "rag-article-list.txt", num_jobs: int = 4, bucket_region: str = "us-west-2", hash_check: bool = True):
    """
    Pipeline step to pull the repo, check for changes using a hash, and divide the input file into jobs for batch processing.
    
    :param access_credentials: Credentials to access TileDB Cloud for DAG execution.
    :param repo_url: Git repository URL to clone or pull the latest version from.
    :param out_directory: Output directory where the files will be stored.
    :param bucket_path: S3 bucket path for file storage.
    :param in_file: Input file name, defaulting to 'rag-article-list.txt'.
    :param num_jobs: Number of jobs to divide the input file into, default is 4.
    :param bucket_region: S3 bucket region, default is 'us-west-2'.
    :param hash_check: If True, the function checks if the file has changed using a hash before proceeding.
    """
    
    # Step 1: Pull the latest changes from the repository
    home_directory = os.path.expanduser("~")  # Get the user's home directory
    os.chdir(home_directory)  # Change to home directory
    
    repo_name = repo_url.split('/')[-1].replace('.git', '')  # Extract repo name from URL

    # Check if the repo exists locally; if it does, pull the latest changes, otherwise clone it
    if os.path.exists(repo_name):
        os.chdir(repo_name)
        subprocess.run(["git", "pull"])  # Pull latest changes
    else:
        subprocess.run(["git", "clone", repo_url])  # Clone the repo if it doesn't exist
        os.chdir(repo_name)

    # Step 2: Prepare paths and file hashes for comparison
    full_bucket_path = f"{bucket_path}/{out_directory}"  # Full path to the S3 bucket
    previous_hash_path = "previous_hash.txt"  # Path to the previous hash file
    current_hash = ""  # Initialize current hash variable
    current_hash_path = "hash.txt"  # Path to the current hash file

    # Step 2 (continued): Read the current hash, or create it from the input file if it is missing
    if os.path.exists(current_hash_path):
        # If the hash file exists, read the current hash value
        with open(current_hash_path, 'r') as f:
            current_hash = f.read().strip()
            print(f"Current hash: {current_hash}")
    else:
        # If no current hash file exists, create one by hashing the input file
        print(f"{current_hash_path} doesn't exist. Let's create it.")
        if os.path.exists(in_file):
            current_hash = hash_file(in_file)  # Compute the hash of the input file
            with open(current_hash_path, 'w') as fh:
                fh.write(current_hash)  # Write the current hash to the hash file
        else:
            print(f"{in_file} does NOT exist. Please submit a valid file and rerun this function.")
            return  # Exit the function if the input file does not exist

    # Step 3: Check for the previous hash and compare it to the current hash
    if os.path.exists(previous_hash_path):
        with open(previous_hash_path, 'r') as f:
            previous_hash = f.read().strip()  # Read the previous hash
            if current_hash == previous_hash:
                print("File did not change.")  # If the hash matches, the file hasn't changed
                if hash_check:
                    return  # Stop processing; return instead of exit() so the kernel/process keeps running
                else:
                    print("Continuing with processing despite no changes.")
            else:
                # If the hash differs, update the previous hash and continue processing
                print("Input file has changed, proceeding with processing.")
                with open(previous_hash_path, 'w') as fh:
                    fh.write(current_hash)  # Write the new hash to the previous hash file
    else:
        # If no previous hash exists, create one and proceed
        print("No previous hash found. Creating one now.")
        with open(previous_hash_path, 'w') as fh:
            fh.write(current_hash)

    # Step 4: Commit the updated hash and file to the Git repository
    add_and_commit_files("updating the hash and file to keep repo up to date")  # Commit changes to the repo

    # Step 5: Load the DataFrame from the input file
    df = pd.read_csv(in_file, sep='\t', engine='python')  # Load input file into a DataFrame
    print("Available columns in the DataFrame:")
    print(df.columns)  # Print the available columns in the DataFrame for debugging

    # Step 6: Divide the DataFrame into jobs based on the number of user-defined jobs
    total_rows = len(df)  # Get the total number of rows in the DataFrame
    print(f"Total rows in the DataFrame: {total_rows}")

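    # Stop early if the file has no rows (this also avoids dividing by zero below)
    if total_rows == 0:
        print(f"{in_file} contains no rows. Nothing to ingest.")
        return

    # Ensure that num_jobs does not exceed the number of rows
    if num_jobs > total_rows:
        num_jobs = total_rows  # If there are more jobs than rows, reduce num_jobs to match the rows
        print(f"Number of jobs reduced to {num_jobs} to match total rows.")

    # Calculate job size with ceiling division so every row is assigned;
    # the last chunk may be smaller, and there may be fewer chunks than num_jobs
    job_size = math.ceil(total_rows / num_jobs)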

    # Split the DataFrame into chunks (jobs) of approximately equal size
    jobs = [df.iloc[i:i + job_size] for i in range(0, total_rows, job_size)]

    print(f"Divided into {len(jobs)} jobs")  # Print the number of jobs created

    # Step 7: Set up TileDB Cloud DAG for batch processing
    dag = tiledb.cloud.dag.DAG(name="document_batch", mode=tiledb.cloud.dag.Mode.BATCH)  # Initialize a batch-mode DAG
    # Pass the number of chunks actually created, which can be smaller than num_jobs
    consolidate_node = dag.submit(consolidate_chunks, len(jobs), bucket_region, full_bucket_path, access_credentials_name=access_credentials)
    delete_unwanted_node = dag.submit(delete_unwanted_files, bucket_region, full_bucket_path, access_credentials_name=access_credentials)
    delete_unwanted_node.depends_on(consolidate_node)  # Set dependency so delete_unwanted_node waits for consolidate_node

    # Step 8: Submit each job for ingestion
    i = 0  # Initialize a counter for job IDs
    for job_df in jobs:
        if job_df.empty:
            print("Skipping empty job.")  # Skip empty jobs
            continue
        # Estimate CPU and memory resources for each job
        cpu, mem = estimate_resources(job_df)
        print(f"Processing {len(job_df)} rows, estimated CPU: {cpu}, estimated memory: {mem:.2f} GB.")

        # Submit each job to the TileDB Cloud DAG for ingestion
        ingest_node = dag.submit(
            pmid_ingestion,
            job_df,
            full_bucket_path,
            bucket_region,
            i,
            access_credentials_name=access_credentials,
            resources={"cpu": f"{cpu}", "memory": f"{mem}Gi"}  # Set resources for the job
        )
        consolidate_node.depends_on(ingest_node)  # Make consolidate_node dependent on this ingestion job
        i += 1  # Increment the job ID

    dag.compute()  # Start executing the DAG on TileDB Cloud
    dag.wait()  # Block until all DAG tasks complete
    print(f"Finished processing all jobs, consolidating history, and cleaning up undesired S3 files at {full_bucket_path}")
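
The change detection in Steps 2 and 3 comes down to hashing the input file and comparing that digest with the one stored from the previous run. The sketch below isolates that logic, assuming SHA-256 via `hashlib`; the notebook's own `hash_file` helper may use a different algorithm, and `sha256_of_file` and `file_changed` are hypothetical names used only for illustration.

```python
import hashlib
from pathlib import Path


def sha256_of_file(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Fixed-size reads keep memory use flat even for large article lists
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def file_changed(in_file, previous_hash_path):
    """Hypothetical helper: True if in_file differs from the stored hash.

    The new hash is recorded whenever the file is new or has changed,
    so the next call compares against the latest version.
    """
    current = sha256_of_file(in_file)
    prev_path = Path(previous_hash_path)
    previous = prev_path.read_text().strip() if prev_path.exists() else None
    if current != previous:
        prev_path.write_text(current)  # Remember this version for the next run
        return True
    return False
```

On the first call there is no stored hash, so the file counts as changed; a second call on the same bytes returns `False`.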

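Steps 7 and 8 build a fan-in graph: every ingestion node must finish before `consolidate_chunks` runs, and `delete_unwanted_files` waits on consolidation. The sketch below models only that dependency structure with the standard-library `graphlib` module (Python 3.9+), not the TileDB Cloud API, to show the execution order the DAG enforces. The node names and both helpers are hypothetical.

```python
from graphlib import TopologicalSorter


def build_pipeline_graph(num_chunks):
    """Hypothetical helper: map each node to the nodes it depends on."""
    ingest_nodes = [f"ingest_{i}" for i in range(num_chunks)]
    graph = {name: set() for name in ingest_nodes}  # Ingestion jobs have no prerequisites
    graph["consolidate"] = set(ingest_nodes)        # Fan-in: waits for every ingestion job
    graph["delete_unwanted"] = {"consolidate"}      # Cleanup runs after consolidation
    return graph


def execution_waves(graph):
    """Group nodes into waves; nodes in the same wave can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # Nodes whose prerequisites are all done
        waves.append(ready)
        sorter.done(*ready)  # Mark the wave finished to unlock the next one
    return waves


print(execution_waves(build_pipeline_graph(3)))
# [['ingest_0', 'ingest_1', 'ingest_2'], ['consolidate'], ['delete_unwanted']]
```

In the TileDB Cloud DAG the same shape comes from the `depends_on` calls: the ingestion nodes can run concurrently, consolidation starts only after all of them finish, and cleanup runs last.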

In [None]:
# Signature for reference:
# def initialize_step(access_credentials: str, repo_url: str, out_directory: str, bucket_path: str, in_file: str = "rag-article-list.txt", num_jobs: int = 4, bucket_region: str = "us-west-2", hash_check: bool = True)

pipeline_step(
    repo_url="https://github.com/TileDB-Inc/pmc-llm.git",
    out_directory="pmc/rag/ingestion",
    in_file="rag-article-list.txt",
    access_credentials="chase-bucket-access",
    bucket_path="s3://chase-cloud",
    hash_check=False,  # Process the file even if its hash has not changed
    num_jobs=40,
)
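
The call above requests `num_jobs=40`. Because the pipeline sizes chunks with `math.ceil(total_rows / num_jobs)`, the number of chunks actually created can be smaller than `num_jobs`. The stand-alone sketch below reproduces that arithmetic on plain Python lists so the effect can be checked without TileDB Cloud; `split_rows` is a hypothetical name, and the lower clamp to 1 is an added safeguard not present in the pipeline.

```python
import math


def split_rows(rows, num_jobs):
    """Hypothetical helper: split rows into contiguous ceil-sized chunks."""
    total = len(rows)
    if total == 0:
        return []  # Nothing to split
    num_jobs = max(1, min(num_jobs, total))  # Clamp to the range 1..total
    job_size = math.ceil(total / num_jobs)
    return [rows[i:i + job_size] for i in range(0, total, job_size)]


chunks = split_rows(list(range(100)), 40)
print(len(chunks), len(chunks[0]), len(chunks[-1]))  # 34 3 1
```

With 100 rows and 40 requested jobs, each chunk holds 3 rows, so only 34 chunks are produced and the last one holds a single row. Any downstream step that counts chunks should rely on the number actually produced rather than on `num_jobs`.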

# TileDB Ingestion Pipeline

In [20]:
# Optional: Download the consolidated file from S3 and write it to your local Git repository
# Set up TileDB context with S3 credentials (replace with your actual credentials)
ctx = tiledb.Ctx({
    "vfs.s3.region": "us-west-2",  # S3 region
    "vfs.s3.aws_access_key_id": "",  # AWS Access Key (add your key here)
    "vfs.s3.aws_secret_access_key": ""  # AWS Secret Access Key (add your key here)
})

# S3 path to the consolidated file
consolidated_file_path = "s3://chase-cloud/pmc/rag/ingestion/consolidated_file_history"

# Initialize TileDB VFS to interact with S3
vfs = tiledb.VFS(ctx=ctx)

# Step 1: Read the contents of the consolidated file from S3
with vfs.open(consolidated_file_path, 'rb') as f:  # Open the file in binary read mode
    file_contents = f.read()  # Read the entire file contents as bytes

# Step 2: Write the contents to the local file system in the current directory
# You may want to navigate to your Git repository directory before writing
# Uncomment and modify the following line if needed:
# os.chdir("pmc-llm")  # Change to your Git repository directory

# Create the local path for the consolidated file
path = os.path.join(os.getcwd(), "consolidated_file_history")  # Store in the current working directory

# Write the contents from S3 to the local file in binary write mode
with open(path, 'wb') as f:
    f.write(file_contents)  # Write the downloaded file contents to the local file

# Step 3: Commit and push the downloaded file with `add_and_commit_files`, the same Git helper the pipeline uses
# (it stages the changes, commits them with the given message, and pushes to the remote)
add_and_commit_files("updating the consolidated file history for tracking purposes")
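
Pasting keys into the cell above risks leaking them if the notebook itself is committed. A safer pattern, sketched below, builds the same three `vfs.s3.*` settings from the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables. The helper name `s3_config_from_env` is hypothetical; the returned dict can be passed to `tiledb.Ctx(...)` just like the literal dict used above.

```python
import os


def s3_config_from_env(region="us-west-2"):
    """Hypothetical helper: build TileDB VFS S3 settings from AWS environment variables."""
    access_key = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key or not secret_key:
        # Fail loudly instead of silently sending empty credentials
        raise RuntimeError("Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before running this cell.")
    return {
        "vfs.s3.region": region,
        "vfs.s3.aws_access_key_id": access_key,
        "vfs.s3.aws_secret_access_key": secret_key,
    }


# Usage (requires the tiledb package):
# ctx = tiledb.Ctx(s3_config_from_env())
# vfs = tiledb.VFS(ctx=ctx)
```

Keeping secrets in the environment means the notebook can be shared or pushed without editing the cell first.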


Working direcotry is /home/jovyan/pmc-llm
[main ba25068] updating the hash and file to keep repo up to date
 1 file changed, 1668 insertions(+)
 create mode 100644 consolidated_file_history


To https://github.com/TileDB-Inc/pmc-llm.git
   679f72f..ba25068  main -> main
