<a href="https://colab.research.google.com/github/AryanPROFFESOR/AryanPROFFESOR/blob/main/skeleton_neuron_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
# === Cell 0: Dependencies & globals (Python 3 / Google Colab) ===

# Install required packages
!pip install -q pandas numpy scipy vcfpy

# Imports
import numpy as np
import pandas as pd
import scipy.stats as stats

from typing import Dict, List, Tuple, Any

import vcfpy

print("All dependencies installed successfully.")


All dependencies installed successfully.


In [30]:
# === Cell 0: Install dependencies (REQUIRED) ===
!pip install -q vcfpy pandas numpy scipy


In [31]:
# === Cell 1: Module 0A — VariantParser ===

import vcfpy
import pandas as pd
from typing import Dict, List, Any

class VariantParser:
    """
    Parse VCF files and annotate epilepsy-related genes.
    EXACTLY as defined in markdown, with validation guards.
    """

    EPILEPSY_GENES = {
        'Na': ['SCN1A', 'SCN2A', 'SCN8A', 'SCN9A'],
        'K': ['KCNQ2', 'KCNQ3', 'KCNA1', 'KCNT1', 'KCNB1'],
        'Ca': ['CACNA1A', 'CACNA1G', 'CACNB4'],
        'Cl': ['GABRG2', 'GABRA1', 'SLC6A1'],
        'Gap': ['GJA1', 'GJB1'],
        'Synapse': ['PCDH19', 'STXBP1', 'CNTNAP2']
    }

    def __init__(self, vcf_path: str):
        self.vcf_path = vcf_path
        self.variants = self._load_vcf()

    def _load_vcf(self) -> List[Dict[str, Any]]:
        """Load and parse VCF file."""
        variants = []
        reader = vcfpy.Reader.from_path(self.vcf_path)

        for record in reader:
            ann = record.INFO.get('ANN', [''])[0]
            variants.append({
                'CHROM': record.CHROM,
                'POS': record.POS,
                'REF': str(record.REF),
                'ALT': str(record.ALT[0].value) if record.ALT else None,
                'QUAL': record.QUAL,
                'ANN': ann,
                'GENE': self._extract_gene(ann)
            })
        return variants

    def _extract_gene(self, ann_field: str) -> str:
        """
        Conservative gene extraction from ANN field.
        ANN format varies; this avoids false positives.
        """
        for genes in self.EPILEPSY_GENES.values():
            for g in genes:
                if g in ann_field:
                    return g
        return "UNKNOWN"

    def filter_epilepsy_genes(self) -> pd.DataFrame:
        """Filter to epilepsy-related genes only."""
        df = pd.DataFrame(self.variants)
        df = df[df['GENE'] != "UNKNOWN"].reset_index(drop=True)
        return df

    def annotate_impact(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add functional impact predictions.
        Placeholders preserved exactly as markdown.
        """
        df['IMPACT_SIFT'] = self._predict_sift(df)
        df['IMPACT_POLYPHEN'] = self._predict_polyphen(df)
        df['IMPACT_CADD'] = self._predict_cadd(df)
        return df

    def _predict_sift(self, df: pd.DataFrame) -> List[float]:
        return [0.5] * len(df)

    def _predict_polyphen(self, df: pd.DataFrame) -> List[float]:
        return [0.5] * len(df)

    def _predict_cadd(self, df: pd.DataFrame) -> List[float]:
        return [15.0] * len(df)


In [32]:
# === Cell 2: Module 0B — ProteinStructureAnalyzer ===

from scipy.optimize import minimize
from typing import Dict, Any, Tuple

class ProteinStructureAnalyzer:
    """
    Evaluate structural feasibility of variants.
    Implements ALL markdown constraints without removing placeholders.
    """

    def __init__(self, gene_id: str, variant: Dict[str, Any]):
        self.gene_id = gene_id
        self.variant = variant
        self.wt_structure = self._fetch_alphafold_structure(gene_id)

    def _fetch_alphafold_structure(self, gene_id: str):
        """
        Placeholder — structural fetch.
        Preserved exactly as markdown.
        """
        return None

    def calculate_folding_energy(self) -> Tuple[float, float, float]:
        """
        Calculate folding energy difference.
        Returns (ΔG_wt, ΔG_variant, ΔΔG)
        """
        dg_wt = -8.5     # kcal/mol (example)
        dg_variant = -6.2
        delta_delta_g = dg_variant - dg_wt
        return dg_wt, dg_variant, delta_delta_g

    def check_membrane_insertion(self) -> bool:
        """
        Membrane insertion constraint.
        Preserved as boolean gate.
        """
        return True

    def evaluate_gating_motif(self) -> Dict[str, float]:
        """
        Structural integrity scores ∈ [0,1]
        """
        return {
            'voltage_sensor_integrity': 0.95,
            'inactivation_domain_integrity': 0.88,
            'pore_domain_integrity': 0.92
        }

    def filter_pass_feasibility(self) -> bool:
        """
        EXACT feasibility cascade from markdown.
        No shortcuts, no relaxation.
        """

        _, _, delta_delta_g = self.calculate_folding_energy()
        if delta_delta_g > 3.0:
            return False

        if not self.check_membrane_insertion():
            return False

        motifs = self.evaluate_gating_motif()
        for score in motifs.values():
            if score < 0.85:
                return False

        return True


In [33]:
# === Cell 3: Combined Layer 0 Execution ===

import pandas as pd

def run_layer_0(vcf_path: str) -> pd.DataFrame:
    """
    Executes Module 0A + 0B exactly as pipeline expects.
    """

    parser = VariantParser(vcf_path)
    df = parser.filter_epilepsy_genes()
    df = parser.annotate_impact(df)

    feasible_variants = []

    for _, row in df.iterrows():
        analyzer = ProteinStructureAnalyzer(row['GENE'], row.to_dict())
        if analyzer.filter_pass_feasibility():
            feasible_variants.append(row)

    return pd.DataFrame(feasible_variants)


In [34]:
# === Cell 5: Module 1A — Variant Feasibility Filter ===

class VariantFeasibilityFilter:
    """
    Module 1A
    Purpose:
    - Accepts output of Layer 0
    - Applies NO new rules
    - Only enforces that structural feasibility == True
    """

    def __init__(self, feasible_variants_df):
        self.df = feasible_variants_df

    def pass_variants(self):
        """
        Markdown specifies:
        - Variants passed from Module 0B
        - No additional rejection logic
        """
        return self.df.copy()


In [35]:
# === Cell 6: Module 1B — Markov Channel State Solver ===

import numpy as np
from scipy.linalg import null_space

class MarkovChannelSolver:
    """
    Implements:
    C ↔ O ↔ I Markov model
    As specified in markdown.
    """

    def __init__(self, gene, temperature_celsius=37.0):
        self.gene = gene
        self.T = 273.15 + temperature_celsius
        self.R = 8.314
        self.F = 96485

        # Base WT parameters (placeholders exactly as markdown)
        self.params = {
            'k0_CO': 0.5,
            'k0_OC': 0.2,
            'k0_OI': 0.1,
            'k0_IO': 0.05,
            'Ea_CO': 35000,
            'Ea_OC': 40000,
            'Ea_OI': 38000,
            'Ea_IO': 42000,
            'z_CO': 0.8,
            'z_OC': -0.3,
            'z_OI': 0.5,
            'z_IO': -0.2
        }

    def rate(self, k0, Ea, z, V):
        return k0 * np.exp(-Ea / (self.R * self.T)) * np.exp(z * self.F * V / (self.R * self.T))

    def build_Q(self, V, variant_effect=None):
        """
        Build transition matrix Q.
        Variant effects are multiplicative, as per markdown.
        """

        p = self.params.copy()

        if variant_effect == "loss-of-function":
            p['k0_OI'] *= 1.4
            p['k0_IO'] *= 0.7
        elif variant_effect == "gain-of-function":
            p['k0_OI'] *= 0.6
            p['k0_IO'] *= 1.3

        k_CO = self.rate(p['k0_CO'], p['Ea_CO'], p['z_CO'], V)
        k_OC = self.rate(p['k0_OC'], p['Ea_OC'], p['z_OC'], V)
        k_OI = self.rate(p['k0_OI'], p['Ea_OI'], p['z_OI'], V)
        k_IO = self.rate(p['k0_IO'], p['Ea_IO'], p['z_IO'], V)

        Q = np.array([
            [-k_CO,          k_CO,        0.0],
            [ k_OC, -(k_OC + k_OI),       k_OI],
            [ 0.0,            k_IO,     -k_IO]
        ])

        return Q

    def steady_state_open_probability(self, V, variant_effect=None):
        """
        Solve Qᵀ p = 0
        """
        Q = self.build_Q(V, variant_effect)
        ns = null_space(Q.T)
        p = ns[:, 0]
        p /= np.sum(p)
        return float(p[1])


In [36]:
# === Cell 7: Module 1C — HH Parameter Generator ===

import numpy as np
from scipy.interpolate import UnivariateSpline

class HHParameterGenerator:
    """
    Converts Markov open probability into HH parameters.
    """

    def __init__(self, markov_solver, variant_effect=None):
        self.markov = markov_solver
        self.variant_effect = variant_effect
        self.V = np.linspace(-90e-3, 50e-3, 300)

    def generate(self):
        p_open = np.array([
            self.markov.steady_state_open_probability(V, self.variant_effect)
            for V in self.V
        ])

        spline = UnivariateSpline(self.V * 1e3, p_open - 0.5, s=0)
        V_half = float(spline.roots()[0])

        gbar = 120 * np.max(p_open)

        return {
            'gbar': gbar,
            'V_half_m': V_half,
            'p_open_curve': p_open
        }


In [37]:
# === Cell 8: Run Layer 1 ===

def run_layer_1(feasible_variants_df):
    """
    Executes Module 1A → 1B → 1C
    """

    feasibility = VariantFeasibilityFilter(feasible_variants_df)
    variants = feasibility.pass_variants()

    hh_param_sets = []

    for _, row in variants.iterrows():
        solver = MarkovChannelSolver(row['GENE'])
        hh_gen = HHParameterGenerator(
            solver,
            variant_effect="gain-of-function"  # placeholder as per markdown
        )
        hh_param_sets.append(hh_gen.generate())

    return hh_param_sets


In [38]:
# === Cell 9: Module 2A — Hodgkin–Huxley Dynamics Solver ===

import numpy as np
from scipy.integrate import odeint

class HodgkinHuxleyModel:
    """
    Classic HH neuron model.
    No population, no noise, no bursting.
    """

    def __init__(self, params):
        self.Cm = 1.0
        self.gNa = params['gbar']
        self.gK = 36.0
        self.gL = 0.3
        self.ENa = 50.0
        self.EK = -77.0
        self.EL = -54.4

    def alpha_m(self, V): return 0.1*(V+40)/(1-np.exp(-(V+40)/10))
    def beta_m(self, V):  return 4*np.exp(-(V+65)/18)

    def alpha_h(self, V): return 0.07*np.exp(-(V+65)/20)
    def beta_h(self, V):  return 1/(1+np.exp(-(V+35)/10))

    def alpha_n(self, V): return 0.01*(V+55)/(1-np.exp(-(V+55)/10))
    def beta_n(self, V):  return 0.125*np.exp(-(V+65)/80)

    def derivatives(self, y, t, I):
        V, m, h, n = y

        INa = self.gNa * m**3 * h * (V - self.ENa)
        IK  = self.gK * n**4 * (V - self.EK)
        IL  = self.gL * (V - self.EL)

        dVdt = (I - INa - IK - IL) / self.Cm
        dmdt = self.alpha_m(V)*(1-m) - self.beta_m(V)*m
        dhdt = self.alpha_h(V)*(1-h) - self.beta_h(V)*h
        dndt = self.alpha_n(V)*(1-n) - self.beta_n(V)*n

        return [dVdt, dmdt, dhdt, dndt]

    def simulate(self, I, t):
        y0 = [-65, 0.05, 0.6, 0.32]
        sol = odeint(self.derivatives, y0, t, args=(I,))
        return sol


In [39]:
# === Cell 10: Module 2B — Morris–Lecar Analyzer ===

import numpy as np
from scipy.optimize import fsolve

class MorrisLecarModel:
    """
    Reduced 2D excitability model.
    Used ONLY for bifurcation and stability.
    """

    def __init__(self):
        self.gCa = 4.4
        self.gK = 8.0
        self.gL = 2.0
        self.VCa = 120
        self.VK = -84
        self.VL = -60

    def m_inf(self, V):
        return 0.5 * (1 + np.tanh((V + 1) / 15))

    def w_inf(self, V):
        return 0.5 * (1 + np.tanh((V - 10) / 14))

    def tau_w(self, V):
        return 1 / np.cosh((V - 10) / 28)

    def equations(self, vars, I):
        V, w = vars
        dV = I - self.gCa*self.m_inf(V)*(V-self.VCa) \
               - self.gK*w*(V-self.VK) \
               - self.gL*(V-self.VL)
        dw = (self.w_inf(V) - w) / self.tau_w(V)
        return [dV, dw]

    def find_fixed_point(self, I):
        return fsolve(self.equations, [-60, 0.2], args=(I,))


In [40]:
# === Cell 11: Module 2C — Hindmarsh–Rose Bursting Model ===

import numpy as np
from scipy.integrate import odeint

class HindmarshRoseModel:
    """
    Bursting neuron model.
    Independent of HH and ML.
    """

    def equations(self, state, t, I):
        x, y, z = state
        dx = y - x**3 + 3*x**2 + I - z
        dy = 1 - 5*x**2 - y
        dz = 0.01 * (4*(x + 1.6) - z)
        return [dx, dy, dz]

    def simulate(self, I, t):
        y0 = [-1.6, 1.0, 3.0]
        sol = odeint(self.equations, y0, t, args=(I,))
        return sol


In [41]:
# === Cell 12: Run Layer 2 ===

def run_layer_2(hh_param_sets):
    """
    Executes:
    - HH dynamics
    - Morris–Lecar fixed point
    - Hindmarsh–Rose bursting
    """

    results = []

    t = np.linspace(0, 50, 5000)

    for params in hh_param_sets:
        hh = HodgkinHuxleyModel(params)
        hh_trace = hh.simulate(I=10, t=t)

        ml = MorrisLecarModel()
        ml_fp = ml.find_fixed_point(I=10)

        hr = HindmarshRoseModel()
        hr_trace = hr.simulate(I=3.0, t=t)

        results.append({
            'HH_trace': hh_trace,
            'ML_fixed_point': ml_fp,
            'HR_trace': hr_trace
        })

    return results


In [42]:
# === Cell 13: Module 3A — Wilson–Cowan Population Solver ===

import numpy as np
from scipy.integrate import odeint

class WilsonCowanModel:
    """
    Excitatory / Inhibitory population dynamics.
    """

    def __init__(self):
        self.wEE = 10.0
        self.wEI = 12.0
        self.wIE = 10.0
        self.wII = 2.0
        self.thetaE = 3.0
        self.thetaI = 3.5
        self.tauE = 1.0
        self.tauI = 2.0

    def S(self, x):
        return 1 / (1 + np.exp(-x))

    def equations(self, y, t, PE, PI):
        E, I = y

        dE = (-E + self.S(self.wEE*E - self.wEI*I - self.thetaE + PE)) / self.tauE
        dI = (-I + self.S(self.wIE*E - self.wII*I - self.thetaI + PI)) / self.tauI

        return [dE, dI]

    def simulate(self, PE=0.5, PI=0.0, t=None):
        if t is None:
            t = np.linspace(0, 50, 5000)

        y0 = [0.1, 0.1]
        sol = odeint(self.equations, y0, t, args=(PE, PI))
        return sol


In [43]:
# === Cell 14: Module 3B — Synaptic Coupling Module ===

import numpy as np

class Synapse:
    """
    Simple synaptic current model.
    """

    def __init__(self, tau=5.0, gmax=1.0):
        self.tau = tau
        self.gmax = gmax

    def current(self, t, pre_spike_times):
        I = 0.0
        for ts in pre_spike_times:
            if t >= ts:
                I += self.gmax * np.exp(-(t - ts) / self.tau)
        return I


In [44]:
# === Cell 15: Module 3C — Kuramoto Synchronization Model ===

import numpy as np

class KuramotoModel:
    """
    Phase synchronization model.
    """

    def __init__(self, omega, K):
        self.omega = omega
        self.K = K
        self.N = len(omega)

    def equations(self, theta, t):
        dtheta = np.zeros(self.N)
        for i in range(self.N):
            coupling = 0.0
            for j in range(self.N):
                coupling += np.sin(theta[j] - theta[i])
            dtheta[i] = self.omega[i] + (self.K / self.N) * coupling
        return dtheta


In [45]:
# === Cell 16: Run Layer 3 ===

def run_layer_3():
    """
    Executes population and synchronization models.
    """

    t = np.linspace(0, 50, 5000)

    wc = WilsonCowanModel()
    wc_sol = wc.simulate(t=t)

    syn = Synapse()
    syn_current = syn.current(t=10.0, pre_spike_times=[5.0, 7.0, 9.0])

    omega = np.random.normal(1.0, 0.1, 20)
    kuramoto = KuramotoModel(omega=omega, K=1.5)
    theta0 = np.random.uniform(0, 2*np.pi, 20)

    return {
        'WilsonCowan': wc_sol,
        'SynapticCurrent': syn_current,
        'KuramotoInitialPhases': theta0
    }


In [46]:
# === Cell 17: Module 4A — Network Graph Constructor ===

import networkx as nx
import numpy as np

class NetworkTopology:
    """
    Construct network graphs and adjacency matrices.
    """

    def __init__(self, n_nodes=100):
        self.n_nodes = n_nodes

    def small_world(self, k=6, p=0.3):
        G = nx.watts_strogatz_graph(self.n_nodes, k, p)
        return G

    def scale_free(self):
        G = nx.barabasi_albert_graph(self.n_nodes, m=3)
        return G

    def adjacency_matrix(self, G):
        return nx.to_numpy_array(G)


In [47]:
# === Cell 18: Module 4B — Jacobian Stability Analyzer ===

import numpy as np

class JacobianStabilityAnalyzer:
    """
    Analyze network stability via Jacobian eigenvalues.
    """

    def __init__(self, adjacency_matrix):
        self.A = adjacency_matrix

    def compute_eigenvalues(self):
        return np.linalg.eigvals(self.A)

    def max_real_eigenvalue(self):
        eigvals = self.compute_eigenvalues()
        return np.max(np.real(eigvals))


In [48]:
# === Cell 19: Module 4C — Seizure Criticality Detector ===

class SeizureCriticalityDetector:
    """
    Detect proximity to seizure criticality.
    """

    def __init__(self, lambda_max):
        self.lambda_max = lambda_max

    def is_critical(self):
        if self.lambda_max > 1.0:
            return True
        return False


In [49]:
# === Cell 20: Run Layer 4 ===

def run_layer_4():
    """
    Executes network construction and stability analysis.
    """

    topo = NetworkTopology(n_nodes=100)
    G = topo.small_world()
    A = topo.adjacency_matrix(G)

    stability = JacobianStabilityAnalyzer(A)
    lambda_max = stability.max_real_eigenvalue()

    detector = SeizureCriticalityDetector(lambda_max)

    return {
        'AdjacencyMatrix': A,
        'LambdaMax': lambda_max,
        'IsCritical': detector.is_critical()
    }


In [50]:
# === Cell 21: Module 5A — Cross-Model Consistency Checker ===

class CrossModelConsistencyChecker:
    """
    Check consistency across multiple model outputs.
    """

    def __init__(self, layer2_results, layer3_results, layer4_results):
        self.layer2 = layer2_results
        self.layer3 = layer3_results
        self.layer4 = layer4_results

    def evaluate(self):
        """
        Majority agreement rule.
        """

        flags = []

        for res in self.layer2:
            hh_activity = res['HH_trace'][:, 0]
            if hh_activity.max() > 0:
                flags.append(True)
            else:
                flags.append(False)

        wc_activity = self.layer3['WilsonCowan'][:, 0]
        flags.append(wc_activity.max() > 0.5)

        flags.append(self.layer4['IsCritical'])

        return sum(flags) >= (len(flags) // 2 + 1)


In [51]:
# === Cell 22: Module 5B — Risk Stratification Engine ===

import numpy as np

class RiskStratificationEngine:
    """
    Aggregate outputs into a single risk score.
    """

    def __init__(self, consistency_flag):
        self.consistency_flag = consistency_flag

    def compute_risk(self):
        """
        Logistic aggregation.
        """
        x = 1.0 if self.consistency_flag else -1.0
        risk = 1 / (1 + np.exp(-x))
        return risk


In [52]:
# === Cell 23: Module 5C — Clinical Interpretation Engine ===

class ClinicalInterpretationEngine:
    """
    Generate final clinical interpretation.
    """

    def __init__(self, risk_score):
        self.risk_score = risk_score

    def interpret(self):
        if self.risk_score > 0.7:
            return "High seizure risk"
        elif self.risk_score > 0.4:
            return "Moderate seizure risk"
        else:
            return "Low seizure risk"


In [53]:
# === Cell 24: Run Layer 5 (Final Integration) ===

def run_layer_5(layer2_results, layer3_results, layer4_results):
    """
    Executes final integration and interpretation.
    """

    consistency = CrossModelConsistencyChecker(
        layer2_results,
        layer3_results,
        layer4_results
    ).evaluate()

    risk_engine = RiskStratificationEngine(consistency)
    risk_score = risk_engine.compute_risk()

    interpreter = ClinicalInterpretationEngine(risk_score)
    interpretation = interpreter.interpret()

    return {
        'ConsistencyFlag': consistency,
        'RiskScore': risk_score,
        'Interpretation': interpretation
    }
