In [None]:
import os

import clickhouse_connect

# Connection settings come from the environment so no credentials live in the notebook.
# `os` is imported here because this cell runs before any other cell imports it.
client = clickhouse_connect.get_client(
    host=os.environ["CLICKHOUSE_HOST"],
    username=os.environ["CLICKHOUSE_USERNAME"],
    password=os.environ["CLICKHOUSE_PASSWORD"],
    secure=True,
)

In [None]:
# Pull one day of messages. The window is half-open [start, end) so a message
# stamped exactly at midnight belongs to exactly one day instead of none.
df = client.query_df(
    "SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages "
    "Where time >= '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'"
)
# Keep an untouched copy for experiments that overwrite `df`.
df_copy = df.copy()

In [None]:
# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;

In [None]:
from datetime import datetime

# NOTE: load_raw_adsb_for_day is defined in a later cell; run that cell first.
df = load_raw_adsb_for_day(datetime(2024, 1, 1))

In [None]:
# Peek at the raw nested `aircraft` payload column.
df.loc[:, 'aircraft']

In [None]:
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df(df):
    """Collapse one ICAO's ADS-B rows into a single representative row.

    Meant for ``frame.groupby('icao', group_keys=False).apply(compress_df)``;
    the group key is read from ``df.name``.

    Steps:
      1. Build a '|'-joined signature of the COLUMNS values and keep the last
         row per distinct signature.
      2. Drop a signature when another one agrees with all of its non-empty
         values and fills strictly more columns.
      3. If several signatures survive, keep the one that occurs most often in
         the input group.

    Expects COLUMNS to be NaN-free already (missing values as '').

    Returns:
        DataFrame (normally one row) with an ``icao`` column set to the group key.
    """
    icao = df.name
    # assign() builds a new frame, so the group handed over by pandas is not mutated.
    df = df.assign(_signature=df[COLUMNS].astype(str).agg('|'.join, axis=1))
    deduped = df.groupby("_signature", as_index=False).last()

    # Non-empty COLUMNS values of each distinct signature, in row order.
    non_empty = [
        {col: value for col, value in zip(COLUMNS, values) if value != ''}
        for values in deduped[COLUMNS].itertuples(index=False, name=None)
    ]

    def _is_redundant(i):
        # Row i adds nothing if another row matches all of its non-empty values
        # and defines strictly more columns.
        mine = non_empty[i]
        return any(
            j != i
            and len(other) > len(mine)
            and all(other.get(k) == v for k, v in mine.items())
            for j, other in enumerate(non_empty)
        )

    keep_mask = [not _is_redundant(i) for i in range(len(non_empty))]
    deduped = deduped.loc[keep_mask]

    if len(deduped) > 1:
        # Tie-break on how often each surviving signature occurred in the raw group.
        surviving = df.loc[df['_signature'].isin(deduped['_signature']), '_signature']
        max_signature = surviving.value_counts().idxmax()
        deduped = deduped[deduped['_signature'] == max_signature]

    # drop()/assign() return new frames, avoiding chained assignment on a slice.
    return deduped.drop(columns=['_signature']).assign(icao=icao)

# Prepare the raw day in a new frame instead of rebinding `df`, so this cell can
# be re-run without failing on an already-dropped `aircraft` column.
df_prepared = df.assign(
    aircraft_category=df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)
).drop(columns=['aircraft'])
df_prepared = df_prepared.sort_values(['icao', 'time'])
df_prepared[COLUMNS] = df_prepared[COLUMNS].fillna('')
ORIGINAL_COLUMNS = df_prepared.columns.tolist()

# Collapse each ICAO to one representative row, then move `icao` to the second column.
df_compressed = df_prepared.groupby('icao', group_keys=False).apply(compress_df)
cols = [c for c in df_compressed.columns if c != 'icao']
cols.insert(1, 'icao')
df_compressed = df_compressed[cols]
df_compressed

