In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session, named "MyApp", with
# spark.driver.memory set to 8g.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [None]:
# Date dimension: CSV with a header row; Spark infers the column types.
df_date_spark = spark.read.csv(
    "data/dim_Date.csv",
    header=True,
    inferSchema=True,
)

# Attendance fact table and the organisation / student dimensions, all Parquet.
df_attendancesessions_spark = spark.read.parquet("data/fact_AttendanceSession")
df_organisation_spark = spark.read.parquet("data/dim_Organisation")
df_student_spark = spark.read.parquet("data/dim_Student")
df_studentextended_spark = spark.read.parquet("data/dim_StudentExtended")

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import countDistinct
from functools import reduce

In [None]:
def show_df_missing_breakdown(df: DataFrame) -> None:
    """
    Print a per-column breakdown of missing values in a Spark DataFrame.

    Each cell is counted in at most one of four categories, so the total
    never counts a value twice:

      Null        the value is NULL
      EmptyStr    the value, cast to string, equals ""
      NA/NaNStr   the upper-cased string value is "NA" or "NAN"
                  (non-numeric columns only)
      NumericNaN  the value is a numeric NaN (numeric columns only)

    A numeric NaN casts to the string "NaN", so for numeric columns the
    string check is skipped; otherwise the same cell would be reported in
    both NA/NaNStr and NumericNaN and inflate TotalMissing.

    Parameters:
    df (DataFrame): The Spark DataFrame to analyze.

    Returns:
    None. The report is written to stdout.
    """
    numeric_type_names = (
        "double", "float", "decimal",
        "integer", "long", "short", "byte"
    )

    total_rows = df.count()
    total_cols = len(df.columns)

    def flag_count(condition, alias):
        # Number of rows for which `condition` is true.
        return F.sum(F.when(condition, 1).otherwise(0)).alias(alias)

    agg_exprs = []
    for field in df.schema.fields:
        col_name = field.name
        # Back-tick quoting stops names that contain dots from being parsed
        # as nested-field paths; embedded back-ticks are escaped by doubling.
        col_ref = F.col("`" + col_name.replace("`", "``") + "`")
        as_text = col_ref.cast("string")
        is_numeric = field.dataType.typeName() in numeric_type_names

        agg_exprs.append(flag_count(col_ref.isNull(), col_name + "_nullCount"))
        agg_exprs.append(flag_count(as_text == "", col_name + "_emptyCount"))

        if is_numeric:
            # Numeric columns: NaN is reported here and only here.
            agg_exprs.append(
                flag_count(F.isnan(col_ref), col_name + "_nanNumericCount")
            )
        else:
            # Non-numeric columns: textual "NA" / "NaN" placeholders.
            agg_exprs.append(
                flag_count(
                    F.upper(as_text).isin("NA", "NAN"),
                    col_name + "_naStrCount"
                )
            )

    # Perform a single pass to get all missing counts. A DataFrame without
    # columns has nothing to aggregate.
    if agg_exprs:
        collected = df.select(agg_exprs).collect()
        result_row = collected[0].asDict() if collected else {}
    else:
        result_row = {}

    def counted(key):
        # SUM over zero rows yields NULL, and the category that does not
        # apply to a column has no entry at all; both are reported as 0.
        return int(result_row.get(key) or 0)

    # Print header
    print(f"DataFrame has {total_rows} rows and {total_cols} columns.\n")
    print(
        "Column                             "
        "Null  EmptyStr  NA/NaNStr  NumericNaN  TotalMissing  %Missing"
    )
    print("-" * 70)

    # Loop over columns and print breakdown
    for field in df.schema.fields:
        c = field.name
        null_count = counted(c + "_nullCount")
        empty_count = counted(c + "_emptyCount")
        na_str_count = counted(c + "_naStrCount")
        nan_numeric_count = counted(c + "_nanNumericCount")

        total_missing = null_count + empty_count + na_str_count + nan_numeric_count
        pct_missing = (total_missing / total_rows * 100) if total_rows else 0.0

        print(
            f"{c:34s}"
            f"{null_count:5d}"
            f"{empty_count:10d}"
            f"{na_str_count:10d}"
            f"{nan_numeric_count:12d}"
            f"{total_missing:13d}"
            f"{pct_missing:10.2f}%"
        )

