In [None]:
!pip install transformers torch pinecone-client pandas scikit-learn

In [None]:
# Required imports
import re
from collections import Counter

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import pinecone
import torch
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModel

In [None]:
# Make sure the NLTK stopword corpus is available.
# Look for a local copy first so re-running the cell (or running offline with the
# corpus already installed) does not trigger another download attempt.
try:
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords')

# English stopwords as a set for O(1) membership checks.
STOPWORDS = set(stopwords.words('english'))

# Preprocessing patterns
# Raw strings are used so that the backslashes reach the regex engine unchanged;
# in a normal string literal `\[`, `\]` and `\|` are invalid escape sequences.

# Punctuation that separates words: each occurrence is replaced by a space.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;]')
# Anything that is not a digit, lowercase letter, space, '#', '+' or '_' is deleted.
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_]')

In [None]:
def clean_text(text):
    """Normalise a text cell: lowercase it and strip unwanted symbols.

    Characters matched by REPLACE_BY_SPACE_RE are turned into spaces, then
    characters matched by BAD_SYMBOLS_RE are deleted. Stopwords are NOT
    removed here.

    Args:
        text: The value to clean. Only `str` values are processed.

    Returns:
        The cleaned string, or `text` itself, unchanged, when it is not a string.
    """
    # Guard clause: non-string values (e.g. NaN floats) pass straight through.
    if not isinstance(text, str):
        return text

    lowered = text.lower()
    spaced = REPLACE_BY_SPACE_RE.sub(' ', lowered)
    return BAD_SYMBOLS_RE.sub('', spaced)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def plot_class(df):
    """Draw a bar chart of how many rows fall into each value of the 'Class' column.

    Bars are ordered by descending frequency (the order of `value_counts()`).

    Args:
        df: DataFrame containing a 'Class' column.
    """
    class_order = df['Class'].value_counts().index

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.countplot(data=df, x='Class', order=class_order, ax=ax)

    ax.set_title('Count of Each Class')
    ax.set_xlabel('Class')
    ax.set_ylabel('Count')
    # Tilt the tick labels so class names do not overlap.
    plt.setp(ax.get_xticklabels(), rotation=45)

    fig.tight_layout()
    plt.show()

In [None]:
def load_data(data_file, classes=('Class A', 'Class B', 'Class C', 'Class D'), plot=True):
    """Load the device CSV, drop incomplete rows, keep the wanted classes and clean the text.

    Args:
        data_file: Path (or anything `pd.read_csv` accepts) of the CSV file. It must
            contain the columns 'Name of Device', 'Intended Use' and 'Class'.
        classes: Values of the 'Class' column to keep; rows with any other class
            are discarded. Defaults to the four classes A-D.
        plot: When True (default), show the class distribution of the filtered
            data via `plot_class`.

    Returns:
        A 4-tuple `(NameOfDevice, IntendedUse, labels, index)`:
        - NameOfDevice: list of device names (uncleaned).
        - IntendedUse: list of 'Intended Use' texts after `clean_text`.
        - labels: array of integer codes produced by `LabelEncoder.fit_transform`
          on the 'Class' column.
        - index: list of the surviving rows' index values from the original CSV frame.
    """
    required_columns = ['Name of Device', 'Intended Use', 'Class']

    raw = pd.read_csv(data_file)

    # Rows missing any of the required fields cannot be embedded or labelled.
    complete = raw.dropna(subset=required_columns)
    selected = complete[complete['Class'].isin(list(classes))]

    # Build a new frame instead of assigning into the filtered slice, so the
    # intermediate frames above are left untouched.
    cleaned = selected.assign(**{'Intended Use': selected['Intended Use'].apply(clean_text)})

    if plot:
        plot_class(cleaned)

    NameOfDevice = cleaned['Name of Device'].tolist()
    IntendedUse = cleaned['Intended Use'].tolist()

    # Encode the class names as integers.
    labels = LabelEncoder().fit_transform(cleaned['Class'])

    # Keep the original row index so results can be traced back to the CSV.
    index = cleaned.index.tolist()

    return NameOfDevice, IntendedUse, labels, index

In [None]:
import os

from pinecone import Pinecone

# Read the Pinecone API key from the environment instead of embedding it in the
# notebook. Any key that was ever committed in this cell should be treated as
# leaked and rotated in the Pinecone console.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
if not PINECONE_API_KEY:
    raise RuntimeError(
        "Set the PINECONE_API_KEY environment variable before running this notebook."
    )

