# Imports

In [None]:
import pandas as pd
import numpy as np
from nn_spikes import NeuralNetwork, batchTrain
from spike_tools import classifySpikesMLP, getSpikeWaveforms
import plotly.express as px
from simulated_annealing import anneal

In [None]:
def _load_indexed_frame(path):
    """Read a CSV whose first column holds the row index and name that index 'index'."""
    frame = pd.read_csv(path, index_col=0)
    frame.index.name = 'index'
    return frame


def _load_spike_indexes(path):
    """Read a CSV whose first column is the row index and return the remaining values as a flat array."""
    return pd.read_csv(path, index_col=0).values.flatten()


# Full training recording and the table of known spike locations
data = pd.read_csv('./datasources/spikes/training_data.csv')
spikeLocations = pd.read_csv('./datasources/spikes/training_spike_locations.csv', index_col=0)

# Pre-split training / validation frames (the "_SA" files under dev/)
data_training = _load_indexed_frame('./datasources/spikes/dev/data_training_SA.csv')
data_validation = _load_indexed_frame('./datasources/spikes/dev/data_validation_SA.csv')

# Index labels of the spikes inside each of the two frames above
spikeIndexes_training = _load_spike_indexes('./datasources/spikes/dev/spikeIndexes_training_SA.csv')
spikeIndexes_validation = _load_spike_indexes('./datasources/spikes/dev/spikeIndexes_validation_SA.csv')

# Only the last expression of a cell is displayed, so a single preview is kept here
spikeIndexes_validation[:5]

# Run simulated annealing optimiser

In [None]:
# Starting point of the search: [epochs, hidden_nodes, lr]
solution = [15, 500, 0.2]

# Simulated annealing optimisation.
# The data frame is passed as well, matching the other anneal() call in this
# notebook and the inlined annealing cells, which need it for getError().
# NOTE(review): anneal's signature lives in simulated_annealing and is not
# visible here - confirm the argument order.
final_solution, finalError, cost_values = anneal(solution, data, spikeLocations)

### Predict on validation dataset

In [None]:
# Waveforms belonging to the spikes of the validation split
waveforms = data_validation.loc[spikeIndexes_validation, 'waveform']

# classifySpikesMLP is imported by name at the top of the notebook; the module
# itself is never imported, so it must not be reached through `spike_tools.`.
# NOTE(review): `results` is not defined in any visible cell - it has to hold
# the trained network under results['1100']['nn'] before this cell is run.
predictions = classifySpikesMLP(waveforms, results['1100']['nn'])

# .loc instead of .at: .at only addresses a single scalar cell, whereas a whole
# array of row labels is written here.
data_validation.loc[spikeIndexes_validation, 'predictedClass'] = pd.Series(predictions).values

In [None]:
# Display the validation rows selected by spikeIndexes_validation
data_validation.loc[spikeIndexes_validation]

---

In [None]:
def dataPreProcess(df, spikeLocations, threshold=0.85, submission=False, detectPeaksOn='signalSavgolBP', waveformWindow=60, waveformSignalType='signalSavgol'):
    """
    Filter the signal, detect spikes, attach a waveform to each detection and,
    unless this is a submission run, label the detections and split them.

    The frame passed in is written to in place: the 'signalSavgol' and
    'signalSavgolBP' columns are (re)computed on every call, even when they
    already exist.

    NOTE(review): savgol_filter, bandPassFilter, detectPeaks,
    joinKnownSpikeClasses, assignKnownClassesToDetectedSpikes and splitData are
    not imported in the visible cells - confirm where they come from.

    :param df: frame with a 'signal' column
    :param spikeLocations: known spike labels, only used when submission is False
    :param threshold: forwarded to detectPeaks
    :param submission: when True, stop after waveform extraction and skip labelling/splitting
    :param detectPeaksOn: forwarded to detectPeaks
    :param waveformWindow: forwarded to getSpikeWaveforms as window
    :param waveformSignalType: forwarded to getSpikeWaveforms as signalType
    :return: (data, predictedSpikeIndexes) when submission is True, otherwise
             (data_training, data_validation, spikeIndexes_training, spikeIndexes_validation)
    """
    summary = "Returning with {} detected spikes."

    frame = df

    # Smooth the raw signal, then band-pass the smoothed trace
    frame['signalSavgol'] = savgol_filter(frame['signal'], 17, 2)
    frame['signalSavgolBP'] = bandPassFilter(frame['signalSavgol'])

    frame, detected = detectPeaks(frame, detectPeaksOn=detectPeaksOn, threshold=threshold)
    frame = getSpikeWaveforms(detected, frame, window=waveformWindow, signalType=waveformSignalType)

    # Submission data has no known labels: hand back the detections as they are
    if submission:
        print(summary.format(len(detected)))
        return frame, detected

    frame = joinKnownSpikeClasses(frame, spikeLocations)

    # Assign known labels and drop any detected spikes that refer to more than one label
    frame, detected = assignKnownClassesToDetectedSpikes(frame, detected)

    train_frame, validation_frame, train_indexes, validation_indexes = splitData(frame, detected)

    # Let go of the local reference to the full frame once it has been split
    frame = 0

    print(summary.format(len(detected)))
    return train_frame, validation_frame, train_indexes, validation_indexes

