In [6]:
import pandas as pd
import numpy as np
import csv
import re
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import logging
import math
from lmfit import minimize, fit_report, Parameters
from aim2_population_model_spatial_aff_parallel import get_mod_spike
from model_constants import (MC_GROUPS, LifConstants)
from popul_model import pop_model
from aim2_population_model_spatial_aff_parallel import Afferent, SimulationConfig, Simulation

In [7]:
# Global variables: the baseline single-unit model parameters, registered under a fit name.
# Every parameter is held fixed (vary=False); only the k* entries carry a lower bound of 0.
lmpars_init_dict = {}
lmpars = Parameters()
lmpars.add_many(
    # (name, value, vary)
    ('tau1', 8, False),
    ('tau2', 200, False),
    ('tau3', 1744.6, False),
    ('tau4', np.inf, False),
    # (name, value, vary, min)
    ('k1', .74, False, 0),     # a constant
    ('k2', 2.75, False, 0),    # b constant
    ('k3', .07, False, 0),     # c constant
    ('k4', .0312, False, 0),
)
lmpars_init_dict['t3f12v3final'] = lmpars


In [8]:
# Helper function: Euclidean distance between the points (x1, y1) and (x2, y2).
def distance(x1,y1,x2,y2):
    """Return the straight-line distance between (x1, y1) and (x2, y2)."""
    squared_length = (x2 - x1) ** 2 + (y2 - y1) ** 2
    return np.sqrt(squared_length)


# Find the elements that appear in exactly one of two lists.
def find_non_matches(list1, list2):
    """Return the items present in only one of ``list1`` / ``list2``.

    Duplicates are collapsed and the order of the returned list is not
    defined, because the comparison is done on sets.
    """
    return list(set(list1) ^ set(list2))


**Population Model Class**

