**Import Libraries**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lit, to_date, log10, round, from_json, explode
from datetime import datetime, timedelta

**Logging**

In [0]:
# Shared Spark session for every cell below: getOrCreate() returns the
# already-active session if there is one, otherwise builds a new one.
spark = (
    SparkSession.builder
    .appName("Neo Transform")
    .getOrCreate()
)

def log_event(level, message):
    """Append a single log record to the ``neo_logs`` Delta table.

    Args:
        level: severity label, e.g. ``"INFO"`` or ``"ERROR"``.
        message: free-text message to record (``None`` is stored as null).
    """
    # An explicit DDL schema (same types Spark would infer for
    # datetime/str/str) avoids per-call type inference, which raises when
    # ``level`` or ``message`` is None and its type cannot be determined.
    log_df = spark.createDataFrame(
        [(datetime.now(), level, message)],
        "`timestamp` TIMESTAMP, `level` STRING, `message` STRING",
    )
    log_df.write.format("delta").mode("append").saveAsTable("neo_logs")
    


**Silver Layer: feature engineering**

In [0]:
# Half-width, in calendar years, of the "50-year window" centred on today.
_WINDOW_HALF_YEARS = 25

# Thresholds for the "High Risk" hazard category. 7,500,000 km is roughly
# 0.05 AU (~7.48M km). NOTE(review): presumably chosen to echo the NASA PHA
# definition (MOID <= 0.05 AU, H <= 22) — confirm with the data owner.
_HIGH_RISK_MAX_MISS_KM = 7500000
_HIGH_RISK_MAX_MAGNITUDE_H = 22


def _shift_years(anchor, years):
    """Return the date ``anchor`` moved by ``years`` calendar years (Feb 29 -> Feb 28 in non-leap years)."""
    try:
        return anchor.replace(year=anchor.year + years)
    except ValueError:
        # Only a Feb 29 anchor landing on a non-leap year fails above.
        return anchor.replace(year=anchor.year + years, day=28)


def _load_joined_approaches():
    """Join approaches -> neos -> orbits and project/round the silver columns."""
    df_neos = spark.table("neos")
    df_approaches = spark.table("approaches")
    df_orbit = spark.table("orbits")

    df_silver_join = (
        df_approaches
        .join(df_neos, df_approaches.neo_id == df_neos.id, "inner")
        .withColumn("close_approach_date", to_date("close_approach_date"))
        .join(df_orbit, df_neos.id == df_orbit.orbit_id, "inner")
    )

    return df_silver_join.select(
        # Qualified through df_neos so the projection stays unambiguous even
        # if another joined table also carries an ``id`` column.
        df_neos["id"].alias("neo_id"),
        "name",
        round(col("absolute_magnitude_h"), 2).alias("absolute_magnitude_h"),
        # NOTE(review): the "_m" suffix assumes the source diameters are in metres — confirm.
        round(col("estimated_diameter_min"), 2).alias("diameter_min_m"),
        round(col("estimated_diameter_max"), 2).alias("diameter_max_m"),
        "is_potentially_hazardous",
        "close_approach_date",
        round(col("relative_velocity_km_h"), 2).alias("velocity_km_h"),
        round(col("relative_velocity_km_s"), 2).alias("velocity_km_s"),
        round(col("miss_distance_km"), 2).alias("miss_distance_km"),
        "orbiting_body",
        round(col("orbital_period_days"), 2).alias("orbital_period_days"),
        round(col("orbital_period_years"), 2).alias("orbital_period_years"),
        "semi_major_axis",
        "eccentricity",
        "inclination",
    )