pc = Pinecone(api_key=PINECONE_API_KEY)
# Name of the Pinecone index that stores the training embeddings.
index_name = "mdc"

In [None]:
# Load BERT model and tokenizer
# The tokenizer and model are loaded from the same checkpoint name so their
# vocabularies match; both are passed to every embedding call in this notebook.
model_name = "emilyalsentzer/Bio_ClinicalBERT"  # Use a BERT variant from Hugging Face
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

In [None]:
# Load and preprocess the dataset
# load_data returns device names, cleaned intended-use texts, encoded labels and
# the surviving row indices; it also shows the class-distribution plot.
data_file = "/kaggle/input/medical-data/mdc_data.csv" # Replace with the correct dataset path
NameOfDevice, IntendedUse, labels, index = load_data(data_file)

In [None]:

# Function to compute sentence embeddings
def get_embeddings(text, tokenizer, model):
    """Embed text as the attention-masked mean of the model's last hidden states.

    Args:
        text: A string, or a list of strings to embed as one padded batch.
        tokenizer: Hugging Face tokenizer matching `model`.
        model: Hugging Face encoder whose output exposes `last_hidden_state`.

    Returns:
        A NumPy array: shape `(hidden,)` for a single text, `(batch, hidden)` for
        several (singleton dimensions are squeezed away).
    """
    # BERT-style encoders only have 512 position embeddings; a longer sequence
    # would index past the position table, so never truncate to more than that.
    position_limit = getattr(getattr(model, "config", None), "max_position_embeddings", 512)
    inputs = tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=min(512, position_limit),
    )

    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        outputs = model(**inputs)

    hidden = outputs.last_hidden_state
    # Average over real tokens only, so padding in a batch does not dilute the mean.
    mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
    token_counts = mask.sum(dim=1).clamp(min=1)  # avoid dividing by zero
    pooled = (hidden * mask).sum(dim=1) / token_counts

    return pooled.detach().cpu().numpy().squeeze()

In [None]:
# Function to split the data into train and test sets (80-20 split by default)
def split_data(NameOfDevice, IntendedUse, labels, index, test_size=0.2,
               random_state=42, stratify=False):
    """Bundle the parallel sequences into one DataFrame and split it into train/test.

    Args:
        NameOfDevice: Sequence of device names.
        IntendedUse: Sequence of intended-use texts, aligned with `NameOfDevice`.
        labels: Sequence of encoded class labels, aligned with the texts.
        index: Sequence of original row indices, aligned with the texts.
        test_size: Fraction (or count) of rows placed in the test set.
        random_state: Seed passed to `train_test_split` for a reproducible split.
        stratify: When True, split so that each class keeps roughly the same
            proportion in train and test. Off by default, which reproduces the
            plain random split.

    Returns:
        `(train_data, test_data)`: two DataFrames with the columns
        'NameOfDevice', 'IntendedUse', 'labels' and 'index'.
    """
    data = pd.DataFrame({
        'NameOfDevice': NameOfDevice,
        'IntendedUse': IntendedUse,
        'labels': labels,
        'index': index
    })

    train_data, test_data = train_test_split(
        data,
        test_size=test_size,
        random_state=random_state,
        stratify=data['labels'] if stratify else None,
    )

    return train_data, test_data

In [None]:
# Split the data into training and testing sets (80-20 split)
# train_data is embedded and stored in Pinecone; test_data is held out for test_model.
train_data, test_data = split_data(NameOfDevice, IntendedUse, labels, index)

In [None]:

