<a href="https://colab.research.google.com/github/GaryM02/fyp_repo/blob/main/fyp_gather_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (this cell only works inside Colab).
from google.colab import drive
import os

# force_remount=True asks Colab to mount again even if a mount is already active.
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
# Switch the working directory to the project folder on the mounted drive;
# the relative "Data/..." paths used in later cells resolve against it.
os.chdir("/content/drive/MyDrive/Colab Notebooks/PredictiveAnalytics")

# Download compressed XML from https://ftp.ncbi.nlm.nih.gov

In [None]:
_URLs = [
    f"https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/pubmed24n{i:04d}.xml.gz"
    for i in range(1, 1220)
]

# Extract data from XML files

In [None]:
import os
import requests
import concurrent.futures


def download_file(url, download_dir, chunk_size=1024 * 1024):
    """Download a single file into ``download_dir`` without progress tracking.

    The target name is the last path segment of ``url``. A file that already
    exists under that name is treated as complete and is not fetched again.

    Parameters:
    url: address of the file to fetch
    download_dir: existing directory the file is saved into
    chunk_size: number of bytes requested per streamed chunk

    Returns: path of the local file, or None when the request failed
    """
    filename = os.path.join(download_dir, url.split("/")[-1])

    # Skip download if file already exists
    if os.path.exists(filename):
        print(f"{filename} already downloaded, skipping...")
        return filename

    # Stream into a temporary sibling and rename only on success. Otherwise an
    # interrupted transfer would leave a truncated file under the final name
    # and the existence check above would skip it on every later run.
    partial_path = filename + ".part"
    try:
        with requests.get(url, stream=True, timeout=60) as response:
            # Without this, a 404/500 error page would be saved as the archive.
            response.raise_for_status()

            with open(partial_path, "wb") as file:
                for data in response.iter_content(chunk_size):
                    file.write(data)

        os.replace(partial_path, filename)
        return filename

    except requests.RequestException as e:
        print(f"Failed to download {url}: {e}")
        return None

    finally:
        # After a successful rename the partial file no longer exists, so this
        # only removes leftovers from a failed or interrupted transfer.
        if os.path.exists(partial_path):
            os.remove(partial_path)


def download_pubmed_files(urls, download_dir="default", max_workers=8):
    """Download PubMed files concurrently, keeping them compressed on disk.

    Parameters:
    urls: iterable of file URLs to fetch
    download_dir: directory the files are saved into (created if missing)
    max_workers: number of download threads

    Returns: None. Failures are printed individually and summarised at the end.
    """
    os.makedirs(download_dir, exist_ok=True)  # Ensure directory exists

    failed_urls = []

    # Use ThreadPoolExecutor for parallel downloads
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit download tasks
        futures = {
            executor.submit(download_file, url, download_dir): url for url in urls
        }

        # Track and check completed downloads
        for future in concurrent.futures.as_completed(futures):
            url = futures[future]
            try:
                saved_path = future.result()
            except Exception as e:
                print(f"Error downloading {url}: {e}")
                failed_urls.append(url)
            else:
                # download_file reports a handled request failure as None.
                if saved_path is None:
                    failed_urls.append(url)

    if failed_urls:
        # Files that already exist are skipped, so re-running retries only these.
        print(f"{len(failed_urls)} of {len(futures)} downloads failed; re-run to retry them.")

In [None]:
# Fetch every archive listed in _URLs into Data/pubmed/compressed
# (relative to the current working directory).
download_pubmed_files(_URLs, "Data/pubmed/compressed")

In [None]:
import xml.etree.ElementTree as ET
import gzip
import json
from tqdm import tqdm
import concurrent.futures
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import torch
from transformers import AutoModel, AutoTokenizer
import gc

