### 5-1

### Image classifier

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import torch.nn.functional as F
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [2]:
# load and preprocess the MNIST for anamoly detection
from sklearn.model_selection import train_test_split

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
mnist_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)

# filter the dataset for digits 1,3,5,7
target_digits = [1, 3, 5, 7] 
target_indices = [i for i, (img, label) in enumerate(mnist_data) if label in target_digits]
nontarget_indices = [i for i, (img, label) in enumerate(mnist_data) if label not in target_digits]

train_target_indices, val_target_indices = train_test_split(target_indices, test_size=0.2, random_state=42)
val_indices = nontarget_indices + val_target_indices

train_dataset = Subset(mnist_data, train_target_indices)
val_dataset = Subset(mnist_data, val_indices)

label_mapping = {1: 0, 3: 1, 5: 2, 7: 3}

def map_labels(batch):
    inputs, labels = zip(*batch)
    labels = torch.tensor([label_mapping[label] for label in labels])
    inputs = torch.stack(inputs)
    return inputs, labels

# dataloader for the filtered dataset
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=map_labels)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)



In [3]:
# train an image classifier that can recognize 1,3,5,7 then use it to do abnormal detection on the handwrittrn images other than 1,3,5,7

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 4)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

net = Net()
net = net.to(device)
criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(net.parameters(), lr=0.001)


In [4]:
import torch
from tqdm import tqdm
num_epochs = 10
for epoch in range(num_epochs):
    total = 0
    correct = 0
    train_loss = 0

    for inputs, labels in tqdm(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
        train_loss += loss.item()

    train_acc = correct / total

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss / len(train_loader):.4f}, Train Accuracy: {train_acc:.4f}')


torch.save(net.state_dict(), './iamge_classifier.pth')



100%|██████████| 307/307 [00:02<00:00, 120.65it/s]


Epoch [1/10], Train Loss: 0.1976, Train Accuracy: 0.9333


100%|██████████| 307/307 [00:02<00:00, 152.17it/s]


Epoch [2/10], Train Loss: 0.0416, Train Accuracy: 0.9871


100%|██████████| 307/307 [00:02<00:00, 153.48it/s]


Epoch [3/10], Train Loss: 0.0259, Train Accuracy: 0.9910


100%|██████████| 307/307 [00:01<00:00, 155.10it/s]


Epoch [4/10], Train Loss: 0.0221, Train Accuracy: 0.9925


100%|██████████| 307/307 [00:01<00:00, 155.03it/s]


Epoch [5/10], Train Loss: 0.0156, Train Accuracy: 0.9954


100%|██████████| 307/307 [00:02<00:00, 151.83it/s]


Epoch [6/10], Train Loss: 0.0129, Train Accuracy: 0.9958


100%|██████████| 307/307 [00:01<00:00, 154.10it/s]


Epoch [7/10], Train Loss: 0.0120, Train Accuracy: 0.9960


100%|██████████| 307/307 [00:02<00:00, 152.95it/s]


Epoch [8/10], Train Loss: 0.0097, Train Accuracy: 0.9971


100%|██████████| 307/307 [00:02<00:00, 153.08it/s]


Epoch [9/10], Train Loss: 0.0092, Train Accuracy: 0.9970


100%|██████████| 307/307 [00:02<00:00, 152.54it/s]

Epoch [10/10], Train Loss: 0.0075, Train Accuracy: 0.9977





In [5]:
import numpy as np
from sklearn.metrics import precision_recall_curve

def find_optimal_threshold(model, loader):
    model.eval()
    softmax = nn.Softmax(dim=1)
    all_scores = []
    true_labels = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(loader):
            inputs = inputs.to(device)
            outputs = model(inputs)
            probabilities = softmax(outputs)
            max_probs, _ = torch.max(probabilities, dim=1)

            all_scores.extend(max_probs.cpu().numpy())
            
            true_labels.extend([(label not in [1, 3, 5, 7]) for label in labels.cpu().numpy()])

    precisions, recalls, thresholds = precision_recall_curve(true_labels, all_scores, pos_label=True)

    # use f1-score to find the best threshold
    f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
    optimal_idx = np.argmax(f1_scores[np.isfinite(f1_scores)])  # ignore NaN
    optimal_threshold = thresholds[optimal_idx]
    return optimal_threshold, f1_scores[optimal_idx]

