In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Obtain the Spark session for this notebook: getOrCreate() returns the session
# that is already active, or builds a new one named "CitadelInterviewData".
# (Session setup like this is common in Azure Databricks/Synapse notebooks.)
spark = (
    SparkSession.builder
    .appName("CitadelInterviewData")
    .getOrCreate()
)

# 1. DTBL_STUDENTS -- one tuple per student.
# Only column names are supplied, so Spark infers each column's type from the
# Python values; BIRTHDATE therefore stays a string rather than a date.
student_cols = ["STUDENTKEY", "STUDENTID", "FIRSTNAME", "LASTNAME", "BIRTHDATE"]
students_data = [
    (101, "S001", "John", "Doe", "2010-05-15"),
    (102, "S002", "Jane", "Smith", "2011-08-22"),
    (103, "S003", "Robert", "Brown", "2010-12-05"),
]
df_students = spark.createDataFrame(students_data, student_cols)

# 2. DTBL_SCHOOLS -- one tuple per school.
# Column types are inferred from the Python values (ints and strings).
school_cols = [
    "SCHOOLKEY",
    "SCHOOLNUMBER",
    "SCHOOLNAME",
    "PRINCIPAL",
    "SCHOOLGRADES",
    "STATEID",
]
schools_data = [
    (501, 1001, "North Academy", "Dr. Alice", "K-5", "TX01"),
    (502, 1002, "South High", "Mr. Bob", "9-12", "TX02"),
]
df_schools = spark.createDataFrame(schools_data, school_cols)

# 3. FTBL_ATTENDANCE -- one tuple per attendance record. STUDENTKEY and
# SCHOOLKEY carry the same key values used in the students and schools frames.
# ATTENDANCEDATE is supplied as a string, so it is inferred as a string column.
attendance_cols = [
    "ATTENDANCEKEY",
    "FACILITIESKEY",
    "STUDENTKEY",
    "SCHOOLKEY",
    "ATTENDANCETYPE",
    "ATTENDANCEDATE",
]
attendance_data = [
    (1, 9001, 101, 501, "P", "2025-10-01"),
    (2, 9001, 102, 501, "A", "2025-10-01"),
    (3, 9002, 101, 501, "P", "2025-10-02"),
    (4, 9002, 103, 502, "P", "2025-10-02"),
]
df_attendance = spark.createDataFrame(attendance_data, attendance_cols)

# Print the attendance rows as a quick sanity check.
df_attendance.show()

+-------------+-------------+----------+---------+--------------+--------------+
|ATTENDANCEKEY|FACILITIESKEY|STUDENTKEY|SCHOOLKEY|ATTENDANCETYPE|ATTENDANCEDATE|
+-------------+-------------+----------+---------+--------------+--------------+
|            1|         9001|       101|      501|             P|    2025-10-01|
|            2|         9001|       102|      501|             A|    2025-10-01|
|            3|         9002|       101|      501|             P|    2025-10-02|
|            4|         9002|       103|      502|             P|    2025-10-02|
+-------------+-------------+----------+---------+--------------+--------------+



In [None]:
# Build one wide frame: students -> attendance -> schools.
# Joining with on="<column>" keeps a single copy of the join key in the result.
df_main1 = df_students.join(df_attendance, on="STUDENTKEY", how="inner")

df_main2 = df_main1.join(df_schools, on="SCHOOLKEY", how="inner")

# Keep only the attendance rows for school 501 on 2025-10-01.
# ATTENDANCEDATE is a string column, so this is a plain string equality check.
# Every filter() call needs a condition; both predicates are combined into one.
df_main3 = df_main2.filter(
    (F.col("SCHOOLKEY") == 501) & (F.col("ATTENDANCEDATE") == "2025-10-01")
)
df_main3.show()


+---------+----------+---------+---------+--------+----------+-------------+-------------+--------------+--------------+------------+-------------+---------+------------+-------+
|SCHOOLKEY|STUDENTKEY|STUDENTID|FIRSTNAME|LASTNAME| BIRTHDATE|ATTENDANCEKEY|FACILITIESKEY|ATTENDANCETYPE|ATTENDANCEDATE|SCHOOLNUMBER|   SCHOOLNAME|PRINCIPAL|SCHOOLGRADES|STATEID|
+---------+----------+---------+---------+--------+----------+-------------+-------------+--------------+--------------+------------+-------------+---------+------------+-------+
|      501|       102|     S002|     Jane|   Smith|2011-08-22|            2|         9001|             A|    2025-10-01|        1001|North Academy|Dr. Alice|         K-5|   TX01|
|      501|       101|     S001|     John|     Doe|2010-05-15|            1|         9001|             P|    2025-10-01|        1001|North Academy|Dr. Alice|         K-5|   TX01|
+---------+----------+---------+---------+--------+----------+-------------+-------------+--------------+--------------+------------+-------------+---------+------------+-------+

In [None]:
# Find the students that have NO row in df_main3, i.e. no attendance record
# at school 501 on 2025-10-01.

# Alias both frames so their columns can be referenced unambiguously.
df_m = df_main3.alias("m")
df_s = df_students.alias("s")

# A left anti join returns only the left-side (student) rows whose STUDENTKEY
# has no match on the right. This replaces the "full outer join, then keep rows
# where the other side is NULL" pattern: it never materialises the full join,
# and it cannot be fooled by a matched row whose STUDENTID is itself NULL.
result = (
    df_s.join(df_m, on="STUDENTKEY", how="left_anti")
        .select("s.STUDENTID")
)

result.show()

+---------+
|STUDENTID|
+---------+
|     S003|
+---------+

