# Cell 1 — Load clusters and get the Saxony center

This prefers your GeoPackage. If missing, it falls back to a CSV that has lon/lat.

In [1]:
import os, io, zipfile, requests
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

# Paths are resolved against the kernel's working directory.
SCRIPT_DIR = os.getcwd()
GPKG_PATH  = os.path.join(SCRIPT_DIR, "loss_clusters_all_years.gpkg")
CSV_PATH   = os.path.join(SCRIPT_DIR, "loss_clusters_all_years.csv")

# --- Load clusters ---
# Preference order: GeoPackage (carries geometry + CRS), then CSV with lon/lat columns.
if os.path.exists(GPKG_PATH):
    clusters_gdf = gpd.read_file(GPKG_PATH, layer="clusters")
    print(f"Loaded clusters from GeoPackage: {len(clusters_gdf)}")
elif os.path.exists(CSV_PATH):
    df = pd.read_csv(CSV_PATH)
    # A CSV has no geometry column, so lon/lat are mandatory to build points.
    if not {"lon", "lat"}.issubset(df.columns):
        raise FileNotFoundError("CSV found but missing lon/lat columns.")
    # lon/lat are interpreted as WGS84 (EPSG:4326).
    clusters_gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["lon"], df["lat"]), crs=4326)
    print(f"Loaded clusters from CSV (built geometry from lon/lat): {len(clusters_gdf)}")
else:
    raise FileNotFoundError("No clusters file found. Expected loss_clusters_all_years.gpkg or .csv")

# Validate essential columns with an explicit exception: `assert` is stripped
# when Python runs with -O and would not say which column is absent.
missing_cols = [c for c in ("year", "shape_class") if c not in clusters_gdf.columns]
if missing_cols:
    raise ValueError(
        f"Clusters need 'year' and 'shape_class' columns. Missing: {missing_cols}"
    )

# --- Get Saxony center (centroid of the Saxony polygon) ---
GADM_DIR = os.path.join(SCRIPT_DIR, "gadm_germany")
gadm_shp = os.path.join(GADM_DIR, "gadm41_DEU_1.shp")

# Download and unpack the GADM shapefiles only when the level-1 layer is not cached locally.
if not os.path.exists(gadm_shp):
    os.makedirs(GADM_DIR, exist_ok=True)
    url = "https://geodata.ucdavis.edu/gadm/gadm4.1/shp/gadm41_DEU_shp.zip"
    r = requests.get(url, timeout=120)
    r.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(GADM_DIR)
    # Fail with a clear message if the archive layout is not what we expect.
    if not os.path.exists(gadm_shp):
        raise FileNotFoundError(
            f"Downloaded GADM archive does not contain {os.path.basename(gadm_shp)}"
        )

states = gpd.read_file(gadm_shp)
saxony = states[states["NAME_1"] == "Sachsen"].to_crs(25833)  # ETRS89 / UTM 33N (meters)
if saxony.empty:
    # Without this guard the centroid below would be an empty point and the
    # failure would only surface later, when its coordinates are read.
    raise ValueError("No feature with NAME_1 == 'Sachsen' found in the GADM level-1 layer.")

# `unary_union` is deprecated in newer geopandas; prefer `union_all()` and
# fall back to the old attribute on versions that do not provide it.
sax_geoms = saxony.geometry
sax_outline = sax_geoms.union_all() if hasattr(sax_geoms, "union_all") else sax_geoms.unary_union
sax_center_pt = sax_outline.centroid  # 2D centroid is fine for this use
print("Got Saxony centroid in EPSG:25833")

# Ensure clusters are in the same metric CRS for angle/radius math
clusters_gdf = clusters_gdf.to_crs(25833)


Loaded clusters from GeoPackage: 82032
Got Saxony centroid in EPSG:25833


  sax_center_pt = saxony.geometry.unary_union.centroid  # 2D centroid is fine for this use


# Cell 2 — Clock-sweep onsets: angle & radius from Saxony center

0° = north; increases clockwise.

onset (s) = angle_deg / 360 * 120.

