In [1]:
import numpy as np
import os
import pickle
import copy
import ollama


import requests
import xml.etree.ElementTree as ET
import ollama
from sklearn.model_selection import train_test_split

In [2]:
class Layer_Dense:
    def __init__(self, n_inputs, n_neurons, weight_regularizer_l1=0, weight_regularizer_l2=0, bias_regularizer_l1=0, bias_regularizer_l2=0):
        #intialize weights and biases
        self.weights = .01*np.random.randn(n_inputs,n_neurons)
        self.biases = np.zeros((1, n_neurons))
        #set regularization strength
        self.weight_regularizer_l1 = weight_regularizer_l1
        self.weight_regularizer_l2 = weight_regularizer_l2
        self.bias_regularizer_l1 = bias_regularizer_l1
        self.bias_regularizer_l2 = bias_regularizer_l2
    
    def forward(self, inputs, training):
        self.inputs = inputs
        self.output = np.dot(inputs, self.weights) + self.biases
        return self.output


    def backward(self, dvalues):
        self.dweights = np.dot(self.inputs.T, dvalues)
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)

        if self.weight_regularizer_l1>0:
            dl1 = np.ones_like(self.weights)
            dl1[self.weights < 0 ] = -1
            self.dweights += self.weights_regularizer_l1*dl1

        if self.weight_regularizer_l2>0:
            self.dweights += 2*self.weight_regularizer_l2*self.weights

        if self.bias_regularizer_l1>0:
            dl1 = np.ones_like(self.biases)
            dl1[self.biases<0] = -1
            self.dbiases += self.bias_regularizer_l1*dl1

        if self.bias_regularizer_l2>0:
            self.dbiases += 2*self.bias_regularizer_l2*self.biases

        self.dinputs = np.dot(dvalues, self.weights.T)

    def get_parameters(self):
        return self.weights, self.biases


    def set_parameters(self,weights,biases):
        self.weights = weights
        self.biases = biases

class Layer_dropout:
    def __init__ (self, rate):
        self.rate = 1-rate


    def forward(self, inputs, training):
        self.input = inputs
        
        if not training:
            self.output = inputs.copy()
            return
        else:
            self.binary_mask = np.random.binomial(1, self.rate, size=inputs.shape)/self.rate
            self.output = inputs*self.binary_mask
        return self.output


    def backward(self,dvalues):
        self.dinputs = dvalues*self.binary_mask

class Layer_Input:
    def forward(self, inputs, training):
        self.output = inputs
        return self.output



# ACTIVATION FUNCTIONS

class Activation_ReLU:
    def forward(self, inputs, training):
        self.inputs = inputs
        self.output = np.maximum(0,inputs)
        return self.output

    def backward(self, dvalues):
        self.dinputs = dvalues.copy()
        self.dinputs[self.inputs <= 0] = 0

    def predictions(self, outputs):
        return outputs

class Activation_Sigmoid:
    def forward(self, inputs, training):
        self.inputs = inputs
        self.output = 1 / (1 + np.exp(-inputs))
        return self.output

    def backward(self, dvalues):
        if self.output is None:
            raise ValueError("Sigmoid backward: self.output is None; forward pass may not have been called correctly")
        self.dinputs = dvalues * (1 - self.output) * self.output
        return self.dinputs

    def predictions(self, outputs):
        return (outputs > 0.5) * 1
#OPTIMIZER
             
class Optimizer_SGD:
    def __init__(self, learning_rate = 1., decay = 0., momentum = 0.):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.momentum = momentum
        
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate*(1./(1.+self.decay*self.iterations))
    
    def update_params(self, layer):

        if self.momentum:
            if not hasattr(layer, 'weight_momentums'):
                layer.weight_momentums = np.zeros_like(layer.weights)
                layer.bias_momentums = np.zeros_like(layer.biases)
            weight_updates = self.momentum*layer.weight_momentums - self.current_learning_rate*layer.dweights
            layer.weights_momentums = weight_updates

            bias_updates = self.momentum*layer.bias_momentums - self.current_learning_rate*layer.dbiases
            layer.bias_momentums = bias_updates

        else:
            weight_updates = -self.current_learning_rate*layer.dweights
            bias_updates = -self.current_learning_rate*layer.dbiases

        layer.weights += weight_updates
        layer.biases += bias_updates 
    
    
    def post_update_params(self):
        self.iterations += 1


