In [None]:
# Create the output directory used for the training log and preview image.
# os.makedirs(..., exist_ok=True) is idempotent, so re-running the notebook
# does not emit a "File exists" error, and it does not depend on a shell.
import os
os.makedirs('gif', exist_ok=True)

In [None]:
import config

In [None]:
!pip install --upgrade pip
!pip install numpy
config.CLOUD = False #False: macos, True:windows/linux

In [None]:
import numpy as np
import tensorflow as tf

In [None]:
class RenderEngine(tf.keras.layers.Layer):
    """Keras layer that renders a grid of Gaussian parameters into image patches.

    Each grid cell carries ``config.NUM_OF_PARAMS`` values. ``call_per_batch``
    prepends the cell's anchor coordinates, ``gaussian`` turns every cell into
    a ``patch_size`` x ``patch_size`` image, and ``screen`` merges the per-cell
    images into a single patch.
    """

    def __init__(self, patch_size, **kwargs):
        """Store the side length (in pixels) of the rendered square patch."""
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def build(self, input_shape):
        """No weights to create; defer to the base implementation."""
        super().build(input_shape)

    def screen(self, image):
        """Combine a stack of images along axis 0 as ``1 - prod(1 - image)``."""
        inverted = 1 - image
        return 1 - tf.reduce_prod(inverted, axis=0)

    def gaussian(self, params):
        """Render one cell's parameter vector as a clipped 2-D Gaussian image.

        Layout of ``params`` as read below:
        [0], [1] anchor x / y, [2] amplitude, [3], [4] fine x / y offset,
        [5], [6] x / y spread, [7] rotation (mapped to (value - 0.5) * pi).

        Returns a tensor with a trailing singleton channel axis.
        """
        pixel_centres = tf.range(self.patch_size, dtype=tf.float32) + 0.5
        grid_x, grid_y = tf.meshgrid(pixel_centres, pixel_centres)

        amp = params[2]
        x_mean = params[0] + params[3] * (config.DOWNSAMPLE - 1)
        y_mean = params[1] + params[4] * (config.DOWNSAMPLE - 1)
        x_std = params[5] * config.MAX_STD
        y_std = params[6] * config.MAX_STD
        theta = (params[7] - 0.5) * np.pi

        # Shared numerators / denominators of the quadratic-form coefficients.
        # Note the spreads enter as 2*std (not squared); divide_no_nan yields 0
        # where a spread is exactly 0.
        cos_sq = tf.math.square(tf.math.cos(theta))
        sin_sq = tf.math.square(tf.math.sin(theta))
        sin_double = tf.math.sin(2*theta)
        denom_x = 2*x_std
        denom_y = 2*y_std

        a = tf.math.divide_no_nan(cos_sq, denom_x) + tf.math.divide_no_nan(sin_sq, denom_y)
        b = tf.math.divide_no_nan(sin_double, denom_x) - tf.math.divide_no_nan(sin_double, denom_y)
        c = tf.math.divide_no_nan(sin_sq, denom_x) + tf.math.divide_no_nan(cos_sq, denom_y)

        dx = grid_x - x_mean
        dy = grid_y - y_mean
        exponent = -a*tf.math.square(dx) - b*dx*dy - c*tf.math.square(dy)
        img = tf.clip_by_value(amp * tf.math.exp(exponent), 0.0, 1.0)

        return tf.expand_dims(img, axis=-1)

    def call_per_batch(self, x):
        """Render a single sample: one Gaussian per grid cell, merged by ``screen``."""
        anchors = tf.range(self.patch_size, delta=config.DOWNSAMPLE, dtype=tf.float32) + 0.5
        anchor_grid = tf.transpose(tf.meshgrid(anchors, anchors), perm=[1, 2, 0])

        # Prefix every cell with its anchor (x, y), then flatten the grid into
        # a list of per-cell parameter vectors.
        cells = tf.concat([anchor_grid, x], axis=-1)
        cells = tf.reshape(cells, [-1, config.NUM_OF_PARAMS+2])

        mapper = tf.vectorized_map if config.CLOUD else tf.map_fn
        return self.screen(mapper(self.gaussian, cells))

    def call(self, x):
        """Render every sample of the batch ``x`` independently."""
        mapper = tf.vectorized_map if config.CLOUD else tf.map_fn
        return mapper(self.call_per_batch, x)

