In [1]:
# -*- coding: utf-8 -*-
# @author: Alexander Golden

### Importation des bibliothèques ###
import numpy as np
import matplotlib
import random as rand
import matplotlib.pyplot as plt
import math
import os
import sys
import scipy.stats as sc
import csv
import pandas as pd
import torch
import time
import scipy as sp
import scipy.sparse as sparse
from numba import njit, float64
from matplotlib.colors import LogNorm
import itertools as it
import pickle as pkl
import argparse


In [9]:
### Importation des bibliothèques ###
import numpy as np
import random as rand
import scipy.sparse as sparse
from numba import njit, float64

### Fonctions de signalisation ###

# Fonction numba (accélérée) pour ajouter le terme de diffusion à l'équation
@njit(float64[:,:](float64, float64[:,:], float64, float64, float64, float64, float64, float64[:,:], float64[:,:]))
def addcD(dt, lap, Dsig, J, rho, a0, D, signal, A_grid_hsided):
    """
    Calcule la diffusion du signal chimique avec un terme laplacien et différents paramètres.
    Cette fonction simule la dynamique de diffusion et de production du produit chimique.

    dt : pas de temps
    lap : terme laplacien
    Dsig : coefficient de diffusion
    J : taux de dégradation
    rho, a0, D : paramètres de production
    signal : concentration du signal
    A_grid_hsided : grille avec le seuil de production des cellules (seuil de Heaviside)
    """
    return dt * (Dsig * lap - J * signal + rho * a0) + dt * (rho * D * A_grid_hsided)

# Accumule des valeurs sur une grille en fonction des coordonnées (pour créer une grille de concentration à partir de coordonnées cellulaires)
def accumulate_arr(coord, arr, shape):
    """
    Accumule les valeurs de 'arr' dans une grille de taille 'shape' en fonction des coordonnées 'coord'.
    """
    lidx = np.ravel_multi_index(coord, shape)  # Convertir les coordonnées en indices linéaires
    return np.bincount(lidx, arr, minlength=shape[0]*shape[1]).reshape(shape[0], shape[1])

# Met à l'échelle une matrice A à partir d'une matrice B et un facteur d'échelle k
def scale(A, B, k):
    """
    Redimensionne la matrice B et la copie dans A selon l'échelle k (sur des intervalles k).
    """
    Y = A.shape[0]
    X = A.shape[1]
    for y in range(0, k):
        for x in range(0, k):
            A[y:Y:k, x:X:k] = B
    return A

# Version 1D de la fonction de redimensionnement
def scale1D(A, B, k):
    """
    Similaire à la fonction scale, mais pour des tableaux 1D.
    """
    Y = A.shape[0]
    for y in range(0, k):
        A[y:Y:k] = B
    return A

