In [None]:
import math, random
from collections import deque
import pandas as pd

# Reproducibility
random.seed(42)

DT = 2.5                         # seconds between position updates
T_START = 0.0                    # seconds
T_END   = 43200.0                # seconds (12 hours)
CENTER_LON, CENTER_LAT = -121.7493613, 38.5411082  # center of monitored area
AREA_HALF = 25.0                 # 50m x 50m area -> half-width = 25 m
# Normal-state population bounds (uniform over time)
MIN_PEOPLE, MAX_PEOPLE = 50, 150

# Speed bands (meters/second)
RUN_MIN, RUN_MAX   = 2.0, 2.6
WALK_MIN, WALK_MAX = 1.0, 1.3
STILL_SPEED        = 0.0

# Normal state ratio ~ 70:28:2 (run:walk:still)
RATIO_RUN, RATIO_WALK, RATIO_STILL = 0.70, 0.28, 0.02

# Gunshot window & scripted events (seconds)
T_GUNSHOT_START = 25202.5
T_GUNSHOT_PHASE2 = 25205.0
T_GUNSHOT_ALL_RUN = 25207.5
T_GUNSHOT_END = 25227.5

# During gunshot window, there are EXACTLY 100 people inside (excluding shooter)
GUNSHOT_POP = 100

# Small angular noise (radians) to avoid perfectly straight lines
HEADING_JITTER_NORMAL = math.radians(5)
HEADING_JITTER_AWAY   = math.radians(12)

# Output CSV name (exact columns required by PDF)
OUT_CSV = "expanded_gunshot_sim.csv"


In [None]:
# Convert meters to degrees at the latitude we simulate (simple/effective for small areas)
def meters_per_degree(lat_deg: float):
    # Based on equirectangular approximation (fine for < few km)
    lat = math.radians(lat_deg)
    m_per_deg_lat = 111_132.954 - 559.822 * math.cos(2*lat) + 1.175 * math.cos(4*lat)
    m_per_deg_lon = (math.pi/180) * 6_378_137.0 * math.cos(lat)
    return m_per_deg_lat, m_per_deg_lon

M_PER_DEG_LAT, M_PER_DEG_LON = meters_per_degree(CENTER_LAT)

def meters_to_latlon(dx_m, dy_m, center_lat, center_lon):
    dlat = dy_m / M_PER_DEG_LAT
    dlon = dx_m / M_PER_DEG_LON
    return center_lat + dlat, center_lon + dlon

def luhn_checksum14(num_str14):
    s = 0
    reverse_digits = list(map(int, num_str14[::-1]))
    for i, d in enumerate(reverse_digits):
        if i % 2 == 0:
            s += d
        else:
            dbl = d * 2
            s += dbl if dbl < 10 else dbl - 9
    return (10 - (s % 10)) % 10

next_imei_serial = 10000000000000  # 14 digits start

def generate_imei():
    global next_imei_serial
    base14 = str(next_imei_serial)
    next_imei_serial += 1
    cksum = luhn_checksum14(base14)
    return base14 + str(cksum)


In [None]:
def random_edge_spawn():
    side = random.choice(['left','right','top','bottom'])
    if side == 'left':
        x = -AREA_HALF
        y = random.uniform(-AREA_HALF, AREA_HALF)
        heading_center = math.atan2(0 - y, 0 - x)  # toward center
    elif side == 'right':
        x = AREA_HALF
        y = random.uniform(-AREA_HALF, AREA_HALF)
        heading_center = math.atan2(0 - y, 0 - x)
    elif side == 'top':
        y = AREA_HALF
        x = random.uniform(-AREA_HALF, AREA_HALF)
        heading_center = math.atan2(0 - y, 0 - x)
    else:  # bottom
        y = -AREA_HALF
        x = random.uniform(-AREA_HALF, AREA_HALF)
        heading_center = math.atan2(0 - y, 0 - x)
    # Nudge heading around "toward center" so they don't all aim exactly at center
    heading = heading_center + random.uniform(-HEADING_JITTER_NORMAL, HEADING_JITTER_NORMAL)
    return x, y, heading

def choose_state_for_normal():
    r = random.random()
    if r < RATIO_RUN:
        return 'run'
    elif r < RATIO_RUN + RATIO_WALK:
        return 'walk'
    else:
        return 'still'

def speed_for_state(state):
    if state == 'run' or state == 'run_away':
        return random.uniform(RUN_MIN, RUN_MAX)
    if state == 'walk' or state == 'walk_away':
        return random.uniform(WALK_MIN, WALK_MAX)
    return STILL_SPEED  # 'still' or 'dead'

def clamp_to_area(x, y):
    # Keep inside (used during gunshot window to maintain exactly 100 inside)
    return max(-AREA_HALF, min(AREA_HALF, x)), max(-AREA_HALF, min(AREA_HALF, y))

