<a href="https://colab.research.google.com/github/aizazaziz/ML_Projects/blob/main/imgGen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os, cv2
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from torchvision.models.segmentation import deeplabv3_resnet50
from PIL import Image

In [None]:
class PairedImageDataset(Dataset):
    def __init__(self, root):
        self.input_dir = os.path.join(root, "input")
        self.target_dir = os.path.join(root, "target")
        self.files = os.listdir(self.input_dir)

        self.transform = T.Compose([
            T.Resize((256, 256)), # Further reduced image size
            T.ToTensor()
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        name = self.files[idx]
        inp = Image.open(os.path.join(self.input_dir, name)).convert("RGB")
        tar = Image.open(os.path.join(self.target_dir, name)).convert("RGB")

        return self.transform(inp), self.transform(tar)

In [None]:
class UNetBlock(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_c, out_c, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_c, out_c, 3, padding=1),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
        return self.net(x)

class UNet(nn.Module):
    def __init__(self):
        super().__init__()

        self.down1 = UNetBlock(3, 64)
        self.pool = nn.MaxPool2d(2)
        self.down2 = UNetBlock(64,128)
        self.down3 = UNetBlock(128,256)

        self.bottleneck = UNetBlock(256,512)

        self.up1 = nn.ConvTranspose2d(512,256,2,2)
        self.dec1 = UNetBlock(512,256)

        self.up2 = nn.ConvTranspose2d(256,128,2,2)
        self.dec2 = UNetBlock(256,128)

        self.up3 = nn.ConvTranspose2d(128,64,2,2)
        self.dec3 = UNetBlock(128,64)

        self.final = nn.Conv2d(64,3,1)

    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(self.pool(d1))
        d3 = self.down3(self.pool(d2))

        bn = self.bottleneck(self.pool(d3))

        u1 = self.up1(bn)
        u1 = self.dec1(torch.cat([u1,d3],1))

        u2 = self.up2(u1)
        u2 = self.dec2(torch.cat([u2,d2],1))

        u3 = self.up3(u2)
        u3 = self.dec3(torch.cat([u3,d1],1))

        return torch.sigmoid(self.final(u3))


In [None]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
model = UNet().to(DEVICE)

dataset = PairedImageDataset("dataset")   # <-- change path
loader = DataLoader(dataset, batch_size=1, shuffle=True)

criterion = nn.L1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)

In [None]:
EPOCHS = 50

for epoch in range(EPOCHS):
    loop = tqdm(loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for inp, tar in loop:
        inp, tar = inp.to(DEVICE), tar.to(DEVICE)

        pred = model(inp)
        loss = criterion(pred, tar)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss=float(loss))

torch.save(model.state_dict(), "model.pth")
print("Training complete. Model saved.")


Consider using tensor.detach() first. (Triggered internally at /pytorch/torch/csrc/autograd/generated/python_variable_methods.cpp:836.)
  loop.set_postfix(loss=float(loss))
Epoch 1/50: 100%|██████████| 17/17 [00:02<00:00,  7.54it/s, loss=0.231]
Epoch 2/50: 100%|██████████| 17/17 [00:00<00:00, 20.13it/s, loss=0.261]
Epoch 3/50: 100%|██████████| 17/17 [00:00<00:00, 20.03it/s, loss=0.22]
Epoch 4/50: 100%|██████████| 17/17 [00:00<00:00, 20.33it/s, loss=0.0923]
Epoch 5/50: 100%|██████████| 17/17 [00:00<00:00, 20.30it/s, loss=0.0471]
Epoch 6/50: 100%|██████████| 17/17 [00:00<00:00, 20.21it/s, loss=0.0595]
Epoch 7/50: 100%|██████████| 17/17 [00:00<00:00, 19.74it/s, loss=0.0641]
Epoch 8/50: 100%|██████████| 17/17 [00:00<00:00, 19.26it/s, loss=0.0291]
Epoch 9/50: 100%|██████████| 17/17 [00:00<00:00, 19.04it/s, loss=0.0838]
Epoch 10/50: 100%|██████████| 17/17 [00:00<00:00, 19.15it/s, loss=0.0574]
Epoch 11/50: 100%|██████████| 17/17 [00:00<00:00, 20.27it/s, loss=0.0561]
Epoch 12/50: 100%|████████

Training complete. Model saved.


In [None]:
import cv2
import numpy as np

model = UNet().to(DEVICE)
model.load_state_dict(torch.load("model.pth"))
model.eval()

transform = T.Compose([T.Resize((256,256)), T.ToTensor()])

os.makedirs("results", exist_ok=True)

test_images = os.listdir("test")

for name in test_images:
    img = Image.open(f"test/{name}").convert("RGB")
    inp = transform(img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        out = model(inp)[0].cpu().permute(1,2,0).numpy()

    out = (out*255).astype("uint8")
    cv2.imwrite(f"results/{name}", cv2.cvtColor(out, cv2.COLOR_RGB2BGR))

print("Done! Check results/ folder.")


Done! Check results/ folder.


In [None]:
import os

model_version = "1"
output_dir = os.path.join("/content", "model_versions")
os.makedirs(output_dir, exist_ok=True)
torch.save(model.state_dict(), os.path.join(output_dir, f"model_{model_version}.pth"))
print(f"Model saved to {os.path.join(output_dir, f'model_{model_version}.pth')}")

Model saved to /content/model_versions/model_1.pth


In [None]:
import pickle
filename = 'model.pkl'
pickle.dump(model, open(filename, 'wb'))

In [None]:
from torch.serialization import load
loaded_model = pickle.load(open(filename, 'rb'))

In [3]:
!pip install streamlit
import streamlit as st
import os, cv2
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from torchvision.models.segmentation import deeplabv3_resnet50
from PIL import Image


class PairedImageDataset(Dataset):
    def __init__(self, root):
        self.input_dir = os.path.join(root, "input")
        self.target_dir = os.path.join(root, "target")
        self.files = os.listdir(self.input_dir)

        self.transform = T.Compose([
            T.Resize((256, 256)), # Further reduced image size
            T.ToTensor()
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        name = self.files[idx]
        inp = Image.open(os.path.join(self.input_dir, name)).convert("RGB")
        tar = Image.open(os.path.join(self.target_dir, name)).convert("RGB")

        return self.transform(inp), self.transform(tar)
class UNetBlock(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_c, out_c, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_c, out_c, 3, padding=1),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
        return self.net(x)

class UNet(nn.Module):
    def __init__(self):
        super().__init__()

        self.down1 = UNetBlock(3, 64)
        self.pool = nn.MaxPool2d(2)
        self.down2 = UNetBlock(64,128)
        self.down3 = UNetBlock(128,256)

        self.bottleneck = UNetBlock(256,512)

        self.up1 = nn.ConvTranspose2d(512,256,2,2)
        self.dec1 = UNetBlock(512,256)

        self.up2 = nn.ConvTranspose2d(256,128,2,2)
        self.dec2 = UNetBlock(256,128)

        self.up3 = nn.ConvTranspose2d(128,64,2,2)
        self.dec3 = UNetBlock(128,64)

        self.final = nn.Conv2d(64,3,1)

    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(self.pool(d1))
        d3 = self.down3(self.pool(d2))

        bn = self.bottleneck(self.pool(d3))

        u1 = self.up1(bn)
        u1 = self.dec1(torch.cat([u1,d3],1))

        u2 = self.up2(u1)
        u2 = self.dec2(torch.cat([u2,d2],1))

        u3 = self.up3(u2)
        u3 = self.dec3(torch.cat([u3,d1],1))

        return torch.sigmoid(self.final(u3))


DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
model = UNet().to(DEVICE)

dataset = PairedImageDataset("dataset")   # <-- change path
loader = DataLoader(dataset, batch_size=1, shuffle=True)

criterion = nn.L1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)
EPOCHS = 2

for epoch in range(EPOCHS):
    loop = tqdm(loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for inp, tar in loop:
        inp, tar = inp.to(DEVICE), tar.to(DEVICE)

        pred = model(inp)
        loss = criterion(pred, tar)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss=loss.item())

torch.save(model.state_dict(), "model.pth")
print("Training complete. Model saved.")


model = UNet().to(DEVICE)
model.load_state_dict(torch.load("model.pth"))
model.eval()

transform = T.Compose([T.Resize((256,256)), T.ToTensor()])

os.makedirs("results", exist_ok=True)

test_images = os.listdir("test")

for name in test_images:
    img = Image.open(f"test/{name}").convert("RGB")
    inp = transform(img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        out = model(inp)[0].cpu().permute(1,2,0).numpy()

    out = (out*255).astype("uint8")
    cv2.imwrite(f"results/{name}", cv2.cvtColor(out, cv2.COLOR_RGB2BGR))

print("Done! Check results/ folder.")

# -----------------------------
# Load MODELS
# -----------------------------
@st.cache_resource
def load_sr_model():
    model = UNet() # Instantiate the UNet model
    model.load_state_dict(torch.load("model.pth", map_location="cpu")) # Load the state_dict
    model.eval()
    return model

@st.cache_resource
def load_enhance_model():
    model = UNet() # Instantiate the UNet model
    model.load_state_dict(torch.load("model.pth", map_location="cpu")) # Load the state_dict
    model.eval()
    return model

sr_model = load_sr_model()
enhance_model = load_enhance_model()

# -----------------------------
# Preprocessing & Postprocessing
# -----------------------------
transform = T.Compose([
    T.ToTensor(),
])

def tensor_to_img(tensor):
    arr = (tensor.detach().cpu().numpy().transpose(1, 2, 0) * 255).clip(0,255).astype(np.uint8)
    return Image.fromarray(arr)

# -----------------------------
# SR + ENHANCEMENT PIPELINE
# -----------------------------
def enhance_pipeline(img, brightness, sharpness):
    x = transform(img).unsqueeze(0)

    with torch.no_grad():
        # Super Resolution first
        sr = sr_model(x)

        # Apply enhancement network
        enhanced = enhance_model(sr)

    out = tensor_to_img(enhanced.squeeze(0))

    # Apply user-controlled adjustments
    from PIL import ImageEnhance

    if brightness != 1.0:
        out = ImageEnhance.Brightness(out).enhance(brightness)

    if sharpness != 1.0:
        out = ImageEnhance.Sharpness(out).enhance(sharpness)

    return out


# -----------------------------
# UI
# -----------------------------
st.title("Real Estate Image Enhancer (SR + HDR)")

uploaded = st.file_uploader("Upload an image", type=['jpg', 'jpeg', 'png'])

brightness = st.slider("Brightness", 0.2, 2.0, 1.0, 0.05)
sharpness = st.slider("Sharpness", 0.2, 3.0, 1.0, 0.05)

if uploaded:
    img = Image.open(uploaded).convert("RGB")

    st.image(img, caption="Original", use_column_width=True)

    if st.button("Enhance Image"):
        out = enhance_pipeline(img, brightness, sharpness)
        st.image(out, caption="Enhanced Output", use_column_width=True)



FileNotFoundError: [Errno 2] No such file or directory: 'dataset/input'