In [None]:
%run /OEA_py

In [None]:
# Notebook parameters. Pass the values below from the pipeline.
directory = 'Ed-Fi'
api_version = '5.2'
metadata_url = 'https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/main/modules/module_catalog/Ed-Fi/docs/edfi_oea_metadata.csv'
workspace = 'prod'
# TODO: swagger_url = 'https://api.edgraph.dev/edfi/v5.2/saas/metadata/data/v3/123/2022/resources/swagger.json'
# Using the URL above raises a KeyError because 'x-Ed-Fi-explode' is not part of the standard swagger.json.
# We should remove all references to 'x-Ed-Fi-explode' from these notebooks since we are now dynamically
# exploding arrays based on the swagger datatypes.
# NOTE(review): the URL below contains 'main//modules' (double slash) - confirm this is intended.
swagger_url = 'https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/main//modules/module_catalog/Ed-Fi/docs/edfi_swagger.json'

# OEA and OpenAPIUtil are not defined in this notebook; presumably they come from the
# `%run /OEA_py` cell above - confirm.
oea = OEA(workspace=workspace)
# Metadata used for pseudonymization; the processing loop indexes it by table name.
oea_metadatas = oea.get_metadata_from_url(metadata_url)
# Spark type names that transform() casts directly; every other type is handled as a complex type.
primitive_datatypes = ['timestamp', 'date', 'decimal', 'boolean', 'integer', 'string', 'long']

# TODO: Use the swagger file available from the Ed-Fi API landing page, instead of hardcoding it.
# For example, https://api.edgraph.dev/edfi/v5.2/saas is the openApiMetadata endpoint that will help fetch
# the descriptors and resources swagger.json.
# The base path of the API can be passed as a parameter to this notebook instead and assigned to the swagger_url variable.
# This will also help get the latest version of the swagger.json based on the Ed-Fi version, and it will contain
# a list of all endpoints and entity definitions, including any "extensions" or customizations.
# For example,
# - Resources:   https://api.edgraph.dev/edfi/v5.2/saas/metadata/data/v3/123/2022/resources/swagger.JSON
# - Descriptors: https://api.edgraph.dev/edfi/v5.2/saas/metadata/data/v3/123/2022/descriptors/swagger.JSON
schema_gen = OpenAPIUtil(swagger_url)
# Target schemas; transform() looks them up by table name.
schemas = schema_gen.create_spark_schemas()

# Source (Ingested) and destination (Refined) roots for this module and API version.
stage2_ingested = oea.to_url(f'stage2/Ingested/{directory}/v{api_version}')
stage2_refined = oea.to_url(f'stage2/Refined/{directory}/v{api_version}')

In [None]:
def get_descriptor_schema(descriptor):
    """Return the Spark StructType used for a descriptor table.

    Args:
        descriptor: Descriptor table name. Its last character is dropped and
            'Id' appended to name the integer id field.

    Returns:
        A StructType with seven nullable fields: `_etag`, the `<name>Id`
        field, `codeValue`, `description`, `id`, `namespace` and
        `shortDescription`.
    """
    id_field_name = f"{descriptor[:-1]}Id"
    # (field name, Spark type) pairs, in the order they appear in the schema.
    layout = (
        ('_etag', LongType()),
        (id_field_name, IntegerType()),
        ('codeValue', StringType()),
        ('description', StringType()),
        ('id', StringType()),
        ('namespace', StringType()),
        ('shortDescription', StringType()),
    )
    return StructType([StructField(name, dtype, True) for name, dtype in layout])

def get_descriptor_metadata(descriptor):
    """Return the pseudonymization metadata rows for a descriptor table.

    Args:
        descriptor: Descriptor table name. Its last character is dropped and
            'Id' appended to name the hashed id column.

    Returns:
        A new list of `[column_name, type_name, operation]` rows. A fresh
        list is built on every call, so callers may extend it in place.
    """
    id_column = f"{descriptor[:-1]}Id"
    rows = [
        ['_etag', 'long', 'no-op'],
        [id_column, 'integer', 'hash'],
    ]
    # The remaining descriptor columns are all plain strings that pass through untouched.
    passthrough = ('codeValue', 'description', 'id', 'namespace', 'shortDescription')
    rows.extend([name, 'string', 'no-op'] for name in passthrough)
    return rows