def step_person(person, dt, t_now, gunshot_window=False):
    st = person['state']
    x, y, h, v = person['x'], person['y'], person['heading'], person['speed']

    if st in ('still','dead'):
        # no movement
        pass

    elif st in ('run','walk'):
        # Normal motion with slight random drift in heading
        h += random.uniform(-HEADING_JITTER_NORMAL, HEADING_JITTER_NORMAL)
        x += v * math.cos(h) * dt
        y += v * math.sin(h) * dt

    elif st in ('run_away','walk_away'):
        away_angle = math.atan2(y, x)
        h = away_angle + random.uniform(-HEADING_JITTER_AWAY, HEADING_JITTER_AWAY)
        x += v * math.cos(h) * dt
        y += v * math.sin(h) * dt

    # During the gunshot window we FORCE everyone to remain inside (exactly 100 inside)
    if gunshot_window:
        x, y = clamp_to_area(x, y)

    # Write back
    person['x'], person['y'], person['heading'] = x, y, h

def distance_from_center(person):
    return math.hypot(person['x'], person['y'])


In [None]:
# Active population and retired IDs (phones that left never reappear)
active = {}
retired_ids = set()

def spawn_one(now_t):
    pid = generate_imei()
    x, y, heading = random_edge_spawn()
    st = choose_state_for_normal()
    v = speed_for_state(st)
    active[pid] = {
        'id': pid, 'x': x, 'y': y,
        'heading': heading, 'speed': v,
        'state': st, 'born_t': now_t, 'exited': False
    }

def try_exit(person):
    # Return True if person has left the area (normal state), else False.
    return (person['x'] < -AREA_HALF or person['x'] > AREA_HALF or
            person['y'] < -AREA_HALF or person['y'] > AREA_HALF)

def maintain_normal_population(now_t, target_min=MIN_PEOPLE, target_max=MAX_PEOPLE):
    # Gently guide population into [target_min, target_max].
    n = len(active)
    # If too low, spawn a few; if too high, some will walk out naturally.
    if n < target_min:
        need = target_min - n
        for _ in range(need):
            spawn_one(now_t)
    elif n > target_max:
        # do nothing special: movers will exit on their own
        pass
    else:
        # Small random chance to spawn/none to keep uniform counts over time
        if random.random() < 0.05:
            spawn_one(now_t)

def adjust_states_toward_ratio():
    # Tiny probability to change state so the mix hovers near 70:28:2 (normal only).
    for p in active.values():
        if p['state'] in ('dead','run_away','walk_away'):
            continue
        # With small prob, re-draw a state to keep global mix near target
        if random.random() < 0.02:
            st = choose_state_for_normal()
            p['state'] = st
            p['speed'] = speed_for_state(st)

def force_population_to_exact_100(now_t):
    # At the boundary of gunshot start, make active count exactly 100 (smoothly as possible).
    n = len(active)
    if n < GUNSHOT_POP:
        need = GUNSHOT_POP - n
        for _ in range(need):
            # Spawn near edges then step once to gently bring them inside
            spawn_one(now_t)
    elif n > GUNSHOT_POP:
        # Randomly retire extras (mark as exited just before window)
        extras = random.sample(list(active.keys()), n - GUNSHOT_POP)
        for pid in extras:
            retired_ids.add(pid)
            del active[pid]


In [None]:
def snap_random_to_ring_and_kill(k, radius_m):
    alive_pids = [pid for pid, p in active.items() if p['state'] != 'dead']
    chosen = random.sample(alive_pids, k)
    # spread angles so they are not all on top of each other
    for i, pid in enumerate(chosen):
        angle = random.uniform(0, 2*math.pi)
        active[pid]['x'] = radius_m * math.cos(angle)
        active[pid]['y'] = radius_m * math.sin(angle)
        active[pid]['state'] = 'dead'
        active[pid]['speed'] = STILL_SPEED
    return chosen  # return IDs for potential checks

def set_mix_at_25202p5():

    # First, reset all ALIVE to 'walk' as a base with constant heading
    for p in active.values():
        if p['state'] != 'dead':
            p['state'] = 'walk'
            p['speed'] = speed_for_state('walk')
            # keep their current heading (constant)
    # Make 25 still (alive)
    alive_pids = [pid for pid, p in active.items() if p['state'] != 'dead']
    still_ids = random.sample(alive_pids, 25)
    for pid in still_ids:
        active[pid]['state'] = 'still'
        active[pid]['speed'] = STILL_SPEED
    # Make 2 runners
    remaining = [pid for pid in alive_pids if pid not in still_ids]
    run_ids = random.sample(remaining, 2)
    for pid in run_ids:
        active[pid]['state'] = 'run'
        active[pid]['speed'] = speed_for_state('run')
    # Kill 3 at radius 2m
    snap_random_to_ring_and_kill(3, 2.0)

