# T Cell Attractors and Slicing Analysis
Tested on Python 3.11.4, networkx 3.1, pandas 1.5.3


In [None]:
import networkx as nx
import pandas as pd
from functools import reduce
from swiplserver import PrologMQI, PrologThread
import sys
import os
import glob
from itertools import chain, combinations

### Auxiliary definitions

In [None]:
class Target:
    def __init__(self,pr,ab):
        self.present = pr
        self.absent = ab
    
    def __str__(self):
        return f"present: {self.present} absent: {self.absent}"    
        
targets = []
interesting_genes = ["tbet","gata3","foxp3","rorgt"]
#interesting_genes = ["gata3"]
def powerset(s):
    return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
for x in powerset(interesting_genes):
    if (len(x)>0):
        targets.append(Target([k for k in x],[k for k in interesting_genes if k not in x]))

In [None]:
# checks whether a node is in an attractor
def check_node(node):
    cycle = list(nx.find_cycle(G,node))
    tmp = map(lambda x : x[0]==node or x[1]==node, cycle) 
    return reduce(lambda b1, b2: b1 or b2, tmp)

# compute the list of genes that are present in the attractor reachable form "node"
def compute_attractor(node):
    cycle = list(nx.find_cycle(G,node))
    tmp1 = map(lambda x: x[0].split(';') + x[1].split(';'), cycle)
    tmp2 = reduce(lambda x, y: x+y,tmp1)
    res = list(dict.fromkeys(tmp2))
    return res

In [None]:
def count_states (d):
    tmp = 0
    for v in d.values():
        tmp += len(v)
    return tmp

# finds computations (LTS traces) that lead to the "target"
def target_computations(target):
    all_nodes = list(G.nodes)
    filtered = [k for k in all_nodes if check_node(k)] # filters out intermediate nodes (not in attractor)
    attractors_map = dict()
    for f in filtered:
        attractors_map[f] = compute_attractor(f) # creates a map "state -> attractor"
    for s in target.present:
        filtered = [k for k in filtered if s in attractors_map[k]]   # filters out states that do not contain s (present in the target)
    for s in target.absent:
        filtered = [k for k in filtered if s not in attractors_map[k]] # filters out state that contain s (absent in the target)

    # filters out states in target attractors, but that do not contain any gene in target.present
    # (slicing would give an empty result on these states)
    filtered2 = [k for k in filtered if len(list(set(k.split(';')) & set(target.present)))>0]

    # cleans the attractors_map from states not in target (this step is not really necessary...)
    keys_to_delete = list() 
    for k in attractors_map.keys():
        if k not in filtered: keys_to_delete.append(k) 
    for k in keys_to_delete:
        del attractors_map[k] 
    
    contexts = list()   # list of the contexts that lead to the target
    contexts_dict = {}  # for each context, lists the states in the corresponding attractor
    filtered_splitted = [f.split(';') for f in filtered2]
    for f in filtered_splitted:
        prefix = f[0:10] # the first 10 elements in the state are the context [UPDATE 10!!]
        pure_state = f[10:] # the others are the actual state
        if (not prefix in contexts):
            contexts.append(prefix)
            contexts_dict[','.join(prefix)] = [','.join(pure_state)]
        else:
            contexts_dict[','.join(prefix)].append(','.join(pure_state))
    print("TARGET --> " + str(target))
    print("found " + str(len(contexts)) + " contexts that lead to the target")
    print("found " + str(count_states(contexts_dict) + (len(filtered)-len(filtered2))) + " states in reachable attractors")
    print("of which " + str(count_states(contexts_dict)) + " with genes in the target")
    print()
    return contexts, contexts_dict

In [None]:
from IPython.display import display

# Creare un DataFrame globale per accumulare le righe
table_data_df = pd.DataFrame()