In [None]:
# Show only rows that carry an aircraft category, without adding a column to `df`
# (keeps the cell idempotent and leaves `df` as the loader returned it).
df_with_category = df.assign(
    aircraft_category=df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)
)
df_with_category[df_with_category['aircraft_category'].notna()]

In [None]:
# SOME KIND OF MAP REDUCE SYSTEM
import os

COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df(df):
    """Collapse one ICAO's ADS-B rows into a single representative row.

    Meant for ``frame.groupby('icao', group_keys=False).apply(compress_df)``;
    the group key is read from ``df.name``.

    Steps:
      1. Build a '|'-joined signature of the COLUMNS values, count how often
         each occurs, and keep the first row per distinct signature.
      2. Drop a signature when another one agrees with all of its non-empty
         values and fills strictly more columns.
      3. If several signatures survive, keep the most frequent one.

    Expects COLUMNS to be NaN-free already (missing values as '').

    Returns:
        DataFrame (normally one row) with an ``icao`` column set to the group key.
    """
    icao = df.name
    # assign() builds a new frame, so the group handed over by pandas is not mutated.
    df = df.assign(_signature=df[COLUMNS].astype(str).agg('|'.join, axis=1))

    # Count signatures before collapsing so no copy of the raw group is needed later.
    signature_counts = df["_signature"].value_counts()

    deduped = df.groupby("_signature", as_index=False).first()

    # Non-empty COLUMNS values of each distinct signature, in row order.
    non_empty = [
        {col: value for col, value in zip(COLUMNS, values) if value != ''}
        for values in deduped[COLUMNS].itertuples(index=False, name=None)
    ]

    def _is_redundant(i):
        # Row i adds nothing if another row matches all of its non-empty values
        # and defines strictly more columns.
        mine = non_empty[i]
        return any(
            j != i
            and len(other) > len(mine)
            and all(other.get(k) == v for k, v in mine.items())
            for j, other in enumerate(non_empty)
        )

    keep_mask = [not _is_redundant(i) for i in range(len(non_empty))]
    deduped = deduped.loc[keep_mask]

    if len(deduped) > 1:
        # Pick the surviving signature seen most often in the raw group.
        max_signature = signature_counts.loc[deduped['_signature']].idxmax()
        deduped = deduped[deduped['_signature'] == max_signature]

    # drop()/assign() return new frames, avoiding chained assignment on a slice.
    return deduped.drop(columns=['_signature']).assign(icao=icao)

# names of releases something like
# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz

# Let's build historical first. 

# Module-level cache for the ClickHouse client. _get_clickhouse_client() fills it on
# first successful connect and returns it afterwards; load_raw_adsb_for_day() resets
# it to None after a failed query so the next attempt reconnects.
_ch_client = None

def _get_clickhouse_client():
    """Return a reusable ClickHouse client, creating it on first use.

    Connection attempts are retried up to 5 times with exponential backoff
    (2, 4, 8, 16 s; capped at 30 s) to ride out transient DNS/connection errors.

    Returns:
        The cached client stored in the module-level ``_ch_client``.

    Raises:
        KeyError: immediately, if CLICKHOUSE_HOST, CLICKHOUSE_USERNAME or
            CLICKHOUSE_PASSWORD is unset (a configuration error retries cannot fix).
        Exception: whatever ``clickhouse_connect.get_client`` raised on the last attempt.
    """
    global _ch_client
    if _ch_client is not None:
        return _ch_client

    import os

    # Read credentials before the retry loop so a missing variable fails fast
    # instead of being retried (and slept on) as if it were a network error.
    host = os.environ["CLICKHOUSE_HOST"]
    username = os.environ["CLICKHOUSE_USERNAME"]
    password = os.environ["CLICKHOUSE_PASSWORD"]

    import clickhouse_connect
    import time

    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            _ch_client = clickhouse_connect.get_client(
                host=host,
                username=username,
                password=password,
                secure=True,
            )
            return _ch_client
        except Exception as e:
            print(f"  ClickHouse connect attempt {attempt}/{max_retries} failed: {e}")
            if attempt == max_retries:
                raise
            wait = min(2 ** attempt, 30)
            print(f"  Retrying in {wait}s...")
            time.sleep(wait)


