# Canvas Module Ingestion - Pre-Processing

This notebook demonstrates the utility of the OEA_py class notebook by converting the Canvas tables from record-oriented JSON files to CSVs before ingestion. Once any ad hoc column-dtype conversion is complete, each table is overwritten in stage1.
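
As a rough illustration of what "record-oriented JSON to CSV" means, the sketch below converts line-delimited JSON records to CSV text using only the Python standard library. It is not the notebook's pandas/Spark path; the sample records and the helper name `json_lines_to_csv` are hypothetical. Note the missing `parent_account_id` in the first record: when pandas reads such a column, it stores the integers as floats to accommodate the missing value, which is presumably why the column arrives in Spark as DoubleType and needs the cast to LongType.

```python
import csv
import io
import json

# Hypothetical sample data: one JSON object per line (record-oriented).
json_lines = """\
{"id": 1, "name": "Root", "parent_account_id": null}
{"id": 2, "name": "Science", "parent_account_id": 1}
"""

def json_lines_to_csv(text):
    """Convert line-delimited JSON records to CSV text with a header row."""
    records = [json.loads(line) for line in text.splitlines() if line.strip()]
    # Use the union of keys, in first-seen order, as the CSV header.
    fieldnames = list(dict.fromkeys(key for rec in records for key in rec))
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(records)  # None values are written as empty fields
    return buffer.getvalue()

csv_text = json_lines_to_csv(json_lines)
print(csv_text)
```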

The steps outlined below describe how this notebook is used to convert the Canvas module JSON tables:
- Set the workspace in which the Canvas tables are to be converted.
- Read in the original JSONs landed in ```stage1/Transactional/canvas/v2.0/...```, perform any ad hoc data conversions (e.g. the accounts table needs the parent_account_id column cast to LongType rather than DoubleType), write each table to stage1 as a CSV, and remove the excess rundate folders containing the original JSONs.
- Three functions are defined and used:
   1. **preprocess_canvas_dataset**: main function that reads each table's JSON into a pandas df using ```pd.read_json(..., lines=True)```, converts it to a spark df and corrects the column dtypes as needed.
   2. **write_canvas_json_as_csv**: writes the df read from the JSON as a CSV into the same folder in stage1 that houses the original JSON.
   3. **clean_data_lake_latest**: removes any additional folders in the data lake for a location, keeping only the latest rundate folder (that is, the CSV rather than the original JSON).
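
The "keep only the latest rundate folder" step can be sketched on a local filesystem as follows. This is a minimal illustration, not the OEA implementation: the helper `keep_latest_folder` is hypothetical, and it assumes that "latest" means the folder whose name sorts last, which holds for zero-padded names such as `rundate=YYYY-MM-DD HH-MM-SS`. The notebook itself relies on `oea.get_latest_folder` and `oea.rm_if_exists` against the data lake instead.

```python
import shutil
import tempfile
from pathlib import Path

def keep_latest_folder(parent):
    """Delete every subfolder of `parent` except the one whose name sorts last."""
    folders = sorted(p for p in Path(parent).iterdir() if p.is_dir())
    if not folders:
        return None
    latest = folders[-1]
    for folder in folders[:-1]:
        shutil.rmtree(folder)  # remove older rundate folders
    return latest.name

# Demo in a temporary directory with two hypothetical rundate folders.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("rundate=2023-01-01 00-00-00", "rundate=2023-02-01 00-00-00"):
        (Path(tmp) / name).mkdir()
    kept = keep_latest_folder(tmp)
    remaining = [p.name for p in Path(tmp).iterdir()]
```

Because the timestamp fields are zero-padded and ordered from year down to second, lexicographic order matches chronological order here; a differently formatted folder name would need its timestamp parsed before comparing.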

**This notebook may need to be updated or removed from the pipeline when processing production data.**

In [None]:
workspace = 'dev'
version = '2.0'

In [None]:
%run OEA_py

In [None]:
# 1) set the workspace (this determines where in the data lake you'll be writing to and reading from).
# You can work in 'dev', 'prod', or a sandbox with any name you choose.
# For example, Sam the developer can create a 'sam' workspace and expect to find his datasets in the data lake under oea/sandboxes/sam
oea.set_workspace(workspace)

In [None]:
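# 2) this step pre-processes the Canvas data: it reads in the JSONs as records, corrects any schema discrepancies and then writes out the df as a CSV in stage1.
import datetime

# explicit imports for names used below, in case OEA_py does not already expose them
import pandas as pd
from pyspark.sql.types import LongType

# timestamp used to name the new rundate folder; captured once so every table shares the same rundate
currentDate = datetime.datetime.now()
currentDateTime = currentDate.strftime("%Y-%m-%d %H-%M-%S")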

# method: clean data lake latest
def clean_data_lake_latest(source_path):
    """keep only the latest rundate folder under source_path, removing the older folders (which hold the original JSONs)"""
    latest_folder = oea.get_latest_folder(source_path)
    items = mssparkutils.fs.ls(oea.to_url(source_path))
    for item in items:
        if item.name != latest_folder:
            logger.info('file removal path: ' + item.path + ' with item: ' + item.name)
            oea.rm_if_exists(source_path + '/' + item.name)
            logger.info('Successfully removed folder: ' + item.name + ' from path: ' + item.path)
        else:
            logger.info('Kept folder: ' + item.name + ' from path: ' + item.path)
    logger.info('Finished cleaning data lake to house only the latest folder')

# method: write the df as a CSV to the pre-existing path that contains the table's JSON
def write_canvas_json_as_csv(write_filepath, batch_type, df):
    """write df as a single CSV file (with header) into a new rundate folder under write_filepath"""
    target_url = oea.to_url(f'{write_filepath}/{batch_type}_batch_data/rundate={currentDateTime}')
    df.coalesce(1).write.save(target_url, format='csv', mode='overwrite', header='true', mergeSchema='true')

# method: overarching function
def preprocess_canvas_dataset(tables_source):
    items = oea.get_folders(tables_source)
    for item in items:
        table_path = f'{tables_source}/{item}'
        # read the record-oriented (line-delimited) JSON into pandas, then convert to a spark df
        pdf = pd.read_json(oea.to_url(f'{table_path}/snapshot_batch_data/*/*.json'), lines=True)
        df = spark.createDataFrame(pdf)
        if item == 'accounts':
            # the accounts table needs parent_account_id cast to LongType rather than DoubleType
            df = df.withColumn('parent_account_id', df['parent_account_id'].cast(LongType()))
        write_canvas_json_as_csv(table_path, 'snapshot', df)
        clean_data_lake_latest(f'{table_path}/snapshot_batch_data')
        new_table_path = f'{table_path}/snapshot_batch_data/rundate={currentDateTime}'
        oea.rm_if_exists(new_table_path + '/_SUCCESS', False)
        logger.info('Pre-processed table: ' + item + ' from: ' + table_path)
    logger.info('Finished pre-processing Canvas tables')

In [None]:
# pre-process the dataset for the version number set in the first cell
preprocess_canvas_dataset(f'stage1/Transactional/canvas/v{version}')