# Insights Module Ingestion - Pre-Processing

This notebook demonstrates the utility of the OEA_py class notebook by adding a unique primary key column to the AadGroupMembership table before ingestion. Once the column is added, the table is overwritten in stage1.

The steps below describe how this notebook corrects the AadGroupMembership table of the Microsoft Education Insights module:
- Set the workspace in which the AadGroupMembership table is to be corrected.
- Read in the original CSV landed in ```stage1/Transactional/M365/v1.14/AadGroupMembership``` and add the primary key column. Write the corrected CSV to a new rundate folder, then remove any older rundate folders using the function described below.
- Remove the `_SUCCESS` file from the new rundate folder, since it causes an error when the table is ingested.
- One function is defined and used:
   1. **clean_data_lake_latest**: removes any additional folders in the data lake for a location, keeping only the latest rundate folder.

**This notebook may need to be updated, or removed from the pipeline, when processing production data.**

In [None]:
workspace = 'dev'
version = '1.14'  # matches the v1.14 folder under stage1/Transactional/M365

In [None]:
%run OEA_py

In [None]:
# 1) set the workspace (this determines where in the data lake you'll be writing to and reading from).
# You can work in 'dev', 'prod', or a sandbox with any name you choose.
# For example, Sam the developer can create a 'sam' workspace and expect to find his datasets in the data lake under oea/sandboxes/sam
oea.set_workspace(workspace)

In [None]:
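# 2) read in the original AadGroupMembership table (headerless CSV, so columns are named _c0.._c4),
# add the primary key column (_c0 and _c1 joined with an underscore), and confirm it has been added.
from pyspark.sql import functions as F  # normally already available after %run OEA_py; repeated so the cell stands alone
df = spark.read.format('csv').load(oea.to_url('stage1/Transactional/M365/v' + version + '/AadGroupMembership'), header='false')
df_corrected = df.withColumn('_c5', F.concat(F.col('_c0'), F.lit('_'), F.col('_c1')))
df_corrected = df_corrected.select('_c0', '_c1', '_c2', '_c3', '_c4', '_c5')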
display(df_corrected.limit(10))
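
The key built in step 2 is simply the first two columns joined with an underscore. The following is a minimal plain-Python sketch of that rule, meant for checking the logic on a handful of rows without a Spark session. The helper names `add_composite_key` and `duplicate_keys` and the sample values are hypothetical, and the sketch assumes, as the Spark code does, that the first two columns together identify a row.

```python
from collections import Counter

def add_composite_key(rows, sep="_"):
    """Append a key made of the first two fields of each row (hypothetical helper)."""
    return [list(row) + [f"{row[0]}{sep}{row[1]}"] for row in rows]

def duplicate_keys(rows_with_key):
    """Return keys that occur more than once; an empty list means the key is unique."""
    counts = Counter(row[-1] for row in rows_with_key)
    return sorted(key for key, n in counts.items() if n > 1)

# Made-up rows standing in for the five headerless CSV columns (_c0.._c4).
sample = [
    ["a1", "b1", "c1", "d1", "e1"],
    ["a1", "b2", "c2", "d2", "e2"],
]
keyed = add_composite_key(sample)
print(keyed[0][-1])           # key for the first row
print(duplicate_keys(keyed))  # keys that appear more than once
```

One caveat with a separator-based key: if the source values can themselves contain an underscore, two different pairs (for example `a_b` with `c`, and `a` with `b_c`) produce the same key, so a uniqueness check like the one above is worth running before relying on the column.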

In [None]:
# 2.5) capture the current date and time (formatted as the rundate folder name expects),
# and write out to the same relative location under a new rundate partition folder.
import datetime
currentDate = datetime.datetime.now()
currentDateTime = currentDate.strftime("%Y-%m-%d %H:%M:%S")
table_path = 'stage1/Transactional/M365/v' + version + '/AadGroupMembership/snapshot_batch_data/rundate=' + currentDateTime
df_corrected.write.save(oea.to_url(table_path), format='csv', mode='overwrite', overwriteSchema='true', header='false')
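
The write above targets a folder named `rundate=<timestamp>`. As a rough illustration of how that folder name is assembled, here is a small sketch with a fixed timestamp so the result is reproducible. The helper `rundate_path` is hypothetical and not part of OEA_py; the base path is the one quoted in the introduction.

```python
import datetime

def rundate_path(base_path, when, fmt="%Y-%m-%d %H:%M:%S"):
    """Build '<base_path>/rundate=<timestamp>' (hypothetical helper)."""
    return f"{base_path.rstrip('/')}/rundate={when.strftime(fmt)}"

base = "stage1/Transactional/M365/v1.14/AadGroupMembership/snapshot_batch_data"
earlier = rundate_path(base, datetime.datetime(2021, 12, 31, 23, 59, 59))
later = rundate_path(base, datetime.datetime(2022, 1, 31, 9, 5, 0))
print(later)
```

Because every field in this format is zero-padded and ordered from year down to second, the folder names sort alphabetically in chronological order. That property is convenient if the "latest" folder is picked by name, although how `oea.get_latest_folder` actually selects it is not shown in this notebook.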

In [None]:
# 3) keep only the latest rundate folder and remove the older ones (whose data lacks the primary key column).
def clean_data_lake_latest(source_path):
    latest_folder = oea.get_latest_folder(source_path)
    items = mssparkutils.fs.ls(oea.to_url(source_path))
    for item in items:
        if item.name != latest_folder:
            logger.info('file removal path: ' + item.path + ' with item: ' + item.name)
            oea.rm_if_exists(source_path + '/' + item.name)
            logger.info('Successfully removed folder: ' + item.name + ' from path: ' + item.path)
        else:
            logger.info('Kept folder: ' + item.name + ' from path: ' + item.path)
    logger.info('Finished cleaning data lake to house only the latest folder')

In [None]:
clean_data_lake_latest('stage1/Transactional/M365/v' + version + '/AadGroupMembership/snapshot_batch_data')
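
To make the keep-latest behaviour easier to reason about, the following sketch reproduces the same idea on a local temporary directory, using only the standard library instead of `mssparkutils` and `oea`. The function `keep_only_latest` is hypothetical, and it assumes that the latest folder is the one whose name sorts last; the folder names are made up and shortened to dates.

```python
import shutil
import tempfile
from pathlib import Path

def keep_only_latest(folder):
    """Delete every subfolder except the one whose name sorts last; return the kept name."""
    subfolders = sorted((p for p in Path(folder).iterdir() if p.is_dir()),
                        key=lambda p: p.name)
    if not subfolders:
        return None  # nothing to clean
    for old in subfolders[:-1]:
        shutil.rmtree(old)  # removes the folder and everything inside it
    return subfolders[-1].name

# Demo on a throwaway directory with made-up rundate folders.
root = Path(tempfile.mkdtemp())
for name in ["rundate=2022-01-01", "rundate=2022-03-01", "rundate=2022-02-01"]:
    (root / name).mkdir()
kept = keep_only_latest(root)
remaining = sorted(p.name for p in root.iterdir())
print(kept, remaining)
```

Running a local version like this against a copy of the folder layout is a cheap way to confirm which folder would survive before deleting anything in the data lake.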

In [None]:
# 4) ad hoc work - remove the _SUCCESS file that Spark writes alongside the CSV; leaving it in place causes an error when the table is ingested.
table_path = 'stage1/Transactional/M365/v' + version + '/AadGroupMembership/snapshot_batch_data/rundate=' + currentDateTime
oea.rm_if_exists(table_path + '/_SUCCESS', False)
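
Spark writes an empty `_SUCCESS` marker file next to the data files when a job completes, which is why it appears in the new rundate folder. The sketch below shows the same remove-if-present idea on a local temporary directory; `remove_marker` is a hypothetical helper, not the implementation of `oea.rm_if_exists`, and the file names are made up.

```python
import tempfile
from pathlib import Path

def remove_marker(folder, marker="_SUCCESS"):
    """Delete the marker file if it is present; return True when something was removed."""
    target = Path(folder) / marker
    if target.is_file():
        target.unlink()
        return True
    return False

# Demo: a throwaway batch folder containing one data file and the marker.
batch = Path(tempfile.mkdtemp())
(batch / "_SUCCESS").touch()
(batch / "part-00000.csv").write_text("a,b\n")
first = remove_marker(batch)   # marker exists, so it is removed
second = remove_marker(batch)  # marker already gone, so nothing happens
left = sorted(p.name for p in batch.iterdir())
print(first, second, left)
```

Making the removal a no-op when the file is absent keeps the cell safe to re-run, which matters for a notebook that may be executed repeatedly from a pipeline.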