In [1]:
import tensorflow as tf
import numpy as np

In [2]:
x=np.load('/content/drive/MyDrive/model/data/processed/x.npy')

In [3]:
y = np.roll(x, shift=-1, axis=1)
y[:, -1, :] = 0
print(x.shape)
print(y.shape)

(11911, 1200, 3)
(11911, 1200, 3)


In [4]:
print(x[0][1])
print(y[0][0])

[ 0.0217352 -0.3341787  0.       ]
[ 0.0217352 -0.3341787  0.       ]


In [22]:
def mixtureDensityParams(Z):
    mdn_params = Z[:, :, 1:]

    pi_hat, mu1_hat, mu2_hat, sigma1_hat, sigma2_hat, rho_hat = tf.split(mdn_params, 6, axis=-1)

    eos_hat = Z[:, :, 0]
    eos = tf.sigmoid(eos_hat)
    rho = tf.tanh(rho_hat)
    pi = tf.sigmoid(pi_hat)

    sigma1 = tf.exp(sigma1_hat)
    sigma2 = tf.exp(sigma2_hat)

    mu1 = mu1_hat
    mu2 = mu2_hat

    return mu1, mu2, sigma1, sigma2, pi, eos, rho

# Define the mixture density function
def mixtureDensity(mu1, mu2, sigma1, sigma2, pi, eos, rho, x1, x2):
    x_mu1 = x1 - mu1
    x_mu2 = x2 - mu2
    Z_out = tf.square(tf.divide(x_mu1, sigma1)) + tf.square(tf.divide(x_mu2, sigma2)) - 2 * tf.divide(rho * x_mu1 * x_mu2, sigma1 * sigma2)
    rho_square_term = 1 - tf.square(rho)
    power_e = tf.exp(tf.divide(-Z_out, 2 * rho_square_term))
    regularize_term = 2 * tf.constant(np.pi, dtype=tf.float32) * sigma1 * sigma2 * tf.sqrt(rho_square_term)
    gaussian = tf.divide(power_e, regularize_term)
    return tf.reduce_sum(gaussian * pi)

# Define the probability function
def get_prob(x1, x2, eos_true, Z):
    mu1, mu2, sigma1, sigma2, pi, eos, rho = mixtureDensityParams(Z)
    eps = tf.constant(np.finfo(float).eps, dtype=tf.float32)
    prob = 0.0
    for i in range(20):
        prob += mixtureDensity(mu1[:,:,i], mu2[:,:,i], sigma1[:,:,i], sigma2[:,:,i], pi[:,:,i], eos, rho[:,:,i], x1, x2)
    return prob, tf.reduce_sum(tf.math.log(tf.squeeze((eos * (eos_true + eps) + (1 - eos) * (1 - eos_true + eps)))))

def custom_loss(y_true, y_pred):
    x1 = y_true[:, :, 0]
    x2 = y_true[:, :, 1]
    eos_true = y_true[:, :, 2]

    prob, stroke_prob = get_prob(x1, x2, eos_true, y_pred)
    loss=(-1)*(prob+stroke_prob)

    return loss

In [21]:
import tensorflow as tf
import numpy as np

class MyModel(tf.keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()

        self.lstm1 = tf.keras.layers.LSTM(400, return_sequences=True)
        self.batch_norm1 = tf.keras.layers.BatchNormalization()

        self.lstm2 = tf.keras.layers.LSTM(400, return_sequences=True)
        self.batch_norm2 = tf.keras.layers.BatchNormalization()

        self.lstm3 = tf.keras.layers.LSTM(400, return_sequences=True)
        self.batch_norm3 = tf.keras.layers.BatchNormalization()

        self.concatenate = tf.keras.layers.Concatenate(axis=-1)
        self.dense1 = tf.keras.layers.Dense(600, activation='relu')
        self.batch_norm4 = tf.keras.layers.BatchNormalization()

        self.dense2 = tf.keras.layers.Dense(121, activation='relu')
        self.batch_norm5 = tf.keras.layers.BatchNormalization()

    def call(self, inputs, training=True):
        x1 = self.lstm1(inputs)
        x1 = self.batch_norm1(x1, training=training)

        x2 = self.lstm2(x1)
        x2 = self.batch_norm2(x2, training=training)

        x3 = self.lstm3(x2)
        x3 = self.batch_norm3(x3, training=training)

        concatenated_output = self.concatenate([x1, x2, x3])

        x = self.dense1(concatenated_output)
        x = self.batch_norm4(x, training=training)

        mdn_params = self.dense2(x)
        mdn_params = self.batch_norm5(mdn_params, training=training)

        return mdn_params

model = MyModel()

In [23]:
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.00001, clipvalue=10.0), loss=custom_loss)

In [24]:
model.build((None,1200,3))
model.summary()

