In [None]:
#filter terrain
import imquality.brisque as brisque
import os
from skimage.io import imread



terrain_filter = os.listdir('terrain')
for terrain_img in terrain_filter:
  img = imread('terrain/'+terrain_img)
  score = brisque.score(img)
  print(score)
  if score > 80:
    os.remove('terrain/'+terrain_img)

In [None]:
"""Perlin noise implementation."""
# Licensed under ISC
from itertools import product
import math
import random


def smoothstep(t):
    """Smooth curve with a zero derivative at 0 and 1, making it useful for
    interpolating.
    """
    return t * t * (3. - 2. * t)


def lerp(t, a, b):
    """Linear interpolation between a and b, given a fraction t."""
    return a + t * (b - a)


class PerlinNoiseFactory(object):
    """Callable that produces Perlin noise for an arbitrary point in an
    arbitrary number of dimensions.  The underlying grid is aligned with the
    integers.
    There is no limit to the coordinates used; new gradients are generated on
    the fly as necessary.
    """

    def __init__(self, dimension, octaves=1, tile=(), unbias=False):
        """Create a new Perlin noise factory in the given number of dimensions,
        which should be an integer and at least 1.
        More octaves create a foggier and more-detailed noise pattern.  More
        than 4 octaves is rather excessive.
        ``tile`` can be used to make a seamlessly tiling pattern.  For example:
            pnf = PerlinNoiseFactory(2, tile=(0, 3))
        This will produce noise that tiles every 3 units vertically, but never
        tiles horizontally.
        If ``unbias`` is true, the smoothstep function will be applied to the
        output before returning it, to counteract some of Perlin noise's
        significant bias towards the center of its output range.
        """
        self.dimension = dimension
        self.octaves = octaves
        self.tile = tile + (0,) * dimension
        self.unbias = unbias

        # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply
        # by this to scale to ±1
        self.scale_factor = 2 * dimension ** -0.5

        self.gradient = {}

    def _generate_gradient(self):
        # Generate a random unit vector at each grid point -- this is the
        # "gradient" vector, in that the grid tile slopes towards it

        # 1 dimension is special, since the only unit vector is trivial;
        # instead, use a slope between -1 and 1
        if self.dimension == 1:
            return (random.uniform(-1, 1),)

        # Generate a random point on the surface of the unit n-hypersphere;
        # this is the same as a random unit vector in n dimensions.  Thanks
        # to: http://mathworld.wolfram.com/SpherePointPicking.html
        # Pick n normal random variables with stddev 1
        random_point = [random.gauss(0, 1) for _ in range(self.dimension)]
        # Then scale the result to a unit vector
        scale = sum(n * n for n in random_point) ** -0.5
        return tuple(coord * scale for coord in random_point)

    def get_plain_noise(self, *point):
        """Get plain noise for a single point, without taking into account
        either octaves or tiling.
        """
        if len(point) != self.dimension:
            raise ValueError("Expected {} values, got {}".format(
                self.dimension, len(point)))

        # Build a list of the (min, max) bounds in each dimension
        grid_coords = []
        for coord in point:
            min_coord = math.floor(coord)
            max_coord = min_coord + 1
            grid_coords.append((min_coord, max_coord))

        # Compute the dot product of each gradient vector and the point's
        # distance from the corresponding grid point.  This gives you each
        # gradient's "influence" on the chosen point.
        dots = []
        for grid_point in product(*grid_coords):
            if grid_point not in self.gradient:
                self.gradient[grid_point] = self._generate_gradient()
            gradient = self.gradient[grid_point]

            dot = 0
            for i in range(self.dimension):
                dot += gradient[i] * (point[i] - grid_point[i])
            dots.append(dot)

        # Interpolate all those dot products together.  The interpolation is
        # done with smoothstep to smooth out the slope as you pass from one
        # grid cell into the next.
        # Due to the way product() works, dot products are ordered such that
        # the last dimension alternates: (..., min), (..., max), etc.  So we
        # can interpolate adjacent pairs to "collapse" that last dimension.  Then
        # the results will alternate in their second-to-last dimension, and so
        # forth, until we only have a single value left.
        dim = self.dimension
        while len(dots) > 1:
            dim -= 1
            s = smoothstep(point[dim] - grid_coords[dim][0])

            next_dots = []
            while dots:
                next_dots.append(lerp(s, dots.pop(0), dots.pop(0)))

            dots = next_dots

        return dots[0] * self.scale_factor

    def __call__(self, *point):
        """Get the value of this Perlin noise function at the given point.  The
        number of values given should match the number of dimensions.
        """
        ret = 0
        for o in range(self.octaves):
            o2 = 1 << o
            new_point = []
            for i, coord in enumerate(point):
                coord *= o2
                if self.tile[i]:
                    coord %= self.tile[i] * o2
                new_point.append(coord)
            ret += self.get_plain_noise(*new_point) / o2

        # Need to scale n back down since adding all those extra octaves has
        # probably expanded it beyond ±1
        # 1 octave: ±1
        # 2 octaves: ±1½
        # 3 octaves: ±1¾
        ret /= 2 - 2 ** (1 - self.octaves)

        if self.unbias:
            # The output of the plain Perlin noise algorithm has a fairly
            # strong bias towards the center due to the central limit theorem
            # -- in fact the top and bottom 1/8 virtually never happen.  That's
            # a quarter of our entire output range!  If only we had a function
            # in [0..1] that could introduce a bias towards the endpoints...
            r = (ret + 1) / 2
            # Doing it this many times is a completely made-up heuristic.
            for _ in range(int(self.octaves / 2 + 0.5)):
                r = smoothstep(r)
            ret = r * 2 - 1

        return ret

