In [0]:
# ============================================
# SPACE DATA DISTRIBUTED SYSTEM DEMO
# Unity Catalog Safe – No DBFS (just for the demo)
# ============================================

from pyspark.sql.functions import *
from pyspark.sql.types import *
import random

# Make sure the demo database exists and select it for the rest of the run.
for _setup_statement in ("CREATE DATABASE IF NOT EXISTS space_db", "USE space_db"):
    spark.sql(_setup_statement)

# Remove any tables left over from a previous run so every layer is rebuilt.
for _layer_table in ("bronze_space", "silver_space", "gold_space"):
    spark.sql("DROP TABLE IF EXISTS " + _layer_table)

# ============================================
# 1. Generate Mock Telescope Data (Batch)
# ============================================

num_rows = 1000000


def _mock_observation(obs_id):
    """Build one mock observation tuple: (obs_id, lat, lon, features).

    lat is uniform in [-90, 90], lon is uniform in [-180, 180], and
    features is a sparse map holding exactly two feature ids (drawn from
    1..1000 inclusive) with random values in [0, 1).
    """
    lat = random.uniform(-90, 90)
    lon = random.uniform(-180, 180)

    # random.sample draws without replacement, so the two keys are always
    # distinct. Two independent randint() calls could return the same id and
    # silently collapse the map to a single entry.
    first_id, second_id = random.sample(range(1, 1001), 2)
    features = {first_id: random.random(), second_id: random.random()}

    return (obs_id, lat, lon, features)


# The rows are generated on the driver in plain Python.
data = [_mock_observation(i) for i in range(num_rows)]

schema = StructType([
    StructField("obs_id", LongType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
    StructField("features", MapType(IntegerType(), DoubleType())),
])

# Stamp every row with the ingestion time of this batch.
bronze_df = spark.createDataFrame(data, schema) \
    .withColumn("timestamp", current_timestamp())

# saveAsTable is called without a write mode below, so drop any existing
# table first; this keeps the step re-runnable on its own.
spark.sql("DROP TABLE IF EXISTS bronze_space")

bronze_df.write.format("delta").saveAsTable("bronze_space")

print("======================================")
print("Bronze table created")
print("======================================")


# ============================================
# 2. Spatial Partitioning (Quadrant Example)
# ============================================

def compute_partition(lat, lon):
    """Map a latitude/longitude pair to one of four sign-based partition ids.

    Args:
        lat: Latitude value, or None.
        lon: Longitude value, or None.

    Returns:
        "P0" when lat >= 0 and lon >= 0,
        "P1" when lat >= 0 and lon < 0,
        "P2" when lat < 0 and lon >= 0,
        "P3" otherwise (both negative, or any NaN coordinate, since every
        comparison against NaN is false),
        None when either coordinate is None.
    """
    # Comparing None with a number raises TypeError in Python 3. Return None
    # instead so a missing coordinate produces a missing partition id rather
    # than an exception.
    if lat is None or lon is None:
        return None

    if lat >= 0 and lon >= 0:
        return "P0"
    if lat >= 0 and lon < 0:
        return "P1"
    if lat < 0 and lon >= 0:
        return "P2"
    return "P3"

# Wrap the pure-Python classifier as a Spark UDF that returns a string.
partition_udf = udf(compute_partition, StringType())

# Re-read the stored bronze table and tag each row with its partition id.
bronze_df = spark.table("bronze_space").withColumn(
    "partition_id", partition_udf("lat", "lon")
)

# Overwrite the bronze table in place; overwriteSchema is set because the
# DataFrame now carries the extra partition_id column.
(
    bronze_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze_space")
)

print("Spatial partitioning added")
print("======================================")

# ============================================
# 3. Sparse Triplet Conversion (Silver Layer)
# ============================================

# Read the persisted bronze table rather than reusing the in-memory bronze_df.
# bronze_df is a lazy plan that was defined before bronze_space was
# overwritten, and it applies the Python UDF each time it is evaluated. The
# stored table already contains partition_id, and reading it mirrors how the
# gold step reads silver_space.
silver_df = spark.table("bronze_space").select(
    "obs_id",
    "timestamp",
    "lat",
    "lon",
    "partition_id",
    # Exploding the features map emits one row per (feature_id, value) entry,
    # turning each observation into sparse triplets.
    explode("features").alias("feature_id", "value"),
)

# Store the triplets as a Delta table partitioned by partition_id.
silver_df.write \
    .format("delta") \
    .partitionBy("partition_id") \
    .mode("overwrite") \
    .saveAsTable("silver_space")

print("Silver table created (sparse triplets)")
print("======================================")

# ============================================
# 4. Optimize Layout
# ============================================

# Run OPTIMIZE on the silver table, Z-ordering by feature_id.
optimize_statement = "OPTIMIZE silver_space ZORDER BY (feature_id)"
spark.sql(optimize_statement)

print("Optimization complete", "======================================", sep="\n")


# ============================================
# 5. Gold Layer (Aggregations)
# ============================================

# Group the silver triplets by (partition_id, feature_id).
silver_triplets = spark.table("silver_space")
grouped_triplets = silver_triplets.groupBy("partition_id", "feature_id")

# Per group: the mean feature value and the number of contributing rows.
gold_df = grouped_triplets.agg(
    avg("value").alias("avg_value"),
    count("*").alias("observation_count"),
)

# Replace any existing gold table with the fresh aggregates.
gold_writer = gold_df.write.format("delta").mode("overwrite")
gold_writer.saveAsTable("gold_space")

print("Gold table created")
print("======================================")

# ============================================
# 6. Demo Queries
# ============================================

# Each entry pairs the heading printed before a result with the SQL to run.
demo_queries = (
    (
        "Top Features by Average Value:",
        """
SELECT partition_id, feature_id, avg_value
FROM gold_space
ORDER BY avg_value DESC
LIMIT 20
""",
    ),
    (
        "Sample bronze Sparse Records:",
        """
SELECT *
FROM bronze_space
LIMIT 20
""",
    ),
)

# Print the heading, show the query result, then a separator line.
for heading, query_text in demo_queries:
    print(heading)
    spark.sql(query_text).show()
    print("======================================")


# spark.sql("""
# EXPLAIN
# SELECT *
# FROM silver_space
# WHERE partition_id = 'P0'
# AND feature_id = 500
# """).show(truncate=False)
# print("======================================")

# spark.sql("DESCRIBE DETAIL silver_space").show(truncate=False)

# print("======================================")

