### Let's try to extract the nodes first

In [1]:
import cv2
import numpy as np
import pandas as pd
from glob import glob
import os
import easyocr
import pytesseract
import torch
import torch.nn as nn
import torch.optim as optim
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import StratifiedKFold

In [2]:
def detect_nodes(image):
    """
    Detect circular nodes in the graph
    Returns node positions and radii
    """
    # Convert to grayscale if necessary
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image

    # Apply Gaussian blur to reduce noise
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Use Hough Circle Transform to detect circles
    circles = cv2.HoughCircles(
        blurred,
        cv2.HOUGH_GRADIENT,
        dp=1,
        minDist=10,
        param1=50,
        param2=30,
        minRadius=15,
        maxRadius=40
    )

    # If no circles are detected, try alternative approach with blob detection
    if circles is None:
        # Set up SimpleBlobDetector parameters
        params = cv2.SimpleBlobDetector_Params()
        params.filterByCircularity = True
        params.minCircularity = 0.8
        params.filterByConvexity = True
        params.minConvexity = 0.9
        params.filterByInertia = True
        params.minInertiaRatio = 0.5

        detector = cv2.SimpleBlobDetector_create(params)
        keypoints = detector.detect(blurred)

        # Convert keypoints to circles format
        circles = np.array([[[kp.pt[0], kp.pt[1], kp.size / 2]] for kp in keypoints])

    return circles[0] if circles is not None else np.array([])


In [3]:
def draw_detected_circles(image, circles):
    """
    Draws detected circles on the image.

    Parameters:
        image (np.array): Original image.
        circles (np.array): Nx3 array of circles [x, y, radius].

    Returns:
        image_with_circles (np.array): Image with circles drawn.
    """
    # Make a copy to draw on
    output = image.copy()

    # Ensure circles is in expected shape
    if circles is not None and len(circles) > 0:
        for circle in circles:
            x, y, r = int(circle[0]), int(circle[1]), int(circle[2])
            # Draw the circle outline
            cv2.circle(output, (x, y), r, (0, 255, 0), 2)
            # Draw the center of the circle
            cv2.circle(output, (x, y), 2, (0, 0, 255), 3)
    else:
        print("No circles to draw.")
    return output


In [6]:
# Detect circles
image = cv2.imread("graphs_images/1.png")
circles = detect_nodes(image)
mean_radius = np.mean(circles[:,2])
scale = 24.1/mean_radius
width = int(image.shape[1] * scale)
height = int(image.shape[0] * scale)
scaled_image = cv2.resize(image, (width, height))
circles = detect_nodes(scaled_image)

image_with_circles = draw_detected_circles(scaled_image, circles)
# Show image (for debugging)
cv2.imshow("Detected Circles", image_with_circles)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [39]:
np.mean(circles[:,2])

np.float32(24.249998)

### I shall build a number classifier for the nodes.  For this I will have to build a dataset and train a model on it

#### The following code crops the images of nodes and saves them as separate images

In [7]:
input_dir = "./graphs_images"
output_dir = "./images"
csv_path = "node.csv"

os.makedirs(output_dir, exist_ok=True)
image_files = glob(os.path.join(input_dir, "*.png"))
records = []
node_counter = 0

for image_path in image_files:
    image = cv2.imread(image_path)
    if image is None:
        print(f"Failed to read {image_path}")
        continue

    initial_circles = detect_nodes(image)
    if len(initial_circles) == 0:
        print(f"No nodes detected in {image_path}")
        continue

    mean_radius = np.mean(initial_circles[:, 2])
    scale = 24.1 / mean_radius
    width = int(image.shape[1] * scale)
    height = int(image.shape[0] * scale)
    scaled_image = cv2.resize(image, (width, height))
    circles = detect_nodes(scaled_image)

    for circle in circles:
        x, y, r = int(circle[0]), int(circle[1]), int(circle[2])
        x1 = max(0, x - r)
        y1 = max(0, y - r)
        x2 = min(scaled_image.shape[1], x + r)
        y2 = min(scaled_image.shape[0], y + r)

        cropped = scaled_image[y1:y2, x1:x2]
        filename = f"node_{node_counter:05d}.png"
        cv2.imwrite(os.path.join(output_dir, filename), cropped)
        records.append({"filename": filename, "key": ""})
        node_counter += 1

    print(f"Processed {os.path.basename(image_path)} -> {len(circles)} nodes")