# Crée une matrice creuse pour la réduction de la taille de la grille de manière cohérente avec l'échelle donnée
def scale_down_mat_sp(current_shape, scale):
    """
    Génère une matrice pour réduire la taille d'une matrice en appliquant un facteur d'échelle donné.
    """
    assert current_shape[0] // scale == current_shape[0] / scale, "Shape not divisible by scale"
    assert current_shape[1] // scale == current_shape[1] / scale, "Shape not divisible by scale"
    
    out_shape = (current_shape[0] // scale, current_shape[1] // scale)
    # Créer la première ligne de la matrice
    line = np.array(([1/scale**2] * scale + [0] * (current_shape[0] - scale)) * scale + [0] * (current_shape[0] * (current_shape[1] - scale)))
    
    # Initialiser la matrice en bloc avec cette première ligne
    out_mat = sparse.bsr_matrix(line)
    
    # Créer les autres lignes par translation de la première
    for i in range(out_shape[0] - 1):
        newline = np.roll(line, scale * (i + 1))
        out_mat = sparse.vstack([out_mat, newline])
    
    # Compléter la matrice pour toutes les dimensions
    block = out_mat.todense()
    for i in range(out_shape[1] - 1):
        newblock = sparse.bsr_matrix(np.roll(block, scale * current_shape[0] * (i + 1), 1))
        out_mat = sparse.vstack([out_mat, newblock])
    
    return sparse.csr_matrix(out_mat)

# Calcule la matrice laplacienne pour une grille sans flux
def calc_square_laplacian_noflux_matrix(size):
    """
    Crée une matrice laplacienne sans conditions aux limites de flux, utilisée pour la diffusion.
    """
    # Différences en X
    xDiff = np.zeros((size[1] + 1, size[1]))
    ix, jx = np.indices(xDiff.shape)
    xDiff[ix == jx] = 1
    xDiff[ix == jx + 1] = -1
    
    # Différences en Y
    yDiff = np.zeros((size[0] + 1, size[0]))
    iy, jy = np.indices(yDiff.shape)
    yDiff[iy == jy] = 1
    yDiff[iy == jy + 1] = -1
    
    # Calcul de la matrice laplacienne en X et Y
    Ax = sparse.dia_matrix(-np.matmul(np.transpose(xDiff), xDiff))
    Ay = sparse.dia_matrix(-np.matmul(np.transpose(yDiff), yDiff))
    
    lap = sparse.kron(Ay, sparse.eye(size[1]), format='csr') + sparse.kron(sparse.eye(size[0]), Ax, format='csr')
    
    # Ajouter les conditions aux limites
    lap += sparse.diags([2] + [1] * (size[1] - 2) + [2] + ([1] + [0] * (size[1] - 2) + [1]) * (size[0] - 2) + [2] + [1] * (size[1] - 2) + [2])
    
    return sparse.bsr_matrix(lap)

# Fonction pour lisser une courbe avec une boîte de convolution
def smooth(y, box_pts):
    """
    Lisse la courbe y en appliquant une convolution avec une boîte de taille box_pts.
    """
    box = np.ones(box_pts) / box_pts
    y_smooth = np.convolve(y, box, mode='same')
    return y_smooth

### Classe de population de cellules ###
class dictyPop:
    def __init__(self, T, save_every, g_params=None, c_params=None, noise=True, progress_report=True, save_data=True):
        """
        Initialise une population de cellules Dictyostelium et simule la diffusion d'un signal (comme le cAMP).
        
        T : Temps total de simulation
        save_every : Intervalle pour sauvegarder les données
        g_params : Paramètres de la grille
        c_params : Paramètres des cellules
        noise : Si True, ajoute du bruit dans la production du signal
        """
        self.g_params = g_params  # Paramètres de la grille
        self.c_params = c_params  # Paramètres des cellules
        self.T = T  # Temps total
        self.ts = 0  # Temps courant
        self.dt = self.g_params['dx'] ** 2 / (8 * self.g_params['D_sig'])  # Pas de temps
        self.Tsteps = int(self.T / self.dt)  # Nombre d'étapes temporelles
        
        ll = self.g_params['box_size_x']  # Taille de la grille en X
        
        self.signal_size = (int(self.g_params['box_size_x'] / self.g_params['dx']),
                            int(self.g_params['box_size_y'] / self.g_params['dx']))  # Taille de la grille du signal

        self.fluxes = np.zeros(4)  # Flux de bord
        self.p_r = progress_report  # Affichage du progrès
        self.save_every = save_every  # Intervalle de sauvegarde
        self.save_data = save_data  # Si True, les données sont sauvegardées
        
        # Résolution des cellules (nombre de cellules par unité de grille)
        self.agent_ratio = int(self.g_params['agent_dim'] / self.g_params['dx'])
        self.agent_size = (max(1, int(self.g_params['box_size_x'] / self.g_params['agent_dim'])),
                           max(1, int(self.g_params['box_size_y'] / self.g_params['agent_dim'])))
        self.signal = np.zeros(self.signal_size)  # Matrice de concentration du signal
        self.noise_flag = noise  # Ajout du bruit ou non

        # États initiaux des cellules (concentrations A et R)
        cell_state = np.random.normal(loc=0.0, scale=2, size=(2, g_params['num_agents']))
        self.A = cell_state[0, :]  # Concentration du produit chimique A
        self.R = cell_state[1, :]  # Concentration du produit chimique R
        
        self.D = float(self.c_params['D'])  # Coefficient de diffusion
        self.dsignal = np.zeros(self.signal.shape)  # Diffusion du signal
        
        # Matrice de réduction d'échelle et coordonnées des cellules
        self.interp_mat = scale_down_mat_sp(self.signal_size, self.agent_ratio)
        self.A_grid = np.zeros((self.signal_size[0] // self.agent_ratio, self.signal_size[1] // self.agent_ratio))
        self.A_grid_big = np.zeros(self.signal.shape)
        
        # Coordonnées des cellules sur la grille
        self.coord = np.zeros((2, self.g_params['num_agents']), dtype='int')
        for agent in range(self.g_params['num_agents']):
            self.coord[:, agent] = np.array([
                int(rand.random() * ll / max(self.g_params['agent_dim'], self.g_params['dx'])),
                int(rand.random() * ll / max(self.g_params['agent_dim'], self.g_params['dx']))
            ])

        # Calcul du laplacien
        if min(self.signal_size) > 1:
            self.lap_mat = calc_square_laplacian_noflux_matrix(self.signal_size)
        else:
            self.lap_mat = sparse.diags([np.ones(max(self.signal_size) - 1), -2 * np.ones(max(self.signal_size)), np.ones(max(self.signal_size) - 1)], [-1, 0, 1], format='csr')
            self.lap_mat += sparse.diags(np.array([1] + [0] * (max(self.signal_size) - 2) + [1]), format='csr')

        # Sauvegarde des données si nécessaire
        if self.save_data is True:
            self.A_saved = np.zeros((self.signal_size[0] // self.agent_ratio, self.signal_size[1] // self.agent_ratio, (self.T // save_every) + 1))
            self.R_saved = np.zeros((self.signal_size[0] // self.agent_ratio, self.signal_size[1] // self.agent_ratio, (self.T // save_every) + 1))
            self.S_saved = np.zeros((self.signal_size[0], self.signal_size[1], (self.T // save_every) + 1))
            self.A_saved[:, :, 0] = self.getAGrid()
            self.R_saved[:, :, 0] = self.getRGrid()
            self.S_saved[:, :, 0] = self.signal

    # Récupère la grille de concentrations A
    def getAGrid(self):
        return accumulate_arr(self.coord, self.A, self.A_grid.shape)

    # Récupère la grille de concentrations R
    def getRGrid(self):
        return accumulate_arr(self.coord, self.R, self.A_grid.shape)

    # Fixe le signal de départ
    def setSignal(self, signal):
        assert self.signal.shape == signal.shape and self.signal.dtype == signal.dtype, "Input signal incorrect shape or type"
        self.signal = np.array(signal)

    # Fixe l'état des cellules
    def setCellState(self, state):
        assert self.A.shape == state[:, 0].shape and self.A.dtype == state.dtype, "Input cell state incorrect shape or type"
        self.A = np.array(state[:, 0])
        self.R = np.array(state[:, 1])

    # Fixe les flux aux limites
    def setFluxes(self, fluxes):
        assert self.fluxes.shape == fluxes.shape and self.fluxes.dtype == fluxes.dtype, "Input fluxes incorrect shape or type"
        self.fluxes = np.array(fluxes)

    # Mise à jour des concentrations A
    def getdA(self):
        return ((self.A - (self.A * self.A * self.A) / 3 - self.R)) * self.dt + \
               ((self.c_params['a'] * np.log1p(np.reshape(self.interp_mat.dot(np.reshape(self.signal, self.signal_size[0] * self.signal_size[1])), self.agent_size)[self.coord[0, :], self.coord[1, :]] / self.c_params['Kd']))) * self.dt

    # Mise à jour des concentrations R
    def getdR(self):
        return (((self.A - self.c_params['gamma'] * self.R) + self.c_params['c0']) * self.c_params['epsilon']) * self.dt

    # Mise à jour du système (diffusion du signal)
    def update(self):
        self.ts += self.dt
        self.dA = self.getdA()
        if self.noise_flag:
            self.dA += self.c_params['sigma'] * np.random.normal(loc=0.0, scale=np.sqrt(self.dt), size=self.A.shape)
        self.dR = self.getdR()
        self.A_grid = accumulate_arr(self.coord, self.A, self.A_grid.shape)
        self.A_grid_big = scale(self.A_grid_big, self.A_grid, self.agent_ratio)
        return self.signal  # Retourne la grille du signal mise à jour


In [19]:
### Exécution du script et génération des séries temporelles ###
def timeseries_a(a, t0, tf, freq):
    """
    Cette fonction simule l'évolution de la concentration d'un produit chimique sécrété par des cellules dans un espace 2D.
    Elle affiche une carte des concentrations à chaque pas de temps et calcule des séries temporelles des moyennes de concentration.
    
    :param a: Paramètre optionnel pour ajuster les cellules (non utilisé dans cette version)
    :param t0: Temps initial de la simulation
    :param tf: Temps final de la simulation
    :param freq: Fréquence à laquelle les séries temporelles sont calculées (non utilisé dans ce code)
    """
    
    ### Paramètres des cellules et de la grille ###
    path = './output_files/'  # Répertoire où les cartes de concentration seront sauvegardées
    N = int(15000)  # Taille de la population, nombre total de cellules

    # Paramètres pour le comportement chimique des cellules et de leur environnement
    cell_params_default = {
        'c0': 1,           # Concentration de base de la molécule sécrétée par les cellules
        'a': 0.05,         # Taux de production de la molécule chimique par les cellules
        'gamma': 0.5,      # Coefficient de rétroaction pour la concentration
        'Kd': 10 ** (-5),  # Constante de dissociation pour la signalisation chimique
        'sigma': 0.15,     # Amplitude du bruit aléatoire (variation stochastique de la production)
        'epsilon': 0.2,    # Sensibilité de la cellule au produit chimique
        'cStim': 100,      # Concentration du stimulus externe
        'aPDE': 10,        # Taux de dégradation du produit chimique
        'rho': 1,          # Densité des cellules (nombre de cellules par unité de surface)
        'D': 1000,         # Coefficient de diffusion des molécules chimiques dans l'environnement
        'a0': 1,           # Production initiale des cellules
        'af': 0            # Facteur de production final (non utilisé ici)
    }

    # Paramètres de la grille sur laquelle la diffusion est simulée
    grid_params_default = {
        'dx': 0.5,          # Taille d'une unité spatiale (0.5 unités)
        'D_sig': 5,         # Coefficient de diffusion du signal chimique
        'box_size_x': 5,   # Taille de la grille en X (50 unités)
        'box_size_y': 5,   # Taille de la grille en Y (50 unités)
        'agent_dim': 2 * 0.5,  # Taille d'une cellule (1 unité spatiale)
        'num_agents': N    # Nombre total d'agents (ou cellules) simulées
    }

    # Flag pour activer ou désactiver le bruit dans la simulation
    noise = True

    # Création de la population de cellules avec les paramètres définis
    pop = dictyPop(1, 1, c_params=cell_params_default, g_params=grid_params_default, noise=noise)
    
    # Initialisation des états des cellules (A et R) à zéro
    pop.A = np.zeros(pop.A.shape)  # État initial de la molécule A sécrétée
    pop.R = np.zeros(pop.R.shape)  # État initial de la molécule R (potentiellement un inhibiteur ou autre molécule)
    
    # Calcul initial de l'état du système (matrice de concentration)
    S = pop.update()
    
    # Initialisation de la série temporelle avec des NaN
    TimeSeries = np.array([(np.nan,) for _ in range(int((len(S) / 10) ** 2))])
    
    # Boucle principale qui va de t0 à tf pour mettre à jour l'état du système à chaque pas de temps
    for i in range(t0, tf):
        S = pop.update()  # Mise à jour de la concentration à chaque pas de temps
        NewCol = []  # Liste pour stocker les nouvelles valeurs moyennes des sous-régions de la grille
        
        # Affichage de la carte de concentration à chaque pas de temps
        plt.matshow(S)  # Affiche la matrice de concentration S sous forme de carte de chaleur
        plt.title("t = " + str(i))  # Ajout d'un titre avec le temps courant
        plt.colorbar()  # Ajoute une échelle de couleur pour la carte de chaleur
        plt.show()  # Affiche la carte
        
        # Sauvegarde de la carte de concentration (optionnel)
        print(f"{path}/concentration_t_{i}.png")  # Affiche le chemin de sauvegarde dans la console
        plt.savefig(f"{path}/concentration_t_{i}.png")  # Sauvegarde l'image au chemin spécifié
        
        # Calcul des moyennes de concentration dans des sous-régions de la grille (10x10 unités)
        for i in range(0, len(S), 10):
            for j in range(0, len(S), 10):
                # Moyenne de concentration dans chaque sous-région de taille 10x10 et normalisation par la moyenne générale
                NewCol.append(np.mean(S[i:i + 10, j:j + 10]) / np.mean(S))
        
        # Transformation des nouvelles moyennes en DataFrame pandas
        NCol = pd.DataFrame({'col': NewCol})
        
        # Ajout des nouvelles valeurs de concentration à la série temporelle
        TimeSeries = np.column_stack((TimeSeries, NCol))
    
    return TimeSeries  # Retourne la série temporelle complète


In [None]:
timeseries_a(a=0, t0=0, tf=100, freq=0)