In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Path to the Phecode definitions CSV ('phecode_definitions1.2.csv') inside the
# mounted Google Drive. Adjust this if the file lives elsewhere in your Drive.
file_path = '/content/drive/My Drive/STA496/Codes and Data/W17+18 files/phecode_definitions1.2.csv'

# Work from the Drive root: files written later with relative paths
# (text_encode.py, the "./" embedding output) are created in this directory.
%cd /content/drive/My\ Drive

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive


In [None]:
%%writefile text_encode.py

import torch
import numpy as np
from transformers import AutoModel
from transformers import AutoTokenizer
from tqdm import tqdm

# Maps the short model keys accepted by TextEncoder(model_name=...) to the
# pretrained model identifiers handed to AutoTokenizer/AutoModel.from_pretrained.
config_dict = {
    "coder": "GanjinZero/coder_eng",
    "coder_all": "GanjinZero/coder_all",
    "coder_old": "GanjinZero/UMLSBert_ENG",
    "coder_pp": "GanjinZero/coder_eng_pp",
    "sapbert": "cambridgeltl/SapBERT-from-PubMedBERT-fulltext",
    "pubmedbert": "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext",
    "biobert": "monologg/biobert_v1.1_pubmed",
    "bert": "bert-base-uncased"
}

class TextEncoder(object):
    """Encode text phrases into fixed-size vectors with a pretrained transformer.

    The tokenizer and model are loaded with ``AutoTokenizer`` / ``AutoModel``
    from the identifier that ``config_dict`` associates with ``model_name``.

    Attributes:
        pred_batch_size: number of phrases sent through the model per batch.
        max_length: every phrase is truncated/padded to this many token ids.
    """

    def __init__(
            self,
            model_name,
            device,
            run_check_max_length = False
        ):
        """Load the tokenizer and model and move the model to ``device``.

        Args:
            model_name: a key of ``config_dict`` (e.g. ``"coder"``).
            device: torch device (or device string) the model runs on.
            run_check_max_length: when True, ``get_embed`` first reports how
                many phrases fit within ``max_length`` tokens.

        Raises:
            KeyError: if ``model_name`` is not a key of ``config_dict``.
        """
        if model_name not in config_dict:
            raise KeyError(
                f"Unknown model_name {model_name!r}; "
                f"expected one of {sorted(config_dict)}")
        self.device = device
        self.model_name = model_name
        self.config = config_dict[self.model_name]
        self.tokenizer = AutoTokenizer.from_pretrained(self.config)
        self.model = AutoModel.from_pretrained(self.config).to(self.device)
        self.run_check_max_length = run_check_max_length

        self.pred_batch_size = 64
        self.max_length = 32 #128

    def check_max_length(self, inputs):
        """Print the share of ``inputs`` whose token count fits ``max_length``.

        Each phrase is tokenized without padding or truncation (special tokens
        included) and its number of token ids is compared to ``max_length``.

        Args:
            inputs: sized collection of phrases (strings).
        """
        all_cnt = len(inputs)
        cnt = 0
        for each in inputs:
            encoded = self.tokenizer.encode_plus(each, add_special_tokens=True, padding='do_not_pad')
            # encode_plus returns a mapping; len() of the mapping itself counts
            # its keys, so the token count must be taken from 'input_ids'.
            if len(encoded['input_ids']) > self.max_length:
                cnt += 1
        print(f"Current max length is {self.max_length}.")
        if all_cnt == 0:
            # Nothing to report a percentage for; avoid dividing by zero.
            print("No samples to check.")
            return
        print(f"{(1 - cnt / all_cnt)*100}% samples can fit in length {self.max_length}.")


    def get_embed(
            self,
            phrase_list,
            normalize=True,
            summary_method="CLS"
        ):
        """Embed every phrase and return the embeddings as one NumPy array.

        Args:
            phrase_list: iterable of phrases (strings); row ``i`` of the result
                is the embedding of the ``i``-th phrase.
            normalize: when True, each row is divided by its L2 norm (the norm
                is clamped to at least 1e-12 to avoid division by zero).
            summary_method: ``"CLS"`` uses element ``[1]`` of the model output
                (for Hugging Face BERT-style models this is the pooled output —
                confirm for other architectures); ``"MEAN"`` averages element
                ``[0]`` over the sequence dimension.

        Returns:
            ``numpy.ndarray`` with one row per phrase. For an empty
            ``phrase_list`` an array with zero rows is returned.

        Raises:
            ValueError: if ``summary_method`` is neither "CLS" nor "MEAN".
        """
        # Fail early with a clear message instead of hitting an unbound
        # ``embed`` variable deep inside the batch loop.
        if summary_method not in ("CLS", "MEAN"):
            raise ValueError(
                f"summary_method must be 'CLS' or 'MEAN', got {summary_method!r}")

        # Materialize once so generators / pandas Series are handled uniformly.
        phrase_list = list(phrase_list)
        if self.run_check_max_length:
            self.check_max_length(phrase_list)

        if not phrase_list:
            # Width taken from the model config when available — presumably
            # ``hidden_size`` for BERT-style models; falls back to 0 columns.
            hidden_size = getattr(
                getattr(self.model, "config", None), "hidden_size", 0)
            return np.zeros((0, hidden_size), dtype=np.float32)

        input_ids = []
        pbar = tqdm(total=len(phrase_list))
        pbar.set_description("Tokenizing phrases:")
        for phrase in phrase_list:
            input_ids.append(self.tokenizer.encode_plus(
                phrase, max_length=self.max_length, add_special_tokens=True,
                truncation=True, padding='max_length')['input_ids'])
            pbar.update(1)
        pbar.close()
        self.model.eval()

        count = len(input_ids)
        batch_outputs = []
        pbar = tqdm(total=count)
        pbar.set_description("Encoding:")
        with torch.no_grad():
            for start in range(0, count, self.pred_batch_size):
                batch_ids = input_ids[start:start + self.pred_batch_size]
                input_gpu_0 = torch.LongTensor(batch_ids).to(self.device)
                # NOTE(review): ids are padded to max_length but no
                # attention_mask is passed, so pad positions take part in
                # attention and in the "MEAN" average. Left unchanged to keep
                # embeddings identical to earlier runs — confirm this is intended.
                model_out = self.model(input_gpu_0)
                if summary_method == "CLS":
                    embed = model_out[1]
                else:  # "MEAN" (validated above)
                    embed = torch.mean(model_out[0], dim=1)
                if normalize:
                    embed_norm = torch.norm(
                        embed, p=2, dim=1, keepdim=True).clamp(min=1e-12)
                    embed = embed / embed_norm
                batch_outputs.append(embed.cpu().detach().numpy())
                # Advance by the real batch size so the bar ends exactly at 100%.
                pbar.update(len(batch_ids))
        pbar.close()
        # Single concatenation instead of re-copying the growing array per batch.
        return np.concatenate(batch_outputs, axis=0)