net.load_state_dict(torch.load('./iamge_classifier.pth'))
net = net.to(device)

optimal_threshold, max_f1_score = find_optimal_threshold(net, val_loader)
print(f"Optimal Threshold: {optimal_threshold:.4f}, Max F1 Score: {max_f1_score:.4f}")


100%|██████████| 631/631 [00:03<00:00, 173.19it/s]

Optimal Threshold: 0.3161, Max F1 Score: 0.9352





### Normal autoencoder|

In [6]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 12),
            nn.ReLU(),
            nn.Linear(12, 3) 
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(),
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28*28),
            nn.Sigmoid()  # use Sigmoid to output between [0, 1]
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


In [7]:
autoencoder = Autoencoder().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)


def train_autoencoder(model, train_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for data in tqdm(train_loader):
            inputs, _ = data
            inputs = inputs.view(inputs.size(0), -1).to(device)  
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}')

train_autoencoder(autoencoder, train_loader)

torch.save(autoencoder.state_dict(), './autoencoder.pth')


100%|██████████| 307/307 [00:02<00:00, 146.25it/s]


Epoch 1, Loss: 0.7426


100%|██████████| 307/307 [00:02<00:00, 146.68it/s]


Epoch 2, Loss: 0.6635


100%|██████████| 307/307 [00:02<00:00, 146.99it/s]


Epoch 3, Loss: 0.6365


100%|██████████| 307/307 [00:02<00:00, 147.68it/s]


Epoch 4, Loss: 0.6312


100%|██████████| 307/307 [00:02<00:00, 147.45it/s]


Epoch 5, Loss: 0.6181


100%|██████████| 307/307 [00:02<00:00, 147.27it/s]


Epoch 6, Loss: 0.5995


100%|██████████| 307/307 [00:02<00:00, 146.99it/s]


Epoch 7, Loss: 0.5935


100%|██████████| 307/307 [00:02<00:00, 148.03it/s]


Epoch 8, Loss: 0.5899


100%|██████████| 307/307 [00:02<00:00, 148.36it/s]


Epoch 9, Loss: 0.5862


100%|██████████| 307/307 [00:02<00:00, 148.67it/s]

Epoch 10, Loss: 0.5833





In [8]:
def evaluate_autoencoder(model, val_loader):
    model.eval()
    reconstruction_errors = []
    true_labels = []
    with torch.no_grad():
        for data in tqdm(val_loader):
            inputs, labels = data
            inputs = inputs.view(inputs.size(0), -1).to(device)
            outputs = model(inputs)
            loss = torch.mean((outputs - inputs) ** 2, dim=1)  ## MSE 
            reconstruction_errors.extend(loss.cpu().numpy())
            true_labels.extend([(label not in [1, 3, 5, 7]) for label in labels.cpu().numpy()])

    # 计算阈值
    errors = np.array(reconstruction_errors)
    # threshold = np.percentile(errors, 95)  # use 95 percentile as threshold
    # return threshold
    precisions, recalls, thresholds = precision_recall_curve(true_labels, errors, pos_label=True)

    f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
    optimal_idx = np.argmax(f1_scores)
    optimal_threshold = thresholds[optimal_idx]
    return optimal_threshold, f1_scores[optimal_idx]


autoencoder.load_state_dict(torch.load('./autoencoder.pth'))
optimal_threshold, max_f1_score = evaluate_autoencoder(autoencoder, val_loader)
print(f"Optimal Threshold: {optimal_threshold:.4f}, Max F1 Score: {max_f1_score:.4f}")


100%|██████████| 631/631 [00:03<00:00, 171.07it/s]

Optimal Threshold: 0.4451, Max F1 Score: 0.9477





### Denoising autoencoder

In [9]:
class DenoisingAutoencoder(nn.Module):
    def __init__(self):
        super(DenoisingAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 12),
            nn.ReLU(),
            nn.Linear(12, 3)
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(),
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28*28),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


In [10]:
def add_noise(inputs, noise_factor=0.5):
    noise = torch.randn_like(inputs) * noise_factor
    return inputs + noise

