In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Load MNIST Dataset
# transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: (x > 0.5).float())])  # Binarize images
# train_dataset = torchvision.datasets.MNIST(root="./data", train=True, transform=transform, download=True)
# test_dataset = torchvision.datasets.MNIST(root="./data", train=False, transform=transform, download=True)

# train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
# test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)

# RBM Class
class RBM(nn.Module):
    def __init__(self, num_visible, num_hidden):
        super(RBM, self).__init__()
        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.W = nn.Parameter(torch.randn(num_hidden, num_visible) * 0.01)  # Weights
        self.v_bias = nn.Parameter(torch.zeros(num_visible))  # Visible bias
        self.h_bias = nn.Parameter(torch.zeros(num_hidden))  # Hidden bias

    def forward(self, v):
        """One Gibbs sampling step: v -> h -> v'"""
        h_prob = torch.sigmoid(torch.matmul(v, self.W.T) + self.h_bias)  # P(h|v)
        h_state = (torch.rand_like(h_prob) < h_prob).float()  # Sample h
        v_prob = torch.sigmoid(torch.matmul(h_state, self.W) + self.v_bias)  # P(v|h)
        v_state = (torch.rand_like(v_prob) < v_prob).float()  # Sample v
        return v_prob, v_state

    def free_energy(self, v):
        """Energy function for Contrastive Divergence."""
        # Term 1: Visible bias term (v^T * b_v)
        vb_term = torch.matmul(v, self.v_bias)

        # Term 2: Hidden term (sum over log(1 + exp(v^T * W_j + b_h_j)))
        hidden_term = torch.sum(
            torch.log(1 + torch.exp(torch.matmul(v, self.W.T) + self.h_bias)),
            dim=1
        )

        # Free energy: F(v) = -vb_term - hidden_term
        return -vb_term - hidden_term

    def train_rbm(self, train_loader, lr=0.001, epochs=50):
        optimizer = optim.Adam(self.parameters(), lr=lr)  # Use Adam instead of SGD
        loss_history = []

        for epoch in range(epochs):
            epoch_loss = 0
            for batch, (data, _) in enumerate(train_loader):
                v0 = data.view(-1, 28*28).to(device)  # Flatten images & move to GPU
                v1_prob, v1_state = self.forward(v0)  # Gibbs sampling

                # Compute gradients using Contrastive Divergence (CD-1)
                loss = torch.mean(self.free_energy(v0)) - torch.mean(self.free_energy(v1_state))
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            avg_loss = epoch_loss / len(train_loader)
            loss_history.append(avg_loss)
            print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")

            # Visualize Weights & Reconstructions
            if (epoch + 1) % 10 == 0:
                self.visualize_weights()
                self.visualize_reconstruction(v0, v1_prob)

        # Plot loss curve
        self.plot_loss(loss_history)

    def plot_loss(self, loss_history):
        """Plot training loss curve."""
        plt.figure(figsize=(8, 4))
        plt.plot(loss_history, label="Loss")
        plt.xlabel("Epochs")
        plt.ylabel("Free Energy Loss")
        plt.title("RBM Training Loss Curve")
        plt.legend()
        plt.show()

    def visualize_weights(self, num_images=16):
        """Plot learned features (weights)."""
        fig, axes = plt.subplots(4, 4, figsize=(6, 6))
        for i, ax in enumerate(axes.flatten()):
            if i >= num_images:
                break
            weight_img = self.W[i].detach().cpu().view(28, 28)
            ax.imshow(weight_img, cmap="gray")
            ax.axis("off")
        plt.suptitle("RBM Learned Features")
        plt.show()

    def visualize_reconstruction(self, original, reconstructed, num_images=10):
        """Visualize original and reconstructed images."""
        fig, axes = plt.subplots(2, num_images, figsize=(15, 3))
        for i in range(num_images):
            # Original
            axes[0, i].imshow(original[i].detach().cpu().view(28, 28), cmap="gray")
            axes[0, i].axis("off")

            # Reconstructed
            axes[1, i].imshow(reconstructed[i].detach().cpu().view(28, 28), cmap="gray")
            axes[1, i].axis("off")

        axes[0, 0].set_title("Original Images")
        axes[1, 0].set_title("Reconstructed Images")
        plt.show()

    def evaluate(self, test_loader):
        """Evaluate RBM on unseen test data using MSE, SSIM, and PSNR."""
        mse_total, ssim_total, psnr_total, count = 0, 0, 0, 0
        with torch.no_grad():
            for batch, (data, _) in enumerate(test_loader):
                v0 = data.view(-1, 28*28).to(device)  # Flatten images & move to GPU
                v1_prob, _ = self.forward(v0)  # Reconstruct images

                # Convert to numpy for metric calculations
                original_np = v0.cpu().numpy()
                reconstructed_np = v1_prob.cpu().numpy()

                # Compute MSE, SSIM, PSNR
                for i in range(original_np.shape[0]):
                    mse = np.mean((original_np[i] - reconstructed_np[i])**2)
                    ssim_score = ssim(original_np[i].reshape(28, 28), reconstructed_np[i].reshape(28, 28), data_range=1)
                    psnr_score = psnr(original_np[i], reconstructed_np[i], data_range=1)

                    mse_total += mse
                    ssim_total += ssim_score
                    psnr_total += psnr_score
                    count += 1



        # Print results
        print("\n--- RBM Evaluation on Unseen Data ---")
        print(f"Mean Squared Error (MSE): {mse_total / count:.5f}")
        print(f"Structural Similarity Index (SSIM): {ssim_total / count:.5f}")
        print(f"Peak Signal-to-Noise Ratio (PSNR): {psnr_total / count:.5f}")

        # Visualize some reconstructions
        self.visualize_reconstruction(v0, v1_prob)

    def extract_features(self, data_loader):
        """Extract hidden layer features for classification"""
        features, labels = [], []
        with torch.no_grad():
            for batch, (data, targets) in enumerate(data_loader):
                v = data.view(-1, 28 * 28).to(device)
                h_prob, _ = self.forward(v)  # Get hidden layer activations
                features.append(h_prob.cpu().numpy())
                labels.append(targets.cpu().numpy())

        return np.vstack(features), np.hstack(labels)

