In [4]:
import os
import struct
import pandas as pd
import xml.etree.ElementTree as ET
import json
import numpy as np
from datetime import datetime
import pyproj
from collections import Counter

# ==========================================
# CONFIGURATION
# ==========================================
# Input: folder holding TASKDATA.XML and the binary task logs (*.bin).
DATA_FOLDER     = r'/Users/holmes/local_dev/agri_analysis/data/TASKDATA'
# Intermediate output: task_index.csv plus one raw decoded CSV per log.
INTERIM_FOLDER  = r'/Users/holmes/local_dev/agri_analysis/data/taskdata_out2'
# Final output: enriched CSVs, grouped into one sub-folder per year.
ENRICHED_FOLDER = r'/Users/holmes/local_dev/agri_analysis/data/ENRICHED4'

# "Denmark Signature": broad WGS84 latitude/longitude bounds (degrees) used to
# decide whether integers decoded at a byte offset are plausible coordinates.
MIN_LAT, MAX_LAT = 54.0, 58.0
MIN_LON, MAX_LON = 8.0, 16.0

# Create the output folders up front. exist_ok=True avoids the race between
# checking for a folder and creating it.
for folder in (INTERIM_FOLDER, ENRICHED_FOLDER):
    os.makedirs(folder, exist_ok=True)

# ==========================================
# 1. ROBUST STRIDE DETECTOR (The Fix)
# ==========================================
def detect_stride_robust(content, *, scan_limit=500_000, lat_bounds=None, lon_bounds=None):
    """Estimate the fixed record length ("stride") of a binary position log.

    Every byte offset ``i`` in the first ``scan_limit`` bytes is treated as a
    candidate 12-byte header ``<time(4), lat(4), lon(4)>``. The little-endian
    int32 values at ``i+4`` and ``i+8`` are scaled by 1e-7. The offset counts
    as a hit when both values fall strictly inside the latitude/longitude
    bounds. The most frequent gap between consecutive hits is returned.
    Only gaps of 16..256 bytes are considered, because smaller gaps are
    treated as coincidental matches.

    NOTE(review): assumes every record in a log has the same length; confirm
    for logs whose record layout varies.

    Args:
        content: Raw bytes of the log file.
        scan_limit: Maximum number of leading bytes to inspect (only the
            first 500 KB by default, to keep the scan cheap).
        lat_bounds: ``(min, max)`` latitude in degrees; defaults to
            ``(MIN_LAT, MAX_LAT)``.
        lon_bounds: ``(min, max)`` longitude in degrees; defaults to
            ``(MIN_LON, MAX_LON)``.

    Returns:
        The stride in bytes as a plain ``int``. Returns ``None`` when fewer
        than 10 candidate headers are found or no gap lies in 16..256.
    """
    lat_lo, lat_hi = (MIN_LAT, MAX_LAT) if lat_bounds is None else lat_bounds
    lon_lo, lon_hi = (MIN_LON, MAX_LON) if lon_bounds is None else lon_bounds

    window = min(len(content), scan_limit)
    # Candidate header starts are every i with i + 12 <= window.
    num_candidates = window - 11
    if num_candidates < 10:
        return None  # the 10-hit minimum below is unreachable

    # Build the little-endian int32 that starts at *every* byte offset in one
    # vectorized pass, instead of calling struct.unpack once per byte.
    raw = np.frombuffer(content, dtype=np.uint8, count=window).astype(np.uint32)
    words = (raw[:-3] | (raw[1:-2] << 8) | (raw[2:-1] << 16) | (raw[3:] << 24)).view(np.int32)

    lat = words[4:4 + num_candidates] * 1e-7
    lon = words[8:8 + num_candidates] * 1e-7
    hits = (lat_lo < lat) & (lat < lat_hi) & (lon_lo < lon) & (lon < lon_hi)
    valid_offsets = np.flatnonzero(hits)

    if len(valid_offsets) < 10:
        return None

    # Gaps between consecutive hits. Tiny gaps (1, 4, ...) are coincidences.
    valid_diffs = [int(d) for d in np.diff(valid_offsets) if 16 <= d <= 256]
    if not valid_diffs:
        return None

    # The most common gap is the record length.
    return Counter(valid_diffs).most_common(1)[0][0]

# ==========================================
# 2. UNIVERSAL EXTRACTOR
# ==========================================
# Record header: time (uint32), latitude*1e7 (int32), longitude*1e7 (int32).
_HEADER = struct.Struct('<Lii')