def generate_table(target_desc, intersection_pos_set, union_pos_set, intersection_neg_set, union_neg_set):
    global table_data_df
        
    # Ottenere tutte le stringhe uniche
    all_strings = sorted(set(table_data_df.columns.get_level_values(0)).union(union_pos_set, union_neg_set))
    
    # Creare l'intestazione delle colonne alternando le sottocolonne POS e NEG
    columns = [(s, cat) for s in all_strings for cat in ('POS', 'NEG')]
    
    # Assicurarsi che il DataFrame abbia tutte le colonne necessarie
    if table_data_df.empty:
        table_data_df = pd.DataFrame(columns=pd.MultiIndex.from_tuples(columns))
    else:
        missing_columns = set(columns) - set(table_data_df.columns)
        for col in missing_columns:
            table_data_df[col] = 0
        table_data_df = table_data_df[columns]  # Riordina le colonne
    
    # Creare una nuova riga
    new_row = {col: 0 for col in table_data_df.columns}  # Inizializza tutte le colonne a 0
    for s in all_strings:
        if s in intersection_pos_set:
            new_row[(s, 'POS')] = 2
        elif s in union_pos_set:
            new_row[(s, 'POS')] = 1
        if s in intersection_neg_set:
            new_row[(s, 'NEG')] = 2
        elif s in union_neg_set:
            new_row[(s, 'NEG')] = 1
    
    # Aggiungere la nuova riga al DataFrame e riempire eventuali NaN con 0
    table_data_df = pd.concat([table_data_df, pd.DataFrame([new_row], index=[target_desc])]).fillna(0)
    
    # Funzione per applicare lo stile
    def highlight_cells(val):
        if val == 2:
            return 'background-color: black; color: white'
        elif val == 1:
            return 'background-color: gray; color: white'
        return ''
    
    # Applicare lo stile
    styled_df = table_data_df.style.applymap(highlight_cells)
    
    # Mostrare la tabella
    display(styled_df)

def save_table_to_file(filename):
    """
    Salva il DataFrame in un file CSV.
    """
    global table_data_df
    table_data_df.to_csv(filename, index=True)