def set_mix_at_25205():
    snap_random_to_ring_and_kill(2, 5.0)  # now total dead = 5
    # Partition alive into 85 run_away, 5 walk_away, 5 still
    alive_pids = [pid for pid, p in active.items() if p['state'] != 'dead']
    assert len(alive_pids) == 95, "There should be 95 alive at 25205.0"
    random.shuffle(alive_pids)
    runA = alive_pids[:85]
    walkA = alive_pids[85:90]
    stillA = alive_pids[90:]  # 5
    for pid in runA:
        active[pid]['state'] = 'run_away'
        active[pid]['speed'] = speed_for_state('run_away')
    for pid in walkA:
        active[pid]['state'] = 'walk_away'
        active[pid]['speed'] = speed_for_state('walk_away')
    for pid in stillA:
        active[pid]['state'] = 'still'
        active[pid]['speed'] = STILL_SPEED

def set_all_alive_run_away_at_25207p5():
    for p in active.values():
        if p['state'] != 'dead':
            p['state'] = 'run_away'
            p['speed'] = speed_for_state('run_away')


In [8]:
rows = []  # will hold dicts -> DataFrame -> CSV

# We'll simulate in "local meters" with center at (0,0) and convert to lat/lon for output.
def record_positions(now_t, is_gunshot_flag):
    for p in active.values():
        # Convert local (x,y) meters -> (lat,lon)
        lat, lon = meters_to_latlon(p['x'], p['y'], CENTER_LAT, CENTER_LON)
        rows.append({
            'phone_id': p['id'],
            't': round(now_t, 1),   # keep simple 1-decimal since dt=2.5s
            'lat': round(lat, 8),
            'lon': round(lon, 8),
            'is_gunshot': 1 if is_gunshot_flag else 0
        })

# Precompute the time grid
num_steps = int((T_END - T_START)/DT) + 1
times = [T_START + i*DT for i in range(num_steps)]

# To make the transition "smooth", we aim near 100 in the minutes before the window.
def target_bounds_for_time(t):
    if t < T_GUNSHOT_START - 60:     # long before event
        return MIN_PEOPLE, MAX_PEOPLE
    elif t < T_GUNSHOT_START - 10:   # 1 minute -> 10s before: narrow toward ~100
        return 90, 110
    elif t < T_GUNSHOT_START:        # last 10s: even closer
        return 98, 102
    elif t <= T_GUNSHOT_END:         # during window: exactly 100, enforced elsewhere
        return GUNSHOT_POP, GUNSHOT_POP
    else:                            # after window: back to normal
        return MIN_PEOPLE, MAX_PEOPLE

# Main simulation
for t in times:
    in_gunshot = (T_GUNSHOT_START <= t <= T_GUNSHOT_END)

    # 1) NORMAL POPULATION MAINTENANCE (before and after the gunshot window)
    if not in_gunshot:
        lo, hi = target_bounds_for_time(t)
        maintain_normal_population(t, lo, hi)
        adjust_states_toward_ratio()

    # 2) ENFORCE EXACT COUNT AT GUNSHOT START
    if abs(t - T_GUNSHOT_START) < 1e-9:
        force_population_to_exact_100(t)
        set_mix_at_25202p5()

    # 3) SECOND GUNSHOT PHASE
    if abs(t - T_GUNSHOT_PHASE2) < 1e-9:
        # Keep exactly 100 inside. If some tried to exit in the last step, respawn to 100.
        force_population_to_exact_100(t)
        set_mix_at_25205()

    # 4) TURN EVERY ALIVE TO RUN_AWAY AT 25207.5
    if abs(t - T_GUNSHOT_ALL_RUN) < 1e-9:
        force_population_to_exact_100(t)
        set_all_alive_run_away_at_25207p5()

    # 5) MOVE EVERYONE ONE STEP
    to_remove = []
    for pid, person in active.items():
        step_person(person, DT, t,
                    gunshot_window=(T_GUNSHOT_START <= t <= T_GUNSHOT_END))
        # If *not* in the gunshot window, allow exits. Leaving phones get retired forever.
        if not in_gunshot and try_exit(person):
            to_remove.append(pid)
    for pid in to_remove:
        retired_ids.add(pid)
        del active[pid]

    # During the gunshot window, make absolutely sure count stays exactly 100
    if in_gunshot and len(active) != GUNSHOT_POP:
        force_population_to_exact_100(t)

    # 6) RECORD A ROW FOR *EACH PHONE CURRENTLY INSIDE*
    record_positions(t, is_gunshot_flag=in_gunshot)

# Convert to DataFrame and write CSV (exact columns as required)
df = pd.DataFrame(rows, columns=['phone_id','t','lat','lon','is_gunshot'])
df.to_csv(OUT_CSV, index=False)

print("Done. Wrote", OUT_CSV, "with", len(df), "rows and columns:", list(df.columns))


Done. Wrote expanded_gunshot_sim.csv with 807776 rows and columns: ['phone_id', 't', 'lat', 'lon', 'is_gunshot']