#LOSS

class Loss:
    def regularization_loss(self):
        regularization_loss = 0

        for layer in self.trainable_layers:

            if layer.weight_regularizer_l1 > 0:
                regularization_loss += layer.weight_regularizer_l1 * np.sum(np.abs(layer.weights))
            if layer.weight_regularizer_l2 > 0:
                regularization_loss += layer.weight_regularizer_l2 * np.sum(layer.weights*layer.weights)

            if layer.bias_regularizer_l1 > 0:
                regularization_loss += layer.bias_regularizer_l1 * np.sum(np.abs(layer.bias))
            if layer.bias_regularizer_l2 > 0:
                regularization_loss += layer.bias_regularizer_l2 * np.sum(layer.biases*layer.biases)
    
        return regularization_loss

    def remember_trainable_layers(self, trainable_layers):
        self.trainable_layers  = trainable_layers

    def calculate(self, output, y, *, include_regularization=False):
        sample_losses = self.forward(output, y)
        data_loss = np.mean(sample_losses)

        self.accumulated_sum += np.sum(sample_losses)
        self.accumulated_count += len(sample_losses)

        if not include_regularization:
            return data_loss
        return data_loss, self.regularization_loss()

    def calculate_accumulated(self, *, include_regularization=False):
        data_loss = self.accumulated_sum / self.accumulated_count
        if not include_regularization:
            return data_loss
        return data_loss, self.regularization_loss()

    def new_pass(self):
        self.accumulated_sum = 0
        self.accumulated_count = 0

class Loss_BinaryCrossEntropy(Loss):
    def forward(self, y_pred, y_true):
        y_pred_clipped = np.clip(y_pred, 1e-7, 1-1e-7)

        sample_losses = -(y_true*np.log(y_pred_clipped)  +  (1-y_true)*np.log(1-y_pred_clipped))

        sample_losses = np.mean(sample_losses, axis=-1)
        
        return sample_losses

    def backward(self, dvalues, y_true):
        samples = len(dvalues)
        outputs = len(dvalues[0])

        clipped_dvalues = np.clip(dvalues, 1e-7, 1-1e-7)

        self.dinputs = -(y_true/clipped_dvalues - (1-y_true)/(1-clipped_dvalues))  / outputs
        self.dinputs = self.dinputs / samples
        


#ACCURACY


class Accuracy:
    def calculate(self, predictions, y):
        comparisons = self.compare(predictions, y)
        accuracy = np.mean(comparisons)
        self.accumulated_sum += np.sum(comparisons)
        self.accumulated_count += len(comparisons)
        return accuracy

    def calculate_accumulated(self):
        if self.accumulated_count == 0:
            return 0  # Avoid division by zero
        accuracy = self.accumulated_sum / self.accumulated_count
        return accuracy

    def new_pass(self):
        self.accumulated_sum = 0
        self.accumulated_count = 0

class Accuracy_Binary(Accuracy):
    def __init__(self):
        self.new_pass()  # Initialize counters at creation
    
    def compare(self, predictions, y):
        # Threshold sigmoid outputs at 0.5
        binary_predictions = predictions > 0.5
        # Ensure y is flat and binary (not one-hot or 2D with single column)
        if len(y.shape) == 2 and y.shape[1] == 1:
            y = y.flatten()
        return binary_predictions == y



In [3]:
#MODEL OBJECT

