### Summary of Work Done in This Notebook

- **Ingested and cleaned raw data tables**: Loaded bronze layer tables (`bronze_students`, `bronze_courses`, `bronze_enrollments`, `bronze_results`) from the database.
- **Transformed and standardized data**: Cast columns to appropriate data types, removed rows with nulls in required fields, and dropped duplicate rows for each entity.
- **Created Silver Layer Delta Tables**:
  - `silver_students`
  - `silver_courses`
  - `silver_enrollments`
  - `silver_results`
- **Wrote cleaned DataFrames to Delta tables**: Used overwrite mode with `overwriteSchema` enabled, so each run replaces both the data and the schema of the silver table with the freshly cleaned result.
- **Displayed the contents of each silver table**: Verified data quality and structure after transformation.

In [0]:
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import functions as F

# Catalog.schema that holds both the bronze source and the silver target
schema_name = "kusha_solutions.Jeevan"

# Raw student records from the bronze layer
students_df = spark.table(f"{schema_name}.bronze_students")

# Target type for every student column, in the order the casts are applied
student_column_types = [
    ("student_id", IntegerType()),
    ("name", StringType()),
    ("email", StringType()),
    ("age", IntegerType()),
    ("city", StringType()),
    ("department", StringType()),
]

# Cast each listed column in place (withColumn replaces the existing column)
students_silver = students_df
for column_name, target_type in student_column_types:
    students_silver = students_silver.withColumn(
        column_name, F.col(column_name).cast(target_type)
    )

# Every cast column is required: discard rows where any of them is null
required_student_columns = [column_name for column_name, _ in student_column_types]
students_silver = students_silver.dropna(subset=required_student_columns)

# Keep a single row per (student_id, email) pair
students_silver = students_silver.dropDuplicates(["student_id", "email"])

# Replace the silver_students Delta table (data and schema) with the cleaned rows
(
    students_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{schema_name}.silver_students")
)

print("✅ silver_students table created")

In [0]:
# Catalog.schema that holds the silver tables
schema_name = "kusha_solutions.Jeevan"

# Fully qualified name of the table to inspect
silver_students_table = f"{schema_name}.silver_students"

# Load the table and render it in the notebook output
silver_students_df = spark.read.table(silver_students_table)
display(silver_students_df)

In [0]:
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import functions as F

# Catalog.schema that holds both the bronze source and the silver target
schema_name = "kusha_solutions.Jeevan"

# Raw course records from the bronze layer
courses_df = spark.table(f"{schema_name}.bronze_courses")

# Target type for every course column, in the order the casts are applied
course_column_types = [
    ("course_id", StringType()),
    ("course_name", StringType()),
    ("credits", IntegerType()),
]

# Cast each listed column in place (withColumn replaces the existing column)
courses_silver = courses_df
for column_name, target_type in course_column_types:
    courses_silver = courses_silver.withColumn(
        column_name, F.col(column_name).cast(target_type)
    )

# Only the identifying columns are required; credits may stay null
courses_silver = courses_silver.dropna(subset=["course_id", "course_name"])

# Keep a single row per course_id
courses_silver = courses_silver.dropDuplicates(["course_id"])

# Replace the silver_courses Delta table (data and schema) with the cleaned rows
(
    courses_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{schema_name}.silver_courses")
)

print("✅ silver_courses table created")

In [0]:
# Catalog.schema that holds the silver tables
schema_name = "kusha_solutions.Jeevan"

# Fully qualified name of the table to inspect
silver_courses_table = f"{schema_name}.silver_courses"

# Load the table and render it in the notebook output
silver_courses_df = spark.read.table(silver_courses_table)
display(silver_courses_df)

In [0]:
# Import necessary data types and functions for Spark DataFrame transformations
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql import functions as F

# Catalog.schema that holds both the bronze source and the silver target
schema_name = "kusha_solutions.Jeevan"

# Raw enrollment records from the bronze layer
enrollments_df = spark.table(f"{schema_name}.bronze_enrollments")

# Target type for every enrollment column, in the order the casts are applied
enrollment_column_types = [
    ("enrollment_id", IntegerType()),
    ("student_id", IntegerType()),
    ("course_id", StringType()),
    ("enroll_date", DateType()),
    ("status", StringType()),
]

# Cast each listed column in place (withColumn replaces the existing column)
enrollments_silver = enrollments_df
for column_name, target_type in enrollment_column_types:
    enrollments_silver = enrollments_silver.withColumn(
        column_name, F.col(column_name).cast(target_type)
    )

# Discard rows missing the student, the course, or the status.
# NOTE(review): enrollment_id and enroll_date are not null-checked here —
# confirm that rows without an enrollment_id are meant to be kept.
enrollments_silver = enrollments_silver.dropna(
    subset=["student_id", "course_id", "status"]
)

# Keep a single row per (student_id, course_id) pair
enrollments_silver = enrollments_silver.dropDuplicates(["student_id", "course_id"])

# Replace the silver_enrollments Delta table (data and schema) with the cleaned rows
(
    enrollments_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{schema_name}.silver_enrollments")
)

# Report that the write finished
print("✅ silver_enrollments table created")

In [0]:
# Catalog.schema that holds the silver tables
schema_name = "kusha_solutions.Jeevan"

# Fully qualified name of the table to inspect
silver_enrollments_table = f"{schema_name}.silver_enrollments"

# Load the table and render it in the notebook output
silver_enrollments_df = spark.read.table(silver_enrollments_table)
display(silver_enrollments_df)

In [0]:
# Import necessary data types and functions for Spark DataFrame transformations
from pyspark.sql.types import IntegerType, StringType, FloatType, DateType
from pyspark.sql import functions as F

# Catalog.schema that holds both the bronze source and the silver target
schema_name = "kusha_solutions.Jeevan"

# Raw result records from the bronze layer
results_df = spark.table(f"{schema_name}.bronze_results")

# Target type for every result column, in the order the casts are applied
result_column_types = [
    ("result_id", IntegerType()),
    ("enrollment_id", IntegerType()),
    ("marks", FloatType()),
    ("grade", StringType()),
]

# Cast each listed column in place (withColumn replaces the existing column)
results_silver = results_df
for column_name, target_type in result_column_types:
    results_silver = results_silver.withColumn(
        column_name, F.col(column_name).cast(target_type)
    )

# Discard rows missing the enrollment reference, the marks, or the grade.
# NOTE(review): result_id is used for deduplication below but is not
# null-checked here — confirm that is intended.
results_silver = results_silver.dropna(subset=["enrollment_id", "marks", "grade"])

# Keep a single row per (enrollment_id, result_id) pair
results_silver = results_silver.dropDuplicates(["enrollment_id", "result_id"])

# Replace the silver_results Delta table (data and schema) with the cleaned rows
(
    results_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{schema_name}.silver_results")
)

# Report that the write finished
print("✅ silver_results table created")

In [0]:
# Catalog.schema that holds the silver tables
schema_name = "kusha_solutions.Jeevan"

# Fully qualified name of the table to inspect
silver_results_table = f"{schema_name}.silver_results"

# Load the table and render it in the notebook output
silver_results_df = spark.read.table(silver_results_table)
display(silver_results_df)