In [None]:
import numpy as np
import matplotlib.pyplot as plt
from math import tan, atan, radians, degrees
import random
from google.colab import drive
import pandas as pd

In [None]:
max_dim = 10
epsilon = 0.0001

In [None]:
drive.mount('/content/drive')
%cd /write/your/directory/to/AI_4_ATD

# Define Processing Functions

In [None]:
class Format():
  def __init__(self):
    self.scale = 0
    self.cns = 0
    self.ia = 0
    self.imp = 0

In [None]:
def SCALE(dataset, labels, format):
  # Determine the shape of the dataset to handle 2D or 3D inputs
  shape = dataset.shape
  if len(shape) == 3:
    # Process 3D datasets (batch, n_lines, n_points)
    batch, n_lines, n_points = shape
    for i in range(batch):
      # Find the maximum absolute value across all lines in the current batch for scaling
      max_val = np.max(np.absolute([y for y in dataset[i][0] if y != -1]), initial=0)
      for j in range(n_lines - 1):
        new_max_val = np.max(np.absolute([y for y in dataset[i][j + 1] if y != -1]), initial=0)
        if new_max_val > max_val:
          max_val = new_max_val
      # Prevent division by zero if all values are zero
      if max_val == 0:
        max_val = epsilon
      # Scale all non-missing values in the current batch by the determined max_val
      for j in range(n_lines):
        for k in range(n_points):
          if dataset[i][j][k] != -1:
            dataset[i][j][k] /= max_val
  else:
    # Process 2D datasets (batch, n_points)
    batch, n_points = shape
    for i in range(batch):
      # Find the maximum absolute value in the current batch for scaling
      max_val = np.max(np.absolute([y for y in dataset[i] if y != -1]), initial=0)
      # Prevent division by zero if all values are zero
      if max_val == 0:
        max_val = epsilon
      # Scale all non-missing values in the current batch by the determined max_val
      for j in range(n_points):
        if dataset[i][j] != -1:
          dataset[i][j] /= max_val
  # Mark that scaling has been applied
  format.scale = 1
  return dataset, labels, format

In [None]:
def CNS(dataset, labels, format):
  batch, n_lines, n_points = dataset.shape
  for i in range(batch):
    # Calculate the mean and standard deviation of the control series (first line) for the current batch
    c_mean = np.mean([y for y in dataset[i][0] if y != -1])
    c_std = np.std([y for y in dataset[i][0] if y != -1]).clip(epsilon, None)
    for j in range(n_lines):
      for k in range(n_points):
        # Apply Common Negative Scaling transformation to all non-missing data points using the control series' statistics
        if dataset[i][j][k] != -1:
          dataset[i][j][k] = (dataset[i][j][k] - c_mean) / c_std
  # Remove the control series from the dataset after scaling
  dataset = np.delete(dataset, 0, axis=1)

  format.cns = 1
  return dataset, labels, format

In [None]:
def IA(dataset, labels, format):

  # If Common Negative Scaling (CNS) was not applied, separate the control series
  if format.cns == 0:
    control = dataset[:, 0, :].copy()
    dataset = np.delete(dataset, 0, axis=1)

  batch, n_lines, n_points = dataset.shape
  # Reshape the dataset from 3D to 2D for individual analysis of each line
  dataset = (dataset.copy()).reshape(batch*n_lines, n_points)

  # If CNS was not applied, re-add the control series to each reshaped line
  if format.cns == 0:
    control = np.repeat(control, repeats=3, axis=0)
    dataset = np.stack((control, dataset), axis=1)

  # Reshape labels to match the new dataset shape
  labels = labels.reshape(batch*n_lines)

  # Mark that Individual Analysis (IA) has been applied
  format.ia = 1
  return dataset, labels, format

