To train or test the model, run the following notebook sections:
- Dependencies (all cells)
- CLIP FINETUNE: Prepare Data and Neural Network (skip the data-import and embedding-generation cells)
  

In [None]:
import pandas as pd
from tqdm import tqdm
from PIL import Image
from pathlib import Path

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, CLIPModel


# Default to the CPU and switch to the GPU only when CUDA is usable.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print(f"Using device: {device}")

# CLIP FINETUNE

In [None]:
# Collect every image path two directory levels below datasets/cifake together
# with its class label, then build a reproducible shuffled 80/20 split.
file_names = []
labels = []

for file in sorted(Path("datasets/cifake").glob("*/*/*.*")):
    # The label is the name of the directory that directly contains the image.
    # Path.parent.name is separator-agnostic; splitting str(file) on "/" breaks
    # on platforms whose paths use a different separator (e.g. Windows).
    label = file.parent.name
    labels.append(label)
    file_names.append(str(file))


# One row per image: its path and its label.
df = pd.DataFrame.from_dict({"image": file_names, "label": labels})

# Fixed random_state keeps the shuffle (and therefore the split) reproducible.
df_shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)

# First 80% of the shuffled rows for training, the remainder for testing.
split_index = int(0.8 * len(df_shuffled))
df_train = df_shuffled[:split_index]
df_test = df_shuffled[split_index:]

In [None]:
# Write both splits to CSV (without the index column); the "Prepare Data"
# section reads these two files back.
for split_frame, csv_path in (
    (df_train, "df_train_clip.csv"),
    (df_test, "df_test_clip.csv"),
):
    split_frame.to_csv(csv_path, index=False)

## Generating CLIP Embeddings

In [None]:
# Load the pretrained CLIP checkpoint and its matching preprocessor.
device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "openai/clip-vit-base-patch32"

processor = CLIPProcessor.from_pretrained(model_name)
model = CLIPModel.from_pretrained(model_name)
model = model.to(device)


def generate_embeddings(image):
    """Return the L2-normalised CLIP image embedding for ``image``.

    ``image`` is preprocessed with the module-level ``processor`` and passed
    through the module-level CLIP ``model`` on ``device``. The forward pass
    runs under ``torch.no_grad()``, so no gradients are recorded.
    """
    batch = processor(images=image, return_tensors="pt").to(device)

    with torch.no_grad():
        features = model.get_image_features(**batch)

    # Scale every embedding to unit Euclidean length along the feature axis.
    lengths = features.norm(p=2, dim=-1, keepdim=True)
    return features / lengths

In [None]:
def _embed_split(frame):
    """Return one CLIP embedding per row of ``frame``, in row order.

    ``frame`` must have an "image" column holding image file paths. Progress
    is printed every 1000 images.
    """
    total = len(frame)
    embeddings = []
    for i, image_path in enumerate(frame["image"]):
        if i % 1000 == 0:
            print(f"On epoch {i}, we have reached {i*100/total}% of the way")
        # The context manager closes the file handle once the image is embedded.
        with Image.open(image_path) as image:
            embeddings.append(generate_embeddings(image))
    return embeddings


# Each split is embedded from its own frame. Indexing df_shuffled with
# 0..len(df_test)-1 (as before) re-embedded training images for the test set.
embeddings_train = _embed_split(df_train)
embeddings_test = _embed_split(df_test)

# The "Prepare Data" section loads these two files with torch.load, so the
# lists are persisted here.
torch.save(embeddings_train, "embeddings_train.pt")
torch.save(embeddings_test, "embeddings_test.pt")

print("embeddings generated successfully")

embeddings generated successfully


## Prepare Data

In [None]:
# Reload the train/test splits from their CSV files.
train_df = pd.read_csv("df_train_clip.csv")
test_df = pd.read_csv("df_test_clip.csv")
traininglabels_worded = train_df["label"].tolist()
testlabels_worded = test_df["label"].tolist()

# Encode the textual labels as integers: "REAL" -> 1, anything else -> 0.
traininglabels = [1 if word == "REAL" else 0 for word in traininglabels_worded]
testlabels = [1 if word == "REAL" else 0 for word in testlabels_worded]