In [None]:
def getNeighbour(solution, variation=0.2):
    """
    Propose a neighbouring parameter set by randomly rescaling each parameter.

    Every one of the three parameters is multiplied by its own random factor
    taken from [1 - variation, 1 + variation).

    :param solution: current parameter set (three numeric entries)
    :param variation: largest relative change applied to a parameter, e.g. 0.2 for +/-20 %
    :return: new parameter set as [int, int, float]; the first two entries are truncated towards zero
    """
    lowest_factor = 1.0 - variation
    factor_span = 2 * variation

    # One draw from [0.0, 1.0) per parameter, stretched onto the allowed factor range
    factors = np.random.random(3) * factor_span + lowest_factor

    rescaled = np.multiply(solution, factors)

    return [int(rescaled[0]), int(rescaled[1]), float(rescaled[2])]

In [None]:
def getError(supply, df, spikeLocations, demand=99.9):
    """
    Train a network with the given parameters and report how far it falls short of the target.

    The data is pre-processed afresh on every call (waveformWindow=100), a new
    network is built and trained, and the last value of the validation curve
    returned by batchTrain is taken as the score.

    :param supply: parameter set [epochs (int), hidden_nodes (int), lr (float)]
    :param df: frame handed to dataPreProcess
    :param spikeLocations: known spike labels handed to dataPreProcess
    :param demand: target performance
    :return: demand minus the achieved score
    """
    # Read and type-check the parameters one at a time, in order
    epochs = supply[0]
    assert isinstance(epochs, int)
    hidden_nodes = supply[1]
    assert isinstance(hidden_nodes, int)
    lr = supply[2]
    assert isinstance(lr, float)

    data_training, data_validation, spikeIndexes_training, spikeIndexes_validation = dataPreProcess(
        df, spikeLocations, waveformWindow=100)

    # The input layer is sized from the waveform of the first training spike
    first_waveform = data_training.loc[spikeIndexes_training[0], 'waveform']
    network = NeuralNetwork(input_nodes=len(first_waveform),
                            hidden_nodes=hidden_nodes,
                            output_nodes=4,
                            lr=lr,
                            error_function='difference-squared')

    # Only the validation curve (third return value) is needed
    training_output = batchTrain(data_training=data_training,
                                 data_validation=data_validation,
                                 spikeIndexes_training=spikeIndexes_training,
                                 spikeIndexes_validation=spikeIndexes_validation,
                                 nn=network,
                                 epochs=epochs,
                                 plotCurves=False)
    _, _, validationCurve = training_output

    final_score = validationCurve[-1]
    return demand - final_score

In [None]:
def acceptanceProbability(oldError, newError, T):
    """
    Calculate the acceptance probability of a candidate solution.

    A candidate that is no worse than the current solution is always accepted
    (probability 1.0). A worse candidate is accepted with probability
    exp((oldError - newError) / T), which shrinks as the temperature falls.

    :param oldError: error of the current solution
    :param newError: error of the candidate solution
    :param T: current temperature
    :return: acceptance probability as a float in [0.0, 1.0]
    """
    improvement = oldError - newError

    # Capping at 1.0 keeps the result a real probability and avoids overflowing
    # np.exp when a large improvement is divided by a very small temperature.
    if improvement >= 0:
        return 1.0

    return float(np.exp(improvement / T))

In [None]:
# Inputs for the annealing cells below, which run as plain notebook code.
# NOTE(review): T, T_min, alpha, iterations and variation are also read by the
# annealing loop but are not assigned in any visible cell.
df = data
demand=99.9
# epochs, hidden_nodes, lr
solution = [15,500,0.2] 

In [None]:
from spike_tools import dataPreProcess

In [None]:
# Display the first rows of the full data frame
data.head()

In [None]:
# The string below is kept as a bare expression and has no effect at run time.
# NOTE(review): this cell and the loop cell after it look like the body of an
# annealing function run inline - confirm against simulated_annealing.anneal.
"""
Function to perform simulated annealing.
:param df:
:param spikeLocations:
:param solution:
:param alpha:
:param iterations:
:param demand:
:param variation:
:param T:
:param T_min:
:return:
"""

# Alias for the data frame; passed to getError below
x = df

# Create new list to store cost values
errorValues = []

# Generate and append cost of first solution parameters
oldError = getError(solution, x, spikeLocations, demand=demand)
errorValues.append(oldError)

In [None]:
# Simulated-annealing main loop.
# Reads solution, oldError, errorValues, x, spikeLocations and demand from the
# notebook state.
# NOTE(review): T, T_min, alpha, iterations and variation are not assigned in
# any visible cell - they must be defined before this cell is run.

# Loop until temp is below min allowable temp
while T > T_min:
    i = 1
    # Loop until iteration number is above max allowable number of iterations
    while i <= iterations:

        # Get new set of solution parameters and generate new cost value using this classification solution
        newSolution = getNeighbour(solution, variation=variation)

        print("It_{}, oldError = {}, newSolution = {}".format(i, oldError, newSolution))

        newError = getError(newSolution, x, spikeLocations, demand=demand)

        # Calculate the acceptance probability (the function is defined in this
        # notebook without any leading underscores)
        pA = acceptanceProbability(oldError, newError, T)

        # If the acceptance probability is above the randomly generated float in the range [0.0,1.0), use the new
        # solution as the active solution going forwards and store the cost value.
        # NumPy's generator is used because the stdlib `random` module is never imported here.
        if pA > np.random.random():
            solution = newSolution
            oldError = newError

        # The error of the active solution is recorded on every iteration, accepted or not
        errorValues.append(oldError)
        i += 1

    # Decay (cool) the temperature and return to the top
    T = T * alpha

# Display the final solution, its error and the error history
(solution, oldError, errorValues)

In [None]:
# Run the imported optimiser from the starting solution on the full data frame.
# The return value is not assigned, so it is only displayed as the cell output.
anneal(solution, data, spikeLocations)

In [None]:
# Display the frame bound to df
df