def train_denoising_autoencoder(model, train_loader, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for data in tqdm(train_loader):
            inputs, _ = data
            inputs = inputs.view(inputs.size(0), -1).to(device)  # flatten image
            noisy_inputs = add_noise(inputs)  
            noisy_inputs = noisy_inputs.to(device)
            optimizer.zero_grad()
            outputs = model(noisy_inputs)
            loss = criterion(outputs, inputs) 
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}')

denoising_autoencoder = DenoisingAutoencoder().to(device)
optimizer = torch.optim.Adam(denoising_autoencoder.parameters(), lr=0.001)
criterion = nn.MSELoss()

train_denoising_autoencoder(denoising_autoencoder, train_loader)
torch.save(denoising_autoencoder.state_dict(), './denoising_autoencoder.pth')

100%|██████████| 307/307 [00:02<00:00, 147.24it/s]


Epoch 1, Loss: 0.7360


100%|██████████| 307/307 [00:02<00:00, 149.62it/s]


Epoch 2, Loss: 0.6603


100%|██████████| 307/307 [00:02<00:00, 149.57it/s]


Epoch 3, Loss: 0.6269


100%|██████████| 307/307 [00:02<00:00, 147.85it/s]


Epoch 4, Loss: 0.6142


100%|██████████| 307/307 [00:02<00:00, 146.67it/s]


Epoch 5, Loss: 0.6065


100%|██████████| 307/307 [00:02<00:00, 148.30it/s]


Epoch 6, Loss: 0.6012


100%|██████████| 307/307 [00:02<00:00, 149.41it/s]


Epoch 7, Loss: 0.5969


100%|██████████| 307/307 [00:02<00:00, 149.26it/s]


Epoch 8, Loss: 0.5932


100%|██████████| 307/307 [00:02<00:00, 148.36it/s]


Epoch 9, Loss: 0.5900


100%|██████████| 307/307 [00:02<00:00, 146.96it/s]

Epoch 10, Loss: 0.5868





In [11]:
denoising_autoencoder.load_state_dict(torch.load('./denoising_autoencoder.pth'))
optimal_threshold, max_f1_score = evaluate_autoencoder(denoising_autoencoder, val_loader)
print(f"Optimal Threshold: {optimal_threshold:.4f}, Max F1 Score: {max_f1_score:.4f}")

100%|██████████| 631/631 [00:03<00:00, 170.15it/s]

Optimal Threshold: 0.4493, Max F1 Score: 0.9475





In [12]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.fc1 = nn.Linear(28*28, 400)
        self.fc21 = nn.Linear(400, 20)  # 均值
        self.fc22 = nn.Linear(400, 20)  # 对数方差
        self.fc3 = nn.Linear(20, 400)
        self.fc4 = nn.Linear(400, 28*28)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar) + 1e-6
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return self.fc4(h3)

    def forward(self, x):
        mu, logvar = self.encode(x.view(-1, 28*28))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

def loss_function(recon_x, x, mu, logvar):
    bce_loss = nn.BCEWithLogitsLoss(reduction='sum')
    BCE = bce_loss(recon_x, x.view(-1, 28*28))
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    #print("BCE:",BCE)
    #print("KLD:",KLD)
    return BCE + KLD

# Model, Optimizer and Loss
model = VAE().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training VAE
def train_vae(model, train_loader, epochs=10):
    model.train()
    for epoch in range(epochs):
        train_loss = 0
        for data, _ in tqdm(train_loader):
            data = data.to(device)
            optimizer.zero_grad()
            recon_batch, mu, logvar = model(data)
            loss = loss_function(recon_batch, data, mu, logvar)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()
        print(f'Epoch {epoch + 1}, Average Loss: {train_loss / len(train_loader):.4f}')

train_vae(model, train_loader)

# Save the model
torch.save(model.state_dict(), 'vae.pth')


100%|██████████| 307/307 [00:02<00:00, 148.12it/s]


Epoch 1, Average Loss: 904820921035.1698


100%|██████████| 307/307 [00:02<00:00, 147.68it/s]


Epoch 2, Average Loss: 464121322654325888.0000