In [None]:
def iqm(series, q):
  # Calculate the interquartile mean of a series, handling empty series by returning the mean of the original series.
  old_series = series.copy()
  q1 = np.quantile(series, 1-q)
  q3 = np.quantile(series, q)
  series = [y for y in series if q1 <= y <= q3]
  if len(series) == 0:
    return np.mean(old_series)
  return np.mean(series)

def imputate(dataset, labels, format, imp_type):
  # Imputate missing values (-1) in the dataset using the specified imputation type (mean, median, or interquartile mean).
  shape = dataset.shape
  if len(shape) == 3:
    batch, n_series, n_points = shape
    for i in range(batch):
      for j in range(n_series):
        series = [y for y in dataset[i][j] if y != -1]
        if len(series) == 0:
          dataset[i][j] = np.zeros(n_points)
        elif imp_type == 'mean':
          imput_val = np.mean(series)
        elif imp_type == 'median':
          imput_val = np.median(series)
        elif imp_type == 'iqm':
          imput_val = iqm(series, 0.75)
        else:
          assert False, f"imp_type={imp_type}"

        for k in range(n_points):
          if dataset[i][j][k] == -1:
            dataset[i][j][k] = imput_val
  else:
    batch, n_points = shape
    for i in range(batch):
      series = [y for y in dataset[i] if y != -1]
      if len(series) == 0:
        dataset[i] = np.zeros(n_points)
      elif imp_type == 'mean':
        imput_val = np.mean(series)
      elif imp_type == 'median':
        imput_val = np.median(series)
      elif imp_type == 'iqm':
        imput_val = iqm(series, 0.75)
      else:
        assert False, f"imp_type={imp_type}"

      for j in range(n_points):
          if dataset[i][j] == -1:
            dataset[i][j] = imput_val
  if imp_type == 'mean':
    format.imp = 1
  elif imp_type == 'median':
    format.imp = 2
  elif imp_type == 'iqm':
    format.imp = 3
  else:
    assert False, f"imp_type={imp_type}"

  return dataset, labels, format

In [None]:
def process(dataset, labels, goal_format):
  # Initialize a Format object to track applied transformations
  format = Format()
  # Apply Common Negative Scaling (CNS) if specified in goal_format
  if goal_format.cns == 1:
    dataset, labels, format = CNS(dataset, labels, format)
  # Apply Individual Analysis (IA) if specified in goal_format
  if goal_format.ia == 1:
    dataset, labels, format = IA(dataset, labels, format)
  # Apply imputation (mean, median, or IQM) if specified in goal_format
  if goal_format.imp == 1:
    dataset, labels, format = imputate(dataset, labels, format, 'mean')
  elif goal_format.imp == 2:
    dataset, labels, format = imputate(dataset, labels, format, 'median')
  elif goal_format.imp == 3:
    dataset, labels, format = imputate(dataset, labels, format, 'iqm')
  # Apply scaling if specified in goal_format
  if goal_format.scale == 1:
    dataset, labels, format = SCALE(dataset, labels, format)

  # Assert that all specified format changes have been applied
  assert format.cns == goal_format.cns
  assert format.scale == goal_format.scale
  assert format.ia == goal_format.ia
  assert format.imp == goal_format.imp

  # Assert that no NaN values are present in the processed dataset
  assert not np.any(np.isnan(dataset))

  return dataset, labels, format

# Generate Training Dataset

In [None]:
def create_series(a, t, constant, n, SMD):

    #Start with empty series
    data_series = []

    #For number of points generate errors
    for i in range(n):

        #To deal with first point only
        if not data_series:
            error = np.random.normal()
            data_series.append(error)

        #Points other than first - Add autocorrelation
        else:
            error = a*(data_series[i-1])+np.random.normal()
            data_series.append(error)

    #Add trend to the series based on the given angle 't'
    middle_point = np.median(range(n))

    for i in range(n):
        diff = i - middle_point
        data_series[i] = data_series[i] + diff*tan(radians(t))

    #Add a constant value to each point in the series
    data_series = [x + constant for x in data_series]

    #Add Standardized Mean Difference (SMD) to each point of the series
    for j in range(n):
        data_series[j] = data_series[j] + SMD

    return data_series