def _add_derived_columns(silver_df, window_start, window_end):
    """Drop rows missing key measurements and append the engineered feature columns."""
    has_required_values = (
        col("diameter_min_m").isNotNull()
        & col("diameter_max_m").isNotNull()
        & col("orbital_period_years").isNotNull()
        & col("miss_distance_km").isNotNull()
    )
    return (
        silver_df
        .filter(has_required_values)
        .withColumn("size_avg", (col("diameter_min_m") + col("diameter_max_m")) / 2)
        .withColumn(
            "size_category",
            when(col("size_avg") < 10, "<10m")
            .when(col("size_avg") < 50, "10-50m")
            .when(col("size_avg") < 100, "50-100m")
            .when(col("size_avg") < 500, "100m-500m")
            .otherwise(">500m"),
        )
        .withColumn(
            "size_category_label",
            when(col("diameter_max_m") < 150, "Small")
            .when(col("diameter_max_m") < 500, "Medium")
            .otherwise("Large"),
        )
        .withColumn(
            "in_50yr_window",
            # when/otherwise (rather than the bare comparison) turns a null
            # close_approach_date into False instead of null.
            when(
                (col("close_approach_date") >= lit(window_start))
                & (col("close_approach_date") <= lit(window_end)),
                True,
            ).otherwise(False),
        )
        .withColumn(
            "hazard_category",
            when(
                (col("is_potentially_hazardous") == True)  # noqa: E712 - Spark Column comparison
                & (col("miss_distance_km") < _HIGH_RISK_MAX_MISS_KM)
                & (col("absolute_magnitude_h") < _HIGH_RISK_MAX_MAGNITUDE_H),
                "High Risk",
            )
            .when(col("is_potentially_hazardous") == True, "Moderate Risk")  # noqa: E712
            .otherwise("Low Risk"),
        )
        # +1 keeps log10 defined for zero values.
        .withColumn("log_diameter_max", round(log10(col("diameter_max_m") + 1), 2))
        .withColumn("log_miss_distance", round(log10(col("miss_distance_km") + 1), 2))
        .withColumn(
            "distance_category",
            when(col("miss_distance_km") < 100000, "<100k km")
            .when(col("miss_distance_km") < 500000, "100k-500k km")
            .when(col("miss_distance_km") < 1000000, "500k-1M km")
            .when(col("miss_distance_km") < 5000000, "1M-5M km")
            .otherwise(">5M km"),
        )
        .withColumn(
            "hazard_level",
            when(col("is_potentially_hazardous"), "Hazardous").otherwise("Non-Hazardous"),
        )
    )


def transform_silver():
    """Build the silver-layer NEO close-approach DataFrame.

    Inner-joins the ``approaches``, ``neos`` and ``orbits`` tables, rounds the
    numeric measurements to 2 decimals, drops rows missing diameter,
    orbital-period or miss-distance values, and appends derived size,
    distance, hazard, log-scaled and 50-year-window columns.

    The window spans ``_WINDOW_HALF_YEARS`` calendar years either side of
    today's local date (computed on the driver), so leap days are counted
    correctly.

    Returns:
        pyspark.sql.DataFrame: the lazily-evaluated silver DataFrame.
    """
    today = datetime.today().date()
    window_start = _shift_years(today, -_WINDOW_HALF_YEARS)
    window_end = _shift_years(today, _WINDOW_HALF_YEARS)
    return _add_derived_columns(_load_joined_approaches(), window_start, window_end)




**Write Silver**

In [0]:
def write_silver(df, table_name):
    """Overwrite ``table_name`` with ``df`` as a Delta table and log the outcome.

    If ``df`` is empty nothing is written (an INFO event is logged), so any
    existing table keeps its previous contents. Non-empty writes replace both
    the data and the schema (``overwriteSchema``).

    Args:
        df: the DataFrame to persist.
        table_name: name of the target table.

    Raises:
        Exception: whatever the emptiness check or the write raised; it is
            logged at ERROR level first and then re-raised unchanged.
    """
    try:
        # isEmpty() triggers a Spark job, so errors in the lazy upstream plan
        # can surface here and are logged as well.
        is_empty = df.isEmpty()
        if not is_empty:
            df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)
    except Exception as e:
        log_event("ERROR", f"Error writing data to silver table: {table_name}. Error: {e}")
        # Bare raise re-raises the active exception with its original traceback.
        raise

    # Outcome logging stays outside the try so a logging failure is not
    # misreported (and re-logged) as a write error.
    if is_empty:
        log_event("INFO", f"No data to write to silver table: {table_name}")
        return
    log_event("INFO", f"Data written to silver table: {table_name}")

**Orchestrate the pipeline**

In [0]:
def run_neo_transform():
    """Run the NEO pipeline end to end.

    Builds the silver DataFrame with ``transform_silver`` and hands it to
    ``write_silver`` for the ``neo_approaches`` table.
    """
    target_table = "neo_approaches"
    silver_frame = transform_silver()
    write_silver(silver_frame, target_table)

**Run the pipeline**

In [0]:
# Execute the pipeline: build the silver frame and write it to the `neo_approaches` table.
run_neo_transform()