In [None]:
def show_distinct_counts(df: DataFrame, top_n: int = 20) -> None:
    """
    Displays the number of distinct values in each column of the DataFrame
    and lists the top_n columns with the highest distinct counts.

    Additionally, creates and displays a DataFrame containing all columns with their distinct counts.

    Uses the notebook-level `spark` session to build that DataFrame.

    Parameters:
    df (DataFrame): The Spark DataFrame to analyze.
    top_n (int): The number of top columns to display based on distinct counts.
    """
    # agg() needs at least one expression, so a column-less frame is reported and skipped.
    if not df.columns:
        print("DataFrame has no columns.")
        return

    # Calculate distinct counts for each column in one aggregation
    distinct_counts = df.agg(*[countDistinct(c).alias(c) for c in df.columns]).collect()[0].asDict()

    # Sort columns by distinct count in descending order
    sorted_counts = sorted(distinct_counts.items(), key=lambda x: x[1], reverse=True)

    # Display the top_n columns
    print(f"{'Column':34s} {'Distinct Count'}")
    print("-" * 50)
    for col_name, cnt in sorted_counts[:top_n]:
        print(f"{col_name:34s} {cnt}")

    # Create a DataFrame of all distinct counts (one row per column of df)
    df_distinct_counts = spark.createDataFrame(sorted_counts, ["Column", "Distinct_Count"])

    # Show the DataFrame of distinct counts; pass the row count explicitly so
    # every column is listed rather than only show()'s default number of rows.
    print("\nAll Column Distinct Counts:")
    df_distinct_counts.show(len(sorted_counts), truncate=False)

In [None]:
from pyspark.sql import SparkSession, DataFrame, functions as F

def show_distinct_counts_approx(df: DataFrame, top_n: int = 20, rsd: float = 0.05) -> None:
    """
    Print approximate distinct-value counts for the columns of a DataFrame.

    One aggregation computes `approx_count_distinct` for every column. The
    columns are then ranked from most to fewest distinct values; the first
    `top_n` are printed as plain text and shown again as a small Spark
    DataFrame limited to `top_n` rows.

    Parameters:
    -----------
    df : DataFrame
        The Spark DataFrame to analyze.
    top_n : int
        How many of the highest-ranked columns to print and show.
    rsd : float
        Relative standard deviation passed to approx_count_distinct.
    """
    # One approx_count_distinct per column, evaluated in a single aggregation;
    # the result is a single row, e.g. {'colA': 123, 'colB': 999, ...}
    estimates = (
        df.agg(
            *(
                F.approx_count_distinct(F.col(name), rsd=rsd).alias(name)
                for name in df.columns
            )
        )
        .collect()[0]
        .asDict()
    )

    # (column, estimate) pairs, largest estimate first
    sorted_counts = sorted(estimates.items(), key=lambda pair: pair[1], reverse=True)
    leading = sorted_counts[:top_n]

    print(f"{'Column':34s} {'Approx Distinct Count'}")
    print("-" * 60)
    for name, estimate in leading:
        print(f"{name:34s} {estimate}")

    # Same ranking as a Spark DataFrame, one row per analysed column
    session = SparkSession.builder.getOrCreate()
    df_approx_counts = session.createDataFrame(
        sorted_counts, ["Column", "ApproxDistinctCount"]
    )

    print("\nAll Column Approx Distinct Counts (showing top_n only):")
    df_approx_counts.limit(top_n).show(truncate=False)

In [None]:
# Missing-value report for the extended student dimension.
show_df_missing_breakdown(df_studentextended_spark)

