# Data Reporting Project
### By Spencer Warezak -- August 30, 2023

This project analyzes 4 separate datasets (mdata-2023-03-10, mdata-2023-03-11, mdata-2023-03-12, mdata-2023-03-13). The objective is to monitor each dataset for data quality and present the results in a color-coded email. The daily report for each dataset should cover:
* Duplicate data
* Missing data
* Bad data
* Anomalies within the data
* Ensuring that all 6 customers have data

The data has 4 fields:
* Time (epoch format)
* Consumption (in kWh)
* ChannelId
* Whether the time is in UTC format
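
As a rough sketch of how the checks listed above map onto these four fields, the snippet below runs the duplicate, bad-value, and customer-count checks on a handful of made-up rows. It uses plain Python so it runs without the CSV files; the sample values, the channel names `A` and `B`, and the helper `is_number` are invented for illustration and are not part of the project code.

```python
from collections import Counter
from datetime import datetime, timezone

# Hypothetical sample rows in the four-field layout (epoch, data, channelid, tz).
rows = [
    (1678406400, "1.2", "A", "UTC"),
    (1678407300, "1.4", "A", "UTC"),
    (1678407300, "1.4", "A", "UTC"),   # exact repeat of the previous row
    (1678408200, "oops", "A", "UTC"),  # consumption is not a number
    (1678406400, "0.9", "B", "UTC"),
]

def is_number(text):
    """Return True when text parses as a float."""
    try:
        float(text)
        return True
    except ValueError:
        return False

# Epoch values become readable timestamps (assuming the epochs are in seconds).
first_timestamp = datetime.fromtimestamp(rows[0][0], tz=timezone.utc)

# Duplicate data: rows whose four fields all repeat (every copy is kept here).
counts = Counter(rows)
duplicate_rows = [row for row in rows if counts[row] > 1]

# Bad data: consumption values that cannot be parsed as numbers.
bad_rows = [row for row in rows if not is_number(row[1])]

# Customer coverage: distinct channel ids seen in the sample.
channel_ids = {row[2] for row in rows}

print(first_timestamp.isoformat())
print(len(duplicate_rows), len(bad_rows), len(channel_ids))
```

The `Report` class below performs the same kinds of checks with pandas, per channel id, and additionally looks for missing intervals and outliers.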
 

In [947]:
pip install yagmail

Note: you may need to restart the kernel to use updated packages.


In [956]:
## import pandas library for data consumption and validation
import pandas as pd
import numpy as np
import os
import yagmail


## define our input files globally
input_strings = [
    os.path.join(os.getcwd(), 'mdata-2023-03-10.csv'),
    os.path.join(os.getcwd(), 'mdata-2023-03-11.csv'),
    os.path.join(os.getcwd(), 'mdata-2023-03-12.csv'),
    os.path.join(os.getcwd(), 'mdata-2023-03-13.csv')
]

## define a new object class to handle all of the reporting data
## Our report class will be populated to handle all of our value tracking
## and will be used for the purpose of report generation

## The following fields will be used in our report
## INITIALIZED VALUE(S)
## * df: This is our dataframe being used for the report

## INTERNAL VALUE(S)
## * duplicates_count: Maps each channel id to the number of duplicated rows seen for it
## * duplicates_list: Maps each channel id to the list of its distinct duplicated rows
## * missing: Maps each channel id to the list of missing rows based on epoch intervals
## * bad: Maps each channel id to the list of its bad rows (NaN values, non-numeric consumption, poor formatting, etc.)
## * outliers: Maps each channel id to the rows with outlier consumption values (>= 3 std deviations from the mean)
## * channel_ids: The list of unique channel ids we see (this will be used to validate that we have all 6 customers)

## TRACKING VALUE(S)
## * interval: This is the interval calculated between epochs for each customer
## * mean: This is the mean consumption value for each customer in the dataset
## * stddev: This is the stddev of consumption values for each customer in the dataset
class Report():
    ## GLOBAL VARIABLES
    ## Colors and font styling
    PURPLE = '<span style="color: purple;">'
    GREEN = '<span style="color: green;">'
    ORANGE = '<span style="color: orange;">'
    RED = '<span style="color: red;">'
    RESET = '</span>'
    BOLD = '<span style="font-weight: bold;">'
    ITALIC = '<span style="font-style: italic;">'
    
    ## INIT
    ## The initialization of the Report should handle all of the data parsing
    ## It will call all internal methods
    ## There should be no data processing required after initialization
    ## It should populate the duplicates, missing, bad, outliers, and channel ids
    ## 
    ## Initially, the __init__ function will set the interval, mean, and stddev for each channel id
    def __init__(self, df, title) -> None:
        ## Init val(s)
        self.__title = title
        self.__df = df
        
        ## Internal val(s)
        self.__duplicates_count = {}
        self.__duplicates_list = {}
        self.__missing = {}
        self.__bad = {}
        self.__outliers = {}

        ## Tracking val(s)
        self.__interval = {}
        self.__mean = {}
        self.__stddev = {}
        
        ## Set channel ids to be non na and non null values in channel id
        ## Remove all null channelid rows
        self.__channel_ids = [id for id in self.__df.channelid.unique() if not pd.isna(id)]
        self.__df = self.__df[self.__df['channelid'].isin(self.__channel_ids)]
        
        ## Set the duplicates
        ## True/False values for duplicates ==> all column values for a row occur more than once
        duplicate_mask = self.__df.duplicated(subset=['epoch', 'data', 'channelid', 'tz'], keep=False)
        for index, row in self.__df[duplicate_mask].iterrows():
            cid = row.channelid
            if cid not in self.__duplicates_count:
                self.__duplicates_count[cid] = 0
            self.__duplicates_count[cid] += 1
            
            if cid not in self.__duplicates_list:
                self.__duplicates_list[cid] = []
            if row.to_dict() not in self.__duplicates_list[cid]:
                self.__duplicates_list[cid].append(row.to_dict())
                            
        ## Drop all duplicated values from the dataset
        self.__df = self.__df.drop_duplicates(subset=['epoch', 'data', 'channelid', 'tz'])
                
        ## Set the bad values and fix/remove
        ## Group by channelid and, for each group:
        ##     Generate frequency map for interval
        ##     Find all na and non-numeric values and mark their rows for removal
        ##
        rows_to_drop = []
        grouped_df = self.__df.groupby('channelid')
        for cid, subset in grouped_df:
            ## Frequency map for epoch differences
            freq_map = {}
            for i in range(len(subset) - 1):
                epoch_diff = abs(subset.iloc[i+1].epoch - subset.iloc[i].epoch)
                if epoch_diff not in freq_map:
                    freq_map[epoch_diff] = 0
                freq_map[epoch_diff] += 1
            
            ## Set the interval to the most frequently occurring difference
            ## (a channel with a single row has no differences to compare)
            if freq_map:
                self.__interval[cid] = max(freq_map, key=freq_map.get)
            
            ## Bad values
            ## Sum the count of all na values and non_numeric values
            ## For each group, denote the number of bad values
            ## Interpolate these values
            self.__bad[cid] = []
            bad_mask = subset.isna().any(axis=1)
            if bad_mask.any():
                for index, row in subset[bad_mask].iterrows():
                    if row.to_dict() not in self.__bad[cid]:
                        self.__bad[cid].append(row.to_dict())
                    rows_to_drop.append(index)
                        
            non_numeric_mask = pd.to_numeric(subset.data, errors='coerce').isna()
            if non_numeric_mask.any():
                for index, row in subset[non_numeric_mask].iterrows():
                    if row.to_dict() not in self.__bad[cid]:
                        self.__bad[cid].append(row.to_dict())
                    rows_to_drop.append(index)
            
        ## Drop the rows we no longer need
        ## Reset the index of the dataframe to accurately
        ## represent its current state
        self.__df.drop(index=rows_to_drop, inplace=True)
        self.__df.reset_index(drop=True, inplace=True)
        
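        ## Now that we have dropped all bad values and removed all duplicate values
        ## we can set column data type (the converted column must be assigned back,
        ## otherwise the conversion is discarded)
        self.__df['data'] = pd.to_numeric(self.__df['data'])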
        
        ## Iterate through all the rows of each group and find the missing values
        ## A missing value is defined as not being recorded within the interval stated
        ## Rows will be added in the form { epoch: curr + interval, data: np.nan, channelid: cid, tz: 'UTC' }
        ## The na values will then be interpolated
        filled_rows = []
        for i in range(len(self.__df)):
            curr_row = self.__df.iloc[i]
            filled_rows.append(curr_row.to_dict())
            if i + 1 == len(self.__df):
                break

            next_row = self.__df.iloc[i+1]
            if next_row.channelid != curr_row.channelid:
                continue

            cid = curr_row.channelid
            interval = self.__interval[cid]
            epoch_diff = abs(next_row.epoch - curr_row.epoch)
            if 0 < interval < epoch_diff:
                ## Check to see if the channel id is in the missing map
                if cid not in self.__missing:
                    self.__missing[cid] = []

                ## Step from the current epoch towards the next one,
                ## whichever direction the file is sorted in
                step = interval if next_row.epoch > curr_row.epoch else -interval
                epoch = curr_row.epoch + step
                remaining = epoch_diff - interval
                while remaining > 0:
                    ## Append the data between the two existing rows
                    ## Append the data to the missing data map
                    data = { 'epoch': epoch, 'data': np.nan, 'channelid': cid, 'tz': 'UTC' }
                    filled_rows.append(data)
                    self.__missing[cid].append(data)

                    ## Move to the next interval
                    epoch += step
                    remaining -= interval

        ## Rebuild the dataframe with the added rows in their correct locations,
        ## without overwriting any existing data
        updated_df = pd.DataFrame(filled_rows, columns=self.__df.columns)

        ## Set the dataframe to our updated dataframe
        ## Interpolate missing values
        self.__df = updated_df
        self.__df.data = pd.to_numeric(self.__df.data, errors='coerce')
        self.__df.data = self.__df.data.interpolate()
        
        grouped_df = self.__df.groupby('channelid')
        for cid, data in grouped_df:
            self.__mean[cid] = data.data.mean()
            self.__stddev[cid] = data.data.std()
            
        ## Set the outliers based on value being >= 3 std deviations from the mean
        for i in range(len(self.__df)):
            curr = self.__df.iloc[i]
            if curr.channelid not in self.__outliers:
                self.__outliers[curr.channelid] = []
            
            ## Check if diff between mean and data val is >= 3 std devs
            mean_diff = abs(curr.data - self.__mean[curr.channelid])
            if mean_diff // self.__stddev[curr.channelid] >= 3:
                self.__outliers[curr.channelid].append(curr.to_dict())
                
    ## This method will send the email report to the intended user with the report string
    ## It will take the below parameters
    ##     1) from email
    ##     2) from password
    ##     3) to email
    def email_report(self, from_email, from_pwd, to_email) -> str:
        if not from_email or not to_email:
            return 'Please define a sender and receiver!'

        try:
            yag = yagmail.SMTP(from_email, from_pwd)
            subject = f"Data Quality Report for {self.__title}"
            message = self.generate_report()

            ## Write the cleaned data set to disk so that it can be attached
            report_path = os.path.join(os.getcwd(), 'report.csv')
            self.__df.to_csv(report_path, index=False)
            
            yag.send(to=to_email, subject=subject, contents=message, attachments=[report_path])
            return f"Email successfully sent to {to_email}!"
        except Exception as e:
            ## Return the failure so that the caller always receives a string
            return f"Failed to send email to {to_email}: {e}"
                                            
    ## This method will generate a report that we can send via email
    ## It will denote the below documented functionality:
    ##     1) Duplicate data
    ##     2) Missing data
    ##     3) Bad data
    ##     4) Anomalies within the data
    ##     5) Ensuring that all 6 customers have data
    ## 
    ## This string should be colored with the given color scheme
    ##     * Green = good
    ##     * Orange = passable
    ##     * Red = poor
    def generate_report(self) -> str: 
        ## Customer total data values
        ## This is important to have, as we will
        ## be assigning data quality statuses based
        ## on the frequency of a certain category for
        ## each customer relative to the total data
        ## shown for them
        customer_totals = {}
        grouped_df = self.__df.groupby('channelid')
        for cid, df in grouped_df:
            customer_totals[cid] = len(df)
                
        ## Data availability
        data_availability = self.GREEN if len(self.__channel_ids) == 6 else self.RED
        
        ## Duplicate data
        ## Calculate duplicates as a percentage of total observations for each customer
        duplicates_table = f"""\n"""
        data_duplicates = f"""[ """
        for cid in self.__channel_ids:
            count_val = 0
            if cid in self.__duplicates_count:
                count_val = self.__duplicates_count[cid]
                duplicates_table += f"""\n\t----- {cid} -----\n{self.PURPLE}{pd.DataFrame(self.__duplicates_list[cid])}{self.RESET}\n\n"""
                
                
            data_duplicates += f"{cid}: {self.get_status(count_val, customer_totals[cid])}{count_val}/{customer_totals[cid]}{self.RESET}  "
            
        data_duplicates += "]\n"
        
        if duplicates_table == '\n':
            duplicates_table = f"""{self.PURPLE}No duplicates found!{self.RESET}"""
        
        
        ## Bad data
        data_bad = f"""[ """
        bad_table = f"""\n"""
        for cid in self.__channel_ids:
            count_val = 0
            if cid in self.__bad:
                count_val = len(self.__bad[cid])
                if len(self.__bad[cid]) > 0:
                    bad_table += f"""\n\n----- {cid} -----\n{self.PURPLE}{pd.DataFrame(self.__bad[cid])}{self.RESET}\n\n"""
        
            data_bad += f"{cid}: {self.get_status(count_val, customer_totals[cid])}{count_val}/{customer_totals[cid]}{self.RESET}  "
            
        data_bad += "]\n"
        
        if bad_table == '\n':
            bad_table = f"""{self.PURPLE}No bad data found!{self.RESET}"""
        
        ## Missing data
        data_missing = f"""[ """
        missing_table = f"""\n"""
        for cid in self.__channel_ids:
            count_val = 0
            if cid in self.__missing:
                count_val = len(self.__missing[cid])
                if len(self.__missing[cid]) > 0:
                    missing_table += f"""\n\n----- {cid} -----\n{self.PURPLE}{pd.DataFrame(self.__missing[cid])}{self.RESET}\n\n"""
        
            data_missing += f"{cid}: {self.get_status(count_val, customer_totals[cid])}{count_val}/{customer_totals[cid]}{self.RESET}  "
            
        data_missing += "]\n"
        
        if missing_table == '\n':
            missing_table = f"""{self.PURPLE}No missing data found!{self.RESET}"""
            
            
        ## Anomalies
        data_anomalies = f"""[ """
        anomalies_table = f"""\n"""
        for cid in self.__outliers:
            count_val = 0
            if cid in self.__outliers:
                count_val = len(self.__outliers[cid])
                if len(self.__outliers[cid]) > 0:
                    anomalies_table += f"""\n\n----- {cid} -----\n{self.PURPLE}{pd.DataFrame(self.__outliers[cid])}{self.RESET}\n\n"""
                    
            data_anomalies += f"{cid}: {self.get_status(count_val, customer_totals[cid])}{count_val}/{customer_totals[cid]}{self.RESET}  "
            
        data_anomalies += "]\n"
        
        if anomalies_table == '\n':
            anomalies_table = f"""{self.PURPLE}No anomalies found in the data set!{self.RESET}"""
        
        report_template = f"""
            {self.BOLD}{self.__title} DATA QUALITY REPORT{self.RESET}
            --------------------------------------------------------------------------------------------------------------------------------------------------------
            {self.BOLD}LEGEND:{self.RESET} 
                - Quality: [ {self.GREEN}GOOD{self.RESET} -- {self.ORANGE}PASSABLE{self.RESET} -- {self.RED}POOR{self.RESET} -- {self.PURPLE}IMPORTANT{self.RESET} ]
            
            {self.BOLD}Report Overview:{self.RESET}
                - This report summarizes the quality of the data received, highlighting key
                  aspects of data quality, integrity, and cleanliness. In reading the
                  input dataset, we checked for duplicate data, missing data, bad (poorly
                  formatted) data, and outliers (consumption >= 3 standard deviations from
                  the mean), and verified that all 6 customers have data available.
                  
            {self.BOLD}General Statistics:{self.RESET}
                - Epoch Intervals: {self.PURPLE}{self.__interval}{self.RESET}
                - Mean Values: {self.PURPLE}{self.__mean}{self.RESET}
                - Standard Deviation Values: {self.PURPLE}{self.__stddev}{self.RESET}
                
            {self.BOLD}Data Availability:{self.RESET}
                - Status: {data_availability}{len(self.__channel_ids)}/6{self.RESET}
                {self.generate_data_availability_explanation()}
                - Channel IDs Present: {self.PURPLE}{self.__channel_ids}{self.RESET}
                
            {self.BOLD}Duplicate Data:{self.RESET}
                - Status: {data_duplicates}
                {self.generate_duplicate_data_explanation()}
                - Duplicates Tables: {duplicates_table}
                
            {self.BOLD}Bad Data:{self.RESET}
                - Status: {data_bad}
                {self.generate_bad_data_explanation()}
                - Bad Tables: {bad_table}
            
            {self.BOLD}Missing Data:{self.RESET}
                - Status: {data_missing}
                {self.generate_missing_data_explanation()}
                - Missing Tables: {missing_table}
                
            {self.BOLD}Outliers:{self.RESET}
                - Status: {data_anomalies}
                {self.generate_anomalies_data_explanation()}
                - Outliers Tables: {anomalies_table}
                
            Data Frame Summary After Cleaning: \n{self.PURPLE}{self.__df.data.describe()}{self.RESET}
            
            The cleaned data set is attached with the title '{self.ITALIC}report.csv{self.RESET}'.
            
        """
        
        return report_template
    
    def generate_data_availability_explanation(self) -> str:
        if len(self.__channel_ids) == 6:
            return f"""- Explanation: Data must be present for all 6 customers. If any
                customers are without data, we cannot provide our suite of services to
                them. Given there are {self.GREEN}6{self.RESET} channel IDs present, we are able to service each of
                our customers. There are no data availability issues."""
        else:
            return f"""- Explanation: Data must be present for all 6 customers. If any
                    customers are without data, we cannot provide our suite of services to
                    them. With {self.RED}{len(self.__channel_ids)}{self.RESET} channel ID(s) present, we are unable to service each of our
                    customers. This is a data availability issue."""
        
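    ## Map a count (duplicates, bad, missing, or outlier rows) to a status color
    ## based on its share of the customer's total observations
    ##     * <= 2.5% is good, <= 5% is passable, anything above is poor
    def get_status(self, count, total) -> str:
        if count / total <= 0.025:
            return self.GREEN
        elif count / total <= 0.05:
            return self.ORANGE
        else:
            return self.RED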
        
    def generate_duplicate_data_explanation(self) -> str:
        return f"""- Explanation: Duplicates found in the input dataset can alter any sort of numeric
                accuracy we are trying to maintain. We also want to limit the number of data points we
                are removing from our dataset. Thus, we have set the boundary for good data as having
                duplicates less than or equal to 2.5% of all available data points, and passable data
                as having less than or equal to 5.0% of all available data points. Anything with more
                than 5% of all available data points being duplicates is flagged as poor data quality.
                Thus, we arrive at the labels measured above for each customer.
        """
    
    def generate_bad_data_explanation(self) -> str:
        return f"""- Explanation: Bad data found in the input dataset can alter any sort of numeric
                accuracy we are trying to maintain. We also want to limit the number of data points we
                are removing and then interpolating in our dataset. Thus, we have set the boundary for 
                good data as having bad inputs less than or equal to 2.5% of all available data points, 
                and passable data as having less than or equal to 5.0% of all available data points. 
                Anything with more than 5% of all available data points being bad is flagged as 
                poor data quality. Thus, we arrive at the labels measured above for each customer.
        """
    
    def generate_missing_data_explanation(self) -> str:
        return f"""- Explanation: Missing data found in the input dataset can alter any sort of numeric
                accuracy we are trying to maintain. We also want to limit the number of data points we
                are adding and then interpolating values for in our dataset. Thus, we have set the boundary for 
                good data as having missing inputs less than or equal to 2.5% of all available data points, 
                and passable data as having less than or equal to 5.0% of all available data points. 
                Anything with more than 5% of all available data points being missing is flagged as 
                poor data quality. Thus, we arrive at the labels measured above for each customer.
        """ 
    
    def generate_anomalies_data_explanation(self) -> str:
        return f"""- Explanation: Anomalies found in the input dataset can alter any sort of numeric
                accuracy we are trying to maintain. Thus, we have set the boundary for good data as 
                having anomalies less than or equal to 2.5% of all available data points, and passable 
                data as having less than or equal to 5.0% of all available data points. Anything with 
                more than 5% of all available data points being anomalous is flagged as poor data quality. 
                Thus, we arrive at the labels measured above for each customer.
        """ 

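For readers who want the two per-channel statistics in isolation, here is a dependency-free sketch of the same ideas used in `Report.__init__`: the reporting interval is taken as the most common gap between consecutive epochs, and a reading counts as an outlier when it lies at least 3 sample standard deviations from the mean. The function names `infer_interval` and `find_outliers` and the sample numbers are made up for this illustration; they are not part of the class.

```python
from collections import Counter
from statistics import mean, stdev

def infer_interval(epochs):
    """Most common absolute gap between consecutive epochs (None if < 2 points)."""
    gaps = [abs(b - a) for a, b in zip(epochs, epochs[1:])]
    if not gaps:
        return None
    return Counter(gaps).most_common(1)[0][0]

def find_outliers(values, threshold=3.0):
    """Values at least `threshold` sample standard deviations from the mean."""
    if len(values) < 2:
        return []
    mu, sigma = mean(values), stdev(values)
    if sigma == 0:
        # All values are identical, so nothing can be an outlier.
        return []
    return [v for v in values if abs(v - mu) / sigma >= threshold]

epochs = [0, 900, 1800, 3600, 4500]   # one 900-second reading is missing
readings = [1.0] * 19 + [50.0]        # a single spike among flat readings

print(infer_interval(epochs))
print(find_outliers(readings))
```

One thing this sketch makes visible: with very few readings, a single spike cannot reach 3 sample standard deviations, so the rule only becomes meaningful once a channel has a reasonable number of data points.
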
In [958]:
from_email = ''
from_pwd = ''
to_email = ''

## Use a name that does not shadow the built-in input() function
for input_path in input_strings:
    df = pd.read_csv(input_path)
    rep = Report(df, input_path)
    rc = rep.email_report(from_email, from_pwd, to_email)
    print(rc)

Please define a sender and receiver!
Please define a sender and receiver!
Please define a sender and receiver!
Please define a sender and receiver!
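
The four identical messages above come from the empty `from_email` and `to_email` strings. One way to supply real values without writing them into the notebook is to read them from environment variables. This is only a sketch: the variable names `REPORT_FROM_EMAIL`, `REPORT_FROM_PWD`, and `REPORT_TO_EMAIL` and the helper `load_credentials` are assumptions made for this example, not something the project defines.

```python
import os

def load_credentials(env=None):
    """Read sender, password, and receiver from environment variables.

    Missing variables fall back to empty strings. An empty sender or
    receiver makes email_report return its "Please define a sender and
    receiver!" message instead of attempting to send.
    """
    env = os.environ if env is None else env
    return (
        env.get("REPORT_FROM_EMAIL", ""),  # hypothetical variable name
        env.get("REPORT_FROM_PWD", ""),    # hypothetical variable name
        env.get("REPORT_TO_EMAIL", ""),    # hypothetical variable name
    )

from_email, from_pwd, to_email = load_credentials()
if not from_email or not to_email:
    print("Set REPORT_FROM_EMAIL and REPORT_TO_EMAIL before sending reports.")
```

The returned tuple can be passed straight to `rep.email_report(from_email, from_pwd, to_email)` in the loop above.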
