<a href="https://colab.research.google.com/github/desean1625/scatter_test/blob/main/scatter_plot_track_sorting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras

# Number of points in every generated sample; it also sizes the model's input
# (total, 2) and its final Dense(total) output.
total = 10000
# Number of samples per generated batch (also reused as steps_per_epoch in model.fit).
batch_size = 10 


In [None]:
def conv_block(fs, x, activation = 'relu'):
  """Apply a 3x3 'same'-padded convolution, batch normalization and 50% dropout.

  Args:
    fs: number of filters for the Conv2D layer.
    x: input tensor fed to the convolution.
    activation: activation passed to the Conv2D layer.

  Returns:
    The output tensor of the final Dropout layer.
  """
  # Layers are constructed in the order they are applied: conv -> norm -> dropout.
  stages = (
      layers.Conv2D(fs, (3, 3), padding = 'same', activation = activation),
      layers.BatchNormalization(),
      layers.Dropout(0.5),
  )
  for stage in stages:
    x = stage(x)
  return x

def residual_block(fs, x):
  """Chain three conv_blocks on x and join the result with x itself.

  The skip connection is a concatenation along the last axis (not an
  addition), so the output has the channels of x plus fs new channels.

  Args:
    fs: number of filters used by each conv_block.
    x: input tensor.

  Returns:
    The concatenation [x, conv_block^3(x)] along axis -1.
  """
  branch = x
  for _ in range(3):
    branch = conv_block(fs, branch)
  return layers.Concatenate(axis = -1)([x, branch])

# Build the network: one sample of `total` (x, y) points in, `total` values out.
inp = keras.Input(shape = (total,2))
# Light dropout applied directly to the raw input points.
out = layers.Dropout(.05)(inp)
# Stack three copies of the input along axis 1, giving (3*total, 2) per sample.
out = layers.Concatenate(axis=1)([out, out,out])
# NOTE: 3*32*625 = 60000 elements, which equals 3*total*2 only for total = 10000;
# a different `total` needs a different target shape here.
out = layers.Reshape((3,32,625))(out)
out = residual_block(16, out) #create convolutional filters
# Dense layers are applied before Flatten, so they act on the last axis of the
# 4-D tensor; no activation argument is passed to any of them.
out = layers.Dense(2048)(out)
out = layers.Dense(1024)(out)
out = layers.Dense(256)(out)
out = layers.Flatten()(out)
out = layers.Dense(total)(out) #prediction layer output for every input point
model = keras.models.Model(inputs = inp, outputs = out)
model.summary()

In [None]:
#Create training data generator
def norm(xy):
  """Linearly rescale a pair so x in [-180, 180] and y in [-90, 90] land in [0, 1].

  Args:
    xy: indexable pair; xy[0] is the x value and xy[1] the y value.

  Returns:
    A tuple (x, y) of the rescaled values. Inputs outside the ranges above
    are not clipped and map outside [0, 1].
  """
  x_lo, x_hi = -180, 180
  y_lo, y_hi = -90, 90
  scaled_x = (xy[0] - x_lo) / (x_hi - x_lo)
  scaled_y = (xy[1] - y_lo) / (y_hi - y_lo)
  return (scaled_x, scaled_y)
def rand_point(x,y,deviation=.01):
  """Return a random point close to (x, y).

  Each coordinate is drawn uniformly from an interval centred on the input
  coordinate whose half-width is `deviation` times that coordinate. The
  spread is therefore relative: it shrinks to zero as the coordinate
  approaches 0, and the result may fall slightly outside the range the
  centre was drawn from.

  Args:
    x: x coordinate of the centre.
    y: y coordinate of the centre.
    deviation: relative half-width of the sampling interval. Defaults to
      .01 (1%), the value that used to be hard-coded.

  Returns:
    A tuple (x, y) with the jittered coordinates.
  """
  x_spread = x*deviation
  y_spread = y*deviation
  # For negative coordinates the first bound is the larger one;
  # random.uniform accepts its bounds in either order.
  # x is drawn before y so the random stream is consumed in a fixed order.
  new_x = random.uniform(x - x_spread, x + x_spread)
  new_y = random.uniform(y - y_spread, y + y_spread)
  return new_x,new_y
