# Tests for the OEA framework

In [None]:
%run OEA_py

In [None]:
# Create the framework object shared by every test cell below and point it at the
# 'sandbox1' workspace. OEA is presumably defined by the `%run OEA_py` cell above.
oea = OEA()
oea.set_workspace('sandbox1')

In [None]:
def reset_additive_data_tests():
    """Remove the stage1 and stage2 readinessIndictorsBySubgroup folders, if present.

    Calls oea.rm_if_exists on both locations used by the additive-batch tests.
    """
    stale_paths = (
        'stage1/Transactional/contoso_sis/v0.1/readinessIndictorsBySubgroup',
        'stage2/Ingested/contoso_sis/v0.1/readinessIndictorsBySubgroup',
    )
    for stale_path in stale_paths:
        oea.rm_if_exists(stale_path)

def land_readinessIndictorsBySubgroup_2021(expected_record_count):
    """Download the 2021 readinessIndictorsBySubgroup test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    # NOTE(review): unlike the other test-data URLs in this notebook, this one has no
    # 'gene/v0.7dev' segment after the repository name - confirm that it resolves.
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/framework/test_data/contoso_sis/2021/readinessIndictorsBySubgroup/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/readinessIndictorsBySubgroup', 'readinessIndictorsBySubgroup.csv', batch_data_type=oea.ADDITIVE_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def land_readinessIndictorsBySubgroup_2022(expected_record_count):
    """Download the 2022 readinessIndictorsBySubgroup test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    # NOTE(review): unlike the other test-data URLs in this notebook, this one has no
    # 'gene/v0.7dev' segment after the repository name - confirm that it resolves.
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/framework/test_data/contoso_sis/2022/readinessIndictorsBySubgroup/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/readinessIndictorsBySubgroup', 'readinessIndictorsBySubgroup.csv', batch_data_type=oea.ADDITIVE_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def ingest_readinessIndictorsBySubgroup(expected_record_count):
    """Ingest readinessIndictorsBySubgroup and verify the record count found in stage2.

    Args:
        expected_record_count: Number of records expected at the stage2/Ingested path
            after the ingestion call.

    Raises:
        AssertionError: If the ingested record count differs from expected_record_count.
    """
    oea.ingest('contoso_sis/v0.1/readinessIndictorsBySubgroup')
    ingested_df = oea.load('stage2/Ingested/contoso_sis/v0.1/readinessIndictorsBySubgroup')
    # Count once so the check and its failure message report the same value.
    actual_record_count = ingested_df.count()
    # The message names the ingested data, since that is what this check inspects.
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in ingested data, but found {actual_record_count}'

# Additive-batch test sequence for readinessIndictorsBySubgroup. The steps are order-dependent:
# each expected count builds on what the previous steps landed and ingested.
reset_additive_data_tests()
# test1 - Land the first batch (2021) of readinessIndictorsBySubgroup data
land_readinessIndictorsBySubgroup_2021(1464)
# test2 - Ingest the data from stage1 into stage2
ingest_readinessIndictorsBySubgroup(1464)
# test3 - run the same ingestion a second time and verify that it doesn't change what was ingested (ingestion is idempotent via use of _checkpoints)
ingest_readinessIndictorsBySubgroup(1464)
# test4 - Land the second batch (2022) of readinessIndictorsBySubgroup data
land_readinessIndictorsBySubgroup_2022(2928)
# test5 - Ingest the data from stage1 into stage2; the expected total is 1464 + 2928 = 4392
ingest_readinessIndictorsBySubgroup(4392)



In [None]:
def reset_delta_data_tests():
    """Remove the stage1 and stage2 contentMasteryBySubgroup folders, if present.

    Calls oea.rm_if_exists on both locations used by the delta-batch tests.
    """
    stale_paths = (
        'stage1/Transactional/contoso_sis/v0.1/contentMasteryBySubgroup',
        'stage2/Ingested/contoso_sis/v0.1/contentMasteryBySubgroup',
    )
    for stale_path in stale_paths:
        oea.rm_if_exists(stale_path)