In [None]:
!mkdir data

In [None]:
!unzip data.zip

In [None]:

import PIL.Image
from PIL import ImageEnhance

from random import randrange

size = 64
res = 40
frames = 500
frameres = 128
space_range = 1000
frame_range = 1000

pnf = PerlinNoiseFactory(2, octaves=4, tile=(space_range, space_range, frame_range))

for t in range(frames):
    img = PIL.Image.new('L', (size, size))
    for x in range(size):
        for y in range(size):
            n = abs(pnf((t+x)/res, (y)/res))
            img.putpixel((x, y), int((n + 1) / 2 * 255 + 0.5))
    print(t)
    enhancer = ImageEnhance.Contrast(img)
    img = enhancer.enhance(2.5)
    img.save("data/noise/noise{:03d}.png".format(t))
    print(t)

DISCRIMINTOR

In [None]:
import torch
import torch.nn as nn

class Block(nn.Module):
  def __init__(self, in_channels, out_channels, stride):
    super().__init__()
    self.conv = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 4, stride, 1, bias=True, padding_mode="reflect"),
        nn.InstanceNorm2d(out_channels),
        nn.LeakyReLU(0.2),
    )

  def forward(self, x):
    return self.conv(x)

class Discriminator(nn.Module):
  def __init__(self, in_channels=3, features=[64,128,256,512]):
    super().__init__()
    self.initial = nn.Sequential(
      nn.Conv2d(
          in_channels,
          features[0],
          kernel_size=4,
          stride=2,
          padding=1,
          padding_mode="reflect",
      ),
      nn.LeakyReLU(0.2),
    )

    layers = []
    in_channels = features[0]
    for feature in features[1:]:
      layers.append(Block(in_channels, feature, stride=1 if feature == features[-1] else 2))
      in_channels = feature
    layers.append(nn.Conv2d(in_channels, 1, kernel_size=4, stride = 1, padding=1, padding_mode="reflect"))
    self.model = nn.Sequential(*layers)

  def forward(self, x):
    x = self.initial(x)
    return torch.sigmoid(self.model(x))


