In [2]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name,col, lit , current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType,LongType


# 1. Explicit schema for the incoming Gaia CSV extracts. A streaming file
# source needs a schema up front by default (no inference), and pinning it
# keeps column types stable across files. All fields are nullable.
_gaia_double_columns = (
    "ra",
    "dec",
    "parallax",
    "parallax_error",
    "parallax_over_error",
    "pm",
    "pmra",
    "pmdec",
    "phot_g_mean_mag",
    "teff_gspphot",
    "distance_gspphot",
)
csv_schema = StructType(
    [StructField("source_id", LongType(), True)]
    + [StructField(field_name, DoubleType(), True) for field_name in _gaia_double_columns]
)

# 2. Directory that Spark monitors for new CSV files.
# NOTE(review): relative path — assumes the notebook has a default lakehouse attached.
input_csv_directory = "Files/Staging"

# This only logs the path; it does not create or check the directory. The
# directory may start out empty, but it should exist: Spark's file stream source
# rejects a missing (non-glob) path when the stream is defined.
print(f"Monitoring directory for new CSVs: {input_csv_directory}")

# Bind the Spark session explicitly. In a Fabric notebook getOrCreate() returns
# the already-running session, so this is a no-op there, but it also keeps the
# code runnable outside the notebook and uses the SparkSession import.
spark = SparkSession.builder.getOrCreate()

# 3. Read the directory as a stream. The file source records which files it has
# already processed (in the query's checkpoint) and only picks up new ones on
# each trigger; by default older files (by modification time) are processed first.
streaming_df = (
    spark.readStream
    .format("csv")
    .schema(csv_schema)
    .option("header", "true")
    .load(input_csv_directory)
)
# Conversion factor applied to distance_gspphot.
# NOTE(review): assumes distance_gspphot is in parsecs (Gaia GSP-Phot) — confirm.
PARSEC_TO_METERS = 3.08567758 * 1e16

# Keep every CSV column and append three derived ones, in this order:
#   distance             - distance_gspphot scaled by PARSEC_TO_METERS
#   processing_timestamp - current_timestamp() at processing time
#   source_file          - path of the CSV file the row was read from
transformed_streaming_df = streaming_df.select(
    "*",
    (col("distance_gspphot") * PARSEC_TO_METERS).alias("distance"),
    current_timestamp().alias("processing_timestamp"),
    input_file_name().alias("source_file"),
)


# 4. Write the stream to a Delta table
delta_table_name = "stg_gaia"  # Target Delta table name; also the last segment of the table path below
checkpoint_location = f"Files/checkpoints/{delta_table_name}_checkpoint"  # MUST be a unique, reliable path in OneLake

# OneLake ABFS location of the target table. Per the OneLake URI layout the
# container is the workspace and the first path segment the item (here,
# presumably the lakehouse) — confirm the GUIDs if the workspace changes.
# Deriving the last segment from delta_table_name keeps the name and the
# actual write location from drifting apart.
onelake_workspace_id = "b3979a08-4bbe-4d35-b864-4da7d2c8b2b4"
onelake_lakehouse_id = "608b9b1d-a537-4410-abb1-240bd8dc87ff"
delta_table_path = (
    f"abfss://{onelake_workspace_id}@onelake.dfs.fabric.microsoft.com/"
    f"{onelake_lakehouse_id}/Tables/dbo/{delta_table_name}"
)

# Ensure the checkpoint directory exists or Spark has permissions to create it.

# Append-only Delta sink. availableNow=True processes every file not yet
# recorded in the checkpoint (possibly over several micro-batches) and then
# stops, so this cell behaves like an incremental batch load, not a
# continuously running stream.
query_append = (
    transformed_streaming_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .start(delta_table_path)  # path-based write, not saveAsTable/toTable
)

# Block until the available-now run finishes; raises if the query failed.
query_append.awaitTermination()

# The query has already terminated here: files added to the directory later
# are only loaded when this cell is run again.
print(
    f"Streaming run finished in append mode. CSVs in '{input_csv_directory}' were loaded into "
    f"'{delta_table_name}'; rerun to load files added later."
)
print(f"Checkpoint location: {checkpoint_location}")

StatementMeta(, 1b82218c-0272-43a0-b906-f75c6732565c, 9, Finished, Cancelled, Cancelled)

Monitoring directory for new CSVs: Files/Staging


Streaming query started in append mode. New CSVs dropped in 'Files/Staging' will be loaded into 'stg_gaia'.
Checkpoint location: Files/checkpoints/stg_gaia_checkpoint_test5
