<a href="https://colab.research.google.com/github/bibinlouis703/NLP_Python_Blockchain/blob/main/NER_QA_finale.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import spacy
import random
from spacy.training.example import Example
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
# Load the dataset from an Excel workbook into a pandas DataFrame.
# NOTE(review): 'sample.xlsx' is a relative path, so it is resolved against
# the current working directory — confirm the file is present there
# (e.g. uploaded to the Colab session) before running this cell.
file_path = 'sample.xlsx'
data = pd.read_excel(file_path)

In [None]:
# Normalise the column headers: trim surrounding whitespace and turn
# non-breaking spaces (\xa0) into ordinary spaces, so that the exact header
# strings listed in `column_name` can be used to index `data`.
data.columns = data.columns.str.strip().str.replace('\xa0', ' ')
# Headers of the two columns used later on: their cells supply the sentences
# that get annotated, and the header strings themselves are passed as the
# "questions" to the question-answering step.
column_name = ["How do you think blockchain technology could improve the financial integrity of NGOs? (Select all that apply)",
               "How do you think NGOs can better educate the public about their Strategic Planning, Financial Data reporting and the technologies they use?"]

In [None]:
# Phrase -> NER label lookup used by annotate_text().
# Keys are the surface phrases searched for in each sentence (the search is
# case-insensitive, so the capitalisation used here does not matter);
# values are the entity labels attached to the matched character span.
entities_to_annotate = {
    "Blockchain": "TECHNOLOGY",
    "transparency": "INTEGRITY",
    "NGOs": "ORGANIZATION",
    "smart contracts": "TECHNOLOGY",
    "financial transactions": "FINANCE",
    "strategic planning": "STRATEGY",
    "social media": "PLATFORM",
    "webinars": "EDUCATION",
    "workshops": "EDUCATION"
}

In [None]:
# Function to annotate a single sentence
def annotate_text(sentence):
    """Build spaCy-style entity annotations for one sentence by phrase lookup.

    Each phrase in the module-level ``entities_to_annotate`` mapping is
    searched for case-insensitively. Every non-overlapping occurrence of a
    phrase is recorded, not just the first one, so repeated mentions are not
    left unlabelled in the training data.

    Args:
        sentence: Text to annotate. Non-string values (for example numeric
            spreadsheet cells) yield no annotations instead of raising.

    Returns:
        dict: ``{"entities": [(start_char, end_char, label), ...]}`` with
        offsets that index into ``sentence`` exactly as passed in. Entries
        are grouped by phrase in the mapping's iteration order, and within
        a phrase by position in the sentence.
    """
    import re  # local import keeps this cell self-contained

    annotations = {"entities": []}
    if not isinstance(sentence, str):
        return annotations

    for entity, label in entities_to_annotate.items():
        # Match against the original string (not a lower-cased copy) so the
        # reported offsets are always valid for `sentence` itself.
        pattern = re.compile(re.escape(entity), re.IGNORECASE)
        for match in pattern.finditer(sentence):
            annotations["entities"].append((match.start(), match.end(), label))
    return annotations

In [None]:
import spacy
import random
from spacy.training.example import Example
import logging
import matplotlib.pyplot as plt
# Build the annotated corpus: run annotate_text() over every non-missing cell
# of each column listed in `column_name`.
TRAINING_DATA = []
for column in column_name:
    for sentence in data[column].dropna():
        annotations = annotate_text(sentence)
        if annotations["entities"]:  # Only keep rows with at least one matched phrase
            TRAINING_DATA.append((sentence, annotations))

# Split the data into 80% training and 20% testing.
# random.shuffle works in place, so TRAINING_DATA itself ends up shuffled.
# NOTE(review): no random seed is set in this notebook, so the split differs
# between runs.
random.shuffle(TRAINING_DATA)
split_point = int(len(TRAINING_DATA) * 0.8)
train_data = TRAINING_DATA[:split_point]
test_data = TRAINING_DATA[split_point:]

# Load the small English spaCy pipeline.
# NOTE(review): "norm" and "lexeme_norm" do not look like pipeline component
# names — confirm that this `exclude` list has any effect.
nlp = spacy.load("en_core_web_sm", exclude=["norm", "lexeme_norm"])

# Reuse the pipeline's NER component when present; otherwise append a new one
if "ner" not in nlp.pipe_names:
    ner = nlp.add_pipe("ner", last=True)
