# Processing Big Data - Deequ Analysis

© Explore Data Science Academy

## Honour Code
I **PRECIOUS**, **AROVO**, confirm - by submitting this document - that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
    Non-compliance with the honour code constitutes a material breach of contract.


## Context

Having completed manual data quality checks, it should be obvious that the process can become quite cumbersome. As the Data Engineer in the team, you have researched some tools that could potentially save the team from having to do this work by hand. In your research, you have come across a tool called [Deequ](https://github.com/awslabs/deequ), which is a library for measuring the data quality of large datasets.

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/raw/master/data_engineering/transform/predict/DataQuality.jpg"
     alt="Data Quality"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. Six dimensions of data quality</em></p>
</div>

You present this tool to your manager; he is quite impressed and gives you the go-ahead to use this in your implementation. You are now required to perform some data quality tests using this automated data testing tool.
 

> ## 🚩️ Important Notice 🚩️
>
>To successfully run `pydeequ` without any errors, please make sure that you have an environment that is running pyspark version 3.0.
> You are advised to **create a new conda environment** and install this specific version of pyspark to avoid any technical issues:
>
> `pip install pyspark==3.0`

<br>

## Import dependencies

If you do not have `pydeequ` already installed, install it using the following command:
- `pip install pydeequ`
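
Before importing anything, it can help to confirm what is actually installed in the active environment. The following is a minimal sketch using only the standard library; the helper names `installed_version` and `matches_required` are hypothetical, and the simple prefix comparison is an assumption rather than a full version parser.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def matches_required(version: Optional[str], required_prefix: str) -> bool:
    """True when the version equals the prefix or starts with the prefix plus a dot."""
    if version is None:
        return False
    return version == required_prefix or version.startswith(required_prefix + ".")


# Report what is installed and whether pyspark looks like a 3.0 release
for name in ("pyspark", "pydeequ"):
    print(name, installed_version(name))
print("pyspark is 3.0.x:", matches_required(installed_version("pyspark"), "3.0"))
```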

In [1]:
import findspark
findspark.init() 

In [2]:
import os
import sys

os.environ["SPARK_VERSION"] = "3.0"
#os.environ['PYSPARK_PYTHON'] = sys.executable

#os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pydeequ
import json
import sagemaker_pyspark

from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, FloatType, DateType, NumericType, StructType, StringType, StructField



In [4]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

## Read data into spark dataframe

In this notebook, we set out to run some data quality tests, with the possibility of running them end to end on the years 1963, 1974, 1985, 1996, 2007, and 2018.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Data_ingestion_student_version.ipynb` notebook to create the parquet files for the following years:
>       - 1963
>       - 1974
>       - 1985
>       - 1996
>       - 2007
>       - 2018
>
>2. Ingest the data for the years given above. You should only do it one year at a time.
>3. Ingest the metadata file.


When developing your code, it will be sufficient to focus on a single year. However, after your development is done, you will need to run this notebook for all of the given years above so that you can answer all the questions given in the Data Testing MCQ.
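
Since the notebook has to be rerun once per year, it may be convenient to derive the input path from the year instead of editing it by hand. The sketch below assumes the parquet folders follow the `<year>_stock_data` naming seen in the path used in this notebook; the helper `stock_data_dir` and the base directory `Task1_data_ingestion` are placeholders to adapt to your own setup.

```python
import os

# Years listed in the instructions above
YEARS = [1963, 1974, 1985, 1996, 2007, 2018]


def stock_data_dir(base_dir: str, year: int) -> str:
    """Build the parquet directory path for one year, e.g. <base_dir>/1963_stock_data."""
    return os.path.join(base_dir, f"{year}_stock_data")


# 'Task1_data_ingestion' is a placeholder; point it at your own ingestion output folder
for year in YEARS:
    print(year, stock_data_dir("Task1_data_ingestion", year))
```

Each resulting path can then be passed to `spark.read.parquet`, one year at a time.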

In [5]:
# Ingesting the data created in the data_ingestion_student_version.ipynb notebook for the specified year.
stock_data_path = r"C:\Users\USER\Desktop\ExploreDE\processing-big-data-predict\Task1_data_ingestion\1963_stock_data"
stock_data = spark.read.parquet(stock_data_path)

In [6]:
stock_data.count()

5020

In [7]:
# Reading the metadata file "symbols_valid_meta.csv" into a pandas DataFrame.
metadata_path = r"C:\Users\USER\Desktop\ExploreDE\processing-big-data-predict\symbols_valid_meta.csv"

metadata = pd.read_csv(metadata_path)
metadata.head()

| | Nasdaq Traded | Symbol | Security Name | Listing Exchange | Market Category | ETF | Round Lot Size | Test Issue | Financial Status | CQS Symbol | NASDAQ Symbol | NextShares |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Y | A | Agilent Technologies, Inc. Common Stock | N | | N | 100.0 | N | | A | A | N |
| 1 | Y | AA | Alcoa Corporation Common Stock | N | | N | 100.0 | N | | AA | AA | N |
| 2 | Y | AAAU | Perth Mint Physical Gold ETF | P | | Y | 100.0 | N | | AAAU | AAAU | N |
| 3 | Y | AACG | ATA Creativity Global - American Depositary Sh... | Q | G | N | 100.0 | N | N | | AACG | N |
| 4 | Y | AADR | AdvisorShares Dorsey Wright ADR ETF | P | | Y | 100.0 | N | | AADR | AADR | N |


## **Run tests on the dataset**

## Test 1 - Null values ⛔️
For the first test, you are required to check the data for completeness.
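
As background, Deequ's completeness metric is the fraction of non-null values in a column, and a column counts as complete when that fraction is 1.0. The plain-Python sketch below illustrates the idea on made-up rows; the `completeness` helper is hypothetical and is not part of PyDeequ.

```python
from typing import Dict, List, Optional

Record = Dict[str, Optional[object]]


def completeness(rows: List[Record], column: str) -> float:
    """Fraction of rows whose value in `column` is not None."""
    if not rows:
        return 1.0  # assumption: an empty dataset is treated as complete
    non_null = sum(1 for row in rows if row.get(column) is not None)
    return non_null / len(rows)


# Made-up sample rows, for illustration only
sample = [
    {"stock": "AA", "open": 1.5},
    {"stock": "AA", "open": None},
    {"stock": None, "open": 2.0},
    {"stock": "GE", "open": 2.5},
]

for col in ("stock", "open"):
    value = completeness(sample, col)
    print(col, value, "complete" if value == 1.0 else "has missing values")
```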

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for missing values in the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*


In [8]:
# Setting the pandas display options so that results shown via .toPandas() are not truncated
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', None)  # Disable column width restriction
pd.set_option('display.max_colwidth', None)  # Disable column content width restriction

In [9]:
stock_data.columns

['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'stock']

In [10]:
# Alias for the stock data that was read from Parquet above
df = stock_data

In [11]:
# Checking completeness of the stock data

# Setting up PyDeequ for the completeness check
check = Check(spark, CheckLevel.Error, "Data Completeness Check")

# Adding one isComplete constraint per column (each call adds to the same check)
for column in df.columns:
    check = check.isComplete(column)

# Running the verification once, with all constraints attached
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check) \
    .run()

# Displaying the results
resultDataFrame = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
resultDataFrame.toPandas()



| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(date,None))` | Success | |
| 1 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(open,None))` | Success | |
| 2 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(high,None))` | Success | |
| 3 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(low,None))` | Success | |
| 4 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(close,None))` | Success | |
| 5 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(adj_close,None))` | Success | |
| 6 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(volume,None))` | Success | |
| 7 | Data Completeness Check | Error | Success | `CompletenessConstraint(Completeness(stock,None))` | Success | |


## Test 2 - Zero Values 🅾️

For the second test, you are required to check for zero values within the dataset.
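
One thing to keep in mind when designing this test: asserting that a column's minimum equals 0 shows that zero is the smallest value, but it does not count the zeros, and it would miss them if negative values were also present. The sketch below contrasts the two ideas on made-up values; the helper names are hypothetical and not part of PyDeequ.

```python
from typing import List


def min_is_zero(values: List[float]) -> bool:
    """True when the smallest value is exactly zero."""
    return min(values) == 0


def zero_count(values: List[float]) -> int:
    """Number of entries equal to zero."""
    return sum(1 for v in values if v == 0)


# Made-up values, for illustration only
volume = [0.0, 42400.0, 0.0, 910.0]
high = [0.07, 0.30, 19.2]
mixed = [-1.0, 0.0, 5.0]  # contains a zero, but the minimum is negative

for name, values in (("volume", volume), ("high", high), ("mixed", mixed)):
    print(name, "min is zero:", min_is_zero(values), "zeros:", zero_count(values))
```

For data that is known to be non-negative, the two views agree on whether any zeros exist, which is why the minimum-based check is a reasonable shortcut here.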

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for zero values within the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [12]:
# Specify numerical columns
numerical_cols = ['open', 'high', 'low', 'close', 'adj_close', 'volume']

In [13]:
# Checking for zero values in the dataset

# Setting up PyDeequ for the zero values check
check_zero = Check(spark, CheckLevel.Error, "Zero Values Check")

# Adding one constraint per numerical column: it succeeds when the column minimum is exactly 0
for column in numerical_cols:
    check_zero = check_zero.hasMin(column, lambda x: x == 0)

# Running the verification once, with all constraints attached
checkResult_zero = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check_zero) \
    .run()

# Displaying the results
resultDataFrame_zero = VerificationResult.checkResultsAsDataFrame(spark, checkResult_zero)
resultDataFrame_zero.toPandas()


Python Callback server started!




| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(open,None))` | Success | |
| 1 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(high,None))` | Failure | Value: 0.06785380095243454 does not meet the constraint requirement! |
| 2 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(low,None))` | Failure | Value: 0.06563635170459747 does not meet the constraint requirement! |
| 3 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(close,None))` | Failure | Value: 0.06607984006404877 does not meet the constraint requirement! |
| 4 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(adj_close,None))` | Failure | Value: 4.89296041905618E-7 does not meet the constraint requirement! |
| 5 | Zero Values Check | Error | Error | `MinimumConstraint(Minimum(volume,None))` | Success | |


## Test 3 - Negative values ➖️
The third test requires you to check that none of the values in the data are negative.
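
The constraint text that Deequ reports for a non-negativity check in this notebook has the form `COALESCE(CAST(col AS DECIMAL(20,10)), 0.0) >= 0`, which means a missing value is replaced by 0 and therefore counts as compliant. The sketch below mimics that rule in plain Python on made-up values; `non_negative_compliance` is a hypothetical helper, not a PyDeequ function.

```python
from typing import List, Optional


def non_negative_compliance(values: List[Optional[float]]) -> float:
    """Fraction of values that are >= 0 after replacing None with 0.0."""
    if not values:
        return 1.0  # assumption: nothing can violate the rule in an empty column
    compliant = sum(1 for v in values if (0.0 if v is None else v) >= 0)
    return compliant / len(values)


# Made-up values, for illustration only
clean = [0.0, 1.25, None, 3.5]
dirty = [0.0, -1.25, None, 3.5]

print("clean:", non_negative_compliance(clean))
print("dirty:", non_negative_compliance(dirty))
```

Because nulls pass this rule, it is worth running the completeness test alongside it rather than relying on it alone.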

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check negative values within the dataset. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [14]:
# Checking for negative values in the dataset

# Setting up PyDeequ for the negative values check
check_negative = Check(spark, CheckLevel.Error, "Negative Values Check")

# Adding one isNonNegative constraint per numerical column
for column in numerical_cols:
    check_negative = check_negative.isNonNegative(column)

# Running the verification once, with all constraints attached
checkResult_negative = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check_negative) \
    .run()

# Displaying the results
resultDataFrame_negative = VerificationResult.checkResultsAsDataFrame(spark, checkResult_negative)
resultDataFrame_negative.toPandas()



| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(open is non-negative,COALESCE(CAST(open AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |
| 1 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(high is non-negative,COALESCE(CAST(high AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |
| 2 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(low is non-negative,COALESCE(CAST(low AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |
| 3 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(close is non-negative,COALESCE(CAST(close AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |
| 4 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(adj_close is non-negative,COALESCE(CAST(adj_close AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |
| 5 | Negative Values Check | Error | Success | `ComplianceConstraint(Compliance(volume is non-negative,COALESCE(CAST(volume AS DECIMAL(20,10)), 0.0) >= 0,None))` | Success | |


## Test 4 - Determine Maximum Values ⚠️

For the fourth test, we want to find the maximum values in the dataset for the numerical fields. Extreme values can often serve as an upper bound for a column, so we can use them as threshold values.
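
To make the threshold idea concrete, the sketch below records the largest value seen per column and then flags a new row that exceeds any of those bounds. It is plain Python on made-up rows; `column_maxima` and `exceeds_threshold` are hypothetical helpers, not part of PyDeequ or PySpark.

```python
from typing import Dict, List


def column_maxima(rows: List[Dict[str, float]], columns: List[str]) -> Dict[str, float]:
    """Largest observed value per column."""
    return {col: max(row[col] for row in rows) for col in columns}


def exceeds_threshold(row: Dict[str, float], thresholds: Dict[str, float]) -> List[str]:
    """Names of the columns in `row` whose value is above the stored threshold."""
    return [col for col, limit in thresholds.items() if row[col] > limit]


# Made-up historical rows, for illustration only
history = [
    {"open": 0.0, "high": 1.5, "volume": 42400.0},
    {"open": 2.0, "high": 2.5, "volume": 910.0},
]
thresholds = column_maxima(history, ["open", "high", "volume"])
print(thresholds)

# A made-up new row whose 'high' value is above the historical maximum
new_row = {"open": 1.0, "high": 3.0, "volume": 500.0}
print(exceeds_threshold(new_row, thresholds))
```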

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Column Profiler Runner` to generate summary statistics for all the available columns. 
>2. Extract the maximum values for all the numeric columns in the data.
>
> *You may use as many cells as necessary*

In [15]:
# Computing the maximum value of each numerical column in df with plain PySpark aggregations,
# as the ColumnProfilerRunner did not seem to be working.

# Generating summary statistics for the numerical columns
summary_statistics = df.select(numerical_cols).summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max")

# Showing summary statistics for a general overview
summary_statistics.show()

# Extracting and displaying the maximum value of each numerical column
# (F.max makes it explicit that this is the Spark aggregate, not Python's built-in max)
for column in numerical_cols:
    max_value = df.agg(F.max(column).alias(f'max_{column}')).collect()[0][f'max_{column}']
    print(f"Maximum value for {column}: {max_value}")

+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|summary|              open|             high|               low|            close|         adj_close|           volume|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|  count|              5020|             5020|              5020|             5020|              5020|             5020|
|   mean|1.1365598366658882|19.17383405059962|18.943191553180316|19.06198000288253|  7.20845818042487|525223.0677290837|
| stddev| 4.757480282400392|61.95001257800808| 61.21457985181483|61.60101770760706|29.681165595553537|910433.9606092008|
|    min|               0.0|        0.0678538|        0.06563635|       0.06607984|      4.8929604E-7|              0.0|
|    25%|               0.0|       0.30110678|         0.2985026|       0.30110678|       0.005313047|          42400.0|
|    50%|               0.0|    

## Test 5 - Stock Tickers 💹️

For the fifth test, we want to determine if the stock tickers contained in our dataset are consistent. To do this, you will need to make use of the metadata file to check that the stock names used in the dataframe are valid.
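
Conceptually, this is a set-membership test: every ticker in the data should be an element of the set of symbols in the metadata. The plain-Python sketch below shows the idea; `unknown_tickers` is a hypothetical helper, and `ZZZZ` is a made-up ticker used only to show a failing case.

```python
from typing import Iterable, List


def unknown_tickers(tickers: Iterable[str], valid_symbols: Iterable[str]) -> List[str]:
    """Sorted list of tickers that do not appear among the valid symbols."""
    return sorted(set(tickers) - set(valid_symbols))


# Symbols from the metadata preview shown earlier in this notebook
valid = ["A", "AA", "AAAU", "AACG", "AADR"]

print(unknown_tickers(["AA", "A", "AA"], valid))  # every ticker is valid
print(unknown_tickers(["AA", "ZZZZ"], valid))     # 'ZZZZ' is a made-up invalid ticker
```

An empty result means every ticker was found in the metadata.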

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine if the stock tickers contained in the dataset appear in the metadata file.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [16]:
# Converting the metadata to a Spark DataFrame
meta = spark.createDataFrame(metadata)

In [17]:
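# Manual cross-check: collecting the distinct tickers in the stock data and in the metadata
distinct_stock = df.groupBy('stock').count()
distinct_symbol = metadata['Symbol'].unique()

# Converting both to Python lists
stock_column = distinct_stock.toPandas()
stock_column = stock_column.values.tolist()
symbol_column = distinct_symbol.tolist()

# Keeping only the ticker from each [stock, count] pair
stock_symbol = [item[0] for item in stock_column]

# Printing any ticker that is missing from the metadata (no output means all tickers are valid)
for stock in stock_symbol:
    if stock not in symbol_column:
        print(stock)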

In [18]:
# Converting the meta DataFrame's 'Symbol' column to a Python list of allowed values
allowed_stock_symbols = meta.select("Symbol").distinct().rdd.flatMap(lambda x: x).collect()

# Using VerificationSuite from PyDeequ to verify the stock tickers in the DataFrame `df`
verificationResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        Check(spark, CheckLevel.Error, "Stock Ticker Verification") \
            .isContainedIn("stock", allowed_stock_symbols, 
                           hint="The stock ticker is not listed in the metadata.")
    ) \
    .run()

# Displaying the results of the verification
VerificationResult.checkResultsAsDataFrame(spark, verificationResult).show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
|Stock Ticker Veri...|      Error|     Success|ComplianceConstra...|          Success|                  |
+--------------------+-----------+------------+--------------------+-----------------+------------------+





## Test 6 - Duplication 👥️
Lastly, we want to determine the uniqueness of the items found in the dataframe. You need to make use of the Verification Suite to check for duplicate entries.

Similar to the previous notebook, `Data_profiling_student_version.ipynb`, the first thing to check is whether the primary key values within the dataset are unique. In our case, the primary key is the combination of the stock name and the date. Secondly, we want to check whether the entries are all unique, which is done by checking for duplicates across the whole dataset.
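
For reference, Deequ defines uniqueness as the fraction of values that occur exactly once, so a value of 1.0 means there are no duplicates. The plain-Python sketch below applies that definition to both checks described above, first on the primary key and then on all columns; the `uniqueness` helper and the sample rows are made up for illustration.

```python
from collections import Counter
from typing import Dict, List, Sequence


def uniqueness(rows: List[Dict[str, object]], key: Sequence[str]) -> float:
    """Fraction of rows whose combination of `key` values occurs exactly once."""
    if not rows:
        return 1.0  # assumption: an empty dataset has no duplicates
    counts = Counter(tuple(row[col] for col in key) for row in rows)
    singles = sum(1 for n in counts.values() if n == 1)
    return singles / len(rows)


# Made-up rows, for illustration only; the last row duplicates the third
rows = [
    {"stock": "AA", "date": "1963-01-02", "close": 1.0},
    {"stock": "AA", "date": "1963-01-03", "close": 1.1},
    {"stock": "GE", "date": "1963-01-02", "close": 2.0},
    {"stock": "GE", "date": "1963-01-02", "close": 2.0},
]

print("primary key:", uniqueness(rows, ["stock", "date"]))
print("full row:", uniqueness(rows, ["stock", "date", "close"]))
print("without the duplicate:", uniqueness(rows[:3], ["stock", "date"]))
```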

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine the uniqueness of entries contained within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*



In [19]:
# Defining the primary key for uniqueness verification
primary_key = ['stock', 'date']

# Initializing VerificationSuite with the DataFrame
verification_suite = VerificationSuite(spark).onData(df)

# Defining a check for uniqueness
duplication_check = Check(spark, CheckLevel.Warning, "Duplication Check")\
    .hasUniqueness(primary_key, lambda x: x == 1)  # uniqueness of 1 means no duplicate keys

# Adding the check to the VerificationSuite
verification_result = verification_suite.addCheck(duplication_check).run()

# Displaying the results
print("Duplication check results:")
results_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
results_df.toPandas()

Duplication check results:


| | check | check_level | check_status | constraint | constraint_status | constraint_message |
|---|---|---|---|---|---|---|
| 0 | Duplication Check | Warning | Success | `UniquenessConstraint(Uniqueness(Stream(stock, ?),None))` | Success | |