In [None]:
# Preview the student dimension.
# NOTE(review): later cells select Forename, Surname and UPN from this table,
# so this output shows pupil-identifying data; clear outputs before sharing.
df_student_spark.show()

In [None]:
# Missing-value report for the student dimension.
show_df_missing_breakdown(df_student_spark)

In [None]:
# Preview the organisation dimension.
df_organisation_spark.show()

In [None]:
# Missing-value report for the organisation dimension.
show_df_missing_breakdown(df_organisation_spark)


In [None]:
# Preview the attendance-session fact table.
df_attendancesessions_spark.show()


In [None]:
# Missing-value report for the attendance-session fact table.
show_df_missing_breakdown(df_attendancesessions_spark)


In [None]:
from pyspark.sql import functions as F

# Create a new column "datekey" in df_attendancesessions_spark: the value of the
# Date column with every "-" removed, cast to int. It serves as the key for
# joining df_attendancesessions_spark with df_date_spark.
df_attendancesessions_spark = df_attendancesessions_spark.withColumn(
    "datekey",
    F.regexp_replace(F.col("Date"), "-", "").cast("int"),
)
df_attendancesessions_spark.show()

In [None]:
# Preview the date dimension before it is joined on DateKey.
df_date_spark.show() #check the df_date_spark


In [None]:
# Missing-value report for the date dimension.
show_df_missing_breakdown(df_date_spark)

In [None]:
from pyspark.sql import functions as F

def _prefixed_columns(alias, source_df):
    """Every column of `source_df`, read through `alias` and renamed `<alias>_<column>`."""
    return [
        F.col(f"{alias}.{name}").alias(f"{alias}_{name}")
        for name in source_df.columns
    ]


# 1. Give each table a short alias so its columns can be addressed as "<alias>.<column>"
df_att_aliased = df_attendancesessions_spark.alias("att")
df_org_aliased = df_organisation_spark.alias("org")
df_stu_aliased = df_student_spark.alias("stu")
df_stex_aliased = df_studentextended_spark.alias("stex")
df_date_aliased = df_date_spark.alias("dd")

# 2. Left-join every dimension onto the attendance fact table, one join at a time
df_joined = df_att_aliased.join(
    df_org_aliased,
    df_att_aliased["organisationkey"] == df_org_aliased["organisationkey"],
    "left",
)
df_joined = df_joined.join(
    df_stu_aliased,
    df_att_aliased["studentkey"] == df_stu_aliased["studentkey"],
    "left",
)
df_joined = df_joined.join(
    df_stex_aliased,
    df_att_aliased["studentkey"] == df_stex_aliased["studentkey"],
    "left",
)
# The date dimension is matched on the derived integer datekey
df_joined = df_joined.join(
    df_date_aliased,
    df_att_aliased["datekey"] == df_date_aliased["DateKey"],
    "left",
)

# 3. Prefix every column with its table alias so that names shared between
#    tables (for example the join keys) stay distinct after the select
att_cols = _prefixed_columns("att", df_attendancesessions_spark)
org_cols = _prefixed_columns("org", df_organisation_spark)
stu_cols = _prefixed_columns("stu", df_student_spark)
stex_cols = _prefixed_columns("stex", df_studentextended_spark)
date_cols = _prefixed_columns("dd", df_date_spark)

all_cols = att_cols + org_cols + stu_cols + stex_cols + date_cols

# 4. Project the joined data onto the prefixed column names
df_joined_renamed = df_joined.select(*all_cols)

df_joined_renamed.show(truncate=False)

In [None]:
# Check the data types of the prefixed columns: a list of (column name, type) pairs.
df_joined_renamed.dtypes

In [None]:
# Row count of the joined table.
total_rows = df_joined_renamed.count()
print(f"Total rows: {total_rows}")

In [None]:
# Column count of the joined table (all five sources, prefixed).
column_count = len(df_joined_renamed.columns)
print(f"Number of columns: {column_count}")

