In [1]:
!pip install scikit-fuzzy

Collecting scikit-fuzzy
  Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl.metadata (2.6 kB)
Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl (920 kB)
   ---------------------------------------- 0.0/920.8 kB ? eta -:--:--
   ----------- ---------------------------- 262.1/920.8 kB ? eta -:--:--
   ---------------------- ----------------- 524.3/920.8 kB 1.4 MB/s eta 0:00:01
   ---------------------------------- ----- 786.4/920.8 kB 1.4 MB/s eta 0:00:01
   ---------------------------------------- 920.8/920.8 kB 870.1 kB/s  0:00:00
Installing collected packages: scikit-fuzzy
Successfully installed scikit-fuzzy-0.5.0


In [None]:
# ===========================================
# UNIFIED FUZZY EXPERT SYSTEM ‚Äì EARTHQUAKE DETECTION (GUI fix, fast build)
# ===========================================

# If running in a notebook for the first time:
# !pip install -q scikit-fuzzy ipywidgets

import numpy as np
import skfuzzy as fuzz
from skfuzzy import control as ctrl

# -------------------------------
# Utilities
# -------------------------------
def leer_float(prompt, minimo, maximo, default=None, allow_blank=False):
    """Read a float in [minimo, maximo]. If allow_blank and user sends empty, returns default."""
    while True:
        try:
            raw = input(prompt)
            if allow_blank and raw.strip() == "":
                if default is None:
                    print("Blank not allowed here. Please enter a number.")
                    continue
                return float(default)
            v = float(raw)
            if not (minimo <= v <= maximo):
                print(f"Value out of range [{minimo}‚Äì{maximo}]. Try again.")
                continue
            return v
        except Exception:
            print("Invalid input. Enter a valid number.")

def in_notebook():
    try:
        from IPython import get_ipython
        return get_ipython() is not None
    except Exception:
        return False

# -------------------------------
# Variables (Antecedents & Consequent)
# -------------------------------
aceleracion = ctrl.Antecedent(np.arange(0, 1.51, 0.01), 'aceleracion')   # g (0‚Äì1.5)
frecuencia  = ctrl.Antecedent(np.arange(0, 20.1, 0.1),  'frecuencia')    # Hz
duracion    = ctrl.Antecedent(np.arange(0, 60.1, 1),    'duracion')      # s
snr         = ctrl.Antecedent(np.arange(0, 30.1, 0.1),  'snr')           # dB
distancia   = ctrl.Antecedent(np.arange(0, 601, 1),     'distancia')     # km

# Finer consequent universe (avoid quantization artifacts for centroid)
amenaza     = ctrl.Consequent(np.arange(0, 10.01, 0.1), 'amenaza')       # 0‚Äì10, step 0.1
amenaza.defuzzify_method = 'centroid'  # explicit defuzzifier

# -------------------------------
# Memberships
# -------------------------------
# Core labels
aceleracion['baja']  = fuzz.trimf(aceleracion.universe, [0.0, 0.0, 0.3])
aceleracion['media'] = fuzz.trimf(aceleracion.universe, [0.2, 0.5, 0.8])
aceleracion['alta']  = fuzz.trimf(aceleracion.universe, [0.6, 1.0, 1.5])
# Keep one extreme and drop duplicates
aceleracion['muy_baja'] = fuzz.trapmf(aceleracion.universe, [0.0, 0.0, 0.02, 0.05])
aceleracion['muy_alta'] = fuzz.trapmf(aceleracion.universe, [1.0, 1.2, 1.5, 1.5])

frecuencia['baja']     = fuzz.trapmf(frecuencia.universe, [0, 0, 2, 5])
frecuencia['media']    = fuzz.trapmf(frecuencia.universe, [3, 6, 10, 13])
frecuencia['alta']     = fuzz.trapmf(frecuencia.universe, [11, 15, 20, 20])
frecuencia['muy_baja'] = fuzz.gaussmf(frecuencia.universe, 1.0, 0.5)
frecuencia['muy_alta'] = fuzz.smf(frecuencia.universe, 15, 18)

duracion['corta'] = fuzz.trimf(duracion.universe, [0, 0, 10])
duracion['media'] = fuzz.trimf(duracion.universe, [5, 20, 35])
duracion['larga'] = fuzz.trimf(duracion.universe, [30, 60, 60])