# # Train RBM
# rbm = RBM(num_visible=28*28, num_hidden=256).to(device)  # 256 hidden neurons, move to GPU
# rbm.train_rbm(train_loader, lr=0.001, epochs=50)

# # Evaluate on Unseen Test Data
# rbm.evaluate(test_loader)


Using device: cpu


In [3]:
# Save the trained model
from google.colab import drive
drive.mount('/content/drive')
# torch.save(rbm.state_dict(), "/content/drive/MyDrive/research/rbm_mnist.pth")
# print("✅ Model saved as rbm_mnist.pth")

# Load the saved model
# rbm_loaded = RBM(num_visible=28*28, num_hidden=256).to(device)  # Initialize same architecture
# rbm_loaded.load_state_dict(torch.load("/content/drive/MyDrive/research/rbm_mnist.pth", map_location=torch.device('cpu')))
# rbm_loaded.eval()  # Set to evaluation mode
# print("✅ Model loaded successfully")
# rbm = rbm_loaded

Mounted at /content/drive


In [None]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import torchvision
# import torchvision.transforms as transforms
# import matplotlib.pyplot as plt
# import numpy as np
# from sklearn.linear_model import LogisticRegression
# from sklearn.metrics import accuracy_score
# from sklearn.svm import SVC
# from sklearn.metrics import accuracy_score

# # Extract features using the trained RBM
# train_features, train_labels = rbm.extract_features(train_loader)
# test_features, test_labels = rbm.extract_features(test_loader)

# # Train SVM Classifier
# classifier = SVC(kernel='rbf', C=1.0, gamma='scale')  # RBF kernel for better performance
# classifier.fit(train_features, train_labels)

# # Evaluate Model
# predictions = classifier.predict(test_features)
# accuracy = accuracy_score(test_labels, predictions)
# print(f"Classification Accuracy: {accuracy:.4f}")
# import joblib
# joblib.dump(classifier, "/content/drive/MyDrive/research/logistic_classifier.pkl")

# print("Models saved successfully!")



In [4]:
import torch
import joblib
from torchvision import transforms
from PIL import Image
import numpy as np

# Load the trained RBM and SVM model
device = torch.device("cpu")
rbm_path = "/content/drive/MyDrive/research/rbm_mnist.pth"
rbm = RBM(num_visible=28*28, num_hidden=256).to(device)  # Initialize same architecture
rbm.load_state_dict(torch.load(rbm_path, map_location=device))
rbm.eval()  # Set model to evaluation mode

# Load trained SVM classifier
svm_path = "/content/drive/MyDrive/research/svm_classifier.pkl"
svm_classifier = joblib.load(svm_path)

# Define image preprocessing
transform = transforms.Compose([
    transforms.Resize((28, 28)),  # Resize to MNIST size
    transforms.ToTensor(),
])