In [3]:
import numpy as np

def angle_deg_from_north_clockwise(dx, dy):
    """
    Compass-style bearing of the vector (dx, dy), in degrees within [0, 360).

    dx is the eastward component and dy the northward component (projected CRS).
    North maps to 0 and the angle grows clockwise (east = 90, south = 180, west = 270).
    Accepts scalars or numpy arrays.
    """
    # Passing (dx, dy) instead of the usual (dy, dx) measures from the +y axis,
    # which is what puts 0 at north and makes the angle grow clockwise.
    signed_bearing = np.rad2deg(np.arctan2(dx, dy))  # in (-180, 180]
    # Shift by a full turn and wrap so that western bearings become 180..360.
    return np.mod(signed_bearing + 360.0, 360.0)

def add_clock_onsets(gdf, center_pt, total_duration_s=120.0):
    """
    Return a copy of ``gdf`` with polar coordinates and a sweep onset per row.

    Added columns:
      angle_deg - bearing from ``center_pt`` to the geometry (0 = north, clockwise)
      radius_m  - straight-line distance from ``center_pt`` (in the CRS units)
      onset_s   - angle mapped linearly onto [0, total_duration_s)

    Reads ``gdf.geometry.x`` / ``.y``, so geometries are expected to be points.
    The input frame is not modified.
    """
    geom = gdf.geometry
    # Offsets of every geometry relative to the sweep center.
    east_offset = geom.x.values - center_pt.x
    north_offset = geom.y.values - center_pt.y

    bearing_deg = angle_deg_from_north_clockwise(east_offset, north_offset)

    result = gdf.copy()
    result["angle_deg"] = bearing_deg
    result["radius_m"] = np.hypot(east_offset, north_offset)
    # One full revolution of the "clock hand" lasts total_duration_s.
    result["onset_s"] = (bearing_deg / 360.0) * total_duration_s
    return result

# Attach angle_deg / radius_m / onset_s to every cluster for a 120 s sweep around sax_center_pt.
clusters_clock = add_clock_onsets(clusters_gdf, sax_center_pt, total_duration_s=120.0)
# Preview the first rows; the three new columns are appended at the far right.
clusters_clock.head()


Unnamed: 0,cluster_id,year,n_pixels,area_m2,area_ha,perimeter_m,circularity,solidity,elongation,length_m,...,shape_class,area_ha_norm01,elongation_norm01,width_m_norm01,nn_dist_m_norm01,circularity_norm01,geometry,angle_deg,radius_m,onset_s
0,4,2001,4,1579.406619,0.157941,79.483498,3.141593,1.0,1.0,39.741749,...,point,0.0,0.0,0.132168,0.13925,0.24514,POINT (346535.64 5724677.828),331.05059,77334.476975,110.350197
1,5,2001,7,2763.961584,0.276396,123.34064,2.283124,0.7,3.399041,124.200958,...,other,0.007557,0.409805,0.038746,0.001529,0.176383,POINT (350014.462 5724531.636),333.305194,75581.180937,111.101731
2,6,2001,32,12635.252954,1.263525,419.406635,0.902658,0.842105,1.367295,151.921094,...,plane,0.070529,0.062742,0.364165,0.10229,0.065817,POINT (342441.618 5724453.66),328.379662,79206.023065,109.459887
3,7,2001,4,1579.406619,0.157941,79.483498,3.141593,1.0,1.0,39.741749,...,other,0.0,0.0,0.132168,0.001529,0.24514,POINT (349933.559 5724518.861),333.24607,75606.153259,111.082023
4,11,2001,4,1579.406619,0.157941,47.972535,8.624193,0.8,3.162278,88.865252,...,point,0.0,0.369361,0.014808,0.023394,0.684259,POINT (349491.432 5724215.831),332.843135,75536.458074,110.947712


# Cell 3 — MIDI renderer (uses onsets; no even spacing)

You can keep your earlier pitch/velocity/duration mappings: the renderer accepts each of them as an optional 0–1 series. In Cell 4, points use radius → pitch (farther = higher), since we’re in polar coordinates now.