In [None]:
def slicing_analysis(version):
    target_tot = len(targets)
    target_count = 0
    print("TO BE ANALYZED: " + str(target_tot) + " target")
    print()

    outfile = open("out.txt","w")
    for target in targets:
        target_count = target_count+1
        print("TARGET COUNT: " + str(target_count) + "/" + str(target_tot))
        contexts, contexts_dict = target_computations(target)

        prolog_target = ','.join(target.present)

        cont = 1
        tot = str(count_states(contexts_dict))
        union_set = set()
        intersection_set = set()
        first_time = True
        for ctx in contexts:
            prolog_context = ','.join(ctx)
            prolog_target_states = contexts_dict[','.join(ctx)]
            for i,state in enumerate(prolog_target_states):
                print("TEST CASE: " + str(cont) + "/" + str(tot))
                cont=cont+1
                print("CONTEXT: " + prolog_context + "      ATTRACTOR STATE: " + str(i+1) + "/" + str(len(prolog_target_states)))
                print("STATE: " + state)
            
                param_file = open("BioResolve_positive/params.pl",'w')
                if (version=="original"):
                    param_file.write("myenvironment('[ x1 = ({tgfb}.x11 + {}.x0),\n x2 = ({il23}.x21 + {}.x0),\n x3 = ({il12}.x31 + {}.x0),\n x4 = ({il18}.x41 + {}.x0),\n x5 = ({il4e}.x51 + {}.x0),\n x6 = ({il27}.x61 + {}.x0),\n x7 = ({il6e}.x71 + {}.x0),\n x8 = ({ifnge}.x81 + {}.x0),\n x9 = ({tcr}.x91 + {}.x0),\n x11 = {tgfb}.x11,\n x21 = {il23}.x21,\n x31 = {il12}.x31,\n x41 = {il18}.x41,\n x51 = {il4e}.x51,\n x61 = {il27}.x61,\n x71 = {il6e}.x71,\n x81 = {ifnge}.x81,\n x91 = {tcr}.x91,\n x0 = {}.x0\n]').\n \nmyentities([]).\n \n")
                elif (version=="positive"):
                    param_file.write("myenvironment('[ x1 = ({tgfb}.x11 + {neg_tgfb}.x12),\n x2 = ({il23}.x21 + {neg_il23}.x22),\n x3 = ({il12}.x31 + {neg_il12}.x32),\n x4 = ({il18}.x41 + {neg_il18}.x42),\n x5 = ({il4e}.x51 + {neg_il4e}.x52),\n x6 = ({il27}.x61 + {neg_il27}.x62),\n x7 = ({il6e}.x71 + {neg_il6e}.x72),\n x8 = ({ifnge}.x81 + {neg_ifnge}.x82),\n x9 = ({tcr}.x91 + {neg_tcr}.x92),\n x11 = {tgfb}.x11,\n x21 = {il23}.x21,\n x31 = {il12}.x31,\n x41 = {il18}.x41,\n x51 = {il4e}.x51,\n x61 = {il27}.x61,\n x71 = {il6e}.x71,\n x81 = {ifnge}.x81,\n x91 = {tcr}.x91,\n x12 = {neg_tgfb}.x12,\n x22 = {neg_il23}.x22,\n x32 = {neg_il12}.x32,\n x42 = {neg_il18}.x42,\n x52 = {neg_il4e}.x52,\n x62 = {neg_il27}.x62,\n x72 = {neg_il6e}.x72,\n x82 = {neg_ifnge}.x82,\n x92 = {neg_tcr}.x92,\n x0 = {}.x0\n]').\n \n") 
                    param_file.write("myentities([neg_foxp3,neg_gata3,neg_ifng,neg_ifngr,neg_il12r,neg_il17,neg_il18r,neg_il2,neg_il21,neg_il21r,neg_il23r,neg_il2r,neg_il4,neg_il4r,neg_il6,neg_il6r,neg_irak,neg_jak1,neg_nfat,neg_nfkb,neg_rorgt,neg_socs1,neg_stat1,neg_stat3,neg_stat4,neg_stat5,neg_stat6,neg_tbet,neg_tgfbr]).\n \n")
                else:
                    print("Error")
                    return
                param_file.write('mycontext("[' + prolog_context + ']").\n\n')  # x0,
                param_file.write('mytarget([' + state + ']).\n')
                param_file.write('mypos([' + prolog_target + ']).\n')
                param_file.write('myneg([]).')
                param_file.flush()
                param_file.close()
                print()
                with PrologMQI() as mqi:
                    with mqi.create_thread() as prolog_thread:
                        prolog_thread.query('["BioResolve_positive/BioReSolvePositive.pl"]')
                        result = prolog_thread.query('main_do(ordslice,EKs,ListReactNumbR,ListEpos,ListCS).')
                        tmp_set = set(chain.from_iterable(result[0]['ListEpos'])) 
                        union_set.update(tmp_set)
                        if (first_time):
                            intersection_set = tmp_set
                            first_time = False
                        else:
                            intersection_set = intersection_set.intersection(tmp_set)
        for f in glob.glob("tmp-slice*.txt"):
            os.remove(f)
        for f in glob.glob("tmp-legenda*.txt"):
            os.remove(f)
        for f in glob.glob("tmp-slicingrun*.txt"):
            os.remove(f)
        
        print()
        union_set=sorted(union_set)
        intersection_set=sorted(intersection_set)
        print("SET OF ENTITIES IN SLICED COMPUTATIONS FOR TARGET " + str(target) + ":")
        outfile.write("SET OF ENTITIES IN SLICED COMPUTATIONS FOR TARGET " + str(target) + ":\n")
        print("UNION: " + str(union_set))
        outfile.write("         UNION: " + str(union_set)+"\n")
        print("INTERSECTION: " + str(intersection_set))
        outfile.write("         INTERSECTION: " + str(intersection_set)+"\n\n")
        print()
        outfile.flush()
    
        intersection_neg_set = {s[4:] for s in intersection_set if s.startswith("neg")}
        intersection_pos_set = {s for s in intersection_set if not s.startswith("neg")}

        union_neg_set = {s[4:] for s in union_set if s.startswith("neg")}
        union_pos_set = {s for s in union_set if not s.startswith("neg")}
    
        generate_table(str(target.present),intersection_pos_set, union_pos_set, intersection_neg_set, union_neg_set)

    outfile.close()
    save_table_to_file("table_results.csv")


