# Training data

## Reading the data

In this notebook we will load the data that is used for training and convert it to the input representation that we need to train the network.

The dataset consists of two separate directories. One directory contains $x$ and $y$ coordinates and corresponding timestamps in XML format for each separate line of a text. The other directory contains the texts.

In [None]:
from pathlib import Path

# Root directories of the raw dataset, given relative to the working directory.
# Directory with the XML stroke files (coordinates and timestamps).
path_strokefiles = Path("./../../data/raw_data/strokefiles")
# Directory with the text (transcription) files.
path_textfiles = Path("./../../data/raw_data/textfiles")

In [None]:
import os

# One text file per directory below the text root: the first file name that
# os.walk reports for every directory that contains files.
# NOTE(review): only filenames[0] is used, so any further files in the same
# directory are ignored - confirm that this is intended.
textfile_paths = [
    Path(dirpath) / filenames[0]
    for dirpath, _, filenames in os.walk(path_textfiles)
    if filenames
]

print(textfile_paths[:3])

First, a function is defined to read the text lines from a text file. These files need to be parsed and split up into lines, which is done with a regular expression.

In [None]:
import re

# Print the raw content of the first text file to show the format that has
# to be parsed.
example_textfile_path = textfile_paths[0]
print(f'example path = {example_textfile_path}\n')
with open(example_textfile_path) as f:
    print(f.read())

def get_file_lines(textfile_path):
    """Return the text lines stored in a text file.

    The text following the ``CSR:`` marker (up to the first ``~`` or the end
    of the file) is extracted, stripped of surrounding whitespace and split
    on newlines.

    Args:
        textfile_path: Path of the text file to read.

    Returns:
        A list with one string per text line.

    Raises:
        AttributeError: If the file does not contain a ``CSR:`` marker.
    """
    # Use a context manager so the file handle is always closed.
    with open(textfile_path) as f:
        content = f.read()

    # Raw string: "\s" is an invalid escape sequence in a normal literal.
    lines = re.search(r"CSR:\s*([^~]*)", content).group(1).strip().split("\n")
    return lines
    
# Parse the example file into its separate text lines.
example_textlines = get_file_lines(example_textfile_path)
print(example_textlines)

In [None]:
# Every directory below the stroke-file root that directly contains files,
# paired with the names of those files.
stroke_paths = [
    (dirpath, filenames)
    for dirpath, _, filenames in os.walk(path_strokefiles)
    if filenames
]

print(stroke_paths[:3])

Next, the paths of the xml files containing the $x$ and $y$ coordinates and timestamps for the lines in a certain text file are retrieved.

In [None]:
def get_stroke_paths(textfile_path):
    """Return the stroke files that belong to a text file.

    The stroke files are looked up in the directory of ``path_strokefiles``
    that mirrors the last two directory levels of ``textfile_path``. A stroke
    file belongs to the text file when the first two dash-separated parts of
    its name equal the two parts of the text file's stem.

    Args:
        textfile_path: ``pathlib.Path`` of the text file.

    Returns:
        The matching stroke file paths sorted by file name, or ``None`` when
        the directory does not exist, the text file name has no dash, or no
        stroke file matches.
    """
    strokefiles_root_folder = path_strokefiles / textfile_path.parts[-3] / textfile_path.parts[-2]

    if not strokefiles_root_folder.is_dir():
        return None

    text_id = re.search(r"(.*?)-(.*)", textfile_path.stem)

    if text_id is None:
        return None

    stroke_id_pattern = re.compile(r"(.*?)-(.*?)-.*")
    res = []

    for filename in sorted(os.listdir(strokefiles_root_folder)):
        stroke_id = stroke_id_pattern.search(filename)

        # Skip entries that do not follow the naming scheme (e.g. hidden
        # files) instead of failing on them.
        if stroke_id is not None and stroke_id.groups() == text_id.groups():
            res.append(strokefiles_root_folder / filename)

    return res or None

# Stroke files that belong to the example text file.
example_textline_strokefile_paths = get_stroke_paths(example_textfile_path)
print(example_textline_strokefile_paths)

These files are then parsed using the xml.etree.ElementTree module.

In [None]:
import xml.etree.ElementTree as ET
import numpy as np

