# Insights Module Schema Correction

This notebook demonstrates the utility of the OEA_py class notebook by correcting Insights module tables that were initially ingested without column headers and with incorrect data types.

The steps outlined below describe how this notebook is used to correct the Microsoft Education Insights module tables:
- Set the workspace in which the table schemas will be corrected.
- Four functions are defined and used:
   1. **_extract_element**: uses the Insights metadata to extract the correct column names.
   2. **_dtype_config**: uses the Insights metadata to extract the correct column dtypes.
   3. **correct_insights_table_schema**: renames each column of the given table to its corrected name and casts it to its corrected dtype.
   4. **correct_insights_dataset**: lists the table folders under the given stage2 path (here stage2/Ingested/M365/v1.14), skips metadata.csv, corrects each table's schema using the function above, and overwrites the tables with the updated schemas.
   

In [3]:
# Target OEA workspace name ('dev', 'prod', or a sandbox name); passed to oea.set_workspace in a later cell.
workspace = 'dev'

StatementMeta(spark3p2med, 7, 1, Finished, Available)

In [4]:
%run OEA_py

StatementMeta(, 7, -1, Finished, Available)

2022-12-27 19:48:03,881 - OEA - INFO - Now using workspace: dev
2022-12-27 19:48:03,883 - OEA - INFO - OEA initialized.


In [5]:
# 1) Set the workspace. This determines where in the data lake you will be reading from and writing to.
# You can work in 'dev', 'prod', or a sandbox with any name you choose.
# For example, Sam the developer can create a 'sam' workspace and expect to find his datasets in the data lake under oea/sandboxes/sam.
# Note: %run OEA_py already logged "Now using workspace: dev"; this call makes the choice explicit via `workspace`.
oea.set_workspace(workspace)

StatementMeta(spark3p2med, 7, 3, Finished, Available)

2022-12-27 19:48:05,400 - OEA - INFO - Now using workspace: dev


In [6]:
# 2) schema correction, since Insights data initially landed doesn't have column headers

def _extract_element(lst, element_num=0):
    return [item[element_num] for item in lst]

def _dtype_config(dtype_lst):
    return [item.capitalize() + 'Type()' for item in dtype_lst]

def correct_insights_table_schema(df, table_name):
    """Rename and re-type the columns of a header-less Insights table.

    Columns are matched to the Insights metadata by position: the i-th column
    of `df` gets the i-th (name, dtype) entry of the module-level
    `metadata[table_name]`. A column named 'rundate' is left unchanged.

    NOTE(review): the 'rundate' column still consumes a metadata position (as
    in the original code). That is harmless if 'rundate' is the last column;
    otherwise later columns would be shifted. Confirm against the ingestion output.

    Args:
        df: Spark DataFrame read from the ingested table (ingested without
            headers, so its column names/types are not the Insights ones).
        table_name: key into the global `metadata` (the table folder name).

    Returns:
        A new DataFrame with metadata column names. Each column whose metadata
        dtype maps to a supported Spark type (see `cast_targets`) is cast to
        that type; string columns and unsupported dtypes are left as-is.

    Raises:
        KeyError: if `table_name` is not present in `metadata`.
        IndexError: if `df` has more columns than the metadata describes.
    """
    # Local import so the block does not depend on which type names OEA_py happens to expose.
    from pyspark.sql.types import (BooleanType, DateType, DoubleType, IntegerType,
                                   LongType, ShortType, TimestampType)

    table_metadata = metadata[table_name]
    list_of_column_names = _extract_element(table_metadata)
    list_of_column_dtypes = _dtype_config(_extract_element(table_metadata, 1))

    # Metadata dtype string -> Spark type to cast to. 'StringType()' needs no cast,
    # and any dtype missing from this map is left uncast (same as before).
    # Fix: the original compared the whole list to 'ShortType()', so short columns were never cast.
    cast_targets = {
        'IntegerType()': IntegerType(),
        'TimestampType()': TimestampType(),
        'ShortType()': ShortType(),
        'LongType()': LongType(),
        'DoubleType()': DoubleType(),
        'DateType()': DateType(),
        'BooleanType()': BooleanType(),
    }

    df_updatedColumns = df
    for n, c in enumerate(df.columns):
        if c == 'rundate':
            continue
        new_col_name = list_of_column_names[n]
        df_updatedColumns = df_updatedColumns.withColumnRenamed(c, new_col_name)
        target_type = cast_targets.get(list_of_column_dtypes[n])
        if target_type is not None:
            df_updatedColumns = df_updatedColumns.withColumn(
                new_col_name, df_updatedColumns[new_col_name].cast(target_type))
    return df_updatedColumns

StatementMeta(spark3p2med, 7, 4, Finished, Available)

