In [None]:
# PySpark in Microsoft Fabric Notebook

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, sum as spark_sum
import os
import pandas as pd

# Obtain the Spark session. getOrCreate() hands back an already-running
# session when there is one (Fabric notebooks normally provide `spark`),
# and only builds a new one named "FabricPySparkAnalysis" otherwise.
spark = (
    SparkSession.builder
    .appName("FabricPySparkAnalysis")
    .getOrCreate()
)

# Resolve where the three input CSVs live and load them.
# NOTE(review): in Fabric the notebook's default Lakehouse is expected to be
# mounted at /lakehouse/default, with Spark resolving relative "Files/..."
# paths against it -- confirm for this workspace. Anywhere else, fall back to
# the local "data/" folder. All three paths are assigned on BOTH branches
# because every read below needs them.
if os.path.isdir("/lakehouse/default/Files") or os.path.isdir("Files"):
    data_dir = "Files"  # Fabric Lakehouse
else:
    data_dir = "data"  # Local

# Forward slashes on purpose: these strings are handed to Spark, not os.path.
policy_path = f"{data_dir}/policy_data.csv"
plan_path = f"{data_dir}/plan_data.csv"
client_path = f"{data_dir}/client_data.csv"

# header=True: the first line holds column names.
# inferSchema=True: Spark picks column types from the data (one extra pass
# over each file).
df_policy = spark.read.csv(policy_path, header=True, inferSchema=True)
df_plan = spark.read.csv(plan_path, header=True, inferSchema=True)
df_client = spark.read.csv(client_path, header=True, inferSchema=True)


# Quick sanity check: print the first five rows of each input table
# (policy, plan, client -- in that order).
print("📄 Raw Data Preview:")
df_policy.show(5)
df_plan.show(5)
df_client.show(5)


# Normalize the money columns to double, then drop rows missing any field the
# join or aggregation relies on.
# Casting happens BEFORE dropna. With ANSI mode off (the Spark 3.x default),
# cast() turns an unparseable value (e.g. "N/A" in a column inferred as
# string) into null. Running dropna afterwards removes such rows instead of
# letting them flow into the aggregates.
# NOTE(review): with spark.sql.ansi.enabled=true the cast raises on bad values
# instead of yielding null.
df_policy = (
    df_policy
    .withColumn("coverage_amount", col("coverage_amount").cast("double"))
    .dropna(subset=["policy_id", "plan_id", "client_id", "coverage_amount"])
)

df_plan = (
    df_plan
    .withColumn("premium_amount", col("premium_amount").cast("double"))
    .dropna(subset=["plan_id", "plan_name", "premium_amount"])
)

df_client = df_client.dropna(subset=["client_id", "client_name", "client_address"])


# Enrich each policy with its plan, then with its client.
# Both joins are inner, so a policy survives only if its plan_id AND its
# client_id are found. Only the reporting columns are kept.
df_joined = (
    df_policy
    .join(df_plan, on="plan_id", how="inner")
    .join(df_client, on="client_id", how="inner")
    .select(
        "policy_id",
        "client_name",
        "client_address",
        "plan_name",
        "coverage_amount",
        "premium_amount",
    )
)

df_joined.show()

# One output row per plan_name with:
#   AverageCoverage -- mean coverage_amount
#   Count           -- number of joined rows (one per policy, presumably,
#                      if plan_id/client_id are unique in their tables)
#   TotalPremium    -- summed premium_amount
# Plans with the largest TotalPremium are listed first.
agg_df = (
    df_joined
    .groupBy("plan_name")
    .agg(
        avg("coverage_amount").alias("AverageCoverage"),
        count("*").alias("Count"),
        spark_sum("premium_amount").alias("TotalPremium"),
    )
    .orderBy(col("TotalPremium").desc())
)

print("Aggregated Results:")
agg_df.show()


# Folder (relative to the working directory) that receives the aggregated CSV.
output_path = "data/output"
csv_file = os.path.join(output_path, "aggregated_data.csv")

# Collect the aggregate to the driver and write it with pandas, so the local
# write does not depend on Spark's Hadoop filesystem libraries. agg_df has one
# row per plan, so collecting it is expected to be small.
# In Fabric, Spark's own writer (e.g. agg_df.write.parquet(...)) can be used
# instead. The collect sits outside the try on purpose: a failure while
# computing the result should surface as-is, not be reported as a write error.
pandas_df = agg_df.toPandas()

try:
    # Create the output folder itself (not just its parent): to_csv does not
    # create missing directories.
    os.makedirs(output_path, exist_ok=True)
    pandas_df.to_csv(csv_file, index=False)
except OSError as e:
    # Best-effort export: report filesystem problems without aborting the notebook.
    print(f"❌ Error writing CSV to {output_path}: {e}")
else:
    print(f"✅ Aggregated data written to CSV: {csv_file}")

📄 Raw Data Preview:
+--------+-----+
|Category|Value|
+--------+-----+
|       A|   10|
|       B|   15|
|       A|   20|
|       C|    5|
|       B|   25|
+--------+-----+

Aggregated Results:
+--------+------------+-----+----------+
|Category|AverageValue|Count|TotalValue|
+--------+------------+-----+----------+
|       C|         5.0|    1|       5.0|
|       B|        20.0|    2|      40.0|
|       A|        15.0|    2|      30.0|
+--------+------------+-----+----------+

+--------+-----+
|Category|Value|
+--------+-----+
|       A| 10.0|
|       B| 15.0|
|       A| 20.0|
|       B| 25.0|
+--------+-----+

✅ Aggregated data written to CSV: data/output\aggregated_data.csv


git init
git add .
git commit -m "Initial commit: Fabric + PySpark project"
git branch -M main
git remote add origin https://github.com/hansenguxd/fabric-pyspark-analysis.git
git push -u origin main

git status
# push one file
git add notebooks/data_analysis.ipynb
git commit -m "Improved aggregation logic"
git push