def test():
  x = torch.randn((5,3,256,256))
  model = Discriminator(in_channels=3)
  preds = model(x)
  print(preds.shape)

test()

torch.Size([5, 1, 30, 30])


GENERATOR


In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, down=True, use_act=True, **kwargs):
    super().__init__()
    self.conv = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, padding_mode="reflect", **kwargs)
        if down
        else nn.ConvTranspose2d(in_channels, out_channels, **kwargs),
        nn.InstanceNorm2d(out_channels),
        nn.ReLU(inplace=True) if use_act else nn.Identity()
    )

  def forward(self,x):
    return self.conv(x)

class ResidualBlock(nn.Module):
  def __init__(self, channels):
    super().__init__()
    self.block = nn.Sequential(
        ConvBlock(channels, channels, kernel_size=3, padding=1),
        ConvBlock(channels, channels, use_act=False, kernel_size=3, padding=1),
    )

  def forward(self,x):
    return x + self.block(x)


class Generator(nn.Module):
  def __init__(self, img_channels, num_features = 64, num_residuals=9):
    super().__init__()
    self.initial = nn.Sequential(
        nn.Conv2d(img_channels, num_features, kernel_size = 7, stride = 1, padding=3, padding_mode="reflect"),
        nn.ReLU(inplace=True),
    )

    self.down_blocks = nn.ModuleList(
        [
         ConvBlock(num_features,num_features*2, kernel_size = 3, stride=2, padding = 1),
         ConvBlock(num_features*2,num_features*4, kernel_size = 3, stride=2, padding = 1),
        ]
    )

    self.residual_blocks = nn.Sequential(
        *[ResidualBlock(num_features*4) for _ in range(num_residuals)]
    )

    self.up_blocks = nn.ModuleList(
        [
         ConvBlock(num_features*4, num_features*2, down = False, kernel_size=3, stride = 2, padding = 1, output_padding = 1),
         ConvBlock(num_features*2, num_features, down = False, kernel_size=3, stride = 2, padding = 1, output_padding = 1)
        ]
    )

    self.last = nn.Conv2d(num_features, img_channels, kernel_size=7, stride=1, padding=3, padding_mode="reflect")

  def forward(self, x):
    x = self.initial(x)
    for layer in self.down_blocks:
      x = layer(x)
    x = self.residual_blocks(x)
    for layer in self.up_blocks:
      x = layer(x)
    return torch.tanh(self.last(x))

def test2():
  img_channels = 3
  img_size = 256
  x = torch.randn((2, img_channels, img_size, img_size))
  gen = Generator(img_channels, 9)
  print(gen(x).shape)

test2()

torch.Size([2, 3, 256, 256])


In [None]:
!pip uninstall albumentations

In [None]:
!pip install -U albumentations

In [None]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TRAIN_DIR = "prodata/train"
VAL_DIR = "prodata/val"
BATCH_SIZE = 1
LEARNING_RATE = 1e-5
LAMBDA_IDENTITY = 0.0
LAMBDA_CYCLE = 10
NUM_WORKERS = 2
NUM_EPOCHS = 100
LOAD_MODEL = True
SAVE_MODEL = True
CHECKPOINT_GEN_H = "genh.pth.tar"
CHECKPOINT_GEN_Z = "genz.pth.tar"
CHECKPOINT_CRITIC_H = "critich.pth.tar"
CHECKPOINT_CRITIC_Z = "criticz.pth.tar"

transforms = A.Compose(
    [   
        A.Resize(width=256, height=256),
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], max_pixel_value=255),
        ToTensorV2(),
     ],
    additional_targets={"image0": "image"},
)

In [None]:
import random, torch, os, numpy as np
import torch.nn as nn
import copy

def save_checkpoint(model, optimizer, filename="my_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    checkpoint = {
        "state_dict": model.state_dict(),
        "optimizer": optimizer.state_dict(),
    }
    torch.save(checkpoint, filename)