# SNR: keep detailed bins for possible plots, but use a single compact confidence term in rules
snr['pobre']       = fuzz.trapmf(snr.universe, [0.0, 0.0, 4.0, 6.0])
snr['marginal']    = fuzz.trimf( snr.universe, [6.0, 7.75, 9.5])
snr['operacional'] = fuzz.trimf( snr.universe, [9.5, 12.0, 14.0])
snr['aceptable']   = fuzz.trapmf(snr.universe, [8.0, 12.0, 18.0, 22.0])
snr['buena']       = fuzz.trimf( snr.universe, [14.0, 17.0, 20.0])
snr['excelente']   = fuzz.trapmf(snr.universe, [20.0, 23.0, 30.0, 30.0])
# Compact confidence term (used in rules)
snr['fiable']      = fuzz.smf(snr.universe, 10, 14)
snr['ruidosa']     = fuzz.zmf(snr.universe, 8, 12)  # not used in rules, but available

# Distance: original four + compact aggregates for rules
distancia['muy_cercana'] = fuzz.trapmf(distancia.universe, [0, 0, 50, 150])
distancia['cercana']     = fuzz.trimf(distancia.universe, [100, 200, 300])
distancia['moderada']    = fuzz.trimf(distancia.universe, [250, 375, 500])
distancia['lejana']      = fuzz.trapmf(distancia.universe, [450, 550, 600, 600])
# Aggregates to keep the rule base small
distancia['cerca']       = fuzz.trapmf(distancia.universe, [0, 0, 150, 320])
distancia['lejos']       = fuzz.trapmf(distancia.universe, [250, 430, 600, 600])

# Consequent linguistic sets
amenaza['sin_actividad'] = fuzz.trimf(amenaza.universe, [0, 0, 2])
amenaza['microtemblor']  = fuzz.trimf(amenaza.universe, [1, 3, 5])
amenaza['leve']          = fuzz.trimf(amenaza.universe, [4, 5, 6])
amenaza['moderado']      = fuzz.trimf(amenaza.universe, [5, 7, 8])
amenaza['fuerte']        = fuzz.trimf(amenaza.universe, [7, 9, 10])
amenaza['destructivo']   = fuzz.trimf(amenaza.universe, [9, 10, 10])

# -------------------------------
# Rule generation (lazy, cached) ‚Äî compact: 27 * 2 = 54 rules total
# -------------------------------
_CONTROL_CACHE = None       # ctrl.ControlSystem
_RULE_SPECS    = None       # list of tuples for explainability

orden_salidas = ['sin_actividad','microtemblor','leve','moderado','fuerte','destructivo']
idx_salida    = {name: i for i, name in enumerate(orden_salidas)}

niveles_acel_core = ['baja', 'media', 'alta']
niveles_freq_core = ['baja', 'media', 'alta']
niveles_dur       = ['corta', 'media', 'larga']
dist_groups       = ['lejos', 'cerca']   # aggregate groups used by rules
snr_group         = 'fiable'             # single confidence gate

def clamp_idx(i):
    return max(0, min(len(orden_salidas) - 1, i))

def base_output_core(a_base, f_base, d):
    """
    Core anchor mapping WITHOUT SNR or distance. Fix: (alta, alta, corta) ‚Üí 'fuerte' (not 'microtemblor').
    """
    if a_base == 'baja':
        if f_base == 'alta' and d == 'corta':
            return 'sin_actividad'
        else:
            return 'microtemblor'
    elif a_base == 'media':
        if f_base == 'alta':
            return 'leve'
        elif f_base == 'media' and d != 'corta':
            return 'moderado'
        else:
            return 'leve'
    elif a_base == 'alta':
        if f_base == 'alta' and d == 'corta':
            return 'fuerte'
        elif f_base == 'media' and d == 'corta':
            return 'leve'
        elif f_base == 'baja' and d == 'larga':
            return 'destructivo'
        elif f_base == 'baja':
            return 'fuerte'
        else:
            return 'moderado'
    else:
        return 'moderado'

# Distance adjustment on the base class
dist_adjust = {'lejos': -1, 'cerca': +1}

