<a href="https://colab.research.google.com/github/YossiAsher/abstract-learning-in-image-processing/blob/main/svg_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install svgpathtools
import shutil
import os

In [None]:
# Google Drive share links: a single-file link for the zipped SVG dataset and
# a folder link for the unsupervised model checkpoint files.
svg_zip_link = 'https://drive.google.com/file/d/1i0xmRbSkdSG93RxWKWBDsKBnjtO4zdVG/view?usp=sharing'
unsupervised_model_checkpoint_folder = 'https://drive.google.com/drive/folders/1ht1ZbXw6ojfOzzMk-4blrrEhoeK90-RU?usp=sharing'
from googleapiclient.http import MediaIoBaseDownload
import io
from google.colab import auth
# Authenticate the Colab user before the Drive client is built.
auth.authenticate_user()
from googleapiclient.discovery import build
# `drive_service` is the Drive v3 `files()` resource; the download helpers
# call `get_media()` and `list()` on it.
drive_service = build('drive', 'v3').files()

In [None]:
def download_file(name, link = None, id = None):
  """Download one Google Drive file and save it locally as `name`.

  Args:
    name: Local path the downloaded bytes are written to.
    link: Share link whose second-to-last '/'-separated component is the
      file id (e.g. '.../file/d/<id>/view?usp=sharing'). Only used when
      `id` is not given.
    id: Drive file id; takes precedence over `link`.

  Raises:
    ValueError: If neither `link` nor `id` is provided.
  """
  # Validate before touching the network or the file system so a bad call
  # fails with a clear message instead of an AttributeError on None.
  if not id and not link:
    raise ValueError("download_file needs either a share link or a file id")
  file_id = id if id else link.split('/')[-2]
  request = drive_service.get_media(fileId=file_id)
  # Stream each chunk straight into the destination file instead of holding
  # the whole download in an in-memory buffer first.
  with open(name, 'wb') as f:
    downloader = MediaIoBaseDownload(f, request)
    done = False
    while not done:
      status, done = downloader.next_chunk()
      print("Download %d%%" % int(status.progress() * 100))

In [None]:
def download_folder(path, link, max_files=3):
  """Download the files listed in a shared Google Drive folder into `path`.

  Args:
    path: Local directory to create (if missing) and fill.
    link: Folder share link; the folder id is its last '/'-separated
      component with any '?query' suffix removed.
    max_files: Upper bound on how many listed files are fetched. Defaults to
      3, the count that used to be hard-coded; pass None to fetch every file
      returned by the listing.
  """
  folder_id = link.split('/')[-1].split('?')[0]
  results = drive_service.list(q=f"'{folder_id}' in parents").execute()
  items = results.get('files', [])
  # exist_ok lets the cell be re-run without failing on an existing directory.
  os.makedirs(path, exist_ok=True)
  # Slicing (rather than indexing 0..2) tolerates folders with fewer files.
  for item in items[:max_files]:
    download_file(name=f"{path}/{item['name']}", id=item["id"])

In [None]:
# Fetch the zipped SVG dataset and the unsupervised checkpoint files from
# Google Drive into the working directory.
download_file('svg_100.zip', svg_zip_link)
download_folder('checkpoint', unsupervised_model_checkpoint_folder)

In [None]:
!unzip svg_100.zip

In [None]:
rm -fr svg_100/paragliding

In [None]:
rm -rf svg_100/paragliding-launch

In [None]:
%load_ext tensorboard

In [None]:
!rm -rf ./logs/ 

In [None]:
import datetime
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

import uuid
import glob
import math
import os
import pathlib
from sklearn.model_selection import train_test_split
from svgpathtools import svg2paths, Path, CubicBezier, wsvg

