# Reading Progress Module - Ingest

This notebook demonstrates how the OEA_py class notebook can be used to speed up the process of ingesting the Insights/Reading Progress data.

The steps outlined below describe how this notebook is used to ingest the Microsoft Education Insights module tables:

- Set the workspace for where the tables are located. 
- One function is defined and used:
   1. **ingest_reading_prog**: ingests each table from Insights (except AadGroupMembership, PersonRelationship, and RefTranslation), using the primary key column specified for each table.

In [1]:
# Name of the OEA workspace this notebook runs against; it is passed to oea.set_workspace() further down.
# NOTE(review): the `%run OEA_py` log reports "Now using workspace: dev", so OEA_py presumably also reads this variable on startup - confirm.
workspace = 'dev'

StatementMeta(spark3p2med, 75, 1, Finished, Available)

In [2]:
%run OEA_py

StatementMeta(, 75, -1, Finished, Available)

2023-01-13 17:28:45,990 - OEA - INFO - Now using workspace: dev
2023-01-13 17:28:45,991 - OEA - INFO - OEA initialized.


In [3]:
# 1) Set the workspace (this determines where in the data lake you'll be writing to and reading from).
# You can work in 'dev', 'prod', or a sandbox with any name you choose.
# For example, Sam the developer can create a 'sam' workspace and expect to find his datasets in the data lake under oea/sandboxes/sam.
# NOTE: `oea` is not defined in this notebook; presumably it is created by the `%run OEA_py` cell, which must be run first.
oea.set_workspace(workspace)

StatementMeta(spark3p2med, 75, 3, Finished, Available)

2023-01-13 17:28:47,566 - OEA - INFO - Now using workspace: dev


In [17]:
# 2) this function is almost identical to the ingest function in the OEA framework, except that it takes a separate write path so the ingested directory can differ from the source directory
def ingest_reading_prog(entity_path, write_entity_path, primary_key='id', options=None):
    """ Ingests the data for the entity in the given path and writes it to a (possibly different) ingested path.

        Data is read from stage1/Transactional/{entity_path} and written to stage2/Ingested/{write_entity_path}.
        CSV files are expected to have a header row by default, and JSON files are expected to have complete JSON docs on each row in the file.
        To specify options that are different from these defaults, use the options param.
        eg, ingest_reading_prog('M365/v1.14/Person', 'reading_progress/v0.1/Person', '_c0', options={'header':False}) # for CSV files that don't have a header

        Args:
            entity_path: path of the source entity, relative to stage1/Transactional.
            write_entity_path: path to write the ingested entity to, relative to stage2/Ingested.
            primary_key: name of the primary key column (passed through oea.fix_column_name before use).
            options: optional dict of read options. The dict is copied, so the caller's dict is never modified.

        Returns:
            The number of new inbound rows, as returned by oea.process.

        Raises:
            ValueError: if the batch type reported by oea.get_batch_info is not 'snapshot', 'additive' or 'delta'.
    """
    primary_key = oea.fix_column_name(primary_key) # fix the column name, in case it has a space in it or some other invalid character
    ingested_path = f'stage2/Ingested/{write_entity_path}'
    raw_path = f'stage1/Transactional/{entity_path}'
    batch_type, source_data_format = oea.get_batch_info(raw_path)
    logger.info(f'Ingesting from: {raw_path}, batch type of: {batch_type}, source data format of: {source_data_format}')
    source_url = oea.to_url(f'{raw_path}/{batch_type}_batch_data')

    # snapshot batches are read from the latest folder only
    if batch_type == 'snapshot': source_url = f'{source_url}/{oea.get_latest_folder(source_url)}'

    logger.debug(f'Processing {batch_type} data from: {source_url} and writing out to: {ingested_path}')
    # pick the write strategy that matches the batch type
    if batch_type == 'snapshot':
        def batch_func(df): oea.overwrite(df, ingested_path, primary_key)
    elif batch_type == 'additive':
        def batch_func(df): oea.append(df, ingested_path, primary_key)
    elif batch_type == 'delta':
        def batch_func(df): oea.upsert(df, ingested_path, primary_key)
    else:
        raise ValueError("No valid batch folder was found at that path (expected to find a single folder with one of the following names: snapshot_batch_data, additive_batch_data, or delta_batch_data). Are you sure you have the right path?")

    # Work on a private copy: the keys added below must not leak into the caller's dict
    # (callers reuse one options dict for many tables) nor into a shared default argument.
    options = {} if options is None else dict(options)
    options['format'] = source_data_format # eg, 'csv', 'json'
    if source_data_format == 'csv' and options.get('header') is None: options['header'] = True  # default to expecting a header in csv files

    number_of_new_inbound_rows = oea.process(source_url, batch_func, options)
    if number_of_new_inbound_rows > 0:
        # only register the ingested path in the lake db when new rows actually arrived
        oea.add_to_lake_db(ingested_path)
    return number_of_new_inbound_rows

StatementMeta(spark3p2med, 75, 17, Finished, Available)

In [None]:
# 3) The next step is to ingest the batch data into stage2
# Note that when you run this the first time, you'll see an info message like "Number of new inbound rows processed: 2".
# If you run this a second time, the number of inbound rows processed will be 0 because the ingestion uses spark structured streaming to keep track of what data has already been processed.
options = {'header':False}

source_root = 'M365/v1.14'              # read from stage1/Transactional/M365/v1.14/<table>
target_root = 'reading_progress/v0.1'   # written to stage2/Ingested/reading_progress/v0.1/<table>

# The files are read without a header row, so primary keys are given as positional column names
# (presumably Spark's default _c0, _c1, ... naming - confirm against the Insights schema).
default_primary_key = '_c0'
primary_key_overrides = {'activity': '_c3'}

# Tables to ingest, in the order they are processed.
tables = [
    'activity',
    'AadGroup',
    'AadUser',
    'AadUserPersonMapping',
    'Course',
    'CourseGradeLevel',
    'CourseSubject',
    'Enrollment',
    'Organization',
    'Person',
    'PersonDemographic',
    'PersonDemographicEthnicity',
    'PersonDemographicPersonFlag',
    'PersonDemographicRace',
    'PersonEmailAddress',
    'PersonIdentifier',
    'PersonOrganizationRole',
    'PersonPhoneNumber',
    'RefDefinition',
    'Section',
    'SectionGradeLevel',
    'SectionSession',
    'SectionSubject',
    'Session',
    'SourceSystem',
]

# Ingest every table and keep the number of new inbound rows reported for each one.
rows_ingested = {}
for table in tables:
    rows_ingested[table] = ingest_reading_prog(
        f'{source_root}/{table}',
        f'{target_root}/{table}',
        primary_key_overrides.get(table, default_primary_key),
        options,
    )

# Display the per-table row counts as the cell output.
rows_ingested