In [None]:
from collections import deque
from random import random, randint
from math import log

class OFCModel:
    def __init__(self, L: int, ALPHA: float, F_TH: float, f_0: list[list[float]], p: int) -> None:
        self.L = L
        self.ALPHA = ALPHA
        self.F_TH = F_TH
        self.f_0 = self._deepcopy_2d(f_0)
        self.p = p

        # model connections using an adjacency list
        self.adj: dict[tuple[int, int], list[tuple[int, int]]] = dict()

        # initially, connect each cell to the 4 orthogonally adjacent cells
        for i in range(L):
            for j in range(L):
                self.adj[(i,j)] = []
                for (di,dj) in ((-1,0),(1,0),(0,-1),(0,1)):
                    ni, nj = i+di, j+dj
                    if 0 <= ni < L and 0 <= nj < L:
                        self.adj[(i,j)].append((ni,nj))
    
    def _deepcopy_2d(self, f: list[list[float]]) -> list[list[float]]:
        return [[cell for cell in row] for row in f]
    
    def _deepcopy_list_tuple(self, lt: list[tuple[int, int]]):
        return [(i, j) for i, j in lt]

    # goes through each cell, and rewires them at a probability of p/100
    def rewire(self, adj: dict[tuple[int, int], list[tuple[int, int]]]):
        for i, j in adj:
            pi = randint(0, 99)

            # rewire a cell with a probability p/100
            if pi < self.p:
                index_to_change = randint(0, len(adj[(i, j)]) - 1)

                new_neighbour = (randint(0, self.L - 1), randint(0, self.L - 1))

                # do not connect a cell to itself
                while new_neighbour == (i, j):
                    new_neighbour = (randint(0, self.L - 1), randint(0, self.L - 1))

                adj[(i, j)][index_to_change] = new_neighbour
    
    # returns the max entry in f
    def get_max_cell(self, f: list[list[float]]):
        return max([max(row) for row in f])

    # gives a list[LxL], multiple calls allowed to simulate different alpha and f_th
    def simulate(self, STEPS: int):
        f = self._deepcopy_2d(self.f_0)
        adj = {(i, j): self._deepcopy_list_tuple(self.adj[(i, j)]) for i, j in self.adj}

        # Open boundary conditions
        for i in range(self.L):
            f[0][i] = 0
            f[self.L - 1][i] = 0
            f[i][0] = 0
            f[i][self.L - 1] = 0

        step_list: list[list[list[float]]] = [self._deepcopy_2d(f)] # add initial as a frame
        magnitudes: list[float] = [0]
        sizes: list[int] = [0]

        for step in range(STEPS):
            print(f"===== STEP {step} =====")
            active_queue = deque()

            max_cell = self.get_max_cell(f)
            
            delta = self.F_TH - max_cell

            # add delta to each cell. If f(i,j) goes over the threshold, it is considered active.
            for i in range(self.L):
                for j in range(self.L):
                    f[i][j] = min(self.F_TH, f[i][j] + delta)
                    if f[i][j] >= self.F_TH:
                        active_queue.append((i,j))

            # For each active cell, add force to its adjacent cells.
            # If the adjacent cells reach the threshold, they are considered active.
            # We define earthquake_size as the number of active cells.
            earthquake_size = 0
            while len(active_queue) > 0:
                (ui, uj) = active_queue.popleft()
                fi = f[ui][uj]

                for (vi, vj) in adj[(ui,uj)]:
                    if f[vi][vj] < self.F_TH:
                        f[vi][vj] = min(f[vi][vj]+self.ALPHA*fi, self.F_TH)
                        if f[vi][vj] >= self.F_TH:
                            active_queue.append((vi,vj))
                earthquake_size += 1

            m = 0

            if earthquake_size > 0:
                print(f"EARTHQUAKE OF SIZE {earthquake_size} OCCURRED")
                m = log(earthquake_size)
                print(f"Earthquake Magnitude {m}")
            
            # Relax the cells which became active
            for i in range(self.L):
                for j in range(self.L):
                    if f[i][j] >= self.F_TH:
                        f[i][j] = 0

            step_list.append(self._deepcopy_2d(f))
            sizes.append(earthquake_size)
            magnitudes.append(m)

            # rewiring at random
            self.rewire(adj)
        
        
        return step_list, magnitudes, sizes

In [None]:
L = 40
ALPHA = 0.21
F_TH = 1
STEPS = 2000
# set to random values initially
f: list[list[float]] = [[random() for _ in range(L)] for _ in range(L)]

ofc_random = OFCModel(L, ALPHA, F_TH, f, 2)
steps, magnitudes, sizes = ofc_random.simulate(STEPS)

print(max(magnitudes))

In [None]:
import plotly.graph_objects as go