In [None]:
#Set random seed for replicability
np.random.seed(48151)

#Set values of autocorrelation 'a', trend in degrees 't', number of points in
#phase A and phase B, and standardized mean difference 'smd'
a_values = [0,0.2]
t_values = [0, 0, 0]
constant_values = [0.5]
points_values = (3, 10)
diff_smd = [0, 3, 5, 7, 9, 10]
#smd_values = len(diff_smd)*[0] + diff_smd
smd_values = diff_smd

series_dataset = []
series_labels = []
dataset = []
labels = []
# Generate individual time series based on a combination of parameters
for i in range(100):
    for a in a_values:
        for t in t_values:
            for constant in constant_values:
                for points in range(points_values[0], points_values[1] + 1):
                      for smd in smd_values:
                          dataseries = create_series(a, t, constant,
                                                  points, smd)
                          series_dataset.append(dataseries)
                          series_labels.append(np.clip(smd, 0, 1))

# Pair series with their labels and shuffle them randomly
paired = list(zip(series_dataset, series_labels))
random.shuffle(paired)
series_dataset, series_labels = zip(*paired)

# Create 'graphs' by combining a control series with three generated series
for i in range(round(len(series_dataset)/3)):
  # Randomly determine autocorrelation for the control series
  a = a_values[-1] * random.randint(0, 1)
  constant = constant_values[0]
  points = random.randint(points_values[0], points_values[1])
  # Generate a control series with no SMD (effect)
  control = create_series(a, 0, constant, points, 0)
  # Assemble the graph: control + three experimental series
  graph = [control, series_dataset[i], series_dataset[i+1], series_dataset[i+2]]
  # Get labels for the experimental series
  graph_labels = [series_labels[i], series_labels[i+1], series_labels[i+2]]
  dataset.append(graph)
  labels.append(graph_labels)

# Pad shorter series with -1 to a maximum length (max_dim)
for graph in range(len(dataset)):
  for line in range(len(dataset[graph])):
    length = len(dataset[graph][line])
    if length < 10:
      for i in range(10-length):
        dataset[graph][line].append(-1)

# Convert the dataset and labels to NumPy arrays
dataset = np.array(dataset)
labels = np.array(labels)

# Define the desired processing format for the training data
format = Format()
format.scale = 1
format.cns = 0
format.ia = 0
format.imp = 0

# Process the dataset according to the specified format
dataset, labels, format = process(dataset, labels, format)

In [None]:
print(f'length of series dataset: {len(series_dataset)}')

In [None]:
print(f'length of training dataset: {len(dataset)}')

In [None]:
np.save('Datasets/train_dataset', dataset)
np.save('Datasets/train_labels.npy', labels)

## Plot Training Graph Examples

In [None]:
def plot_ATD(ax, graph, label, idx):
  random.seed(346)
  markers = ['o', 'v', 's', 'x']
  legend = ['Control', 'Condition 1', 'Condition 2', 'Condition 3']
  # Iterate through each line in the graph to plot it
  for i, line in enumerate(graph):
    # Filter out missing values (-1) before plotting
    series = [y for y in line if y != -1]
    if len(series) > 0:
      # Plot the series with a specific marker and add to legend
      ax.plot([y for y in line if y != -1], color='k', marker = markers[i], label=legend[i])
  # Display the legend for the plot
  ax.legend()
  # Set the title with the graph's label
  ax.set_title(f"Label={label}")
  # Set axis labels
  ax.set_xlabel('Session')
  ax.set_ylabel('Behavior Response')

# Generate three random indices to select graphs for plotting
random_indices = random.sample(range(len(dataset)), 3)

# Create a figure with three subplots to display the selected graphs
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Plot each selected graph in its respective subplot
for i, idx in enumerate(random_indices):
  plot_ATD(axes[i], dataset[idx], labels[idx], idx)