def land_contentMasteryBySubgroup_2021(expected_record_count):
    """Download the 2021 contentMasteryBySubgroup test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/gene/v0.7dev/framework/test_data/contoso_sis/2021/contentMasteryBySubgroup/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/contentMasteryBySubgroup', 'contentMasteryBySubgroup.csv', batch_data_type=oea.DELTA_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def land_contentMasteryBySubgroup_2022(expected_record_count):
    """Download the 2022 contentMasteryBySubgroup test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/gene/v0.7dev/framework/test_data/contoso_sis/2022/contentMasteryBySubgroup/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/contentMasteryBySubgroup', 'contentMasteryBySubgroup.csv', batch_data_type=oea.DELTA_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def ingest_contentMasteryBySubgroup(expected_record_count):
    """Ingest contentMasteryBySubgroup (passing 'id' to oea.ingest) and verify the stage2 count.

    Args:
        expected_record_count: Number of records expected at the stage2/Ingested path
            after the ingestion call.

    Raises:
        AssertionError: If the ingested record count differs from expected_record_count.
    """
    # 'id' is presumably the key column used to merge delta batches - confirm against oea.ingest.
    oea.ingest('contoso_sis/v0.1/contentMasteryBySubgroup', 'id')
    ingested_df = oea.load('stage2/Ingested/contoso_sis/v0.1/contentMasteryBySubgroup')
    # Count once so the check and its failure message report the same value.
    actual_record_count = ingested_df.count()
    # The message names the ingested data, since that is what this check inspects.
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in ingested data, but found {actual_record_count}'

# Delta-batch test sequence for contentMasteryBySubgroup. The steps are order-dependent:
# each expected count builds on what the previous steps landed and ingested.
reset_delta_data_tests()
# test1 - Land the first batch (2021) of contentMasteryBySubgroup data
land_contentMasteryBySubgroup_2021(2)
# test2 - Ingest the data from stage1 into stage2
ingest_contentMasteryBySubgroup(2)
# test3 - run the same ingestion a second time and verify that it doesn't change what was ingested (ingestion is idempotent via use of _checkpoints)
ingest_contentMasteryBySubgroup(2)
# test4 - Land the second batch (2022) of contentMasteryBySubgroup data
land_contentMasteryBySubgroup_2022(2)
# test5 - Ingest the data from stage1 into stage2; 3 records are expected in total
# (presumably one record of the second batch updates an existing 'id' - confirm against the test data)
ingest_contentMasteryBySubgroup(3)

In [None]:
def reset_snapshot_data_tests():
    """Remove the stage1 and stage2 progressTowardsEngLangProfibySubgr folders, if present.

    These are the locations the tests in this cell land to and load from, so they
    are the ones that must be cleared before the sequence is re-run.
    """
    oea.rm_if_exists('stage1/Transactional/contoso_sis/v0.1/progressTowardsEngLangProfibySubgr')
    oea.rm_if_exists('stage2/Ingested/contoso_sis/v0.1/progressTowardsEngLangProfibySubgr')