# map_location remaps the stored tensors onto the currently selected device,
# so embeddings saved from a CUDA session still load on a CPU-only machine.
train_embeddings = torch.load("embeddings_train.pt", map_location=device)
test_embeddings = torch.load("embeddings_test.pt", map_location=device)

In [None]:
# Combine the per-image embeddings into one tensor per split on ``device``.
stacked_train = torch.stack(train_embeddings)
train_embeddings = stacked_train.to(device)
stacked_test = torch.stack(test_embeddings)
test_embeddings = stacked_test.to(device)

# The integer label lists become tensors on the same device.
train_targets = torch.tensor(traininglabels)
traininglabels = train_targets.to(device)
test_targets = torch.tensor(testlabels)
testlabels = test_targets.to(device)


class TrainDS(Dataset):
    """Dataset over the module-level training embeddings and labels."""

    def __init__(self):
        # Both tensors are read from module scope and placed on ``device``.
        self.n_samples = len(train_embeddings)
        self.x = train_embeddings.to(device)
        self.y = traininglabels.to(device)

    def __len__(self):
        """Return the number of training samples."""
        return self.n_samples

    def __getitem__(self, index):
        """Return the ``(embedding, label)`` pair stored at ``index``."""
        embedding = self.x[index]
        target = self.y[index]
        return embedding, target


class TestDS(Dataset):
    """Dataset over the module-level test embeddings and labels."""

    def __init__(self):
        # Both tensors are read from module scope and placed on ``device``.
        self.n_samples = len(test_embeddings)
        self.x = test_embeddings.to(device)
        self.y = testlabels.to(device)

    def __len__(self):
        """Return the number of test samples."""
        return self.n_samples

    def __getitem__(self, index):
        """Return the ``(embedding, label)`` pair stored at ``index``."""
        embedding = self.x[index]
        target = self.y[index]
        return embedding, target


# Instantiate both datasets.
train_ds = TrainDS()
test_ds = TestDS()

# Pull one sample as a quick sanity check of the dataset output.
# NOTE: this rebinds the module-level name ``labels``.
first_data = test_ds[0]
features, labels = first_data

# Training batches are reshuffled on every pass; test batches keep their order.
train_dataloader = DataLoader(train_ds, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_ds, batch_size=64, shuffle=False)

## Neural Network

In [None]:
class NeuralNetwork(nn.Module):
    """Binary classifier head: 512 -> 128 -> 32 -> 1 with a sigmoid output."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(512, 128)
        # NOTE: bn1 is registered (so it is part of the state dict) but
        # forward() never applies it.
        self.bn1 = nn.BatchNorm1d(128)
        self.fc2 = nn.Linear(128, 32)
        self.fc3 = nn.Linear(32, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Map inputs with a trailing dimension of 512 to values in [0, 1]."""
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        # Dropout is applied only after the second hidden layer.
        hidden = self.dropout(hidden)
        return self.fc3(hidden).sigmoid()

In [None]:
# Run on the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Number of full passes over the training data.
num_epochs = 5

# NOTE: this rebinds the module-level name ``model`` to the classifier head.
model = NeuralNetwork()

# Binary cross-entropy on the network's sigmoid output, optimised with Adam.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

model.to(device)

