In [None]:
import logging
import asyncio
import socket
import pandas as pd
import asyncpg

# Create a global connection pool
db_pool = None

# Set asyncio logger to WARNING or ERROR to suppress INFO-level output
logging.getLogger('asyncio').setLevel(logging.WARNING)
logging.basicConfig(level=logging.ERROR)


In [None]:
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.dialects.postgresql import ARRAY, insert
from sqlalchemy import Column, String, Integer, Float, DateTime
from sqlalchemy import select, UniqueConstraint

DATABASE_URL = "postgresql+asyncpg://shooca:shooca222@postgres:5432/shooca_db"
engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20, echo=True)
async_session = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()

# Attempting to Suppress INFO logs from SQLAlchemy
logging.getLogger('sqlalchemy.engine.Engine').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING)

class BC3Line(Base):
    __tablename__ = 'bc3_client'
    tracknumber = Column(Integer, primary_key=True)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    groundspeed = Column(Float)
    heading = Column(Float)
    trackquality = Column(Integer)
    trackcategory = Column(String)
    trackid = Column(String)
    specifictype = Column(Integer)
    platform = Column(Integer)
    jtn = Column(String)
    callsign = Column(String)
    source = Column(String)
    squawkid = Column(String)
    fuel = Column(String)
    fuel_factor = Column(String)
    fuel_time_marker = Column(String)
    last_updated = Column(DateTime(timezone=False))

async def upsert_bc3(df):
    records = df.to_dict(orient='records')
    async with async_session() as session:
        stmt = insert(BC3Line).values(records)

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['tracknumber'],  # conflict target
            set_={key: stmt.excluded[key] for key in df.columns if key != 'tracknumber'}
        )
        await session.execute(upsert_stmt)
        await session.commit()

class WpnLine(Base):
    __tablename__ = 'bc3_weapons'
    __table_args__ = (
        UniqueConstraint('jtn', 'slot', name='uq_jtn_slot'),
    )
    jtn = Column(String, primary_key=True)
    slot = Column(Integer)
    weapon = Column(String)
    quantity = Column(Integer)
    last_updated = Column(DateTime(timezone=False))

async def upsert_weapons(df):
    # Validate input shape
    if not isinstance(df, pd.DataFrame) or df.empty:
        print("⚠️ Skipping upsert: invalid or empty DataFrame")
        return

    # Normalize and filter slot range
    df['slot'] = pd.to_numeric(df['slot'], errors='coerce')  # Force numeric slot
    df = df[df['slot'].isin(range(1, 9))]  # Keep slot 1–8 only

    if df.empty:
        print("🚫 No valid slot entries found — nothing to process")
        return

    records = df.to_dict(orient='records')
    async with async_session() as session:
        stmt = insert(WpnLine).values(records)

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['jtn', 'slot'],  # composite conflict target
            set_={key: stmt.excluded[key] for key in df.columns if key not in ['jtn', 'slot']}
        )
        await session.execute(upsert_stmt)
        await session.commit()

class FuelLine(Base):
    __tablename__ = 'bc3_fuel'
    jtn = Column(String, primary_key=True)
    fuel = Column(String)
    fuel_factor = Column(String)
    fuel_time_marker = Column(String)
    last_updated = Column(DateTime(timezone=False))

async def upsert_fuel(df):
    records = df.to_dict(orient='records')
    async with async_session() as session:
        stmt = insert(FuelLine).values(records)

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['jtn'],  # conflict target
            set_={key: stmt.excluded[key] for key in df.columns if key != 'jtn'}
        )
        await session.execute(upsert_stmt)
        await session.commit()

class SensorLine(Base):
    __tablename__ = 'bc3_sensors'
    jtn = Column(String, primary_key=True)
    aircraft_type = Column(String)
    radar = Column(String)
    radar_warning = Column(String)
    laser = Column(String)
    air_to_ground = Column(String)
    air_to_air = Column(String)
    last_updated = Column(DateTime(timezone=False))

async def upsert_sensor(df):
    records = df.to_dict(orient='records')
    async with async_session() as session:
        stmt = insert(SensorLine).values(records)

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['jtn'],  # conflict target
            set_={key: stmt.excluded[key] for key in df.columns if key != 'jtn'}
        )
        await session.execute(upsert_stmt)
        await session.commit()

