# Ingest student data
https://docs.wonde.com/docs/api/sync#students

Each request includes the `education_details` relation (UPN, NC year, admission number, admission and leaving dates).

In [1]:
%run _Config

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, -1, Finished, Available)

In [2]:
%run _WondeStructs

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, -1, Finished, Available)

In [3]:
%run _WondeAPI

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, -1, Finished, Available)

In [4]:
# Debug helper: uncomment to fetch the raw JSON response for a single school
#url = "https://api.wonde.com/v1.0/schools/A1128526306/students?include=education_details"
#session = requests.Session()
#session.get(url, headers=wondeHeaders).json()

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 15, Finished, Available)

### Get list of available schools

In [5]:
# Load the Schools Delta table from the bronze layer; the school_id values in it
# are what the per-school API calls further down iterate over.
schoolsDeltaPath = f"{BronzeBasePath}/Schools"
dfSchoolIds = spark.read.format("delta").load(schoolsDeltaPath)
display(dfSchoolIds)

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 91d56e3f-733c-42b0-ae1d-84411bea1f8a)

### Iterate through schools and get students

In [6]:
def get_allstudents():
    """Yield one DataFrame of students per school found in dfSchoolIds.

    For each collected row of dfSchoolIds, the school's ``students`` endpoint
    is requested (with ``include=education_details``) through getWondeAPI
    using student_schema. Two columns are added to the result before it is
    yielded:

    * ``school_id`` - the id of the school that was queried
    * ``on_roll``   - always True for rows coming from this endpoint
    """
    for schoolRow in dfSchoolIds.collect():
        schoolId = schoolRow["school_id"]
        studentsUrl = f"{baseWondeUrl}/schools/{schoolId}/students?include=education_details"
        dfStudents = getWondeAPI(studentsUrl, wondeHeaders, student_schema)

        # Tag every row with the school it was fetched for and flag it as on roll.
        dfTagged = dfStudents.withColumn("school_id", lit(schoolId))
        dfTagged = dfTagged.withColumn("on_roll", lit(True))

        yield dfTagged

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 17, Finished, Available)

### Write to delta lake

In [7]:
# Full refresh of student data, one school at a time: each school's rows in the
# Students Delta table are overwritten with what the API returned.
# Historic versions are still available through Delta time travel.

#If full reset needed run:
#try:
#    mssparkutils.fs.rm(f"{BronzeBasePath}/Students", True)
#except:
#    print("Path doesn't exist, most likely already deleted.")

# Write new school data into delta
for school in get_allstudents():
    # first() returns None for an empty DataFrame, so subscripting it directly
    # would raise TypeError and abort the run for every remaining school.
    firstRow = school.first()
    if firstRow is None:
        # The school id only exists as a column value, so with no rows there is
        # nothing to build the replaceWhere predicate from.
        # NOTE(review): a skipped school keeps the rows it already had (leavers
        # included), so the leavers append cell may add its leavers again - confirm
        # whether that is acceptable or the school should be cleared explicitly.
        print("Skipped a school with no students returned; its existing rows were left unchanged")
        continue

    currentSchoolId = firstRow["school_id"]

    dfFinal = (school.select(
                col("school_id"),
                col("id").alias("student_id"),
                col("upi"),
                col("mis_id"),
                col("education_details.data.upn").alias("upn"),
                col("initials"),
                col("surname"),
                col("forename"),
                col("legal_surname"),
                col("legal_forename"),
                col("gender"),
                to_date(col("date_of_birth.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("dob"),
                col("education_details.data.current_nc_year").alias("current_nc_year"),
                col("education_details.data.admission_number").alias("admission_number"),
                to_date(col("education_details.data.admission_date.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("admission_date"),
                to_date(col("education_details.data.leaving_date.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("leaving_date"),
                col("on_roll"),
                to_timestamp(col("created_at.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("created_at_date"),
                to_timestamp(col("updated_at.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("updated_at_date"),
                to_timestamp(col("restored_at.date"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("restored_at_date"),
                ))

    # replaceWhere restricts the overwrite to this school's rows. The predicate is
    # on school_id only, so every existing row for the school is replaced,
    # whatever its on_roll value.
    (dfFinal.write
        .mode("overwrite")
        .format("delta")
        .partitionBy("school_id")
        .option("replaceWhere", f"school_id = '{currentSchoolId}'")
        .save(f"{BronzeBasePath}/Students")
    )

    print(f"Written {currentSchoolId}")


StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 18, Finished, Available)

Written A1930499544
Written A1128526306


### Iterate Through Schools and Get Leavers

In [8]:
def get_allleavers():
    """Yield one DataFrame of leavers per school found in dfSchoolIds.

    Each collected row of dfSchoolIds triggers a request to that school's
    ``students-leaver`` endpoint (with ``include=education_details``) via
    getWondeAPI and student_schema. Before being yielded, the result gets a
    ``school_id`` column holding the queried school's id and an ``on_roll``
    column that is always False.
    """
    for schoolRow in dfSchoolIds.collect():
        schoolId = schoolRow["school_id"]
        leaversUrl = f"{baseWondeUrl}/schools/{schoolId}/students-leaver?include=education_details"
        dfLeavers = getWondeAPI(leaversUrl, wondeHeaders, student_schema)

        # Tag every row with the school it was fetched for and flag it as off roll.
        dfTagged = dfLeavers.withColumn("school_id", lit(schoolId))
        dfTagged = dfTagged.withColumn("on_roll", lit(False))

        yield dfTagged

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 19, Finished, Available)

### Append leavers to Delta Lake

In [9]:
# Append every school's leavers to the Students Delta table, using the same
# column layout as the student rows. This is a plain append: running this cell
# again on its own adds the same rows a second time.
wondeDateTimeFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS"

for school in get_allleavers():
    # Flatten the nested API payload into the flat Students table layout.
    leaverColumns = [
        col("school_id"),
        col("id").alias("student_id"),
        col("upi"),
        col("mis_id"),
        col("education_details.data.upn").alias("upn"),
        col("initials"),
        col("surname"),
        col("forename"),
        col("legal_surname"),
        col("legal_forename"),
        col("gender"),
        to_date(col("date_of_birth.date"), wondeDateTimeFormat).alias("dob"),
        col("education_details.data.current_nc_year").alias("current_nc_year"),
        col("education_details.data.admission_number").alias("admission_number"),
        to_date(col("education_details.data.admission_date.date"), wondeDateTimeFormat).alias("admission_date"),
        to_date(col("education_details.data.leaving_date.date"), wondeDateTimeFormat).alias("leaving_date"),
        col("on_roll"),
        to_timestamp(col("created_at.date"), wondeDateTimeFormat).alias("created_at_date"),
        to_timestamp(col("updated_at.date"), wondeDateTimeFormat).alias("updated_at_date"),
        to_timestamp(col("restored_at.date"), wondeDateTimeFormat).alias("restored_at_date"),
    ]
    dfFinal = school.select(*leaverColumns)

    leaverWriter = dfFinal.write.format("delta").mode("append").partitionBy("school_id")
    leaverWriter.save(f"{BronzeBasePath}/Students")

StatementMeta(, fe53472e-ef1a-4381-b1ce-bd950605533f, 20, Finished, Available)