In [10]:
!pip install kaggle -q

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os
import matplotlib.pyplot as plt
import kaggle
import zipfile
import shutil
import torch.nn.init as init
from torchvision.utils import save_image


In [2]:
class CatDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_files = [f for f in os.listdir(root_dir) if f.endswith('.jpg')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.image_files[idx])
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image

In [3]:
class Encoder(nn.Module):
    def __init__(self, latent_dim):
        super(Encoder, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.fc_mu = nn.Linear(256 * 4 * 4, latent_dim)

        self.fc_logvar = nn.Linear(256 * 4 * 4, latent_dim)
        
        # Apply Kaiming initialization
        # self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
            init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        elif isinstance(m, nn.Linear):
            init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        x = torch.relu(self.bn1(self.conv1(x)))
        x = torch.relu(self.bn2(self.conv2(x)))
        x = torch.relu(self.bn3(self.conv3(x)))
        x = torch.relu(self.bn4(self.conv4(x)))
        x = x.view(x.size(0), -1)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

In [4]:
class Decoder(nn.Module):
    def __init__(self, latent_dim):
        super(Decoder, self).__init__()
        self.fc = nn.Linear(latent_dim, 256 * 4 * 4)
        self.conv1 = nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        self.conv2 = nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1)

        # Apply Kaiming initialization
        # self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
            init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        elif isinstance(m, nn.Linear):
            init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, z):
        x = self.fc(z)
        x = x.view(x.size(0), 256, 4, 4)
        x = torch.relu(self.bn1(self.conv1(x)))
        x = torch.relu(self.bn2(self.conv2(x)))
        x = torch.relu(self.bn3(self.conv3(x)))
        x = torch.sigmoid(self.conv4(x))
        return x

In [5]:
class VAE(nn.Module):
    def __init__(self, encoder, decoder):
        super(VAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z), mu, logvar

In [22]:
def loss_function(recon_x, x, mu, logvar):
    BCE = nn.functional.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD, BCE, KLD

def train(model, dataloader, optimizer, device, epoch):
    model.train()
    train_loss = 0
    train_bce_loss = 0
    train_kld_loss = 0
    for batch_idx, data in enumerate(dataloader):
        data = data.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss, bce, kld = loss_function(recon_batch, data, mu, logvar)
        loss.backward()
        train_loss += loss.item()
        train_bce_loss += bce.item()
        train_kld_loss += kld.item()
        optimizer.step()
    
    return train_loss / len(dataloader.dataset), train_bce_loss / len(dataloader.dataset), train_kld_loss / len(dataloader.dataset)

def generate_images(model, latent_dim, num_images, device):
    model.eval()
    with torch.no_grad():
        z = torch.randn(num_images, latent_dim).to(device)
        sample = model.decoder(z)
    return sample

def interpolate_latent(model, start_image, end_image, num_steps, device):
    model.eval()
    with torch.no_grad():
        start_mu, start_logvar = model.encoder(start_image.to(device))
        end_mu, end_logvar = model.encoder(end_image.to(device))
        
        start_z = model.reparameterize(start_mu, start_logvar)
        end_z = model.reparameterize(end_mu, end_logvar)
        
        interpolated_z = torch.zeros(num_steps, start_z.size(1)).to(device)
        for i in range(num_steps):
            alpha = i / (num_steps - 1)
            interpolated_z[i] = start_z * (1 - alpha) + end_z * alpha
        print(interpolated_z.shape)
        interpolated_images = model.decoder(interpolated_z)
    return interpolated_images

In [7]:
def download_kaggle_dataset(dataset_name, download_path):
    kaggle.api.authenticate()
    kaggle.api.dataset_download_files(dataset_name, path=download_path, unzip=True)
    
    subfolder = os.path.join(download_path, 'cats')
    for item in os.listdir(subfolder):
        s = os.path.join(subfolder, item)
        d = os.path.join(download_path, item)
        shutil.move(s, d)
    os.rmdir(subfolder)

In [8]:
import os
import shutil
from pathlib import Path

def preprocess_cat_dataset(source_dir, target_dir):
    """
    Extracts .jpg files from all subdirectories in the source directory
    and copies them to the target directory.
    
    :param source_dir: Path to the source directory containing CAT_XX subdirectories
    :param target_dir: Path to the target directory where .jpg files will be copied
    """
    # Create the target directory if it doesn't exist
    Path(target_dir).mkdir(parents=True, exist_ok=True)
    
    # Counter for processed files
    processed_count = 0
    
    # Iterate through all subdirectories in the source directory
    for root, dirs, files in os.walk(source_dir):
        for file in files:
            if file.lower().endswith('.jpg'):
                source_file = Path(root) / file
                target_file = Path(target_dir) / file
                
                # Copy the file to the target directory
                shutil.copy2(source_file, target_file)
                processed_count += 1
                
                # Print progress every 100 files
                if processed_count % 100 == 0:
                    print(f"Processed {processed_count} images...")

    print(f"Finished processing. Total images copied: {processed_count}")

