In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from datetime import datetime

## 1. Transform Student Details

In [None]:
# Read raw student details
raw_students = spark.table("raw_g4s_student_details")

# Transform to base schema matching original C# logic.
# This is a pure projection (rename / derive columns); it applies no filter.
base_students = raw_students.select(
    # Composite key matching C# implementation: <academy><year>-<id>
    concat(col("_academy_code"), col("_academic_year"), lit("-"), col("id")).alias("StudentId"),
    col("_academic_year").alias("DataSet"),
    col("_academy_code").alias("Academy"),
    col("id").alias("G4SStuId"),

    # Student name fields
    col("legal_first_name").alias("LegalFirstName"),
    col("legal_last_name").alias("LegalLastName"),
    col("preferred_first_name").alias("PreferredFirstName"),
    col("preferred_last_name").alias("PreferredLastName"),
    col("middle_names").alias("MiddleNames"),

    # Demographics
    col("sex").alias("Sex"),
    # NOTE(review): the pattern treats the trailing 'Z' as a literal character;
    # confirm the API always sends date_of_birth in exactly this shape.
    to_timestamp(col("date_of_birth"), "yyyy-MM-dd'T'HH:mm:ss'Z'").alias("DateOfBirth"),

    # Audit columns
    col("_ingested_at").alias("IngestedAt"),
    current_timestamp().alias("TransformedAt")
)

# Write to base layer (full overwrite, schema replaced, partitioned by academy and year)
base_students.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("Academy", "DataSet") \
    .saveAsTable("base_students")

# Count the rows of the saved table. `base_students` is a lazy plan, so calling
# base_students.count() here would re-read the raw table and re-run the whole
# projection a second time after the write has already done it once.
written_students = spark.table("base_students").count()
print(f"✓ Transformed {written_students} student records")

## 2. Transform Education Details

In [None]:
# Read raw education details
raw_education = spark.table("raw_g4s_education_details")

# Transform to base schema.
# This is a pure projection (rename / derive columns); it applies no filter.
base_education = raw_education.select(
    # Foreign key to Students, built the same way as base_students.StudentId:
    # <academy><year>-<student id>
    concat(col("_academy_code"), col("_academic_year"), lit("-"), col("student_id")).alias("StudentId"),
    col("_academic_year").alias("DataSet"),
    col("_academy_code").alias("Academy"),

    # Education fields (adjust based on actual API response structure)
    col("admission_date").alias("AdmissionDate"),
    col("admission_number").alias("AdmissionNumber"),
    col("year_group").alias("YearGroup"),
    col("registration_group").alias("RegistrationGroup"),
    col("house").alias("House"),
    col("upn").alias("UPN"),
    col("former_upn").alias("FormerUPN"),
    col("enrolment_status").alias("EnrolmentStatus"),
    col("leaving_date").alias("LeavingDate"),

    # Audit columns
    col("_ingested_at").alias("IngestedAt"),
    current_timestamp().alias("TransformedAt")
)

# Write to base layer (full overwrite, schema replaced, partitioned by academy and year)
base_education.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("Academy", "DataSet") \
    .saveAsTable("base_education_details")

# Count the rows of the saved table. `base_education` is a lazy plan, so calling
# base_education.count() here would re-read the raw table and re-run the whole
# projection a second time after the write has already done it once.
written_education = spark.table("base_education_details").count()
print(f"✓ Transformed {written_education} education detail records")

## 3. Transform Student Attributes (General, Demographic, SEND, Sensitive)

In [None]:
# Raw -> base projection shared by every student attribute table
def transform_attributes(raw_table_name, attribute_type):
    """
    Project one raw attribute table into the base-layer attribute layout.

    Args:
        raw_table_name: Name of the raw table, read with ``spark.table``.
        attribute_type: Label written as a literal into ``AttributeType``.

    Returns:
        A DataFrame. When the raw table has an ``attributes`` column, that
        column is exploded and the result has the columns StudentId, DataSet,
        Academy, AttributeType, AttributeName, AttributeValue, G4SAttributeId,
        TransformedAt. When it does not, only StudentId, DataSet, Academy,
        AttributeType and TransformedAt are returned, so the two shapes have
        different column counts.
    """
    source = spark.table(raw_table_name)

    # Columns produced identically by both branches, in output order.
    key_columns = [
        concat(col("_academy_code"), col("_academic_year"), lit("-"), col("student_id")).alias("StudentId"),
        col("_academic_year").alias("DataSet"),
        col("_academy_code").alias("Academy"),
        lit(attribute_type).alias("AttributeType"),
    ]
    transformed_at = current_timestamp().alias("TransformedAt")

    # Flat structure: there is no nested array to unpack, emit the keys only.
    if "attributes" not in source.columns:
        return source.select(*key_columns, transformed_at)

    # Nested structure: one output row per element of the attributes array.
    per_attribute = source.select(
        col("_academy_code"),
        col("_academic_year"),
        col("student_id"),
        explode(col("attributes")).alias("attribute"),
    )

    return per_attribute.select(
        *key_columns,
        col("attribute.name").alias("AttributeName"),
        col("attribute.value").alias("AttributeValue"),
        col("attribute.id").alias("G4SAttributeId"),
        transformed_at,
    )

