In [None]:
import math
import numpy as np
import csv
import os
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from tqdm import tqdm
%matplotlib inline

In [None]:
class TestCases:
  ROUND_CASE = 1
  LINE_CASE = 2
  HORIZONTAL_CASE = 3
  VERTICAL_CASE = 4
  FROM_CENTER = 5

# Функции для генерации тестовых множеств

In [None]:
def generate_round_case(g, psi, radius, i, gap):
  r = radius 
  fi0 = psi + math.asin(g/r) + 0.001
  #fi = np.random.uniform(psi + math.asin(g/r), math.pi - psi - math.asin(g/r))
  dfi = math.acos((2*r*r - 4*g*g*gap*gap)/(2*r*r))
  fi = fi0 + i * dfi
  if fi > math.pi - psi - math.asin((g*gap)/r):
    return r, fi, False
  return r, fi, True


def generate_vertical_case(g, rMax, i, gap, rMin=50):
  fi = math.pi / 2
  r = rMin + i * 2 * g * gap
  can_continue = r <= rMax
  return r, fi, can_continue


def generate_horizontal_case(g, k, i, gap):
  r0 = 2 * k
  x0 = r0 * math.cos(math.asin(k/r0))
  x = x0 - 2*i*g*gap
  r = math.sqrt(x*x + k*k)
  # r = np.random.uniform(k, rMax)
  if x < 0:
    fi = math.pi - math.asin(k / r)
  else:
    fi = math.asin(k / r)
  # if np.random.uniform(0, 2) > 1:
  #   fi = math.pi - fi
  can_continue = x > -r0
  return r, fi, can_continue

def generate_from_center_case(g, rMax, alpha_in_degrees, i, gap, started):
  fi = math.radians(alpha_in_degrees)
  if not started:
    Supp.r_start += 1
    r = Supp.r_start
  else:
    r = Supp.r_start + 2 * g * gap * i
  can_continue = r <= rMax
  return r, fi, can_continue

def generate_line_case(g, rMax, x, alpha_in_degrees, i, gap):
  x = abs(x)
  alpha = math.radians(alpha_in_degrees)
  r = math.sqrt(x*x + 4*i*i*g*g*gap*gap - 4*g*gap*i*x*math.cos(alpha))
  dfi = math.asin((2*g*gap*i*math.sin(alpha))/r)
  if x - i * 2 * g * gap * math.cos(alpha) > 0:
    fi = math.pi - dfi
  else:
    fi = dfi
  #rMin = math.sin(alpha) * x
  #r = np.random.uniform(rMin, rMax)
  #fi = alpha + math.asin((x * math.sin(alpha)) / r)
  can_continue = r <= rMax
  if r < 50:
    return rMax * 2, fi, can_continue
  return r, fi, can_continue