def load_raw_adsb_for_day(day):
    """Load raw ADS-B messages for one day, from the local cache or ClickHouse.

    The day window is half-open: ``[day 00:00:00, next day 00:00:00)``. Query
    results are cached as zstd-compressed CSV under ``data/adsb/``. The query is
    retried up to 3 times, dropping the shared client between attempts.

    Args:
        day: datetime whose date selects the day (time-of-day is ignored).

    Returns:
        DataFrame with columns time, icao, r, t, dbFlags, ownOp, year, desc,
        aircraft. On cache hits ``time`` is parsed to datetimes, ``icao`` is read
        as text, and ``aircraft`` cells that were dicts are restored to dicts.
    """
    from datetime import timedelta
    from pathlib import Path
    import ast
    import json
    import pandas as pd
    import time

    # Declared up front: the retry loop below resets the shared client.
    global _ch_client

    def _aircraft_to_csv(value):
        # to_csv would otherwise write dicts as their Python repr; JSON round-trips cleanly.
        return json.dumps(value, default=str) if isinstance(value, dict) else value

    def _aircraft_from_csv(value):
        # Rebuild dicts from JSON, or from the Python repr written by older caches.
        if not isinstance(value, str) or not value.startswith('{'):
            return value
        try:
            return json.loads(value)
        except ValueError:
            pass
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return value
        return parsed if isinstance(parsed, dict) else value

    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=1)

    # Set up caching
    cache_dir = Path("data/adsb")
    cache_dir.mkdir(parents=True, exist_ok=True)
    cache_file = cache_dir / f"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst"

    if cache_file.exists():
        print(f"  Loading from cache: {cache_file}")
        # ICAO hex codes must stay text ("008081" must not turn into 8081).
        df = pd.read_csv(cache_file, compression='zstd', dtype={'icao': str})
        df['time'] = pd.to_datetime(df['time'])
        if 'aircraft' in df.columns:
            df['aircraft'] = df['aircraft'].map(_aircraft_from_csv)
    else:
        start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
        end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
        # `>=` on the start so rows stamped exactly at midnight are not dropped.
        query = (
            "SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages "
            f"Where time >= '{start_str}' AND time < '{end_str}'"
        )

        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                client = _get_clickhouse_client()
                print(f"  Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}")
                df = client.query_df(query)
                break
            except Exception as e:
                print(f"  Query attempt {attempt}/{max_retries} failed: {e}")
                if attempt == max_retries:
                    raise
                # Reset client in case connection is stale
                _ch_client = None
                wait = min(2 ** attempt, 30)
                print(f"  Retrying in {wait}s...")
                time.sleep(wait)

        # Serialize dict cells as JSON in a separate frame; the returned df keeps dicts.
        to_cache = df
        if 'aircraft' in df.columns:
            to_cache = df.assign(aircraft=df['aircraft'].map(_aircraft_to_csv))
        to_cache.to_csv(cache_file, index=False, compression='zstd')
        print(f"  Saved to cache: {cache_file}")

    return df

def load_historical_for_day(day):
    """Load one day of raw ADS-B messages and compress them to one row per ICAO.

    Args:
        day: datetime identifying the day; passed to ``load_raw_adsb_for_day``.

    Returns:
        DataFrame produced by ``compress_df`` per ICAO, with ``time`` and
        ``icao`` moved to the first two columns. A day with no messages yields
        an empty frame instead of raising.
    """
    df = load_raw_adsb_for_day(day)

    # Pull the category out of the nested `aircraft` dict; non-dict cells give None.
    df['aircraft_category'] = df['aircraft'].apply(
        lambda x: x.get('category') if isinstance(x, dict) else None
    )
    df = df.drop(columns=['aircraft'])
    df = df.sort_values(['icao', 'time'])
    df[COLUMNS] = df[COLUMNS].fillna('')

    if df.empty:
        # groupby().apply() on an empty frame is not guaranteed to keep the
        # `time`/`icao` columns the reordering below relies on.
        df_compressed = df
    else:
        df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)

    leading = ['time', 'icao']
    cols = leading + [c for c in df_compressed.columns if c not in leading]
    return df_compressed[cols]