def load_checkpoint(checkpoint_file, model, optimizer, lr):
    print("=> Loading checkpoint")
    checkpoint = torch.load(checkpoint_file, map_location=DEVICE)
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

    # If we don't do this then it will just have learning rate of old checkpoint
    # and it will lead to many hours of debugging \:
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


DATA LOADING

In [None]:
from PIL import Image
import os
from torch.utils.data import Dataset
import numpy as np

class HorseZebraDataset(Dataset):
    def __init__(self, root_zebra, root_horse, transform=None):
        self.root_zebra = root_zebra
        self.root_horse = root_horse
        self.transform = transform

        self.zebra_images = os.listdir(root_zebra)
        self.horse_images = os.listdir(root_horse)
        self.length_dataset = max(len(self.zebra_images), len(self.horse_images)) # 1000, 1500
        self.zebra_len = len(self.zebra_images)
        self.horse_len = len(self.horse_images)

    def __len__(self):
        return self.length_dataset

    def __getitem__(self, index):
        zebra_img = self.zebra_images[index % self.zebra_len]
        horse_img = self.horse_images[index % self.horse_len]

        zebra_path = os.path.join(self.root_zebra, zebra_img)
        horse_path = os.path.join(self.root_horse, horse_img)

        zebra_img = np.array(Image.open(zebra_path).convert("RGB"))
        horse_img = np.array(Image.open(horse_path).convert("RGB"))

        if self.transform:
            augmentations = self.transform(image=zebra_img, image0=horse_img)
            zebra_img = augmentations["image"]
            horse_img = augmentations["image0"]

        return zebra_img, horse_img



Dataset dir should look like:
  DATA
    val
      noise
      terrain
    train
      noise
      terrain

In [None]:
!pip install split-folders tqdm


