# Kafka + SGP4 Pipeline Interactive Testing

This notebook tests the complete pipeline:
**TLE API → Kafka → SGP4 Computation → HDFS Storage**

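Sections:
1. Verify Kafka connection and consume TLE messages
2. Parse TLE data and compute SGP4 vectors locally
3. Visualize the computed position vectors in 3D
4. Run the full Spark SGP4 pipeline
5. Query HDFS output (Parquet files)
6. Inspect Kafka topics and consumer groups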

In [None]:
import json
from datetime import datetime, timezone
from itertools import islice

import numpy as np
import pandas as pd
from kafka import KafkaConsumer
from sgp4.api import Satrec, jday

## 1. Connect to Kafka and Read TLE Messages

In [None]:
# Kafka connection settings used by this cell (host-side listener).
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
TLE_TOPIC = 'space_debris_tle'
# Stop iterating the consumer after this many milliseconds without a new
# message, so reading a fixed number of messages cannot block forever when
# the topic holds fewer messages than requested.
CONSUMER_TIMEOUT_MS = 10_000

# Create Kafka consumer.
# NOTE: with auto-commit enabled, the group's committed offsets advance as
# messages are consumed, so auto_offset_reset='earliest' only takes effect
# while 'notebook_test_group' has no committed offset; later runs resume
# after the last committed message.
consumer = KafkaConsumer(
    TLE_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='notebook_test_group',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

print(f"✓ Connected to Kafka broker at {', '.join(KAFKA_BOOTSTRAP_SERVERS)}")
print(f"✓ Subscribed to topic: {TLE_TOPIC}")

In [None]:
# Read up to N_SAMPLE_MESSAGES TLE messages from the consumer.
# islice stops pulling from the consumer once enough messages are collected.
# The previous enumerate/break loop fetched one extra message and then
# discarded it; with auto-commit enabled, that message was lost to this group.
N_SAMPLE_MESSAGES = 5
messages = [record.value for record in islice(consumer, N_SAMPLE_MESSAGES)]

print(f"Read {len(messages)} TLE messages from Kafka\n")
if messages:
    print("Sample message:")
    print(json.dumps(messages[0], indent=2))
else:
    # Guard against IndexError when the topic yielded nothing.
    print("No messages received - check that the producer has written to the topic.")

## 2. Parse TLE and Compute SGP4 Vectors

In [None]:
def parse_tle_epoch(epoch_str):
    """Parse an ISO-8601 epoch string into a timezone-aware UTC datetime.

    A trailing 'Z' is treated as UTC. A string without an offset is assumed
    to already be UTC. An explicit non-UTC offset is converted to UTC.
    Fractional seconds are preserved.

    Raises:
        ValueError: if ``epoch_str`` is empty/None or not valid ISO-8601.
    """
    if not epoch_str:
        raise ValueError("TLE message has no 'epoch'/'EPOCH' field")
    epoch_dt = datetime.fromisoformat(epoch_str.replace('Z', '+00:00'))
    if epoch_dt.tzinfo is None:
        return epoch_dt.replace(tzinfo=timezone.utc)
    return epoch_dt.astimezone(timezone.utc)


def compute_sgp4_vectors(tle_data, earth_radius_km=6371.0):
    """Compute SGP4 position and velocity vectors from TLE data.

    The satellite is propagated to the epoch given in the message.

    Args:
        tle_data: message dict with 'tle_line1' and 'tle_line2', and an epoch
            under 'epoch' or 'EPOCH'. Optional orbital fields are read under
            lower-case or upper-case keys (e.g. 'inclination'/'INCLINATION').
        earth_radius_km: spherical Earth radius subtracted from the geocentric
            distance to estimate altitude (default 6371.0 km).

    Returns:
        ``(row, None)`` on success, where ``row`` is a dict of position (km),
        velocity (km/s), altitude and orbital elements. Otherwise
        ``(None, error_message)``: any failure (bad TLE, missing or bad epoch,
        SGP4 error code) is reported rather than raised, so a batch of
        messages can be processed best-effort.
    """
    try:
        # Initialize SGP4 satellite
        satellite = Satrec.twoline2rv(
            tle_data['tle_line1'],
            tle_data['tle_line2']
        )

        # Parse epoch as UTC, keeping sub-second precision
        epoch_str = tle_data.get('epoch', tle_data.get('EPOCH'))
        epoch_dt = parse_tle_epoch(epoch_str)

        # Convert to Julian Date. jday accepts fractional seconds; passing only
        # the integer seconds would shift the propagation time by up to 1 s.
        seconds = epoch_dt.second + epoch_dt.microsecond / 1e6
        jd, fr = jday(epoch_dt.year, epoch_dt.month, epoch_dt.day,
                      epoch_dt.hour, epoch_dt.minute, seconds)

        # Propagate to epoch time
        error_code, position, velocity = satellite.sgp4(jd, fr)

        if error_code != 0:
            return None, f"SGP4 error: {error_code}"

        # Altitude is approximated as geocentric distance minus a spherical
        # Earth radius (not a geodetic altitude).
        pos_magnitude = np.linalg.norm(position)
        vel_magnitude = np.linalg.norm(velocity)
        altitude_km = pos_magnitude - earth_radius_km

        return {
            'satellite_id': tle_data.get('satellite_id', tle_data.get('NORAD_CAT_ID')),
            'epoch_time': epoch_str,
            'position_x': position[0],
            'position_y': position[1],
            'position_z': position[2],
            'velocity_x': velocity[0],
            'velocity_y': velocity[1],
            'velocity_z': velocity[2],
            'altitude_km': altitude_km,
            'velocity_magnitude_kms': vel_magnitude,
            'inclination': tle_data.get('inclination', tle_data.get('INCLINATION')),
            'eccentricity': tle_data.get('eccentricity', tle_data.get('ECCENTRICITY')),
            'mean_motion': tle_data.get('mean_motion', tle_data.get('MEAN_MOTION'))
        }, None

    except Exception as e:
        return None, str(e)

In [None]:
# Compute SGP4 vectors for the sample messages; successful rows are collected,
# failures are reported with the same id fallback compute_sgp4_vectors uses.
sgp4_results = []

for msg in messages:
    result, error = compute_sgp4_vectors(msg)
    if result is not None:
        sgp4_results.append(result)
    else:
        sat_id = msg.get('satellite_id', msg.get('NORAD_CAT_ID'))
        print(f"Error for satellite {sat_id}: {error}")

# Convert to DataFrame (built once from the list of row dicts)
df_sgp4 = pd.DataFrame(sgp4_results)
print(f"\nComputed SGP4 vectors for {len(df_sgp4)} satellites\n")
if df_sgp4.empty:
    # An empty DataFrame has no columns, so selecting them would raise KeyError.
    print("No SGP4 vectors computed - nothing to display.")
else:
    display(df_sgp4[['satellite_id', 'altitude_km', 'velocity_magnitude_kms', 'inclination']])

## 3. Visualize Position Vectors (3D)

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the '3d' projection on older matplotlib versions

# Earth radius (km), the same value used for the altitude estimate.
earth_radius = 6371.0

if df_sgp4.empty:
    print("No SGP4 vectors to plot.")
else:
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')

    # Plot satellite positions (km). sgp4 reports these in the TEME frame.
    ax.scatter(df_sgp4['position_x'], df_sgp4['position_y'], df_sgp4['position_z'],
               c='red', marker='o', s=100, label='Satellites')

    # Plot Earth as a translucent sphere
    u = np.linspace(0, 2 * np.pi, 50)
    v = np.linspace(0, np.pi, 50)
    x = earth_radius * np.outer(np.cos(u), np.sin(v))
    y = earth_radius * np.outer(np.sin(u), np.sin(v))
    z = earth_radius * np.outer(np.ones(np.size(u)), np.cos(v))
    ax.plot_surface(x, y, z, color='blue', alpha=0.3)

    # Give every axis the same symmetric range and a cubic box. Otherwise
    # matplotlib scales each axis independently and the Earth and the satellite
    # distances appear distorted.
    positions = df_sgp4[['position_x', 'position_y', 'position_z']].to_numpy(dtype=float)
    extent = max(earth_radius, float(np.abs(positions).max()))
    ax.set_xlim(-extent, extent)
    ax.set_ylim(-extent, extent)
    ax.set_zlim(-extent, extent)
    ax.set_box_aspect((1, 1, 1))

    ax.set_xlabel('X (km)')
    ax.set_ylabel('Y (km)')
    ax.set_zlabel('Z (km)')
    ax.set_title('Satellite Positions in TEME (Earth-Centered Inertial) Frame')
    ax.legend()
    plt.show()

## 4. Test Full Spark SGP4 Pipeline

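The SGP4 stage of the pipeline runs as a Spark Structured Streaming job. To run the full pipeline, start the TLE API, stream its data into Kafka, and then submit the Spark job. The Spark job runs inside Docker and is given `broker:29092` as its Kafka address, while this notebook connects from the host via `localhost:9092`.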

```bash
# Start TLE API
cd /path/to/Space-Debris-Risk-Prediction   # repository root
uv run src/demo/api/tle_stream_api.py &

# Stream data to Kafka
uv run src/kafka/tle_api_to_kafka_producer.py --limit 100 &

# Run Spark SGP4 job (in Docker)
docker exec -u root spark-master /opt/spark/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  /opt/spark/work-dir/src/kafka/spark_sgp4_to_hdfs.py \
  --kafka broker:29092
```

## 5. Query HDFS Output (after Spark job runs)

In [None]:
# Query the Parquet output written by the Spark SGP4 job.
# Requires PySpark. Set RUN_HDFS_QUERY = True once the Spark job has written data.
# (The previous version kept this code in a bare string literal, which Jupyter
# echoed back as the cell's output.)
RUN_HDFS_QUERY = False

# NOTE(review): a path without a scheme resolves against the Spark session's
# default filesystem. If that is not HDFS for this session, use a fully
# qualified URI such as hdfs://<namenode>:<port>/tmp/sgp4_vectors/ - confirm.
SGP4_OUTPUT_PATH = "/tmp/sgp4_vectors/"
ALTITUDE_THRESHOLD_KM = 400

if RUN_HDFS_QUERY:
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("Query_SGP4_Vectors")
        .getOrCreate()
    )

    # Read from HDFS
    df_hdfs = spark.read.parquet(SGP4_OUTPUT_PATH)

    # Show sample data
    df_hdfs.select('satellite_id', 'altitude_km', 'velocity_magnitude_kms', 'epoch_time').show(10)

    # Count total records
    print(f"Total SGP4 vectors in HDFS: {df_hdfs.count()}")

    # Rows are individual vectors, and the same satellite_id may appear more
    # than once, so report both the vector count and the distinct satellites.
    high_altitude = df_hdfs.filter(df_hdfs.altitude_km > ALTITUDE_THRESHOLD_KM)
    print(f"Vectors above {ALTITUDE_THRESHOLD_KM}km: {high_altitude.count()}")
    print(f"Distinct satellites above {ALTITUDE_THRESHOLD_KM}km: "
          f"{high_altitude.select('satellite_id').distinct().count()}")
else:
    print("Skipping HDFS query: set RUN_HDFS_QUERY = True after the Spark job has written output.")

## 6. Inspect Kafka Topics and Consumer Groups

In [None]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Connect to Kafka admin. The client is closed in `finally` so that re-running
# this cell does not leave open broker connections behind.
admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='notebook_admin'
)

try:
    # List topics
    topics = admin_client.list_topics()
    print("Available Kafka topics:")
    for topic in sorted(topics):
        print(f"  - {topic}")

    # Consumer groups known to the cluster; this may include groups that
    # currently have no active members.
    consumer_groups = admin_client.list_consumer_groups()
    print(f"\nConsumer groups: {len(consumer_groups)}")
    for group in consumer_groups:
        print(f"  - {group}")

    # Committed offsets for the notebook's consumer group (empty until the
    # group has committed at least once).
    group_offsets = admin_client.list_consumer_group_offsets('notebook_test_group')
    print("\nCommitted offsets for 'notebook_test_group':")
    if not group_offsets:
        print("  (none committed yet)")
    for tp, meta in sorted(group_offsets.items(),
                           key=lambda item: (item[0].topic, item[0].partition)):
        print(f"  - {tp.topic}[{tp.partition}]: {meta.offset}")
finally:
    admin_client.close()