In [None]:
def dynamic_negative_slicing_analysis(version):
    target_tot = len(targets)
    target_count = 0
    print("TO BE ANALYZED: " + str(target_tot) + " target")
    print()

    outfile = open("out.txt","w")
    for target in targets:
        target_count = target_count+1
        print("TARGET COUNT: " + str(target_count) + "/" + str(target_tot))
        contexts, contexts_dict = target_computations(target)

        prolog_target = ','.join(target.present)

        cont = 1
        tot = str(count_states(contexts_dict))
        union_pos_set = set()
        intersection_pos_set = set()
        union_neg_set = set()
        intersection_neg_set = set()
        first_time = True
        for ctx in contexts:
            prolog_context = ','.join(ctx)
            prolog_target_states = contexts_dict[','.join(ctx)]
            for i,state in enumerate(prolog_target_states):
                print("TEST CASE: " + str(cont) + "/" + str(tot))
                cont=cont+1
                print("CONTEXT: " + prolog_context + "      ATTRACTOR STATE: " + str(i+1) + "/" + str(len(prolog_target_states)))
                print("STATE: " + state)
                param_file = open("BioResolve_positive/params.pl",'w')
                if (version=="original"):
                    param_file.write("myenvironment('[ x1 = ({tgfb}.x11 + {}.x0),\n x2 = ({il23}.x21 + {}.x0),\n x3 = ({il12}.x31 + {}.x0),\n x4 = ({il18}.x41 + {}.x0),\n x5 = ({il4e}.x51 + {}.x0),\n x6 = ({il27}.x61 + {}.x0),\n x7 = ({il6e}.x71 + {}.x0),\n x8 = ({ifnge}.x81 + {}.x0),\n x9 = ({tcr}.x91 + {}.x0),\n x11 = {tgfb}.x11,\n x21 = {il23}.x21,\n x31 = {il12}.x31,\n x41 = {il18}.x41,\n x51 = {il4e}.x51,\n x61 = {il27}.x61,\n x71 = {il6e}.x71,\n x81 = {ifnge}.x81,\n x91 = {tcr}.x91,\n x0 = {}.x0\n]').\n \nmyentities([]).\n \n")
                elif (version=="positive"):
                    param_file.write("myenvironment('[ x1 = ({tgfb}.x11 + {neg_tgfb}.x12),\n x2 = ({il23}.x21 + {neg_il23}.x22),\n x3 = ({il12}.x31 + {neg_il12}.x32),\n x4 = ({il18}.x41 + {neg_il18}.x42),\n x5 = ({il4e}.x51 + {neg_il4e}.x52),\n x6 = ({il27}.x61 + {neg_il27}.x62),\n x7 = ({il6e}.x71 + {neg_il6e}.x72),\n x8 = ({ifnge}.x81 + {neg_ifnge}.x82),\n x9 = ({tcr}.x91 + {neg_tcr}.x92),\n x11 = {tgfb}.x11,\n x21 = {il23}.x21,\n x31 = {il12}.x31,\n x41 = {il18}.x41,\n x51 = {il4e}.x51,\n x61 = {il27}.x61,\n x71 = {il6e}.x71,\n x81 = {ifnge}.x81,\n x91 = {tcr}.x91,\n x12 = {neg_tgfb}.x12,\n x22 = {neg_il23}.x22,\n x32 = {neg_il12}.x32,\n x42 = {neg_il18}.x42,\n x52 = {neg_il4e}.x52,\n x62 = {neg_il27}.x62,\n x72 = {neg_il6e}.x72,\n x82 = {neg_ifnge}.x82,\n x92 = {neg_tcr}.x92,\n x0 = {}.x0\n]').\n \n") 
                    param_file.write("myentities([neg_foxp3,neg_gata3,neg_ifng,neg_ifngr,neg_il12r,neg_il17,neg_il18r,neg_il2,neg_il21,neg_il21r,neg_il23r,neg_il2r,neg_il4,neg_il4r,neg_il6,neg_il6r,neg_irak,neg_jak1,neg_nfat,neg_nfkb,neg_rorgt,neg_socs1,neg_stat1,neg_stat3,neg_stat4,neg_stat5,neg_stat6,neg_tbet,neg_tgfbr]).\n \n")
                else:
                    print("Error")
                    return
                param_file.write('mycontext("[' + prolog_context + ']").\n\n')  # x0,
                param_file.write('mytarget([' + state + ']).\n')
                param_file.write('mypos([' + prolog_target + ']).\n')
                param_file.write('myneg([]).')
                param_file.flush()
                param_file.close()
                print()
                with PrologMQI() as mqi:
                    with mqi.create_thread() as prolog_thread:
                        prolog_thread.query('["BioResolve_positive/BioReSolvePositive.pl"]')
                        result = prolog_thread.query('main_do(negslice,EKs,ListReactNumbR,ListEpos,ListEneg,ListCS).')
                        tmp_pos_set = set(chain.from_iterable(result[0]['ListEpos'])) 
                        union_pos_set.update(tmp_pos_set)
                        tmp_neg_set = set(chain.from_iterable(result[0]['ListEneg'])) 
                        union_neg_set.update(tmp_neg_set)
                        if (first_time):
                            intersection_pos_set = tmp_pos_set
                            intersection_neg_set = tmp_neg_set
                            first_time = False
                        else:
                            intersection_pos_set = intersection_pos_set.intersection(tmp_pos_set)
                            intersection_neg_set = intersection_neg_set.intersection(tmp_neg_set)
        for f in glob.glob("tmp-slice*.txt"):
            os.remove(f)
        for f in glob.glob("tmp-legenda*.txt"):
            os.remove(f)
        for f in glob.glob("tmp-slicingrun*.txt"):
            os.remove(f)
        
        print()
        union_pos_set=sorted(union_pos_set)
        intersection_pos_set=sorted(intersection_pos_set)
        union_neg_set=sorted(union_neg_set)
        intersection_neg_set=sorted(intersection_neg_set)
        print("SET OF ENTITIES IN SLICED COMPUTATIONS FOR TARGET " + str(target) + ":")
        outfile.write("SET OF ENTITIES IN SLICED COMPUTATIONS FOR TARGET " + str(target) + ":\n")
        print("UNION POSITIVE: " + str(union_pos_set))
        outfile.write("         UNION POSITIVE: " + str(union_pos_set)+"\n")
        print("INTERSECTION POSITIVE: " + str(intersection_pos_set))
        outfile.write("         INTERSECTION POSITIVE: " + str(intersection_pos_set)+"\n\n")
        print("UNION NEGATIVE: " + str(union_neg_set))
        outfile.write("         UNION NEGATIVE: " + str(union_neg_set)+"\n")
        print("INTERSECTION NEGATIVE: " + str(intersection_neg_set))
        outfile.write("         INTERSECTION NEGATIVE: " + str(intersection_neg_set)+"\n\n")
        print()
        outfile.flush()
    
        generate_table(str(target.present),intersection_pos_set, union_pos_set, intersection_neg_set, union_neg_set)

    outfile.close()
    save_table_to_file("table_results.csv")