In [None]:
# Hyperparameters and shared constants for the notebook.
# NOTE(review): learning_rate and weight_decay are not referenced anywhere in
# the visible code (the optimizer is created as Adam() with its defaults) —
# confirm whether they were meant to be passed to it.
learning_rate = 0.002
weight_decay = 0.0001
batch_size = 128
num_epochs = 500
# Embedding width of one encoded point; a segment row embeds to 4x this.
projection_dim = 16
num_heads = 4
transformer_units = [
    projection_dim * 8,
    projection_dim * 4,
]  # Widths of the Dense layers inside each transformer block's MLP
transformer_layers = 8
mlp_head_units = [1024, 512]  # Size of the dense layers of the final classifier
# Number of segment rows per sample.
dim_size = 144
# Five columns per row: path-type flag + start, control1, control2, end codes.
input_shape = (dim_size, 5)
# Embedding vocabulary size: a point is encoded as x * 100 + y with x and y
# each clipped to 0..99, so codes lie in 0..9999.
num_positions = 10000
checkpoint_filepath = "checkpoint/checkpoint"
dropout_rate = 0
# Output size of the unsupervised model (labels 0 / 1).
num_classes = 2
seed = 55
# Seeds NumPy's global RNG, which the data generator uses for shuffling,
# rotation angles and random lines.
np.random.seed(seed)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras Sequence that turns SVG files into fixed-size segment arrays.

    Every sample is a ``(dim_size, 5)`` array with one row per path segment:
    ``[path_type, start, control1, control2, end]``. Each point is encoded as
    ``x * 100 + y`` after both coordinates are truncated to int and clipped
    to 0..99.

    In supervised mode the label is the index of the file's parent directory
    name in ``self.classes``. Otherwise a random line (path_type 1) is written
    into the row after the last segment and the label is 1 when that line
    intersects any path of the drawing, else 0.
    """

    # Columns per segment row: path type + four encoded points.
    _ROW_WIDTH = 5

    def __init__(self, path, files, batch_size, dim_size, line_size=70, supervised=False, shuffle=True, debug=False):
        """Load and parse the SVG files.

        Args:
            path: Directory searched recursively for ``*.svg``; when falsy,
                `files` is used instead.
            files: Explicit list of SVG file paths (used when `path` is falsy).
            batch_size: Samples per batch; -1 means one batch with all files.
            dim_size: Number of segment rows per sample.
            line_size: Exclusive upper bound for the random line's x/y extent.
            supervised: Use directory names as labels instead of the
                line-intersection task.
            shuffle: Reshuffle the sample order at the end of every epoch.
            debug: Write each generated sample as an SVG into a per-epoch
                directory.
        """
        self.supervised = supervised
        # Kept so debug output can strip the root from file names.
        self.path = path or ''
        if path:
            self.files = glob.glob(path + '/**/*.svg', recursive=True)
        else:
            self.files = files
        if supervised:
            # Sorted so that generators built over the same class directories
            # map every class name to the same label index (set order is not
            # guaranteed to be the same between two independently built sets).
            self.classes = sorted({self.__class_name(f) for f in self.files})
        else:
            self.classes = [0, 1]
        print("files: ", len(self.files))
        self.shuffle = shuffle
        self.debug = debug
        self.batch_size = batch_size
        if self.batch_size == -1:
            self.batch_size = len(self.files)
        self.dim_size = dim_size
        self.line_size = line_size
        self.intersection_count = 0
        self.data = self.__init_data(self.files)
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.data) // self.batch_size

    def __getitem__(self, index):
        """Generate batch number `index` as an ``(X, y)`` pair."""
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        return self.__data_generation([self.data[k] for k in batch_indexes])

    def on_epoch_end(self):
        """Refresh the sample order (and the debug directory) for a new epoch."""
        if self.debug:
            # A fresh, uniquely named directory per epoch for the debug SVGs.
            self.fo = str(uuid.uuid1())
            self.p = pathlib.Path(self.fo)
            os.mkdir(self.p)
        self.indexes = np.arange(len(self.data))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    @staticmethod
    def __class_name(file):
        """Return the name of the directory that directly contains `file`."""
        return file.split('/')[-2]

    def __init_data(self, files):
        """Parse every file once up front; returns a list of ``(paths, file)``."""
        return [(svg2paths(file)[0], file) for file in files]

    def __data_generation(self, data_temp):
        """Build the ``X`` / ``y`` arrays for the given ``(paths, file)`` samples."""
        X = np.zeros((self.batch_size, self.dim_size, self._ROW_WIDTH))
        y = np.zeros((self.batch_size), dtype=int)

        for i, (paths, file) in enumerate(data_temp):
            paths, segments, index = self.__normalize_path(paths)

            out = 0
            random_line = None
            if self.supervised:
                out = self.classes.index(self.__class_name(file))
            else:
                random_line, line_segment = self.__get_randon_line()
                # The segment rows stop at dim_size, so a completely full
                # drawing gives up its last row to the probe line instead of
                # indexing past the end of the array.
                segments[min(index, self.dim_size - 1)] = line_segment
                if any(len(path.intersect(random_line)) > 0 for path in paths):
                    out = 1

            if self.debug:
                self.__write_debug_svg(paths, random_line, file, out)

            X[i,] = segments
            y[i] = out
        return X, y

    def __write_debug_svg(self, paths, random_line, file, label):
        """Write the normalized drawing (plus the probe line, if any) to the debug directory."""
        drawn = list(paths)
        if random_line is not None:
            drawn.append(random_line)
        relative = file.replace(self.path, '') if self.path else file
        debug_file = str(self.p / (relative.replace('/', '_') + '_' + str(label) + '.svg'))
        wsvg(drawn, filename=debug_file)

    def __get_randon_line(self):
        """Return a random straight line as ``(Path, encoded_row)``.

        The line starts at a random point in [0, 99) x [0, 99) and extends by
        a random non-negative offset below `line_size` in x and y. It is
        stored as a cubic Bezier whose control points equal its end points.
        """
        x1, y1 = np.random.randint(99, size=2)
        x2, y2 = np.random.randint(self.line_size, size=2)
        start = complex(x1, y1)
        end = complex(x1 + x2, y1 + y2)
        cubic_bezier = CubicBezier(start, start, end, end)
        line = Path()
        line.append(cubic_bezier)
        return line, self.__segment_to_array(1, cubic_bezier)

    def __normalize_path_rotated(self, paths):
        """Rotate every path by one random whole-degree angle drawn per sample.

        NOTE(review): no rotation origin is passed, so svgpathtools applies
        its default origin to each path separately — confirm that paths are
        meant to be rotated independently rather than about a shared origin.
        """
        angle = np.random.randint(360)
        return [path.rotated(angle) for path in paths]

    def __normalize_path(self, paths):
        """Rotate, align and scale `paths`, then encode their segments.

        Returns:
            ``(paths, segments, index)`` where `segments` is the
            ``(dim_size, 5)`` array and `index` is the number of rows filled.
            Segments beyond `dim_size` do not fit and are left out.
        """
        segments = np.zeros((self.dim_size, self._ROW_WIDTH))
        paths = self.__normalize_path_rotated(paths)
        max_total, paths = self.__normalize_path_align(paths)
        paths = self.__normalize_path_scale(paths, max_total)
        index = 0
        for path in paths:
            for segment in path:
                if index >= self.dim_size:
                    return paths, segments, index
                segments[index,] = self.__segment_to_array(0, segment)
                index += 1
        return paths, segments, index

    def __segment_to_array(self, path_index, segment):
        """Encode one cubic segment as ``[path_index, start, control1, control2, end]``."""
        values = [path_index,
                  self.__complex_to_int(segment.start),
                  self.__complex_to_int(segment.control1),
                  self.__complex_to_int(segment.control2),
                  self.__complex_to_int(segment.end)]
        return np.array(values)

    def __normalize_path_scale(self, paths, max_total):
        """Scale all paths by ``99 / max_total`` so the largest extent maps to 99."""
        if max_total == 0:
            # No paths, or a drawing without extent: nothing to scale, and
            # 99 / 0 would raise.
            return list(paths)
        factor = 99 / max_total
        return [path.scaled(factor, factor) for path in paths]

    def __normalize_path_align(self, paths):
        """Flip each non-empty path vertically and move it to the origin.

        Every path is shifted by its own bounding-box minimum, so each path's
        bounding box starts at (0, 0). Empty paths are dropped.

        Returns:
            ``(max_total, new_paths)`` where `max_total` is the largest
            bounding-box maximum (x or y) over all shifted paths.
        """
        x_max_total = 0
        y_max_total = 0
        new_paths = []
        for path in paths:
            if len(path) == 0:
                continue
            path = path.scaled(1, -1)
            x_min, _, y_min, _ = path.bbox()
            path = path.translated(complex(-x_min, -y_min))
            _, x_max, _, y_max = path.bbox()
            if x_max > x_max_total:
                x_max_total = x_max
            if y_max > y_max_total:
                y_max_total = y_max
            new_paths.append(path)
        return max(x_max_total, y_max_total), new_paths

    def __complex_to_int(self, compl):
        """Encode a complex point as ``x * 100 + y`` with x, y clipped to 0..99."""
        real = self.__clipping_to_int(compl.real)
        imag = self.__clipping_to_int(compl.imag)
        return real * 100 + imag

    def __clipping_to_int(self, value):
        """Truncate `value` to int and clamp it into 0..99."""
        return min(max(int(value), 0), 99)

In [None]:
# Collect every SVG under svg_100/ and hold out 20% of the files; the split
# is seeded so it is reproducible.
files = glob.glob('svg_100/**/*.svg', recursive=True)
train_files, test_files = train_test_split(files, test_size=0.2, random_state=seed)

In [None]:
# Supervised generators label each sample with its parent directory name; the
# unsupervised generator runs the line-intersection task over all files.
# NOTE(review): each supervised generator derives its class list from its own
# file subset — confirm train and test end up with the same class-to-index
# mapping.
train_dataset = DataGenerator(path=None, files=train_files, batch_size=batch_size, dim_size=dim_size, supervised=True)
test_dataset = DataGenerator(path=None, files=test_files, batch_size=batch_size, dim_size=dim_size, supervised=True)
unsupervised_dataset = DataGenerator(path=None, files=files, batch_size=batch_size, dim_size=dim_size, supervised=False)

In [None]:
def mlp(x, hidden_units, dropout_rate):
  """Pass `x` through one GELU Dense layer plus Dropout per entry of `hidden_units`.

  Args:
    x: Input tensor.
    hidden_units: Iterable of Dense layer widths, applied in order.
    dropout_rate: Rate used for the Dropout after every Dense layer.

  Returns:
    The output of the last Dropout, or `x` itself when `hidden_units` is empty.
  """
  out = x
  for width in hidden_units:
    hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
    out = layers.Dropout(dropout_rate)(hidden)
  return out

In [None]:
class PathEncoder(layers.Layer):
  """Embeds rows of encoded path segments.

  The input is split into five columns along axis 2. Column 0 (path type) is
  looked up in a 2-entry embedding of width ``4 * projection_dim``; columns
  1-4 (start, control1, control2, end codes) are each looked up in their own
  ``num_positions``-entry embedding of width ``projection_dim`` and
  concatenated. The result is the sum of the two, shaped
  ``(batch, dim_size, 4 * projection_dim)``.
  """

  def __init__(self, dim_size, num_positions, projection_dim, **kwargs):
    """Create the embeddings.

    Args:
      dim_size: Number of segment rows per sample.
      num_positions: Vocabulary size of the four point embeddings.
      projection_dim: Embedding width of a single point.
      **kwargs: Standard base-layer arguments (name, dtype, trainable, ...).
        They are part of what get_config() returns, so accepting them lets
        from_config() / model cloning rebuild the layer.
    """
    super().__init__(**kwargs)
    self.dim_size = dim_size
    self.num_positions = num_positions
    self.projection_dim = projection_dim
    # Attribute names and creation order are kept stable so previously saved
    # weights still map onto the same sub-layers.
    self.path_type_embedding = layers.Embedding(
        input_dim=2, output_dim=self.projection_dim * 4
    )
    self.embedding_start = layers.Embedding(
        input_dim=self.num_positions, output_dim=self.projection_dim
    )
    self.embedding_control1 = layers.Embedding(
        input_dim=self.num_positions, output_dim=self.projection_dim
    )
    self.embedding_control2 = layers.Embedding(
        input_dim=self.num_positions, output_dim=self.projection_dim
    )
    self.embedding_end = layers.Embedding(
        input_dim=self.num_positions, output_dim=self.projection_dim
    )

  def get_config(self):
    """Return the base config extended with this layer's constructor arguments."""
    config = super().get_config().copy()
    config.update({
        'dim_size': self.dim_size,
        'num_positions': self.num_positions,
        'projection_dim': self.projection_dim,
    })
    return config

  def call(self, paths):
    """Embed `paths` into a ``(batch, dim_size, 4 * projection_dim)`` tensor."""
    point_shape = (-1, self.dim_size, self.projection_dim)
    type_shape = (-1, self.dim_size, self.projection_dim * 4)
    t0, t1, t2, t3, t4 = tf.split(paths, num_or_size_splits=5, axis=2)
    # Each split keeps a trailing axis of size 1, which the reshape removes
    # after the embedding lookup.
    encoded = tf.concat([tf.reshape(self.embedding_start(t1), point_shape),
                         tf.reshape(self.embedding_control1(t2), point_shape),
                         tf.reshape(self.embedding_control2(t3), point_shape),
                         tf.reshape(self.embedding_end(t4), point_shape)], 2)
    path_index = tf.reshape(self.path_type_embedding(t0), type_shape)
    return encoded + path_index