def read_file(filename):
    """Read all strokes from a stroke XML file.

    Args:
        filename: Path of the XML file. Points are read from the
            ``StrokeSet/Stroke/Point`` elements and their ``x``, ``y`` and
            ``time`` attributes.

    Returns:
        A float array of shape (number of strokes, longest stroke length, 3)
        holding (x, y, time) per point. Strokes shorter than the longest one
        are padded with points whose x and y are 0 and whose time is -1. A
        file without strokes yields an array of shape (0, 0, 3).
    """
    root = ET.parse(filename).getroot()

    # Convert the attribute strings explicitly instead of relying on numpy
    # casting strings during assignment.
    strokes = [[[float(point.attrib["x"]), float(point.attrib["y"]), float(point.attrib["time"])]
                for point in stroke.findall("./Point")]
               for stroke in root.findall("./StrokeSet/Stroke")]

    max_stroke_len = max((len(r) for r in strokes), default=0)

    s = np.zeros((len(strokes), max_stroke_len, 3))
    s[:, :, 2] = -1  # A negative time marks a padding point.

    for i, row in enumerate(strokes):
        # A stroke without points stays pure padding; assigning an empty
        # list into the slice would not broadcast.
        if row:
            s[i, :len(row)] = row

    return s

# One stroke array per stroke file of the example text file.
example_textline_strokes = [read_file(file) for file in example_textline_strokefile_paths]

In [None]:
import matplotlib.pyplot as plt

def plot_strokes(strokes):
    """Plot every stroke as a line and show the figure.

    Points with a negative time (third column) are left out.
    """
    for stroke in strokes:
        real_points = stroke[:, 2] >= 0
        xs = stroke[:, 0][real_points]
        ys = stroke[:, 1][real_points]
        plt.plot(xs, ys)

    plt.show()
        
# Show every example text line together with a plot of its strokes.
for (textline, textline_strokes) in zip(example_textlines, example_textline_strokes):
    print(textline)
    plot_strokes(textline_strokes)

## Encoding the data

The encoding alphabet is made by looking at all unique characters in the dataset.

In [None]:
def get_alphabet(textfile_paths):
    """Collect all characters that occur in the given text files.

    For every file the text following the ``CSR:`` marker (up to the first
    ``~`` or the end of the file) is used, stripped of surrounding
    whitespace. Newlines between text lines are part of that text and
    therefore end up in the alphabet as well.

    Args:
        textfile_paths: Iterable of text file paths.

    Returns:
        A sorted list of the unique characters. Sorting makes the character
        indices reproducible between runs, which the iteration order of a
        set of strings is not.

    Raises:
        AttributeError: If a file does not contain a ``CSR:`` marker.
    """
    all_chars = set()

    for file in textfile_paths:
        with open(file) as f:
            content = f.read()

        # Raw string: "\s" is an invalid escape sequence in a normal literal.
        all_chars.update(re.search(r"CSR:\s*([^~]*)", content).group(1).strip())

    return sorted(all_chars)

# Alphabet over all collected text files; a character's position in this
# list is its encoding.
alphabet = get_alphabet(textfile_paths)
print(alphabet, len(alphabet))

In [None]:
def encode_textline(textline, alphabet):
    """Translate each character of ``textline`` into its index in ``alphabet``.

    Raises:
        ValueError: If a character does not occur in ``alphabet``.
    """
    encoded = []

    for char in textline:
        encoded.append(alphabet.index(char))

    return encoded

def decode_textline(encodedline, alphabet):
    """Translate each index of ``encodedline`` back into its ``alphabet`` entry.

    Returns:
        A list with one alphabet entry per index.
    """
    decoded = []

    for index in encodedline:
        decoded.append(alphabet[index])

    return decoded

Now all the data is normalized and stored as explained in the notebook Input representation.

In [None]:
def normalize_strokes(strokes):
    """Remove repeated (x, y) points per stroke and shift/scale the result.

    Args:
        strokes: Array indexed as [stroke, point, (x, y, time)]. Points with
            a negative time are treated as padding.

    Returns:
        A new array with the same layout whose strokes are padded with
        (0, 0, -1) points up to the longest deduplicated stroke.
    """
    # Per stroke: index of the first occurrence of every unique (x, y) pair.
    # NOTE(review): if the input is padded with (0, 0, -1) rows, (0, 0) counts
    # as a unique pair too, so one padding row per padded stroke seems to be
    # carried over (it keeps its negative time) - confirm this is harmless.
    ustrokes = [np.unique(stroke[:, :2], return_index=True, axis=0)[1] for stroke in strokes]
    max_stroke_len = max(len(r) for r in ustrokes)

    normalized_strokes = np.zeros((len(ustrokes), max_stroke_len, 3))
    normalized_strokes[:, :, 2] -= 1  # Time -1 marks padding.

    for i, row in enumerate(ustrokes):
        # Sorting the indices restores the original point order.
        normalized_strokes[i, :len(row)] = strokes[i, np.sort(row)]

    # Mask of the points that are not padding.
    non_ragged = normalized_strokes[:, :, 2] >= 0
    # Shift: x relative to the first point's x, y relative to the largest y in
    # the array, time relative to the first point's time.
    normalized_strokes[non_ragged] -= [normalized_strokes[0, 0, 0], np.amax(normalized_strokes[:, :, 1]), normalized_strokes[0, 0, 2]]
    # Scale: x is divided by -min(y) and y by min(y), with min(y) taken after
    # the shift above; time is divided by 1 and so left unchanged.
    # NOTE(review): presumably min(y) is negative here, which flips the y axis
    # and keeps the aspect ratio; min(y) == 0 would divide by zero - confirm.
    normalized_strokes[non_ragged] /= [-np.amin(normalized_strokes[:, :, 1]), np.amin(normalized_strokes[:, :, 1]), 1]

    return normalized_strokes