In [4]:
import pretty_midi
import numpy as np
import pandas as pd

def norm01(series):
    """
    Min-max normalize ``series`` to the range [0, 1].

    Values are coerced to float; anything non-numeric becomes NaN and stays NaN
    in the result. If the series is empty, all-NaN, constant, or its min/max is
    not finite, a series of zeros with the same index is returned instead.
    """
    s = pd.to_numeric(series, errors="coerce").astype(float)
    # Series.min/max skip NaN and return NaN for empty or all-NaN input, so no
    # exception (empty) or RuntimeWarning (all-NaN) is produced here.
    lo, hi = s.min(), s.max()
    if not np.isfinite(lo) or not np.isfinite(hi) or hi - lo == 0:
        return pd.Series(np.zeros(len(s)), index=series.index)
    return (s - lo) / (hi - lo)

def make_scale(base=48, octaves=4, offsets=(0,3,5,7,10)):
    """
    Build an ascending list of MIDI note numbers.

    For each of ``octaves`` consecutive octaves starting at ``base``, every
    interval in ``offsets`` (semitones above that octave's root) is added.
    """
    notes = []
    for octave in range(octaves):
        octave_root = base + 12 * octave
        for step in offsets:
            notes.append(octave_root + step)
    return notes

# One note pool per shape class, each built from make_scale's default offsets (0, 3, 5, 7, 10).
SCALE_POINTS = make_scale(base=60, octaves=3)  # 15 notes, MIDI 60..94
SCALE_LINES  = make_scale(base=55, octaves=4)  # 20 notes, MIDI 55..101
SCALE_PLANES = make_scale(base=48, octaves=4)  # 20 notes, MIDI 48..94

def pick_note_from_scale(v01, scale):
    """
    Map a normalized value onto a note of ``scale``.

    ``v01`` is interpreted on [0, 1]: 0 selects the first note, 1 the last, and
    values in between select the nearest scale index. Out-of-range values
    (including +/-inf) are clamped; NaN is treated as 0 so that a missing
    feature value falls back to the lowest note instead of raising.
    """
    v = float(v01)
    if np.isnan(v):
        v = 0.0
    # Clamp before rounding: round() cannot convert NaN or inf to an integer.
    v = min(max(v, 0.0), 1.0)
    top = len(scale) - 1
    idx = int(np.clip(round(v * top), 0, top))
    return int(scale[idx])

def build_triad(root, quality="major"):
    """
    Return the three pitches of a root-position triad on ``root``.

    ``quality == "major"`` uses a major third (4 semitones); any other value
    uses a minor third (3 semitones). The fifth is always 7 semitones above the
    root. Each pitch is clamped to the range 21..108.
    """
    third_interval = 3
    if quality == "major":
        third_interval = 4

    chord = []
    for pitch in (root, root + third_interval, root + 7):
        chord.append(int(np.clip(pitch, 21, 108)))
    return chord

def _unit_values(series, index):
    """Return ``series`` aligned to ``index`` as a float array clipped to [0, 1].

    ``None`` yields all zeros. Labels missing from ``series``, non-numeric
    entries and NaN become 0.0; +inf becomes 1.0 and -inf becomes 0.0.
    """
    if series is None:
        return np.zeros(len(index), dtype=float)
    aligned = pd.to_numeric(series.reindex(index), errors="coerce")
    values = np.nan_to_num(aligned.to_numpy(dtype=float), nan=0.0, posinf=1.0, neginf=0.0)
    return np.clip(values, 0.0, 1.0)