# Known DDI codes, as they appear in the XML (e.g. '0054'), -> column names.
_DDI_NAMES = {
    '0054': 'Yield_Mass', '0095': 'Yield_Vol', '018D': 'Speed',
    '0063': 'Moisture', '0053': 'Dry_Mass', '013A': 'Fuel'
}


def _payload_keys(xml_ddi_list):
    """Return the output column name for each DDI, in payload order.

    Names containing 'Speed', 'Yield_Mass' or 'Moisture' become the Raw_Speed,
    Raw_Yield_Mass and Raw_Moisture columns. Every other DDI keeps its
    friendly name, or 'DDI_<code>' if unknown.
    """
    keys = []
    for ddi in xml_ddi_list:
        name = _DDI_NAMES.get(ddi, f"DDI_{ddi}")
        if 'Speed' in name:
            keys.append('Raw_Speed')
        elif 'Yield_Mass' in name:
            keys.append('Raw_Yield_Mass')
        elif 'Moisture' in name:
            keys.append('Raw_Moisture')
        else:
            keys.append(name)
    return keys


def extract_file(bin_path, out_csv_path, xml_ddi_list):
    """Decode one binary log into a CSV of position and sensor values.

    The record length ("stride") comes from ``detect_stride_robust``. If that
    fails, it falls back to a 12-byte header plus 4 bytes per DDI in
    ``xml_ddi_list``. The file is then walked record by record. A header is
    accepted only if its coordinates fall inside the MIN/MAX_LAT/LON bounds;
    otherwise the cursor advances one byte to re-synchronise.

    Args:
        bin_path: Path of the binary log file.
        out_csv_path: Destination CSV for the decoded rows.
        xml_ddi_list: DDI codes in the order the XML declares them. They name
            the payload integers, except for the stride-31 compact layout,
            which uses fixed offsets.

    Returns:
        True if at least one row was decoded and written. False if the file
        is missing or no valid record was found.
    """
    if not os.path.exists(bin_path):
        return False

    with open(bin_path, 'rb') as f:
        content = f.read()

    # A. Detect the stride; B. fall back to the XML-derived layout.
    stride = detect_stride_robust(content)
    if not stride:
        stride = 12 + (len(xml_ddi_list) * 4)
        print(f"  -> Warning: Auto-detect failed. Using XML stride: {stride}")
    stride = int(stride)

    # The payload is read as whole unsigned 32-bit ints; trailing bytes are ignored.
    num_ints = (stride - 12) // 4
    payload = struct.Struct(f'<{num_ints}I')
    keys = _payload_keys(xml_ddi_list)

    valid_rows = []
    cursor = 0
    last_start = len(content) - stride  # last offset where a full record fits
    while cursor <= last_start:
        time_ms, lat_raw, lon_raw = _HEADER.unpack_from(content, cursor)
        lat = lat_raw * 1e-7
        lon = lon_raw * 1e-7
        if not ((MIN_LAT < lat < MAX_LAT) and (MIN_LON < lon < MAX_LON)):
            cursor += 1  # not a record start: slide one byte to re-sync
            continue

        row = {'Time_ms': time_ms, 'Latitude': lat, 'Longitude': lon}
        p_start = cursor + 12
        if stride == 31:
            # Compact 19-byte payload: the XML order is ignored. Byte offsets
            # (relative to the payload start) come from analysis of TLG00001:
            # mass @ 2 (uint32), speed @ 10 (uint16), moisture @ 15 (uint16).
            row['Raw_Yield_Mass'] = struct.unpack_from('<I', content, p_start + 2)[0]
            row['Raw_Speed'] = struct.unpack_from('<H', content, p_start + 10)[0]
            row['Raw_Moisture'] = struct.unpack_from('<H', content, p_start + 15)[0]
        else:
            # Standard mapping: i-th payload int -> i-th XML DDI, else Val_i.
            for i, val in enumerate(payload.unpack_from(content, p_start)):
                row[keys[i] if i < len(keys) else f'Val_{i}'] = val

        valid_rows.append(row)
        cursor += stride  # jump to the next record

    if valid_rows:
        pd.DataFrame(valid_rows).to_csv(out_csv_path, index=False)
        return True
    return False

# ==========================================
# 3. ENRICHMENT (With Physics Time)
# ==========================================
def _first_present_column(df, candidates):
    """Return the first name in *candidates* that is a column of *df*, else None."""
    return next((c for c in candidates if c in df.columns), None)


def _clean_name(value):
    """Make *value* safe inside a file name: strip; spaces->'_'; slashes->'-'; drop ':'."""
    return (str(value).strip().replace(' ', '_').replace('/', '-')
            .replace('\\', '-').replace(':', ''))