In [None]:
# Build the unsupervised model: segment embedding, a stack of pre-norm
# transformer blocks, then an MLP head producing `num_classes` logits.
inputs = layers.Input(shape=input_shape)
# Encode paths.
encoded_paths = PathEncoder(dim_size, num_positions, projection_dim)(inputs)
# Create multiple layers of the Transformer block.
for _ in range(transformer_layers):
    # Layer normalization 1.
    x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_paths)
    # Create a multi-head self-attention layer (query and value are both x1).
    attention_output = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=projection_dim * 4, dropout=dropout_rate
    )(x1, x1)
    # Skip connection 1.
    x2 = layers.Add()([attention_output, encoded_paths])
    # Layer normalization 2.
    x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
    # MLP; its last width (projection_dim * 4) matches x2 for the residual add.
    x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=dropout_rate)
    # Skip connection 2.
    encoded_paths = layers.Add()([x3, x2])

# Flatten into a [batch_size, dim_size * projection_dim * 4] tensor.
representation = layers.LayerNormalization(epsilon=1e-6)(encoded_paths)
representation = layers.Flatten()(representation)
representation = layers.Dropout(dropout_rate)(representation)
# Add MLP.
features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=dropout_rate)
# Classify outputs.
logits = layers.Dense(num_classes)(features)
# Create the Keras model.
unsupervised_model = keras.Model(inputs=inputs, outputs=logits)

