In [80]:
%run /OEA_py

StatementMeta(, 139, -1, Finished, Available)

In [87]:
# Parameters: the values below are defaults; the calling pipeline is expected to override them.
directory = 'Ed-Fi'
api_version = '5.2'
metadata_url = 'https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/main/modules/module_catalog/Ed-Fi/docs/edfi_oea_metadata.csv'
# Single '/' after the branch name (the previous value had an empty path segment 'main//modules').
swagger_url = 'https://raw.githubusercontent.com/microsoft/OpenEduAnalytics/main/modules/module_catalog/Ed-Fi/docs/edfi_swagger.json'

oea = OEA()
oea_metadatas = oea.get_metadata_from_url(metadata_url)

# Spark DataType.typeName() values that transform() casts directly. Any other type is
# treated as complex and round-tripped through f.to_json, which only accepts
# struct/map/array columns -- so every atomic numeric type must be listed here too.
primitive_datatypes = ['timestamp', 'date', 'decimal', 'boolean', 'integer', 'string', 'long',
                       'double', 'float', 'short', 'byte']

# Spark schemas for each Ed-Fi resource, generated from the swagger document.
schema_gen = OpenAPIUtil(swagger_url)
schemas = schema_gen.create_spark_schemas()

# Source (Ingested) and destination (Refined) roots for this API version.
stage2_ingested = oea.to_url(f'stage2/Ingested/{directory}/v{api_version}')
stage2_refined = oea.to_url(f'stage2/Refined/{directory}/v{api_version}')

StatementMeta(spark3p1sm, 139, 8, Finished, Available)

2022-11-01 18:47:49,529 - OEA - INFO - OEA initialized.


In [88]:
def get_descriptor_schema(descriptor):
    """Return the Spark StructType shared by every Ed-Fi descriptor table.

    All columns are nullable. The id column name is derived from ``descriptor``
    by dropping its last character and appending 'Id', e.g.
    'gradeLevelDescriptors' -> 'gradeLevelDescriptorId' (assumes the plural
    table/resource name is passed in).
    """
    column_types = (
        ('_etag', LongType()),
        (f"{descriptor[:-1]}Id", IntegerType()),
        ('codeValue', StringType()),
        ('description', StringType()),
        ('id', StringType()),
        ('namespace', StringType()),
        ('shortDescription', StringType()),
    )
    return StructType([StructField(name, spark_type, True) for name, spark_type in column_types])

def get_descriptor_metadata(descriptor):
    """Return OEA pseudonymization metadata rows for an Ed-Fi descriptor table.

    Each row is ``[column_name, type_name, operation]``. Only the descriptor id
    column (named like in get_descriptor_schema: ``descriptor`` minus its last
    character, plus 'Id') is hashed; everything else is 'no-op'.
    A new list is built on every call, so callers may extend it safely.
    """
    id_column = f"{descriptor[:-1]}Id"
    rows = (
        ('_etag', 'long', 'no-op'),
        (id_column, 'integer', 'hash'),
        ('codeValue', 'string', 'no-op'),
        ('description', 'string', 'no-op'),
        ('id', 'string', 'no-op'),
        ('namespace', 'string', 'no-op'),
        ('shortDescription', 'string', 'no-op'),
    )
    return [list(row) for row in rows]

StatementMeta(spark3p1sm, 139, 9, Finished, Available)

In [132]:
def flatten_reference_col(df, target_col):
    """Replace an Ed-Fi ``*Reference`` struct column with a surrogate lake id column.

    The referenced resource's id is taken from ``<col>.link.href``: the href is
    split on '/' and element 3 is used. For an href shaped like
    '/<schema>/<resource>/<id>' that is the id.
    NOTE(review): confirm every href has exactly this shape.

    The new column is named ``<prefix>_lake_id`` (``prefix`` is the column name
    without its trailing 'Reference') and holds ``SchoolYear_DistrictId_<id>``,
    the same format as the non-descriptor ``lake_id`` built in ``transform``.
    The original reference column is dropped.

    Args:
        df: Spark DataFrame containing ``target_col`` and the ``SchoolYear`` and
            ``DistrictId`` columns.
        target_col: StructField of the reference column (name expected to end
            in 'Reference').

    Returns:
        The DataFrame with the reference column replaced by ``<prefix>_lake_id``.
    """
    ref_name = target_col.name
    suffix = 'Reference'
    # Strip only the trailing suffix; str.replace would also remove an earlier
    # occurrence inside the name.
    col_prefix = ref_name[:-len(suffix)] if ref_name.endswith(suffix) else ref_name

    referenced_id = f.split(f.col(f'{ref_name}.link.href'), '/').getItem(3)
    df = df.withColumn(f"{col_prefix}_lake_id",
                       f.concat_ws('_', f.col('SchoolYear'), f.col('DistrictId'), referenced_id))
    return df.drop(ref_name)

