In [18]:
import os
from PIL import Image
import matplotlib.pyplot as plt
import random

# Directory that is expected to contain the Nigeria satellite tiles.
nigeria_images_path = "/kaggle/input/satellite-images-to-predict-povertyafrica/nigeria_archive/images"

if os.path.exists(nigeria_images_path):
    # Only files ending in '.png' are counted as images.
    image_files = [f for f in os.listdir(nigeria_images_path) if f.endswith('.png')]
    num_images = len(image_files)
    print(f"Total number of images: {num_images}")

    if num_images >= 3:
        # Show three randomly chosen tiles side by side.
        sample_images = random.sample(image_files, 3)
        plt.figure(figsize=(12, 4))
        for i, img_name in enumerate(sample_images):
            img_path = os.path.join(nigeria_images_path, img_name)
            plt.subplot(1, 3, i + 1)
            # imshow converts the PIL image to an array immediately, so the
            # file handle can be released as soon as the call returns.
            with Image.open(img_path) as img:
                plt.imshow(img)
            plt.axis('off')
            plt.title(f"Sample {i+1}")
        plt.show()
    else:
        print("There are less than 3 images in the directory.")
else:
    print(f"Directory {nigeria_images_path} does not exist.")

Directory /kaggle/input/satellite-images-to-predict-povertyafrica/nigeria_archive/images does not exist.


In [19]:
import os
import cv2
import numpy as np
from tqdm import tqdm

# Load every PNG tile, resize it to 64x64 and scale pixel values to [0, 1].
image_dir = "/kaggle/input/satellite-images-to-predict-povertyafrica/nigeria_archive/images"
normalized_images = []
resize_dim = (64, 64)

file_names = [f for f in os.listdir(image_dir) if f.endswith('.png')]

for file_name in tqdm(file_names, desc="Processing images", ncols=100):
    file_path = os.path.join(image_dir, file_name)
    img = cv2.imread(file_path)
    # cv2.imread returns None for unreadable files; those are skipped.
    if img is not None:
        # OpenCV decodes to BGR. The inference path in this notebook
        # (preprocess_image) decodes with PIL, which yields RGB, so the
        # training tiles must use RGB too or the channels are swapped at
        # prediction time.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_resized = cv2.resize(img_rgb, resize_dim)
        img_normalized = img_resized / 255.0
        normalized_images.append(img_normalized)

# Stack the per-image arrays into a single array.
normalized_images = np.array(normalized_images)

print(f"Number of normalized images: {len(normalized_images)}")
print(f"Dimensions of normalized images: {normalized_images.shape}")

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/satellite-images-to-predict-povertyafrica/nigeria_archive/images'

In [None]:
import os
import cv2
import torch
import numpy as np
from tqdm import tqdm
from torchvision import models, transforms

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

image_dir = "/kaggle/input/satellite-images-to-predict-povertyafrica/nigeria_archive/images"
file_names = [f for f in os.listdir(image_dir) if f.endswith('.png')]

# Load pre-trained ResNet50. `weights=` replaces the deprecated
# `pretrained=True` and selects the same ImageNet checkpoint; it is also the
# form the later cells of this notebook use.
model = models.resnet50(weights='IMAGENET1K_V1')
model = model.to(device)
model.eval()

features = []
loaded_file_names = []  # names of the files that actually produced a feature vector
resize_dim = (224, 224)

# Preprocessing with the mean/std constants used by the later cells as well.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(resize_dim),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

for file_name in tqdm(file_names, desc="Extracting features", ncols=100):
    file_path = os.path.join(image_dir, file_name)
    img = cv2.imread(file_path)
    if img is None:
        # Unreadable file: skip it and keep it out of loaded_file_names.
        continue

    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_transformed = transform(img_rgb)
    img_transformed = img_transformed.unsqueeze(0).to(device)

    with torch.no_grad():
        # The full network is applied (classification head included), so the
        # vector stored per image is the model's final output.
        feature = model(img_transformed)
    features.append(feature.cpu().numpy().flatten())
    loaded_file_names.append(file_name)