In [None]:
import copy
import pyspark.sql.functions as f

def has_column(df, col):
    """Report whether `df[col]` can be resolved.

    Args:
        df: Object to index (a Spark DataFrame at the call sites in this notebook).
        col: Column name to look up.

    Returns:
        True when `df[col]` succeeds, False when it raises AnalysisException.
        Any other exception propagates.
    """
    try:
        df[col]
    except AnalysisException:
        return False
    else:
        return True

def modify_descriptor_value(df, col_name):
    """Replace a descriptor column with a `<col_name>LakeId` column.

    When `col_name` exists, the new column is DistrictId, SchoolYear and the
    descriptor value (with '#' replaced by '_') joined with '_', and the
    original column is dropped. When it does not exist, the new column is
    added as a null string.

    Args:
        df: DataFrame to modify; it is read for `DistrictId` and `SchoolYear`.
        col_name: Name of the descriptor column.

    Returns:
        The modified DataFrame.
    """
    lake_id_col = f"{col_name}LakeId"

    if col_name not in df.columns:
        return df.withColumn(lake_id_col, f.lit(None).cast("String"))

    # TODO: @Abhinav, I do not see where you made the changes to use the descriptorId instead of Namespace/CodeValue
    scoped_value = f.concat_ws(
        '_',
        f.col('DistrictId'),
        f.col('SchoolYear'),
        f.regexp_replace(col_name, '#', '_'),
    )
    return df.withColumn(lake_id_col, scoped_value).drop(col_name)

def flatten_reference_col(df, target_col):
    """Replace a reference struct column with a single `<prefix>LakeId` column.

    The prefix is the column name with every 'Reference' occurrence removed.
    The key is DistrictId, SchoolYear and element 3 of the reference's
    `link.href` split on '/', joined with '_'. Rows whose reference is null
    get a null key. The reference column itself is dropped.

    Args:
        df: DataFrame containing the reference column.
        target_col: Schema field (read for `.name`) describing the reference column.

    Returns:
        The modified DataFrame.
    """
    ref_name = target_col.name
    lake_id_col = f"{ref_name.replace('Reference', '')}LakeId"

    href_parts = f.split(f.col(f'{ref_name}.link.href'), '/')
    scoped_key = f.concat_ws('_', f.col('DistrictId'), f.col('SchoolYear'), href_parts.getItem(3))

    # No otherwise(): the key stays null when the reference is null.
    df = df.withColumn(lake_id_col, f.when(f.col(ref_name).isNotNull(), scoped_key))
    return df.drop(ref_name)

def modify_references_and_descriptors(df, target_col):
    """Flatten every `*Reference` column, then rewrite every `*Descriptor` column.

    Args:
        df: DataFrame produced by exploding an array column.
        target_col: Schema field of the array that was exploded; its element
            type supplies the field definition for each reference column.

    Returns:
        The modified DataFrame.
    """
    reference_names = [name for name in df.columns if re.search('Reference$', name) is not None]
    for name in reference_names:
        ref_field = target_col.dataType.elementType[name]
        df = flatten_reference_col(df, ref_field)

    # Collected after the references are flattened, from the updated column list.
    descriptor_names = [name for name in df.columns if re.search('Descriptor$', name) is not None]
    for name in descriptor_names:
        df = modify_descriptor_value(df, name)

    return df

