In [0]:
# Step 1: import Spark and configure the Iceberg catalog backed by Azure storage
import os

from pyspark.sql import SparkSession

# The storage account key is a secret and must not live in the notebook source.
# It is read from the environment here; on Databricks a secret scope is the usual
# place to source it from before exporting it to this variable.
# NOTE(review): any key that was ever committed in this notebook should be rotated.
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the access key "
        "of the 'iceberg2024stockage' storage account before running this cell."
    )

# Create (or reuse) the SparkSession and register `silver_catalog`, a Hadoop-type
# Iceberg catalog whose warehouse is the `silver` container of the storage account.
# NOTE(review): wasb:// is normally paired with a blob.core.windows.net host and
# abfs(s):// with dfs.core.windows.net — confirm this scheme/host pairing is intended.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.silver_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.silver_catalog.type", "hadoop")
    .config("spark.sql.catalog.silver_catalog.warehouse", "wasb://silver@iceberg2024stockage.dfs.core.windows.net/")
    .config("spark.hadoop.fs.azure.account.key.iceberg2024stockage.dfs.core.windows.net", storage_account_key)
    .getOrCreate()
)

# Step 2: describe the bronze CSV files that will be converted to Iceberg tables
bronze_path = "dbfs:/mnt/iceberg-data/bronze/"

# Each entry pairs a source CSV file name ("file") with the name of the
# Iceberg table it is written to ("table").
csv_files = [
    {"file": source_csv, "table": target_table}
    for source_csv, target_table in (
        ("Student Mental health.csv", "mental_health"),
        ("StressLevelDataset.csv", "stress_levels"),
        ("student_sleep_patterns.csv", "sleep_patterns"),
        ("StudentPerformanceFactors.csv", "performance_factors"),
    )
]

# Convert each bronze CSV into an Iceberg table in the silver catalog.
# Processing is best-effort: a failure on one file is reported and the loop
# moves on to the next file. Failed file names are collected for a final summary.
failed_files = []
for csv_file in csv_files:
    try:
        file_path = f"{bronze_path}{csv_file['file']}"
        # The first line of the CSV is used as the header; column types are inferred.
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
        # head(1) fetches at most one row, which avoids converting the whole
        # DataFrame to an RDD just to find out whether it has any rows.
        if not df.head(1):
            raise FileNotFoundError(f"The file {csv_file['file']} does not exist or is empty at path: {file_path}")
        print(f"Successfully read {csv_file['file']}")

        # Write the data into the silver layer, replacing any existing table content.
        table_name = f"silver_catalog.silver.{csv_file['table']}"
        df.write.format("iceberg").option("write-format", "parquet").mode("overwrite").saveAsTable(table_name)
        print(f"Successfully converted {csv_file['file']} to Iceberg table {table_name}")

    except Exception as e:
        # Deliberately broad: any read or write error for this file is reported
        # without stopping the remaining conversions.
        failed_files.append(csv_file['file'])
        print(f"Failed to process {csv_file['file']}: {e}")

# Make partial failure obvious at the end of the cell output.
if failed_files:
    print(f"{len(failed_files)} of {len(csv_files)} file(s) failed: {', '.join(failed_files)}")


Successfully read Student Mental health.csv
Successfully converted Student Mental health.csv to Iceberg table silver_catalog.silver.mental_health
Successfully read StressLevelDataset.csv
Successfully converted StressLevelDataset.csv to Iceberg table silver_catalog.silver.stress_levels
Successfully read student_sleep_patterns.csv
Successfully converted student_sleep_patterns.csv to Iceberg table silver_catalog.silver.sleep_patterns
Successfully read StudentPerformanceFactors.csv
Successfully converted StudentPerformanceFactors.csv to Iceberg table silver_catalog.silver.performance_factors


In [0]:


# Reader configured for the Iceberg format, shared by both table loads below.
iceberg_reader = spark.read.format("iceberg")

# Load the mental_health table and preview its first five rows.
df_mental_health = iceberg_reader.load("silver_catalog.silver.mental_health")
df_mental_health.show(n=5)

# Load the stress_levels table and preview its first five rows.
df_stress_levels = iceberg_reader.load("silver_catalog.silver.stress_levels")
df_stress_levels.show(n=5)



+--------------+------------------+---+--------------------+--------------------------+------------------+--------------+-----------------------+--------------------+-------------------------+--------------------------------------------+
|     Timestamp|Choose your gender|Age|What is your course?|Your current year of Study|What is your CGPA?|Marital status|Do you have Depression?|Do you have Anxiety?|Do you have Panic attack?|Did you seek any specialist for a treatment?|
+--------------+------------------+---+--------------------+--------------------------+------------------+--------------+-----------------------+--------------------+-------------------------+--------------------------------------------+
|8/7/2020 12:02|            Female| 18|         Engineering|                    year 1|       3.00 - 3.49|            No|                    Yes|                  No|                      Yes|                                          No|
|8/7/2020 12:04|              Male| 21|   Islami