class VCSLine(Base):
    __tablename__ = 'bc3_vcs'
    jtn = Column(String, primary_key=True)
    vcs = Column(String)
    last_updated = Column(DateTime(timezone=False))

async def upsert_vcs(df):
    records = df.to_dict(orient='records')
    async with async_session() as session:
        stmt = insert(VCSLine).values(records)

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=['jtn'],  # conflict target
            set_={key: stmt.excluded[key] for key in df.columns if key != 'jtn'}
        )
        await session.execute(upsert_stmt)
        await session.commit()

In [None]:
import json
import re
import traceback
import pymap3d as pm

async def process_data(line, label):
    dfinsert = pd.DataFrame()
    if label == 'Weapons Client':
        print(f"Processed line: [{label}] {line.strip()}")

        pattern = re.compile(r'(R\d{5})\s+(J\d+\.\d+\w*)\s+(.*)')
        
        match = pattern.search(line)
        if match:
            r_code = match.group(1).strip()
            j_value = match.group(2).strip()
            payload = match.group(3)
            print(f"🆔 R-code: {r_code} | J-value: {j_value}")
            print(f"🧾 Payload: {payload}\n")

        identifier = r_code

        if j_value == 'J13.2I':
            fuel_match = re.search(r"/FUEL(\d+)", payload)
            fl_fct_match = re.search(r"/FL FCT ([^/]+)", payload)
            tm_match = re.search(r"/TM ([^/]+)", payload)
            
            # Build record from extracted values
            record = {
                "jtn": identifier,
                "fuel": fuel_match.group(1) if fuel_match else None,
                "fuel_factor": fl_fct_match.group(1).strip() if fl_fct_match else None,
                "fuel_time_marker": tm_match.group(1).strip() if tm_match else None
            }
            dffuel = pd.DataFrame([record])
            
            dffuel['last_updated'] = pd.Timestamp.now()
            await upsert_fuel(dffuel)
        elif j_value == 'J13.2C1':
            pattern = r"(/(?:RDR|RWR|LSR|AG|AA|AIR)\b[^/]*)"
            matches = re.findall(pattern, payload)

            # Convert to {tag: value} dictionary
            data = {}
            for entry in matches:
                tag_match = re.match(r"/(\w+)\s*(.*)", entry.strip())
                if tag_match:
                    tag, val = tag_match.groups()
                    data[tag] = val.strip()
            
            # Create single-row DataFrame with tags as columns
            dfsensor = pd.DataFrame([data])
            dfsensor['jtn'] = identifier
            dfsensor['last_updated'] = pd.Timestamp.now()
            dfinsert = dfsensor[['jtn', 'RDR', 'RWR', 'LSR', 'AG', 'AA', 'AIR', 'last_updated']].rename(columns={
                'jtn': 'jtn', 
                'RDR': 'radar', 
                'RWR': 'radar_warning', 
                'LSR': 'laser',
                'AG': 'air_to_ground', 
                'AA': 'air_to_air', 
                'AIR': 'aircraft_type', 
                'last_updated': 'last_updated'
            })
            print(dfinsert)
            await upsert_sensor(dfinsert)
        elif j_value in ('J13.2C2','J13.2C7'):
            pattern = r'/N(\d+)\s+(\d+)\s+/TS\1\s+([A-Z0-9\- ]+)'
            matches = re.findall(pattern, payload)
            
            # Build structured JSON
            output = {
                "jtn": identifier,
                "slots": [
                    {
                        "slot": int(slot),
                        "quantity": int(qty),
                        "weapon": weapon.strip()
                    }
                    for slot, qty, weapon in matches
                ]
            }
            
            # Convert to JSON string
            json_string = json.dumps(output, indent=2)
            print(json_string)
        
            # Convert slots to DataFrame
            dfinsert = pd.DataFrame(output["slots"])
        
            # # Optionally add metadata columns
            dfinsert["jtn"] = output["jtn"]

            dfinsert['last_updated'] = pd.Timestamp.now()

            print(dfinsert)
            await upsert_weapons(dfinsert)
        elif j_value == 'J2.2C2':
            match = re.search(r'(/VC[^/]*)', payload)
            if match:
                vc_value = match.group(1)
                vcstag, vcscode = vc_value.split(' ', 1)
                print(f"J-value: {j_value}: /VC Segment → {vcscode.strip()}")

                # Build structured JSON
                output = {"jtn": identifier, "vcs": vcscode.strip()}

                # Convert slots to DataFrame
                dfinsert = pd.DataFrame([output])
                dfinsert['last_updated'] = pd.Timestamp.now()

                print(dfinsert)
                await upsert_vcs(dfinsert)
        else:
            print(f"J Value not found: {j_value}")
    elif label == 'BC3 Client':
        print(f"Processed line: [{label}] {line.strip()}")
        parsed = json.loads(line.strip())
        dfbc3 = pd.json_normalize(parsed)
        col = dfbc3.get('e3.dropTrack')
        if col is None:
            if 'e24.JTN' not in dfbc3.columns:
                dfbc3['e24.JTN'] = None
            
            # Extract and sort keys for callsign
            callsign_keys = sorted([k for k in parsed if k.startswith("e12.callsign")],
              key=lambda x: int(x.split("callsign")[1]))
            # Convert ASCII values to characters
            callsign = ''.join(chr(parsed[k]) for k in callsign_keys if parsed[k] != 0)

            # ecef conversion x y z to lat and long
            x = dfbc3['e1.x']
            y = dfbc3['e1.y']
            z = dfbc3['e1.z']
            lat, lon, alt = pm.ecef2geodetic(x, y, z)

            if 'e7.mode3' in dfbc3.columns:
                dfbc3['mode3_octal'] = dfbc3['e7.mode3'].apply(
                    lambda x: oct(int(x))[2:] if pd.notnull(x) else None
                )
            else:
                dfbc3['mode3_octal'] = None
            
            # print("Decoded Callsign:", callsign)
            dfinsert = dfbc3[['trackNumber', 'e4.altitude', 'e1.groundSpeed', 'e1.heading', 'e1.trackQuality', 'e1.trackCategory', 'e1.trackId', 
                              'e1.specificType', 'e1.platform', 'e4.source', 'mode3_octal', 'e24.JTN']].rename(columns={
                'trackNumber': 'tracknumber',
                'e4.altitude': 'altitude',
                'e1.groundSpeed': 'groundspeed',
                'e1.heading': 'heading',
                'e1.trackQuality': 'trackquality',
                'e1.trackCategory': 'trackcategory',
                'e1.trackId': 'trackid',
                'e1.specificType': 'specifictype', 
                'e1.platform': 'platform',
                'e4.source': 'source', 
                'mode3_octal': 'squawkid', 
                'e24.JTN': 'jtn'
            })
            dfinsert['latitude'] = lat
            dfinsert['longitude'] = lon
            dfinsert['callsign'] = callsign
            dfinsert['last_updated'] = pd.Timestamp.now()
            await upsert_bc3(dfinsert)
    else:
        pass