In [None]:
def create_batch():
    """Generate one synthetic training batch.

    For each of ``config.BATCH_SIZE`` samples a zero parameter grid is filled
    with 0-9 randomly placed stars (a later star overwrites an earlier one in
    the same cell). ``config.TOT_NUM_STARS`` is increased by the number of
    stars drawn. The grids are rendered with ``RenderEngine``.

    Returns:
        (inputs, outputs): the rendered images and the parameter grids
        (a NumPy array) they were rendered from.
    """
    cells_per_side = config.PATCH_SIZE//config.DOWNSAMPLE
    renderer = RenderEngine(config.PATCH_SIZE)
    grids = []

    for _ in range(config.BATCH_SIZE):
        grid = np.zeros([cells_per_side, cells_per_side, config.NUM_OF_PARAMS])

        star_count = np.random.randint(0, 10)
        config.TOT_NUM_STARS += star_count

        for _ in range(star_count):
            row = np.random.randint(low=0, high=cells_per_side)
            col = np.random.randint(low=0, high=cells_per_side)

            amp = np.random.uniform(low=0.05, high=1.0)
            x_fine = np.random.uniform(low=0.0, high=1.0)
            y_fine = np.random.uniform(low=0.0, high=1.0)

            # One spread is drawn freely; the other is drawn within +/-0.4 of
            # it (clipped to [0, 1]). A coin flip decides which axis is first.
            x_drawn_first = np.random.choice([True, False])
            lead_std = np.random.uniform(low=0.05, high=1.0)
            follow_std = np.random.uniform(low=np.clip(lead_std-0.4, 0, 1), high=np.clip(lead_std+0.4, 0, 1))
            if x_drawn_first:
                x_std, y_std = lead_std, follow_std
            else:
                x_std, y_std = follow_std, lead_std

            theta = np.random.uniform(low=0.0, high=1.0)

            grid[row, col] = [amp, x_fine, y_fine, x_std, y_std, theta]

        grids.append(grid)

    outputs = np.array(grids)
    inputs = renderer(outputs)
    #inputs = inputs + np.random.normal(loc=0.0, scale=0.04, size=inputs.shape)
    return inputs, outputs

In [None]:
### Output visualization helpers
def merge_channels(render, original):
    """Stack a zero plane, ``original`` and ``render`` along the last axis.

    The zero plane has shape (config.PATCH_SIZE, config.PATCH_SIZE, 1).
    """
    blank_plane = np.zeros((config.PATCH_SIZE, config.PATCH_SIZE, 1))
    stacked = np.concatenate((blank_plane, original, render), axis=-1)
    return stacked

def concatenate_channels(render, original):
    """Join ``render`` and ``original`` along axis 1, ``render`` first."""
    side_by_side = np.concatenate((render, original), axis=1)
    return side_by_side