Model: "my_model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_6 (LSTM)               multiple                  646400    
                                                                 
 batch_normalization (Batch  multiple                  1600      
 Normalization)                                                  
                                                                 
 lstm_7 (LSTM)               multiple                  1281600   
                                                                 
 batch_normalization_1 (Bat  multiple                  1600      
 chNormalization)                                                
                                                                 
 lstm_8 (LSTM)               multiple                  1281600   
                                                                 
 batch_normalization_2 (Bat  multiple                  1

In [None]:
model.fit(x,y, batch_size=32, epochs=10)

In [None]:
model.save('/content/drive/MyDrive/model/handwriting-prediction')

In [None]:
model.load('/content/drive/MyDrive/model/handwriting-prediction')
example_index = 7
input_sequence = x[example_index:example_index + 1, :, :]

num_timesteps_to_generate = 1200

for _ in range(num_timesteps_to_generate):
    predicted_timestep = model.predict(input_sequence[:, -1200:, :])

    input_sequence = np.concatenate([input_sequence, predicted_timestep[:, -1:, :]], axis=1)

print("Generated Sequence Shape:", input_sequence.shape)

In [None]:
input_sequence.shape

(1, 2400, 3)

In [None]:
prediction=input_sequence[:,1200:,:]

In [None]:
prediction.shape

(1, 1200, 3)

In [None]:
for i in range(1200):
  if prediction[0,i,2]>0.5:
    prediction[0,i,2]=1
  else:
    prediction[0,i,2]=0

In [None]:
print(prediction[0,1000:1010])

[[ 0.00441157 -0.00333739  0.        ]
 [ 0.00441157 -0.0033374   0.        ]
 [ 0.00441157 -0.00333739  0.        ]
 [ 0.00441157 -0.00333739  0.        ]
 [ 0.00441157 -0.0033374   0.        ]
 [ 0.00441157 -0.00333739  0.        ]
 [ 0.00441157 -0.0033374   0.        ]
 [ 0.00441157 -0.0033374   0.        ]
 [ 0.00441157 -0.00333739  0.        ]
 [ 0.00441157 -0.00333739  0.        ]]


In [None]:
!pip install svgwrite

Collecting svgwrite
  Downloading svgwrite-1.4.3-py3-none-any.whl (67 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/67.1 kB[0m [31m1.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.1/67.1 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: svgwrite
Successfully installed svgwrite-1.4.3


In [None]:
#functions required to get image of generated handwriting from strokes
from __future__ import print_function
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import savgol_filter
from scipy.interpolate import interp1d


alphabet = [
    '\x00', ' ', '!', '"', '#', "'", '(', ')', ',', '-', '.',
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';',
    '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K',
    'L', 'M', 'N', 'O', 'P', 'R', 'S', 'T', 'U', 'V', 'W', 'Y',
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
    'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
    'y', 'z'
]
alphabet_ord = list(map(ord, alphabet))
alpha_to_num = defaultdict(int, list(map(reversed, enumerate(alphabet))))
num_to_alpha = dict(enumerate(alphabet_ord))

MAX_STROKE_LEN = 1200
MAX_CHAR_LEN = 75


def align(coords):
    """
    corrects for global slant/offset in handwriting strokes
    """
    coords = np.copy(coords)
    X, Y = coords[:, 0].reshape(-1, 1), coords[:, 1].reshape(-1, 1)
    X = np.concatenate([np.ones([X.shape[0], 1]), X], axis=1)
    offset, slope = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y).squeeze()
    theta = np.arctan(slope)
    rotation_matrix = np.array(
        [[np.cos(theta), -np.sin(theta)],
         [np.sin(theta), np.cos(theta)]]
    )
    coords[:, :2] = np.dot(coords[:, :2], rotation_matrix) - offset
    return coords


def skew(coords, degrees):
    """
    skews strokes by given degrees
    """
    coords = np.copy(coords)
    theta = degrees * np.pi/180
    A = np.array([[np.cos(-theta), 0], [np.sin(-theta), 1]])
    coords[:, :2] = np.dot(coords[:, :2], A)
    return coords


def stretch(coords, x_factor, y_factor):
    """
    stretches strokes along x and y axis
    """
    coords = np.copy(coords)
    coords[:, :2] *= np.array([x_factor, y_factor])
    return coords


def add_noise(coords, scale):
    """
    adds gaussian noise to strokes
    """
    coords = np.copy(coords)
    coords[1:, :2] += np.random.normal(loc=0.0, scale=scale, size=coords[1:, :2].shape)
    return coords


def encode_ascii(ascii_string):
    """
    encodes ascii string to array of ints
    """
    return np.array(list(map(lambda x: alpha_to_num[x], ascii_string)) + [0])


def denoise(coords):
    """
    smoothing filter to mitigate some artifacts of the data collection
    """
    coords = np.split(coords, np.where(coords[:, 2] == 1)[0] + 1, axis=0)
    new_coords = []
    for stroke in coords:
        if len(stroke) != 0:
            x_new = savgol_filter(stroke[:, 0], 7, 3, mode='nearest')
            y_new = savgol_filter(stroke[:, 1], 7, 3, mode='nearest')
            xy_coords = np.hstack([x_new.reshape(-1, 1), y_new.reshape(-1, 1)])
            stroke = np.concatenate([xy_coords, stroke[:, 2].reshape(-1, 1)], axis=1)
            new_coords.append(stroke)

    coords = np.vstack(new_coords)
    return coords


def interpolate(coords, factor=2):
    """
    interpolates strokes using cubic spline
    """
    coords = np.split(coords, np.where(coords[:, 2] == 1)[0] + 1, axis=0)
    new_coords = []
    for stroke in coords:

        if len(stroke) == 0:
            continue

        xy_coords = stroke[:, :2]

        if len(stroke) > 3:
            f_x = interp1d(np.arange(len(stroke)), stroke[:, 0], kind='cubic')
            f_y = interp1d(np.arange(len(stroke)), stroke[:, 1], kind='cubic')

            xx = np.linspace(0, len(stroke) - 1, factor*(len(stroke)))
            yy = np.linspace(0, len(stroke) - 1, factor*(len(stroke)))

            x_new = f_x(xx)
            y_new = f_y(yy)

            xy_coords = np.hstack([x_new.reshape(-1, 1), y_new.reshape(-1, 1)])

        stroke_eos = np.zeros([len(xy_coords), 1])
        stroke_eos[-1] = 1.0
        stroke = np.concatenate([xy_coords, stroke_eos], axis=1)
        new_coords.append(stroke)

    coords = np.vstack(new_coords)
    return coords


def normalize(offsets):
    """
    normalizes strokes to median unit norm
    """
    offsets = np.copy(offsets)
    offsets[:, :2] /= np.median(np.linalg.norm(offsets[:, :2], axis=1))
    return offsets


def coords_to_offsets(coords):
    """
    convert from coordinates to offsets
    """
    offsets = np.concatenate([coords[1:, :2] - coords[:-1, :2], coords[1:, 2:3]], axis=1)
    offsets = np.concatenate([np.array([[0, 0, 1]]), offsets], axis=0)
    return offsets


def offsets_to_coords(offsets):
    """
    convert from offsets to coordinates
    """
    return np.concatenate([np.cumsum(offsets[:, :2], axis=0), offsets[:, 2:3]], axis=1)


def draw(
        offsets,
        ascii_seq=None,
        align_strokes=True,
        denoise_strokes=True,
        interpolation_factor=None,
        save_file=None
):
    strokes = offsets_to_coords(offsets)

    if denoise_strokes:
        strokes = denoise(strokes)

    if interpolation_factor is not None:
        strokes = interpolate(strokes, factor=interpolation_factor)

    if align_strokes:
        strokes[:, :2] = align(strokes[:, :2])

    fig, ax = plt.subplots(figsize=(12, 3))

    stroke = []
    for x, y, eos in strokes:
        stroke.append((x, y))
        if eos == 1:
            coords = zip(*stroke)
            ax.plot(coords[0], coords[1], 'k')
            stroke = []
    if stroke:
        coords = zip(*stroke)
        ax.plot(coords[0], coords[1], 'k')
        stroke = []

    ax.set_xlim(-50, 600)
    ax.set_ylim(-40, 40)

    ax.set_aspect('equal')
    plt.tick_params(
        axis='both',
        left='off',
        top='off',
        right='off',
        bottom='off',
        labelleft='off',
        labeltop='off',
        labelright='off',
        labelbottom='off'
    )

    if ascii_seq is not None:
        if not isinstance(ascii_seq, str):
            ascii_seq = ''.join(list(map(chr, ascii_seq)))
        plt.title(ascii_seq)

    if save_file is not None:
        plt.savefig(save_file)
        print('saved to {}'.format(save_file))
    else:
        plt.show()
    plt.close('all')

In [None]:
import os
import logging

import numpy as np
import svgwrite

def _draw(strokes,filename, stroke_colors=None, stroke_widths=None):
        stroke_colors = stroke_colors or ['black']
        stroke_widths = stroke_widths or [2]
        line_height = 60
        view_width = 1000
        view_height = line_height*(len(strokes) + 1)

        dwg = svgwrite.Drawing(filename=filename)
        dwg.viewbox(width=view_width, height=view_height)
        dwg.add(dwg.rect(insert=(0, 0), size=(view_width, view_height), fill='white'))

        initial_coord = np.array([0, -(3*line_height / 4)])
        for offsets, color, width in zip(strokes,stroke_colors, stroke_widths):

            initial_coord[1] -= line_height
            offsets[:, :2] *= 1.5
            strokes = offsets_to_coords(offsets)
            strokes = denoise(strokes)
            strokes[:, :2] = align(strokes[:, :2])

            strokes[:, 1] *= -1
            strokes[:, :2] -= strokes[:, :2].min() + initial_coord
            strokes[:, 0] += (view_width - strokes[:, 0].max()) / 2

            prev_eos = 1.0
            p = "M{},{} ".format(0, 0)
            for x, y, eos in zip(*strokes.T):
                p += '{}{},{} '.format('M' if prev_eos == 1.0 else 'L', x, y)
                prev_eos = eos
            path = svgwrite.path.Path(p)
            path = path.stroke(color=color, width=width, linecap='round').fill("none")
            dwg.add(path)

            initial_coord[1] -= line_height

        dwg.save()


if __name__ == '__main__':
    _draw(prediction,'/content/drive/MyDrive/model/usage_demo.svg')