class funny_guy:
    def __init__ (self):
        self.layers = []

    def add(self, layer):
        self.layers.append(layer)
    

    def set(self, *, loss, optimizer, accuracy):
        if loss is not None:
            self.loss = loss
        if optimizer is not None:
            self.optimizer = optimizer
        if accuracy is not None:
            self.accuracy = accuracy


    def finalize(self):
        self.input_layer = Layer_Input()
        layer_count = len(self.layers)

        self.trainable_layers = []


        for i in range(layer_count):
            if i == 0:
                self.layers[i].prev = self.input_layer
                self.layers[i].next = self.layers[i+1]

            elif i<layer_count-1:
                self.layers[i].prev = self.layers[i-1]
                self.layers[i].next = self.layers[i+1]

            else:
               self.layers[i].prev = self.layers[i-1] 
               self.layers[i].next = self.loss
               self.output_layer_activation = self.layers[i]
    
            if hasattr(self.layers[i], 'weights'):
                self.trainable_layers.append(self.layers[i])

        if self.loss is not None:
            self.loss.remember_trainable_layers(self.trainable_layers)

    


    def get_parameters(self):
        parameters=[]
        for layer in self.trainable_layers:
            parameters.append(layer.get_parameters())
            return parameters



    def set_parameters(self, parameters):
        for parameter_set, layer in zip(parameters, self.trainable_layers):
            layer.set_parameters(*parameter_set)


    def evaluate(self, X_val, y_val, *, batch_size=None):
        validation_steps = 1
        
        if batch_size is not None:
            validation_steps = len(X_val)//batch_size

            if validation_steps*batch_size<len(X_val):
                validation_steps+=1
        self.loss.new_pass()
        self.accuracy.new_pass()


        for step in range(validation_steps):
            if batch_size is None:
                batch_X = X_val
                batch_y = y_val

            else:
                batch_X = X_val[step*batch_size:(step+1)*batch_size]
                batch_y = y_val[step*batch_size:(step+1)*batch_size]

            output = self.forward(batch_X, training=False)
            self.loss.calculate(output, batch_y)
            predictions = self.output_layer_activation.predictions(output)
            self.accuracy.calculate(predictions, batch_y)

        validation_loss = self.loss.calculate_accumulated()
        validation_accuracy = self.accuracy.calculate_accumulated()

        print(f'validation, ' + f'acc:{validation_accuracy:.3f} ' + f'loss:{validation_loss:.3f}')   
   
    
    def forward(self, X, training):
        self.input_layer.forward(X, training)
        output = self.input_layer.output  # Start with input layer's output
        for layer in self.layers:
            output = layer.forward(output, training)
        return output

    def backward(self, output, y):
        dvalues = self.loss.backward(output, y)
        for layer in reversed(self.layers):
            dvalues = layer.backward(dvalues)

    def train(self, X, y, *, epochs=1, batch_size=None, print_every=1, validation_data=None):
        train_steps = 1
        if batch_size is not None:
            train_steps = len(X) // batch_size
            if train_steps * batch_size < len(X):
                train_steps += 1

        for epoch in range(1, epochs + 1):
            print(f'epoch: {epoch}')
            self.loss.new_pass()
            self.accuracy.new_pass()

            for step in range(train_steps):
                batch_X = X if batch_size is None else X[step * batch_size:(step + 1) * batch_size]
                batch_y = y if batch_size is None else y[step * batch_size:(step + 1) * batch_size]

                output = self.forward(batch_X, training=True)
                data_loss, regularization_loss = self.loss.calculate(output, batch_y, include_regularization=True)
                loss = data_loss + regularization_loss
                predictions = self.output_layer_activation.predictions(output)
                accuracy = self.accuracy.calculate(predictions, batch_y)

                self.backward(output, batch_y)
                self.optimizer.pre_update_params()
                for layer in self.trainable_layers:
                    self.optimizer.update_params(layer)
                self.optimizer.post_update_params()

                if not step % print_every or step == train_steps - 1:
                    print(f'step: {step}, acc: {accuracy:.3f}, loss: {loss:.3f}, '
                          f'(data_loss: {data_loss}, reg_loss: {regularization_loss:.3f}), '
                          f'lr: {self.optimizer.current_learning_rate}')

    def predict(self, X, *, batch_size=None):
        prediction_steps =1
        if batch_size is not None:
            prediction_steps = len(X) // batch_size
            if prediction_steps*batch_size<len(X):
                prediction_steps+=1
    
        output = []
        for step in range(prediction_steps):
            if batch_size is None:
                batch_X = X
            else:
                batch_X = X[step*batch_size:(step+1)*batch_size]
            batch_output = self.forward(batch_X, training=False)
            batch_predictions = self.output_layer_activation.predictions(batch_output)
            output.append(batch_output)

        return np.vstack(output)





    def save_parameters(self, path):
        with open(path, 'wb') as f: #write binary
            pickle.dump(self.get_parameters(),f)


    def load_parameters(self, path):
        with open(path, 'rb') as f: #read binary
            self.set_parameters(pickle.load(f))




    def save(self, path):
        model = copy.deepcopy(self)

        model.loss.new_pass()
        model.accuracy.new_pass()

        model.input_layer.__dict__.pop('output', None)
        model.loss.__dict__.pop('dinputs', None)

        for layer in model.layers:
            for property in ['inputs', 'output', 'dinputs', 'dweights', 'dbiases']:
                layer.__dict__.pop(property, None)

        with open(path, 'wb') as f:
            pickle.dump(model, f)

    @staticmethod
    def load(path):
        with open(path, 'rb') as f:
            model = pickle.load(f)
            return model