else:
    ner = nlp.get_pipe("ner")

# Register every label that occurs in the training split with the NER component
for _, annotations in train_data:
    for ent in annotations.get("entities"):
        ner.add_label(ent[2])  # ent is (start_char, end_char, label)

# Create optimizer instead of initializing the pipeline (to avoid lookup issues)
optimizer = nlp.create_optimizer()

# Setup logging: INFO level, with timestamp and level name on each record
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# NER loss recorded once per training iteration (filled by the training loop)
loss_per_iteration = []

# Exact / partial match accuracy recorded once per training iteration
exact_match_accuracy_per_iteration = []
partial_match_accuracy_per_iteration = []

# Helper function to check for exact match
def is_exact_match(pred_ent, true_ents):
    """Tell whether a predicted entity coincides with any gold entity.

    Args:
        pred_ent: Indexable ``(start, end, label)`` triple for the prediction.
        true_ents: Iterable of gold ``(start, end, label)`` triples.

    Returns:
        bool: True if some gold entity has the same start, the same end and
        the same label as ``pred_ent``; False otherwise (including when
        ``true_ents`` is empty).
    """
    return any(
        pred_ent[0] == gold[0] and pred_ent[1] == gold[1] and pred_ent[2] == gold[2]
        for gold in true_ents
    )

# Helper function to check for partial match
def is_partial_match(pred_ent, true_ents):
    """Tell whether a predicted entity overlaps a gold entity of the same label.

    Args:
        pred_ent: Indexable ``(start, end, label)`` triple for the prediction.
        true_ents: Iterable of gold ``(start, end, label)`` triples.

    Returns:
        bool: True if some gold entity carries the same label and shares at
        least one character position with ``pred_ent`` (spans that merely
        touch end-to-start do not count); False otherwise.
    """
    for gold in true_ents:
        if pred_ent[2] != gold[2]:
            continue
        # The intersection of two spans is [latest start, earliest end);
        # it is non-empty exactly when the spans overlap.
        overlap_start = max(pred_ent[0], gold[0])
        overlap_end = min(pred_ent[1], gold[1])
        if overlap_start < overlap_end:
            return True
    return False

In [None]:
# Training loop
# Each iteration reshuffles the training split, updates the pipeline in
# mini-batches, then scores the model on that same training split and
# records loss / accuracy for the plots further down.
for itn in range(10):  # Decreased iterations for quicker testing
    random.shuffle(train_data)
    losses = {}
    total_entities = 0
    exact_match_count = 0
    partial_match_count = 0

    batches = spacy.util.minibatch(train_data, size=4)  # Batch size set to 4

    for batch in batches:
        examples = []
        for text, annotations in batch:
            doc = nlp.make_doc(text)
            example = Example.from_dict(doc, annotations)
            examples.append(example)

        # Update the model
        nlp.update(examples, drop=0.1, losses=losses, sgd=optimizer)  # Lower dropout to 0.1
    # Evaluate after each iteration on training data (can also use a separate test set)
    for text, annotations in train_data:
        doc = nlp(text)
        pred_ents = [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]
        true_ents = annotations["entities"]

        total_entities += len(true_ents)  # Count all true entities

        # Print predictions for debugging
        print(f"Text: {text}")
        print(f"True entities: {true_ents}")
        print(f"Predicted entities: {pred_ents}")
        print()

        # Number of predicted entities that coincide exactly with a gold entity
        exact_match_count += sum(1 for pred_ent in pred_ents if is_exact_match(pred_ent, true_ents))

        # Number of predicted entities that overlap a gold entity of the same label
        partial_match_count += sum(1 for pred_ent in pred_ents if is_partial_match(pred_ent, true_ents))

    # Both ratios are "matching predictions / gold entities"; they fall back
    # to 0 when the training split contains no gold entities at all.
    exact_match_accuracy = exact_match_count / total_entities if total_entities > 0 else 0
    partial_match_accuracy = partial_match_count / total_entities if total_entities > 0 else 0

    exact_match_accuracy_per_iteration.append(exact_match_accuracy)
    partial_match_accuracy_per_iteration.append(partial_match_accuracy)

    # Read the NER loss once, with a default, so that the history, the print
    # and the log record all agree and none of them raises KeyError when no
    # batch was processed (empty training split).
    ner_loss = losses.get("ner", 0)
    loss_per_iteration.append(ner_loss)

    # Report the iteration 1-based in both places so the console line and the
    # log record for the same pass carry the same number.
    print(f"Iteration {itn + 1}, Losses: {losses}, Exact Match Accuracy: {exact_match_accuracy:.4f}, Partial Match Accuracy: {partial_match_accuracy:.4f}")
    # Log the accuracy
    logger.info(
        "Iteration %d: Loss = %.4f, Exact Match Accuracy = %.4f, Partial Match Accuracy = %.4f",
        itn + 1,
        ner_loss,
        exact_match_accuracy,
        partial_match_accuracy,
    )