class DataProcessor:
    """Turn gzipped PubMed XML archives into a Parquet dataset with embeddings.

    The extraction methods collect the title and abstract of every
    ``PubmedArticle`` that has an abstract and store them in a Parquet file.
    ``get_embeddings`` then streams that file through a transformer model and
    writes a copy with an ``embedding`` column filled in.
    """

    def __init__(self):
        pass

    def append_to_parquet(self, new_data, parquet_file):
        """Append the rows of ``new_data`` to ``parquet_file``, creating it if missing.

        Parameters:
        new_data: pandas DataFrame holding the rows to add
        parquet_file: path of the Parquet file to create or extend

        Raises: whatever pyarrow raises when the existing file and the new rows
        do not share a schema; the existing file is left untouched in that case.
        """
        # Local import: the cell defining this class does not import os itself.
        import os

        # Convert new data to Arrow Table
        table = pa.Table.from_pandas(new_data)

        try:
            existing_data = pq.read_table(parquet_file)
        except FileNotFoundError:
            # If file doesn't exist, create it and write new data
            pq.write_table(table, parquet_file)
            return

        # Parquet files cannot be extended in place, so old and new rows are
        # rewritten together. Writing to a temporary sibling first means a
        # failure (e.g. a schema mismatch) cannot destroy the existing data.
        temp_file = f"{parquet_file}.tmp"
        try:
            with pq.ParquetWriter(temp_file, table.schema) as writer:
                writer.write_table(existing_data)
                writer.write_table(table)
            os.replace(temp_file, parquet_file)
        finally:
            if os.path.exists(temp_file):
                os.remove(temp_file)

    def process_files_and_save(self, file_paths, output_filename):
        """Extract articles from ``file_paths`` and append them to ``output_filename``.

        Parameters:
        file_paths: sequence of paths to gzipped PubMed XML files
        output_filename: Parquet file that receives the extracted rows
        """
        articles = self.process_files(file_paths)
        flattened_articles = [article for sublist in articles for article in sublist]

        # An empty DataFrame has no columns and could not be appended to an
        # existing three-column file, so stop here instead of failing later.
        if not flattened_articles:
            print(f"No articles extracted; nothing written to {output_filename}")
            return

        df = pd.DataFrame(flattened_articles)
        self.append_to_parquet(df, output_filename)
        print(f"Data saved to {output_filename}")

    def process_file(self, file_path):
        """Return the list of article dicts extracted from one archive."""
        articles = self.extract_article_title_and_abstract(file_path)
        return articles

    def process_files(self, file_paths):
        """Extract every archive in ``file_paths`` on a thread pool.

        Returns: one list of article dicts per input path, in input order.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(tqdm(executor.map(self.process_file, file_paths), total=len(file_paths)))
        return results

    @staticmethod
    def _element_text(element):
        """Return all text inside ``element`` including nested inline markup.

        ``element.text`` alone stops at the first child tag (``<i>``, ``<sub>``
        and similar) and is None when the content starts with one.
        """
        if element is None:
            return ""
        return "".join(element.itertext()).strip()

    def extract_article_title_and_abstract(self, filename):
        """Collect title and abstract of every article that has an abstract.

        Parameters:
        filename: path to a gzipped PubMed XML file

        Returns: list of dicts with keys ``title``, ``abstract`` and
        ``embedding`` (always None here); an empty list when the archive
        cannot be decompressed or parsed.
        """
        # Parse the XML content
        with gzip.open(filename) as f:
            try:
                root = ET.parse(f).getroot()
            except (ET.ParseError, EOFError, OSError):
                # EOFError / OSError are what gzip raises for truncated or
                # corrupt archives while the parser is reading from them.
                print(f"Error parsing {filename}. Skipping...")
                return []  # Return an empty list on parse error to avoid issues

        articles = []
        # Iterate over each PubmedArticle in the PubmedArticleSet
        for article in root.findall("PubmedArticle"):
            article_title = self._element_text(article.find(".//ArticleTitle"))

            # A structured abstract is split into several AbstractText
            # sections; use all of them rather than only the first.
            sections = article.findall(".//Abstract/AbstractText")
            if not sections:
                # Fall back to the first AbstractText found anywhere, which is
                # the element the previous implementation looked at.
                fallback = article.find(".//AbstractText")
                sections = [] if fallback is None else [fallback]

            section_texts = [self._element_text(section) for section in sections]
            abstract_text = " ".join(text for text in section_texts if text)

            # Include only if abstract is available
            if not abstract_text:
                continue

            articles.append({
                "title": article_title,
                "abstract": abstract_text,
                "embedding": None  # Placeholder for embeddings
            })

        return articles

    @staticmethod
    def _mean_pool(last_hidden_state, attention_mask):
        """Average token vectors per sequence, ignoring padding positions.

        Parameters:
        last_hidden_state: tensor indexed as (sequence, token, feature)
        attention_mask: tensor indexed as (sequence, token); 1 for real
        tokens and 0 for padding

        Returns: tensor with one pooled vector per sequence
        """
        mask = attention_mask.unsqueeze(-1).to(last_hidden_state.dtype)
        summed = (last_hidden_state * mask).sum(dim=1)
        # clamp keeps the division defined for an all-zero mask row.
        token_counts = mask.sum(dim=1).clamp(min=1e-9)
        return summed / token_counts

    def get_embeddings(self, file_path, model_name, output_file, batch_size=5):
        """Embed every abstract in a Parquet file and write the result to a new file.

        Parameters:
        file_path: path to the input Parquet file (needs ``title`` and
        ``abstract`` columns)
        model_name: name of the transformers model used for embedding
        output_file: path of the Parquet file to write
        batch_size: number of rows to process at a time

        Returns: None. Rows are streamed to ``output_file`` with the columns
        ``title``, ``abstract`` and ``embedding``.
        """
        # Initialize tokenizer and model only once
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.to(device)
        model.eval()

        dataset = pq.ParquetFile(file_path)
        num_rows = dataset.metadata.num_rows

        # The writer is created lazily because its schema comes from the first batch.
        parquet_writer = None
        processed_rows = 0

        try:
            # iter_batches yields batch_size rows at a time regardless of how
            # the file is split into row groups; row-group indices are not
            # batch indices and must not be derived from the row offset.
            for batch in dataset.iter_batches(batch_size=batch_size, columns=["title", "abstract"]):
                batch_df = batch.to_pandas()
                abstracts = batch_df["abstract"].fillna("").tolist()
                titles = batch_df["title"].tolist()

                if not abstracts:
                    continue

                # Tokenize and get embeddings
                inputs = tokenizer(abstracts, padding=True, truncation=True, return_tensors="pt").to(device)

                with torch.no_grad():
                    outputs = model(**inputs)
                    # Padding must be masked out, otherwise a text's embedding
                    # depends on the longest text that shares its batch.
                    pooled = self._mean_pool(outputs.last_hidden_state, inputs["attention_mask"])
                    embeddings = pooled.cpu().numpy()

                # Stream embeddings directly to Parquet
                batch_output_df = pd.DataFrame({
                    "title": titles,
                    "abstract": abstracts,
                    "embedding": list(embeddings)
                })
                table = pa.Table.from_pandas(batch_output_df)

                # Write directly to Parquet to avoid holding all embeddings in memory
                if parquet_writer is None:  # Create writer with schema on the first batch
                    parquet_writer = pq.ParquetWriter(output_file, table.schema)

                parquet_writer.write_table(table)

                # Print progress
                processed_rows += len(abstracts)
                print(f"Processed {processed_rows} / {num_rows} rows")

                # Explicitly clear memory
                del batch_df, abstracts, titles, inputs, outputs, pooled, embeddings, batch_output_df
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                gc.collect()
        finally:
            # Close the writer even when a batch fails so the file is finalised.
            if parquet_writer is not None:
                parquet_writer.close()

        print(f"Embeddings saved to {output_file}")









In [None]:
# Work on a small sample: the archives numbered 0001 through 0004.
_SAMPLE_FILE_NUMBERS = range(1, 5)
file_paths = [
    f"Data/pubmed/compressed/pubmed24n{number:04d}.xml.gz"
    for number in _SAMPLE_FILE_NUMBERS
]

## Using our DataProcessor class, we can extract our data
The extracted data will be saved in Parquet format to reduce the space it takes up on disk.

In [None]:
# create parquet file in Data/pubmed/parquet
import os
import pandas as pd
from pathlib import Path

def create_directory(dir_path, filename=None):
    """
    Create a new directory (and any missing parents) if it doesn't already exist.

    Parameters:
    dir_path: path of the directory to create
    filename: unused; accepted so existing calls that pass it keep working

    Returns: the directory as a Path on success, None when it could not be created
    """
    target = Path(dir_path)
    # Checked before mkdir so the message can tell "created" from "already there".
    already_present = target.is_dir()

    try:
        # Create directory if it doesn't exist
        target.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        print(f"Error creating directory: {e}")
        return None

    if already_present:
        print(f"Directory already exists at: {dir_path}")
    else:
        print(f"Directory created at: {dir_path}")
    return target

In [None]:
# Make sure the folder that will hold the Parquet output exists.
create_directory("Data/pubmed/parquet")

Directory created at: Data/pubmed/parquet


In [None]:
# Extract title/abstract pairs from the sample archives in file_paths and
# store them together in one Parquet file.
processor = DataProcessor()
processor.process_files_and_save(file_paths, "Data/pubmed/parquet/pubmed.parquet")

100%|██████████| 4/4 [01:06<00:00, 16.65s/it]


Data saved to Data/pubmed/parquet/pubmed.parquet


## Let's examine the Parquet file we created

In [None]:
# Load the extracted articles and print the column summary
# (non-null counts and dtypes) to confirm what was written.
df = pd.read_parquet("Data/pubmed/parquet/pubmed.parquet")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71900 entries, 0 to 71899
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   title      71900 non-null  object
 1   abstract   71900 non-null  object
 2   embedding  0 non-null      object
dtypes: object(3)
memory usage: 1.6+ MB


## Now let's update our file with embeddings using a BERT-style model

In [None]:
# Define paths and parameters
import pyarrow.parquet as pq

# NOTE(review): batch_dataframe is not referenced by any later cell visible
# here — confirm whether it is still needed.
batch_dataframe = []

# Open the extracted dataset through pyarrow's ParquetFile reader.
parquet_file = pq.ParquetFile('Data/pubmed/parquet/pubmed.parquet')

In [None]:
from transformers import AutoModel, AutoTokenizer
import torch
import gc
# Load the tokenizer and model for sentence-transformers/all-MiniLM-L6-v2.
# NOTE(review): process_batch / append_to_parquet below load their own copies
# by model name, so these globals look unused in the visible cells.
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")


In [None]:
import os
import gc
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModel
import concurrent.futures

# Cache of (tokenizer, model, device) per model name. Each worker process has
# its own copy of this dict, so every process loads a given model only once
# instead of once per batch.
_EMBEDDING_MODEL_CACHE = {}


def _get_embedding_model(model_name):
    """Return ``(tokenizer, model, device)`` for ``model_name``, loading on first use."""
    cached = _EMBEDDING_MODEL_CACHE.get(model_name)
    if cached is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        loaded_tokenizer = AutoTokenizer.from_pretrained(model_name)
        loaded_model = AutoModel.from_pretrained(model_name).to(device)
        loaded_model.eval()
        cached = (loaded_tokenizer, loaded_model, device)
        _EMBEDDING_MODEL_CACHE[model_name] = cached
    return cached


def process_batch(batch, tokenizer, model_name):
    """Process a single batch: tokenize, generate embeddings, and prepare data for writing.

    Parameters:
    batch: object exposing ``to_pandas()`` that yields ``title`` and
    ``abstract`` columns (the caller in this file passes pyarrow record batches)
    tokenizer: tokenizer to use, or None to use the one belonging to ``model_name``
    model_name: name of the transformers model that produces the embeddings

    Returns: Arrow table with the columns ``title``, ``abstract`` and
    ``embedding``, or None when the batch contains no rows.
    """
    batch_df = batch.to_pandas()
    abstracts = batch_df["abstract"].fillna("").tolist()

    # Nothing to embed; checked before touching the model so an empty batch is cheap.
    if not abstracts:
        return None

    # Model and tokenizer are loaded inside the worker process (not shared
    # with the parent) and reused for every later batch this process handles.
    cached_tokenizer, model, device = _get_embedding_model(model_name)
    if tokenizer is None:
        tokenizer = cached_tokenizer

    # Tokenize and compute embeddings
    inputs = tokenizer(abstracts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model(**inputs)
        # Mean-pool over real tokens only. Averaging the padded positions too
        # would make an abstract's embedding depend on the longest abstract
        # that happens to share its batch.
        mask = inputs["attention_mask"].unsqueeze(-1).to(outputs.last_hidden_state.dtype)
        summed = (outputs.last_hidden_state * mask).sum(dim=1)
        pooled = summed / mask.sum(dim=1).clamp(min=1e-9)
        embeddings = pooled.cpu().numpy()

    # Append embeddings to DataFrame
    batch_df["embedding"] = list(embeddings)
    new_data = batch_df[["title", "abstract", "embedding"]]

    # Convert to Arrow Table for writing
    table = pa.Table.from_pandas(new_data)

    # Release cached GPU blocks held by this batch's activations.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return table

def append_to_parquet(input_file, output_file, model_name, batch_size=100):
    """Write a copy of ``input_file`` to ``output_file`` with embeddings filled in.

    Despite the name, ``output_file`` is created fresh (an existing file is
    overwritten); rows are appended to it batch by batch as workers finish.

    Parameters:
    input_file: Parquet file with ``title`` and ``abstract`` columns
    output_file: Parquet file to write
    model_name: name of the transformers model used by ``process_batch``
    batch_size: number of rows handed to a worker at a time

    Returns: None
    """
    # Build the output schema: the input schema with any existing 'embedding'
    # column replaced by a list-of-float32 column at the end.
    schema_ = pq.read_schema(input_file)
    embedding_index = schema_.get_field_index("embedding")
    if embedding_index != -1:  # -1 means the input has no such column
        schema_ = schema_.remove(embedding_index)
    schema_ = schema_.append(pa.field("embedding", pa.list_(pa.float32())))

    # Load input file in batches
    dataset = pq.ParquetFile(input_file)
    total_rows = dataset.metadata.num_rows
    processed_rows = 0

    # Initialize ParquetWriter with the schema
    with pq.ParquetWriter(output_file, schema=schema_, use_dictionary=True) as writer:
        # Use ProcessPoolExecutor for parallel processing
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # The tokenizer slot is passed as None: workers obtain their own
            # tokenizer from model_name, so loading one here for every batch
            # and pickling it across processes would be wasted work.
            futures = [
                executor.submit(process_batch, batch, None, model_name)
                for batch in dataset.iter_batches(batch_size=batch_size)
            ]

            # Consume results in submission order so the output keeps the
            # row order of the input file.
            for future in futures:
                table = future.result()
                if table is not None:
                    writer.write_table(table)
                    processed_rows += table.num_rows

                # Optional: Track progress
                print(f"Processed {processed_rows} / {total_rows} rows")

    print(f"Data appended successfully to {output_file}")


In [None]:
# Path of the Parquet file that will hold the rows together with their embeddings.
output_file = "Data/pubmed/parquet/pubmed_with_embeddings.parquet"

In [None]:
# Embed every abstract in pubmed.parquet with all-MiniLM-L6-v2 and write the
# result to pubmed_with_embeddings.parquet.
append_to_parquet("Data/pubmed/parquet/pubmed.parquet", "Data/pubmed/parquet/pubmed_with_embeddings.parquet", "sentence-transformers/all-MiniLM-L6-v2")

In [None]:
# Reload the enriched file and check that the embedding column is populated:
# print the column summary, then the first rows.
df = pd.read_parquet("Data/pubmed/parquet/pubmed_with_embeddings.parquet")
df.info()
print(df.head())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71900 entries, 0 to 71899
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   title      71900 non-null  object
 1   abstract   71900 non-null  object
 2   embedding  71900 non-null  object
dtypes: object(3)
memory usage: 1.6+ MB
                                               title  \
0  Effects of axotomy on the trans-synaptic regul...   
1  The effects of putative neurotransmitters on t...   
2  A quantitative comparison of the formation of ...   
3  Properties of soluble and particulate cysteine...   
4  [Tissue lactates, pH and blood gasses in hiber...   

                                            abstract  \
0  The effects of surgical transection of the pos...   
1  Cultures established from mechanically dissoci...   
2  The rat superior cervical sympathetic ganglion...   
3  Some properties of cysteine sulfinate decarbox...   
4  Periodic arousal in the garden dormou