In [36]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import CIFAR10
from PIL import Image
import numpy as np
from scipy import stats
from scipy.stats import wasserstein_distance
from skimage.io import imread
from skimage.color import rgb2gray
from skimage.transform import resize

In [6]:
def get_image_metadata(image):
    # get width and height of image
    with Image.open(image) as img:
        width, height = img.size

    # calculate image ratio and area
    ratio = width / height
    area = width * height

    # print the metadata
    print(f"image width: {width}")
    print(f"image height: {height}")
    print(f"image ratio: {ratio}")
    print(f"image area: {area}")

In [7]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size = 3, stride = 2, padding = 1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size = 3, stride = 2, padding = 1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(32, 16, kernel_size = 3, stride = 2, padding = 1, output_padding = 1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 3, kernel_size = 3, stride = 2, padding = 1, output_padding = 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.transform = transforms.Compose([
            transforms.ToTensor(),
        ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]
        img = self.transform(img)
        return img

In [3]:
get_image_metadata('../../../dataset/coco8-seg/train/images/000000000009_jpg.rf.75d0eb6e9dbc11b9bd17070cb18445a1.jpg')

image width: 640
image height: 480
image ratio: 1.3333333333333333
image area: 307200


In [8]:
train_data = CIFAR10(root='./data', train=True, download=True, transform=transforms.ToTensor())
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data\cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:16<00:00, 10219232.71it/s]


Extracting ./data\cifar-10-python.tar.gz to ./data


In [25]:
train_data[0][0].shape

torch.Size([3, 32, 32])

In [9]:
autoencoder = Autoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

In [10]:
num_epochs = 10
for epoch in range(num_epochs):
    for data in train_loader:
        img, _ = data
        recon = autoencoder(img)
        loss = criterion(recon, img)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [1/10], Loss: 0.0047
Epoch [2/10], Loss: 0.0024
Epoch [3/10], Loss: 0.0025
Epoch [4/10], Loss: 0.0019
Epoch [5/10], Loss: 0.0022
Epoch [6/10], Loss: 0.0011
Epoch [7/10], Loss: 0.0016
Epoch [8/10], Loss: 0.0012
Epoch [9/10], Loss: 0.0008
Epoch [10/10], Loss: 0.0011


In [11]:
def calculate_outlier_score(data, model):
    reconstructions = model(data)
    loss = criterion(reconstructions, data)
    return loss.item()

In [12]:
test_data = CIFAR10(root='./data', train=False, download=True, transform=transforms.ToTensor())
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

Files already downloaded and verified


In [13]:
outlier_scores = []
for data in test_loader:
    img, _ = data
    score = calculate_outlier_score(img, autoencoder)
    outlier_scores.append(score)

In [14]:
threshold = 0.05
outliers = [i for i, score in enumerate(outlier_scores) if score > threshold]
print("Outlier Indices:", outliers)

Outlier Indices: []


In [15]:
outlier_scores

[0.0011305701918900013,
 0.0010664417641237378,
 0.0007881349883973598,
 0.0009546690271236002,
 0.0013027302920818329,
 0.0006785181467421353,
 0.0013623018749058247,
 0.0008013865444809198,
 0.000616375997196883,
 0.0011051533510908484,
 0.0006333675119094551,
 0.0015468494966626167,
 0.0007231976487673819,
 0.0011369535932317376,
 0.001628823927603662,
 0.001737710670568049,
 0.0013340060831978917,
 0.0009555052383802831,
 0.001313071814365685,
 0.0014442725805565715,
 0.001073312247171998,
 0.0025593715254217386,
 0.00038199484697543085,
 0.002680395729839802,
 0.0010028067044913769,
 0.0016763206804171205,
 0.0006543186609633267,
 0.0009150184341706336,
 0.0013620927929878235,
 0.0016597756184637547,
 0.0007032460416667163,
 0.0003354876535013318,
 0.0011430646991357207,
 0.0008762418292462826,
 0.0004538890498224646,
 0.0009379739058203995,
 0.0007602134137414396,
 0.002080292673781514,
 0.0018940010340884328,
 0.0006238374044187367,
 0.0009673567838035524,
 0.0010550866136327386

In [35]:
def ks_drift(image_files1, image_files2):
    # load and flatten the image
    data1 = np.array([np.array(Image.open(f)).flatten() for f in image_files1])
    data2 = np.array([np.array(Image.open(f)).flatten() for f in image_files2])

    # perform the ks test
    ks_stat, p_value = stats.ks_2samp(data1.ravel(), data2.ravel())
    
    # print the result
    print(f"KS statistic: {ks_stat}")
    print(f"P-value: {p_value}")

path = "D:/Development/Deep Learning/Source Code/MLVision/Models/Object Detection/torchvision-v1/dataset/ram_module/image/"

image_files1 = [
    f'{path}JAAFS9Z009_1.jpeg',
    f'{path}JAAFS9Z009_2.jpeg',
    f'{path}JAAFS9Z009_3.jpeg',
    f'{path}JAAFS9Z009_4.jpeg',
    f'{path}JAAFS9Z009_FLIP_1.jpeg',
    f'{path}JAAFS9Z009_FLIP_2.jpeg',
    f'{path}JAAFS9Z009_FLIP_3.jpeg',
    f'{path}JAAFS9Z009_FLIP_4.jpeg',
    f'{path}JAAHPZ2001_1.jpeg',
]

image_files2 = [
    f'{path}JAAHPZ2001_1.jpeg',
]

ks_drift(image_files1, image_files2)

KS statistic: 0.10071401381371414
P-value: 0.0


In [None]:
# Load two sets of images
image_set1 = [imread(f'{path}JAAFS9Z009_1.jpeg')]
image_set2 = [imread(f'{path}JAAFS9Z009_1.jpeg')]

# Preprocess images: convert to grayscale and resize
image_set1 = [resize(rgb2gray(img), (64, 64)).flatten() for img in image_set1]
image_set2 = [resize(rgb2gray(img), (64, 64)).flatten() for img in image_set2]

# Compute Wasserstein distance for each pair of images
distances = [wasserstein_distance(img1, img2) for img1, img2 in zip(image_set1, image_set2)]

# Average Wasserstein distance
average_distance = np.mean(distances)

print(f"Average Wasserstein distance: {average_distance}")