def concat_compressed_dfs(df_base, df_new):
    """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.

    Args:
        df_base: previously compressed rows (e.g. loaded from a release CSV).
        df_new: newly compressed rows to merge in.

    Returns:
        DataFrame re-compressed per ICAO with ``compress_df`` and sorted by ``time``.
    """
    import pandas as pd

    df_combined = pd.concat([df_base, df_new], ignore_index=True)

    # A frame read back from CSV holds `time` as text, while frames built in
    # memory may hold datetimes; sorting that mixture raises TypeError. Normalise
    # first (skipped when the column already has a datetime dtype).
    if not pd.api.types.is_datetime64_any_dtype(df_combined['time']):
        df_combined['time'] = pd.to_datetime(df_combined['time'])

    df_combined = df_combined.sort_values(['icao', 'time'])
    df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')

    # Apply compression logic per ICAO to get the best row
    df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)
    return df_compressed.sort_values('time')


def get_latest_aircraft_adsb_csv_df():
    """Download and load the latest ADS-B CSV from GitHub releases.

    Returns:
        (df, date_str): the CSV as a DataFrame with NaN replaced by "", and the
        start date ("YYYY-MM-DD") parsed from the file name.

    Raises:
        ValueError: if the path does not contain
            ``planequery_aircraft_adsb_<YYYY-MM-DD>_``.
    """
    from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv

    from pathlib import Path
    import pandas as pd
    import re

    csv_path = download_latest_aircraft_adsb_csv()
    # ICAO hex codes must stay text ("008081" must not turn into 8081).
    df = pd.read_csv(csv_path, dtype={'icao': str})
    df = df.fillna("")

    # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
    match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
    if not match:
        # Path() so the message works whether the helper returned a str or a Path.
        raise ValueError(f"Could not extract date from filename: {Path(csv_path).name}")

    date_str = match.group(1)
    return df, date_str



In [None]:
# SOME KIND OF MAP REDUCE SYSTEM


COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df(df):
    """Collapse one ICAO's ADS-B rows into a single representative row.

    Meant for ``frame.groupby('icao', group_keys=False).apply(compress_df)``;
    the group key is read from ``df.name``.

    Steps:
      1. Build a '|'-joined signature of the COLUMNS values and keep the first
         row per distinct signature.
      2. Drop a signature when another one agrees with all of its non-empty
         values and fills strictly more columns.
      3. If several signatures survive, keep the one that occurs most often in
         the input group.

    Expects COLUMNS to be NaN-free already (missing values as '').

    Returns:
        DataFrame (normally one row) with an ``icao`` column set to the group key.
    """
    icao = df.name
    # assign() builds a new frame, so the group handed over by pandas is not mutated.
    df = df.assign(_signature=df[COLUMNS].astype(str).agg('|'.join, axis=1))
    deduped = df.groupby("_signature", as_index=False).first()

    # Non-empty COLUMNS values of each distinct signature, in row order.
    non_empty = [
        {col: value for col, value in zip(COLUMNS, values) if value != ''}
        for values in deduped[COLUMNS].itertuples(index=False, name=None)
    ]

    def _is_redundant(i):
        # Row i adds nothing if another row matches all of its non-empty values
        # and defines strictly more columns.
        mine = non_empty[i]
        return any(
            j != i
            and len(other) > len(mine)
            and all(other.get(k) == v for k, v in mine.items())
            for j, other in enumerate(non_empty)
        )

    keep_mask = [not _is_redundant(i) for i in range(len(non_empty))]
    deduped = deduped.loc[keep_mask]

    if len(deduped) > 1:
        # Tie-break on how often each surviving signature occurred in the raw group.
        surviving = df.loc[df['_signature'].isin(deduped['_signature']), '_signature']
        max_signature = surviving.value_counts().idxmax()
        deduped = deduped[deduped['_signature'] == max_signature]

    # drop()/assign() return new frames, avoiding chained assignment on a slice.
    return deduped.drop(columns=['_signature']).assign(icao=icao)