def create_sample(total):
  """Generate one labelled sample of `total` points.

  Between 0 and 4 clusters are created. Each cluster gets
  total // (clusters + 1) points jittered around a random centre with
  rand_point and is labelled 1..clusters; the remaining points are drawn
  uniformly over x in [-180, 180], y in [-90, 90] and labelled 0.
  All points are then rescaled with norm.

  NOTE(review): points are emitted grouped by label (clusters first, noise
  last) and are not shuffled, so position in the sample correlates with
  the label - confirm this is intended.

  Args:
    total: number of points in the sample.

  Returns:
    A tuple (x, y) of float32 arrays: x holds one normalised (x, y) row per
    point and y holds the matching labels.
  """
  n_clusters = random.randint(0,4)
  per_cluster = total//(n_clusters+1)
  points = []
  labels = []
  for label in range(1, n_clusters+1):
    centre = (random.uniform(-180,180), random.uniform(-90,90))
    for _ in range(per_cluster):
      points.append(rand_point(centre[0], centre[1]))
      labels.append(label)
  # Fill whatever is left with uniformly scattered background points.
  for _ in range(total - len(points)):
    points.append((random.uniform(-180,180), random.uniform(-90,90)))
    labels.append(0)
  coords = np.array([np.hstack(norm(p)) for p in points], dtype=np.float32)
  targets = np.array(labels, dtype=np.float32)
  return coords,targets
def train_gen(total,batch_size=32):
  """Endlessly yield batches of freshly generated samples.

  Args:
    total: number of points per sample, forwarded to create_sample.
    batch_size: number of samples stacked into each batch.

  Yields:
    A tuple (inputs, labels) of arrays built from batch_size samples; a new
    set of samples is generated for every batch.
  """
  while True:
    samples = [create_sample(total) for _ in range(batch_size)]
    inputs = np.array([points for points, _ in samples])
    labels = np.array([targets for _, targets in samples])
    yield inputs,labels
# Create the endless batch generator and pull one batch from it up front.
gen = train_gen(total,batch_size)
x_train,y_train = next(gen)

In [None]:
# Adam optimizer with a learning rate of 1e-4; passed to model.compile below.
opt = tf.keras.optimizers.Adam(
    learning_rate=0.0001
)

def custom_loss(y_true, y_pred):
    """Scaled L1 loss: 1.5 * |y_true - y_pred| summed over axis 1.

    The model's last layer is Dense(total) after Flatten, so axis 1 is the
    per-point axis and the result holds one loss value per sample in the
    batch.

    Args:
        y_true: target labels.
        y_pred: model predictions, same shape as y_true.

    Returns:
        Tensor with the summed, scaled absolute error for each row.
    """
    # The 1.5 factor makes the penalty for being wrong more severe.
    scaled_abs_error = 1.5 * keras.backend.abs(y_true - y_pred)
    return keras.backend.sum(scaled_abs_error, axis=1)
# Alias under which the loss function is handed to model.compile.
loss = custom_loss


def my_acc_fn(y_true, y_pred):
    """Fraction of elements whose rounded prediction equals the target.

    Implemented with TensorFlow ops only (no .numpy() round-trips), so the
    metric works in graph mode as well as eagerly.

    Args:
        y_true: target labels.
        y_pred: model predictions, same number of elements as y_true.

    Returns:
        Scalar float32 tensor in [0, 1]: the share of matching elements.
    """
    # Flatten both sides so every element is compared one-to-one.
    predicted = tf.reshape(tf.round(y_pred), [-1])
    # tf.equal needs matching dtypes; align the targets with the predictions.
    expected = tf.reshape(tf.cast(y_true, predicted.dtype), [-1])
    matches = tf.cast(tf.equal(expected, predicted), tf.float32)
    return tf.reduce_mean(matches)
# Re-binds the alias; it already refers to custom_loss, so this is redundant.
loss = custom_loss
# run_eagerly=True makes Keras execute the training step eagerly rather than
# as a compiled graph, which a metric that calls .numpy() on its inputs needs.
model.compile(loss=loss, optimizer=opt, metrics=[my_acc_fn], run_eagerly=True)

In [None]:
# The generator never ends, so steps_per_epoch defines the epoch length;
# batch_size//1 is simply batch_size, i.e. batch_size batches per epoch.
model.fit(gen,steps_per_epoch=batch_size//1, epochs=2000)

In [None]:
# Draw a fresh batch and compare the rounded predictions with its labels.
x_train,y_train = next(gen)

r = keras.backend.round(model.predict(x_train))
# Largest predicted label in the first sample of the batch.
print(r.numpy()[0].max())
# First ten predicted labels, followed by the first ten true labels.
print(r.numpy()[0][0:10])
print(y_train[0][0:10])
# Share of points in the first sample whose rounded prediction equals the label.
equals = (r.numpy()[0] == y_train[0]).sum()  
result = equals/len(y_train[0])
print(result)

In [None]:
import matplotlib.pyplot as plt
# Scatter the first sample's points (column 0 vs column 1), colored by true label.
plt.scatter(x_train[0][:, 0],x_train[0][:, 1],c=y_train[0])
plt.show()