df = pd.DataFrame(records)
df.to_csv(csv_path, index=False)

print(f"\nDone! Saved {node_counter} node images and '{csv_path}'")


Processed 329.png -> 3 nodes
Processed 4.png -> 6 nodes
Processed 84.png -> 5 nodes
Processed 390.png -> 3 nodes
Processed 269.png -> 5 nodes
Processed 292.png -> 3 nodes
Processed 246.png -> 6 nodes
Processed 25.png -> 5 nodes
Processed 377.png -> 5 nodes
Processed 132.png -> 6 nodes
Processed 196.png -> 6 nodes
Processed 330.png -> 5 nodes
Processed 391.png -> 6 nodes
Processed 171.png -> 6 nodes
Processed 75.png -> 5 nodes
Processed 68.png -> 6 nodes
Processed 412.png -> 6 nodes
Processed 371.png -> 5 nodes
Processed 76.png -> 4 nodes
Processed 300.png -> 5 nodes
Processed 274.png -> 4 nodes
Processed 159.png -> 6 nodes
Processed 147.png -> 6 nodes
Processed 213.png -> 4 nodes
Processed 344.png -> 3 nodes
Processed 266.png -> 4 nodes
Processed 452.png -> 6 nodes
Processed 157.png -> 3 nodes
Processed 409.png -> 6 nodes
Processed 1.png -> 4 nodes
Processed 99.png -> 3 nodes
Processed 383.png -> 3 nodes
Processed 345.png -> 3 nodes
Processed 197.png -> 4 nodes
Processed 15.png -> 5 no

### I used easy ocr to help me with the labelling. Though some 0, 1,and 4  failed ocr. These were labelled tediously by hand.

In [13]:

# Check GPU availability
if not torch.cuda.is_available():
    print("CUDA not available. OCR will run on CPU.")
else:
    print("GPU detected. Running EasyOCR on GPU.")

image_dir = "./images"
csv_path = "node.csv"
output_csv = "nodes_labeled_easyocr.csv"

# Initialize EasyOCR reader
reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available())

# OCR fallback using pytesseract
def pytesseract_ocr(image):
    config = r'--psm 10 -c tessedit_char_whitelist=0123456789'
    text = pytesseract.image_to_string(image, config=config)
    return text.strip()