In [12]:
def correct_insights_dataset(tables_source):
    """Correct the schema of every Insights table folder under `tables_source`.

    Each folder returned by oea.get_folders (except 'metadata.csv') is read as
    a Delta table and passed through correct_insights_table_schema, using the
    folder name as the metadata key. The result is written back to the same
    path with mode='overwrite' and overwriteSchema='true', replacing the
    table's previous schema.

    Args:
        tables_source: data-lake path (as accepted by oea.to_url), e.g.
            'stage2/Ingested/M365/v1.14'.
    """
    for item in oea.get_folders(tables_source):
        if item == 'metadata.csv':
            logger.info('ignore metadata processing, since this is not a table to be ingested')
            continue
        table_path = tables_source + '/' + item
        table_url = oea.to_url(table_path)
        # 'header' is a CSV-only option and has no effect on Delta, so it is not passed here.
        df = spark.read.format('delta').load(table_url)
        df_correctedSchema = correct_insights_table_schema(df, table_name=item)
        # overwriteSchema is required because both column names and types change.
        df_correctedSchema.write.save(table_url, format='delta', mode='overwrite', overwriteSchema='true')
        logger.info('Successfully corrected the schema for table: ' + item + ' from: ' + table_path)

StatementMeta(spark3p2med, 7, 10, Finished, Available)

In [13]:
# Insights metadata (per-table column names and dtypes). It points at the `main` branch, so its contents can change over time.
INSIGHTS_METADATA_URL = 'https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/main/modules/module_catalog/Microsoft_Education_Insights/test_data/metadata.csv'
# Folder of ingested Insights tables whose schemas are corrected (and overwritten) in place.
INSIGHTS_TABLES_PATH = 'stage2/Ingested/M365/v1.14'

# `metadata` must stay a global: correct_insights_table_schema reads it by this name.
metadata = oea.get_metadata_from_url(INSIGHTS_METADATA_URL)
correct_insights_dataset(INSIGHTS_TABLES_PATH)

StatementMeta(spark3p2med, 7, 11, Finished, Available)

2022-12-27 19:55:19,844 - OEA - INFO - Successfully corrected the schema for table:AadGroupfrom:stage2/Ingested/M365/v1.14/AadGroup
2022-12-27 19:55:24,739 - OEA - INFO - Successfully corrected the schema for table:AadUserfrom:stage2/Ingested/M365/v1.14/AadUser
2022-12-27 19:55:28,776 - OEA - INFO - Successfully corrected the schema for table:AadUserPersonMappingfrom:stage2/Ingested/M365/v1.14/AadUserPersonMapping
2022-12-27 19:55:32,715 - OEA - INFO - Successfully corrected the schema for table:Coursefrom:stage2/Ingested/M365/v1.14/Course
2022-12-27 19:55:38,783 - OEA - INFO - Successfully corrected the schema for table:CourseGradeLevelfrom:stage2/Ingested/M365/v1.14/CourseGradeLevel
2022-12-27 19:55:42,015 - OEA - INFO - Successfully corrected the schema for table:CourseSubjectfrom:stage2/Ingested/M365/v1.14/CourseSubject
2022-12-27 19:55:45,883 - OEA - INFO - Successfully corrected the schema for table:Enrollmentfrom:stage2/Ingested/M365/v1.14/Enrollment
2022-12-27 19:55:50,433 - OE

In [14]:
# Spot-check one corrected table. ('header' is a CSV-only option and is not needed for Delta reads.)
df = spark.read.format('delta').load(oea.to_url('stage2/Ingested/M365/v1.14/activity'))
display(df.limit(10))

StatementMeta(spark3p2med, 7, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5276092e-4d3f-471f-996b-06d832959e9e)

In [15]:
# Print the schema of the spot-checked table to confirm the corrected column names and types.
df.printSchema()

StatementMeta(spark3p2med, 7, 13, Finished, Available)

root
 |-- SignalType: string (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- UserAgent: string (nullable = true)
 |-- SignalId: string (nullable = true)
 |-- SisClassId: string (nullable = true)
 |-- ClassId: string (nullable = true)
 |-- ChannelId: string (nullable = true)
 |-- AppName: string (nullable = true)
 |-- ActorId: string (nullable = true)
 |-- ActorRole: string (nullable = true)
 |-- SchemaVersion: string (nullable = true)
 |-- AssignmentId: string (nullable = true)
 |-- SubmissionId: string (nullable = true)
 |-- SubmissionCreatedTime: timestamp (nullable = true)
 |-- Action: string (nullable = true)
 |-- DueDate: timestamp (nullable = true)
 |-- ClassCreationDate: timestamp (nullable = true)
 |-- Grade: double (nullable = true)
 |-- SourceFileExtension: string (nullable = true)
 |-- MeetingDuration: string (nullable = true)
 |-- MeetingSessionId: string (nullable = true)
 |-- MeetingType: string (nullable = true)
 |-- ReadingSubmissionWordsPerMinute: in