In [None]:
from pyspark.sql import functions as F

# (column in df_joined_renamed, name in df_selected), listed in output order.
selected_column_aliases = [
    # --- Student details & school info ---
    ("stu_Sex", "gender"),
    ("stu_Forename", "student_forename"),
    ("stu_Surname", "student_surname"),
    ("stex_Pupil_Premium_Indicator", "pupil_premium"),
    ("stex_Year_Group", "year_group"),
    ("stex_Current_NC_Year", "nc_year"),
    ("org_Organisation_Type", "school_type"),
    ("org_Organisation_Name", "school"),
    ("org_Establishment_Number", "establishment_number"),
    ("org_LA_Code", "la_code"),

    # --- Dates ---
    ("att_Date", "attendance_date"),
    ("dd_AcademicYear", "academic_year"),
    ("dd_AcademicWeekNumberOfYear", "academic_week_number"),
    ("dd_TermSession", "term"),
    ("dd_WeekCommencingName", "weekcommencing"),

    # --- Attendance fields ---
    ("att_Mark", "mark"),
    ("att_Session", "session"),
    ("att_is_aea", "is_approved_educational_activity"),
    ("att_is_attend", "is_attend"),
    ("att_is_auth_abs", "is_auth_abs"),
    ("att_is_late_L", "late"),
    ("att_is_late_U", "late_unauthorised"),
    ("att_is_missing", "missing"),
    ("att_is_nr", "no_reason"),
    ("att_is_possible", "is_possible"),
    ("att_is_present", "is_present"),
    ("att_is_unauth_abs", "is_unauth_abs"),

    # --- Current student info ---
    ("stex_Is_Current", "current_student"),
    ("stex_Leaving_Date", "leaving_date"),
    ("stu_UPN", "UPN"),
]

# Keep only the columns needed for the analysis, under their analysis names.
df_selected = df_joined_renamed.select(
    *[F.col(source).alias(target) for source, target in selected_column_aliases]
)

# NOTE(review): the preview includes forename, surname and UPN; clear outputs before sharing.
df_selected.show(truncate=False)

In [None]:
# List the raw year_group values before tidying them.
df_selected.select("year_group").distinct().show()

In [None]:
year_group_col = F.col("year_group")

# Normalisation rules for year_group; the first matching rule wins.
year_group_tidy_expr = (
    # "Nursery 1" / "Nursery 2" -> "N1" / "N2"
    F.when(
        year_group_col.isin("Nursery 1", "Nursery 2"),
        F.regexp_replace("year_group", "Nursery ", "N"),
    )
    # "R" -> "Reception"
    .when(year_group_col == "R", F.lit("Reception"))
    # "Year 13" -> "Y13"
    .when(year_group_col == "Year 13", F.lit("Y13"))
    # Digits only -> "Y" + the number; the int cast drops leading zeros
    .when(
        year_group_col.rlike("^[0-9]+$"),
        F.concat(F.lit("Y"), year_group_col.cast("int")),
    )
    # "Y" + one or two digits -> "Y" + the number; the int cast drops leading zeros
    .when(
        year_group_col.rlike("^Y[0-9]{1,2}$"),
        F.concat(F.lit("Y"), F.regexp_replace("year_group", "^[Yy]", "").cast("int")),
    )
    # Anything else is kept unchanged
    .otherwise(year_group_col)
)

df_tidy = df_selected.withColumn("year_group_tidy", year_group_tidy_expr)

# Distinct tidy values and how many of them there are
distinct_vals = df_tidy.select("year_group_tidy").distinct()
count_distinct = distinct_vals.count()

print(f"Number of distinct values in 'year_group_tidy': {count_distinct}")
distinct_vals.show(truncate=False)

In [None]:
# Missing-value report for the tidied year group column only.
show_df_missing_breakdown(df_tidy.select("year_group_tidy"))