In [None]:
### Neural Network
def build_model():
    """Build the convolutional network that maps an image patch to a parameter grid.

    Input: (config.PATCH_SIZE, config.PATCH_SIZE, 1). Four Conv2D ->
    BatchNormalization -> leaky ReLU stages are followed by a 1x1 convolution
    with ``config.NUM_OF_PARAMS`` filters and a sigmoid.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = tf.keras.layers.Input(shape=[config.PATCH_SIZE, config.PATCH_SIZE, 1])

    # (filters, kernel side, stride) for each feature-extraction stage.
    stages = (
        (64, 7, 2),
        (128, 5, 2),
        (256, 3, 1),
        (512, 3, 2),
    )

    features = inputs
    for filters, kernel, stride in stages:
        features = tf.keras.layers.Conv2D(filters, [kernel, kernel], strides=stride, padding='same')(features)
        features = tf.keras.layers.BatchNormalization()(features)
        features = tf.nn.leaky_relu(features)

    # 1x1 head: one sigmoid-squashed value per parameter and grid cell.
    head = tf.keras.layers.Conv2D(config.NUM_OF_PARAMS, [1,1], strides=1, padding='same')(features)
    head = tf.nn.sigmoid(head)

    return tf.keras.Model(inputs=inputs, outputs=head)

In [None]:
### Neural Net definition
# Network to be trained, and the optimizer that will update its weights.
model = build_model()
opt = tf.keras.optimizers.RMSprop(learning_rate=5e-4)
# Alternative optimizer:
#opt = tf.keras.optimizers.Adam(learning_rate=1e-3)

In [None]:
# Start from an empty log file. Opening in 'w' mode creates the file or
# truncates an existing one, so this works on a first run (no "No such file"
# error from rm) and does not rely on shell utilities.
import os
os.makedirs('./gif', exist_ok=True)
with open('./gif/log.txt', 'w'):
    pass
# Running count of generated stars; create_batch() increments it and the
# training loop stops once it reaches config.ITERATIONS.
config.TOT_NUM_STARS = 0

In [None]:
### Training loop
loss_mean = []          # per-batch losses accumulated since the last report
last_check_print = 0    # whole thousands of stars seen at the last report

while config.TOT_NUM_STARS < config.ITERATIONS:
    # create_batch() also advances config.TOT_NUM_STARS.
    input_img, input_pos = create_batch()
    with tf.GradientTape() as tape:
        # training=True makes the BatchNormalization layers normalise with the
        # current batch statistics and update their moving averages; without
        # it they run in inference mode for the whole training run.
        output_pos = model(input_img, training=True)

        # Mean squared error between predicted and true parameter grids.
        loss_model = tf.math.reduce_mean(tf.math.square(output_pos - input_pos))

    grads = tape.gradient(loss_model, model.trainable_weights)
    opt.apply_gradients(zip(grads, model.trainable_weights))

    loss_mean.append(loss_model)

    # Report once every time the star counter crosses another multiple of 1000.
    if np.floor(config.TOT_NUM_STARS / 1000) > last_check_print:
        status = '[Num of stars trained : {}; Loss: {:.7f}]'.format(config.TOT_NUM_STARS, np.mean(loss_mean))
        tf.print(status)

        # The context manager closes the file; no explicit close is needed.
        with open('./gif/log.txt', 'a') as log:
            log.write(status + '\n')

        # Preview: re-render the first prediction and place it next to its input.
        output_img = RenderEngine(config.PATCH_SIZE)(np.expand_dims(output_pos[0], axis=0))
        img_in = input_img[0] * 255.0

        out_img = concatenate_channels(output_img[0] * 255.0, img_in)
        out_png = tf.io.encode_png(tf.cast(out_img, tf.uint8))
        tf.io.write_file('./gif/out_img.png', out_png)

        loss_mean.clear()
        last_check_print = np.floor(config.TOT_NUM_STARS / 1000)

# NOTE: the former trailing `log.close()` was removed. The handle is already
# closed by the `with` block, and `log` is undefined (NameError) when the loop
# finishes without ever reaching a report.

In [None]:
# Persist the trained network to 'hopeless_diamond.h5' (path relative to the working directory).
model.save('hopeless_diamond.h5')

In [None]:
import cv2 as cv
import numpy as np
import tensorflow as tf
import config
import astropy.modeling

In [None]:
# Load the image to reconstruct and scale its 8-bit values to the 0-1 range.
image = cv.imread('star_mask.png')
# cv.imread signals a missing/unreadable file by returning None instead of
# raising, which would otherwise surface as an opaque TypeError on the division.
if image is None:
    raise FileNotFoundError("Could not read 'star_mask.png'")
image = image / 255.0

In [None]:
# Reconstruct the image patch by patch: run the network on every channel of
# every PATCH_SIZE x PATCH_SIZE tile and draw one Gaussian per grid cell.
recon = np.zeros(image.shape)

# Anchor coordinates of the grid cells inside a patch. They are identical for
# every patch and channel, so the (cells, cells, 2) grid is built once.
cell_coords = np.arange(0, config.PATCH_SIZE, config.DOWNSAMPLE) + 0.5
cell_coords = np.meshgrid(cell_coords, cell_coords)
cell_coords = tf.transpose(cell_coords, perm=[1, 2, 0]).numpy()

for x in range(0, image.shape[1], config.PATCH_SIZE):
    for y in range(0, image.shape[0], config.PATCH_SIZE):
        patch = image[y:y+config.PATCH_SIZE, x:x+config.PATCH_SIZE]

        # Zero-pad tiles cut off by the right and/or bottom border. This also
        # covers the bottom-right corner tile (short in both directions),
        # which was previously skipped and left blank in the reconstruction.
        pad_rows = config.PATCH_SIZE - patch.shape[0]
        pad_cols = config.PATCH_SIZE - patch.shape[1]
        if pad_rows or pad_cols:
            patch = np.pad(patch, ((0, pad_rows), (0, pad_cols), (0, 0)), mode='constant')
        patch = np.expand_dims(patch, axis=0)

        # Per channel: [anchor x, anchor y] followed by the network's
        # NUM_OF_PARAMS outputs for every grid cell.
        params_matrix = np.zeros([image.shape[-1], config.PATCH_SIZE//config.DOWNSAMPLE, config.PATCH_SIZE//config.DOWNSAMPLE, config.NUM_OF_PARAMS + 2])

        for channel in range(image.shape[-1]):
            star_params = model(np.expand_dims(patch[:,:,:,channel], axis=-1))[0]
            params_matrix[channel, :, :, :] = np.concatenate([cell_coords, star_params], axis=-1)

        for row in range(params_matrix.shape[1]):
            for col in range(params_matrix.shape[2]):
                cell = params_matrix[:, row, col, :]

                # Amplitude and fine offsets are averaged over all channels,
                # so they are computed once per cell.
                amplitude = np.mean(cell[:, 2])
                fine_col = np.mean(cell[:, 3]) * config.DOWNSAMPLE
                fine_row = np.mean(cell[:, 4]) * config.DOWNSAMPLE

                for channel in range(params_matrix.shape[0]):
                    # Patch origin + cell anchor, in full-image coordinates.
                    centre_col = x + cell[channel, 0]
                    centre_row = y + cell[channel, 1]

                    # NOTE(review): hard-coded scale; RenderEngine scales the
                    # same two parameters by config.MAX_STD - confirm whether
                    # these are meant to agree.
                    x_std = cell[channel, 5] * 14 * 0.9
                    y_std = cell[channel, 6] * 14 * 0.9
                    sigma = np.mean([x_std, y_std])

                    # The predicted rotation (index 7) is not used: a circular
                    # Gaussian with the mean spread is drawn instead.
                    gauss = astropy.modeling.models.Gaussian2D(
                            amplitude=amplitude,
                            x_mean=centre_col+fine_col,
                            y_mean=centre_row+fine_row,
                            x_stddev=sigma,
                            y_stddev=sigma,
                            theta=0)
                    try:
                        # Draw into this channel's plane of `recon` in place.
                        gauss.render(recon[:,:,channel])
                    except Exception:
                        # Best effort: skip a Gaussian that cannot be rendered,
                        # but let KeyboardInterrupt / SystemExit propagate.
                        continue

In [None]:
# Scale the reconstruction back to the 0-255 range and write it to 'rec.png'.
# NOTE(review): `recon` is a float array and overlapping Gaussians may push
# values above 1.0 - confirm cv.imwrite's implicit conversion gives the
# intended result, or clip and cast to uint8 explicitly.
cv.imwrite('rec.png', recon*255.0)