# names of releases something like
# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz

# Let's build historical first. 

def load_raw_adsb_for_day(day):
    """Load raw ADS-B messages for one day, from the local cache or ClickHouse.

    The day window is half-open: ``[day 00:00:00, next day 00:00:00)``. Query
    results are cached as zstd-compressed CSV under ``data/adsb/``.

    Args:
        day: datetime whose date selects the day (time-of-day is ignored).

    Returns:
        DataFrame with columns time, icao, r, t, dbFlags, ownOp, year, desc,
        aircraft. On cache hits ``time`` is parsed to datetimes, ``icao`` is read
        as text, and ``aircraft`` cells that were dicts are restored to dicts.
    """
    from datetime import timedelta
    from pathlib import Path
    import ast
    import json
    import os
    import clickhouse_connect
    import pandas as pd

    def _aircraft_to_csv(value):
        # to_csv would otherwise write dicts as their Python repr; JSON round-trips cleanly.
        return json.dumps(value, default=str) if isinstance(value, dict) else value

    def _aircraft_from_csv(value):
        # Rebuild dicts from JSON, or from the Python repr written by older caches.
        if not isinstance(value, str) or not value.startswith('{'):
            return value
        try:
            return json.loads(value)
        except ValueError:
            pass
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return value
        return parsed if isinstance(parsed, dict) else value

    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)
    end_time = start_time + timedelta(days=1)

    # Set up caching
    cache_dir = Path("data/adsb")
    cache_dir.mkdir(parents=True, exist_ok=True)
    cache_file = cache_dir / f"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst"

    if cache_file.exists():
        print(f"  Loading from cache: {cache_file}")
        # ICAO hex codes must stay text ("008081" must not turn into 8081).
        df = pd.read_csv(cache_file, compression='zstd', dtype={'icao': str})
        df['time'] = pd.to_datetime(df['time'])
        if 'aircraft' in df.columns:
            df['aircraft'] = df['aircraft'].map(_aircraft_from_csv)
    else:
        start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')
        end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')

        client = clickhouse_connect.get_client(
            host=os.environ["CLICKHOUSE_HOST"],
            username=os.environ["CLICKHOUSE_USERNAME"],
            password=os.environ["CLICKHOUSE_PASSWORD"],
            secure=True,
        )
        print(f"  Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}")
        # `>=` on the start so rows stamped exactly at midnight are not dropped.
        df = client.query_df(
            "SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages "
            f"Where time >= '{start_str}' AND time < '{end_str}'"
        )

        # Serialize dict cells as JSON in a separate frame; the returned df keeps dicts.
        to_cache = df
        if 'aircraft' in df.columns:
            to_cache = df.assign(aircraft=df['aircraft'].map(_aircraft_to_csv))
        to_cache.to_csv(cache_file, index=False, compression='zstd')
        print(f"  Saved to cache: {cache_file}")

    return df

def load_historical_for_day(day):
    """Load one day of raw ADS-B messages and compress them to one row per ICAO.

    Args:
        day: datetime identifying the day; passed to ``load_raw_adsb_for_day``.

    Returns:
        DataFrame produced by ``compress_df`` per ICAO, with ``time`` and
        ``icao`` moved to the first two columns. A day with no messages yields
        an empty frame instead of raising.
    """
    df = load_raw_adsb_for_day(day)

    # Pull the category out of the nested `aircraft` dict; non-dict cells give None.
    df['aircraft_category'] = df['aircraft'].apply(
        lambda x: x.get('category') if isinstance(x, dict) else None
    )
    df = df.drop(columns=['aircraft'])
    df = df.sort_values(['icao', 'time'])
    df[COLUMNS] = df[COLUMNS].fillna('')

    if df.empty:
        # groupby().apply() on an empty frame is not guaranteed to keep the
        # `time`/`icao` columns the reordering below relies on.
        df_compressed = df
    else:
        df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)

    leading = ['time', 'icao']
    cols = leading + [c for c in df_compressed.columns if c not in leading]
    return df_compressed[cols]