def build_rules_and_specs():
    """
    Build a compact backbone:
      (acel ‚àà {baja,media,alta}) √ó (freq ‚àà {baja,media,alta}) √ó (dur ‚àà {corta,media,larga})
      √ó (dist ‚àà {lejos,cerca})
    SNR is included as a single confidence antecedent snr['fiable'] for all rules.
    """
    rules = []
    specs = []  # (a,f,d, snr_group, dist_group, out_label)

    for a in niveles_acel_core:
        for f in niveles_freq_core:
            for d in niveles_dur:
                base = base_output_core(a, f, d)
                base_idx = idx_salida[base]
                for x in dist_groups:
                    adj_idx = clamp_idx(base_idx + dist_adjust[x])
                    out = orden_salidas[adj_idx]
                    antecedent = (aceleracion[a] & frecuencia[f] & duracion[d] &
                                  distancia[x] & snr[snr_group])
                    rules.append(ctrl.Rule(antecedent, amenaza[out]))
                    specs.append((a, f, d, snr_group, x, out))
    return rules, specs

def get_control_system():
    global _CONTROL_CACHE, _RULE_SPECS
    if _CONTROL_CACHE is None:
        rules, specs = build_rules_and_specs()
        system = ctrl.ControlSystem(rules)
        _CONTROL_CACHE = system
        _RULE_SPECS = specs
    return _CONTROL_CACHE

# -------------------------------
# Explainability helpers
# -------------------------------
def _interp_mu(term, x):
    """Interpolate membership degree for a given FuzzyVariable term at crisp x."""
    try:
        xs = term.parent.universe
        ys = term.mf
        if x <= xs[0]: return float(ys[0])
        if x >= xs[-1]: return float(ys[-1])
        return float(np.interp(x, xs, ys))
    except Exception:
        return 0.0

def explain_predict(acel, freq, dur, snr_db, dist_km, top_k=8):
    """Return top-k firing rules (min t-norm) with their strengths."""
    if _RULE_SPECS is None:
        get_control_system()

    # Crisp‚Üílinguistic membership degrees (only terms actually used by the rules)
    mu = {}
    for lbl in ['baja','media','alta','muy_baja','muy_alta']:
        if lbl in aceleracion.terms:
            mu[('acel', lbl)] = _interp_mu(aceleracion[lbl], acel)
    for lbl in ['baja','media','alta','muy_baja','muy_alta']:
        if lbl in frecuencia.terms:
            mu[('freq', lbl)] = _interp_mu(frecuencia[lbl], freq)
    for lbl in ['corta','media','larga']:
        mu[('dur', lbl)] = _interp_mu(duracion[lbl], dur)
    for lbl in [snr_group]:
        mu[('snr', lbl)] = _interp_mu(snr[lbl], snr_db)
    for lbl in dist_groups:
        mu[('dist', lbl)] = _interp_mu(distancia[lbl], dist_km)

    fired = []
    for (a, f, d, s, x, out) in _RULE_SPECS:
        s_val = min(
            mu.get(('acel', a), 0.0),
            mu.get(('freq', f), 0.0),
            mu.get(('dur', d), 0.0),
            mu.get(('snr', s), 0.0),
            mu.get(('dist', x), 0.0),
        )
        if s_val > 0.0:
            fired.append((
                s_val,
                f"IF acel is {a} AND freq is {f} AND dur is {d} AND dist is {x} AND snr is {s} "
                f"THEN amenaza is {out}"
            ))
    fired.sort(key=lambda t: t[0], reverse=True)
    return fired[:top_k]

# -------------------------------
# Programmatic API (+ uncertainty)
# -------------------------------
def _label_from_val(val):
    if val < 2:      return "No seismic activity"
    elif val < 4:    return "Microtremor"
    elif val < 6:    return "Mild earthquake"
    elif val < 8:    return "Moderate earthquake"
    elif val < 9.5:  return "Strong earthquake"
    else:            return "Destructive earthquake"

def predict(acel, freq, dur, snr_db, dist_km=300.0, return_explanation=False):
    """Compute crisp threat and label. If return_explanation=True, also returns top firing rules."""
    sim = ctrl.ControlSystemSimulation(get_control_system())  # lazy, cached
    sim.input['aceleracion'] = float(acel)
    sim.input['frecuencia']  = float(freq)
    sim.input['duracion']    = float(dur)
    sim.input['snr']         = float(snr_db)
    sim.input['distancia']   = float(dist_km)
    try:
        sim.compute()
        val = float(sim.output['amenaza'])
    except Exception as e:
        # Ensure GUI shows *something* instead of staying blank
        val = 0.0
    label = _label_from_val(val)
    if return_explanation:
        fired = explain_predict(acel, freq, dur, snr_db, dist_km, top_k=8)
        return val, label, fired
    return val, label