In [None]:
class VF_Population_Model:
    """Von Frey population model built on top of the single-unit spike model.

    For one filament size (``vf_tip_size``) and one afferent type (``aff_type``)
    the class reads the stress traces stored under ``data/vfspatial/``, runs
    ``get_mod_spike`` / ``pop_model`` on them and keeps the resulting firing
    features in ``self.results`` (and, for the radial model, the scaled stress
    traces in ``self.stress_data``).

    The class relies on names defined elsewhere in the notebook:
    ``lmpars_init_dict``, ``distance``, ``get_mod_spike``, ``pop_model``,
    ``MC_GROUPS``, ``plt`` and ``patches``.
    """

    # Name of the baseline parameter set inside ``lmpars_init_dict``.
    _PARAM_SET = 't3f12v3final'

    # Parameter values that replace the baseline ones when aff_type == "RA".
    _RA_OVERRIDES = {
        'tau1': 2.5,
        'tau2': 200,
        'tau3': 1,
        'k1': 35,
        'k2': 0,
        'k3': 0.0,
        'k4': 0,
    }

    # Keys of one per-trace feature record, in output order (after 'afferent_type').
    _FEATURE_KEYS = (
        'x_position',
        'y_position',
        'num_of_spikes',
        'mean_firing_frequency',
        'peak_firing_frequency',
        'first_spike_time',
        'last_spike_time',
    )

    def __init__(self, vf_tip_size, aff_type):
        """Store the filament size and afferent type; no data is loaded yet."""
        self.vf_tip_size = vf_tip_size
        self.aff_type = aff_type
        self.results = None       # set by the *_stress_vf_model methods
        self.stress_data = None   # set by radial_stress_vf_model only
        self.x_coords = None      # x of every spatial point, as floats
        self.y_coords = None      # y of every spatial point, as floats

    # ------------------------------------------------------------------
    # Private helpers shared by the two model methods
    # ------------------------------------------------------------------
    def _model_parameters(self):
        """Return a private copy of the baseline parameters for this afferent type.

        The copy matters: the baseline object in ``lmpars_init_dict`` is shared
        by every model instance, so writing the RA overrides into it directly
        would make any later SA model silently run with RA parameters.
        """
        from copy import deepcopy  # local import keeps this cell self-contained

        params = deepcopy(lmpars_init_dict[self._PARAM_SET])
        if self.aff_type == "RA":
            for name, value in self._RA_OVERRIDES.items():
                params[name].value = value
        return params

    def _load_spatial_inputs(self):
        """Read the spatial coordinate and stress files for this filament.

        Sets ``self.x_coords`` / ``self.y_coords`` and returns
        ``(coords, stress_df)``. ``coords`` has the first row of the file
        dropped, so its row labels start at 1; those labels are what the
        ``"Coord {i} Stress (kPa)"`` columns of ``stress_df`` are matched on.
        """
        coords = pd.read_csv(
            f"data/vfspatial/{self.vf_tip_size}_spatial_coords.csv", header = None
        ).iloc[1:]
        self.x_coords = [float(value) for value in coords.iloc[:, 0]]
        self.y_coords = [float(value) for value in coords.iloc[:, 1]]

        stress_df = pd.read_csv(f"data/vfspatial/{self.vf_tip_size}_spatial_stress.csv", )
        return coords, stress_df

    def _simulate_trace(self, time, stress, x_position, y_position):
        """Run the single-unit model on one stress trace.

        Returns a dict with the keys ``'afferent_type'`` plus ``_FEATURE_KEYS``,
        or ``None`` when the model produced no spikes (or no rate trace).
        """
        # A fresh parameter copy per call, so nothing carries over between traces.
        spike_times, inst_rates = get_mod_spike(
            self._model_parameters(), MC_GROUPS, time, stress
        )
        if len(spike_times) == 0 or len(inst_rates) == 0:
            return None

        # pop_model is given one rate value per spike; resample when the model
        # returned a rate trace of a different length.
        if len(spike_times) != len(inst_rates):
            if len(inst_rates) > 1:
                inst_rates = np.interp(spike_times, time, inst_rates)
            else:
                inst_rates = np.zeros_like(spike_times)

        features, _ = pop_model(spike_times, inst_rates)
        return {
            'afferent_type': self.aff_type,
            'x_position': x_position,
            'y_position': y_position,
            'num_of_spikes': len(spike_times),
            'mean_firing_frequency': features["Average Firing Rate"],
            'peak_firing_frequency': np.max(inst_rates),
            'first_spike_time': spike_times[0],
            'last_spike_time': spike_times[-1],
        }

    def _known_radii(self):
        """Return, sorted ascending, every radial distance stored in ``self.stress_data``."""
        radii = set()
        for per_coordinate in (self.stress_data or {}).values():
            radii.update(per_coordinate)
        return sorted(radii)

    # ------------------------------------------------------------------
    # Models
    # ------------------------------------------------------------------
    def spatial_stress_vf_model(self, scaling_factor = 0.3):
        """Run the single-unit model on the stress trace of every spatial point.

        Coordinates whose stress column is missing, or whose trace produces no
        spikes, are skipped with a warning.

        Parameters
        ----------
        scaling_factor : float
            Not used by this method; present so the signature matches
            ``radial_stress_vf_model``.

        Returns
        -------
        dict
            ``'afferent_type'`` (a single string) plus one list per feature in
            ``_FEATURE_KEYS``, with one entry per coordinate that fired. The
            same dict is stored in ``self.results``.
        """
        coords, stress_data = self._load_spatial_inputs()
        time = stress_data['Time (ms)'].to_numpy()

        records = []
        for i, row in coords.iterrows():
            column = f"Coord {i} Stress (kPa)"
            if column not in stress_data.columns:
                # Skip: without this, the previous coordinate's trace would be reused.
                logging.warning("STRESS VALUE COULD NOT BE INDEXED")
                continue

            record = self._simulate_trace(time, stress_data[column], row.iloc[0], row.iloc[1])
            if record is None:
                logging.warning(f"SPIKES COULD NOT BE GENERATED FOR {self.vf_tip_size}")
                continue
            records.append(record)

        model_results = {'afferent_type': self.aff_type}
        for key in self._FEATURE_KEYS:
            model_results[key] = [record[key] for record in records]

        self.results = model_results
        return model_results

    def radial_stress_vf_model(self,scaling_factor = 0.3):
        """Run the single-unit model on radially scaled stress traces around every spatial point.

        For each spatial point, every column of the radial stress file (after
        the first) is scaled by ``scaling_factor`` and by the ratio between the
        spatial point's peak stress and the peak of the first radial column,
        then passed to the single-unit model.

        Results are stored, not returned:

        * ``self.stress_data[i][d]`` is ``{"Time": time, d: scaled_trace}``
        * ``self.results[i][d]`` is the feature dict for that trace; the entry
          is absent when the trace produced no spikes.

        ``i`` is the 1-based coordinate label and ``d`` the radial distance
        parsed from the column name.
        """
        # Regex pattern for extracting the distance from the middle point.
        distance_regex = r'\d\.\d{2}'

        coords, stress_df = self._load_spatial_inputs()
        time = stress_df['Time (ms)'].to_numpy()

        radial_stress = pd.read_csv(f"data/vfspatial/{self.vf_tip_size}_radial_stress.csv")
        # NOTE(review): the time axis for the radial traces is taken from the
        # spatial stress file, not the radial one - confirm both files share it.
        radial_time = time

        stress_data = {}
        iff_data = {}

        for i, row in coords.iterrows():
            stress_data[i] = {}
            iff_data[i] = {}

            column = f"Coord {i} Stress (kPa)"
            if column not in stress_df.columns:
                logging.warning("STRESS VALUE COULD NOT BE INDEXED")
                continue

            spatial_stress_max = np.max(stress_df[column])
            distance_scaling_factor = None

            for col in radial_stress.columns[1:]:
                distance_from_center = float(re.findall(distance_regex, col)[0])
                radial_trace = radial_stress[col]

                # The scale is fixed by the first radial column and reused for the rest.
                if distance_scaling_factor is None:
                    distance_scaling_factor = spatial_stress_max / np.max(radial_trace)

                scaled_stress = (radial_trace * distance_scaling_factor * scaling_factor).to_numpy()
                stress_data[i][distance_from_center] = {
                    "Time": radial_time,
                    distance_from_center: scaled_stress
                }

                record = self._simulate_trace(radial_time, scaled_stress, row.iloc[0], row.iloc[1])
                if record is not None:
                    iff_data[i][distance_from_center] = record

        self.stress_data = stress_data
        self.results = iff_data

    # ------------------------------------------------------------------
    # Export and plotting
    # ------------------------------------------------------------------
    def aggregate_results(self):
        """Write ``self.results`` to a CSV file and return the file path."""
        df = pd.DataFrame(self.results)
        file_path = f"generated_csv_files/{self.vf_tip_size}_vf_popul_model.csv"
        df.to_csv(file_path, index = False)
        return file_path

    def plot_spatial_coords(self):
        """Plot the peak firing frequency of every spatial point on the grid.

        Expects the flat result dict produced by ``spatial_stress_vf_model``.
        Each point becomes a circle whose radius is twice its peak firing
        frequency; the opacity is a constant 0.5. The figure is saved under
        ``vf_graphs/aggregated_results_on_grid/``.
        """
        # Colors for the different afferent types.
        colors = {'SA': '#31a354', 'RA': '#3182bd'}
        plt.figure(figsize=(12, 8))

        x_positions = [float(value) for value in self.results.get("x_position")]
        y_positions = [float(value) for value in self.results.get("y_position")]
        peak_iffs = self.results.get("peak_firing_frequency")

        for x_pos, y_pos, radius in zip(x_positions, y_positions, peak_iffs):
            plt.gca().add_patch(
                patches.Circle((x_pos, y_pos), radius*2, edgecolor='black', facecolor = colors.get(self.aff_type) , linewidth=1, alpha = 0.5)
            )
        plt.xlabel('Length (mm)')
        plt.ylabel('Width (mm)')
        plt.title("VF Afferent Stress Distribution")
        plt.gca().set_aspect('equal', adjustable='box')
        plt.xlim(min(x_positions) - 1, max(x_positions) + 1)
        plt.ylim(min(y_positions) - 1, max(y_positions) + 1)
        plt.savefig(f"vf_graphs/aggregated_results_on_grid/{self.vf_tip_size}_{self.aff_type}_constant_opacity.png")

    def simulate_afferent_response(self, afferents):
        """Look up the stress trace and firing features that apply to each afferent.

        Each afferent (an object with ``x_pos`` / ``y_pos``) is matched to its
        nearest spatial point and then to the smallest stored radius that still
        contains it.

        Returns
        -------
        (list, list) or (None, None)
            ``(stress_data, iff_data)``, both aligned with ``afferents``. An
            entry is ``None`` when the afferent lies outside every radius, when
            the nearest point has no stored data, or (for ``iff_data``) when
            that radius produced no spikes. ``(None, None)`` is returned when
            ``radial_stress_vf_model`` has not been run.
        """
        if self.stress_data is None or self.results is None:
            return None, None

        stress_data = []
        iff_data = []

        for aff in afferents:
            distances = [
                distance(x1, y1, aff.x_pos, aff.y_pos)
                for x1, y1 in zip(self.x_coords, self.y_coords)
            ]
            nearest = min(range(len(distances)), key=distances.__getitem__)
            distance_to_nearest_center = distances[nearest]

            # x_coords is 0-based, but the result dicts are keyed by the 1-based
            # "Coord {i}" row label, so position k corresponds to key k + 1.
            coord_key = nearest + 1
            stress_by_radius = self.stress_data.get(coord_key)
            iff_by_radius = self.results.get(coord_key)

            stresses = None
            iffs = None
            if stress_by_radius and iff_by_radius:
                # The stress dict holds every radius of this point (the firing
                # dict omits silent ones), so it is the one to search.
                for radius in sorted(stress_by_radius):
                    if distance_to_nearest_center <= radius:
                        stresses = stress_by_radius.get(radius)
                        iffs = iff_by_radius.get(radius)
                        break

            # Append the results (even if None) so the output stays aligned.
            stress_data.append(stresses)
            iff_data.append(iffs)

        return stress_data, iff_data

    def post_process(self, iffs_results):
        """Placeholder; currently does nothing."""
        pass

    def plot_afferents(self, iffs_results, afferents,size_factor=10,non_firing_size =50):
        """Plot the aggregated firing data of many afferents.

        ``iffs_results[k]`` is taken to be the response of ``afferents[k]``
        (the layout returned by ``simulate_afferent_response``).

        * Afferents whose response is ``None`` are drawn as hollow red markers
          of size ``non_firing_size`` at their own position.
        * Every response is drawn as a circle of radius
          ``peak_firing_frequency * size_factor`` whose opacity is proportional
          to its peak firing frequency relative to the largest one.

        NOTE(review): response circles are centred on the ``x_position`` /
        ``y_position`` stored in the response dict, which the model methods
        fill with the spatial point's coordinates rather than the afferent's -
        confirm that this is the intended placement.
        """
        colors = {'SA': '#31a354', 'RA': '#3182bd'}
        plt.figure(figsize=(12, 8))

        # Non-firing afferents are found by pairing each afferent with its own
        # response; comparing coordinate sets cannot work because responses
        # carry grid-point coordinates, not afferent coordinates.
        silent = [aff for aff, iff in zip(afferents, iffs_results) if iff is None]
        plt.scatter([aff.x_pos for aff in silent], [aff.y_pos for aff in silent],
                    edgecolor='red', facecolor='none', s= non_firing_size,
                    linewidth=0.5, label='Non-firing Afferents')

        # Filter first, so that every response stays paired with its own opacity.
        firing = [iff for iff in iffs_results if iff is not None]
        peak_iffs = [float(iff.get("peak_firing_frequency")) for iff in firing]
        max_peak = max(peak_iffs, default=0.0)

        for iff, peak_iff in zip(firing, peak_iffs):
            alpha = peak_iff / max_peak if max_peak > 0 else 0.0
            try:
                x_pos = float(iff.get("x_position"))
                y_pos = float(iff.get("y_position"))

                # Multiply the radius by the size_factor to control circle size.
                plt.gca().add_patch(
                    patches.Circle(
                        (x_pos, y_pos), peak_iff * size_factor,
                        edgecolor='black', facecolor=colors.get(self.aff_type),
                        linewidth=0.5, alpha=alpha * 0.8
                    )
                )
            except (TypeError, ValueError) as e:
                print(f"Skipping entry due to error: {e}, iff: {iff}")

        plt.xlabel('Length (mm)')
        plt.ylabel('Width (mm)')
        plt.title(f"Distribution of Afferents Von Frey {self.aff_type} {self.vf_tip_size}")
        plt.gca().set_aspect('equal', adjustable='box')
        plt.xlim(min(self.x_coords) - 1, max(self.x_coords) + 1)
        plt.ylim(min(self.y_coords) - 1, max(self.y_coords) + 1)
        plt.show()

    def plot_radial_spatial_afferents(self, aff: "Afferent", config : "SimulationConfig", stress, iff):
        """Plot the concentric circles of known stress data around every spatial point.

        Every stored radius except the smallest is drawn around each spatial
        point, and the figure is saved under
        ``vf_graphs/aggregated_results_on_grid/``. The ``aff``, ``config``,
        ``stress`` and ``iff`` arguments are accepted but not used.
        """
        # Radii come from the stored stress traces: they describe the stimulus
        # geometry and do not depend on which traces happened to produce spikes.
        radii = self._known_radii()

        plt.figure( figsize=(10,5))

        for x_pos,y_pos in zip(self.x_coords, self.y_coords):
            for radius in radii[1:]:
                plt.gca().add_patch(
                    patches.Circle((x_pos,y_pos), radius, edgecolor = 'black', facecolor = 'None' , linewidth=.1)
                )

        plt.xlabel('Length (mm)')
        plt.ylabel('Width (mm)')
        plt.title(f"Distribution of Afferents Von Frey {self.aff_type} {self.vf_tip_size}")
        plt.gca().set_aspect('equal', adjustable='box')
        plt.xlim(min(self.x_coords) - 1, max(self.x_coords) + 1)
        plt.ylim(min(self.y_coords) - 1, max(self.y_coords) + 1)
        plt.savefig(f"vf_graphs/aggregated_results_on_grid/failed_plot_of_stimulus_concentric_circles")

    def get_iffs(self):
        """Return the firing results of the last model run (``None`` before any run)."""
        return self.results

    def get_stress_traces(self):
        """Return the scaled stress traces of the last radial run (``None`` before one)."""
        return self.stress_data