def predict_digit(image_path: str,rbm=rbm, svm_classifier=svm_classifier, transform=transform):
    """
    Predicts the digit in the image using the trained RBM and SVM classifier.

    Args:
        image_path (str): Path to the input image.

    Returns:
        int: Predicted digit label.
    """
    # Load and preprocess the image
    image = Image.open(image_path).convert("L")  # Convert to grayscale
    image = transform(image)  # Apply preprocessing
    image = image.view(-1, 28*28)  # Flatten image

    # Extract features using the trained RBM
    with torch.no_grad():
        features,_ = rbm(image)  # Extract RBM features

    # Convert features to numpy array for SVM input
    # features = features.numpy().reshape(1, -1)  # Flatten the feature vector

    # Predict with SVM classifier

    predicted_label = svm_classifier.predict(features)[0]

    return int(predicted_label)

# # # Example usage
# image_path = "image.jpg"  # Path to an image file
# predicted_digit = predict_digit(image_path)
# print(f"Predicted Digit: {predicted_digit}")
# type((predicted_digit))


In [5]:
!pip install -q fastapi uvicorn python-multipart torch torchvision pytorchvideo pyngrok


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/132.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m132.7/132.7 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.2/50.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.9/94.9 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.3/62.3 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m2.2 MB/s[0m eta [36m0:

In [6]:
import os
import requests
from urllib.parse import urlparse

def download_image(url, save_path=None):
    """
    Download an image from the given URL and save it to the specified path.

    Args:
        url (str): URL of the image to download
        save_path (str, optional): Path where the image should be saved.
                                  If None, saves with original filename in current directory.

    Returns:
        str: Path to the saved image file
    """
    try:
        # Send request to get the image
        response = requests.get(url, stream=True)
        response.raise_for_status()  # Raise an exception for bad responses

        # Extract filename from URL if save_path is not provided
        if not save_path:
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)

            # Use a default name if filename couldn't be determined
            if not filename:
                filename = "image.jpg"

            save_path = filename

        # Create directories if they don't exist
        directory = os.path.dirname(save_path)
        if directory and not os.path.exists(directory):
            os.makedirs(directory)

        # Write the image to file
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        return os.path.abspath(save_path)

    except Exception as e:
        print(f"Error downloading image: {e}")
        return None

In [7]:
import uvicorn
import torch
import joblib
import numpy as np
from fastapi import FastAPI, File, UploadFile, Query
from fastapi.middleware.cors import CORSMiddleware
from torchvision import transforms
from PIL import Image
from pyngrok import ngrok
import nest_asyncio
import requests
from pydantic import BaseModel
# Initialize FastAPI
app = FastAPI()

# Allow all CORS requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/predict/")
async def predict_digit_from_image(file: UploadFile = File(...)):
    image_path = "image.jpg"

    # Write the uploaded file content to the image path
    with open(image_path, "wb") as f:
        f.write(await file.read())


    print(image_path)
    # Pass the file path to the predict_digit function
    predicted_label = predict_digit(image_path)
    print(f"predicted {predicted_label}, {type(predicted_label)}")
    # Clean up the temp file after prediction

    return {"predicted_digit": predicted_label}

class URLInput(BaseModel):
    url: str
@app.post("/predict_from_url/")
async def predict_digit_from_url(image_url: URLInput):
    # image_path = download_image("https://res.cloudinary.com/chatappjeevanneupane/image/upload/v1730944042/vp9z8mt1oushfsszpdvg.jpg")
    image_path = download_image(image_url.url)

    predicted_label = predict_digit(image_path)
    print(f"predicted {predicted_label}, {type(predicted_label)}")

    return {"predicted_digit": predicted_label}

if __name__ == "__main__":
    import os
    os.environ["NGROK_AUTHTOKEN"] = "2rCbW45ffaTmVf03HbMluTUNCv1_4uD242hh56wg9SvHorrNR"

    ngrok_tunnel = ngrok.connect(8000)
    print('Public_URL:', ngrok_tunnel.public_url)

    nest_asyncio.apply()
    uvicorn.run(app, host="0.0.0.0", port=8000)




INFO:     Started server process [181]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


Public_URL: https://3494-34-46-36-171.ngrok-free.app
INFO:     2407:5200:405:1d7d:a152:1f97:7099:9392:0 - "GET / HTTP/1.1" 404 Not Found
INFO:     2407:5200:405:1d7d:a152:1f97:7099:9392:0 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     2407:5200:405:1d7d:a152:1f97:7099:9392:0 - "GET /docs HTTP/1.1" 200 OK
INFO:     2407:5200:405:1d7d:a152:1f97:7099:9392:0 - "GET /openapi.json HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Finished server process [181]