100%|██████████| 307/307 [00:02<00:00, 148.06it/s]


Epoch 3, Average Loss: 11252917674078142.0000


100%|██████████| 307/307 [00:02<00:00, 148.96it/s]


Epoch 4, Average Loss: 394765950771707.0000


100%|██████████| 307/307 [00:02<00:00, 148.20it/s]


Epoch 5, Average Loss: -10422526712174.9062


100%|██████████| 307/307 [00:02<00:00, 149.04it/s]


Epoch 6, Average Loss: -18185076265080.0781


100%|██████████| 307/307 [00:02<00:00, 148.99it/s]


Epoch 7, Average Loss: -30528059717278.4375


100%|██████████| 307/307 [00:02<00:00, 148.85it/s]


Epoch 8, Average Loss: -48647261350374.9844


100%|██████████| 307/307 [00:02<00:00, 147.23it/s]


Epoch 9, Average Loss: -73014662846243.8594


100%|██████████| 307/307 [00:02<00:00, 148.97it/s]

Epoch 10, Average Loss: -104118362581852.5625





In [13]:
import numpy as np
from sklearn.metrics import precision_recall_curve

def find_optimal_threshold_vae(model, val_loader):
    model.eval()
    reconstruction_errors = []
    true_labels = []
    with torch.no_grad():
        for data, labels in tqdm(val_loader):
            data = data.to(device)
            recon_batch, mu, logvar = model(data)
            recon_error = F.mse_loss(recon_batch, data.view(-1, 28*28), reduction='none').sum(1)
            reconstruction_errors.extend(recon_error.cpu().numpy())

            true_labels.extend([(label not in [1, 3, 5, 7]) for label in labels.cpu().numpy()])


    precisions, recalls, thresholds = precision_recall_curve(true_labels, reconstruction_errors, pos_label=True)

    f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
    optimal_idx = np.nanargmax(f1_scores)  # 忽略NaN值，找出最大F1分数
    optimal_threshold = thresholds[optimal_idx]
    return optimal_threshold, f1_scores[optimal_idx]

model.load_state_dict(torch.load('vae.pth'))
model = model.to(device)


optimal_threshold, max_f1_score = find_optimal_threshold_vae(model, val_loader)
print(f"Optimal Threshold: {optimal_threshold:.4f}, Max F1 Score: {max_f1_score:.4f}")


100%|██████████| 631/631 [00:03<00:00, 171.84it/s]

Optimal Threshold: 5929.6426, Max F1 Score: 0.9352





In [18]:
import numpy as np

def convert_data_from_dataloader(dataloader):
    features = []
    labels = []
    for images, targets in dataloader:
        # 展平图像数据
        images = images.view(images.size(0), -1).numpy()
        features.append(images)
        labels.append(targets.numpy())
    # 将列表转换为NumPy数组
    features = np.concatenate(features, axis=0)
    labels = np.concatenate(labels, axis=0)
    return features, labels

# 从DataLoader转换数据
X_train, y_train = convert_data_from_dataloader(train_loader)
X_val, y_val = convert_data_from_dataloader(val_loader)

# 将验证集的标签转换为异常检测格式（1为异常，0为正常）
target_digits = [1, 3, 5, 7]
y_val_binary = np.isin(y_val, target_digits, invert=True).astype(int)


In [22]:
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report
from sklearn.metrics import precision_recall_curve, f1_score

# 孤立森林模型初始化
iso_forest = IsolationForest(n_estimators=100, contamination='auto', random_state=42)

# 训练模型
iso_forest.fit(X_train)

# 获取决策函数分数来判断异常
scores = iso_forest.decision_function(X_val)



# 计算精确度、召回率和阈值
precisions, recalls, thresholds = precision_recall_curve(y_val_binary, scores)

# 计算 F1 分数
f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
optimal_idx = np.nanargmax(f1_scores)  # 找到 F1 分数最大的索引
optimal_threshold = thresholds[optimal_idx]  # 对应的最佳阈值

y_pred = (scores < optimal_threshold).astype(int)

print("Optimal threshold based on F1 score:", optimal_threshold)
print("F1 Score at optimal threshold:", f1_scores[optimal_idx])
print(classification_report(y_val_binary, y_pred))