def sample_line(p0, p1, delta=0.05):
    """Sample evenly spaced points on the segment from ``p0`` to ``p1``.

    Args:
        p0: Start point as (x, y, time).
        p1: End point as (x, y, time).
        delta: Target spacing; the number of samples is the (x, y) length of
            the segment divided by ``delta``, rounded down.

    Returns:
        ``[p0]`` when the segment is shorter than ``delta``, otherwise a list
        of [x, y, time] lists interpolated linearly from ``p0`` to ``p1``.
    """
    dx = p1[0] - p0[0]
    dy = p1[1] - p0[1]
    num = int((dx**2 + dy**2)**0.5 / delta)

    if num == 0:
        return [p0]

    # Interpolate x, y and time separately, then put them side by side.
    columns = [np.linspace(p0[k], p1[k], num) for k in range(3)]
    return np.stack(columns, axis=1).tolist()

def resample_stroke(stroke):
    """Resample one stroke along the segments between consecutive points.

    Args:
        stroke: Array of (x, y, time) points; points with a negative time
            are padding and are ignored.

    Returns:
        A list with the samples of all segments, in order. The list is empty
        when the stroke has fewer than two real points.
    """
    # Pair up the real points themselves. Counting the real points but
    # indexing the unfiltered stroke is only correct when all padding comes
    # after the real points.
    valid_points = stroke[stroke[:, 2] >= 0]
    resampled_stroke = []

    for p0, p1 in zip(valid_points[:-1], valid_points[1:]):
        resampled_stroke.extend(sample_line(p0, p1))

    return resampled_stroke

def resample_strokes(strokes):
    """Resample every stroke and pack the results into one padded array.

    Returns:
        An array of shape (number of strokes, longest resampled stroke, 3).
        Unused positions hold x = y = 0 and time = -1.
    """
    per_stroke = list(map(resample_stroke, strokes))
    longest = max(map(len, per_stroke))

    resampled_strokes = np.zeros((len(per_stroke), longest, 3))
    resampled_strokes[:, :, 2] = -1  # Time -1 marks padding.

    for index, points in enumerate(per_stroke):
        if not points:
            # Nothing was sampled for this stroke; it stays pure padding.
            continue

        resampled_strokes[index, :len(points)] = points

    return resampled_strokes

def add_extra_params(strokes):
    """Append two feature columns and flatten the strokes into one sequence.

    Column 3 is a per-stroke flag that is 1 when the first non-negative y of
    the stroke is smaller than its last non-negative y, else 0. Column 4 is 1
    for the first point of every stroke and 0 elsewhere.

    Args:
        strokes: Array indexed as [stroke, point, (x, y, time)].

    Returns:
        A 2-D array with one row (x, y, time, flag, stroke start) per point
        whose time is non-negative.
    """
    def direction(ys):
        # NOTE(review): the selection uses y >= 0 rather than the time
        # column, so padding points with y == 0 take part - confirm intent.
        kept = ys[ys >= 0]
        return int(kept[0] < kept[-1])

    num_strokes, stroke_len = strokes.shape[0], strokes.shape[1]

    flags = np.apply_along_axis(direction, 1, strokes[:, :, 1])
    # Repeat each stroke's flag for every point of that stroke.
    flags = np.repeat(flags[:, np.newaxis], stroke_len, axis=1)

    stroke_starts = np.zeros((num_strokes, stroke_len, 1))
    rtps = np.concatenate((strokes, flags[:, :, np.newaxis], stroke_starts), axis=2)
    rtps[:, 0, 4] = 1

    real_points = rtps[:, :, 2] >= 0
    return np.array(rtps[real_points].tolist())