# Adjust layout to prevent overlapping titles/labels and display the plot
plt.tight_layout()
plt.show()

## Describe the Training Dataset

### Describe Dataset

In [None]:
data = {
    "Descriptor": ["Level for Diff", "Level for Undiff", "Trend (deg)", "Trend (SMD/Sesh)", "Variability", "CV"],
    10: [0, 0, 0, 0, 0, 0],
    50: [0, 0, 0, 0, 0, 0],
    90: [0, 0, 0, 0, 0, 0],
}

df = pd.DataFrame(data)

In [None]:
format = Format()
format.scale = 0
format.cns = 1
format.ia = 1
format.imp = 0
# Process the dataset by applying Common Negative Scaling (CNS) and Independent Analysis (IA)
dataset_it, labels_it, format = process(dataset.copy(), labels.copy(), format)
# Flatten the labels for easier indexing
labels_it = labels_it.flatten()

# Calculate and store the 10th, 50th, and 90th percentiles for 'Level for Diff'
dataset_temp = []
for line in dataset_it[labels_it == 1]:
  series = [y for y in line if y != -1]
  if len(series) == 0:
    continue
  dataset_temp.append(np.mean(series))
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "Level for Diff", p] = float(np.percentile(dataset_temp, p))

# Calculate and store the 10th, 50th, and 90th percentiles for 'Level for Undiff'
dataset_temp = []
for line in dataset_it[labels_it == 0]:
  series = [y for y in line if y != -1]
  if len(series) == 0:
    continue
  dataset_temp.append(np.mean(series))
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "Level for Undiff", p] = np.percentile(dataset_temp, p)

# Calculate and store the 10th, 50th, and 90th percentiles for 'Trend (SMD/Sesh)'
dataset_temp = []
for line in dataset_it:
  series = [y for y in line if y != -1]
  if len(series) == 0:
    continue
  # Perform linear regression to find the slope (trend)
  a, b = np.polyfit(range(len(series)), series, 1)
  dataset_temp.append(a)
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "Trend (SMD/Sesh)", p] = np.percentile(dataset_temp, p)

# Calculate and store the 10th, 50th, and 90th percentiles for 'Variability'
dataset_temp = []
for line in dataset_it:
  series = [y for y in line if y != -1]
  if len(series) == 0:
    continue
  # Perform linear regression to remove the trend
  a, b = np.polyfit(range(len(series)), series, 1)
  # Calculate the standard deviation of residuals (variability after trend removal)
  error = [y - (a*x + b) for x, y in enumerate(series)]
  dataset_temp.append(np.std(error))
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "Variability", p] = np.percentile(dataset_temp, p)

# Calculate and store the 10th, 50th, and 90th percentiles for 'CV' (Coefficient of Variation)
dataset_temp = []
for line in dataset_it:
  series = [y for y in line if y != -1]
  if len(series) == 0:
    continue
  std = np.std(series).clip(epsilon, None)
  # Calculate the absolute value of the mean divided by the standard deviation
  dataset_temp.append(abs(np.mean(series)) / std)
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "CV", p] = np.percentile(dataset_temp, p)

format = Format()
format.scale = 0
format.cns = 0
format.ia = 1
format.imp = 0
# Process the dataset again, this time only applying Individual Analysis (IA)
dataset_it, labels_it, format = process(dataset.copy(), labels.copy(), format)
labels_it = labels_it.flatten()

# Calculate and store the 10th, 50th, and 90th percentiles for 'Trend (deg)'
dataset_temp = []
for line in dataset_it:
  # Consider the second series in each line (experimental series)
  series = [y for y in line[1] if y != -1]
  if len(series) == 0:
    continue
  # Perform linear regression to find the slope (trend)
  a, b = np.polyfit(range(len(series)), series, 1)
  # Convert the slope to degrees and append to temporary list
  dataset_temp.append(degrees(atan(a)))