Optimal threshold based on F1 score: -0.18516509922054947
F1 Score at optimal threshold: 0.9351927593213183
              precision    recall  f1-score   support

           0       0.12      1.00      0.22      4912
           1       0.00      0.00      0.00     35441

    accuracy                           0.12     40353
   macro avg       0.06      0.50      0.11     40353
weighted avg       0.01      0.12      0.03     40353



  f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [23]:
import torch
from torchvision import models, transforms
from torch.utils.data import DataLoader, TensorDataset

# 加载预训练的ResNet模型
model = models.resnet18(pretrained=True)
model.fc = torch.nn.Identity()  # 移除最后一层，用于获取特征
model.eval()

# 设置为不更新梯度
for param in model.parameters():
    param.requires_grad = False




In [24]:
# 设置转换，调整大小并归一化
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 调整图像大小以匹配ResNet的输入
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 应用转换
def apply_transforms(dataloader):
    features = []
    labels = []
    for images, targets in dataloader:
        # 应用转换并扩展为3通道
        images = images.repeat(1, 3, 1, 1)  # 从1通道扩展到3通道
        images = torch.stack([transform(img) for img in images])
        features.append(model(images).detach().numpy())
        labels.append(targets.numpy())
    features = np.concatenate(features, axis=0)
    labels = np.concatenate(labels, axis=0)
    return features, labels

# 从 DataLoader 转换数据
X_train_features, _ = apply_transforms(train_loader)
X_val_features, _ = apply_transforms(val_loader)




In [25]:
from sklearn.ensemble import IsolationForest

# 使用孤立森林
iso_forest = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
iso_forest.fit(X_train_features)

# 获取决策函数分数
val_scores = iso_forest.decision_function(X_val_features)

# 计算最佳阈值
precisions, recalls, thresholds = precision_recall_curve(y_val_binary, val_scores)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls)
optimal_idx = np.nanargmax(f1_scores)
optimal_threshold = thresholds[optimal_idx]

# 预测
y_pred_features = (val_scores < optimal_threshold).astype(int)

# 输出结果
print("Optimal threshold based on F1 score with features:", optimal_threshold)
print("F1 Score at optimal threshold with features:", f1_scores[optimal_idx])
print(classification_report(y_val_binary, y_pred_features))


Optimal threshold based on F1 score with features: -0.11632542558040748
F1 Score at optimal threshold with features: 0.9352050980961302
              precision    recall  f1-score   support

           0       0.12      1.00      0.22      4912
           1       0.00      0.00      0.00     35441

    accuracy                           0.12     40353
   macro avg       0.06      0.50      0.11     40353
weighted avg       0.01      0.12      0.03     40353



  f1_scores = 2 * (precisions * recalls) / (precisions + recalls)


### 5-2

In [1]:
import chromadb
chroma_client = chromadb.Client()

collection = chroma_client.create_collection(name="qa_collection")
collection.add(
    documents=["Cloud computing is a technology that allows for the storage, management, and processing of data on remote servers accessed via the internet, rather than on local servers or personal computers. It enables users and businesses to utilize online services to run applications or store data.", 
               "Mental health refers to the state of someone's emotional, psychological, and social well-being. A person with good mental health can handle daily stresses, perform productive work, and contribute to their community.",
               "The ancient Egyptians built pyramids primarily as tombs for the pharaohs, the rulers of ancient Egypt. They served to house their remains and belongings for use in the afterlife. The size and complexity of the pyramids reflect the pharaohs' power and religious beliefs about the afterlife."
            ],
    metadatas=[{'source':'Tech'},
               {'source':'Healthy'},
               {'source':'geography'}
            ],
    ids=['doc_tech','doc_health','doc_geo']
)




In [2]:
def query(question_text):
    results = collection.query(
        query_texts = [question_text],
        n_results = 1
    )

    print("Most relevant information:")
    print(f"Document ID: {results['ids'][0][0]}")
    print(f"Content: {results['documents'][0][0]}")
    print(f"Metadata: {results['metadatas'][0][0]}")
    print("-----")

query("What is cloud computing?")
query("What is mental health?")
query("Why did the ancient Egyptians build pyramids?")