def convert_stroke_to_bezier_curves(datapoints):
    """Fit curves to a stroke and return their parameterizations.

    NOTE: a function with the same name is defined again further down in
    this file; once that definition has run, it replaces this one.

    Args:
        datapoints: Array of (x, y, time) points of one stroke.

    Returns:
        A list with one parameter list per stitched curve, or ``None`` when
        no curve could be fitted.
    """
    fitted_curves = fit_datapoints(datapoints)

    # fit_datapoints returns None when there is nothing to fit; stitching
    # that result would fail.
    if not fitted_curves:
        return None

    stiched_curves = stitch_curves(fitted_curves)
    parameters = [parameterize_curve(PE, 1) for (PE, s, stdev, d) in stiched_curves]
    return parameters

In [None]:
def makeSMatrix(s, width):
    """Build the matrix whose columns are ``s**0, s**1, ..., s**(width-1)``.

    Args:
        s: Array of curve parameter values.
        width: Number of powers (columns) to generate.
    """
    columns = []

    for power in range(width):
        columns.append(s**power)

    return np.column_stack(columns)

def SSE(data, P, s):
    """Return the sum of squared errors between the data and the fitted curve.

    Args:
        data: Array of (x, y, time) points; points with a negative time are
            left out.
        P: Coefficient matrix of the cubic; row i belongs to ``s**i``.
        s: Curve parameter value of every remaining data point.
    """
    real_points = data[data[:, 2] >= 0]
    residuals = real_points - makeSMatrix(s, 4) @ P
    squared_error_per_point = np.sum(residuals**2, axis=1)
    return np.sum(squared_error_per_point, axis=0)


def newton_step(data, P, s):
    """Return updated curve parameter values after one Newton-style step.

    Args:
        data: Array of (x, y, time) points; points with a negative time are
            left out.
        P: Coefficient matrix of the cubic; row i belongs to ``s**i``.
        s: Current curve parameter value of every remaining data point.

    Returns:
        A copy of ``s`` in which every entry except the first and the last
        has been moved by ``N1/N2``.
    """
    D = data[data[:, 2] >= 0]
    S = makeSMatrix(s, 4)
    C = S@P  # Curve evaluated at the current parameter values.

    # Coefficients of the derivative polynomials: dropping the constant row
    # and multiplying by the old exponents differentiates once.
    P1d = P[1:, :] * [[1], [2], [3]]
    C1d = makeSMatrix(s, 3)@P1d # First derivates
    
    P2d = P1d[1:, :] * [[1], [2]]
    C2d = makeSMatrix(s, 2)@P2d # Second derivates
    
    P3d = P2d[1:, :]
    C3d = makeSMatrix(s, 1)@P3d # Third derivates
    
    # Only the x and y columns enter N1 and N2; time is ignored.
    # NOTE(review): N1 and N2 are built from the second and third derivatives.
    # For a Newton step that minimizes the squared distance to the curve one
    # would expect the first and second derivatives - confirm the derivation.
    N1 = (D[:, 0] - C[:, 0])*C2d[:, 0] + (D[:, 1] - C[:, 1])*C2d[:, 1]\
            - C2d[:, 0]**2 - C2d[:, 1]**2
    
    N2 = (D[:, 0] - C[:, 0])*C3d[:, 0] + (D[:, 1] - C[:, 1])*C3d[:, 1]\
            - 2*C3d[:, 0]*C2d[:, 0] - C1d[:, 0]*C2d[:, 0]\
            - 2*C3d[:, 1]*C2d[:, 1] - C1d[:, 1]*C2d[:, 1]
    
    s_new = np.copy(s)
    s_new[1:-1] -= (N1/N2)[1:-1] # Keep s=0 and s=1 in place.
    return s_new

def get_relative_distances(data):
    """Return each point's relative position along the path through the points.

    Args:
        data: Array of points; only the x and y columns are used.

    Returns:
        For fewer than two points the array ``[[0.5]]``. Otherwise the
        cumulative (x, y) path length at every point divided by the total
        path length, so the first entry is 0.
    """
    if len(data) < 2:
        return np.array([[0.5]])

    steps = data[1:, :] - data[:-1, :]
    step_lengths = (steps[:, 0]**2 + steps[:, 1]**2)**(1/2)

    # The first point lies at distance 0 from the start of the path.
    travelled = np.cumsum(np.insert(step_lengths, 0, 0))
    return travelled / travelled[-1]