**Configuring the Von Frey Population Model**

In [10]:
# Build the population model for the 4.17 filament and SA afferents.
vf_model = VF_Population_Model(4.17, "SA")

# Run the radial model (vf_model.spatial_stress_vf_model() is the grid-only alternative),
# then fetch the firing results and the stress traces it stored on the model.
vf_model.radial_stress_vf_model()
iffs_results, stress_results = vf_model.get_iffs(), vf_model.get_stress_traces()


KeyboardInterrupt: 

**Randomly Generating Afferents and setting up the configuration for the Simulation**

In [None]:
# Simulation inputs.
tongue_size = (12,16)    # in mm
density_ratio = (1, 0)   # ratio of SA and RA afferents
n_afferents = 500
rf_sizes = dict(SA=[1], RA=[1])

# No stimulus or stress is configured here; the simulation is only used to obtain afferents.
config = SimulationConfig(
    tongue_size, density_ratio, n_afferents, rf_sizes,
    stimulus_diameter=None,
    x_stimulus=None, y_stimulus=None,
    stress=None,
)
simulation = Simulation(config)
afferents = simulation.get_afferents()


**Getting Stress & Firing Data for the Afferents**

In [None]:
# Empty placeholders; nothing in this cell fills them in.
stress_data, iff_data = [], []