Most relevant information:
Document ID: doc_tech
Content: Cloud computing is a technology that allows for the storage, management, and processing of data on remote servers accessed via the internet, rather than on local servers or personal computers. It enables users and businesses to utilize online services to run applications or store data.
Metadata: {'source': 'Tech'}
-----
Most relevant information:
Document ID: doc_health
Content: Mental health refers to the state of someone's emotional, psychological, and social well-being. A person with good mental health can handle daily stresses, perform productive work, and contribute to their community.
Metadata: {'source': 'Healthy'}
-----
Most relevant information:
Document ID: doc_geo
Content: The ancient Egyptians built pyramids primarily as tombs for the pharaohs, the rulers of ancient Egypt. They served to house their remains and belongings for use in the afterlife. The size and complexity of the pyramids reflect the pharaohs' power an

### 5-3

In [1]:
import torch
import clip
from PIL import Image
import requests
from io import BytesIO

device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load('ViT-B/32', device=device)


100%|████████████████████████████████████████| 338M/338M [00:03<00:00, 104MiB/s]


In [6]:
def load_image(image_path):
    image = Image.open(image_path)
    return preprocess(image).unsqueeze(0).to(device)

def prepare_text(description):
    return clip.tokenize([description]).to(device)

def compare_image_text(image_path, description):
    image = load_image(image_path)
    text = prepare_text(description)

    with torch.no_grad():
        image_features = model.encode_image(image)
        text_features = model.encode_text(text)

        # Correctly calculate the similarity score using cosine similarity and logit scale
        logit_scale = model.logit_scale.exp()
        logits_per_image = logit_scale * image_features @ text_features.t()
        probs = logits_per_image.softmax(dim=-1).cpu().numpy()

    return probs[0][0]



In [7]:
image_path = "./my_bro.png"  # Change to your actual image path

# Descriptions to test with the image
descriptions = [
    "A monkey sitting by trash bins on a city street, looking directly at the camera.",
    "A large animal searching through garbage in an urban environment.",
    "A cat sitting on a sunny window ledge overlooking a busy street.",
    "An animal outside during the daytime.",
    "A dog rummaging through trash bags near a park bench on a rainy day."
]

# Loop through each description and compute the match probability
for description in descriptions:
    match_probability = compare_image_text(image_path, description)
    print(f"Description: '{description}'")
    print(f"Match Probability: {match_probability:.2f}\n")


Description: 'A monkey sitting by trash bins on a city street, looking directly at the camera.'
Match Probability: 1.00

Description: 'A large animal searching through garbage in an urban environment.'
Match Probability: 1.00

Description: 'A cat sitting on a sunny window ledge overlooking a busy street.'
Match Probability: 1.00

Description: 'An animal outside during the daytime.'
Match Probability: 1.00

Description: 'A dog rummaging through trash bags near a park bench on a rainy day.'
Match Probability: 1.00



In [11]:
import torch
import clip
from PIL import Image

device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load('ViT-B/32', device=device)

def load_image(image_path):
    image = Image.open(image_path)
    return preprocess(image).unsqueeze(0).to(device)

def prepare_text(descriptions):
    return clip.tokenize(descriptions).to(device)

def compare_image_text(image_path, descriptions):
    image = load_image(image_path)
    texts = prepare_text(descriptions)

    with torch.no_grad():
        image_features = model.encode_image(image)
        text_features = model.encode_text(texts)

        logit_scale = model.logit_scale.exp()
        logits_per_image = logit_scale * image_features @ text_features.t()
        probs = logits_per_image.softmax(dim=-1).cpu().numpy()

    return probs.flatten()

image_path = "./my_bro.png"  

# descriptions = [
#     "A monkey rummaging through trash bins on a city street.",
#     "A monkey sitting by trash bins on a city street looking curious.",
#     "A monkey looking directly at the camera surrounded by trash."
# ]

# # Running the comparison
# probabilities = compare_image_text(image_path, descriptions)