def render_midi_from_onsets(
    df, out_path, total_duration_s=120.0,
    instrument_program=0,      # GM program
    mode="mono",               # "mono" or "chords"
    pitch_series=None,         # 0..1
    vel_series=None,           # 0..1
    dur_series=None,           # 0..1
    min_vel=40, max_vel=110,
    min_dur=0.12, max_dur=1.5,
    scale=None,
    chord_quality_series=None,
    allow_tail=True,           # <-- NEW: let notes extend past total_duration_s
    clamp_onsets=True          # keep onsets within [0, total_duration_s]
):
    """
    Write a single-instrument MIDI file with one note (or triad) per row of ``df``.

    Parameters
    ----------
    df : DataFrame with an ``onset_s`` column (note start time in seconds).
    out_path : destination path of the MIDI file.
    total_duration_s : length of the sweep; used for onset clamping and, when
        ``allow_tail`` is False, as the latest allowed note end.
    instrument_program : program number passed to ``pretty_midi.Instrument``.
    mode : ``"chords"`` emits a triad per row; any other value emits one note.
    pitch_series, vel_series, dur_series : optional Series indexed like ``df``
        with values on [0, 1]. ``None`` means all zeros. Missing or NaN entries
        are treated as 0 so that one bad value cannot abort the render.
    min_vel, max_vel : velocity range the 0..1 velocity is mapped onto
        (result is additionally clamped to 1..127).
    min_dur, max_dur : duration range in seconds the 0..1 duration is mapped onto.
    scale : sequence of MIDI note numbers; ``None`` or empty uses ``SCALE_POINTS``.
    chord_quality_series : optional Series of ``"major"`` / ``"minor"`` per row
        (case-insensitive); anything else falls back to ``"major"``.
    allow_tail : if False, note ends are capped at ``total_duration_s - 0.01``.
    clamp_onsets : if True, onsets are clipped into ``[0, total_duration_s]``.

    Returns
    -------
    ``out_path``. An empty ``df`` still produces a (note-less) MIDI file.
    Rows whose onset is NaN or infinite are skipped.
    """
    pm = pretty_midi.PrettyMIDI()
    inst = pretty_midi.Instrument(program=instrument_program)
    pm.instruments.append(inst)

    if len(df) == 0:
        pm.write(out_path)
        return out_path

    df = df.sort_values("onset_s")
    onsets = pd.to_numeric(df["onset_s"], errors="coerce").to_numpy(dtype=float)

    # Align every control series to the sorted rows once, instead of doing a
    # label lookup per note inside the loop.
    pitch01 = _unit_values(pitch_series, df.index)
    vel01 = _unit_values(vel_series, df.index)
    dur01 = _unit_values(dur_series, df.index)

    durations = min_dur + dur01 * (max_dur - min_dur)
    velocities = np.clip(min_vel + vel01 * (max_vel - min_vel), 1, 127).astype(int)

    # `scale or SCALE_POINTS` would raise on a numpy array; test explicitly.
    used_scale = SCALE_POINTS if scale is None or len(scale) == 0 else scale

    chord_mode = mode == "chords"
    if chord_mode and chord_quality_series is not None:
        qualities = [str(q).lower() for q in chord_quality_series.reindex(df.index)]
    else:
        qualities = ["major"] * len(df)

    for start, dur, vel, p01, quality in zip(onsets, durations, velocities, pitch01, qualities):
        if not np.isfinite(start):
            continue  # a note without a valid start time cannot be placed

        start = float(start)
        if clamp_onsets:
            start = float(np.clip(start, 0, total_duration_s))  # onset within the sweep

        end = start + float(dur)
        if not allow_tail:
            end = min(end, total_duration_s - 0.01)
        if end <= start:
            end = start + 0.01  # safeguard: never emit a zero/negative-length note

        root = pick_note_from_scale(float(p01), used_scale)
        if chord_mode:
            pitches = build_triad(root, quality if quality in ("major", "minor") else "major")
        else:
            pitches = [root]

        for pitch in pitches:
            inst.notes.append(
                pretty_midi.Note(velocity=int(vel), pitch=int(pitch), start=start, end=end)
            )

    pm.write(out_path)
    return out_path


# Cell 4 — Export up to three MIDI files per year (plus per-parameter TXT files) using the clock-sweep onsets

Points: pitch ← radius (farther = higher), vel/dur ← area (if available).

Lines: pitch ← elongation, vel ← (1–width), longer durations.

Planes: chords; pitch and velocity ← area, duration ← circularity, chord quality from circularity (≥ 0.5 → major, otherwise minor).