# Keep file_names index-aligned with features: later cells look up
# file_names[idx] using row indices of the clustering result.
file_names = loaded_file_names

features = np.array(features)
print(f"Number of feature vectors extracted: {len(features)}")
print(f"Feature dimensions: {features.shape}")

In [None]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

# Fit KMeans for k = 1..10 and record the inertia of each fit.
inertias = []
k_range = range(1, 11)

for k in tqdm(k_range, desc="Finding optimal clusters"):
    kmeans = KMeans(n_clusters=k, n_init=10, random_state=42).fit(features)
    inertias.append(kmeans.inertia_)

# Plot elbow curve: inertia against the number of clusters.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(k_range, inertias, marker='o', linestyle='-', color='b')
ax.set_title("Elbow Method for Optimal Number of Clusters")
ax.set_xlabel("Number of Clusters (k)")
ax.set_ylabel("Inertia")
ax.set_xticks(k_range)
ax.grid(True)
plt.show()

In [None]:
from sklearn.cluster import KMeans

# Based on elbow method, choose k=3
k = 3
kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
# fit_predict returns one cluster id per row of `features`.
cluster_labels = kmeans.fit_predict(features)

# Report how many images were assigned to each cluster id.
cluster_counts = np.bincount(cluster_labels)
for i in range(cluster_counts.size):
    count = cluster_counts[i]
    print(f"Cluster {i}: {count} images")

In [None]:
import matplotlib.pyplot as plt

# Display names attached to the KMeans cluster ids.
# NOTE(review): nothing in this notebook derives this mapping from data;
# presumably it was chosen by inspecting the clusters - confirm.
classes = {0: "Poor", 1: "Rich", 2: "Middle Class"}
cluster_counts = np.bincount(cluster_labels)

# Ids without an entry in `classes` fall back to a generic "Class <id>" name.
class_names = [classes.get(i, f"Class {i}") for i in range(len(cluster_counts))]
counts = cluster_counts

colors = ['#FF6B6B', '#4ECDC4', '#45B7D1']

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(class_names, counts, color=colors, alpha=0.8)
ax.set_title("Distribution of Images by Poverty Class", fontsize=14, fontweight='bold')
ax.set_xlabel("Poverty Class", fontsize=12)
ax.set_ylabel("Number of Images", fontsize=12)
ax.grid(axis='y', linestyle='--', alpha=0.7)

# Add value labels on bars
for i, v in enumerate(counts):
    ax.text(i, v + 0.5, str(v), ha='center', va='bottom', fontweight='bold')

fig.tight_layout()
plt.show()

In [20]:
import matplotlib.pyplot as plt
import cv2

# Grid of example tiles: one row per cluster, up to three tiles per row.
n_cols = 3
# squeeze=False keeps `axes` two-dimensional even when k == 1.
fig, axes = plt.subplots(k, n_cols, figsize=(15, 12), squeeze=False)

# Hide every axis frame up front so that cells without an image stay blank.
# (Previously only the last cluster's sample count decided which columns
# were cleared, and it was applied to every row.)
for ax in axes.flat:
    ax.axis('off')

for cluster_num in range(k):
    cluster_indices = [i for i, label in enumerate(cluster_labels) if label == cluster_num]

    # Take up to 3 samples from each cluster (slicing handles shorter lists).
    sample_indices = cluster_indices[:n_cols]

    for i, idx in enumerate(sample_indices):
        file_path = os.path.join(image_dir, file_names[idx])
        img = cv2.imread(file_path)
        if img is None:
            # Unreadable file: leave this cell blank instead of crashing in cvtColor.
            continue
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        axes[cluster_num, i].imshow(img_rgb)

    # Row title on the middle column; it is set for every row so clusters
    # with fewer than two samples are still identified.
    row_name = classes.get(cluster_num, f"Class {cluster_num}")
    axes[cluster_num, 1].set_title(f"{row_name} (Cluster {cluster_num})",
                                   fontweight='bold', fontsize=12)

plt.tight_layout()
plt.show()

NameError: name 'k' is not defined

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np