In [None]:
# Number of rows of df_tidy per tidied year group.
df_tidy.groupBy("year_group_tidy").count().show()

In [None]:
# List the distinct nc_year values.
df_selected.select("nc_year").distinct().show()

In [None]:
# Count the rows of df_selected in each NC year. df_selected is built from the
# attendance-session table, so this counts rows, not distinct students.
df_selected.groupBy("nc_year").count().show()

In [None]:
# Missing-value report for the nc_year column only.
show_df_missing_breakdown(df_selected.select("nc_year"))


In [None]:
# Approximate number of distinct values in every column except
# 'student_forename', 'student_surname', 'UPN' and 'attendance_date', which are dropped first.

show_distinct_counts_approx(df_selected.drop("student_forename", "attendance_date", "student_surname", "UPN"))


In [None]:
# List the distinct values in the 'mark' column, then in the 'academic_year' column.
df_selected.select("mark").distinct().show()
df_selected.select("academic_year").distinct().show()


In [None]:
# Preview df_selected without truncating cell values.
# NOTE(review): the output includes forename, surname and UPN; clear outputs before sharing.
df_selected.show(truncate=False)


In [None]:
# 1. Build a key from UPN, attendance_date and session (AM/PM), joined by "_".
#    It identifies one attendance record per student per day per session.
#    NOTE(review): concat_ws skips NULL inputs, so rows with a NULL UPN would
#    share a key with each other for the same date and session. Confirm this is acceptable.
attendance_key = F.concat_ws(
    "_", F.col("UPN"), F.col("attendance_date"), F.col("session")
)
df_with_combined = df_selected.withColumn("UPN_AttendanceDate", attendance_key)

# 2. Count the rows behind every key
key_counts = df_with_combined.groupBy("UPN_AttendanceDate").count()

# 3. Exactly one row: each student can have only one attendance per day per session (AM or PM)
df_valid = key_counts.filter(F.col("count") == 1)

# 4. More than one row: duplicated keys
df_invalid = key_counts.filter(F.col("count") > 1)

df_with_combined.show(truncate=False)

In [None]:
# Keys that occur more than once, with their row counts.
df_invalid.show(truncate=False)


In [None]:
# Row counts of the joined table and of the column selection made from it.
print(f"Total rows with original df: {df_joined_renamed.count()}")

print(f"Total rows with selected data df: {df_selected.count()}")

# df_valid and df_invalid hold one row per UPN_AttendanceDate key, so the next
# two figures count keys, not attendance rows.
print(f"Total rows valid data: {df_valid.count()}")

print(f"Total rows invalid data: {df_invalid.count()}")


In [None]:
# 1. Count the rows behind every UPN_AttendanceDate key and label the key:
#    exactly one row -> "valid", more than one -> "invalid"
df_counts = (
    df_with_combined
    .groupBy("UPN_AttendanceDate")
    .agg(F.count("*").alias("count"))
    .withColumn(
        "status",
        F.when(F.col("count") == 1, "valid").otherwise("invalid")
    )
)

# 2. Join back to original rows to get the full data plus the status
df_with_status = (
    df_with_combined.alias("a")
    .join(df_counts.alias("b"), on="UPN_AttendanceDate", how="left")
    .select("a.*", "b.count", "b.status")
)

# 3. Row-level splits by status. The summary cells further down read
#    `df_validf_rows` and `df_invalidf_rows`, which no other cell defines, so a
#    fresh kernel fails with NameError without these two lines. The names are
#    kept exactly as those cells spell them.
#    NOTE(review): assumed to mean the rows of df_with_status with the matching
#    status. Confirm against the original intent.
df_validf_rows = df_with_status.filter(F.col("status") == "valid")
df_invalidf_rows = df_with_status.filter(F.col("status") == "invalid")

df_with_status.show(truncate=False)