if __name__ == "__main__":
    # Smoke test: embed one phrase and print the resulting array shape.
    phrases = ["abs"]
    # Fall back to CPU so the check also runs on machines without CUDA.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    encoder = TextEncoder("pubmedbert", device)
    print(encoder.get_embed(phrases).shape)

Writing text_encode.py


In [None]:
import pandas as pd
import os
import torch
from text_encode import TextEncoder, config_dict
from collections import defaultdict

In [None]:
# Step 1: Load the Phecode definitions CSV into a Pandas DataFrame
df = pd.read_csv(file_path)
# Kept from the original cell so `df` has the same columns for any later cells:
# reset_index() adds the previous row index as an 'index' column.
df = df.reset_index()

# Step 2: Map each value of the 'name' column to its 'description'.
# Built in one pass instead of a row-by-row iterrows() loop; as before, a
# repeated name keeps its first position and its last description.
Phecode_dict = dict(zip(df['name'].tolist(), df['description'].tolist()))

# Step 3: Separate the dictionary keys (codes) and values (descriptions) into
# two parallel lists. Anything that is not a non-empty string (e.g. NaN from a
# blank CSV cell) becomes "" so the two lists always have the same length.
code_name = list(Phecode_dict.keys())
code_desc = [
    desc if isinstance(desc, str) and desc else ""
    for desc in Phecode_dict.values()
]