def fit_curve_newton_step(data, delta=0.05, precision=0.05, maxiter=50):
    """Fit a cubic curve to the data, refining the parameter values iteratively.

    The curve parameter values start as the relative distances along the
    path. The coefficients are found by least squares; afterwards the
    parameter values are updated with ``newton_step`` and the coefficients
    refitted until the error changes by less than ``delta``.

    Args:
        data: Array of (x, y, time) points; points with a negative time are
            left out.
        delta: Stop iterating once the error changes by less than this.
        precision: Skip the iterations when the first fit already has an
            error below this value.
        maxiter: Maximum number of refinement iterations.

    Returns:
        ``None`` when there are no points to fit, otherwise the tuple
        ``(PE, s, error)`` with the coefficient matrix, the parameter values
        and the sum of squared errors of exactly that fit.
    """
    D = data[data[:, 2] >= 0]

    if len(D) == 0:
        return None

    s = get_relative_distances(D)
    PE = np.linalg.lstsq(makeSMatrix(s, 4), D, rcond=None)[0]
    error = SSE(D, PE, s)

    if error < precision:
        return PE, s, error

    for _ in range(maxiter):
        prev_error = error

        s = newton_step(D, PE, s)
        PE = np.linalg.lstsq(makeSMatrix(s, 4), D, rcond=None)[0]
        error = SSE(D, PE, s)

        if abs(error - prev_error) < delta:
            break

    # Return the error of the returned fit rather than that of the previous
    # iteration, so callers judge the coefficients they actually receive.
    return PE, s, error

def fit_datapoints(datapoints, precision=0.001, precision_newton=0.05):
    """Fit the data points with one curve, splitting recursively when needed.

    A single fit is kept when its standard deviation does not exceed
    ``precision`` and the summed distance between consecutive points is at
    most three times the distance between the first and the last point.
    Otherwise the points are split in two and both parts are fitted again.
    The single fit is also kept when no split is possible or when one of the
    parts cannot be fitted.

    Args:
        datapoints: Array of (x, y, time) points.
        precision: Largest accepted standard deviation of a fit.
        precision_newton: Passed to ``fit_curve_newton_step`` as ``delta``.

    Returns:
        ``None`` when nothing could be fitted, otherwise a list of
        ``[PE, s, stdev, datapoints]`` entries, one per curve.
    """
    fit = fit_curve_newton_step(datapoints, delta=precision_newton)

    if not fit:
        return None

    PE, s, error = fit
    stdev = (error/len(datapoints))**(1/2)
    single_curve = [[PE, s, stdev, datapoints]]

    steps = datapoints[1:, :] - datapoints[:-1, :]
    step_lengths = (steps[:, 0]**2 + steps[:, 1]**2)**(1/2)

    end_to_end = datapoints[0] - datapoints[-1]
    chord = (end_to_end[0]**2 + end_to_end[1]**2)**(1/2)

    needs_split = stdev > precision or (np.sum(step_lengths) / chord) > 3

    if not needs_split:
        return single_curve

    halves = split_datapoints(datapoints)

    if not halves:
        return single_curve

    first_h, second_h = halves
    res_f = fit_datapoints(first_h, precision, precision_newton)
    res_s = fit_datapoints(second_h, precision, precision_newton)

    if res_f and res_s:
        return [*res_f, *res_s]

    return single_curve

def get_control_points(P):
    """Return four Bezier control points for the cubic with coefficients ``P``.

    Args:
        P: Coefficient matrix of the cubic; row i belongs to ``s**i``.

    Returns:
        ``[p0, p1, p2, p3]`` as (x, y) arrays: ``p0`` and ``p3`` are the
        curve at s=0 and s=1, ``p1`` and ``p2`` lie a third of the curve's
        derivative away from them.
    """
    curve_ends = np.array([0, 1])

    positions = makeSMatrix(curve_ends, 4) @ P
    start = positions[0, :2]
    end = positions[-1, :2]

    # Coefficients of the first derivative of the cubic.
    derivative_coefficients = P[1:, :] * [[1], [2], [3]]
    derivatives = makeSMatrix(curve_ends, 3) @ derivative_coefficients

    start_handle = start + (1/3) * derivatives[0, :2]
    end_handle = end - (1/3) * derivatives[-1, :2]

    return [start, start_handle, end_handle, end]
    