def explode_arrays(df, target_col, schema_name, table_name):
    """Explode an array column into child (and grandchild) Delta tables.

    The array in `target_col` is exploded into one row per element and written to
    `{stage2_refined}/General/{schema_name}/{table_name}_{target_col.name}`.
    Every array-typed field of the element is exploded again and written to
    `..._{target_col.name}_{array_sub_col.name}`.

    Args:
        df: Parent DataFrame; must contain `lakeId`, `DistrictId`, `SchoolYear`
            and the array column.
        target_col: Schema field of the array column to explode.
        schema_name: Schema folder name used in the output path.
        table_name: Parent table name used in the output path.

    Returns:
        The parent DataFrame with the array column dropped.
    """
    cols = ['lakeId', 'DistrictId', 'SchoolYear']
    element_fields = target_col.dataType.elementType.fields

    child_df = df.select(cols + [target_col.name])
    child_df = child_df.withColumn("exploded", f.explode(target_col.name)).drop(target_col.name).select(cols + ['exploded.*'])

    # Composite key for the child rows, built from the element's identity fields.
    # sorted() is required here: list.sort() sorts in place and returns None, which
    # previously left identity_cols as None so the key column was never added.
    # TODO: We should use LakeId suffix when using the "id" column from the parent and HKey suffix
    #       when creating a Hash Key based on composite key columns
    identity_cols = sorted(x.name for x in element_fields if 'x-Ed-Fi-isIdentity' in x.metadata)
    if identity_cols:
        # Each part is cast to string explicitly so that non-string identity fields
        # (for example reference structs) can be concatenated.
        # NOTE(review): concat() yields null when any identity part is null - confirm that is acceptable.
        key_parts = [f.concat(f.col(x).cast('string'), f.lit('_')) for x in identity_cols]
        child_df = child_df.withColumn(
            f"{target_col.name}LakeId",
            f.concat(f.col('DistrictId'), f.lit('_'), f.col('SchoolYear'), f.lit('_'), *key_parts))

    # IMPORTANT: We must modify Reference and Descriptor columns for child columns "first".
    # This must be done "after" the composite key from identity_cols has been created otherwise the columns are renamed and will not be found by identity_cols.
    # This must be done "before" the grand_child is exploded below
    child_df = modify_references_and_descriptors(child_df, target_col)

    for array_sub_col in [x for x in element_fields if x.dataType.typeName() == 'array']:
        grand_child_df = child_df.withColumn('exploded', f.explode(array_sub_col.name)).select(child_df.columns + ['exploded.*']).drop(array_sub_col.name)

        # Modifying Reference and Descriptor columns for the grand_child array
        grand_child_df = modify_references_and_descriptors(grand_child_df, array_sub_col)

        # TODO: Pseudonymize and write to Sensitive folder for child arrays?
        grand_child_df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').partitionBy('DistrictId', 'SchoolYear')\
                .save(f"{stage2_refined}/General/{schema_name}/{table_name}_{target_col.name}_{array_sub_col.name}")

    # TODO: Pseudonymize and write to Sensitive folder for child arrays?
    child_df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').partitionBy('DistrictId', 'SchoolYear')\
        .save(f"{stage2_refined}/General/{schema_name}/{table_name}_{target_col.name}")

    # Drop array column from parent entity
    df = df.drop(target_col.name)
    return df

def transform(df, schema_name, table_name, parent_schema_name, parent_table_name):
    """Conform an ingested table to its target schema and add surrogate keys.

    Steps:
      1. Pick the target schema (descriptor schema for `*Descriptors` tables,
         otherwise a copy of `schemas[table_name]`) and add the `lakeId` key.
      2. Append DistrictId, SchoolYear and LastModifiedDate to the target schema.
      3. For each target field: rewrite descriptors, cast or add primitives,
         rebuild complex columns through JSON, flatten references, explode arrays.

    Args:
        df: DataFrame read from the Ingested folder.
        schema_name: Schema folder name, passed on to explode_arrays.
        table_name: Table name; selects the target schema.
        parent_schema_name: Not used in the body.
        parent_table_name: Not used in the body.

    Returns:
        The transformed DataFrame.
    """
    if re.search('Descriptors$', table_name) is not None:
        target_schema = get_descriptor_schema(table_name)
        # TODO: @Abhinav, I do not see where you made the changes to use the descriptorId instead of Namespace/CodeValue
        key_source_cols = ['namespace', 'codeValue']
    else:
        # Deep copy so that adding fields below does not modify the shared `schemas` object.
        target_schema = copy.deepcopy(schemas[table_name])
        key_source_cols = ['id']

    # Primary key: only built when every source column is present, otherwise null.
    if all(has_column(df, name) for name in key_source_cols):
        lake_id = f.concat_ws(
            '_', f.col('DistrictId'), f.col('SchoolYear'), *[f.col(name) for name in key_source_cols]
        ).cast("String")
    else:
        lake_id = f.lit(None).cast("String")
    df = df.withColumn('lakeId', lake_id)

    for extra_field in (StructField('DistrictId', StringType()),
                        StructField('SchoolYear', StringType()),
                        StructField('LastModifiedDate', TimestampType())):
        target_schema = target_schema.add(extra_field)

    for col_name in target_schema.fieldNames():
        target_col = target_schema[col_name]
        type_name = target_col.dataType.typeName()

        # Primitive datatypes (string, boolean, integer, ...). Descriptors are strings, so they land here.
        if type_name in primitive_datatypes:
            if re.search('Descriptor$', col_name) is not None:
                df = modify_descriptor_value(df, col_name)
                continue
            # Cast an existing column, or add a typed null column when it is missing.
            source = f.col(col_name) if col_name in df.columns else f.lit(None)
            df = df.withColumn(col_name, source.cast(target_col.dataType))
            continue

        # Complex datatypes (objects, arrays).
        if col_name in df.columns:
            # Round-trip through JSON to re-read the column with the target type.
            json_col = f"{col_name}_json"
            df = df.withColumn(json_col, f.to_json(f.col(col_name)))
            df = df.withColumn(col_name, f.from_json(f.col(json_col), target_col.dataType))
            df = df.drop(json_col)
        else:
            df = df.withColumn(col_name, f.lit(None).cast(target_col.dataType))

        # Replace reference links with surrogate keys.
        if re.search('Reference$', col_name) is not None:
            df = flatten_reference_col(df, target_col)

        if type_name == 'array':
            df = explode_arrays(df, target_col, schema_name, table_name)

    return df