async def stream_tcp(ip, port, label):
    global db_pool
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    await asyncio.get_running_loop().sock_connect(sock, (ip, port))
    await read_socket_in_chunks(sock, label)

async def read_socket_in_chunks(sock, label, chunk_size=1024):
    loop = asyncio.get_running_loop()
    sock.setblocking(False)
    buffer = b''  # now working with strings, not bytes

    try:
        while True:
            try:
                chunk = await loop.sock_recv(sock, chunk_size)
            except Exception as e:
                print(f"[Socket] Error: {e}")
                break
            if not chunk:
                # Connection closed
                break
            buffer += chunk
            while b'\n' in buffer:
                try:
                    line, buffer = buffer.split(b'\n', 1)
                    await process_data(line.decode('utf-8') + '\n', label)
                except Exception as e:
                    print(f"[{label}] Error processing line: {e}")
                    traceback.print_exc()
                    break
    except Exception as e:
        print(f"[{label}] Failed to connect: {e}")
    finally:
        sock.close()

In [None]:
# Connect to multiple IP's and gather the streaming data...
tasks = [
    stream_tcp("10.5.185.10", 5001, "BC3 Client"), 
    stream_tcp("10.5.185.9", 5001, "Weapons Client")
]

print("🚀 Async tasks starting...")

await asyncio.gather(*tasks, return_exceptions=False)