In [None]:
# Restore the unsupervised model's weights from `checkpoint_filepath`, so the
# supervised model built from its layers starts from them.
# (Presumably these are the files fetched into 'checkpoint/' earlier — confirm.)
unsupervised_model.load_weights(checkpoint_filepath)

In [None]:
# Build the supervised classifier on top of the unsupervised model: tap the
# output of its 4th layer from the end and add a new head with one logit per
# class directory.
# NOTE(review): by construction order layers[-4] should be the Dropout that
# follows the 1024-unit Dense of the head — confirm with
# unsupervised_model.summary().
x = unsupervised_model.layers[-4].output 
# NOTE(review): this Dense has no activation, so together with the next Dense
# it forms a single linear map — confirm that is intended.
x = layers.Dense(512)(x)
predictions = layers.Dense(len(train_dataset.classes))(x)
supervised_model = keras.Model(inputs = unsupervised_model.input, outputs = predictions)

In [None]:
# Each model gets its own Adam instance. An optimizer keeps per-variable state
# (and, in recent Keras versions, is bound to the variables it is first used
# with), and the two models do not have the same set of trainable variables,
# so sharing one instance between them is not safe.
# NOTE(review): `learning_rate` / `weight_decay` from the config cell are not
# passed here, so Adam's defaults are used — confirm that is intended.
optimizer = tf.keras.optimizers.Adam()
unsupervised_optimizer = tf.keras.optimizers.Adam()