def _start_anchor(meta):
    """Return the task start from meta['StartTime'], else noon on 1 Jan of meta['Year']."""
    try:
        return datetime.fromisoformat(meta['StartTime'].replace('Z', '+00:00'))
    except (KeyError, AttributeError, TypeError, ValueError):
        # Missing, empty, non-string or unparseable start time.
        return datetime(int(meta['Year']), 1, 1, 12, 0, 0)


def enrich_data(raw_csv, meta, *, header_width=9.0, default_speed=2.0):
    """Add distance, speed, timestamps and yield to a raw decoded CSV and save it.

    Args:
        raw_csv: Path of a CSV written by ``extract_file``.
        meta: Task-index record with keys 'Year', 'StartTime', 'FieldName',
            'Crop' and 'LogFilename'.
        header_width: Working width in metres used for the yield calculation.
        default_speed: Speed in m/s used where no valid measured speed exists.

    Returns:
        False if the raw CSV has no rows. Otherwise True, after writing
        ENRICHED_FOLDER/<year>/<year>_<field>_<crop>[_<log id>].csv.
    """
    df = pd.read_csv(raw_csv)
    if df.empty:
        return False

    # 1. Drop repeated fixes at the same coordinate (first occurrence wins).
    df.drop_duplicates(subset=['Latitude', 'Longitude'], keep='first', inplace=True)

    # 2. Step distance in metres: project lon/lat (EPSG:4326) to EPSG:25832.
    #    Prepending the first point makes the first step 0 and keeps lengths aligned.
    transformer = pyproj.Transformer.from_crs("EPSG:4326", "EPSG:25832", always_xy=True)
    xx, yy = transformer.transform(df['Longitude'].values, df['Latitude'].values)
    df['Step_Dist_M'] = np.hypot(np.diff(xx, prepend=xx[0]), np.diff(yy, prepend=yy[0]))

    # 3. Speed: measured value where plausible (0.1..25 m/s), else the default.
    df['Calc_Speed'] = float(default_speed)
    speed_col = _first_present_column(df, ('Raw_Speed', 'Speed', 'DDI_018D'))
    if speed_col is not None:
        # Raw speed assumed to be mm/s -> m/s.
        s = pd.to_numeric(df[speed_col], errors='coerce').fillna(0) * 0.001
        mask_valid = (s > 0.1) & (s < 25.0)
        df.loc[mask_valid, 'Calc_Speed'] = s[mask_valid]

    # 4. Time reconstruction: t = distance / speed per step, capped at 15 s,
    #    accumulated from the task start.
    df['Step_Time_S'] = (df['Step_Dist_M'] / df['Calc_Speed'].clip(lower=0.1)).clip(upper=15.0)
    anchor = _start_anchor(meta)
    df['Datetime'] = anchor + pd.to_timedelta(df['Step_Time_S'].cumsum(), unit='s')

    # 5. Yield.
    df['Mass_Flow_Kg_S'] = 0
    mass_col = _first_present_column(df, ('Raw_Yield_Mass', 'Yield_Mass'))
    if mass_col is not None:
        # Raw mass flow assumed to be mg/s -> kg/s.
        m = pd.to_numeric(df[mass_col], errors='coerce').fillna(0)
        df['Mass_Flow_Kg_S'] = m / 1_000_000.0

    # kg/s / (m/s * m) = kg/m^2, and 1 kg/m^2 = 10 t/ha (hence the factor 10).
    df['Yield_T_Ha'] = (df['Mass_Flow_Kg_S'] * 10.0) / (df['Calc_Speed'] * header_width)
    df['Yield_T_Ha'] = df['Yield_T_Ha'].fillna(0).replace([np.inf, -np.inf], 0)

    # 6. Save under a per-year folder with a sanitised name.
    year = int(meta['Year'])
    safe_field = _clean_name(meta['FieldName'])
    safe_crop = _clean_name(meta['Crop'])
    tlg_id = _clean_name(meta['LogFilename'].replace('.bin', ''))
    # Field names containing "import" are replaced by the log id.
    if 'import' in safe_field.lower():
        safe_field = tlg_id

    year_dir = os.path.join(ENRICHED_FOLDER, str(year))
    os.makedirs(year_dir, exist_ok=True)

    out_name = f"{year}_{safe_field}_{safe_crop}.csv"
    # Disambiguate tasks mapping to the same year/field/crop with the log id.
    # NOTE(review): a file left by a previous run also triggers this branch,
    # so re-running the pipeline writes suffixed duplicates; confirm intended.
    if os.path.exists(os.path.join(year_dir, out_name)):
        out_name = f"{year}_{safe_field}_{safe_crop}_{tlg_id}.csv"

    out_cols = ['Datetime', 'Latitude', 'Longitude', 'Yield_T_Ha', 'Calc_Speed', 'Mass_Flow_Kg_S']
    # The extractor names DDI 0095 'Yield_Vol'. Checking only 'Raw_Yield_Vol'
    # silently dropped the volume column.
    out_cols += [c for c in ('Raw_Yield_Vol', 'Yield_Vol', 'Raw_Moisture', 'Moisture')
                 if c in df.columns]

    df[out_cols].to_csv(os.path.join(year_dir, out_name), index=False)
    return True

