In [1]:
!pip install pyspark



In [2]:
%pip install torch torchvision torchaudio

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType, IntegerType
import numpy as np
from sklearn.metrics import classification_report

# Step 1: Initialize Spark
spark = SparkSession.builder \
    .appName("PySpark_PyTorch_CIFAR10") \
    .getOrCreate()

print("Spark Initialized")

Spark Initialized


In [4]:
# Step 2: Load CIFAR-10 using PyTorch
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [5]:

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)

100%|██████████| 170M/170M [00:23<00:00, 7.25MB/s]


In [6]:
# Convert to numpy arrays for PySpark
x_train = np.array([img.numpy() for img, _ in trainset], dtype=np.float32)
y_train = np.array([label for _, label in trainset], dtype=np.int32)
x_test = np.array([img.numpy() for img, _ in testset], dtype=np.float32)
y_test = np.array([label for _, label in testset], dtype=np.int32)

In [7]:
# Step 3: Convert to PySpark DataFrames
def to_bytes(img):
    return img.tobytes()

train_data = [(to_bytes(x_train[i]), int(y_train[i])) for i in range(10000)]  # Sample 1000
test_data = [(to_bytes(x_test[i]), int(y_test[i])) for i in range(2000)]      # Sample 200

#train_data = [(to_bytes(x_train[i]), int(y_train[i])) for i in range(len(x_train))]
#test_data = [(to_bytes(x_test[i]), int(y_test[i])) for i in range(len(x_test))]

schema = StructType([
    StructField("image", BinaryType(), True),
    StructField("label", IntegerType(), True)
])

train_df = spark.createDataFrame(train_data, schema)
test_df = spark.createDataFrame(test_data, schema)

In [8]:
from pyspark.sql.types import StructType, StructField, BinaryType, IntegerType, ArrayType, FloatType
from pyspark.sql.functions import udf

# Step 4: Preprocess with Spark DataFrame using UDF
def decode_image(image_bytes):
    img = np.frombuffer(image_bytes, dtype=np.float32).reshape((3, 32, 32))
    return img.flatten().tolist()

decode_udf = udf(decode_image, ArrayType(FloatType()))

train_df_processed = train_df.withColumn("image_array", decode_udf("image"))
test_df_processed = test_df.withColumn("image_array", decode_udf("image"))


In [9]:

import time

# Step 5: Convert to PyTorch tensors
print("Collecting and converting training data...")
start = time.time()
train_pd = train_df_processed.select("image_array", "label").toPandas()
x_train_tensor = torch.tensor(train_pd["image_array"].tolist()).reshape(-1, 3, 32, 32)
y_train_tensor = torch.tensor(train_pd["label"].tolist())
print(f"Done in {time.time() - start:.2f} seconds")


Collecting and converting training data...
Done in 49.55 seconds


In [10]:
print("Collecting and converting test data...")
start = time.time()
test_pd = test_df_processed.select("image_array", "label").toPandas()
x_test_tensor = torch.tensor(test_pd["image_array"].tolist()).reshape(-1, 3, 32, 32)
y_test_tensor = torch.tensor(test_pd["label"].tolist())
print(f"Done in {time.time() - start:.2f} seconds")

Collecting and converting test data...
Done in 8.49 seconds


In [11]:
# Step 6: Define CNN model
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [12]:
# Step 7: Train the model with accuracy tracking
epochs = 5
batch_size = 64

print("Training started...")
for epoch in range(epochs):
    permutation = torch.randperm(x_train_tensor.size()[0])
    epoch_loss = 0.0
    correct = 0
    total = 0

    for i in range(0, x_train_tensor.size()[0], batch_size):
        indices = permutation[i:i+batch_size]
        batch_x, batch_y = x_train_tensor[indices], y_train_tensor[indices]

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        # Accuracy calculation
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    epoch_accuracy = 100 * correct / total
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Training Accuracy: {epoch_accuracy:.2f}%")

Training started...
Epoch 1/5, Loss: 273.5278, Training Accuracy: 36.83%
Epoch 2/5, Loss: 214.2217, Training Accuracy: 51.26%
Epoch 3/5, Loss: 187.0943, Training Accuracy: 57.47%
Epoch 4/5, Loss: 166.1940, Training Accuracy: 61.93%
Epoch 5/5, Loss: 146.5313, Training Accuracy: 67.15%


In [13]:
import pandas as pd
# Manual confusion matrix
with torch.no_grad():
    train_outputs = model(x_train_tensor)
    _, train_predicted = torch.max(train_outputs, 1)
    num_classes = 10
    class_names = ["airplane", "automobile", "bird", "cat", "deer",
                   "dog", "frog", "horse", "ship", "truck"]

    confusion = torch.zeros(num_classes, num_classes, dtype=torch.int32)
    for t, p in zip(y_train_tensor, train_predicted):
        confusion[t.long(), p.long()] += 1

    # Convert to pandas DataFrame for labeled display
    confusion_df = pd.DataFrame(confusion.numpy(), index=class_names, columns=class_names)
    print("Confusion Matrix (manually computed with labels):")
    print(confusion_df)

Confusion Matrix (manually computed with labels):
            airplane  automobile  bird  cat  deer  dog  frog  horse  ship  \
airplane         779          29    43    8    18    2     3      6    97   
automobile        14         876     9    1     5    2     5      3    26   
bird              79          22   669   36    99   38    33     22    24   
cat               21          12    96  556   105   90    61     40    19   
deer              35          10    89   28   741   10    22     48    13   
dog               11          10    74  119    65  549    21     72     6   
frog               9          17    65   33    87   18   764     11     8   
horse             18           6    31   27    63   17     2    823     5   
ship              72          42    16   10     9    2     3      4   853   
truck             49         178    11   10     5    3     1     17    26   

            truck  
airplane       20  
automobile     33  
bird           10  
cat            16  
de

In [14]:
print("Evaluating model...")
with torch.no_grad():
    outputs = model(x_test_tensor)
    _, predicted = torch.max(outputs, 1)
    accuracy = (predicted == y_test_tensor).sum().item() / len(y_test_tensor)
    print(f"Test Accuracy: {accuracy:.4f}")
    print(classification_report(y_test_tensor.numpy(), predicted.numpy(), target_names=[
        "airplane", "automobile", "bird", "cat", "deer",
        "dog", "frog", "horse", "ship", "truck"
    ]))


Evaluating model...
Test Accuracy: 0.6050
              precision    recall  f1-score   support

    airplane       0.61      0.66      0.63       196
  automobile       0.65      0.82      0.73       198
        bird       0.45      0.51      0.48       195
         cat       0.51      0.42      0.46       199
        deer       0.48      0.54      0.51       198
         dog       0.58      0.39      0.46       185
        frog       0.76      0.67      0.71       216
       horse       0.59      0.66      0.62       193
        ship       0.70      0.78      0.74       217
       truck       0.74      0.57      0.64       203

    accuracy                           0.60      2000
   macro avg       0.61      0.60      0.60      2000
weighted avg       0.61      0.60      0.60      2000