# Per-afferent stress traces and firing features from the population model.
stresses, iffs = vf_model.simulate_afferent_response(afferents)

**Creating The Plot**

In [None]:
# Draw the per-afferent firing results together with the afferents themselves.
vf_model.plot_afferents(iffs_results=iffs, afferents=afferents)

In [None]:
# Plot every afferent location with a single scatter call. One call per afferent
# creates hundreds of separate artists, each in a different colour of the cycle.
afferent_x = [aff.x_pos for aff in afferents]
afferent_y = [aff.y_pos for aff in afferents]

plt.scatter(afferent_x, afferent_y)
plt.xlabel('Length (mm)')
plt.ylabel('Width (mm)')
plt.title("Afferent locations")
plt.show()

In [None]:
# Print the firing features found for each afferent (None = no response).
# `iffs` is the list returned by simulate_afferent_response; `iff_data` is only
# ever assigned an empty list, so looping over it printed nothing.
for iff in iffs or []:
    print(iff)

In [None]:
# Show the stress trace and the firing features of coordinate 4 at radius 0.0 as one
# tuple. Written as two separate statements, only the second one was displayed.
(
    stress_results.get(4, {}).get(0.0),
    iffs_results.get(4, {}).get(0.0),
)

In [None]:
# Firing results stored for spatial coordinate 3 (None if that key is absent).
iffs_results.get(3)