def parameterize_curve(P, p, debug=False):
    """Describe a fitted curve by a fixed-length list of parameters.

    Args:
        P: Coefficient matrix of the cubic; row i belongs to ``s**i``.
        p: Value that is passed through as the last parameter.
        debug: When true, plot the vectors used for both angles.

    Returns:
        A list with: the x and y of the vector between the end points, the
        lengths of both control vectors relative to the distance between the
        end points, the angle of each control vector with respect to the
        line between the end points, the time coefficients ``P[1, 2]``,
        ``P[2, 2]`` and ``P[3, 2]``, and ``p``.
    """
    p0, p1, p2, p3 = get_control_points(P)

    start_to_end = p3 - p0
    end_to_start = p0 - p3
    distance_endpoints = np.sum((p3 - p0)**2)**(1/2)

    start_control = p1 - p0
    end_control = p2 - p3

    def signed_angle(control, reference):
        # arctan2 of the 2-D cross product and the dot product.
        cross = control[0] * reference[1] - control[1] * reference[0]
        return np.arctan2(cross, np.dot(reference, control))

    d1 = np.sum(start_control**2)**(1/2) / distance_endpoints
    d2 = np.sum(end_control**2)**(1/2) / distance_endpoints
    a1 = signed_angle(start_control, start_to_end)
    a2 = signed_angle(end_control, end_to_start)

    if debug:
        views = (
            ("angle 1", start_to_end, start_control),
            ("angle 2", end_to_start, end_control),
        )

        for title, reference, control in views:
            plt.title(title)
            plt.plot([0, reference[0]], [0, reference[1]], color="r")
            plt.plot([0, control[0]], [0, control[1]], color="b")
            plt.show()

    return [start_to_end[0], start_to_end[1], d1, d2, a1, a2, P[1, 2], P[2, 2], P[3, 2], p]

def length_vecs(vec):
    """Return the (x, y) length of every row vector in ``vec``."""
    xs = vec[:, 0]
    ys = vec[:, 1]
    return (xs**2 + ys**2)**(1/2)

def dot_vecs(vec1, vec2):
    """Return the row-wise dot product of the x and y columns of two arrays."""
    x_products = vec1[:, 0]*vec2[:, 0]
    y_products = vec1[:, 1]*vec2[:, 1]
    return x_products + y_products

def calc_angles(stroke):
    """Return the angle at every interior point of a stroke.

    The angle at a point is measured between the vector pointing from the
    previous point to it and the vector pointing from the next point to it,
    using the x and y columns only.

    Args:
        stroke: Array of (x, y, time) points; points with a negative time
            are left out.

    Returns:
        An array of angles in radians, one per interior point.
    """
    D = stroke[stroke[:, 2] >= 0]
    vecs_back = D[1:, :] - D[:-1, :]
    vecs_forward = D[:-1, :] - D[1:, :]

    frac = dot_vecs(vecs_forward[1:], vecs_back[:-1]) / (length_vecs(vecs_forward[1:]) * length_vecs(vecs_back[:-1]))

    # Floating-point rounding can push the cosine slightly outside [-1, 1],
    # for which arccos returns NaN. Clamp both sides, not only the lower one.
    frac = np.clip(frac, -1, 1)

    return np.arccos(frac)

def split_datapoints(stroke):
    """Split a stroke in two at its smallest interior angle.

    Candidate split points are tried in order of increasing angle. The first
    one that leaves enough points on both sides is used.

    Args:
        stroke: Array of (x, y, time) points. The angle indices refer to the
            points with a non-negative time, so those are assumed to come
            before any padding.

    Returns:
        ``(first_part, second_part)``, which share the point at the split,
        or ``None`` when no candidate leaves enough points on both sides.
    """
    angles = calc_angles(stroke)
    # calc_angles skips the first point, so shift the indices by one.
    indices = np.argsort(angles) + 1

    # Count only the real points; padding cannot contribute to a fit.
    num_points = len(stroke[stroke[:, 2] >= 0])

    for index in indices:
        # Both bounds must hold to leave enough data points for a fit on
        # either side. With "or" almost every index would be accepted.
        if 3 <= index and index <= num_points - 3:
            return stroke[:index+1], stroke[index:]

    return None