# Generating the Dataset from Pubmed

def fetch_pubmed_ids(query, max_results=20):
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
    search_url = f"{base_url}esearch.fcgi?db=pubmed&term={query}&retmax={max_results}&retmode=xml"
    response = requests.get(search_url)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch IDs: {response.status_code}")
    root = ET.fromstring(response.content)
    return [id_elem.text for id_elem in root.findall(".//Id")]

def fetch_pubmed_details(pmids):
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
    fetch_url = f"{base_url}efetch.fcgi?db=pubmed&id={','.join(pmids)}&retmode=xml"
    response = requests.get(fetch_url)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch details: {response.status_code}")
    fetch_root = ET.fromstring(response.content)
    articles = {}
    for article in fetch_root.findall(".//PubmedArticle"):
        pmid = article.findtext(".//PMID")
        title = article.findtext(".//ArticleTitle") or "No title"
        abstract = article.findtext(".//AbstractText") or ""
        articles[pmid] = {'title': title, 'abstract': abstract}
    return articles

def get_ollama_embedding(text, model='nomic-embed-text'):
    response = ollama.embeddings(model=model, prompt=text)
    return np.array(response['embedding'])

def process_articles(pmids, articles_dict):
    X, y, texts = [], [], []
    for pmid in pmids:
        article = articles_dict[pmid]
        text = f"{article['title']} {article['abstract']}"
        embedding = get_ollama_embedding(text)
        X.append(embedding)
        texts.append(text)
    X = np.array(X)
    print("Label the following articles (1 = useful, 0 = not useful):")
    # How many characters do are needed in the snippet
    y = [int(input(f"PMID {pmid}: {texts[i][:250]}...\nLabel (1/0): ")) for i, pmid in enumerate(pmids)]   
    return X, np.array(y)

def create_article_data(query, max_results=20):
    all_pmids = fetch_pubmed_ids(query, max_results)
    train_pmids, test_pmids = train_test_split(all_pmids, test_size=0.2, random_state=42)
    train_articles = fetch_pubmed_details(train_pmids)
    test_articles = fetch_pubmed_details(test_pmids)
    X_train, y_train = process_articles(train_pmids, train_articles)
    X_test, y_test = process_articles(test_pmids, test_articles)
    assert not set(train_pmids) & set(test_pmids), "Overlap detected!"
    return X_train, y_train, X_test, y_test





In [5]:
# Usage
if __name__ == "__main__":
    
    query = "heart disease and diabetes"  # Query topic
    X_train, y_train, X_test, y_test = create_article_data(query, max_results=20)

    model = funny_guy()
    model.add(Layer_Dense(768, 64))
    model.add(Activation_ReLU())
    model.add(Layer_Dense(64, 1))
    model.add(Activation_Sigmoid())
    model.set(
        loss=Loss_BinaryCrossEntropy(),
        optimizer=Optimizer_SGD(learning_rate=0.1, decay=1e-3),
        accuracy=Accuracy_Binary()
    )
    model.finalize()

    # Train with validation
    model.train(X_train, y_train, epochs=10, batch_size=4, print_every=2, validation_data=(X_test, y_test))

    # Predict
    predictions = model.predict(X_test[:5])
    print("Predictions:", predictions.flatten())

Label the following articles (1 = useful, 0 = not useful):
Label the following articles (1 = useful, 0 = not useful):
epoch: 1


ValueError: shapes (4,4) and (1,64) not aligned: 4 (dim 1) != 1 (dim 0)