def plot_frames(steps: list[list[list[float]]], magnitudes: list[float]):
    frames = [
        go.Frame(data=go.Heatmap(z=s, zmin=0, zmax=1), name=f"Step {i}, Magnitude {magnitudes[i]}")
        for i, s in enumerate(steps)
    ]

    return go.Figure(data=frames[0].data, frames=frames).update_layout(
        updatemenus=[
            {
                "buttons": [{"args": [None, {"frame": {"duration": 100, "redraw": True}}],
                            "label": "Play", "method": "animate",},
                            {"args": [[None],{"frame": {"duration": 0, "redraw": False},
                                            "mode": "immediate", "transition": {"duration": 0},},],
                            "label": "Pause", "method": "animate",},],
                "type": "buttons",
            }
        ],
        # iterate over frames to generate steps... NB frame name...
        sliders=[{"steps": [{"args": [[f.name],{"frame": {"duration": 0, "redraw": True},
                                                "mode": "immediate",},],
                            "label": f.name, "method": "animate",}
                            for f in frames],}],
        height=800,
        width=800,
        # yaxis={"title": 'callers'},
        # xaxis={"title": 'callees', "tickangle": 45, 'side': 'top'},
        title_x=0.5,
    )

In [None]:
plot_frames(steps, magnitudes)

In [None]:
import plotly.express as px
import pandas as pd
import numpy as np
from pprint import pprint

In [None]:
df_size =  pd.DataFrame.from_dict({
    'seismic_event': [-1] + [i for i in range(STEPS)],
    'size': sizes,
})

df_size

fig = px.bar(df_size, x='seismic_event', y='sizes')
fig.update_traces(marker_line_width=0)
fig.show()

In [None]:
# These are the square endpoints from USGS_dbscan.py
SQUARE_MIN_LAT = 7.4210
SQUARE_MAX_LAT = 9.8038
SQUARE_MIN_LON = 125.7307
SQUARE_MAX_LON = 128.1135

def lattice_to_lat_long_center(i: int, j: int):
    return (((SQUARE_MAX_LAT - SQUARE_MIN_LAT) / L) * i + SQUARE_MIN_LAT), (((SQUARE_MAX_LON - SQUARE_MIN_LON) / L) * j + SQUARE_MIN_LON)

print(lattice_to_lat_long_center(L, L))

In [None]:
df_buildings = pd.read_csv('buildings_lat_long.csv')
df_buildings.head()

In [None]:

# These are the square endpoints from USGS_dbscan.py
SQUARE_MIN_LAT = 7.4210
SQUARE_MAX_LAT = 9.8038
SQUARE_MIN_LON = 125.7307
SQUARE_MAX_LON = 128.1135

# This is the output file from USGS_dbscan.py with 202312Dataset.csv as input
INPUT_FILE = "202312Spacial.csv"

L = 100

# Load .csv file
df = pd.read_csv(INPUT_FILE)
points = df[['latitude', 'longitude', 'mag']].to_numpy()
lats = points[:, 0]
lons = points[:, 1]
mags = points[:, 2]

def discretize_axis(values, axis_min, axis_max):
  partial = (values - axis_min) / (axis_max - axis_min)
  idx = (partial * L).astype(int)
  return np.clip(idx, 0, L-1) # np.clip limits idx to (0, L-1). Treat edges as part of bordering cell.

mapped_rows = discretize_axis(lats, SQUARE_MIN_LAT, SQUARE_MAX_LAT)
mapped_cols = discretize_axis(lons, SQUARE_MIN_LON, SQUARE_MAX_LON)

# Initialize the L x L matrix
# This L x L matrix is a matrix of magnitudes. 
# If multiple earthquakes discretize into the same cell, get the mean.
mag_sum = np.zeros((L, L))
mag_count = np.zeros((L, L))
mag_matrix = np.zeros((L, L))

# Map all earthquakes to their cells
for r, c, m in zip(mapped_rows, mapped_cols, mags):
  mag_sum[r, c] += m
  mag_count[r, c] += 1

# Get means of magnitudes per cell
mask = mag_count > 0
mag_matrix[mask] = mag_sum[mask] / mag_count[mask]


fig = px.imshow(mag_matrix)

fig.show()

In [None]:
# Naively Scale Magnitude
from sklearn.preprocessing import normalize

norm = normalize(mag_matrix)
# True

fig = px.imshow(norm)

fig.show()

In [None]:
L = len(norm)
ALPHA = 0.21
F_TH = 1
STEPS = 1000
# set to random values initially

ofc_magnaive = OFCModel(L, ALPHA, F_TH, norm.tolist(), 2)
steps, magnitudes, sizes = ofc_magnaive.simulate(STEPS)

In [None]:
plot_frames(steps, magnitudes)