# Figure 1: exact vs. partial match accuracy, one point per training iteration
plt.figure(figsize=(8, 6))
for curve_values, curve_label, curve_color in (
    (exact_match_accuracy_per_iteration, 'Exact Match Accuracy', 'b'),
    (partial_match_accuracy_per_iteration, 'Partial Match Accuracy', 'r'),
):
    plt.plot(curve_values, marker='o', label=curve_label, color=curve_color)
plt.xlabel('Iteration')
plt.ylabel('Accuracy')
plt.title('Exact and Partial Match Accuracy over Iterations')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Figure 2: NER loss recorded at the end of each training iteration
plt.figure(figsize=(8, 6))
plt.plot(loss_per_iteration, marker='o', color='g', label='NER Loss')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('Loss vs Iteration')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Step 4: Test the Model
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
# Test on the 20% test set
# true_labels / predicted_labels are parallel lists: position k in both lists
# refers to the same character span of the same test sentence.
true_labels = []
predicted_labels = []
# Running accuracy after each test sample, and the 1-based index of that sample
accuracy_scores = []
iterations = []

for sample_idx, (text, annotations) in enumerate(test_data, start=1):
    doc = nlp(text)
    print(f"\nText: {text}")
    for ent in doc.ents:
        print(f"Entity: {ent.text}, Label: {ent.label_}")

    # Index gold and predicted labels by (start_char, end_char). Pairing the
    # two label lists by list position would compare unrelated entities,
    # because gold entities are stored in phrase-dictionary order while
    # predictions come in document order.
    true_by_span = {(start, end): label for start, end, label in annotations["entities"]}
    pred_by_span = {(ent.start_char, ent.end_char): ent.label_ for ent in doc.ents}

    # One evaluation position per span seen on either side; "O" marks a span
    # that the other side did not produce (a missed or a spurious entity).
    spans = sorted(true_by_span.keys() | pred_by_span.keys())
    true_entities = [true_by_span.get(span, "O") for span in spans]
    predicted_entities = [pred_by_span.get(span, "O") for span in spans]

    # Append to final lists
    true_labels.extend(true_entities)
    predicted_labels.extend(predicted_entities)

    # Accuracy over everything evaluated so far, keyed by how many test
    # samples have been processed (not by the finished training loop's `itn`).
    accuracy = accuracy_score(true_labels, predicted_labels)
    accuracy_scores.append(accuracy)
    iterations.append(sample_idx)


In [None]:
# Step 5: Evaluate the Model
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Compute accuracy: fraction of positions at which the accumulated
# true_labels and predicted_labels lists agree ("O" marks a position with no
# entity on that side).
accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Accuracy: {accuracy}")

# Classification report: per-label precision / recall / F1 over the same lists
print(classification_report(true_labels, predicted_labels))

# Confusion matrix: rows are true labels, columns are predicted labels
conf_matrix = confusion_matrix(true_labels, predicted_labels)
print(f"Confusion Matrix:\n{conf_matrix}")