### Attractors Analysis
Before starting the analysis, uncomment directive

:- ["spec.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final.dot"))
print(len(G.nodes))
print(len(G.edges))

In [None]:
for target in targets:
    contexts, contexts_dict = target_computations(target)
    if (len(contexts)>0):
        df = pd.DataFrame(contexts).sort_values(by=[8,7,6,5,4,3,2,1,0],ignore_index=True)
        print("CONTEXTS THAT LEAD TO THE TARGET:")
        print(df)
        print()
        df.to_csv("contexts_to_" + str(target.present) + ".csv",index=False,header=False)

### Slicing Analysis -- Original T Cell Model -- Not Minimized
Before starting the analysis, uncomment directive

:- ["spec.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final.dot"))

In [None]:
table_data_df = pd.DataFrame()
slicing_analysis("original")

### Slicing Analysis -- Original T Cell Model -- Minimized
Before starting the analysis, uncomment directive

:- ["spec_Min.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final_Min.dot"))

In [None]:
table_data_df = pd.DataFrame()
slicing_analysis("original")

### Slicing Analysis -- Positive RS T Cell Model -- Not Minimized
Before starting the analysis, uncomment directive

:- ["spec_pos_nonMin.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final_pos_nonMin.dot"))

In [None]:
table_data_df = pd.DataFrame()
slicing_analysis("positive")

### Slicing Analysis -- Positive RS T Cell Model -- Minimized
Before starting the analysis, uncomment directive

:- ["spec_pos_Min.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final_pos_Min.dot"))

In [None]:
table_data_df = pd.DataFrame()
slicing_analysis("positive")

### Dynamic Negative Slicing Analysis -- Original T Cell Model -- Not Minimized

Before starting the analysis, uncomment directive

:- ["spec.pl"].

in BioReSolvePositive.pl

In [None]:
G = nx.DiGraph(nx.nx_pydot.read_dot("graph_complete_final.dot"))

In [None]:
table_data_df = pd.DataFrame()
dynamic_negative_slicing_analysis("original")