def stitch_curves(fitted_curves, precision=0.001):
    """Merge consecutive curves when one curve fits their combined points.

    The curves are walked in order. The current curve is merged with the
    next one when a single fit through both point sets has a standard
    deviation of at most ``precision`` and the summed distance between
    consecutive points is at most three times the distance between the first
    and the last point. Otherwise the current curve is kept as it is and the
    next one becomes the current curve.

    Args:
        fitted_curves: List of ``[PE, s, stdev, datapoints]`` entries. The
            list is not modified.
        precision: Largest accepted standard deviation of a merged fit.

    Returns:
        The list of curves after merging. A single input curve is returned
        unchanged and an empty input gives an empty list.
    """
    if not fitted_curves:
        return []

    curves = []
    # Track the running (possibly merged) curve in a local variable instead
    # of writing it back into the caller's list.
    current = fitted_curves[0]

    for following in fitted_curves[1:]:
        # The last point of the current curve is the first of the next one.
        d = np.vstack((current[3][:-1], following[3]))
        fit = fit_curve_newton_step(d)
        merged = None

        if fit is not None:
            PE, s, error = fit
            stdev = (error/len(d))**(1/2)

            curve_diffs = d[1:, :] - d[:-1, :]
            distances = (curve_diffs[:, 0]**2 + curve_diffs[:, 1]**2)**(1/2)

            abs_diffs = d[0] - d[-1]
            abs_dist = (abs_diffs[0]**2 + abs_diffs[1]**2)**(1/2)

            if not (stdev > precision or (np.sum(distances) / abs_dist) > 3):
                merged = [PE, s, stdev, d]

        if merged is None:
            curves.append(current)
            current = following
        else:
            current = merged

    # The running curve is always part of the result. This also keeps a
    # stroke that consists of a single fitted curve.
    curves.append(current)
    return curves

def convert_stroke_to_bezier_curves(datapoints):
    """Fit curves to a stroke and return their parameterizations.

    Args:
        datapoints: Array of (x, y, time) points of one stroke.

    Returns:
        A list with one parameter list per stitched curve, or ``None`` when
        no curve could be fitted.
    """
    fitted = fit_datapoints(datapoints)

    if fitted:
        stitched = stitch_curves(fitted)
        return [parameterize_curve(PE, 1) for (PE, _s, _stdev, _d) in stitched]

    return None

def strokes_to_bezier(strokes):
    """Convert every stroke to curve parameters and concatenate the results.

    Strokes for which the conversion yields nothing are skipped.

    Returns:
        An array with one row of parameters per curve.
    """
    conversions = (convert_stroke_to_bezier_curves(stroke) for stroke in strokes)
    points = [parameters
              for curves in conversions if curves
              for parameters in curves]
    return np.array(points)

In [None]:
# Build the training set: one feature sequence per stroke file and one
# encoded text line per line of every text file.
rtp_features = []
bezier_features = []
target = []

for i, textfile_path in enumerate(textfile_paths):
    # Print the fraction of processed text files every 100 files.
    if i % 100 == 0:
        print(i/len(textfile_paths))

    textline_strokefile_paths = get_stroke_paths(textfile_path)
    
    # Skip text files without stroke files.
    if not textline_strokefile_paths:
        continue

    lines = [encode_textline(line, alphabet) for line in get_file_lines(textfile_path)]
    rtp_strokes = []
    bezier_strokes = []
    
    for textfile_stroke_path in textline_strokefile_paths:
        n_strokes = normalize_strokes(read_file(textfile_stroke_path))
        
        rtp_strokes.append(add_extra_params(resample_strokes(n_strokes)))
#         bezier_strokes.append(strokes_to_bezier(n_strokes))

    # NOTE(review): features and targets are extended independently, which
    # assumes as many stroke files as text lines per text file - verify.
    rtp_features.extend(rtp_strokes)
#     bezier_features.extend(bezier_strokes)
    target.extend(lines)

In [None]:
# Number of feature sequences and of target lines.
print(len(rtp_features), len(target))

# Total number of points over all feature sequences.
s = 0

for elem in rtp_features:
    s+= len(elem)
    
# Average number of points per feature sequence.
print(s/len(rtp_features))

In [None]:
# Index of the training sample that is plotted and decoded as an example.
sample = 0

def plot_stroke(stroke):
    """Plot one feature sequence and mark the points whose column 4 is 1.

    Points with a negative time (column 2) are left out of the line.
    """
    flagged = stroke[:, 4] == 1
    real_points = stroke[:, 2] >= 0

    plt.scatter(stroke[:, 0][flagged], stroke[:, 1][flagged])
    plt.plot(stroke[:, 0][real_points], stroke[:, 1][real_points])
    plt.show()
    
# Plot the selected sample and print its decoded target text.
plot_stroke(rtp_features[sample])

print(decode_textline(target[sample], alphabet))

## Storing the data

The data is then padded and stored as NumPy `.npy` files using np.save() (note that np.save() does not compress the data). The alphabet is also stored for later use when training the network.