# # Printing each description with its respective probability
# print("Descriptions and their matching probabilities:")
# for desc, prob in zip(descriptions, probabilities):
#     print(f"Description: '{desc}' - Probability: {prob:.4f}")
categories = {
    "Monkey in Urban Environment": [
        "A monkey beside trash bins in a busy urban setting.",
        "A monkey climbing on a metal barrier near trash bins.",
        "A monkey peering out from behind city trash bins."
    ],
    "Animal Interactions with Trash": [
        "A raccoon rummaging through trash bins in a city.",
        "A dog sniffing around garbage bags in an alley.",
        "A cat hiding behind trash bins on a sidewalk."
    ],
    "Urban Wildlife": [
        "A bird perched on trash bins in a city.",
        "A squirrel scavenging near city trash bins.",
        "A rat running around near urban garbage."
    ]
}

# Running the comparisons for each category
for category, descriptions in categories.items():
    print(f"Category: {category}")
    probabilities = compare_image_text(image_path, descriptions)
    for desc, prob in zip(descriptions, probabilities):
        print(f"Description: '{desc}' - Probability: {prob:.4f}")
    print()  # New line for better separation of categories


Category: Monkey in Urban Environment
Description: 'A monkey beside trash bins in a busy urban setting.' - Probability: 0.0000
Description: 'A monkey climbing on a metal barrier near trash bins.' - Probability: 0.0000
Description: 'A monkey peering out from behind city trash bins.' - Probability: 1.0000

Category: Animal Interactions with Trash
Description: 'A raccoon rummaging through trash bins in a city.' - Probability: 1.0000
Description: 'A dog sniffing around garbage bags in an alley.' - Probability: 0.0000
Description: 'A cat hiding behind trash bins on a sidewalk.' - Probability: 0.0000

Category: Urban Wildlife
Description: 'A bird perched on trash bins in a city.' - Probability: 0.0000
Description: 'A squirrel scavenging near city trash bins.' - Probability: 0.0000
Description: 'A rat running around near urban garbage.' - Probability: 1.0000



In [12]:
import torch
import clip
from PIL import Image

# Setup device and load the CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load('ViT-B/32', device=device)

# Load and preprocess the image
image_path = './my_bro.png'  # Replace with your actual image file path
image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)

# Define the topics and corresponding descriptions
topics = [
    "Classes",
    "Generality",
    "Details",
    "Wrong class with details",
    "Correct answers",
]
texts = [
    ["A dog", "A cat", "A monkey"],  # Classes
    ["An animal", "A monkey", "A Macaca"],  # Generality
    ["A monkey", "A monkey in a trash can", "An aggressive monkey coming out from a trash can, staring at the camera"],  # Details
    ["A dog", "A dog in a trash can", "An aggressive dog coming out from a trash can, staring at the camera"],  # Wrong class with details
    ["A monkey", "A trash can", "A plastic bag"],  # Correct answers
]

# Compare image with texts and calculate probabilities
for i, comparing_texts in enumerate(texts):
    text_tokens = clip.tokenize(comparing_texts).to(device)

    # Compute logits and probabilities
    with torch.no_grad():
        logits_per_image, _ = model(image, text_tokens)
        probabilities = logits_per_image.softmax(dim=-1).cpu().numpy()[0]

    # Print the results for each topic
    print(f"** Topic {i + 1}: {topics[i]} **")
    for text, prob in zip(comparing_texts, probabilities):
        print(f"'{text}' - Probability: {prob:.4f}")
    print()  # For better readability between topics


** Topic 1: Classes **
'A dog' - Probability: 0.0014
'A cat' - Probability: 0.0029
'A monkey' - Probability: 0.9956

** Topic 2: Generality **
'An animal' - Probability: 0.0262
'A monkey' - Probability: 0.9219
'A Macaca' - Probability: 0.0520

** Topic 3: Details **
'A monkey' - Probability: 0.0002
'A monkey in a trash can' - Probability: 0.3140
'An aggressive monkey coming out from a trash can, staring at the camera' - Probability: 0.6860

** Topic 4: Wrong class with details **
'A dog' - Probability: 0.0010
'A dog in a trash can' - Probability: 0.1081
'An aggressive dog coming out from a trash can, staring at the camera' - Probability: 0.8911

** Topic 5: Correct answers **
'A monkey' - Probability: 0.7866
'A trash can' - Probability: 0.0925
'A plastic bag' - Probability: 0.1207