# Both models output logits and use integer labels, hence sparse categorical
# cross-entropy with from_logits=True.
unsupervised_model.compile(
    optimizer=unsupervised_optimizer,
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[
        keras.metrics.SparseCategoricalAccuracy(name="accuracy")
    ],
)

supervised_model.compile(
    optimizer=optimizer,
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[
        keras.metrics.SparseCategoricalAccuracy(name="accuracy")
    ],
)


In [None]:
# TensorBoard logging for the supervised run; histogram_freq=1 records
# histograms every epoch.
log_dir = "logs/svg-attention"
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
# Train the supervised classifier, validating on the held-out files and
# logging to TensorBoard.
# NOTE(review): with use_multiprocessing=True the generator's np.random calls
# run in worker processes — confirm the workers do not all start from the
# same random state.
history = supervised_model.fit(
        train_dataset,
        validation_data=test_dataset,
        epochs=num_epochs,
        callbacks=[tensorboard_callback],
        workers=16, 
        use_multiprocessing=True,
        max_queue_size=100
    )

In [None]:
%tensorboard --logdir logs

In [None]:
!tensorboard dev upload \
  --logdir logs/svg-attention \
  --name "abstract-learning-in-image-processing-svg-attention" \
  --one_shot


In [None]:
# Train the unsupervised (line-intersection) task.

# Weights are written to `checkpoint_filepath` — the same path the weights
# were loaded from earlier — and only when the monitored training "accuracy"
# improves (no validation data is passed to fit below).
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    checkpoint_filepath,
    monitor="accuracy",
    save_best_only=True,
    save_weights_only=True,
)

# NOTE: this reassigns `history`, replacing the result of the supervised run.
history = unsupervised_model.fit(
        unsupervised_dataset,
        epochs=num_epochs,
        callbacks=[checkpoint_callback],
        workers=16, 
        use_multiprocessing=True,
        max_queue_size=100
    )