def predict_mc(acel, freq, dur, snr_db, dist_km=300.0,
               sigmas=(0.03, 0.5, 1.0, 0.8, 10.0), draws=200):
    """Simple uncertainty propagation via Monte Carlo."""
    rng = np.random.default_rng(42)
    vals = []
    classes = []
    for _ in range(int(draws)):
        a = max(0.0, min(1.5, rng.normal(acel,   sigmas[0])))
        f = max(0.0, min(20., rng.normal(freq,   sigmas[1])))
        d = max(0.0, min(60., rng.normal(dur,    sigmas[2])))
        s = max(0.0, min(30., rng.normal(snr_db, sigmas[3])))
        x = max(0.0, min(600.,rng.normal(dist_km,sigmas[4])))
        v, lab = predict(a, f, d, s, x)
        vals.append(v)
        classes.append(lab)
    vals = np.array(vals, dtype=float)
    hist = {}
    for lab in set(classes):
        hist[lab] = classes.count(lab) / float(draws)
    return {
        "mean": float(vals.mean()),
        "std": float(vals.std(ddof=1) if len(vals) > 1 else 0.0),
        "q05": float(np.quantile(vals, 0.05)),
        "q95": float(np.quantile(vals, 0.95)),
        "class_probs": hist
    }

# -------------------------------
# CLI + Notebook UI (robust output on click)
# -------------------------------
def run_cli():
    val_acel = leer_float("Enter ground acceleration (g, 0‚Äì1.5): ", 0.0, 1.5)
    val_freq = leer_float("Enter dominant frequency (Hz, 0‚Äì20): ", 0.0, 20.0)
    val_dur  = leer_float("Enter event duration (s, 0‚Äì60): ", 0.0, 60.0)
    val_dist = leer_float("Enter distance to plate boundary (km, 0‚Äì600) [Enter=300]: ",
                          0.0, 600.0, default=300.0, allow_blank=True)
    val_snr  = leer_float("Enter SNR (dB, 0‚Äì30): ", 0.0, 30.0)
    val, label = predict(val_acel, val_freq, val_dur, val_snr, val_dist)
    print(f"\nüîç Seismic threat level: {val:.2f} / 10")
    print(f"‚û°Ô∏è Classification: {label}")

def notebook_ui():
    from IPython.display import display
    import ipywidgets as W
    a = W.FloatSlider(description='a (g)', min=0, max=1.5, step=0.01, value=0.6)
    f = W.FloatSlider(description='f (Hz)', min=0, max=20,  step=0.1, value=4.0)
    d = W.IntSlider(  description='dur (s)', min=0, max=60,  step=1,   value=20)
    s = W.FloatSlider(description='SNR (dB)', min=0, max=30, step=0.1, value=18.0)
    x = W.IntSlider(  description='dist (km)', min=0, max=600, step=1, value=300)
    out = W.Output()
    btn = W.Button(description='Compute', button_style='primary')

    def on_click(_):
        with out:
            out.clear_output()
            try:
                val, label, fired = predict(a.value, f.value, d.value, s.value, x.value, return_explanation=True)
                print(f"Threat: {val:.2f} / 10 ‚Äî {label}")
                print("\nTop firing rules:")
                for strength, text in fired:
                    print(f"  Œº={strength:.3f} :: {text}")
            except Exception as e:
                import traceback
                print("‚ö†Ô∏è Error during compute:", e)
                traceback.print_exc()

    btn.on_click(on_click)
    display(W.VBox([W.HBox([a, f]), W.HBox([d, s, x]), btn, out]))

# Entry point: light UI first; rules build fast on first compute
if in_notebook():
    try:
        notebook_ui()
    except Exception:
        val, label = predict(0.6, 4.0, 25, 18.0, 120.0)
        print(f"Demo ‚Üí Threat: {val:.2f} / 10 | {label}")
else:
    run_cli()


VBox(children=(HBox(children=(FloatSlider(value=0.6, description='a (g)', max=1.5, step=0.01), FloatSlider(val‚Ä¶