def concat_compressed_dfs(df_base, df_new):
    """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.

    Args:
        df_base: previously compressed rows (e.g. loaded from a release CSV).
        df_new: newly compressed rows to merge in.

    Returns:
        DataFrame re-compressed per ICAO with ``compress_df`` and sorted by ``time``.
    """
    import pandas as pd

    df_combined = pd.concat([df_base, df_new], ignore_index=True)

    # A frame read back from CSV holds `time` as text, while frames built in
    # memory may hold datetimes; sorting that mixture raises TypeError. Normalise
    # first (skipped when the column already has a datetime dtype).
    if not pd.api.types.is_datetime64_any_dtype(df_combined['time']):
        df_combined['time'] = pd.to_datetime(df_combined['time'])

    df_combined = df_combined.sort_values(['icao', 'time'])
    df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')

    # Apply compression logic per ICAO to get the best row
    df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)
    return df_compressed.sort_values('time')


def get_latest_aircraft_adsb_csv_df():
    """Download and load the latest ADS-B CSV from GitHub releases.

    Returns:
        (df, date_str): the CSV as a DataFrame with NaN replaced by "", and the
        start date ("YYYY-MM-DD") parsed from the file name.

    Raises:
        ValueError: if the path does not contain
            ``planequery_aircraft_adsb_<YYYY-MM-DD>_``.
    """
    from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv

    from pathlib import Path
    import pandas as pd
    import re

    csv_path = download_latest_aircraft_adsb_csv()
    # ICAO hex codes must stay text ("008081" must not turn into 8081).
    df = pd.read_csv(csv_path, dtype={'icao': str})
    df = df.fillna("")

    # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
    match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
    if not match:
        # Path() so the message works whether the helper returned a str or a Path.
        raise ValueError(f"Could not extract date from filename: {Path(csv_path).name}")

    date_str = match.group(1)
    return df, date_str



In [None]:
from datetime import datetime

# Build the compressed one-row-per-ICAO table for 1 January 2024.
df = load_historical_for_day(datetime(year=2024, month=1, day=1))

In [None]:
# Number of compressed rows (one per ICAO if compression worked).
df.shape[0]

In [None]:
# Spot-check a single aircraft by ICAO hex.
df.loc[df['icao'].eq("008081")]

In [None]:
# Spot-check a single aircraft by ICAO hex.
df.loc[df['icao'].eq("a4e1d2")]

In [None]:
# Spot-check rows by registration (`r` column).
df.loc[df['r'].eq("N4131T")]

In [None]:
# ICAOs that still occur more than once after compression (expected: none).
# NOTE(review): `df_compressed` comes from the exploratory cell above, not from
# `df = load_historical_for_day(...)` — confirm which frame is meant to be checked.
df_compressed.loc[df_compressed['icao'].duplicated(keep=False)]


In [None]:
import gzip
import json
import os
from pathlib import Path

# Sample trace to inspect; set ADSB_TRACE_FILE to point at a different file.
path = Path(os.environ.get(
    "ADSB_TRACE_FILE",
    "/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json",
))

# The file is read through gzip even though it carries a .json extension.
with gzip.open(path, "rt", encoding="utf-8") as f:
    data = json.load(f)

print(type(data))
# Preview the first 2000 characters of the pretty-printed JSON.
print(json.dumps(data, indent=2)[:2000])


In [None]:
# First, load the JSON to inspect its structure
import json
import os
from pathlib import Path

# Local copy of the readsb webapp aircraft DB; set READSB_AIRCRAFT_DB to override.
aircraft_db_path = Path(os.environ.get(
    "READSB_AIRCRAFT_DB",
    "/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json",
))
# Explicit UTF-8 so the result does not depend on the platform's default encoding.
with open(aircraft_db_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Check the structure
print(type(data))

In [None]:
# Look up one entry by key (presumably an uppercase ICAO hex code — confirm against the file).
data["AC97E3"]