In [8]:
# --- Simple, robust export: only write files for classes that exist ---
# Destination folder for all MIDI and TXT exports; created on first run.
OUTPUT_DIR = os.path.join(SCRIPT_DIR, "midi_output_clock")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Every distinct year present in the clusters, ascending.
YEARS   = sorted(clusters_clock["year"].unique().tolist())
CLASSES = ["point", "plane", "line"]  # order: do points & planes first, then lines

# Instrument program per shape class, passed to the renderer as `instrument_program`.
# NOTE(review): presumably 0 = piano, 48 = string ensemble, 40 = violin in
# 0-based General MIDI numbering — confirm against the GM program table.
PROG = {"point": 0, "plane": 48, "line": 40}  # GM instruments

# Columns written as one-column TXT files next to each MIDI file.
# Columns absent from the data are created as NaN before export.
requested_params = [
    "lon", "lat", "radius_m", "n_pixels", "circularity",
    "solidity", "elongation", "length_m", "width_m", "nn_dist_m"
]

def ensure_columns(df, cols):
    """
    Make sure every name in ``cols`` exists as a column of ``df``.

    Columns that are absent are added in place, filled with NaN; existing
    columns are left untouched. Returns the same ``df`` object.
    """
    absent = [name for name in cols if name not in df.columns]
    for name in absent:
        df[name] = np.nan
    return df

def write_param_txt(series, base, name, fmt="%.6f"):
    """
    Write ``series`` as a one-column, tab-separated TXT file with a header.

    The output path is the MIDI path ``base`` with its extension replaced by
    ``_<name>.txt``. Only the final extension is replaced, so a ".mid" that
    happens to appear elsewhere in the path is left alone. Returns the path.
    """
    stem, _ext = os.path.splitext(base)
    out_path = f"{stem}_{name}.txt"
    series.to_csv(out_path, sep="\t", index=False, header=[name], float_format=fmt)
    return out_path


for year in YEARS:
    dfy = clusters_clock[clusters_clock["year"] == year]
    if dfy.empty:
        continue

    for cls in CLASSES:
        # Sort by onset once here rather than once per consumer below.
        dfc = dfy[dfy["shape_class"] == cls].sort_values("onset_s")
        if dfc.empty:
            print(f"{year}: skipping '{cls}' (none)")
            continue

        # --- build normalized features per class (safe fallbacks) ---
        zeros = pd.Series(np.zeros(len(dfc)), index=dfc.index)
        r01    = norm01(dfc["radius_m"])
        area01 = norm01(dfc["area_ha"])     if "area_ha"     in dfc else zeros
        el01   = norm01(dfc["elongation"])  if "elongation"  in dfc else zeros
        w01    = norm01(dfc["width_m"])     if "width_m"     in dfc else (1 - el01)
        circ01 = norm01(dfc["circularity"]) if "circularity" in dfc else pd.Series(np.ones(len(dfc))*0.5, index=dfc.index)

        # --- choose per-class mapping, then render once with clock onsets ---
        if cls == "point":
            file_tag = "points"
            render_kwargs = dict(
                mode="mono", scale=SCALE_POINTS,
                pitch_series=r01,           # farther -> higher
                vel_series=area01,          # bigger -> louder
                dur_series=area01,          # bigger -> longer (short)
                min_dur=0.10, max_dur=0.50,
            )
        elif cls == "plane":
            file_tag = "planes"
            quality = pd.Series(np.where(circ01 >= 0.5, "major", "minor"), index=dfc.index)
            render_kwargs = dict(
                mode="chords", scale=SCALE_PLANES,
                pitch_series=area01,         # larger -> higher
                vel_series=area01,           # larger -> louder
                dur_series=circ01,           # compact -> longer chord
                min_dur=0.8, max_dur=4.0,
                chord_quality_series=quality,
            )
        elif cls == "line":
            file_tag = "lines"
            render_kwargs = dict(
                mode="mono", scale=SCALE_LINES,
                pitch_series=el01,           # more elongated -> higher
                vel_series=(1 - w01),        # thinner -> louder (use w01 to invert)
                dur_series=el01,             # more elongated -> longer
                min_dur=0.8, max_dur=6.0,
            )
        else:
            # No mapping defined: skip instead of reusing the previous
            # iteration's midi_path for the TXT exports below.
            print(f"{year}: skipping '{cls}' (no MIDI mapping defined)")
            continue

        midi_path = os.path.join(OUTPUT_DIR, f"{year}_{file_tag}_clock_120s.mid")
        render_midi_from_onsets(
            dfc, midi_path, total_duration_s=120.0,
            instrument_program=PROG[cls], **render_kwargs
        )

        # --- per-parameter TXT exports (one-column, ordered by onset) ---
        dfc_sorted = dfc.copy()

        # ensure lon/lat exist (derive from geometry if needed)
        if "lon" not in dfc_sorted.columns or "lat" not in dfc_sorted.columns:
            try:
                dfc_wgs = dfc_sorted.to_crs(4326)
                dfc_sorted["lon"] = dfc_wgs.geometry.x.values
                dfc_sorted["lat"] = dfc_wgs.geometry.y.values
            except Exception as exc:
                # Best effort: keep exporting, but say why lon/lat will be NaN
                # rather than failing silently.
                print(f"{year}: could not derive lon/lat for '{cls}' ({exc}); missing values are written as NaN")

        dfc_sorted = ensure_columns(dfc_sorted, requested_params)

        for param in requested_params:
            write_param_txt(dfc_sorted[param], midi_path, param)

        print(f"✓ {year}: wrote '{cls}' → {midi_path}")