In [None]:
def generate(h, l, m, n, rMax, g, test_case, values, gap=1, rMin=50):
  gamma = (2 * math.pi) / m
  psi = math.asin(l/h)
  M = []
  masks = []
  precedents = 0
  started = False
  for precedents in range(n):
  #while precedents < n:
    # gMin = r * math.sin(gamma/2)
    if test_case == TestCases.ROUND_CASE:
      r, fi, can_continue = generate_round_case(g, psi, values['radius'], precedents, gap)
      if not can_continue:
        break
    elif test_case == TestCases.HORIZONTAL_CASE:
      r, fi, can_continue = generate_horizontal_case(g, values['k'], precedents, gap)
      if not can_continue:
        break
    elif test_case == TestCases.LINE_CASE:
      r, fi, can_continue = generate_line_case(g, rMax, values['x_start'], values['alpha_in_degrees'], precedents, gap)
      if not can_continue:
        break
    elif test_case == TestCases.VERTICAL_CASE:
      r, fi, can_continue = generate_vertical_case(g, values['rMax'], precedents, gap, rMin)
      if not can_continue:
        break
    elif test_case == TestCases.FROM_CENTER:
      r, fi, can_continue = generate_from_center_case(g, rMax, values['alpha_in_degrees'], precedents, gap, started)
      if not can_continue:
        break
    if fi < psi + math.asin(g/r) or fi > math.pi - psi - math.asin(g/r):
      continue
    r_b = math.sqrt(h*h + r*r - 2*h*r*math.cos(fi))
    if g > r_b:  
      continue     
    fi_b = math.pi - math.acos((h - r * math.cos(fi)) / math.sqrt(h*h + r*r - 2*h*r*math.cos(fi)))
    L_b = math.floor((m / (2 * math.pi)) * (fi_b - math.asin(g / r_b)))
    R_b = math.floor((m / (2 * math.pi)) * (fi_b + math.asin(g / r_b)))
    r_a = math.sqrt(h*h + r*r + 2*h*r*math.cos(fi))
    if g > r_a:   
      continue   
    fi_a = math.acos((h + r * math.cos(fi)) / math.sqrt(h*h + r*r + 2*h*r*math.cos(fi)))
    L_a = math.floor((m / (2 * math.pi)) * (fi_a - math.asin(g / r_a)))
    R_a = math.floor((m / (2 * math.pi)) * (fi_a + math.asin(g / r_a)))
    if L_a < 0:
      continue
    if R_b >= m//2:
      continue
    beta_A = [0] * (m//2)
    beta_B = [0] * (m//2)
    for j in range(L_a, R_a + 1):
        beta_A[j] = 1
    for j in range(L_b, R_b + 1):
        beta_B[j] = 1
    started = True
    M.append(beta_A + beta_B + [r, fi, g, r*math.cos(fi), r*math.sin(fi), h, l])
  M = sorted(M, key=lambda x: x[723])
  return M

In [None]:
class Supp:
  r_start = 50

# Генерация данных для спец. экспериментов

In [None]:
def generate_special_round_case(g, psi, r, i, n):
  fi_start = psi + math.asin(g/r) + 0.00001
  fi_end = math.pi - psi - math.asin(g/r) - 0.00001
  dfi = (fi_end - fi_start) / (n - 1)
  fi = fi_start + dfi * i
  return r, fi

def generate_special_horizontal_case(g, r0, k, i, n, is_started, psi):
  fi_start = psi + math.asin(g/r0) + 0.00001
  fi_end = math.pi - psi - math.asin(g/r0) - 0.00001
  if n > 1:
    x0 = r0 * math.cos(math.asin(k/r0))
    dx = (2 * x0) / (n-1)
    x = x0 - dx * i
  else:
    x = 0
  if is_started:
    r = math.sqrt(x*x + k*k)
  else:
    r0 -= 10
    r = r0
  if x < 0:
    fi = math.pi - math.asin(k / r)
  else:
    fi = math.asin(k / r)
  return r, fi, r0

def generate_special_vertical_case(g, rMax, i, n):
  rMin = 75
  fi = math.pi / 2
  dr = (rMax - rMin) / (n - 1)
  r = rMin + i * dr
  return r, fi

def generate_horizontal_dataset(h, l, m, n, rMax, g, k):
  gamma = (2 * math.pi) / m
  psi = math.asin(l/h)
  M = []
  conf = {'r0': 10000}
  is_started = False
  precedents = 0
  while precedents < n:
    r, fi, r0 = generate_special_horizontal_case(g, conf['r0'], k, precedents, n, is_started, psi)
    conf['r0'] = r0
    if fi < psi + math.asin(g/r) or fi > math.pi - psi - math.asin(g/r):
      continue
    r_b = math.sqrt(h*h + r*r - 2*h*r*math.cos(fi))
    if g > r_b:  
      continue     
    fi_b = math.pi - math.acos((h - r * math.cos(fi)) / math.sqrt(h*h + r*r - 2*h*r*math.cos(fi)))
    L_b = math.floor((m / (2 * math.pi)) * (fi_b - math.asin(g / r_b)))
    R_b = math.floor((m / (2 * math.pi)) * (fi_b + math.asin(g / r_b)))
    r_a = math.sqrt(h*h + r*r + 2*h*r*math.cos(fi))
    if g > r_a:   
      continue   
    fi_a = math.acos((h + r * math.cos(fi)) / math.sqrt(h*h + r*r + 2*h*r*math.cos(fi)))
    L_a = math.floor((m / (2 * math.pi)) * (fi_a - math.asin(g / r_a)))
    R_a = math.floor((m / (2 * math.pi)) * (fi_a + math.asin(g / r_a)))
    if L_a < 0:
      continue
    if R_b >= m//2:
      continue
    beta_A = [0] * (m//2)
    beta_B = [0] * (m//2)
    for j in range(L_a, R_a + 1):
        beta_A[j] = 1
    for j in range(L_b, R_b + 1):
        beta_B[j] = 1
    is_started = True
    M.append(beta_A + beta_B + [r, fi, g, r*math.cos(fi), r*math.sin(fi), h, l])
    precedents += 1
  return M


In [None]:
dataset = generate_horizontal_dataset(h=40, l=20, m=720, rMax=1000, g=10, n=20, k=700) + generate_horizontal_dataset(h=40, l=20, m=720, rMax=1000, g=10, n=20, k=600) + generate_horizontal_dataset(h=40, l=20, m=720, rMax=1000, g=10, n=20, k=500)
print(len(dataset))

In [None]:
visualize_dataset(dataset, 360, 1)

In [None]:
def check_n_round_case(min_radius, g):
  dataset = generate(h=40, l=20, m=720, rMax=1000, g=g, n=10000, test_case=TestCases.ROUND_CASE, values={'radius': min_radius}, gap=1)
  return len(dataset)

def check_n_horiz_case(min_k, g):
  dataset = generate(h=40, l=20, m=720, rMax=1000, g=g, n=10000, test_case=TestCases.HORIZONTAL_CASE, values={'k': min_k}, gap=1)
  return len(dataset)

def check_n_vertical_case(min_k, g):
  dataset = generate(h=40, l=20, m=720, rMax=1000, g=g, n=10000, test_case=TestCases.VERTICAL_CASE, values={'rMax': min_k}, gap=1)
  return len(dataset)

In [None]:
def generate_special_dataset(h, l, m, n, rMax, g, test_case, values, gap=1):
  gamma = (2 * math.pi) / m
  psi = math.asin(l/h)
  M = []
  for precedents in range(n):
    # gMin = r * math.sin(gamma/2)
    if test_case == TestCases.ROUND_CASE:
      r, fi = generate_special_round_case(g, psi, values['radius'], precedents, n)
    elif test_case == TestCases.HORIZONTAL_CASE:
      r, fi = generate_horizontal_case(g, values['k'], precedents, gap)
    elif test_case == TestCases.VERTICAL_CASE:
      r, fi = generate_special_vertical_case(g, values['rMax'], precedents, n)
    if fi < psi + math.asin(g/r) or fi > math.pi - psi - math.asin(g/r):
      continue
    r_b = math.sqrt(h*h + r*r - 2*h*r*math.cos(fi))
    if g > r_b:  
      continue     
    fi_b = math.pi - math.acos((h - r * math.cos(fi)) / math.sqrt(h*h + r*r - 2*h*r*math.cos(fi)))
    L_b = math.floor((m / (2 * math.pi)) * (fi_b - math.asin(g / r_b)))
    R_b = math.floor((m / (2 * math.pi)) * (fi_b + math.asin(g / r_b)))
    r_a = math.sqrt(h*h + r*r + 2*h*r*math.cos(fi))
    if g > r_a:   
      continue   
    fi_a = math.acos((h + r * math.cos(fi)) / math.sqrt(h*h + r*r + 2*h*r*math.cos(fi)))
    L_a = math.floor((m / (2 * math.pi)) * (fi_a - math.asin(g / r_a)))
    R_a = math.floor((m / (2 * math.pi)) * (fi_a + math.asin(g / r_a)))
    if L_a < 0:
      continue
    if R_b >= m//2:
      continue
    beta_A = [0] * (m//2)
    beta_B = [0] * (m//2)
    for j in range(L_a, R_a + 1):
        beta_A[j] = 1
    for j in range(L_b, R_b + 1):
        beta_B[j] = 1
    M.append(beta_A + beta_B + [r, fi, g, r*math.cos(fi), r*math.sin(fi), h, l])
  return M

# Визуализатор множества

In [None]:
def visualize_dataset(dataset, m, step):
  k = 0
  #fig, axes = plt.subplots(figsize=(10, 10))
  fig = plt.figure(figsize=(10, 10), dpi=100) 
  axes = fig.add_subplot(111, aspect='equal')
  up = 0
  bottom = -50
  left = -100
  right = 100
  for i in range(0, len(dataset), step):
      k += 1
      precedent = dataset[i]
      gObj = float(precedent[2 * m + 2])
      xObj = float(precedent[2 * m + 3])
      if xObj > right:
        right = xObj + gObj + 10
      if xObj < left:
        left = xObj - gObj - 10
      yObj = float(precedent[2 * m + 4])
      if yObj > up:
        up = yObj + gObj + 10
      h = float(precedent[2 * m + 5])
      l = float(precedent[2 * m + 6])
      
      if i == 0:
          axes.add_patch(plt.Circle((h, 0), l, color='black', fill=False))
          axes.add_patch(plt.Circle((-h, 0), l, color='black', fill=False))
      axes.add_patch(plt.Circle((xObj, yObj), gObj, color='orangered', fill=False))
  print(k, '/', len(dataset))
  up = 1020
  bottom = -50
  left = -900
  right = 900
  plt.axis([left, right, bottom, up])
  plt.grid()
  fig.show()
  fig.savefig('plot.png', bbox_inches='tight')
  return 'plot.png'

In [None]:
visualize_dataset(d, 360, 1)

# Wandb

In [None]:
!pip install wandb -qqq
!apt install tree

In [None]:
import os
import wandb
from collections import namedtuple
Dataset = namedtuple("Dataset", ["x", "r", "fi"])

def load(datasets_list):
  datasets = []
  for dataset in datasets_list:
    x, y_r, y_fi = [], [], []
    for row in dataset:
      x.append(row[:720])
      y_r.append(row[720])
      y_fi.append(row[721])
    dataset = Dataset(x, y_r, y_fi)
    datasets.append(dataset)
  return datasets

In [None]:
wandb.login()

# Test

In [None]:
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.utils.validation import check_consistent_length, check_array

def mean_absolute_percentage_error(y_true, y_pred,
                                   sample_weight=None,
                                   multioutput='uniform_average'):
    y_type, y_true, y_pred, multioutput = _check_reg_targets(
        y_true, y_pred, multioutput)
    check_consistent_length(y_true, y_pred, sample_weight)
    epsilon = np.finfo(np.float64).eps
    mape = np.abs(y_pred - y_true) / np.maximum(np.abs(y_true), epsilon)
    output_errors = np.average(mape,
                               weights=sample_weight, axis=0)
    if isinstance(multioutput, str):
        if multioutput == 'raw_values':
            return output_errors
        elif multioutput == 'uniform_average':
            # pass None as weights to np.average: uniform mean
            multioutput = None

    return np.average(output_errors, weights=multioutput)

def _check_reg_targets(y_true, y_pred, multioutput, dtype="numeric"):
    check_consistent_length(y_true, y_pred)
    y_true = check_array(y_true, ensure_2d=False, dtype=dtype)
    y_pred = check_array(y_pred, ensure_2d=False, dtype=dtype)

    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))

    if y_pred.ndim == 1:
        y_pred = y_pred.reshape((-1, 1))

    if y_true.shape[1] != y_pred.shape[1]:
        raise ValueError("y_true and y_pred have different number of output "
                         "({0}!={1})".format(y_true.shape[1], y_pred.shape[1]))

    n_outputs = y_true.shape[1]
    allowed_multioutput_str = ('raw_values', 'uniform_average',
                               'variance_weighted')
    if isinstance(multioutput, str):
        if multioutput not in allowed_multioutput_str:
            raise ValueError("Allowed 'multioutput' string values are {}. "
                             "You provided multioutput={!r}".format(
                                 allowed_multioutput_str,
                                 multioutput))
    elif multioutput is not None:
        multioutput = check_array(multioutput, ensure_2d=False)
        if n_outputs == 1:
            raise ValueError("Custom weights are useful only in "
                             "multi-output cases.")
        elif n_outputs != len(multioutput):
            raise ValueError(("There must be equally many custom weights "
                              "(%d) as outputs (%d).") %
                             (len(multioutput), n_outputs))
    y_type = 'continuous' if n_outputs == 1 else 'continuous-multioutput'

    return y_type, y_true, y_pred, multioutput

In [None]:
models_info = [{'title': 'solar-sweep-2', 'wandb_rep': 'distance_trained_model', 'version': 'v46', 'pred_type': 'Distance'},  
               {'title': 'revived-sweep-1', 'wandb_rep': 'angle_trained_model', 'version': 'v28', 'pred_type': 'Angle'}]      

In [None]:
with wandb.init(project="flatfasetgen", job_type="experiments") as run:
  model_artifact = run.use_artifact(f"{models_info[1]['wandb_rep']}:{models_info[1]['version']}")
  model_dir = model_artifact.download()
  model_path = os.path.join(model_dir, models_info[1]['wandb_rep'])
  angle_model = keras.models.load_model(model_dir)

  model_artifact = run.use_artifact(f"{models_info[0]['wandb_rep']}:{models_info[0]['version']}")
  model_dir = model_artifact.download()
  model_path = os.path.join(model_dir, models_info[0]['wandb_rep'])
  distance_model = keras.models.load_model(model_dir)

In [None]:
import plotly
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
import os
from tensorflow import keras

In [None]:
r_list = [i for i in range(1000, 10001, 1000)]
g_list = [5, 50,500]

In [None]:
g_info, hit_info = {}, {}
for g in g_list:
  print(g)
  r_res, r_res2 = [], []
  for r in r_list:
    x, y = [], []
    dataset = generate(h=40, l=20, m=720, rMax=50000, g=g, n=1000, test_case=TestCases.ROUND_CASE, values={'radius': r})
    for precedent in dataset:
      x.append(precedent[:720])
      y.append(precedent[721])
    y_pred = angle_model.predict(x)
    counter, counter2 = 0, 0
    for i in range(len(y)):
      if abs(y[i] - y_pred[i]) < math.asin(g / r):
        counter += 1
    r_res.append(1 - mean_absolute_percentage_error(y, y_pred))
    r_res2.append(counter / len(dataset))
  g_info[g] = r_res
  hit_info[g] = r_res2

In [None]:
fig2 = go.Figure()
for g in hit_info:
  fig2.add_trace(go.Scatter(x=r_list[:], y=hit_info[g][:], mode='lines+markers',  name=f"g = {g}", line_shape='spline'))
fig2.update_layout(legend_orientation="h",
                  legend=dict(yanchor="top",y=1.1,xanchor="right",x=1.0),
                  title=f"Circle movement  / angle",
                  xaxis_title="rObj",
                  yaxis_title="'Hit rate'",
                  autosize=False,
                  width=800, height=500)

In [None]:
fig = go.Figure()
for g in g_info:
  fig.add_trace(go.Scatter(x=r_list[:], y=g_info[g][:], mode='lines+markers',  name=f"g = {g}", line_shape='spline'))
fig.update_layout(legend_orientation="h",
                  legend=dict(yanchor="top",y=1.1,xanchor="right",x=1.0),
                  title=f"Circle movement  / angle",
                  xaxis_title="rObj",
                  yaxis_title="1 - MAPE",
                  autosize=False,
                  width=800, height=500)

# Test horiz

In [None]:
g_info_angle, g_info_dist = {}, {}
for g in g_list:
  fi_res, r_res = [], []
  for cur_r in r_list:
    x, r, fi = [], [], []
    dataset = generate(h=40, l=20, m=720, rMax=50000, g=g, n=1000, test_case=TestCases.HORIZONTAL_CASE, values={'k': cur_r}, rMin=1000)
    for precedent in dataset:
      x.append(precedent[:720])
      r.append(precedent[720])
      fi.append(precedent[721])
    fi_pred = angle_model.predict(x)
    fi_res.append(1 - mean_absolute_percentage_error(fi, fi_pred))
    r_pred = distance_model.predict(x)
    r_res.append(abs(1 - mean_absolute_percentage_error(r, r_pred)))
  g_info_angle[g] = fi_res
  g_info_dist[g] = r_res
  print(g)

In [None]:
fig101 = go.Figure()
for g in g_info_angle:
  fig101.add_trace(go.Scatter(x=r_list[:], y=g_info_angle[g][:], mode='lines+markers',  name=f"g = {get_g(g)}", line_shape='spline'))
fig101.update_layout(legend_orientation="h",
                  legend=dict(yanchor="top",y=1.1,xanchor="right",x=1.0),
                  title=f"Horizontal movement / angle",
                  xaxis_title="rObj_min",
                  yaxis_title="1 - MAPE",
                  autosize=False,
                  width=800, height=500)

In [None]:
fig121 = go.Figure()
for g in g_info_dist:
  fig121.add_trace(go.Scatter(x=r_list[:], y=g_info_dist[g][:], mode='lines+markers',  name=f"g = {get_g(g)}", line_shape='spline'))
fig121.update_layout(legend_orientation="h",
                  legend=dict(yanchor="top",y=1.1,xanchor="right",x=1.0),
                  title=f"Horizontal movement / distance",
                  xaxis_title="rObj_min",
                  yaxis_title="1 - MAPE",
                  autosize=False,
                  width=800, height=500)

# Test vert

In [None]:
g_info = {}
table = wandb.Table(columns=['g', 'rObj', '1 - MAPE'])
for g in g_list:
  r_res = []
  for cur_r in r_list:
    x, r = [], []
    dataset = generate(h=40, l=20, m=720, rMax=50000, g=g, n=1000, test_case=TestCases.VERTICAL_CASE, values={'rMax': cur_r}, rMin=600)
    for precedent in dataset:
      x.append(precedent[:720])
      r.append(precedent[720])
    r_pred = distance_model.predict(x)
    r_res.append(1 - mean_absolute_percentage_error(r, r_pred))
    table.add_data(g, cur_r, 1 - mean_absolute_percentage_error(r, r_pred))
  g_info[g] = r_res
  print(g)

In [None]:
fig201 = go.Figure()
for g in g_info:
  fig201.add_trace(go.Scatter(x=r_list[:], y=g_info[g][:], mode='lines+markers',  name=f"g = {get_g(g)}", line_shape='spline'))
fig201.update_layout(legend_orientation="h",
                  legend=dict(yanchor="top",y=1.1,xanchor="right",x=1.0),
                  title=f"Vertical movement / distance",
                  xaxis_title="rObj_max",
                  yaxis_title="1 - MAPE",
                  autosize=False,
                  width=800, height=500)

In [None]:
with wandb.init(project="flatfasetgen", job_type="summary_plots") as run:
  run.log({"Circle plot": fig})
  run.log({"Hit plot": fig2})
  run.log({"Horiz plot (angle)": fig101})
  run.log({"Horiz plot (distance)": fig121})
  run.log({"Vertical plot": fig201})