In [None]:
# Column helpers used to build the flattened projection below
from pyspark.sql.functions import col, explode_outer, to_timestamp

# Read every JSON file in the analysis folder with the "multiline" reader option on
df = spark.read.option("multiline", "true").json("Files/Document_Analysis/*.json")

# Names of the nested fields to lift to the top level, in output column order
metadata_fields = (
    "analysis_model",
    "api_version",
    "approach",
    "authentication",
    "document_path",
    "has_images",
    "image_count",
)
extracted_fields = (
    "action_items",
    "author",
    "confidentiality",
    "critical_risks",
    "document_date",
    "document_title",
    "document_type",
    "key_topics",
    "risk_rating",
    "stakeholders",
    "summary",
)

# Top-level fields; analysis_date is converted to a timestamp
top_level_columns = [
    to_timestamp(col("analysis_date")).alias("analysis_date"),
    col("document_location"),
    col("source_document"),
    col("summary_text"),
]

# Each nested field keeps its own name as the flattened column name
metadata_columns = [
    col(f"analysis_metadata.{name}").alias(name) for name in metadata_fields
]
extracted_columns = [
    col(f"extracted_data.{name}").alias(name) for name in extracted_fields
]

# Flatten: top-level fields first, then analysis metadata, then extracted data
parsed_df = df.select(*top_level_columns, *metadata_columns, *extracted_columns)

# Render the flattened rows in the notebook, then print the resulting schema
display(parsed_df)
parsed_df.printSchema()


In [None]:
# Write parsed_df out as the Delta table "Document_Analysis", overwriting
# whatever the table held before
delta_writer = parsed_df.write.format("delta").mode("overwrite")
delta_writer.saveAsTable("Document_Analysis")

print("Table 'Document_Analysis' saved successfully in /Tables folder")