def explode_arrays(df, target_col, schema_name, table_name):
    """Move an array-of-struct column out of ``df`` into child Delta table(s).

    The array in ``target_col`` is exploded to one row per element, keeping
    ``lake_id``, ``DistrictId`` and ``SchoolYear``. Rows whose array is null or
    empty yield no child rows (``f.explode`` semantics).

    Processing of the child rows:
    - ``*Reference`` fields of the element are flattened with ``flatten_reference_col``.
    - Each nested array field of the element is exploded once more. It is written
      to ``{table_name}_{target_col.name}_{nested}`` and removed from the child.
      NOTE(review): the nested explode selects ``array_exploded.*``, so it assumes
      struct elements.
    - The remaining child rows are written to ``{table_name}_{target_col.name}``.

    In every child table the parent's ``lake_id`` is renamed to
    ``{table_name}_lake_id``. All writes overwrite, replace the schema, are
    partitioned by DistrictId/SchoolYear, and land under
    ``{stage2_refined}/General/{schema_name}/``.

    Returns:
        ``df`` with ``target_col`` dropped.
    """
    key_cols = ['lake_id', 'DistrictId', 'SchoolYear']
    parent_key = f"{table_name}_lake_id"
    element_type = target_col.dataType.elementType

    def _save_child(frame, suffix):
        # Shared writer for every child table produced from this array column.
        renamed = frame.withColumnRenamed('lake_id', parent_key)
        writer = (renamed.write
                  .format('delta')
                  .mode('overwrite')
                  .option('overwriteSchema', 'true')
                  .partitionBy('DistrictId', 'SchoolYear'))
        writer.save(f"{stage2_refined}/General/{schema_name}/{table_name}_{suffix}")

    child_df = (df.select(key_cols + [target_col.name])
                  .withColumn("exploded", f.explode(target_col.name))
                  .drop(target_col.name)
                  .select(key_cols + ['exploded.*']))

    reference_fields = [name for name in child_df.columns if re.search('Reference$', name) is not None]
    for ref_name in reference_fields:
        child_df = flatten_reference_col(child_df, element_type[ref_name])

    nested_arrays = [fld.name for fld in element_type.fields if fld.dataType.typeName() == 'array']
    for nested_name in nested_arrays:
        surrogate_keys = [name for name in child_df.columns if '_lake_id' in name]
        selection = surrogate_keys + key_cols + [nested_name]
        nested_df = (child_df.select(selection)
                       .withColumn('array_exploded', f.explode(nested_name))
                       .select(selection + ['array_exploded.*'])
                       .drop(nested_name))
        _save_child(nested_df, f"{target_col.name}_{nested_name}")
        child_df = child_df.drop(nested_name)

    _save_child(child_df, target_col.name)
    return df.drop(target_col.name)

        
def transform(df, target_schema, schema_name, table_name):
    """Conform an ingested Ed-Fi DataFrame to its target Spark schema.

    Steps:
      1. Add a ``lake_id`` surrogate key.
         - Descriptor tables: ``SchoolYear_DistrictId_<namespace>#<codeValue>``
           (``f.concat``, so null if any part is null).
         - All other tables: ``SchoolYear_DistrictId_<id>`` (``f.concat_ws``).
      2. For every field of ``target_schema``:
         - missing from ``df``: added as a null column of the target type;
         - type listed in the module-level ``primitive_datatypes``: cast;
         - otherwise: re-parsed through JSON into the target type, then
           ``*Reference`` columns are flattened to ``<prefix>_lake_id`` and
           array columns are exploded into child Delta tables and dropped.

    Args:
        df: Spark DataFrame from the Ingested zone; needs ``SchoolYear`` and
            ``DistrictId`` (plus ``id``, or ``namespace``/``codeValue`` for descriptors).
        target_schema: StructType describing the desired columns.
        schema_name: Ed-Fi schema folder name (used for child-table paths).
        table_name: Ed-Fi resource/table name.

    Returns:
        The transformed DataFrame.
    """
    # Descriptor folders are plural (e.g. 'gradeLevelDescriptors'), so allow an
    # optional trailing 's'. Matching only 'Descriptor$' never selected this branch.
    if re.search('Descriptors?$', table_name) is not None:
        df = df.withColumn('lake_id', f.concat(f.col('SchoolYear'), f.lit('_'), f.col('DistrictId'),
                                               f.lit('_'), f.col('namespace'), f.lit('#'), f.col('codeValue')))
    else:
        df = df.withColumn('lake_id', f.concat_ws('_', f.col('SchoolYear'), f.col('DistrictId'), f.col('id')))

    for col_name in target_schema.fieldNames():
        target_col = target_schema[col_name]

        if col_name not in df.columns:
            # Column absent from the source: add it as typed nulls.
            df = df.withColumn(col_name, f.lit(None).cast(target_col.dataType))
            continue

        if target_col.dataType.typeName() in primitive_datatypes:
            df = df.withColumn(col_name, f.col(col_name).cast(target_col.dataType))
            continue

        # Complex column: serialize and re-parse so it takes exactly the target struct/array type.
        json_col = f"{col_name}_json"
        df = (df.withColumn(json_col, f.to_json(f.col(col_name)))
                .withColumn(col_name, f.from_json(f.col(json_col), target_col.dataType))
                .drop(json_col))

        if re.search('Reference$', col_name) is not None:
            # Replace the reference link with a surrogate key (drops the column).
            df = flatten_reference_col(df, target_col)
        elif target_col.dataType.typeName() == 'array':
            df = explode_arrays(df, target_col, schema_name, table_name)
    return df