# ==========================================
# EXECUTION
# ==========================================
def _parse_task(tsk, products, fields):
    """Build one task-index record from a TSK element, or None if it has no usable TLG log."""
    tlg = tsk.find("TLG")
    if tlg is None:
        return None
    tlg_name = tlg.attrib.get('A')
    if not tlg_name:
        print("  -> Warning: TLG element without 'A' attribute; task skipped.")
        return None
    log = tlg_name + '.bin'

    # DDIs declared on the task's TIM element; its 'A' attribute is the start time.
    ddis = []
    s_time = ""
    tim = tsk.find("TIM")
    if tim is not None:
        s_time = tim.attrib.get('A')
        ddis = [dlv.attrib.get('A') for dlv in tim.findall("DLV")]

    # A per-log XML sidecar, when present and parseable, overrides the TIM DDI list.
    sidecar = os.path.join(DATA_FOLDER, log.replace('.bin', '.xml'))
    if os.path.exists(sidecar):
        try:
            ddis = [d.attrib.get('A') for d in ET.parse(sidecar).findall(".//DLV")]
        except (ET.ParseError, OSError):
            pass  # best effort: keep the TIM list

    pan = tsk.find("PAN")
    crop = products.get(pan.attrib.get('A'), "Unknown") if pan is not None else "Unknown"
    f_name = fields.get(tsk.attrib.get('E'), "Unknown")

    year = 2024  # fallback when the start time is missing or unparseable
    try:
        year = datetime.fromisoformat(s_time.replace('Z', '')).year
    except (AttributeError, TypeError, ValueError):
        pass

    return {'LogFilename': log, 'Year': year, 'Crop': crop, 'FieldName': f_name,
            'StartTime': s_time, 'DDI_List': json.dumps(ddis)}


def _build_task_index():
    """Parse DATA_FOLDER/TASKDATA.XML into a DataFrame with one row per logged task."""
    root = ET.parse(os.path.join(DATA_FOLDER, 'TASKDATA.XML')).getroot()
    products = {p.attrib.get('A'): p.attrib.get('B') for p in root.findall(".//PDT")}
    fields = {f.attrib.get('A'): f.attrib.get('C') for f in root.findall(".//PFD")}

    records = []
    for tsk in root.findall(".//TSK"):
        record = _parse_task(tsk, products, fields)
        if record is not None:
            records.append(record)
    return pd.DataFrame(records)


def run_pipeline():
    """Index TASKDATA.XML, then extract and enrich every referenced binary log.

    Writes task_index.csv and one raw CSV per log into INTERIM_FOLDER. The
    enriched CSVs go to per-year folders under ENRICHED_FOLDER. Logs that
    cannot be decoded are reported and skipped.
    """
    print("--- 1. BUILDING INDEX ---")
    df_index = _build_task_index()
    df_index.to_csv(os.path.join(INTERIM_FOLDER, 'task_index.csv'), index=False)
    print(f"Found {len(df_index)} tasks.")

    print("\n--- 2. PROCESSING ---")
    count = 0
    for _, row in df_index.iterrows():
        bin_file = os.path.join(DATA_FOLDER, row['LogFilename'])
        raw_csv = os.path.join(INTERIM_FOLDER, row['LogFilename'].replace('.bin', '.csv'))
        ddi_list = json.loads(row['DDI_List'])

        if not extract_file(bin_file, raw_csv, ddi_list):
            print(f"  -> Skipped {row['LogFilename']}: missing file or no valid records")
            continue
        if enrich_data(raw_csv, row):
            count += 1

    print(f"\nDone. {count} files enriched in '{ENRICHED_FOLDER}'")

if __name__ == "__main__":
    # Script entry point: build the task index, then extract and enrich every log.
    run_pipeline()

--- 1. BUILDING INDEX ---
Found 142 tasks.

--- 2. PROCESSING ---

Done. 142 files enriched in '/Users/holmes/local_dev/agri_analysis/data/ENRICHED4'