# Transform each attribute type: (raw table name, AttributeType label)
attribute_types = [
    ("raw_g4s_general_attributes", "General"),
    ("raw_g4s_demographic_attributes", "Demographic"),
    ("raw_g4s_send_attributes", "SEND"),
    ("raw_g4s_sensitive_attributes", "Sensitive")
]

# Best effort: a raw table that is missing is skipped, and one that fails to
# transform is reported and left out, so the remaining types are still loaded.
all_attributes = []
for raw_table, attr_type in attribute_types:
    try:
        if spark.catalog.tableExists(raw_table):
            df = transform_attributes(raw_table, attr_type)
            all_attributes.append(df)
            print(f"✓ Transformed {attr_type} attributes from {raw_table}")
    except Exception as e:
        print(f"✗ Error transforming {raw_table}: {str(e)}")

# Union all attribute types
if all_attributes:
    from functools import reduce
    from pyspark.sql import DataFrame  # kept so the name stays bound at notebook scope as before

    # transform_attributes returns 8 columns for nested sources but only 5 for
    # flat ones. A positional union() requires equal column counts and would
    # fail on such a mix, so combine by column name and let Spark fill the
    # columns a frame lacks with nulls.
    base_attributes = reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        all_attributes,
    )

    # Write to base layer (full overwrite, schema replaced)
    base_attributes.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .partitionBy("Academy", "DataSet", "AttributeType") \
        .saveAsTable("base_student_attributes")

    # Count the saved table instead of base_attributes.count(): the DataFrame
    # is lazy, so counting it would re-read and re-explode every raw table.
    written_attributes = spark.table("base_student_attributes").count()
    print(f"✓ Transformed {written_attributes} total attribute records")
else:
    print("⚠ No attribute data found")

## 4. Data Quality Checks

In [None]:
# Data quality checks over the base layer. Results are only printed; a failed
# check does not raise.
print("\n=== Data Quality Checks ===")

_mark = {True: "✓", False: "✗"}
students = spark.table("base_students")

# Check 1: the projection should neither add nor drop student rows
raw_count = spark.table("raw_g4s_student_details").count()
base_count = students.count()
print(f"Raw student records: {raw_count}")
print(f"Base student records: {base_count}")
print(f"Match: {_mark[raw_count == base_count]}")

# Check 2: every student row must carry a composite key
missing_key = col("StudentId").isNull()
null_ids = students.filter(missing_key).count()
print(f"\nNull StudentIds: {null_ids} {_mark[null_ids == 0]}")

# Check 3: dates of birth must fall between 1900-01-01 and today
# NOTE(review): rows whose DateOfBirth is NULL satisfy neither comparison and
# are therefore not counted here - confirm that is intended.
born_too_early = col("DateOfBirth") < lit("1900-01-01")
born_in_future = col("DateOfBirth") > current_date()
invalid_dob = students.filter(born_too_early | born_in_future).count()
print(f"Invalid DateOfBirth records: {invalid_dob} {_mark[invalid_dob == 0]}")

# Check 4: how many distinct students have at least one education details row
education_coverage_sql = """
    SELECT COUNT(DISTINCT s.StudentId) as count
    FROM base_students s
    INNER JOIN base_education_details e ON s.StudentId = e.StudentId
"""
students_with_education = spark.sql(education_coverage_sql).first()[0]
print(f"\nStudents with education details: {students_with_education}/{base_count}")

print("\n=== Transformation Complete ===")

## 5. Create Summary Views (Optional)

In [None]:
# Denormalized view: every base_students column plus selected education
# columns. The LEFT JOIN keeps students that have no education details row.
# FormerUPN and the education audit columns are not exposed by the view.
students_complete_ddl = """
CREATE OR REPLACE VIEW vw_students_complete AS
SELECT 
    s.*,
    e.AdmissionDate,
    e.AdmissionNumber,
    e.YearGroup,
    e.RegistrationGroup,
    e.House,
    e.UPN,
    e.EnrolmentStatus,
    e.LeavingDate
FROM base_students s
LEFT JOIN base_education_details e ON s.StudentId = e.StudentId
"""
spark.sql(students_complete_ddl)

print("✓ Created vw_students_complete view")