StatementMeta(spark3p1sm, 139, 53, Finished, Available)

In [None]:
# Refine every Ingested table: transform to the target schema, pseudonymize, and
# write the General (pseudonymized) and Sensitive (lookup) outputs. A table that
# fails to transform or pseudonymize is reported and skipped, so nothing stale
# or half-processed is written over its Refined folders.
for schema_name in [x.name for x in mssparkutils.fs.ls(stage2_ingested) if x.isDir]:
    print(f"Processing schema: {schema_name}")

    for table_name in [y.name for y in mssparkutils.fs.ls(f"{stage2_ingested}/{schema_name}") if y.isDir]:
        print(f"Processing schema/table: {schema_name}/{table_name}")

        # 1. Read Delta table from Ingested Folder.
        df = spark.read.format('delta').load(f"{stage2_ingested}/{schema_name}/{table_name}")
        # NOTE(review): 3 columns presumably means only ingestion bookkeeping columns (no payload) - confirm.
        if len(df.columns) == 3:
            print("No Data to process.")
            continue

        # 2. Build the target schema and pseudonymization metadata.
        # New objects are built rather than using StructType.add / list +=, which
        # mutate the shared `schemas` / `oea_metadatas` entries in place and
        # duplicated these columns every time this cell was re-run.
        if table_name.endswith('Descriptors'):
            base_schema = get_descriptor_schema(table_name)
            base_metadata = get_descriptor_metadata(table_name)
        else:
            base_schema = schemas[table_name]
            base_metadata = oea_metadatas[table_name]
        target_schema = StructType(list(base_schema.fields) + [
            StructField('DistrictId', StringType()),
            StructField('SchoolYear', StringType()),
            StructField('LastModifiedDate', TimestampType()),
        ])
        oea_metadata = list(base_metadata) + [['DistrictId', 'string', 'partition-by'],
                                              ['SchoolYear', 'string', 'partition-by'],
                                              ['LastModifiedDate', 'timestamp', 'no-op']]
        try:
            df = transform(df, target_schema, schema_name, table_name)
        except Exception as e:
            print(f"Error while Transforming {schema_name}/{table_name}: {e}")
            # Don't pseudonymize/write an untransformed frame.
            continue

        # 3. Pseudonymize the data using metadata.
        try:
            df_pseudo, df_lookup = oea.pseudonymize(df, oea_metadata)
        except Exception as e:
            print(f"Error while Pseudonymizing {schema_name}/{table_name}: {e}")
            # Without this, df_pseudo/df_lookup from the previous table would be written here.
            continue

        # 4. Write to Refined folder.
        if len(df_pseudo.columns) > 2:
            df_pseudo.write.format('delta').mode('overwrite').option('overwriteSchema', 'true')\
                .partitionBy('DistrictId', 'SchoolYear').save(f"{stage2_refined}/General/{schema_name}/{table_name}")
        if len(df_lookup.columns) > 2:
            df_lookup.write.format('delta').mode('overwrite').option('overwriteSchema', 'true')\
                .partitionBy('DistrictId', 'SchoolYear').save(f"{stage2_refined}/Sensitive/{schema_name}/{table_name}")