Collecting split-folders
  Downloading split_folders-0.4.3-py3-none-any.whl (7.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.4.3


In [None]:
import splitfolders  # or import split_folders

# Split with a ratio.
# To only split into training and validation set, set a tuple to `ratio`, i.e, `(.8, .2)`.
splitfolders.ratio("data2", output="prodata2", seed=1337, ratio=(.5, .5), group_prefix=None) # default values


Copying files: 1077 files [00:00, 9926.42 files/s] 


TRAIN

In [None]:
import torch
import sys
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torchvision.utils import save_image
import random



def train_fn(disc_H, disc_Z, gen_Z, gen_H, loader, opt_disc, opt_gen, l1, mse, d_scaler, g_scaler):
    H_reals = 0
    H_fakes = 0
    count = 0
    loop = tqdm(loader, leave=True)

    for idx, (zebra, horse) in enumerate(loop):
        zebra = zebra.to(DEVICE)
        horse = horse.to(DEVICE)

        # Train Discriminators H and Z
        with torch.cuda.amp.autocast():
            fake_horse = gen_H(zebra)
            D_H_real = disc_H(horse)
            D_H_fake = disc_H(fake_horse.detach())
            H_reals += D_H_real.mean().item()
            H_fakes += D_H_fake.mean().item()
            D_H_real_loss = mse(D_H_real, torch.ones_like(D_H_real))
            D_H_fake_loss = mse(D_H_fake, torch.zeros_like(D_H_fake))
            D_H_loss = D_H_real_loss + D_H_fake_loss

            fake_zebra = gen_Z(horse)
            D_Z_real = disc_Z(zebra)
            D_Z_fake = disc_Z(fake_zebra.detach())
            D_Z_real_loss = mse(D_Z_real, torch.ones_like(D_Z_real))
            D_Z_fake_loss = mse(D_Z_fake, torch.zeros_like(D_Z_fake))
            D_Z_loss = D_Z_real_loss + D_Z_fake_loss

            # put it togethor
            D_loss = (D_H_loss + D_Z_loss)/2

        opt_disc.zero_grad()
        d_scaler.scale(D_loss).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        # Train Generators H and Z
        with torch.cuda.amp.autocast():
            # adversarial loss for both generators
            D_H_fake = disc_H(fake_horse)
            D_Z_fake = disc_Z(fake_zebra)
            loss_G_H = mse(D_H_fake, torch.ones_like(D_H_fake))
            loss_G_Z = mse(D_Z_fake, torch.ones_like(D_Z_fake))

            # cycle loss
            cycle_zebra = gen_Z(fake_horse)
            cycle_horse = gen_H(fake_zebra)
            cycle_zebra_loss = l1(zebra, cycle_zebra)
            cycle_horse_loss = l1(horse, cycle_horse)

            # identity loss (remove these for efficiency if you set lambda_identity=0)
            identity_zebra = gen_Z(zebra)
            identity_horse = gen_H(horse)
            identity_zebra_loss = l1(zebra, identity_zebra)
            identity_horse_loss = l1(horse, identity_horse)

            # add all togethor
            G_loss = (
                loss_G_Z
                + loss_G_H
                + cycle_zebra_loss * LAMBDA_CYCLE
                + cycle_horse_loss * LAMBDA_CYCLE
                + identity_horse_loss * LAMBDA_IDENTITY
                + identity_zebra_loss * LAMBDA_IDENTITY
            )

        opt_gen.zero_grad()
        g_scaler.scale(G_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()
        rand = random.random()
        if idx % 200 == 0:
            save_image(fake_horse*0.5+0.5, f"gdrive/MyDrive/ridgedNoise/saved_images/horse_{rand}.png")
            save_image(fake_zebra*0.5+0.5, f"gdrive/MyDrive/ridgedNoise/saved_images/zebra_{rand}.png")
            count += 1

        loop.set_postfix(H_real=H_reals/(idx+1), H_fake=H_fakes/(idx+1))


def run():
    disc_H = Discriminator(in_channels=3).to(DEVICE)
    disc_Z = Discriminator(in_channels=3).to(DEVICE)
    gen_Z = Generator(img_channels=3, num_residuals=9).to(DEVICE)
    gen_H = Generator(img_channels=3, num_residuals=9).to(DEVICE)
    opt_disc = optim.Adam(
        list(disc_H.parameters()) + list(disc_Z.parameters()),
        lr=LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    opt_gen = optim.Adam(
        list(gen_Z.parameters()) + list(gen_H.parameters()),
        lr=LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    L1 = nn.L1Loss()
    mse = nn.MSELoss()

    if LOAD_MODEL:
        load_checkpoint(
            'gdrive/MyDrive/ridgedNoise/'+CHECKPOINT_GEN_H, gen_H, opt_gen, LEARNING_RATE,
        )
        load_checkpoint(
            'gdrive/MyDrive/ridgedNoise/'+CHECKPOINT_GEN_Z, gen_Z, opt_gen, LEARNING_RATE,
        )
        load_checkpoint(
            'gdrive/MyDrive/ridgedNoise/'+CHECKPOINT_CRITIC_H, disc_H, opt_disc, LEARNING_RATE,
        )
        load_checkpoint(
            'gdrive/MyDrive/ridgedNoise/'+CHECKPOINT_CRITIC_Z, disc_Z, opt_disc, LEARNING_RATE,
        )

    dataset = HorseZebraDataset(
        root_horse="prodata2/train/noise", root_zebra="prodata/train/terrain", transform=transforms
    )
    val_dataset = HorseZebraDataset(
       root_horse="prodata2/val/noise", root_zebra="prodata/val/terrain", transform=transforms
    )


    val_loader = DataLoader(
        val_dataset,
        batch_size=1,
        shuffle=False,
        pin_memory=True,
    )

    print(val_loader)
    loader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
        pin_memory=True
    )
    g_scaler = torch.cuda.amp.GradScaler()
    d_scaler = torch.cuda.amp.GradScaler()

    for epoch in range(NUM_EPOCHS):
        train_fn(disc_H, disc_Z, gen_Z, gen_H, loader, opt_disc, opt_gen, L1, mse, d_scaler, g_scaler)

        if SAVE_MODEL:
            save_checkpoint(gen_H, opt_gen, filename="gdrive/MyDrive/ridgedNoise/"+CHECKPOINT_GEN_H)
            save_checkpoint(gen_Z, opt_gen, filename="gdrive/MyDrive/ridgedNoise/"+CHECKPOINT_GEN_Z)
            save_checkpoint(disc_H, opt_disc, filename="gdrive/MyDrive/ridgedNoise/"+CHECKPOINT_CRITIC_H)
            save_checkpoint(disc_Z, opt_disc, filename="gdrive/MyDrive/ridgedNoise/"+CHECKPOINT_CRITIC_Z)
            


In [None]:

run()

=> Loading checkpoint
=> Loading checkpoint
=> Loading checkpoint
=> Loading checkpoint
<torch.utils.data.dataloader.DataLoader object at 0x7f5b105fb150>


100%|██████████| 288/288 [02:55<00:00,  1.64it/s, H_fake=0.267, H_real=0.695]


=> Saving checkpoint
=> Saving checkpoint
=> Saving checkpoint
=> Saving checkpoint


  0%|          | 0/288 [00:00<?, ?it/s]


KeyboardInterrupt: ignored

For generating a set of terrain for stitching


In [None]:
import torchvision.transforms as transforms
gen_Z = Generator(img_channels=3, num_residuals=9).to(DEVICE)
gen_H = Generator(img_channels=3, num_residuals=9).to(DEVICE)
opt_gen = optim.Adam(
        list(gen_Z.parameters()) + list(gen_H.parameters()),
        lr=LEARNING_RATE,
        betas=(0.5, 0.999),
    )

load_checkpoint(
            'gdrive/MyDrive/ridgedNoise/'+CHECKPOINT_GEN_Z, gen_Z, opt_gen, LEARNING_RATE,
        )

for i in range(500):
  noise = Image.open('data2/noise/noise{:03d}.png'.format(i)).convert("RGB")
  noise = noise.resize((256,256))
  convert_tensor = transforms.ToTensor()
  tensorNoise = convert_tensor(noise)



  tensorNoise = tensorNoise.to(DEVICE)
  batch = torch.unsqueeze(tensorNoise, 0) #create a mini-batch as expected by the model
  fake_terrain = gen_Z(batch)
  save_image(fake_terrain*0.5+0.5, "generated/generatedTerrain{:03d}.png".format(i))


=> Loading checkpoint


In [None]:
!zip -r generated.zip generated

updating: generated/ (stored 0%)
updating: generated/generatedTerrain440.png (deflated 0%)
updating: generated/generatedTerrain024.png (deflated 0%)
updating: generated/generatedTerrain493.png (deflated 0%)
updating: generated/generatedTerrain482.png (deflated 0%)
updating: generated/generatedTerrain247.png (deflated 0%)
updating: generated/generatedTerrain150.png (deflated 0%)
updating: generated/generatedTerrain017.png (deflated 0%)
updating: generated/generatedTerrain000.png (deflated 0%)
updating: generated/generatedTerrain475.png (deflated 0%)
updating: generated/generatedTerrain495.png (deflated 0%)
updating: generated/generatedTerrain233.png (deflated 0%)
updating: generated/generatedTerrain244.png (deflated 0%)
updating: generated/generatedTerrain148.png (deflated 0%)
updating: generated/generatedTerrain215.png (deflated 0%)
updating: generated/generatedTerrain162.png (deflated 0%)
updating: generated/generatedTerrain472.png (deflated 0%)
updating: generated/generatedTerrain138