In [None]:
# Attendance percentage: attended sessions over possible sessions, times 100,
# rounded to one decimal place.
attendance_pct_expr = F.round(
    F.sum("is_attend") / F.sum("is_possible") * 100, 1
).alias("attendance_percentage")

# One row per school, NC year and week commencing.
df_summary = (
    df_validf_rows
    .groupBy("school", "nc_year", "weekcommencing")
    .agg(attendance_pct_expr)
)

df_summary.show(truncate=False)

In [None]:
# Show at most 20 rows of the summary without truncating cell values.
df_summary.limit(20).show(truncate=False)


In [None]:
cols = df_summary.columns

# Build the condition (col1 IS NULL) OR (col2 IS NULL) OR ... starting from a
# constant False, so the result is a valid condition even with no columns.
null_condition = F.lit(False)
for summary_column in cols:
    null_condition = null_condition | F.col(summary_column).isNull()

# Count the summary rows in which at least one column is NULL
num_rows_with_null = df_summary.filter(null_condition).count()

print(f"Number of rows with at least one NULL value: {num_rows_with_null}")

In [None]:
# Grouping keys shared with df_summary
summary_keys = ["school", "nc_year", "weekcommencing"]

# 1) From df_validf_rows, compute the raw sums behind the percentage for every group
df_sums = df_validf_rows.groupBy(*summary_keys).agg(
    F.sum("is_attend").alias("sum_is_attend"),
    F.sum("is_possible").alias("sum_is_possible"),
)

# 2) Keep only the summary rows whose attendance_percentage is NULL
df_summary_nulls = df_summary.filter(F.col("attendance_percentage").isNull())

# 3) Attach the raw sums to those rows to see what produced the NULL
df_null_sums = df_summary_nulls.alias("summ").join(
    df_sums.alias("sums"),
    on=summary_keys,
    how="left",
).select(
    "summ.school",
    "summ.nc_year",
    "summ.weekcommencing",
    "summ.attendance_percentage",   # NULL for every row kept in step 2
    "sums.sum_is_attend",
    "sums.sum_is_possible",
)

df_null_sums.show(truncate=False)

In [None]:
# Inspect the underlying rows of a single school / NC year / week.
(
    df_validf_rows
    .filter(F.col("school") == "Academy 7")
    .filter(F.col("nc_year") == "9")
    .filter(F.col("weekcommencing") == "w/c 21/08/2023")
    .show()
)

In [None]:
# Same attendance-percentage summary, computed over df_invalidf_rows.
# NOTE(review): this rebinds df_summary, replacing the summary built earlier
# from df_validf_rows; every later cell that reads df_summary sees this one.
# Confirm that this is the intended input for the statistics and export cells.
invalid_attendance_pct_expr = F.round(
    F.sum("is_attend") / F.sum("is_possible") * 100, 1
).alias("attendance_percentage")

# Group by school, NC year and week commencing, then aggregate
df_summary = (
    df_invalidf_rows
    .groupBy("school", "nc_year", "weekcommencing")
    .agg(invalid_attendance_pct_expr)
)

df_summary.limit(20).show(truncate=False)

In [None]:

# Basic statistics for the attendance_percentage column

from pyspark.sql.functions import round

# describe() statistics for attendance_percentage, each rounded to one decimal
# place. F.round is the same Spark function as the imported `round`, referenced
# through the module alias so the call does not read like Python's builtin.
(
    df_summary
    .describe("attendance_percentage")
    .select(
        "summary",
        F.round("attendance_percentage", 1).alias("attendance_percentage"),
    )
    .show()
)

In [None]:
# Convert df_summary to a pandas DataFrame.
df_summary_pandas = df_summary.toPandas()

# Write the pandas DataFrame to a Parquet file.
summary_parquet_path = "data/df_summary_pandas.parquet"
df_summary_pandas.to_parquet(summary_parquet_path)

# Check that the file was created by reading it back with Spark.
df_summary_loaded = spark.read.parquet(summary_parquet_path)
df_summary_loaded.show(truncate=False)