#df = spark.read.format('delta').load(f"{stage2_ingested}/ed-fi/absenceEventCategoryDescriptors")
#df = transform(df, "ed-fi", "absenceEventCategoryDescriptors", None, None)


In [None]:
# Refine every table found under the Ingested folder: transform, pseudonymize, then write
# the result to the Refined General and Sensitive folders. A failure in one table is
# reported and must not stop the remaining tables from being processed.
for schema_name in [x.name for x in mssparkutils.fs.ls(stage2_ingested) if x.isDir]:
    print(f"Processing schema: {schema_name}")

    for table_name in [y.name for y in mssparkutils.fs.ls(f"{stage2_ingested}/{schema_name}") if y.isDir]:
        print(f"Processing schema/table: {schema_name}/{table_name}")

        # 1. Read Delta table from Ingested Folder.

        # Process each file even when it is empty. The tables will be created based on target_schema and will be available for query in SQL.
        df = spark.read.format('delta').load(f"{stage2_ingested}/{schema_name}/{table_name}")

        # 2. Transformation step
        # `except Exception` (not a bare except) so KeyboardInterrupt/SystemExit still stop the run.
        # NOTE(review): as before, a table whose transform fails is still pseudonymized and written
        # from the un-transformed frame - confirm this best-effort behaviour is intended.
        try:
            df = transform(df, schema_name, table_name, None, None)
        except Exception as err:
            print(f"Error while Transforming {schema_name}/{table_name}")
            print(f"  {type(err).__name__}: {err}")

        # 3. Pseudonymize the data using metadata.
        if re.search('Descriptors$', table_name) is None:
            # Use Deep Copy otherwise the oea_metadatas object also gets modified every time oea_metadata is modified
            oea_metadata = copy.deepcopy(oea_metadatas[table_name])
        else:
            oea_metadata = get_descriptor_metadata(table_name)

        oea_metadata += [
                            ['DistrictId', 'string', 'partition-by'],
                            ['SchoolYear', 'string', 'partition-by'],
                            ['LastModifiedDate', 'timestamp', 'no-op']
                        ]

        try:
            df_pseudo, df_lookup = oea.pseudonymize(df, oea_metadata)
        except Exception as err:
            print(f"Error while Pseudonymizing {schema_name}/{table_name}")
            print(f"  {type(err).__name__}: {err}")
            # Skip the write: df_pseudo/df_lookup would be undefined on the first table, or
            # still hold the previous table's data and be saved under this table's path.
            continue

        # 4. Write to Refined folder (even when file is empty)
        df_pseudo.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').partitionBy('DistrictId', 'SchoolYear').save(f"{stage2_refined}/General/{schema_name}/{table_name}")
        #if(len(df_lookup.columns) > 2):
        df_lookup.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').partitionBy('DistrictId', 'SchoolYear').save(f"{stage2_refined}/Sensitive/{schema_name}/{table_name}")