def land_progressTowardsEnglishLanguageProficiency_2021(expected_record_count):
    """Download the 2021 progressTowardsEngLangProfibySubgr test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/gene/v0.7dev/framework/test_data/contoso_sis/2021/progressTowardsEngLangProfibySubgr/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    # NOTE(review): this cell is prepared by reset_snapshot_data_tests, yet the data is
    # landed as oea.DELTA_BATCH_DATA - confirm that delta (not snapshot) is intended.
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/progressTowardsEngLangProfibySubgr', 'progressTowardsEngLangProfibySubgr.csv', batch_data_type=oea.DELTA_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def land_progressTowardsEnglishLanguageProficiency_2022(expected_record_count):
    """Download the 2022 progressTowardsEngLangProfibySubgr test file, land it, and verify its size.

    Args:
        expected_record_count: Number of records the landed CSV is expected to contain.

    Raises:
        requests.RequestException: If the download times out or returns an HTTP error status.
        AssertionError: If the landed record count differs from expected_record_count.
    """
    response = requests.get('https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/gene/v0.7dev/framework/test_data/contoso_sis/2022/progressTowardsEngLangProfibySubgr/part1.csv', timeout=60)
    # Fail fast on a bad download so an HTTP error page is never landed as CSV data.
    response.raise_for_status()
    # NOTE(review): this cell is prepared by reset_snapshot_data_tests, yet the data is
    # landed as oea.DELTA_BATCH_DATA - confirm that delta (not snapshot) is intended.
    sink_path = oea.land(response.text, 'contoso_sis/v0.1/progressTowardsEngLangProfibySubgr', 'progressTowardsEngLangProfibySubgr.csv', batch_data_type=oea.DELTA_BATCH_DATA)
    sink_df = oea.load_csv(sink_path)
    # Count once so the check and its failure message report the same value.
    actual_record_count = sink_df.count()
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in landed data, but found {actual_record_count}'

def ingest_progressTowardsEnglishLanguageProficiency(expected_record_count):
    """Ingest progressTowardsEngLangProfibySubgr and verify the record count found in stage2.

    Args:
        expected_record_count: Number of records expected at the stage2/Ingested path
            after the ingestion call.

    Raises:
        AssertionError: If the ingested record count differs from expected_record_count.
    """
    # Ingest the same entity path that the land_* helpers write to and that is loaded
    # below; ingesting a differently named entity would leave the landed data untouched.
    oea.ingest('contoso_sis/v0.1/progressTowardsEngLangProfibySubgr')
    ingested_df = oea.load('stage2/Ingested/contoso_sis/v0.1/progressTowardsEngLangProfibySubgr')
    # Count once so the check and its failure message report the same value.
    actual_record_count = ingested_df.count()
    # The message names the ingested data, since that is what this check inspects.
    assert actual_record_count == expected_record_count, f'Expected {expected_record_count} records in ingested data, but found {actual_record_count}'

# Test sequence for the English-language-proficiency data (progressTowardsEngLangProfibySubgr).
# The steps are order-dependent: each expected count builds on the previous steps.
reset_snapshot_data_tests()
# test1 - Land the first batch (2021) of progressTowardsEngLangProfibySubgr data
land_progressTowardsEnglishLanguageProficiency_2021(12)
# test2 - Ingest the data from stage1 into stage2
ingest_progressTowardsEnglishLanguageProficiency(12)
# test3 - run the same ingestion a second time and verify that it doesn't change what was ingested (ingestion is idempotent via use of _checkpoints)
ingest_progressTowardsEnglishLanguageProficiency(12)
# test4 - Land the second batch (2022) of progressTowardsEngLangProfibySubgr data
land_progressTowardsEnglishLanguageProficiency_2022(12)
# test5 - Ingest the data from stage1 into stage2; the total is expected to stay at 12
ingest_progressTowardsEnglishLanguageProficiency(12)

In [None]:
def refine_contoso_sis(df_source):
    """Pseudonymize df_source and upsert the results into the Refined area of stage2.

    Fetches the module metadata from its URL, pseudonymizes df_source with the
    'readinessIndicatorsbySubgroup' metadata entry, then upserts the first returned
    dataframe under .../general/... and the second (the lookup) under .../sensitive/....

    Args:
        df_source: Dataframe handed to oea.pseudonymize.
    """
    metadata_url = 'https://raw.githubusercontent.com/aanseacorelearn/OpenEduAnalyticsCCRPIdata/gene/v0.7dev/modules/module_catalog/Student_and_School_Data_Systems/metadata.csv'
    entity_metadata = oea.get_metadata_from_url(metadata_url)['readinessIndicatorsbySubgroup']
    pseudonymized_df, lookup_df = oea.pseudonymize(df_source, entity_metadata)
    oea.upsert(pseudonymized_df, 'stage2/Refined/contoso_sis/v0.1/general/readinessIndicatorsbySubgroup')
    oea.upsert(lookup_df, 'stage2/Refined/contoso_sis/v0.1/sensitive/readinessIndicatorsbySubgroup')

# Refine the ingested data. The source path must match, character for character, the
# stage2/Ingested path written by the ingestion tests in this notebook
# ('readinessIndictorsBySubgroup'); otherwise there is nothing to process.
oea.process('stage2/Ingested/contoso_sis/v0.1/readinessIndictorsBySubgroup', refine_contoso_sis)

# query a sample of the data refined into stage2/Refined
oea.display('stage2/Refined/contoso_sis/v0.1/general/readinessIndicatorsbySubgroup')
oea.display('stage2/Refined/contoso_sis/v0.1/sensitive/readinessIndicatorsbySubgroup')