# Clever_py

__NOTE:__ This notebook currently processes only Clever student data; Clever teacher and staff data are not processed.

This notebook can be modified as needed to also process data for other Clever user types (teachers and/or staff).

In [None]:
import datetime

class Clever(BaseOEAModule):
    """OEA module that moves Clever usage data from stage1 into stage2.

    Two Clever tables are supported: ``daily-participation`` and
    ``resource-usage``. For each one, the raw CSV files are read with
    structured streaming, passed through ``oea.pseudonymize`` and appended to
    delta tables under the stage2 paths.

    Each entry of ``self.schemas`` is a list of three-element rows
    ``[column_name, column_type, operation]``; the rows are handed unchanged to
    ``oea.to_spark_schema`` and ``oea.pseudonymize``. The third element is
    either ``'hash'`` or ``'no-op'`` (presumably the pseudonymization operation
    applied to that column -- confirm against the OEA framework).
    """

    # Stage1 folder names this module knows how to process. The matching key
    # in ``self.schemas`` is the same name with '-' replaced by '_'.
    _SUPPORTED_TABLES = ('daily-participation', 'resource-usage')

    def __init__(self, source_folder='clever'):
        """Initialise the base module and register the Clever table schemas.

        Args:
            source_folder: Folder name forwarded to ``BaseOEAModule.__init__``.
        """
        BaseOEAModule.__init__(self, source_folder)

        self.schemas['daily_participation'] = [['date', 'date', 'no-op'],
                                            ['sis_id', 'string', 'hash'],
                                            ['clever_user_id', 'string', 'hash'],
                                            ['clever_school_id', 'string', 'no-op'],
                                            ['school_name', 'string', 'no-op'],
                                            ['active', 'boolean', 'no-op'],
                                            ['num_logins', 'integer', 'no-op'],
                                            ['num_resources_accessed', 'integer', 'no-op']]
        self.schemas['resource_usage'] = [['date', 'date', 'no-op'],
                                            ['sis_id', 'string', 'hash'],
                                            ['clever_user_id', 'string', 'hash'],
                                            ['clever_school_id', 'string', 'no-op'],
                                            ['school_name', 'string', 'no-op'],
                                            ['resource_type', 'string', 'no-op'],
                                            ['resource_name', 'string', 'no-op'],
                                            ['resource_id', 'string', 'no-op'],
                                            ['num_access', 'integer', 'no-op']]

    def ingest(self):
        """Processes clever data from stage1 into stage2 using structured streaming within the defined functions below.

        Every item listed directly under ``self.stage1np`` whose name is one of
        the supported Clever tables is processed; any other item is skipped and
        its name is logged.
        """
        logger.info("Processing clever data from: " + self.stage1np)

        for item in mssparkutils.fs.ls(self.stage1np):
            if item.name in self._SUPPORTED_TABLES:
                self._process_clever_stage1_data(table_name=item.name, folder='/*', partition_column='date')
            else:
                # Name the skipped item so unexpected folders can be traced from the log.
                logger.info(f"No defined function for processing this clever data: {item.name}")

        logger.info("Finished processing clever data from stage 1 to stage 2")

    def _process_clever_stage1_data(self, table_name=None, folder=None, partition_column=None):
        """Processes any clever data table from stage1 to stage2 using structured streaming.

        Args:
            table_name: Stage1 folder name of the table (e.g. ``'daily-participation'``).
                Required.
            folder: Path fragment appended to the table folder before ``'/*.csv'``
                (e.g. ``'/*'`` for one level of sub-folders). ``None`` means the
                CSV files sit directly in the table folder.
            partition_column: Name of the column from which the ``year`` and
                ``month`` partition columns of the pseudonymized table are
                derived. Required whenever there is pseudonymized data to write.

        Raises:
            ValueError: If ``table_name`` is missing, or if ``partition_column``
                is missing when the pseudonymized table has to be written.
        """
        if table_name is None:
            raise ValueError('table_name is required to process clever data')
        if folder is None:
            # No sub-folder level: read the CSV files directly under the table folder.
            folder = ''

        # change new table name to match OEA conventions
        new_table_name = table_name.replace('-', '_')

        source_path = f'{self.stage1np}/{table_name}'
        p_destination_path = f'{self.stage2p}/{new_table_name}_pseudo'
        np_destination_path = f'{self.stage2np}/{new_table_name}_lookup'

        logger.info(f'Processing incremental data from: {source_path} and writing out to: {p_destination_path}')

        table_schema = self.schemas[new_table_name]
        clever_spark_schema = oea.to_spark_schema(table_schema)
        # read in the raw data using structured streaming
        df = spark.readStream.csv(source_path + folder + '/*.csv', header='true', schema=clever_spark_schema)

        df_pseudo, df_lookup = oea.pseudonymize(df, table_schema)

        # write out the resulting, final tables
        if not df_pseudo.columns:
            logger.info('No data to be written to stage2p')
        else:
            if partition_column is None:
                raise ValueError('partition_column is required to write the pseudonymized clever table')
            # Derive the partition columns from the given date column.
            df_pseudo = (df_pseudo
                         .withColumn('year', F.year(F.col(partition_column)))
                         .withColumn('month', F.month(F.col(partition_column))))
            self._append_clever_stream(df_pseudo, p_destination_path,
                                       source_path + '/_checkpoints_p',
                                       partition_columns=('year', 'month'))

        if not df_lookup.columns:
            logger.info('No data to be written to stage2np')
        else:
            self._append_clever_stream(df_lookup, np_destination_path,
                                       source_path + '/_checkpoints_np')

    def _append_clever_stream(self, df, destination_path, checkpoint_path, partition_columns=()):
        """Appends a streaming DataFrame to a delta table and waits for the query to finish.

        Args:
            df: Streaming DataFrame to write.
            destination_path: Path of the delta table to append to.
            checkpoint_path: Value for the ``checkpointLocation`` option of the stream.
            partition_columns: Column names to partition the output by; empty
                for an unpartitioned write.
        """
        writer = (df.writeStream
                  .format("delta")
                  .outputMode("append")
                  .trigger(once=True)
                  .option("checkpointLocation", checkpoint_path))
        if partition_columns:
            writer = writer.partitionBy(*partition_columns)
        query = writer.start(destination_path)
        # block until query is terminated, with stop() or with error;
        # A StreamingQueryException will be thrown if an exception occurs.
        query.awaitTermination()