# Processing Big Data - Deequ Analysis

© Explore Data Science Academy

## Honour Code
I {**NELSON**, **MWEMBE**}, confirm - by submitting this document - that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
    Non-compliance with the honour code constitutes a material breach of contract.


## Context

Having completed the manual data quality checks, you will have seen that the process can become quite cumbersome. As the Data Engineer in the team, you have researched some tools that could potentially save the team from having to do this work by hand. In your research, you have come across a tool called [Deequ](https://github.com/awslabs/deequ), a library for measuring the data quality of large datasets.

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/raw/master/data_engineering/transform/predict/DataQuality.jpg"
     alt="Data Quality"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. Six dimensions of data quality</em></p>
</div>

You present this tool to your manager; he is quite impressed and gives you the go-ahead to use this in your implementation. You are now required to perform some data quality tests using this automated data testing tool.
 

> ## 🚩️ Important Notice 🚩️
>
>To successfully run `pydeequ` without any errors, please make sure that you have an environment that is running pyspark version 3.0.
> You are advised to **create a new conda environment** and install this specific version of pyspark to avoid any technical issues:
>
> `pip install pyspark==3.0`

<br>

## Import dependencies

If you do not have `pydeequ` already installed, install it using the following command:
- `pip install pydeequ`

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField
from pyspark.sql.functions import col
from pyspark.sql.functions import max
from pyspark.sql.functions import concat

In [3]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

## Read data into spark dataframe

In this notebook, we set out to run some data quality tests, with the possibility of running them end to end on the years 1963, 1974, 1985, 1996, 2007, and 2018.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Data_ingestion_student_version.ipynb` notebook to create the parquet files for the following years:
>       - 1963
>       - 1974
>       - 1985
>       - 1996
>       - 2007
>       - 2018
>
>2. Ingest the data for the years given above. You should only do it one year at a time.
>3. Ingest the metadata file.


When developing your code, it will be sufficient to focus on a single year. However, after your development is done, you will need to run this notebook for all of the given years above so that you can answer all the questions given in the Data Testing MCQ.
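
Since the same notebook has to be run for every year, the per-year parquet paths can be derived from a single base directory instead of being typed out six times. The following is a minimal sketch, assuming each year's output lives in a folder named `<year>_output` (as in the paths used in this notebook); `parquet_path`, `load_years`, and the `base_dir` argument are hypothetical names introduced for illustration.

```python
from pathlib import PurePosixPath

# Years requested in the instructions above
YEARS = (1963, 1974, 1985, 1996, 2007, 2018)


def parquet_path(base_dir, year):
    """Build the path of one year's parquet output, e.g. <base_dir>/1963_output."""
    return str(PurePosixPath(base_dir) / f"{year}_output")


def load_years(spark, base_dir, years=YEARS):
    """Return a dict that maps each year to the DataFrame read from its parquet folder.

    `spark` is an existing SparkSession; nothing is read until this is called.
    """
    return {year: spark.read.parquet(parquet_path(base_dir, year)) for year in years}
```

With a live session, `frames = load_years(spark, base_dir)` would give access to a single year through `frames[1985]`, which keeps the later test cells identical apart from the year key.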

In [4]:
#TODO: Write your code here
# Use this variable (year) to determine which year you are focusing on
#year = 1963
df_1963 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/1963_output")

#year = 1974
df_1974 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/1974_output")

#year = 1985
df_1985 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/1985_output")

#year = 1996
df_1996 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/1996_output")

#year = 2007
df_2007 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/2007_output")

#year = 2018
df_2018 = spark.read.parquet("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/Task3_automatic_data_quality_testing/2018_output")

In [5]:
metadata = spark.read.csv("C:/Users/nmwem/Downloads/Compressed/processing-big-data-predict-main/symbols_valid_meta.csv", header=True)

In [47]:
# What is the incorrect stock ticker included in 1985?
parquet_stocks = df_1985.select('stock').distinct()
metadata_stocks = metadata.select('Symbol').distinct()
inconsistent_stocks = parquet_stocks.exceptAll(metadata_stocks)
inconsistent_stocks.show()

+-----+
|stock|
+-----+
| UTX#|
+-----+



In [49]:
metadata.createOrReplaceTempView("metadata")

spark.sql("select * from metadata where Symbol like 'UTX%'").show()

+-------------+------+--------------------+----------------+---------------+---+--------------+----------+----------------+----------+-------------+----------+
|Nasdaq Traded|Symbol|       Security Name|Listing Exchange|Market Category|ETF|Round Lot Size|Test Issue|Financial Status|CQS Symbol|NASDAQ Symbol|NextShares|
+-------------+------+--------------------+----------------+---------------+---+--------------+----------+----------------+----------+-------------+----------+
|            Y|   UTX|United Technologi...|               N|               |  N|         100.0|         N|            null|       UTX|          UTX|         N|
|            Y| UTX.V|United Technologi...|               N|               |  N|         100.0|         N|            null|      UTXw|         UTX#|         N|
+-------------+------+--------------------+----------------+---------------+---+--------------+----------+----------------+----------+-------------+----------+



## **Run tests on the dataset**

## Test 1 - Null values ⛔️
For the first test, you are required to check the data for completeness.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for missing values in the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*


In [7]:
#TODO: Write your code here
#year = 1963
columns_to_check = df_1963.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_1963)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()


+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [8]:
#year = 1974
columns_to_check = df_1974.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_1974)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [9]:
#year = 1985
columns_to_check = df_1985.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_1985)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [11]:
#year = 1996
columns_to_check = df_1996.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_1996)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [12]:
#year = 2007
columns_to_check = df_2007.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_2007)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [13]:
#year = 2018
columns_to_check = df_2018.columns

# Define the completeness check for each column
checks = [Check(spark, CheckLevel.Warning, f"Completeness check for column {col}")
          .isComplete(col) for col in columns_to_check]

# Run the completeness checks
result = VerificationSuite(spark).onData(df_2018)
for check in checks:
    result = result.addCheck(check)
result = result.run()

# View the check results
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
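
The six cells above repeat the same steps with a different DataFrame each time. As a hedged sketch (not the only way to structure this), the completeness test could be wrapped in one function that adds an `isComplete` constraint per column to a single check, together with a small helper that keeps only the constraints that did not succeed. `run_completeness_check` and `failed_constraints` are hypothetical names; the sketch assumes that Deequ reports successful constraints with the status `Success`.

```python
def run_completeness_check(spark, df, year):
    """Run one Deequ check with an isComplete constraint for every column of df."""
    # Imported lazily so the pure-Python helper below works without pydeequ installed.
    from pydeequ.checks import Check, CheckLevel
    from pydeequ.verification import VerificationResult, VerificationSuite

    check = Check(spark, CheckLevel.Warning, f"Completeness check {year}")
    for column in df.columns:
        check = check.isComplete(column)

    result = VerificationSuite(spark).onData(df).addCheck(check).run()
    return VerificationResult.checkResultsAsDataFrame(spark, result)


def failed_constraints(rows):
    """Return (constraint, message) pairs for every row whose status is not 'Success'.

    `rows` is a list of dicts with the keys of the Deequ result table,
    for example [r.asDict() for r in result_df.collect()].
    """
    return [
        (row["constraint"], row["constraint_message"])
        for row in rows
        if row["constraint_status"] != "Success"
    ]
```

Calling `result_df.show(truncate=False)` on the returned DataFrame prints the full constraint names and messages, which are otherwise cut off after 20 characters.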



## Test 2 - Zero Values 🅾️

For the second test, you are required to check for zero values within the dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for zero values within the data. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [14]:
#TODO: Write your code here
#year = 1963
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

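columns_to_check = [c for c in df_1963.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)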

check_result = VerificationSuite(spark).onData(df_1963).addCheck(check).run()
check_result_df_1963_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_1963_zero.show()



Python Callback server started!
+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+



In [6]:
from pyspark.sql.functions import col

# Filter the DataFrame to only include entries with non-zero volume
df_1963_nonzero = df_1963.filter(col("volume") != 0)

# Calculate the percentage of non-zero entries
percentage_nonzero = (df_1963_nonzero.count() / df_1963.count()) * 100

print(f"The percentage of non-zero entries for the volume field in 1963 is {percentage_nonzero:.2f}%.")


The percentage of non-zero entries for the volume field in 1963 is 99.94%.


In [15]:
# year = 1974
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

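columns_to_check = [c for c in df_1974.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)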

check_result = VerificationSuite(spark).onData(df_1974).addCheck(check).run()
check_result_df_1974_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_1974_zero.show()

+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+



In [19]:
# year = 1974
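# Start from a fresh check so constraints from earlier cells are not carried over
check = Check(spark, CheckLevel.Warning, "Integrity Checks")
columns_to_check = [c for c in df_1974.columns if c not in ["date", "stock"]]
cols_with_zeros = []

for column in columns_to_check:
    # Record every column that holds at least one zero
    if df_1974.filter(f"{column} == 0").count() > 0:
        cols_with_zeros.append(column)
        # Passes only when every row is non-zero (fraction of compliant rows is 1)
        check = check.satisfies(f"{column} != 0", f"Checking for zeros in {column}", lambda x: x == 1.0)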

check_result = VerificationSuite(spark).onData(df_1974).addCheck(check).run()
check_result_df_1974_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)

if cols_with_zeros:
    print(f"The following columns contain zeros: {cols_with_zeros}")
else:
    print("No columns contain zeros.")


The following columns contain zeros: ['open', 'volume']


In [16]:
# year = 1985
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

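columns_to_check = [c for c in df_1985.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)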

check_result = VerificationSuite(spark).onData(df_1985).addCheck(check).run()
check_result_df_1985_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_1985_zero.show()

+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+



In [40]:
# What percentage of zeros is found in the high field in 1985?
# Count the number of entries in the high column
count_high = df_1985.select("high").count()

# Count the number of zeros in the high column
count_high_zero = df_1985.filter(col("high") == 0).select("high").count()

# Calculate the percentage of zero entries
percentage_zero = (count_high_zero / count_high) * 100

print(f"The percentage of zeros in the high column is {percentage_zero:.3f}%.")



The percentage of zeros in the high column is 0.000%.


In [17]:
# year = 1996
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

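columns_to_check = [c for c in df_1996.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)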

check_result = VerificationSuite(spark).onData(df_1996).addCheck(check).run()
check_result_df_1996_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_1996_zero.show()

+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+



In [18]:
# year = 2007
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

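columns_to_check = [c for c in df_2007.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)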

check_result = VerificationSuite(spark).onData(df_2007).addCheck(check).run()
check_result_df_2007_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_2007_zero.show()

+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+



In [19]:
# year = 2018
check = Check(spark, CheckLevel.Warning, "Integrity Checks")

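columns_to_check = [c for c in df_2018.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col, which later cells call.
    # The constraint passes only when the fraction of rows equal to zero is exactly 0.
    check = check.satisfies(f"{column} == 0", f"Checking for zeros in {column}", lambda x: x == 0)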

check_result = VerificationSuite(spark).onData(df_2018).addCheck(check).run()
check_result_df_2018_zero = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df_2018_zero.show()

+----------------+-----------+------------+--------------------+-----------------+--------------------+
|           check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+----------------+-----------+------------+--------------------+-----------------+--------------------+
+----------------+-----------+------------+--------------------+-----------------+--------------------+
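
The Deequ results above report whether each constraint passed, while the follow-up cells compute the share of zeros by hand with two `count()` calls per column. As a rough sketch, assuming the same column layout (`date` and `stock` plus numeric fields), the zero share of every numeric column could be computed in a single Spark aggregation. The helper names `as_percentages` and `zero_percentages` and the internal `__total__` alias are hypothetical.

```python
def as_percentages(counts, total_key):
    """Convert raw counts into percentages of counts[total_key]."""
    total = counts[total_key]
    if total == 0:
        # An empty DataFrame has no zeros to report
        return {key: 0.0 for key in counts if key != total_key}
    return {key: 100.0 * value / total for key, value in counts.items() if key != total_key}


def zero_percentages(df, excluded=("date", "stock")):
    """Return {column: percentage of rows equal to zero} using one Spark job."""
    from pyspark.sql import functions as F  # lazy import; needs pyspark at call time

    columns = [c for c in df.columns if c not in excluded]
    # count(when(condition, 1)) counts only the rows where the condition holds
    zero_counts = [F.count(F.when(F.col(c) == 0, 1)).alias(c) for c in columns]
    row = df.agg(F.count(F.lit(1)).alias("__total__"), *zero_counts).first()
    return as_percentages(row.asDict(), total_key="__total__")
```

For example, `zero_percentages(df_1985)["high"]` would give the same quantity as the manual calculation for the `high` field, and `100 - zero_percentages(df_1963)["volume"]` the share of non-zero volume entries.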



## Test 3 - Negative values ➖️
The third test requires you to check that all values in the data are positive.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check negative values within the dataset. 
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [20]:
#TODO: Write your code here
#year = 1963
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_1963.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_1963).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+



In [21]:
#year = 1974
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_1974.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_1974).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



In [35]:
# year = 1974
# Get the column names that have negative values
numeric_cols = [c for c in df_1974.columns if c not in ['date', 'stock'] and df_1974.select(c).dtypes[0][1] in ['bigint', 'double', 'float', 'int']]
negative_cols = [c for c in numeric_cols if df_1974.filter(col(c) < 0).count() > 0]

if negative_cols:
    print("The following columns have negative values:", negative_cols)
else:
    print("No columns have negative values.")


The following columns have negative values: ['adj_close']


In [22]:
#year = 1985
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_1985.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_1985).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



In [23]:
#year = 1996
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_1996.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_1996).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



In [24]:
#year = 2007
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_2007.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_2007).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+--------------------+
|       check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
+------------+-----------+------------+--------------------+-----------------+--------------------+



In [25]:
#year = 2018
check = Check(spark, CheckLevel.Warning, "Review Check")

columns_to_check = [c for c in df_2018.columns if c not in ["date", "stock"]]
for column in columns_to_check:
    # `column` avoids shadowing pyspark.sql.functions.col
    check = check.isNonNegative(column)

check_result = VerificationSuite(spark).onData(df_2018).addCheck(check).run()
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, check_result)
check_result_df.show()


+------------+-----------+------------+--------------------+-----------------+------------------+
|       check|check_level|check_status|          constraint|constraint_status|constraint_message|
+------------+-----------+------------+--------------------+-----------------+------------------+
+------------+-----------+------------+--------------------+-----------------+------------------+
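
The instruction for this test asks for positive values, whereas `isNonNegative` also accepts zeros. If strictly positive values are what is wanted, one possible variant (a sketch under that assumption, not the method used in the cells above) is to express the rule through `satisfies` with a `> 0` condition. `positivity_conditions` and `build_positive_check` are hypothetical helper names.

```python
def positivity_conditions(columns, excluded=("date", "stock")):
    """Return (sql_condition, constraint_name) pairs, one per non-excluded column."""
    return [
        (f"{column} > 0", f"{column} is strictly positive")
        for column in columns
        if column not in excluded
    ]


def build_positive_check(spark, columns):
    """Attach one `satisfies` constraint per column; every row must comply."""
    from pydeequ.checks import Check, CheckLevel  # lazy import; needs pydeequ at call time

    check = Check(spark, CheckLevel.Warning, "Strictly positive check")
    for condition, name in positivity_conditions(columns):
        # The assertion receives the fraction of rows that match the condition
        check = check.satisfies(condition, name, lambda fraction: fraction == 1.0)
    return check
```

The returned check can be passed to `VerificationSuite(spark).onData(df).addCheck(...)` in the same way as the checks above. Because the zero-value test already shows zeros in some fields, a strict check of this kind would be expected to fail for those columns.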



## Test 4 - Determine Maximum Values ⚠️

For the fourth test, we want to find the maximum values in the dataset for the numerical fields. The observed maxima can often serve as an upper bound for a column, so we can use them as threshold values in later checks.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Column Profiler Runner` to generate summary statistics for all the available columns. 
>2. Extract the maximum values for all the numeric columns in the data.
>
> *You may use as many cells as necessary*

In [26]:
#year = 1963
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_1963.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_1963.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")



Column Name		Max Value
open		303.125
high		315.625
low		311.875
close		313.75
adj_close		148.7704620361328
volume		20692800.0


In [27]:
#year = 1974
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_1974.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_1974.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")

Column Name		Max Value
open		367.5
high		376.6666564941406
low		364.5833435058594
close		367.5
adj_close		292.9978332519531
volume		75315200.0


In [28]:
#year = 1985
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_1985.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_1985.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")

Column Name		Max Value
open		100000.0
high		224062.5
low		219375.0
close		221250.0
adj_close		190826.34375
volume		183495200.0


In [29]:
#year = 1996
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_1996.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_1996.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")

Column Name		Max Value
open		498750.0
high		507500.0
low		490000.0
close		507500.0
adj_close		507500.0
volume		546630400.0


In [30]:
#year = 2007
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_2007.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_2007.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")

Column Name		Max Value
open		62672400384.0
high		63503998976.0
low		61613998080.0
close		62067601408.0
adj_close		62067601408.0
volume		1835338880.0


In [31]:
#year = 2018
# Get the column names for all numeric columns except "stock" and "date"
num_cols = [c for c in df_2018.columns if c not in ['stock', 'date']]

# Compute the maximum value for each numeric column
max_values = {}
for col_name in num_cols:
    max_value = df_2018.select(max(col(col_name))).collect()[0][0]
    max_values[col_name] = max_value

# Output the table with column name and max values
print("Column Name\t\tMax Value")
for name, value in max_values.items():  # `name` avoids shadowing pyspark's col
    print(f"{name}\t\t{value}")

Column Name		Max Value
open		117187.5
high		125000.0
low		109375.0
close		109375.0
adj_close		109375.0
volume		358775712.0
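
The instructions for this test mention the `Column Profiler Runner`, while the cells above obtain the maxima with plain Spark aggregations. As a hedged sketch of the profiler route, the snippet below runs `ColumnProfilerRunner` and keeps the `maximum` of every profile that exposes one (numeric columns). `profile_columns` and `extract_maxima` are hypothetical helper names, and the stand-in profiles at the end only illustrate the shape of the result using the 1963 `open` maximum printed above.

```python
from types import SimpleNamespace


def profile_columns(spark, df):
    """Run Deequ's column profiler and return its {column: profile} mapping."""
    from pydeequ.profiles import ColumnProfilerRunner  # lazy import; needs pydeequ

    return ColumnProfilerRunner(spark).onData(df).run().profiles


def extract_maxima(profiles):
    """Keep the maximum of every profile that has one; other columns are skipped."""
    maxima = {}
    for column, profile in profiles.items():
        maximum = getattr(profile, "maximum", None)
        if maximum is not None:
            maxima[column] = maximum
    return maxima


# Stand-in objects instead of real Deequ profiles, for illustration only
demo_profiles = {"open": SimpleNamespace(maximum=303.125), "stock": SimpleNamespace()}
print(extract_maxima(demo_profiles))  # {'open': 303.125}
```

With a live session, `extract_maxima(profile_columns(spark, df_1963))` would return one entry per numeric column, which can be compared against the values from the aggregation-based cells.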


## Test 5 - Stock Tickers 💹️

For the fifth test, we want to determine if the stock tickers contained in our dataset are consistent. To do this, you will need to make use of the metadata file to check that the stock names used in the dataframe are valid.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine if the stock tickers contained in the dataset appear in the metadata file.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [32]:
#year = 1963
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_1963) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()


+-----------------+-----------+------------+--------------------+-----------------+------------------+
|            check|check_level|check_status|          constraint|constraint_status|constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+------------------+
+-----------------+-----------+------------+--------------------+-----------------+------------------+
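
An `isContainedIn` constraint tells us whether all tickers are valid, but not which tickers are at fault. To list them, one option is a left anti join between the distinct tickers and the distinct metadata symbols. The sketch below assumes the column names used in this notebook (`stock` in the yearly data, `Symbol` in the metadata); `invalid_tickers_spark` and `invalid_tickers` are hypothetical helper names, and the second function is a plain-Python equivalent for small lists that have already been collected.

```python
def invalid_tickers_spark(df, metadata, ticker_col="stock", symbol_col="Symbol"):
    """Return a DataFrame of distinct tickers in df with no matching metadata symbol."""
    tickers = df.select(ticker_col).distinct()
    symbols = metadata.select(symbol_col).distinct()
    # A left anti join keeps only the left-hand rows that have no match on the right
    return tickers.join(symbols, tickers[ticker_col] == symbols[symbol_col], "left_anti")


def invalid_tickers(tickers, valid_symbols):
    """Return the sorted, de-duplicated tickers that are missing from valid_symbols."""
    valid = set(valid_symbols)
    return sorted({ticker for ticker in tickers if ticker not in valid})
```

For the 1985 data, `invalid_tickers_spark(df_1985, metadata).show()` should list the same ticker that the `exceptAll` query near the top of the notebook returned.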



In [33]:
#year = 1974
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_1974) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-----------------+-----------+------------+--------------------+-----------------+--------------------+
|            check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+--------------------+
+-----------------+-----------+------------+--------------------+-----------------+--------------------+



In [34]:
#year = 1985
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_1985) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-----------------+-----------+------------+--------------------+-----------------+--------------------+
|            check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+--------------------+
+-----------------+-----------+------------+--------------------+-----------------+--------------------+



In [35]:
#year = 1996
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_1996) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-----------------+-----------+------------+--------------------+-----------------+--------------------+
|            check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+--------------------+
+-----------------+-----------+------------+--------------------+-----------------+--------------------+



In [36]:
#year = 2007
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_2007) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-----------------+-----------+------------+--------------------+-----------------+--------------------+
|            check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+--------------------+
+-----------------+-----------+------------+--------------------+-----------------+--------------------+



In [37]:
#year = 2018
unique_stock_symbol = metadata.select("Symbol").rdd.flatMap(lambda x: x).collect()

check = Check(spark, CheckLevel.Warning, "Consistency Check").isContainedIn("stock", unique_stock_symbol)

checkResult = VerificationSuite(spark) \
    .onData(df_2018) \
    .addCheck(check) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-----------------+-----------+------------+--------------------+-----------------+--------------------+
|            check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+-----------------+-----------+------------+--------------------+-----------------+--------------------+
+-----------------+-----------+------------+--------------------+-----------------+--------------------+



## Test 6 - Duplication 👥️
Lastly, we want to determine the uniqueness of the items found in the dataframe. You need to make use of the Verification Suite to check for the uniqueness of the entries.

As in the previous notebook, `Data_profiling_student_version.ipynb`, the first thing to check is whether the primary key values within the dataset are unique; in our case, the primary key is the combination of the stock name and the date. Secondly, we want to check whether the entries are all unique, which is done by checking for duplicates across the whole dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine the uniqueness of entries contained within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*
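
The two checks described above answer different questions, which the following plain-Python sketch tries to make concrete: a repeated composite key (`date`, `stock`) versus a row that is repeated in full. The rows and the helper names `duplicated_keys` and `count_duplicate_rows` are hypothetical and only illustrate the logic; they are not part of `pydeequ`.

```python
from collections import Counter
from typing import Dict, Hashable, List, Sequence, Tuple

Row = Dict[str, Hashable]


def duplicated_keys(rows: Sequence[Row], key_cols: Sequence[str]) -> List[Tuple]:
    """Return the composite key tuples that occur more than once."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


def count_duplicate_rows(rows: Sequence[Row]) -> int:
    """Count rows that exactly repeat an earlier row across all columns."""
    counts = Counter(tuple(sorted(row.items())) for row in rows)
    return sum(n - 1 for n in counts.values())


# Hypothetical rows: the third shares a key with the first but differs in value,
# while the fourth is an exact copy of the second.
rows = [
    {"date": "1963-01-02", "stock": "IBM", "close": 7.5},
    {"date": "1963-01-02", "stock": "GE", "close": 0.9},
    {"date": "1963-01-02", "stock": "IBM", "close": 7.6},
    {"date": "1963-01-02", "stock": "GE", "close": 0.9},
]

print(duplicated_keys(rows, ["date", "stock"]))
print(count_duplicate_rows(rows))
```

In this sample both keys are duplicated, but only one row is an exact duplicate, so a dataset can fail the primary key check even when very few rows are repeated in full.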



In [40]:
#year = 1963 uniqueness of primary keys
# Build a composite key from the date and the stock ticker
df_1963 = df_1963.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_1963) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [41]:
#year = 1963 uniqueness of entries
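# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_1963) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()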



Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F6A18E80>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [13]:
non_key_cols = ['open', 'low', 'high', 'close', 'adj_close', 'volume']
# Count the total number of non-key entries
total_entries = df_1963.select([col(c) for c in non_key_cols]).count()

# Count the number of unique non-key entries
unique_entries = df_1963.select([col(c) for c in non_key_cols]).distinct().count()

# Calculate the percentage of uniqueness
percentage_uniqueness = unique_entries / total_entries * 100

print(f"The percentage of uniqueness for all non-key entries in 1963 is: {percentage_uniqueness:.2f}%")



The percentage of uniqueness for all non-key entries in 1963 is: 99.88%
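
The ratio computed above divides the number of distinct rows by the total number of rows. In Deequ's terminology this is closer to *distinctness*, whereas Deequ's *uniqueness* metric counts only the values that occur exactly once. The plain-Python sketch below contrasts the two on a small hypothetical sample; the function names are illustrative and not `pydeequ` calls.

```python
from collections import Counter
from typing import Hashable, Sequence


def distinctness(values: Sequence[Hashable]) -> float:
    """Number of distinct values divided by the total number of rows."""
    return len(set(values)) / len(values)


def uniqueness(values: Sequence[Hashable]) -> float:
    """Number of values that occur exactly once divided by the total number of rows."""
    counts = Counter(values)
    return sum(1 for n in counts.values() if n == 1) / len(values)


sample = ["a", "a", "b", "c"]  # hypothetical values, one of them repeated

print(f"distinctness = {distinctness(sample):.2f}")  # 3 distinct values / 4 rows
print(f"uniqueness   = {uniqueness(sample):.2f}")    # 2 values seen once / 4 rows
```

Because a repeated value lowers uniqueness more than it lowers distinctness, the two figures should not be compared directly.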


In [42]:
#year = 1974 uniqueness of primary keys
df_1974 = df_1974.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_1974) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [43]:
#year = 1974 uniqueness of entries
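# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_1974) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()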


Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F65CA280>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [44]:
#year = 1985 uniqueness of primary keys
df_1985 = df_1985.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_1985) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [45]:
#year = 1985 uniqueness of entries
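# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_1985) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()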


Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F67D0880>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [46]:
#year = 1996 uniqueness of primary keys
df_1996 = df_1996.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_1996) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [47]:
#year = 1996 uniqueness of entries
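# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_1996) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()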


Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F65BC9A0>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [48]:
#year = 2007 uniqueness of primary keys
df_2007 = df_2007.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_2007) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [49]:
#year = 2007 uniqueness of entries
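# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_2007) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()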


Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F65BC1C0>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [50]:
#year = 2018 uniqueness of primary keys
df_2018 = df_2018.withColumn("primary_key", concat(col("date"), col("stock")))

# Use Verification Suite to check for primary key uniqueness
check1 = Check(spark, CheckLevel.Warning, "Primary Key Uniqueness Check")
checkResult1 = VerificationSuite(spark) \
    .onData(df_2018) \
    .addCheck(
        check1.isUnique("primary_key")
    ) \
    .run()

# Display the results of the test
checkResult1_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult1)
checkResult1_df.show()

+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+



In [51]:
#year = 2018 uniqueness of entries

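# Create the check for uniqueness across the whole dataset.
# Caveat: isUnique takes a single column name, so "All columns" is looked up as a
# column literally called "All columns", not as every column in the dataframe.
# A multi-column check is normally expressed with Check.hasUniqueness(columns, assertion).
check = Check(spark, CheckLevel.Warning, "Check for duplicates").isUnique("All columns")

# Run the check
result = VerificationSuite(spark) \
    .onData(df_2018) \
    .addCheck(check) \
    .run()

# Print the results. Any status other than "Success" takes the else branch,
# so that message can also mean the constraint could not be evaluated.
if result.status == "Success":
    print("No duplicate entries found")
else:
    print(f"Duplicate entries found:\n{result}")
    check_result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
    check_result_df.show()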


Duplicate entries found:
<pydeequ.verification.VerificationResult object at 0x00000164F66ED250>
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+



In [52]:
spark.stop()