✓ 2001: wrote 'point' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2001_points_clock_120s.mid
✓ 2001: wrote 'plane' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2001_planes_clock_120s.mid
2001: skipping 'line' (none)
✓ 2002: wrote 'point' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2002_points_clock_120s.mid
✓ 2002: wrote 'plane' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2002_planes_clock_120s.mid
2002: skipping 'line' (none)
✓ 2003: wrote 'point' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2003_points_clock_120s.mid
✓ 2003: wrote 'plane' → C:\Users\chris\Projekte\JazztageLabor\Deforestation\notebooks\Saxony\Points_Lines_planes\midi_output_clock\2003_planes_clock_120

# (Optional) Cell 5 — Micro-jitter notes sharing the exact same azimuth

If simultaneous hits are too dense for you, add a tiny, random ±30 ms jitter after computing onsets:

In [None]:
def jitter_onsets(df, jitter_s=0.03, seed=0, total_duration_s=120.0):
    """
    Spread out notes that share the same azimuth by a small random time offset.

    Rows are grouped by ``angle_deg`` rounded to 3 decimals. In every group
    with more than one row, each ``onset_s`` gets an independent uniform offset
    in ``[-jitter_s, jitter_s]`` and is then clipped to
    ``[0, total_duration_s - 0.01]``. Rows with a unique angle are unchanged.

    Parameters
    ----------
    df : DataFrame with ``angle_deg`` and ``onset_s`` columns.
    jitter_s : maximum absolute offset in seconds.
    seed : seed for the random generator, so results are reproducible.
    total_duration_s : sweep length used for the upper clip bound; should
        match the value the onsets were computed with.

    Returns a modified copy; ``df`` itself is not changed.
    """
    rng = np.random.default_rng(seed)
    latest_onset = total_duration_s - 0.01
    # Group rows whose angle agrees to 0.001 deg (widen with e.g. round(1) for 0.1 deg).
    grp = df.groupby(df["angle_deg"].round(3))
    out = df.copy()
    for _, idxs in grp.groups.items():
        n = len(idxs)
        if n > 1:
            jit = rng.uniform(-jitter_s, jitter_s, n)
            out.loc[idxs, "onset_s"] = np.clip(out.loc[idxs, "onset_s"] + jit, 0, latest_onset)
    return out

# Example use before rendering:
# dfc = jitter_onsets(dfc, jitter_s=0.03, seed=42)