# Preprocessing function
def preprocess_image(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        return None
    image = cv2.resize(image, (64, 64), interpolation=cv2.INTER_CUBIC)
    _, thresh = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return thresh

# Combined OCR function
def ocr_image(image_path):
    image = preprocess_image(image_path)
    if image is None:
        print(f"Could not load image: {image_path}")
        return ""

    # Try EasyOCR first
    result = reader.readtext(image, detail=0, paragraph=False, allowlist='0123456789')
    if result and result[0].strip().isdigit():
        return result[0].strip()

    # Fallback to pytesseract
    fallback = pytesseract_ocr(image)
    if fallback.isdigit():
        return fallback

    return ""

# Process CSV
df = pd.read_csv(csv_path)
df["key"] = ""

for idx, row in df.iterrows():
    image_path = os.path.join(image_dir, row["filename"])
    label = ocr_image(image_path)
    df.at[idx, "key"] = label
    if label:
        print(f"{row['filename']}: '{label}'")
    else:
        print(f"{row['filename']}: [OCR failed]")

df.to_csv(output_csv, index=False)
print(f"OCR labeling complete. Saved to {output_csv}")


GPU detected. Running EasyOCR on GPU.
node_00000.png: [OCR failed]
node_00001.png: [OCR failed]
node_00002.png: '1'
node_00003.png: [OCR failed]
node_00004.png: '4'
node_00005.png: '0'
node_00006.png: '5'
node_00007.png: [OCR failed]
node_00008.png: '2'
node_00009.png: '0'
node_00010.png: '4'
node_00011.png: '1'
node_00012.png: '2'
node_00013.png: '3'
node_00014.png: [OCR failed]
node_00015.png: [OCR failed]
node_00016.png: '1'
node_00017.png: [OCR failed]
node_00018.png: '4'
node_00019.png: '1'
node_00020.png: '3'
node_00021.png: '2'
node_00022.png: [OCR failed]
node_00023.png: [OCR failed]
node_00024.png: '1'
node_00025.png: '3'
node_00026.png: '4'
node_00027.png: '1'
node_00028.png: '2'
node_00029.png: '5'
node_00030.png: [OCR failed]
node_00031.png: '4'
node_00032.png: '2'
node_00033.png: '1'
node_00034.png: '3'
node_00035.png: '0'
node_00036.png: [OCR failed]
node_00037.png: '1'
node_00038.png: '2'
node_00039.png: '4'
node_00040.png: '0'
node_00041.png: '2'
node_00042.png: '5'
nod

### Model to predict the numeral on the node

In [11]:
# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)
# Configuration
IMAGE_DIR = './images'
CSV_PATH = './nodes_labeled_easyocr.csv'  # Assuming this is the CSV file with labels
IMG_SIZE = 28
NUM_CLASSES = 6 # 0 to 5
BATCH_SIZE = 32
NUM_EPOCHS = 50
LEARNING_RATE = 0.001
N_FOLDS = 5

# Custom Dataset
class DigitDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        # Read image
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        # Resize to required dimensions
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
        # Normalize pixel values to [0,1]
        img = img / 255.0
        # Convert to torch tensor
        img_tensor = torch.tensor(img, dtype=torch.float32).unsqueeze(0)  # Add channel dimension
        return img_tensor, label

# Define our CNN with Min-Pooling (inverse of MaxPooling for white background)
class MinPooling(nn.Module):
    def __init__(self, kernel_size, stride=None, padding=0):
        super(MinPooling, self).__init__()
        self.kernel_size = kernel_size
        self.stride = stride or kernel_size
        self.padding = padding

    def forward(self, x):
        # Invert the image (1 - x) to make min values become max
        inverted = 1 - x
        # Use max pooling on inverted image
        pooled = nn.functional.max_pool2d(inverted, self.kernel_size, self.stride, self.padding)
        # Invert back
        return 1 - pooled

class DigitCNN(nn.Module):
    def __init__(self, num_classes=NUM_CLASSES):
        super(DigitCNN, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.min_pool1 = MinPooling(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.min_pool2 = MinPooling(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.relu3 = nn.ReLU()
        self.min_pool3 = MinPooling(kernel_size=2, stride=2)
        # Fully connected layers
        self.fc1 = nn.Linear(128 * 3 * 3, 128)
        self.relu4 = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Convolutional layers
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.min_pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.min_pool2(x)
        x = self.conv3(x)
        x = self.relu3(x)
        x = self.min_pool3(x)
        # Flatten
        x = x.view(x.size(0), -1)
        # Fully connected layers
        x = self.fc1(x)
        x = self.relu4(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def load_data():
    """Load images and labels from CSV."""
    df = pd.read_csv(CSV_PATH)
    # Assuming CSV has columns 'filename' and 'label'
    image_paths = [os.path.join(IMAGE_DIR, filename) for filename in df['filename']]
    labels = df['key'].values
    # Print class distribution
    label_counts = Counter(labels)
    print("Class distribution:", label_counts)
    return image_paths, labels

def compute_class_weights(labels):
    """Compute weights for each class for dealing with class imbalance."""
    labels = labels.astype(int)
    class_counts = np.bincount(labels, minlength=NUM_CLASSES)
    total = len(labels)
    class_weights = total / (NUM_CLASSES * class_counts)
    return torch.FloatTensor(class_weights)

def train_and_validate():
    """Train the model using stratified K-fold cross-validation."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    # Load data
    image_paths, labels = load_data()
    # Compute class weights for handling imbalance
    class_weights = compute_class_weights(labels)
    print("Class weights:", class_weights)
    # Setup cross-validation
    skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=42)
    fold_accuracies = []
    best_model = None
    best_accuracy = 0.0
    for fold, (train_idx, val_idx) in enumerate(skf.split(image_paths, labels)):
        print(f"\nTraining fold {fold+1}/{N_FOLDS}")
        # Split data for this fold
        train_paths = [image_paths[i] for i in train_idx]
        train_labels = [labels[i] for i in train_idx]
        val_paths = [image_paths[i] for i in val_idx]
        val_labels = [labels[i] for i in val_idx]
        # Create datasets
        train_dataset = DigitDataset(train_paths, train_labels)
        val_dataset = DigitDataset(val_paths, val_labels)
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
        # Initialize model, loss function, and optimizer
        model = DigitCNN().to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
        optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)
        # Training loop
        for epoch in range(NUM_EPOCHS):
            model.train()
            train_loss = 0.0
            for inputs, targets in train_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                # Zero the parameter gradients
                optimizer.zero_grad()
                # Forward + backward + optimize
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * inputs.size(0)
            train_loss = train_loss / len(train_loader.dataset)
            # Validation
            model.eval()
            val_loss = 0.0
            val_preds = []
            val_targets = []
            with torch.no_grad():
                for inputs, targets in val_loader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)
                    val_loss += loss.item() * inputs.size(0)
                    _, preds = torch.max(outputs, 1)
                    val_preds.extend(preds.cpu().numpy())
                    val_targets.extend(targets.cpu().numpy())
            val_loss = val_loss / len(val_loader.dataset)
            scheduler.step(val_loss)
            val_accuracy = accuracy_score(val_targets, val_preds)
            if (epoch + 1) % 5 == 0:
                print(f"Epoch {epoch+1}/{NUM_EPOCHS}, "
                      f"Train Loss: {train_loss:.4f}, "
                      f"Val Loss: {val_loss:.4f}, "
                      f"Val Accuracy: {val_accuracy:.4f}")
        # Final validation metrics for this fold
        val_accuracy = accuracy_score(val_targets, val_preds)
        print(f"Fold {fold+1} Validation Accuracy: {val_accuracy:.4f}")
        print(classification_report(val_targets, val_preds))
        fold_accuracies.append(val_accuracy)
        # Save best model across folds
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_model = model
    print(f"\nCross-Validation Results:")
    print(f"Mean Accuracy: {np.mean(fold_accuracies):.4f}")
    print(f"Std Deviation: {np.std(fold_accuracies):.4f}")
    # Save the best model
    torch.save(best_model.state_dict(), 'best_digit_classifier.pth')
    print(f"Best model saved with accuracy: {best_accuracy:.4f}")
    return best_model

def predict_from_numpy(img_array, model=None):
    """Predict digit from a numpy array."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Load model if not provided
    if model is None:
        model = DigitCNN()
        model.load_state_dict(torch.load('best_digit_classifier.pth'))
        model.to(device)
    model.eval()
    # Preprocess image array
    img = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
    # Convert to grayscale if it's RGB
    if len(img.shape) == 3:
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    img = img / 255.0
    img_tensor = torch.tensor(img, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)

    # Get prediction
    with torch.no_grad():
        output = model(img_tensor)
        prob = torch.nn.functional.softmax(output, dim=1)
        confidence, predicted = torch.max(prob, 1)
    return predicted.item(), confidence.item()

if __name__ == "__main__":
    # Train the model
    best_model = train_and_validate()
    # Example of how to use the prediction function with a numpy array
    print("\nExample of prediction from numpy array:")
    # Create a sample 28x28 array (replace with actual image data)
    sample_image = cv2.imread("images/node_00016.png")
    sample_image = sample_image.astype(np.uint8)
    digit, confidence = predict_from_numpy(sample_image)
    print(f"Predicted digit: {digit}, Confidence: {confidence:.4f}")

Using device: cuda
Class distribution: Counter({np.int64(0): 501, np.int64(2): 500, np.int64(1): 500, np.int64(3): 396, np.int64(4): 264, np.int64(5): 120})
Class weights: tensor([0.7588, 0.7603, 0.7603, 0.9600, 1.4400, 3.1681])

Training fold 1/5
Epoch 5/50, Train Loss: 0.0207, Val Loss: 0.0003, Val Accuracy: 1.0000
Epoch 10/50, Train Loss: 0.0134, Val Loss: 0.0000, Val Accuracy: 1.0000
Epoch 15/50, Train Loss: 0.0167, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 20/50, Train Loss: 0.0129, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 25/50, Train Loss: 0.0111, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 30/50, Train Loss: 0.0101, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 35/50, Train Loss: 0.0115, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 40/50, Train Loss: 0.0085, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 45/50, Train Loss: 0.0099, Val Loss: 0.0001, Val Accuracy: 1.0000
Epoch 50/50, Train Loss: 0.0094, Val Loss: 0.0001, Val Accuracy: 1.0000
Fold 1 Validation Accuracy: 1.000