NeuralNetwork(
  (fc1): Linear(in_features=512, out_features=124, bias=True)
  (bn1): BatchNorm1d(124, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc2): Linear(in_features=124, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=1, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
)

In [None]:
def rounder(num):
    """Return 1 when ``num`` is strictly closer to 1 than to 0, otherwise 0.

    A value exactly halfway between the two (0.5) resolves to 0.
    """
    closer_to_one = abs(num - 1) < abs(num)
    return 1 if closer_to_one else 0


def train_accuracy(model, embeddings=None, labels=None):
    """Return the model's accuracy, in percent, on the training data.

    Args:
        model: Module mapping a batch of embeddings to one probability per
            sample.
        embeddings: Inputs to evaluate. Defaults to the module-level
            ``train_embeddings``.
        labels: Matching 0/1 targets. Defaults to the module-level
            ``traininglabels``.

    Returns:
        The percentage of samples whose thresholded prediction (probability
        strictly greater than 0.5 counts as 1) equals the label, or 0.0 when
        there are no samples.

    Note:
        The model is left in eval mode; callers that keep training must call
        ``model.train()`` again.
    """
    if embeddings is None:
        embeddings = train_embeddings
    if labels is None:
        labels = traininglabels

    model.eval()

    # Use the real sample count rather than a hard-coded dataset size.
    total = len(labels)
    if total == 0:
        return 0.0

    with torch.inference_mode():
        # Flatten so outputs shaped (N, 1) or (N, 1, 1) line up with the labels.
        probabilities = model(embeddings).reshape(-1)
        predictions = (probabilities > 0.5).long()
        targets = torch.as_tensor(labels, device=predictions.device).reshape(-1).long()
        correct = (predictions == targets).sum().item()

    return correct * 100 / total


def test_accuracy(model):
    accuracy = 0
    model.eval()
    with torch.inference_mode():
        output = model(test_embeddings)
        for i in range(24000):
            pred = output[i].item()
            pred = rounder(pred)
            if pred == testlabels[i]:
                accuracy += 1

    return accuracy * 100 / 24000

In [None]:
# Train for num_epochs passes over train_dataloader, reporting the training
# accuracy every 400 batches and the mean batch loss after each epoch.
for epoch in tqdm(range(num_epochs)):
    print(f"Epoch: {epoch}\n-----")
    running_loss = 0.0

    for batch, (inputs, labels) in enumerate(train_dataloader):
        # train_accuracy() below switches the model to eval mode, so training
        # mode has to be re-enabled at the start of every batch.
        model.train()
        inputs, labels = inputs.to(device), labels.view(-1, 1).float().to(device)

        optimizer.zero_grad()

        # Infer the batch dimension instead of hard-coding 64, so a smaller
        # final batch (or a different batch_size) does not raise.
        outputs = model(inputs).reshape(-1, 1)
        loss = criterion(outputs, labels)
        running_loss += loss.item()

        loss.backward()
        optimizer.step()

        if batch % 400 == 0:
            print(
                f"Looked at {batch*len(inputs)}/{len(train_dataloader.dataset)} samples."
            )
            print(f"Accuracy = {train_accuracy(model):.2f}")

    # Mean loss per batch for this epoch; max() guards against an empty loader.
    running_loss /= max(len(train_dataloader), 1)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss:.4f}")

#### Accuracy

In [None]:
# Evaluate the trained classifier on the held-out test batches.
model.eval()
val_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in test_dataloader:
        inputs, labels = (
            inputs.view(inputs.size(0), -1).to(device),
            labels.float().to(device),
        )
        # reshape(-1) keeps a 1-D result even for a batch of one, where
        # squeeze() would return a 0-d tensor that no longer matches labels.
        outputs = model(inputs).reshape(-1)

        loss = criterion(outputs, labels)
        val_loss += loss.item()

        # The network already ends in a sigmoid, so threshold its output
        # directly. A second sigmoid maps [0, 1] into [0.5, 0.73], which makes
        # practically every prediction class 1.
        predicted = (outputs > 0.5).float()
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# max() guards against division by zero when the test loader is empty.
print(f"Validation Loss: {val_loss / max(len(test_dataloader), 1):.4f}")
print(f"Validation Accuracy: {correct / max(total, 1) * 100:.2f}%")

#### Save and Load Model

In [None]:
# Checkpoint location: <model_path>/<model_name>; the directory is created on
# demand.
model_name = "CLIPNeural_One.pth"
model_path = Path("Model")
model_save_path = model_path / model_name
model_path.mkdir(parents=True, exist_ok=True)

print(f"Saving model to: {model_save_path}")
# Only the state dict (parameters and buffers) is stored, not the module object.
torch.save(model.state_dict(), model_save_path)

Saving model to: Model/CLIPNeural_One.pth