for p in [10, 50, 90]:
  df.loc[df["Descriptor"] == "Trend (deg)", p] = np.percentile(dataset_temp, p)

In [None]:
print(df)

# Generate Test Dataset

In [None]:
#Set random seed for replicability
np.random.seed(777)

#Set values of autocorrelation 'a', trend in degrees 't', number of points in
#phase A and phase B, and standardized mean difference 'smd'
a_values = [0,0.2]
t_values = [0, 0, 0]
constant_values = [0.5]
points_values = (3,10)
smd_values = [3, 5, 7, 9, 10]

series_dataset = []
series_labels = []
test_dataset = []
test_labels = []
# Generate individual time series based on parameter combinations
for i in range(10):
    for a in a_values:
        for t in t_values:
            for constant in constant_values:
                for points in range(points_values[0], points_values[1] + 1):
                      for smd in smd_values:
                          dataseries = create_series(a, t, constant,
                                                  points, smd)
                          series_dataset.append(dataseries)
                          series_labels.append(np.clip(smd, 0, 1))

# Pair series with their labels and shuffle them randomly
paired = list(zip(series_dataset, series_labels))
random.shuffle(paired)
series_dataset, series_labels = zip(*paired)

# Create 'graphs' by combining a control series with three generated series
for i in range(round(len(series_dataset)/3)):
  # Randomly determine autocorrelation for the control series
  a = a_values[-1] * random.randint(0, 1)
  constant = constant_values[0]
  points = random.randint(points_values[0], points_values[1])
  # Generate a control series with no SMD (effect)
  control = create_series(a, 0, constant, points, 0)
  # Assemble the graph: control + three experimental series
  graph = [control, series_dataset[i], series_dataset[i+1], series_dataset[i+2]]
  # Get labels for the experimental series
  graph_labels = [series_labels[i], series_labels[i+1], series_labels[i+2]]
  test_dataset.append(graph)
  test_labels.append(graph_labels)

# Pad shorter series with -1 to a maximum length (max_dim)
for graph in range(len(test_dataset)):
  for line in range(len(test_dataset[graph])):
    length = len(test_dataset[graph][line])
    if length < 10:
      for i in range(10-length):
        test_dataset[graph][line].append(-1)

# Convert the dataset and labels to NumPy arrays
test_dataset = np.array(test_dataset)
test_labels = np.array(test_labels)

In [None]:
print(f'length of training dataset: {len(test_dataset)}')

In [None]:
np.save('Datasets/test_dataset.npy', test_dataset)
np.save('Datasets/test_labels.npy', test_labels)

## Calculate Effect Sizes

In [None]:
def calc_effect_sizes(dataset):

  real_effect_sizes = []

  # Iterate through each graph in the dataset
  for graph in dataset:
    effect_sizes = []
    # Extract and filter the control series (first line in the graph)
    control = [i for i in graph[0] if i != -1]
    assert len(control) != 0
    # Calculate the mean and standard deviation of the control series
    c_mean = np.mean(control)
    c_std = np.std(control)

    # Iterate through experimental lines (excluding the control line)
    for line in graph[1:]:
      # Extract and filter the current experimental line
      line = [i for i in line if i != -1]
      if len(line) == 0:
        # Assign -1 if the experimental line is empty
        effect_sizes.append(-1)
      elif c_std == 0:
        # Assign -2 if the control series has zero standard deviation (cannot normalize)
        effect_sizes.append(-2)
      else:
        # Calculate Cohen's d-like effect size for the current line
        es = np.absolute((np.mean(line) - c_mean) / c_std)
        effect_sizes.append(es)
    assert not np.any(np.isnan(effect_sizes))
    real_effect_sizes.append(effect_sizes)

  return real_effect_sizes

sim_effect_sizes = calc_effect_sizes(test_dataset)
np.save('Datasets/test_effect_sizes.npy', sim_effect_sizes)