In [24]:
def main(data_dir, dataset_name='crawford/cat-dataset', batch_size=64, latent_dim=200, lr=1e-3, epochs=50, device='cuda'):
    # Download the dataset if it doesn't exist
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
        print(f"Downloading dataset '{dataset_name}'...")
        download_kaggle_dataset(dataset_name, data_dir)
        print("Dataset downloaded successfully.")
    if not os.path.exists("./cleaned_data"):
        preprocess_cat_dataset(data_dir, "./cleaned_data")
    
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor()
    ])

    dataset = CatDataset("./cleaned_data", transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    encoder = Encoder(latent_dim)
    decoder = Decoder(latent_dim)

    model = VAE(encoder, decoder).to(device)
    if os.path.exists('./best_model.pth'):
        model_path = 'best_model.pth'
        model.load_state_dict(torch.load(model_path))
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_losses = []
    bce_losses = []
    kld_losses = []
    best_loss = float('inf')

    for epoch in range(1, epochs + 1):
        train_loss, bce_loss, kld_loss = train(model, dataloader, optimizer, device, epoch)
        train_losses.append(train_loss)
        bce_losses.append(bce_loss)
        kld_losses.append(kld_loss)
        
        print(f'Epoch {epoch}, Loss: {train_loss:.4f}, BCE: {bce_loss:.4f}, KLD: {kld_loss:.4f}')
        
        if train_loss < best_loss:
            best_loss = train_loss
            torch.save(model.state_dict(), 'best_model.pth')
        
        if epoch % 100 == 0:
            torch.save(model.state_dict(), f'model_checkpoint_{epoch}.pth')
    return train_losses, bce_losses, kld_losses, dataset, model
    

In [29]:
train_losses, bce_losses, kld_losses, dataset, decoder = main('./dataset', device='mps')
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Total Loss')
plt.plot(bce_losses, label='BCE Loss')
plt.plot(kld_losses, label='KLD Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_plot_2.png')
plt.close()

Epoch 1, Loss: 6740.6242, BCE: 6607.0024, KLD: 133.6218
Epoch 2, Loss: 6738.9783, BCE: 6605.3345, KLD: 133.6439
Epoch 3, Loss: 6736.9873, BCE: 6602.9424, KLD: 134.0449
Epoch 4, Loss: 6737.8102, BCE: 6604.1674, KLD: 133.6428
Epoch 5, Loss: 6737.7562, BCE: 6603.5341, KLD: 134.2222
Epoch 6, Loss: 6735.6423, BCE: 6601.7861, KLD: 133.8562
Epoch 7, Loss: 6737.4368, BCE: 6603.3750, KLD: 134.0618
Epoch 8, Loss: 6736.9534, BCE: 6602.9816, KLD: 133.9719
Epoch 9, Loss: 6736.5268, BCE: 6602.4356, KLD: 134.0911
Epoch 10, Loss: 6734.9486, BCE: 6601.0006, KLD: 133.9480
Epoch 11, Loss: 6735.7738, BCE: 6601.5069, KLD: 134.2670
Epoch 12, Loss: 6734.8999, BCE: 6600.7592, KLD: 134.1408
Epoch 13, Loss: 6733.6245, BCE: 6599.4678, KLD: 134.1567
Epoch 14, Loss: 6733.3773, BCE: 6599.5233, KLD: 133.8539
Epoch 15, Loss: 6733.4519, BCE: 6599.1903, KLD: 134.2616
Epoch 16, Loss: 6732.9177, BCE: 6598.7426, KLD: 134.1751
Epoch 17, Loss: 6734.1612, BCE: 6599.8505, KLD: 134.3107
Epoch 18, Loss: 6735.8329, BCE: 6601.506

In [28]:
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Total Loss')
plt.plot(bce_losses, label='BCE Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_plot_2.png')
plt.close()

In [30]:
latent_dim = 200 
# Initialize the Decoder model
encoder = Encoder(latent_dim)
decoder = Decoder(latent_dim)
device = 'mps'
model = VAE(encoder, decoder).to(device)

model_path = 'best_model.pth'  
model.load_state_dict(torch.load(model_path))

new_images = generate_images(model, latent_dim, 16, device)
save_image(new_images, 'generated_images.png', nrow=4)

start_image = dataset[0].unsqueeze(0).to(device)
end_image = dataset[1].unsqueeze(0).to(device)
interpolated_images = interpolate_latent(model, start_image, end_image, 10, device)
save_image(interpolated_images, 'interpolated_images.png', nrow=10)

torch.Size([10, 200])


In [32]:
import numpy as np
stock_price = np.array([20, 30, 5, 10, 40, 20, 70])

start_price = stock_price[0]
profit = 0

for i in range(1,len(stock_price)):
    if stock_price[i] > stock_price[i-1]:
        profit += stock_price[i] - stock_price[i-1]
    else:
        start_price = stock_price[i]

print(profit)

95


In [37]:
import numpy as np

X = np.random.random((10,5))
y = np.array([1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0])

In [None]:
# 5 , 10, 5, 1
W_hidden_01 = np.random.normal((5, 10))
W_hidden_12 = np.random.normal((10, 5))
W_output  = np.random.normal((5,1))

b_hidden_01 = np.zeros((10,1))
b_hidden_12 = np.zeros((5,1))

activation = np.tanh

n_iter = 10

for i in range(n_iter):
    h1_out = np.matmul(X, W_hidden_01) + b_hidden_01
    h1_act = activation(h1_out)

    h2_out = np.matmul(h1_act, W_hidden_12) + b_hidden_12
    h2_act = activation(h2_out)

    out = np.matmul(h2_act, W_output)

    out_p = np.exp()

    
    