In [None]:
def pad_data(l, value=0, width=None):
    """Pad sequences of different lengths into one rectangular array.

    Args:
        l: List of sequences. With ``width`` every sequence is 2-D with
            ``width`` columns, otherwise it is 1-D.
        value: Fill value for the padded positions.
        width: Number of columns per sequence element, or ``None`` for 1-D
            sequences.

    Returns:
        An array of shape (len(l), longest sequence[, width]). Its dtype is
        the common type of the data and ``value``, so float data keeps its
        fractional part even when ``value`` is an integer.
    """
    rows = [np.asarray(row) for row in l]
    max_len = max((len(row) for row in rows), default=0)

    # np.full would otherwise take its dtype from `value` alone, and an
    # integer fill value would truncate float data on assignment. Empty rows
    # are ignored because an empty array always has a float dtype.
    dtype = np.result_type(value, *{row.dtype for row in rows if row.size})

    if width:
        padded_numpy_array = np.full((len(rows), max_len, width), value, dtype=dtype)
    else:
        padded_numpy_array = np.full((len(rows), max_len), value, dtype=dtype)

    for i, row in enumerate(rows):
        # An empty row has nothing to copy and would not broadcast.
        if len(row):
            padded_numpy_array[i, :len(row)] = row

    return padded_numpy_array
        
# Pad all samples to a common length. The targets are padded with
# len(alphabet), an index that no character of the alphabet uses.
padded_rtp_features = pad_data(rtp_features, width=5)
padded_target = pad_data(target, value=len(alphabet))

print(padded_rtp_features.shape, padded_target.shape)

# Fixed name: this line used to reference the undefined
# `padded_rpt_features`, which raised a NameError.
np.save("../../data/processed_data/rtp_features_padded", padded_rtp_features)
np.save("../../data/processed_data/target_padded", padded_target)

In [None]:
import pickle

# Store the alphabet next to the processed data so that the character
# indices used in the targets can be mapped back to characters later.
with open("../../data/processed_data/alphabet", "wb") as f:
    pickle.dump(alphabet, f)

## Experiments

For fitting the Bezier curves there are two parameters that can be tuned. Firstly, the precision that a fit must reach so that its data points are not split up any further; we will refer to this parameter as $\alpha_1$. Secondly, the precision at which a fit is considered accurate enough that no more Newton steps need to be taken; we will refer to this parameter as $\alpha_2$. Some experiments have been set up to see how these parameters influence the accuracy of the overall fits and what the trade-off is between the precision of the fits and the number of curves that is needed to achieve that precision. In these experiments different values for each parameter have been tested on a sample of the IAM-OnDB data set; we look at the total number of data points needed to represent the input and the precision of the overall fits of all the curves for a stroke. To measure accuracy, we do not parameterize the fitted curves.

In [None]:
import random

# Random sample of five text files to run the experiments on.
testing_set = random.sample(textfile_paths, 5)

def convert_stroke_to_bezier_curves_non_parameterized(datapoints, a1, a2):
    """Fit curves to a stroke and return the standard deviation of every curve.

    Args:
        datapoints: Array of (x, y, time) points of one stroke.
        a1: Precision used for both splitting and stitching.
        a2: Precision passed on for the Newton steps.

    Returns:
        A list with the standard deviation of every stitched curve, or
        ``None`` when no curve could be fitted.
    """
    fitted = fit_datapoints(datapoints, precision=a1, precision_newton=a2)

    if fitted:
        stitched = stitch_curves(fitted, precision=a1)
        return [stdev for (_PE, _s, stdev, _d) in stitched]

    return None

def strokes_to_bezier_non_parameterized(strokes, a1, a2):
    """Collect the standard deviations of the curves fitted to all strokes.

    Strokes for which the fit yields nothing are skipped.

    Returns:
        A 1-D array with one standard deviation per curve.
    """
    fits = (convert_stroke_to_bezier_curves_non_parameterized(stroke, a1, a2)
            for stroke in strokes)
    points = [stdev
              for stdevs in fits if stdevs
              for stdev in stdevs]
    return np.array(points)

def fit_testing_set(testing_set, a1, a2):
    """Fit curves to all stroke files of the given text files.

    Args:
        testing_set: Sequence of text file paths.
        a1: Precision used for both splitting and stitching.
        a2: Precision passed on for the Newton steps.

    Returns:
        A list with one array per stroke file, holding the standard
        deviation of every fitted curve. Text files without stroke files
        are skipped.
    """
    bezier_features = []

    for i, textfile_path in enumerate(testing_set):
        # Report progress relative to the set that is being processed, not
        # the full list of text files.
        if i % 100 == 0:
            print(i/len(testing_set))

        textline_strokefile_paths = get_stroke_paths(textfile_path)

        if not textline_strokefile_paths:
            continue

        for textfile_stroke_path in textline_strokefile_paths:
            n_strokes = normalize_strokes(read_file(textfile_stroke_path))
            bezier_features.append(strokes_to_bezier_non_parameterized(n_strokes, a1, a2))

    return bezier_features