# Prepare data for CNN: the KMeans cluster ids serve as the training targets.
labels = cluster_labels

# Build float32 image / int64 label tensors; the permute moves the last axis
# to position 1, giving (batch_size, channels, height, width).
normalized_images_tensor = torch.tensor(normalized_images, dtype=torch.float32).permute(0, 3, 1, 2)
labels_tensor = torch.tensor(labels, dtype=torch.long)

# Split data: 70% train, then the remaining 30% is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(
    normalized_images_tensor, labels_tensor, test_size=0.3, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print(f"Training set: {X_train.shape[0]} samples")
print(f"Validation set: {X_val.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")

# Create data loaders
batch_size = 32
train_dataset, val_dataset, test_dataset = (
    TensorDataset(X_train, y_train),
    TensorDataset(X_val, y_val),
    TensorDataset(X_test, y_test),
)

# Only the training loader shuffles; DataLoader's default is shuffle=False.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Define CNN model
class PovertyCNN(nn.Module):
    """Three conv/pool stages followed by a two-layer classifier head.

    Each stage halves the spatial size, and ``fc1`` expects 128 * 8 * 8
    inputs, so the network takes 3-channel 64x64 images.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        # Attribute names and creation order are kept as they are: they
        # define the state_dict keys and the order in which the RNG is
        # consumed during weight initialisation.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(128 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        # conv -> ReLU -> 2x2 max-pool, three times.
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.pool(torch.relu(conv(x)))
        # Collapse everything except the batch dimension.
        x = torch.flatten(x, start_dim=1)
        hidden = self.dropout(torch.relu(self.fc1(x)))
        return self.fc2(hidden)

# Seed PyTorch's global RNG so that weight initialisation, dropout masks and
# the shuffled batch order repeat from run to run (GPU kernels may still add
# small nondeterminism). 42 matches the random_state used elsewhere.
torch.manual_seed(42)

# Initialize model: one output logit per distinct cluster label.
# Note: this rebinds `model`, which earlier held the ResNet50 feature extractor.
num_classes = len(np.unique(labels))
model = PovertyCNN(num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop; the per-epoch history lists are plotted by the evaluation cell.
num_epochs = 50
train_losses = []
val_losses = []
val_accuracies = []

for epoch in range(num_epochs):
    # Training: one optimizer step per mini-batch.
    model.train()
    running_loss = 0.0
    for images, targets in train_loader:
        images, targets = images.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Validation: eval() disables dropout, no_grad() skips gradient tracking.
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, targets in val_loader:
            images, targets = images.to(device), targets.to(device)
            outputs = model(images)
            loss = criterion(outputs, targets)
            val_loss += loss.item()

            # Predicted class = index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    # Losses are averaged over batches; accuracy is a percentage of samples.
    train_loss = running_loss / len(train_loader)
    val_loss_avg = val_loss / len(val_loader)
    val_accuracy = 100 * correct / total

    train_losses.append(train_loss)
    val_losses.append(val_loss_avg)
    val_accuracies.append(val_accuracy)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, "
          f"Val Loss: {val_loss_avg:.4f}, Val Accuracy: {val_accuracy:.2f}%")

print("Training completed!")

In [None]:
import torch
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Evaluate on test set
model.eval()
all_preds = []
all_labels = []
all_probabilities = []

with torch.no_grad():
    # The batch variable is called `targets`, not `labels`, so that the
    # notebook-level `labels` array of cluster ids is not overwritten.
    for images, targets in test_loader:
        images, targets = images.to(device), targets.to(device)
        outputs = model(images)
        probabilities = torch.softmax(outputs, dim=1)
        _, predicted = torch.max(outputs, 1)

        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(targets.cpu().numpy())
        all_probabilities.extend(probabilities.cpu().numpy())

# Convert to numpy arrays
all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_probabilities = np.array(all_probabilities)

# Fix the label set explicitly: without `labels=`, scikit-learn infers the
# classes from the data, and the report fails (or the heatmap ticks shift)
# when a class is absent from the test split.
class_ids = list(range(num_classes))
display_names = [classes[i] for i in class_ids]

# Calculate metrics
accuracy = accuracy_score(all_labels, all_preds)
conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)
class_report = classification_report(all_labels, all_preds,
                                     labels=class_ids,
                                     target_names=display_names,
                                     zero_division=0)

print("=" * 50)
print("MODEL EVALUATION RESULTS")
print("=" * 50)
print(f"Overall Accuracy: {accuracy:.4f}")
print(f"Overall Accuracy (%): {accuracy * 100:.2f}%")
# A real newline; the previous "\\n" printed a literal backslash-n.
print("\nClassification Report:")
print(class_report)

# Plot confusion matrix (rows: true class, columns: predicted class)
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues",
            xticklabels=display_names,
            yticklabels=display_names)
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.title("Confusion Matrix")
plt.tight_layout()
plt.show()

# Plot training history recorded by the training loop
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Training Loss', color='blue')
plt.plot(val_losses, label='Validation Loss', color='red')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(val_accuracies, label='Validation Accuracy', color='green')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.title('Validation Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Save the trained model: weights, optimizer state and the metadata needed
# to rebuild the network (class mapping, class count, architecture name).
checkpoint = {
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'classes': classes,
    'num_classes': num_classes,
    'model_architecture': type(model).__name__,
}
torch.save(checkpoint, 'poverty_cnn_model.pth')

print("Model saved successfully!")
print("Model file: 'poverty_cnn_model.pth'")

In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output
import torch
from PIL import Image as PILImage
import io
import numpy as np

# Load the saved model for inference
def load_model_for_inference():
    """Rebuild the CNN from 'poverty_cnn_model.pth'.

    Returns:
        (model, classes): the PovertyCNN on `device` in eval mode, and the
        class mapping stored in the checkpoint.
    """
    saved = torch.load('poverty_cnn_model.pth', map_location=device)
    net = PovertyCNN(saved['num_classes'])
    net.load_state_dict(saved['model_state_dict'])
    # Module.to and Module.eval both return the module itself.
    net = net.to(device).eval()
    return net, saved['classes']

# Load model
inference_model, class_names = load_model_for_inference()

# Preprocessing function for new images
def preprocess_image(uploaded_image):
    """Decode raw image bytes into the tensor layout the CNN expects.

    Args:
        uploaded_image: encoded image file content (bytes-like).

    Returns:
        (img_tensor, img_array): a float32 tensor of shape (1, 3, 64, 64)
        scaled to [0, 1], and the decoded RGB image as a numpy array at its
        original size (used for display).
    """
    # convert('RGB') normalises every PIL mode to three channels. Checking
    # only for a 4-channel array let grayscale and palette images through as
    # 2-D arrays, which then failed in permute().
    with PILImage.open(io.BytesIO(uploaded_image)) as image:
        img_array = np.array(image.convert('RGB'))

    # Resize and normalize
    img_resized = cv2.resize(img_array, (64, 64))
    img_normalized = img_resized / 255.0

    # Convert to tensor: (H, W, C) -> (C, H, W), then add a batch dimension.
    img_tensor = torch.tensor(img_normalized, dtype=torch.float32)
    img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0)  # (1, 3, 64, 64)

    return img_tensor, img_array

# Prediction function
def predict_poverty(uploaded_image):
    """Classify one encoded image with the loaded CNN.

    Args:
        uploaded_image: encoded image file content, passed to preprocess_image.

    Returns:
        (predicted_class, confidence, probabilities, original_img): the index
        of the largest logit, the largest softmax probability, the per-class
        probabilities as a 1-D numpy array, and the image array returned by
        preprocess_image.
    """
    batch, original_img = preprocess_image(uploaded_image)

    # Inference only: no gradient tracking is needed.
    with torch.no_grad():
        logits = inference_model(batch.to(device))
        probabilities = torch.softmax(logits, dim=1)

    predicted_class = torch.argmax(logits, dim=1).item()
    confidence = torch.max(probabilities).item()
    return predicted_class, confidence, probabilities.cpu().numpy()[0], original_img

# Create widgets
upload_button = widgets.FileUpload(
    description='Upload Image',
    accept='.png,.jpg,.jpeg',
    multiple=False
)

predict_button = widgets.Button(
    description='Predict Poverty Level',
    button_style='success',
    tooltip='Click to make prediction'
)

output_area = widgets.Output()


def _extract_uploaded_bytes(upload_value):
    """Return the content of the first uploaded file.

    Handles both shapes of FileUpload.value seen across ipywidgets versions:
    a dict keyed by file name, or a sequence of per-file records that expose
    the content either as an attribute or as a mapping key.
    """
    if isinstance(upload_value, dict):
        first = next(iter(upload_value.values()))
    else:
        first = upload_value[0]
    return first.content if hasattr(first, 'content') else first['content']


# Prediction handler
def on_predict_button_clicked(b):
    """Button callback: classify the uploaded image and render the result.

    Args:
        b: the Button instance passed by ipywidgets (unused).

    Everything is written into `output_area`. Any exception raised while
    decoding or predicting is reported there instead of propagating, so a
    bad upload does not break the interface.
    """
    with output_area:
        clear_output()
        if not upload_button.value:
            print("❌ Please upload an image first!")
            return

        try:
            uploaded_image = _extract_uploaded_bytes(upload_button.value)

            # Make prediction
            predicted_class, confidence, probabilities, original_img = predict_poverty(uploaded_image)

            # Display results
            plt.figure(figsize=(12, 5))

            # Left panel: the uploaded image
            plt.subplot(1, 2, 1)
            plt.imshow(original_img)
            plt.axis('off')
            plt.title('Uploaded Image', fontweight='bold', fontsize=12)

            # Right panel: per-class probabilities
            plt.subplot(1, 2, 2)
            colors = ['#FF6B6B', '#4ECDC4', '#45B7D1']
            classes_list = [class_names[i] for i in range(len(class_names))]

            bars = plt.bar(classes_list, probabilities, color=colors, alpha=0.8)
            plt.ylim(0, 1)
            plt.ylabel('Probability', fontsize=12)
            plt.title('Poverty Level Prediction Probabilities', fontweight='bold', fontsize=12)

            # Add value labels on bars
            for bar, prob in zip(bars, probabilities):
                plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                         f'{prob:.3f}', ha='center', va='bottom', fontweight='bold', fontsize=10)

            plt.grid(axis='y', alpha=0.3)
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.show()

            # Print prediction summary (emoji restored from mis-encoded text)
            print("=" * 60)
            print("🎯 POVERTY PREDICTION RESULTS")
            print("=" * 60)
            print(f"🏠 Predicted Poverty Level: {class_names[predicted_class]}")
            print(f"📊 Confidence: {confidence:.3f} ({confidence*100:.1f}%)")
            print(f"🔢 Class ID: {predicted_class}")
            print("\nProbability Distribution:")
            for i, prob in enumerate(probabilities):
                print(f"  📈 {class_names[i]}: {prob:.3f} ({prob*100:.1f}%)")
            print("=" * 60)

            # Add interpretation, bucketed on the top softmax probability
            if confidence > 0.7:
                print("✅ High confidence prediction")
            elif confidence > 0.5:
                print("⚠️  Moderate confidence prediction")
            else:
                print("❓ Low confidence prediction - consider manual review")

        except Exception as e:
            # Deliberately broad: report the problem and keep the UI usable.
            print(f"❌ Error processing image: {str(e)}")
            print("Please try uploading a different image.")

# Connect button to handler
predict_button.on_click(on_predict_button_clicked)

# Display the interface
print("🎯 SATELLITE IMAGE POVERTY PREDICTION INTERFACE")
print("Upload a satellite image to predict poverty level")
print("=" * 60)
print("Supported formats: PNG, JPG, JPEG")
print("Classes: Poor, Middle Class, Rich")
print("=" * 60)

# Layout: upload and predict buttons side by side, results underneath
vbox = widgets.VBox([
    widgets.HBox([upload_button, predict_button]),
    output_area
])

display(vbox)

In [21]:
# Test the prediction with some sample images from the dataset
print("Testing prediction with sample images...")
print("=" * 50)

# Get one sample from each class. The "true" class here is the KMeans
# cluster id of the image, i.e. the label the CNN was trained on.
# NOTE(review): the first image of a cluster may have been part of the
# training split, so this is a smoke test rather than an evaluation.
sample_predictions = []
for cluster_num in range(k):
    cluster_indices = [i for i, label in enumerate(cluster_labels) if label == cluster_num]
    if cluster_indices:
        sample_idx = cluster_indices[0]
        sample_path = os.path.join(image_dir, file_names[sample_idx])

        # predict_poverty expects the encoded file content, not a decoded array
        with open(sample_path, 'rb') as f:
            image_data = f.read()

        predicted_class, confidence, probabilities, original_img = predict_poverty(image_data)

        sample_predictions.append({
            'true_class': cluster_num,
            'predicted_class': predicted_class,
            'confidence': confidence,
            'image': original_img
        })

# Display sample predictions. squeeze=False keeps the axes indexable when k == 1.
fig, axes = plt.subplots(1, k, figsize=(15, 5), squeeze=False)
axes = axes[0]

# Hide all frames first so panels without a prediction stay blank.
for ax in axes:
    ax.axis('off')

for ax, prediction in zip(axes, sample_predictions):
    ax.imshow(prediction['image'])
    true_class_name = class_names[prediction['true_class']]
    pred_class_name = class_names[prediction['predicted_class']]
    status = "✓" if prediction['true_class'] == prediction['predicted_class'] else "✗"
    # Real newlines; the previous "\\n" rendered a literal backslash-n in the title.
    ax.set_title(f"True: {true_class_name}\nPred: {pred_class_name}\nConf: {prediction['confidence']:.3f} {status}",
                 fontweight='bold')

plt.tight_layout()
plt.show()

Testing prediction with sample images...


NameError: name 'k' is not defined

# Attempt 1 - Streamlit app that loads the KMeans joblib model, served through an ngrok tunnel

In [None]:
%%writefile app.py

import streamlit as st
import os
import cv2
import torch
import numpy as np
from torchvision import models, transforms
from PIL import Image
import joblib

# --- 1. Load Pre-computed Model and Set Up ---
st.title("Poverty Prediction from Satellite Images")

@st.cache_resource
def load_prediction_pipeline():
    """Build everything needed to classify one image.

    Returns:
        (kmeans_model, feature_extractor, transform_pipeline, device): the
        KMeans model loaded from disk, an ImageNet-pretrained ResNet50 in
        eval mode on `device`, the preprocessing transform, and the torch
        device in use.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # NOTE(review): joblib.load unpickles the file; only load trusted models.
    kmeans_model = joblib.load("/kaggle/input/poverty-prediction/kmeans_model (1).joblib")

    # Force the stored cluster centers to float64. This is the original
    # author's workaround for a dtype mismatch reported at predict time.
    kmeans_model.cluster_centers_ = kmeans_model.cluster_centers_.astype(np.float64)

    # ResNet50 used as the feature extractor, in inference mode.
    feature_extractor = models.resnet50(weights='IMAGENET1K_V1').to(device)
    feature_extractor.eval()

    # ndarray -> PIL -> 224x224 -> tensor -> channel-wise normalisation
    preprocessing_steps = [
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    transform_pipeline = transforms.Compose(preprocessing_steps)

    return kmeans_model, feature_extractor, transform_pipeline, device

with st.spinner('Loading prediction model...'):
    kmeans, model, transform, device = load_prediction_pipeline()
st.success("Model loaded successfully!")

# Display names for the KMeans cluster ids.
classes = {0: "Poor", 1: "Rich", 2: "Middle Class"}

# --- 2. Main Streamlit Application Logic ---
st.header("Upload an Image for Prediction")
uploaded_file = st.file_uploader("Choose a satellite image...", type="png")

if uploaded_file is not None:
    image = Image.open(uploaded_file)

    # width=None leaves the sizing to Streamlit's default handling.
    st.image(image, caption='Uploaded Image.', width=None)

    with st.spinner('Analyzing the image...'):
        # convert("RGB") normalises every PIL mode to three channels. The
        # manual shape checks mis-handled palette PNGs (colour indices were
        # treated as gray levels) and two-channel gray+alpha images.
        img_array = np.array(image.convert("RGB"))

        img_transformed = transform(img_array).unsqueeze(0).to(device)

        with torch.no_grad():
            feature = model(img_transformed)

        # Ensure input feature is also float64, matching the cluster centers
        feature_final = feature.cpu().numpy().flatten().astype(np.float64)

        # predict returns a numpy integer; cast it to a plain int for the dict lookup.
        prediction = int(kmeans.predict([feature_final])[0])

    st.write(f"### Predicted Poverty Level: **{classes[prediction]}**")

In [1]:
# Install required libraries, forcing the correct scikit-learn version
!pip install streamlit -q
!pip install pyngrok -q
!pip install scikit-learn==1.2.2

# Get the ngrok authtoken from secrets
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
ngrok_token = user_secrets.get_secret("NGROK_AUTHTOKEN")

# Launch ngrok
from pyngrok import ngrok
ngrok.set_auth_token(ngrok_token)
public_url = ngrok.connect(8501)
print(f"‚úÖ Your app is live! Click here: {public_url}")

# Run the streamlit app
!streamlit run app.py

[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m10.2/10.2 MB[0m [31m69.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m6.9/6.9 MB[0m [31m76.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
‚úÖ Your app is live! Click here: NgrokTunnel: "https://nonhistrionically-loudish-latosha.ngrok-free.dev" -> "http://localhost:8501"

Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.19.2.2:8501[0m
[34m  External URL: [0m[1mhttp://34.90.49.39:8501[0m
[0m
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub

# Attempt 2 - Direct run in notebook

In [7]:
!pip install ipywidgets -q
!pip install scikit-learn==1.2.2
!pip install torchvision torch -q

import os
import cv2
import torch
import numpy as np
import joblib
import ipywidgets as widgets
from PIL import Image
from torchvision import models, transforms
from IPython.display import display, clear_output
import io
import matplotlib.pyplot as plt  # <-- ADD THIS LINE



In [4]:
# Create an output widget to show loading status
load_output = widgets.Output()

@load_output.capture(wait=True)
def load_model_pipeline():
    """Load the KMeans model, the ResNet50 extractor and the transform.

    Printed status messages are captured into `load_output`.

    Returns:
        (kmeans_model, feature_extractor, transform_pipeline, device)
    """
    print("Loading prediction pipeline...")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # NOTE(review): joblib.load unpickles the file; only load trusted models.
    kmeans_model = joblib.load("/kaggle/input/poverty-prediction/kmeans_model (1).joblib")

    # Force the cluster centers to float64 ('double'). This is the original
    # author's workaround for a dtype mismatch at predict time.
    kmeans_model.cluster_centers_ = kmeans_model.cluster_centers_.astype(np.float64)

    # ImageNet-pretrained ResNet50 in inference mode on the chosen device
    feature_extractor = models.resnet50(weights='IMAGENET1K_V1')
    feature_extractor = feature_extractor.to(device)
    feature_extractor.eval()

    # ndarray -> PIL -> 224x224 -> tensor -> channel-wise normalisation
    preprocessing_steps = [
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    transform_pipeline = transforms.Compose(preprocessing_steps)

    print("Pipeline loaded successfully!")
    return kmeans_model, feature_extractor, transform_pipeline, device

# Show the status widget first so the captured messages have somewhere to appear
display(load_output)

# Run the loading function
kmeans, model, transform, device = load_model_pipeline()

# Display names for the KMeans cluster ids
classes = {0: "Poor", 1: "Rich", 2: "Middle Class"}

Output()

In [8]:
def predict_image(image_bytes):
    """Runs a single image through the prediction pipeline.

    Args:
        image_bytes: encoded image file content (bytes-like).

    Returns:
        On success ``(prediction_class, distances, img_array)``: the display
        name of the nearest cluster, the distances to every cluster center,
        and the decoded RGB image as a numpy array.
        On failure ``("Error: <message>", None, None)``; callers should
        detect this case by checking ``distances is None``.
    """
    try:
        # convert('RGB') normalises every PIL mode to three channels. The
        # manual shape checks mis-handled palette images (colour indices
        # were treated as gray levels) and two-channel gray+alpha images.
        with Image.open(io.BytesIO(image_bytes)) as image:
            img_array = np.array(image.convert('RGB'))

        # Apply transforms and add the batch dimension
        img_transformed = transform(img_array).unsqueeze(0).to(device)

        # Extract features
        with torch.no_grad():
            feature = model(img_transformed)

        # Match the float64 dtype of the cluster centers
        feature_final = feature.cpu().numpy().flatten().astype(np.float64)

        # KMeans.transform gives the distance to each cluster center
        distances = kmeans.transform([feature_final])[0]

        # The prediction is the nearest cluster
        prediction_index = int(np.argmin(distances))
        prediction_class = classes[prediction_index]

        return prediction_class, distances, img_array

    except Exception as e:
        # Best effort: report the failure through the return value.
        return f"Error: {e}", None, None

In [9]:
# Create the file uploader
uploader = widgets.FileUpload(
    accept='image/*',  # Only accept image files
    description='Upload Image'
)

# Create an output area
output_area = widgets.Output()

def on_upload(change):
    """Observer for `uploader.value`: show the image and the prediction analytics.

    Args:
        change: the traitlets change notification (unused; the current value
            is read from `uploader` directly).
    """
    with output_area:
        # Clear previous output
        clear_output(wait=True)

        # Get the uploaded file info
        uploaded_file = uploader.value
        if not uploaded_file:
            return

        # Use the last entry of the upload value
        file_info = uploaded_file[-1]
        file_name = file_info['name']
        image_bytes = file_info['content']

        # Display the uploaded image
        img_pil = Image.open(io.BytesIO(image_bytes))
        display(img_pil)

        # Run prediction
        print("Analyzing...")
        predicted_class, distances, img_array = predict_image(image_bytes)

        # predict_image signals failure as ("Error: <message>", None, None).
        # The previous check compared against the exact string "Error:",
        # which never matched, so failures fell through to the plotting code.
        if distances is None:
            print(predicted_class)  # this slot carries the error message
            return

        # --- 1. Image Analytics ---
        print("\n--- Image Analytics ---")
        print(f"File Name: {file_name}")
        print(f"Dimensions: {img_pil.width} x {img_pil.height} pixels")
        print(f"File Size: {len(image_bytes) / 1024:.1f} KB")

        # Create a color histogram, one curve per channel
        plt.figure(figsize=(6, 2))
        colors = ('r', 'g', 'b')
        for i, col in enumerate(colors):
            hist = cv2.calcHist([img_array], [i], None, [256], [0, 256])
            plt.plot(hist, color=col)
        plt.title("Image Color Histogram")
        plt.xlim([0, 256])
        plt.yticks([])
        plt.show()

        # --- 2. Prediction Analytics ---
        print("\n--- Prediction Analytics ---")

        # Calculate a simple "confidence" score.
        # Lower distance = higher confidence: invert the distances and
        # normalize them so the scores sum to 100%.
        confidence_scores = 1.0 / (distances + 1e-6)  # epsilon avoids division by zero
        confidence_percent = (confidence_scores / np.sum(confidence_scores)) * 100

        print("Model Confidence (based on distance to cluster centers):")
        for i, label in classes.items():
            is_prediction = " (Prediction)" if label == predicted_class else ""
            print(f"  - Cluster {label}: {confidence_percent[i]:.1f}% (Distance: {distances[i]:.2f}){is_prediction}")

        print(f"\n--- Final Prediction: {predicted_class} ---")

# Link the uploader to the function
uploader.observe(on_upload, names='value')

# Display the UI
print("Poverty Prediction Interface:")
display(uploader, output_area)

Poverty Prediction Interface:


FileUpload(value=(), accept='image/*', description='Upload Image')

Output()