In [None]:
def plot_training_accuracy(trainer):
    """Plot the ``eval_accuracy`` values logged by ``trainer`` against their epochs.

    Args:
        trainer: Object exposing ``state.log_history``, an iterable of
            dict-like log records. Only records that contain an
            ``"eval_accuracy"`` key are plotted; each of those must also
            provide an ``"epoch"`` key.
            NOTE(review): nothing in the visible notebook creates such an
            object — confirm where this helper is meant to be called from.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Keep only the evaluation records, then split them into x / y series
    eval_records = [record for record in trainer.state.log_history if "eval_accuracy" in record]
    epochs = [record['epoch'] for record in eval_records]
    accuracy = [record['eval_accuracy'] for record in eval_records]

    # Plot accuracy over epochs
    plt.figure(figsize=(8, 6))
    plt.plot(epochs, accuracy, marker='o', label='Accuracy')
    plt.title('Training Accuracy over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
# Plot graphs
import matplotlib.pyplot as plt
# Step 6: Evaluate the Model

# Compute accuracy
accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Final Accuracy: {accuracy}")

# Classification report
print(classification_report(true_labels, predicted_labels))

# Confusion matrix.
# The matrix axes cover every label that occurs in EITHER list, so the label
# order is fixed explicitly here and reused for the heatmap ticks below.
# Deriving the ticks from true_labels alone mislabels (or miscounts) the axes
# as soon as the model predicts a label that never occurs in the gold data.
matrix_labels = sorted(set(true_labels) | set(predicted_labels))
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=matrix_labels)
print(f"Confusion Matrix:\n{conf_matrix}")

# Visualize the confusion matrix using a heatmap (rows: true, columns: predicted)
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=matrix_labels,
            yticklabels=matrix_labels)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

In [None]:
!python -m spacy download en_core_web_md

In [None]:
!python -m spacy validate

In [None]:
import spacy
from sklearn.metrics.pairwise import cosine_similarity

# Load the spacy model for embeddings
# NOTE(review): this rebinds the global `nlp`, which earlier in the notebook
# held the en_core_web_sm pipeline whose NER was fine-tuned. Every later
# `nlp(...)` call (including entity extraction) therefore runs on this
# freshly loaded pipeline — confirm that is intended.
nlp = spacy.load("en_core_web_md")  # Using medium model for better embeddings
# Define a function to calculate cosine similarity between question and sentence embeddings
def calculate_similarity(question_embedding, sentence_embedding):
    """Return the cosine similarity between two embedding vectors.

    Args:
        question_embedding: 1-D vector for the question.
        sentence_embedding: 1-D vector for the candidate sentence.

    Returns:
        The single similarity value computed by scikit-learn's
        ``cosine_similarity`` for the pair.
    """
    # cosine_similarity works on 2-D inputs, so each vector is wrapped as a
    # one-row matrix; the result is then a 1x1 matrix.
    pairwise = cosine_similarity([question_embedding], [sentence_embedding])
    return pairwise[0][0]

# Function to find the best answer for a question based on NER-annotated sentences
def get_best_answer(question, annotated_sentences):
    """Pick the sentence whose embedding is most similar to the question's.

    Args:
        question: Question text; embedded with the global ``nlp`` pipeline.
        annotated_sentences: Iterable of ``(sentence, annotations)`` pairs.
            Only the sentence text is used; the annotations are ignored.

    Returns:
        The first sentence that reaches the highest cosine similarity, or
        ``None`` when no sentence scores above -1 (in particular when
        ``annotated_sentences`` is empty).
    """
    query_vector = nlp(question).vector
    top_sentence = None
    top_score = -1  # only a strictly better score replaces the current best

    for candidate, _ in annotated_sentences:
        candidate_vector = nlp(candidate).vector
        score = calculate_similarity(query_vector, candidate_vector)
        if not score > top_score:
            continue
        top_score = score
        top_sentence = candidate

    return top_sentence

# Example of using NER-annotated sentences (TRAINING_DATA is already created in previous steps)
def answer_questions(questions, annotated_sentences):
    """Map each question to its best-matching sentence.

    Args:
        questions: Iterable of question strings.
        annotated_sentences: ``(sentence, annotations)`` pairs, passed
            unchanged to ``get_best_answer`` for every question.

    Returns:
        dict: question -> whatever ``get_best_answer`` returned for it, in
        the order the questions were given.
    """
    return {
        question: get_best_answer(question, annotated_sentences)
        for question in questions
    }

In [None]:
# The column headers act as the questions; candidate answers are the
# annotated sentences collected in TRAINING_DATA.
answers = answer_questions(column_name, TRAINING_DATA)

# Show every question with its selected answer, followed by a blank line
for question, answer in answers.items():
    print(f"Question: {question}", f"Answer: {answer}\n", sep="\n")

In [None]:
from spacy import displacy
from datetime import datetime
import hashlib
import networkx as nx

# Example blockchain block structure
class QA_Block:
    """One question/answer record in a hash-linked chain of blocks.

    The block's SHA-256 hash is computed once, at construction time, from
    all of its fields including the previous block's hash.
    """

    def __init__(self, block_id, question, answer, entities, prev_hash=""):
        """Store the block's fields, timestamp it and compute its hash.

        Args:
            block_id: Identifier of the block within the chain.
            question: Question text stored in the block.
            answer: Answer text stored in the block.
            entities: Entity labels attached to the answer (any object; sets
                are rendered in a stable order when hashing).
            prev_hash: Hash of the preceding block; empty for the first block.
        """
        self.block_id = block_id
        self.question = question
        self.answer = answer
        self.entities = entities
        # Local wall-clock creation time, second resolution
        self.timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.prev_hash = prev_hash
        self.block_hash = self.compute_hash()

    @staticmethod
    def _entities_text(entities):
        """Render ``entities`` as text whose content does not depend on set order."""
        if isinstance(entities, (set, frozenset)) and len(entities) > 1:
            # The repr of a set lists members in hash order, which for strings
            # changes between interpreter runs; sort the members so the same
            # block content always yields the same hash.
            members = ", ".join(sorted(repr(member) for member in entities))
            rendered = "{" + members + "}"
            if isinstance(entities, frozenset):
                rendered = f"frozenset({rendered})"
            return rendered
        # Everything else (lists, tuples, empty or single-member sets, ...)
        # already has a deterministic text form.
        return f"{entities}"

    def compute_hash(self):
        """Return the hex SHA-256 digest of the block's concatenated fields.

        The fields are joined without separators, in the order: block_id,
        question, answer, entities, timestamp, prev_hash.
        """
        entities_text = self._entities_text(self.entities)
        block_string = f"{self.block_id}{self.question}{self.answer}{entities_text}{self.timestamp}{self.prev_hash}"
        return hashlib.sha256(block_string.encode()).hexdigest()

# Build the blockchain structure based on the QA results
# Each block stores one question/answer pair plus the entity labels found in
# the answer, and is linked to its predecessor through prev_hash.
blocks = []
prev_hash = ""

for idx, (question, answer) in enumerate(answers.items()):
    # get_best_answer() returns None when it had no candidate sentence to
    # choose from; the pipeline cannot process None, so such a question gets
    # a block with an empty entity set instead of aborting the whole chain.
    if answer:
        doc = nlp(answer)
        entities = {ent.label_ for ent in doc.ents}  # Collect unique entity labels
    else:
        entities = set()
    block = QA_Block(block_id=idx+1, question=question, answer=answer, entities=entities, prev_hash=prev_hash)
    blocks.append(block)
    prev_hash = block.block_hash  # The next block links back to this one

# Knowledge graph: one node per QA block, an edge between two blocks whenever
# their entity-label sets intersect.
G = nx.Graph()

# Each node is keyed by block id and carries a display label with its entities
for block in blocks:
    node_label = f"Block {block.block_id}\n{block.entities}"
    G.add_node(block.block_id, label=node_label)

# Compare every ordered pair of distinct blocks; the graph is undirected, so
# seeing a pair in both orders still yields a single edge.
for i, block1 in enumerate(blocks):
    for j, block2 in enumerate(blocks):
        if i == j:
            continue
        if block1.entities & block2.entities:  # at least one shared entity label
            G.add_edge(block1.block_id, block2.block_id)

# Plot the graph
plt.figure(figsize=(10, 8))
pos = nx.spring_layout(G)
node_labels = {node: G.nodes[node]['label'] for node in G.nodes()}
nx.draw(
    G,
    pos,
    with_labels=True,
    node_size=3000,
    node_color='lightblue',
    font_size=10,
    font_weight='bold',
    labels=node_labels,
)
plt.title("QA Blockchain Knowledge Graph")
plt.show()

# Dump every block of the chain, one field per line, followed by a separator
for block in blocks:
    print(
        f"Block ID: {block.block_id}",
        f"Question: {block.question}",
        f"Answer: {block.answer}",
        f"Entities: {block.entities}",
        f"Timestamp: {block.timestamp}",
        f"Previous Hash: {block.prev_hash}",
        f"Current Block Hash: {block.block_hash}",
        "-" * 60,
        sep="\n",
    )