# Homework Assignment 5: Data Prep
In this homework assignment, you will continue your exploration of the [SWAN-SF Dataset](https://doi.org/10.7910/DVN/EBCFKM), described in the paper found [here](https://doi.org/10.1038/s41597-020-0548-x).


In this assignment, you will examine feature cardinalities, count missing values, detect outliers, handle missing values and outliers, and create a data quality report for both the original and the cleaned dataset. You will also be asked to document your functions.

## Step 1: Downloading the Data

This assignment will only be using [Partition 1](https://dataverse.harvard.edu/api/access/datafile/:persistentId?persistentId=doi:10.7910/DVN/EBCFKM/BMXYCB). Recall that in Homework 4 we started to construct the analytics base table for our [SWAN-SF Dataset](https://doi.org/10.7910/DVN/EBCFKM). In that assignment, we read the data from one of the two subdirectories, __FL__ and __NF__, of the __partition1__ directory. These two subdirectories represented the two classes of our target feature in a solar flare prediction problem, and you were asked to extract the flare class of a sample. For this assignment, I have processed these samples of multivariate time series to construct descriptive features for each sample, and then placed them into an analytics base table.

In this assignment, you will be using a set of extracted descriptive features. This dataset contains many extracted features (>800) for each multivariate time series instance, so we need to explore the data to find data quality issues and identify ways to address them. Below are links to the full extracted feature dataset for partition 1 and a toy dataset to use for testing your functions. __Note:__ Since the full dataset, and multiple copies of partially processed intermediate results, tend to take up a bit of space, you can use the toy dataset to implement and test your code. You may need to edit the data to fully test each of the requirements, but that is left as an exercise for the student. The full partition dataset is only included for those who wish to work with it once they have their code implemented.

- [Full Partition 1 feature dataset](http://dmlab.cs.gsu.edu/solar/data/partition1ExtractedFeatures.csv)
- [Toy Partition 1 feature dataset](http://dmlab.cs.gsu.edu/solar/data/toy_partition1ExtractedFeatures.csv)

Now that you have the extracted features csv files, you will load that data into a Spark DataFrame.  

In [None]:
import os
import pandas as pd
from pyspark.sql import DataFrame
import pyspark.sql.functions as sf

In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://127.0.0.1:7077")
    # the number of executors this job needs
    .config("spark.executor.instances", 2)
    # the number of CPU cores and the amount of memory each executor needs;
    # these are reserved on the worker
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4G")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
data_dir = '/data/MVTS'
#data_dir = './'
data_file = 'toy_partition1ExtractedFeatures.csv'
#data_file = 'partition1ExtractedFeatures.csv'

In [None]:
abt = (spark.read.format("csv").option("inferSchema", "true")
       .option("sep", ",").option("lineSep", "\n")
       .option("header", "true")
       .load(os.path.join(data_dir, data_file)))

### Q1 (20 points)

Write a function to extract the various pieces of a data quality report, for a specific attribute, and return a pandas DataFrame with this information.

 * 'Feature Name': Contains the time series statistical feature name
 
 * 'Cardinality': Contains the count of unique values for the feature
            
 * 'Non-null Count': Contains the number of non-null entries for the feature
            
 * 'Null Count': Contains the number of null or missing entries for the feature
            
 * 'Min': Contains the minimum value of the feature
 
 * '25th': Contains the first quartile (25%) of the feature values
 
 * 'Mean': Contains the mean of the feature values
 
 * '50th': Contains the median of the feature values
            
 * '75th': Contains the third quartile (75%) of the feature values
 
 * 'Max': Contains the maximum value of the feature,
            
 * 'Std. Dev': Contains the standard deviation of the feature
 
In addition to the values above, you should identify the number of lower and upper outliers using the $val < Q1 - 1.5 \cdot IQR$ and $val > Q3 + 1.5 \cdot IQR$ outlier identification method ($IQR$ is the interquartile range, i.e., the distance between $Q1$ and $Q3$). These added features should be called `Outlier Count Low` and `Outlier Count High`, respectively.
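As a quick sanity check of the rule above, here is a minimal sketch in plain Python (not Spark) on a made-up list of values. The helper name `iqr_outlier_counts` and the sample data are illustrative assumptions, and the quartiles come from `statistics.quantiles` with the inclusive method, which may differ slightly from the quartiles Spark computes.

```python
import statistics

def iqr_outlier_counts(values):
    """Count values below Q1 - 1.5*IQR and above Q3 + 1.5*IQR."""
    # Quartiles via linear interpolation over the sorted data (inclusive method).
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    low_fence = q1 - 1.5 * iqr
    high_fence = q3 + 1.5 * iqr
    low = sum(1 for v in values if v < low_fence)
    high = sum(1 for v in values if v > high_fence)
    return low, high

# Hypothetical sample: eight ordinary values and one extreme value.
sample = [1, 2, 3, 4, 5, 6, 7, 8, 100]
outlier_counts = iqr_outlier_counts(sample)
print(outlier_counts)  # (0, 1): Q1=3, Q3=7, IQR=4, fences at -3 and 13
```

In your Spark function the same two fences are computed once from the quartiles, and each count is a filter followed by a `count()`.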


 
 Some useful functions for this can be found at:
 
 * [pyspark.sql.functions.percentile](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.percentile.html#pyspark.sql.functions.percentile)
 
 * [pyspark.sql.functions.isnan](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.isnan.html#pyspark.sql.functions.isnan)
 
 * [pyspark.sql.functions.mean](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.mean.html)
 
 * [pyspark.sql.functions.std](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.std.html#pyspark.sql.functions.std)

 * For more functions go to [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

In [None]:
#Place your code here
import pyspark.sql.functions as sf
from pyspark.sql.functions import col, lit

def data_quality_report(df, attribute):
    # Exact quartiles (relativeError=0) used for the IQR outlier fences.
    quantiles = df.approxQuantile(attribute, [0.25, 0.5, 0.75], 0)
    Q1, Q3 = quantiles[0], quantiles[2]
    IQR = Q3 - Q1

    outliers_low = df.filter(col(attribute) < Q1 - 1.5 * IQR).count()
    outliers_high = df.filter(col(attribute) > Q3 + 1.5 * IQR).count()
    # distinct().count() returns a Python int, so it is computed up front
    # and attached below as a literal column.
    cardinality = df.select(attribute).distinct().count()

    # Use the Spark aggregates (sf.*), not Python's built-in sum/min/max.
    report = df.select(
        sf.count(col(attribute)).alias('Non-null Count'),
        sf.sum(col(attribute).isNull().cast('int')).alias('Null Count'),
        sf.min(col(attribute)).alias('Min'),
        sf.percentile_approx(col(attribute), 0.25).alias('25th'),
        sf.mean(col(attribute)).alias('Mean'),
        sf.percentile_approx(col(attribute), 0.5).alias('50th'),
        sf.percentile_approx(col(attribute), 0.75).alias('75th'),
        sf.max(col(attribute)).alias('Max'),
        sf.stddev(col(attribute)).alias('Std. Dev')
    ).withColumn('Feature Name', lit(attribute)) \
     .withColumn('Cardinality', lit(cardinality)) \
     .withColumn('Outlier Count Low', lit(outliers_low)) \
     .withColumn('Outlier Count High', lit(outliers_high))

    return report.toPandas()

# Set this to a feature column that is present in abt.columns.
attribute = 'name'
report = data_quality_report(abt, attribute)
print(report)

### Q2 (20 points)
Using what you produced to answer Q1, you should now write a function to construct the data quality report for all of the numerical features of our dataset.  You should loop over all of the features in the analytics base table represented by the input feature dataset files from partition 1, with the exception of the first column (this is the index column if you read the file correctly), and the `id`, `lab`, `st`, and `et` columns.  

Your output from this function will be a DataFrame that has 1 row for each feature. 
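One way to assemble the one-row-per-feature table is to collect the single-row reports in a list and stack them with `pandas.concat`. The sketch below is only an illustration: `fake_feature_report` and the feature names are hypothetical stand-ins for your Q1 function and the real columns.

```python
import pandas as pd

def fake_feature_report(name, values):
    """Hypothetical stand-in for the Q1 function: returns a one-row report."""
    return pd.DataFrame([{
        "Feature Name": name,
        "Cardinality": len(set(values)),
        "Min": min(values),
        "Max": max(values),
    }])

# Made-up features used only to demonstrate the stacking step.
toy_features = {
    "feat_a": [1.0, 2.0, 3.0],
    "feat_b": [5.0, 5.0, 5.0],
}

rows = [fake_feature_report(name, vals) for name, vals in toy_features.items()]
# ignore_index=True renumbers the rows 0..n-1 instead of repeating index 0.
summary = pd.concat(rows, ignore_index=True)
print(summary)
```

Collecting the rows first and concatenating once also avoids `DataFrame.append`, which was removed in pandas 2.0.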

In [None]:
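#Place your code here
def construct_data_quality_report(df):
    # Skip the index column and the non-feature metadata columns.
    excluded = ['_c0', 'id', 'lab', 'st', 'et']
    features = [c for c in df.columns if c not in excluded]
    # Build one single-row report per feature, then stack them.
    # (DataFrame.append was removed in pandas 2.0, so use pd.concat.)
    reports = [data_quality_report(df, ft) for ft in features]
    return pd.concat(reports, ignore_index=True)

report = construct_data_quality_report(abt)
print(report)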

### Q3 (20 points)
#### Drop features with improper cardinality:
Using the quality report summary table that is returned from the function you wrote for Q2, we are now going to investigate our data. For this, you should use the table returned for the [Full Partition 1 feature dataset](http://dmlab.cs.gsu.edu/solar/data/partition1ExtractedFeatures.csv) and not the toy dataset I provided for testing.

Since we are using real-valued features, a majority of them should have a cardinality close to the sample count. So, for this question, you are to write a function that takes in the summary table and the input dataset DataFrame, and drops the features that have a cardinality less than 10. These features should be dropped from both the data quality report summary table and from the actual input dataset Spark DataFrame.

A useful method for this operation on the summary table is:

* [pandas.DataFrame.drop](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.drop.html) (Make sure to use the `inplace` option; otherwise it returns a copy. With `inplace=True` the original summary table is edited directly, so you don't need to return it.)

* For the dataset, you will need to return a new DataFrame, as Spark DataFrames are immutable and the drop operation returns a new DataFrame.
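
The in-place behavior of `pandas.DataFrame.drop` can be seen on a small, made-up summary table. This is a sketch of the pandas half only; the feature names and cardinalities are invented for illustration, and the matching Spark call is shown as a comment because it needs a running Spark session.

```python
import pandas as pd

# Hypothetical summary table with one low-cardinality feature.
toy_report = pd.DataFrame({
    "Feature Name": ["feat_a", "feat_b", "feat_c"],
    "Cardinality": [950, 3, 120],
})

# Names of the features whose cardinality is below the cutoff of 10.
low_card = toy_report.loc[toy_report["Cardinality"] < 10, "Feature Name"].tolist()

# inplace=True edits toy_report directly and returns None.
result = toy_report.drop(
    toy_report[toy_report["Feature Name"].isin(low_card)].index,
    inplace=True,
)
print(low_card)
print(toy_report)

# For the Spark DataFrame, drop returns a new DataFrame instead, e.g.:
# df = df.drop(*low_card)
```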

In [None]:
#Place your code here
def drop_low_cardinality_features(report, df):
    low_cardinality_features = report[report['Cardinality'] < 10]['Feature Name'].values
    report.drop(report[report['Feature Name'].isin(low_cardinality_features)].index, inplace=True)
    for ft in low_cardinality_features:
        df = df.drop(ft)
    return df
df_new = drop_low_cardinality_features(report, abt)

### Q4 (20 points)
#### Drop features with excessive NaN

Again, using the quality report summary table that is returned from the function you wrote for Q2, we are going to continue investigating our data. For this, you should still be using the table returned for the [Full Partition 1 feature dataset](http://dmlab.cs.gsu.edu/solar/data/partition1ExtractedFeatures.csv) and not the toy dataset I provided for testing.

Like the features that were dropped for Q3, some of the extracted features don't work very well on all of the variates of the input multivariate time series samples, so they return an excessive number of `NaN` values. These are not very useful features, so we want to get rid of them before we continue. To do this, you are to write a function that takes in the summary table and the input dataset DataFrame, and drops the features that have **more than 1%** of the entries as null/nan values. Again, these features should be dropped from both the data quality report summary table and from the actual input dataset Spark DataFrame.

As in Q3, a useful method for this operation is:

* [pandas.DataFrame.drop](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.drop.html) (Make sure to use the inplace option otherwise it returns a copy)
  
* For the dataset, you will need to return a new DataFrame, as Spark DataFrames are immutable and the drop operation returns a new DataFrame.
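
To make the **more than 1%** rule concrete, the sketch below applies it to made-up null counts in plain Python. The feature names, counts, and the helper `features_over_null_threshold` are hypothetical; note that a feature with exactly 1% missing entries is kept because the comparison is strict.

```python
def features_over_null_threshold(null_counts, total_rows, threshold=0.01):
    """Return feature names whose missing fraction is strictly above threshold."""
    return [
        name
        for name, n_null in null_counts.items()
        if n_null > threshold * total_rows
    ]

# Hypothetical null counts for a dataset with 1,000 rows.
toy_null_counts = {"feat_a": 0, "feat_b": 10, "feat_c": 11, "feat_d": 400}
to_drop = features_over_null_threshold(toy_null_counts, total_rows=1000)
print(to_drop)  # ['feat_c', 'feat_d']; feat_b sits exactly at 1% and is kept
```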

In [None]:
#Place your code here
def drop_NaN_ft(report, df):
    total_count = df.count()
    NaN_ft = report[report['Null Count'] > 0.01 * total_count]['Feature Name'].values
    report.drop(report[report['Feature Name'].isin(NaN_ft)].index, inplace=True)
    for ft in NaN_ft:
        df = df.drop(ft)
    return df

### Q5 (20 points)
#### Add docstring to your functions

Let's revisit our programming skills while learning some fundamental data science operations.

Your code is only as valuable as its reusability. Without understandable and legible documentation (which makes maintainability and reusability possible), nobody would want to use your code, let alone pay for it. ;)

If you want to know more about the value of documentation, read [this article](https://www.freecodecamp.org/news/why-documentation-matters-and-why-you-should-include-it-in-your-code-41ef62dd5c2f/). There are even conferences on this topic; see [this website](https://www.writethedocs.org/guide/writing/beginners-guide-to-docs/).

In Python, the documentation that is embedded in the code is called **docstring**. In the example below, the "string" wrapped in triple quotes is there to tell us all about this function.

In [None]:
def nanmean(a, axis=None):
    """
    Compute the arithmetic mean along the specified axis, ignoring NaNs.
    
    Parameters
    ----------
    a : array_like
        Array containing numbers whose mean is desired. If `a` is not an
        array, a conversion is attempted.
    axis : {int, tuple of int, None}, optional
        Axis or axes along which the means are computed. The default is to compute
        the mean of the flattened array.
    """
    # some magic happens here that we don't care about.
    pass

Note that this is not just a *comment*. If you run the cell that defines `nanmean` and then type its name (as if you were about to call it), you can press `Shift+Tab` while your cursor is on the function name to see the docstring rendered in a pop-up. This allows other users to see our description even when they don't have access to our source code. Try it! You can do this with other NumPy and Pandas functions/methods that you've been using.

The above example is a simplified version of the method `nanmean` copied from the NumPy library ([here](https://github.com/numpy/numpy/blob/v1.21.0/numpy/lib/nanfunctions.py#L862-L957)). Feel free to check out their complete docstrings.


Your last task is to provide docstrings for the 4 methods you've implemented. Simply go back to those cells and modify your functions. Feel free to use the text provided to you (in the assignment descriptions) to enrich your docstrings. Keep in mind that your docstring needs (1) a general description, (2) a short description for each input, and (3) a short description for the output.

How to check your docstring? Hit `shift+Tab` and see if the pop-up message is correctly compiled, and make sure your description answers all the questions about your functions.
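
Outside of the notebook pop-up, a docstring can also be inspected programmatically. The sketch below uses a hypothetical function, `count_missing`, with a docstring laid out in the same style as the `nanmean` example; `inspect.getdoc` and `help` are standard Python tools.

```python
import inspect

def count_missing(values):
    """
    Count the missing entries in a list.

    Parameters
    ----------
    values : list
        Values to check; an entry counts as missing when it is None.

    Returns
    -------
    int
        Number of entries equal to None.
    """
    return sum(1 for v in values if v is None)

# inspect.getdoc returns the docstring with the indentation cleaned up.
doc = inspect.getdoc(count_missing)
print(doc)

# help() prints the signature together with the same docstring.
help(count_missing)
```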

In [None]:
import pyspark.sql.functions as sf
from pyspark.sql.functions import col, lit

def data_quality_report(df, attribute):
    """
    Generates a data quality report for one numerical attribute of a Spark DataFrame.

    Parameters:
    - df (pyspark.sql.DataFrame): The input Spark DataFrame containing the dataset.
    - attribute (str): The name of the numerical column for which the report is generated.

    Returns:
    pd.DataFrame: A single-row data quality report for the specified attribute.
    """
    # Exact quartiles (relativeError=0) used for the IQR outlier fences.
    quantiles = df.approxQuantile(attribute, [0.25, 0.5, 0.75], 0)
    Q1, Q3 = quantiles[0], quantiles[2]
    IQR = Q3 - Q1

    outliers_low = df.filter(col(attribute) < Q1 - 1.5 * IQR).count()
    outliers_high = df.filter(col(attribute) > Q3 + 1.5 * IQR).count()
    # distinct().count() returns a Python int, so it is attached as a literal.
    cardinality = df.select(attribute).distinct().count()

    # Use the Spark aggregates (sf.*), not Python's built-in sum/min/max.
    report = df.select(
        sf.count(col(attribute)).alias('Non-null Count'),
        sf.sum(col(attribute).isNull().cast('int')).alias('Null Count'),
        sf.min(col(attribute)).alias('Min'),
        sf.percentile_approx(col(attribute), 0.25).alias('25th'),
        sf.mean(col(attribute)).alias('Mean'),
        sf.percentile_approx(col(attribute), 0.5).alias('50th'),
        sf.percentile_approx(col(attribute), 0.75).alias('75th'),
        sf.max(col(attribute)).alias('Max'),
        sf.stddev(col(attribute)).alias('Std. Dev')
    ).withColumn('Feature Name', lit(attribute)) \
     .withColumn('Cardinality', lit(cardinality)) \
     .withColumn('Outlier Count Low', lit(outliers_low)) \
     .withColumn('Outlier Count High', lit(outliers_high))

    return report.toPandas()

# Set this to a feature column that is present in abt.columns.
attribute = 'name'
report = data_quality_report(abt, attribute)
print(report)

def construct_data_quality_report(df):
    """
    Constructs a data quality report for all numerical features in a Spark DataFrame.

    Parameters:
    - df (pyspark.sql.DataFrame): The input Spark DataFrame containing the dataset.

    Returns:
    pd.DataFrame: Summary data quality report with one row per feature.
    """
    # Skip the index column and the non-feature metadata columns.
    excluded = ['_c0', 'id', 'lab', 'st', 'et']
    features = [c for c in df.columns if c not in excluded]
    # DataFrame.append was removed in pandas 2.0, so stack with pd.concat.
    reports = [data_quality_report(df, ft) for ft in features]
    return pd.concat(reports, ignore_index=True)

report = construct_data_quality_report(abt)
print(report)


def drop_low_cardinality_features(report, df):
    """
    Drops features with a cardinality below 10 from both the data quality report
    summary table and the input Spark DataFrame.

    Parameters:
    - report (pd.DataFrame): The data quality report summary table (modified in place).
    - df (pyspark.sql.DataFrame): The input Spark DataFrame containing the dataset.

    Returns:
    pyspark.sql.DataFrame: A new Spark DataFrame without the dropped features.
    """
    low_cardinality_features = report[report['Cardinality'] < 10]['Feature Name'].values
    report.drop(report[report['Feature Name'].isin(low_cardinality_features)].index, inplace=True)
    for ft in low_cardinality_features:
        df = df.drop(ft)
    return df
df_new = drop_low_cardinality_features(report, abt)

def drop_NaN_ft(report, df, nan_threshold=0.01):
    """
    Drops features with a high fraction of null/nan values from both the data quality
    report summary table and the input Spark DataFrame.

    Parameters:
    - report (pd.DataFrame): The data quality report summary table (modified in place).
    - df (pyspark.sql.DataFrame): The input Spark DataFrame containing the dataset.
    - nan_threshold (float): Fraction of null/nan entries above which a feature is dropped.

    Returns:
    pyspark.sql.DataFrame: A new Spark DataFrame without the dropped features.
    """
    total_count = df.count()
    # Use the nan_threshold argument rather than a hard-coded 0.01.
    NaN_ft = report[report['Null Count'] > nan_threshold * total_count]['Feature Name'].values
    report.drop(report[report['Feature Name'].isin(NaN_ft)].index, inplace=True)
    for ft in NaN_ft:
        df = df.drop(ft)
    return df