# Store embeddings to Pinecone with progress tracking
def store_embeddings_to_pinecone(index_name, train_data, tokenizer, model, batch_size=100):
    """Embed every training row and upsert it, with metadata, into a Pinecone index.

    Args:
        index_name: Name of the Pinecone index to write to (opened through the
            notebook-level client `pc`).
        train_data: DataFrame with columns 'NameOfDevice', 'IntendedUse',
            'labels' and 'index'.
        tokenizer: Tokenizer passed through to `get_embeddings`.
        model: Encoder passed through to `get_embeddings`.
        batch_size: Number of vectors sent per upsert request. Batching avoids
            one network round trip per row.
    """
    pinecone_index = pc.Index(index_name)  # Handle to the target index
    pending = []  # vectors waiting to be sent in the next upsert

    # tqdm shows a progress bar while iterating through train_data
    for row_id, row in tqdm(train_data.iterrows(), total=len(train_data), desc="Storing embeddings"):
        text = f"{row['NameOfDevice']} {row['IntendedUse']}"
        embedding = get_embeddings(text, tokenizer, model)

        # The label is stored as a string so it comes back from a query in the
        # same form the evaluation code compares against (`str(label)`).
        metadata = {
            'label': str(row['labels']),
            'device_name': row['NameOfDevice'],
            'index': int(row['index'])  # Ensure index is a plain integer
        }

        # Vector ids are the DataFrame row labels; values are sent as a plain list.
        pending.append((str(row_id), embedding.tolist(), metadata))

        if len(pending) >= batch_size:
            pinecone_index.upsert(vectors=pending)
            pending = []

    # Flush whatever is left over after the last full batch.
    if pending:
        pinecone_index.upsert(vectors=pending)

    print("All embeddings stored successfully.")

In [None]:
# Store embeddings and metadata to Pinecone
# Embeds every row of train_data and upserts it into the index named by index_name.
store_embeddings_to_pinecone(index_name, train_data, tokenizer, model)

In [None]:
# Function to predict the label using majority pooling
def predict_label(test_text, index, tokenizer, model, top_k=5):
    """Predict a label for `test_text` by majority vote over its nearest stored vectors.

    Args:
        test_text: Text to classify.
        index: Pinecone index handle to query.
        tokenizer: Tokenizer passed through to `get_embeddings`.
        model: Encoder passed through to `get_embeddings`.
        top_k: Number of nearest neighbours that vote.

    Returns:
        The winning label as a string (e.g. "2"), or None when the query
        returns no matches. On a tied vote, the label encountered first in the
        match list wins.
    """
    # Get the embedding for the input test_text
    embedding = get_embeddings(test_text, tokenizer, model)

    # Query Pinecone for the top K similar embeddings
    query_result = index.query(vector=embedding.tolist(), top_k=top_k, include_metadata=True)

    votes = []
    for match in query_result['matches']:
        raw_label = match['metadata']['label']
        # Numeric metadata may come back as a float (2.0); collapse whole-number
        # floats to ints so every label has one canonical string form ("2").
        if isinstance(raw_label, float) and raw_label.is_integer():
            raw_label = int(raw_label)
        votes.append(str(raw_label))

    # An empty index (or top_k=0) yields no neighbours; there is nothing to vote on.
    if not votes:
        return None

    # Majority pooling: the most common label among the neighbours
    return Counter(votes).most_common(1)[0][0]

# Test the model on the test dataset
def test_model(test_data, index, tokenizer, model, top_k=5):
    """Classify every test row with `predict_label` and report the accuracy.

    Args:
        test_data: DataFrame with columns 'NameOfDevice', 'IntendedUse' and 'labels'.
        index: Pinecone index handle to query.
        tokenizer: Tokenizer passed through to `predict_label`.
        model: Encoder passed through to `predict_label`.
        top_k: Number of neighbours that vote on each prediction.

    Returns:
        The accuracy as a float in [0, 1], or None when `test_data` is empty.
        The accuracy is also printed as a percentage.
    """
    def _label_key(value):
        # Give 2, 2.0 and "2" the same canonical form, so a label that came back
        # from metadata as a number still matches the integer ground truth.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return str(value)

    test_texts = test_data['NameOfDevice'] + ' ' + test_data['IntendedUse']
    test_labels = test_data['labels']

    total = len(test_texts)
    if total == 0:
        # Nothing to evaluate; avoid dividing by zero below.
        print("No test samples to evaluate.")
        return None

    correct = 0
    # Loop through the test dataset and make predictions
    for text, label in tqdm(zip(test_texts, test_labels), total=total):
        predicted_label = predict_label(text, index, tokenizer, model, top_k)
        if predicted_label is not None and _label_key(predicted_label) == _label_key(label):
            correct += 1

    # Calculate accuracy
    accuracy = correct / total
    print(f"Accuracy: {accuracy * 100:.2f}%")
    return accuracy

In [None]:
# Open a handle to the Pinecone index and evaluate the classifier on the held-out rows.
# NOTE(review): this rebinds the notebook-level name `index`, which earlier holds the
# row-index list returned by load_data and passed to split_data. Re-running the split
# cell after this one would pass the Pinecone handle instead; consider a distinct name.
index = pc.Index(index_name)
test_model(test_data, index, tokenizer, model)