# Configuration parameters for generating embeddings
configs = (
    "coder",
    "coder_pp",
    "sapbert",
    "pubmedbert",
    "biobert",
    "bert",
    "coder_all"
)

config = "coder"  # Select the embedding model configuration
method = "CLS"  # Specify the embedding method
output_data_dir = "./"  # Directory to save the output embeddings

# Catch a typo in `config` before any model download starts.
if config not in config_dict:
    raise ValueError(f"Unknown config {config!r}; expected one of {sorted(config_dict)}")

print(f"Generating embeddings for {config} with {method} embedding...")

# Step 4: Set up embedding parameters
embed_params = {
    "normalize": True,  # Normalize embeddings
    "summary_method": method  # Summary method for embedding generation
}
output_file_name = f"{config}_{embed_params['summary_method']}"
output_file_path = os.path.join(output_data_dir, output_file_name)

# Step 5: Initialize the TextEncoder and generate embeddings
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Use GPU if available
encoder = TextEncoder(config, device)
# code_desc already contains only strings (see Step 3). It is passed unfiltered
# on purpose: dropping entries here would shift embeddings relative to code_name.
embeddings = encoder.get_embed(code_desc, **embed_params)

# Row i of `embeddings` must belong to code_name[i]; stop rather than write a
# misaligned file.
if len(embeddings) != len(code_name):
    raise RuntimeError(
        f"Got {len(embeddings)} embeddings for {len(code_name)} codes; "
        "refusing to write a misaligned file.")

# Step 6: Save the embeddings to a file, one "<name>,<v1>,<v2>,..." line per code
with open(output_file_path, "w") as f:
    for name, embedding in zip(code_name, embeddings):
        line = [str(name)] + [str(each) for each in embedding]  # Combine code ID and embedding values
        f.write(",".join(line) + "\n")  # Format as a comma-separated line

Generating embeddings for coder with CLS embedding...





  0%|          | 0/36537 [00:00<?, ?it/s][A[A[A


Tokenizing phrases::   0%|          | 0/36537 [00:00<?, ?it/s][A[A[A


Tokenizing phrases::   2%|▏         | 557/36537 [00:00<00:06, 5566.54it/s][A[A[A


Tokenizing phrases::   3%|▎         | 1114/36537 [00:00<00:06, 5349.00it/s][A[A[A


Tokenizing phrases::   5%|▍         | 1650/36537 [00:00<00:06, 5012.19it/s][A[A[A


Tokenizing phrases::   6%|▌         | 2189/36537 [00:00<00:06, 5092.84it/s][A[A[A


Tokenizing phrases::   7%|▋         | 2700/36537 [00:00<00:06, 5015.35it/s][A[A[A


Tokenizing phrases::   9%|▉         | 3320/36537 [00:00<00:06, 5399.79it/s][A[A[A


Tokenizing phrases::  11%|█         | 3958/36537 [00:00<00:05, 5710.40it/s][A[A[A


Tokenizing phrases::  12%|█▏        | 4536/36537 [00:00<00:05, 5730.80it/s][A[A[A


Tokenizing phrases::  14%|█▍        | 5159/36537 [00:00<00:05, 5882.16it/s][A[A[A


Tokenizing phrases::  16%|█▌        | 5749/36537 [00:01<00:05, 5879.17it/s][A[A[A


T

In [None]:
embeddings.shape # (number of embedded descriptions, embedding width) -> 768-dimensional embeddings

(36537, 768)

In [None]:
import numpy as np

# `embeddings` is the NumPy array produced by the embedding cell above.
# NOTE(review): this reassigns `file_path`, which earlier held the path of the
# input Phecode CSV; re-running the cell that calls pd.read_csv(file_path)
# after this one would read the embeddings file instead — consider a
# separate variable name.
file_path = '/content/PLMembeddings.csv'

# Save the embeddings array to CSV (numeric values only: the code names are
# not written, so rows are identified purely by their order).
np.savetxt(file_path, embeddings, delimiter=',')

from google.colab import files
files.download(file_path)  # Trigger the download


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>