<a href="https://colab.research.google.com/github/Laurencm130/processing-big-data-predict/blob/main/Lauren_Marais_data_deequ.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processing Big Data - Deequ Analysis

© Explore Data Science Academy

## Honour Code
I {Lauren Marais}, confirm - by submitting this document - that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
    Non-compliance with the honour code constitutes a material breach of contract.


## Context

Having completed manual data quality checks, it should be obvious that the process can become quite cumbersome. As the Data Engineer in the team, you have researched some tools that could potentially save the team from having to do this cumbersome work. In your research, you have come a across a tool called [Deequ](https://github.com/awslabs/deequ), which is a library for measuring the data quality of large datasets.

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/raw/master/data_engineering/transform/predict/DataQuality.jpg"
     alt="Data Quality"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. Six dimensions of data quality</em></p>
</div>

You present this tool to your manager; he is quite impressed and gives you the go-ahead to use this in your implementation. You are now required to perform some data quality tests using this automated data testing tool.


In [99]:
"""
Colab is essentially running on a linux machine on Google Cloud Platform.
This means that should you want to install something in your notebook you
would have to run headless installs as well as wget. Copying the installs
below will ensure that you have spark and java installed in the environment
as well as available for the notebook.
"""
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [100]:
"""
This section helps in rendering your notebook operable and ensuring that
your environment variables for spark are correct. Running this cell in any
notebook allows for any unresolved spark environment to be fixed.
"""
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
os.environ["SPARK_VERSION"] = "3.0.0"

In [101]:
pip install pyspark==3.0



In [102]:
"""
We need to locate Spark in the system. For that, we import findspark and
use the findspark.init() method. If you want to know the location where
Spark is installed, use findspark.find()
"""
import findspark
findspark.init()
findspark.find()
import pyspark

In [103]:
"""
This cell is used to import data/files from your local machine to colab
"""
#from google.colab import files
#uploaded = files.upload()

'\nThis cell is used to import data/files from your local machine to colab\n'

In [104]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [105]:
!unzip '/content/drive/MyDrive/processing-big-data-predict-stocks-data/stocks.zip'

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: stocks/1971/08/25/stocks.csv  
  inflating: stocks/1971/08/26/stocks.csv  
  inflating: stocks/1971/08/27/stocks.csv  
  inflating: stocks/1971/08/30/stocks.csv  
  inflating: stocks/1971/08/31/stocks.csv  
  inflating: stocks/1971/09/01/stocks.csv  
  inflating: stocks/1971/09/02/stocks.csv  
  inflating: stocks/1971/09/03/stocks.csv  
  inflating: stocks/1971/09/07/stocks.csv  
  inflating: stocks/1971/09/08/stocks.csv  
  inflating: stocks/1971/09/09/stocks.csv  
  inflating: stocks/1971/09/10/stocks.csv  
  inflating: stocks/1971/09/13/stocks.csv  
  inflating: stocks/1971/09/14/stocks.csv  
  inflating: stocks/1971/09/15/stocks.csv  
  inflating: stocks/1971/09/16/stocks.csv  
  inflating: stocks/1971/09/17/stocks.csv  
  inflating: stocks/1971/09/20/stocks.csv  
  inflating: stocks/1971/09/21/stocks.csv  
  inflating: stocks/1971/09/22/stocks.csv  
  inflating: stocks/1971/09/23/stocks.csv  
  inflating

> ## 🚩️ Important Notice 🚩️
>
>To successfully run `pydeequ` without any errors, please make sure that you have an environment that is running pyspark version 3.0.
> You are advised to **create a new conda environment** and install this specific version of pyspark to avoid any technical issues:
>
> `pip install pyspark==3.0`

<br>

## Import dependencies

If you do not have `pydeequ` already installed, install it using the following command:
- `pip install pydeequ`

In [106]:
!pip install pydeequ



In [107]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.verification import VerificationSuite
from pydeequ.checks import Check, CheckLevel

from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, DateType, NumericType, StructType, StringType, StructField

In [108]:
spark = (SparkSession.builder.master("local")
.appName("Colab")
.config('spark.ui.port', '4050')
.config("spark.jars.packages", pydeequ.deequ_maven_coord)
.config("spark.jars.excludes", pydeequ.f2j_maven_coord).getOrCreate())


## Read data into spark dataframe

In this notebook, we set out to run some data quality tests, with the possiblity of running end to end on the years 1963, 1974, 1985, 1996, 2007, and 2018.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Data_ingestion_student_version.ipynb` notebook to create the parquet files for the following years:
>       - 1963
>       - 1974
>       - 1985
>       - 1996
>       - 2007
>       - 2018
>
>2. Ingest the data for the for the years given above. You should only do it one year at a time.
>3. Ingest the metadata file.


When developing your code, it will be sufficient to focus on a single year. However, after your development is done, you will need to run this notebook for all of the given years above so that you can answer all the questions given in the Data Testing MCQ.

In [109]:
#TODO: Write your code here
year = 1963
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 1963
data_path_1963 = "/content/stocks/1963/*/*/*"


# Read in the data for 2018 with the specified schema
data_1963 = spark.read.csv(data_path_1963, schema=data_schema)

# Show the first few rows of the DataFrame
data_1963.show()

+----------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|      date|               open|               high|                low|              close|           adj_close|   volume|
+----------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|      Date|               Open|               High|                Low|              Close|           Adj Close|   Volume|
|1963-10-28|  6.924644947052003|  6.940664768218994|   6.90862512588501|   6.90862512588501|  1.6844346523284912|  39900.0|
|1963-10-28|  6.488943099975586|  6.512368679046631|  6.465517044067383|  6.465517044067383|  1.5472991466522217|  42600.0|
|1963-10-28| 0.5679012537002563| 0.5699588656425476| 0.5617284178733826| 0.5699588656425476| 0.11004561930894853| 789600.0|
|1963-10-28| 1.8541666269302368| 1.9166666269302368| 1.8541666269302368| 1.9166666269302368| 0.17308039963245392| 319200.0|
|1963-10

In [110]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_1963.zip"
# Replace "/content/drive/MyDrive/parquet/stocks_1963.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_1963.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1963"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_1963/content/stocks_1963.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1963/content/stocks_1963.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1963/content/stocks_1963.parquet:
part-00000-85891194-ff25-46fe-b327-ddb300e72116-c000.snappy.parquet
.part-00000-85891194-ff25-46fe-b327-ddb300e72116-c000.snappy.parquet.crc
._SUCCESS.crc
_SUCCESS
part-00001-85891194-ff25-46fe-b327-ddb300e72116-c000.snappy.parquet
.part-00001-85891194-ff25-46fe-b327-ddb300e72116-c000.snappy.parquet.crc


In [111]:
#Ingest Metadata
from pyspark import SparkFiles
# Download metadata and make it available to Spark
spark.sparkContext.addFile("https://processing-big-data-predict-stocks-data.s3.eu-west-1.amazonaws.com/symbols_valid_meta.csv")
# Specify the correct path to the metadata file
metadata_path_pbd = "file://" + SparkFiles.get("symbols_valid_meta.csv")
# Read metadata into Spark DataFrame
metadata_df_pbd = spark.read.csv(metadata_path_pbd, header=True)

# Show the metadata DataFrame
metadata_df_pbd.show()

+-------------+------+--------------------+----------------+---------------+---+--------------+----------+----------------+----------+-------------+----------+
|Nasdaq Traded|Symbol|       Security Name|Listing Exchange|Market Category|ETF|Round Lot Size|Test Issue|Financial Status|CQS Symbol|NASDAQ Symbol|NextShares|
+-------------+------+--------------------+----------------+---------------+---+--------------+----------+----------------+----------+-------------+----------+
|            Y|     A|Agilent Technolog...|               N|               |  N|         100.0|         N|            null|         A|            A|         N|
|            Y|    AA|Alcoa Corporation...|               N|               |  N|         100.0|         N|            null|        AA|           AA|         N|
|            Y|  AAAU|Perth Mint Physic...|               P|               |  Y|         100.0|         N|            null|      AAAU|         AAAU|         N|
|            Y|  AACG|ATA Creativity Gl.

## **Run tests on the dataset**

## Test 1 - Null values ⛔️
For the first test, you are required to check the data for completeness.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for missing values in the data.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*


In [112]:
# Test 1 - Null values using Deequ Verification Suite
for col_name in data_1963.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1963).addCheck(check).run()

    # Display the results of the check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



## Test 2 - Zero Values 🅾️

For the second test, you are required to check for zero values within the dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check for zero values within the data.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [113]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_1963.columns:
    # Convert column values to string for Zero check
    data_1963_str = data_1963.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_1963_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [114]:
# Test - Zero values using Deequ Verification Suite for metadata_df_pbd
for col_name in metadata_df_pbd.columns:
    # Convert column values to string for Zero check
    metadata_df_pbd_str = metadata_df_pbd.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .hasSize(lambda x: x > 0, "Zero values check")

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(metadata_df_pbd_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name} in metadata_df_pbd:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Zero Values Check for Column Nasdaq Traded in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column Symbol in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column Security Name in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column Listing Exchange in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column Market Category in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column ETF in metadata_df_pbd:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column Round Lot Size in

## Test 3 - Negative values ➖️
The third test requires you to check that all values in the data are positive.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to check negative values within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [115]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_1963.columns:
    # Convert column values to string
    data_1963_str = data_1963.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1963_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [116]:
from pyspark.sql.functions import col

# List of columns to check
columns_to_check = ["Nasdaq Traded", "Security Name", "Listing Exchange", "Market Category",
                    "Round Lot Size", "Test Issue", "Financial Status", "CQS Symbol",
                    "NASDAQ Symbol", "NextShares"]

# Check minimum values for each column
for col_name in columns_to_check:
    min_value = metadata_df_pbd.selectExpr(f"min(`{col_name}`) as min_value").collect()[0]["min_value"]
    print(f"Minimum value for column {col_name}: {min_value}")


Minimum value for column Nasdaq Traded: Y
Minimum value for column Security Name: 1-800-FLOWERS.COM, Inc. - Class A Common Stock
Minimum value for column Listing Exchange: A
Minimum value for column Market Category:  
Minimum value for column Round Lot Size: 1.0
Minimum value for column Test Issue: N
Minimum value for column Financial Status: D
Minimum value for column CQS Symbol: A
Minimum value for column NASDAQ Symbol: A
Minimum value for column NextShares: N


## Test 4 - Determine Maximum Values ⚠️

For the fourth test, we want to find the maximum values in the dataset for the numerical fields. Extremum values can often be used to define an upper bound for the column values so we can define them as the threshold values.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Column Profiler Runner` to generate summary statistics for all the available columns.
>2. Extract the maximum values for all the numeric columns in the data.
>
> *You may use as many cells as necessary*

In [117]:
from pydeequ.profiles import ColumnProfilerRunner
from pyspark.sql import SparkSession
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1963/content/stocks_1963.parquet"
data_1963 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_1963) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")


Maximum Values for Numeric Columns:
open: 303.125
low: 311.875
close: 313.75
volume: 20692800.0
adj_close: 148.7704620361328
high: 315.625


## Test 5 - Stock Tickers 💹️

For the fifth test, we want to determine if the stock tickers contained in our dataset are consistent. To do this, you will need to make use of use of the metadata file to check that the stock names used in the dataframe are valid.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine if the stock tickers contained in the dataset appear in the metadata file.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*

In [119]:
from pydeequ.verification import VerificationSuite
from pyspark.sql.functions import col

# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")


Stock Ticker Check Results:



## Test 6 - Duplication 👥️
Lastly, we want to determine the uniqueness of the items found in the dataframe. You need to make use of the Verification Suite to check for the validity of the stock tickers.

Similar to the previous notebook - `Data_profiling_student_version.ipynb`, the first thing to check will be if the primary key values within the dataset are unique - in our case, that will be a combination of the stock name and the date. Secondly, we want to check if the entries are all unique, which is done by checking for duplicates across that whole dataset.

> ℹ️ **Instructions** ℹ️
>
>1. Make use of the `Verification Suite` and write code to determine the uniqueness of entries contained within the dataset.
>2. Display the results of your test.
>
> *You may use as many cells as necessary*



In [181]:
# Specify the column to check
col_name = 'date'

# Investigate duplicate values in the 'date' column
data_1963.groupBy("date").count().filter("count > 1").show()

# Remove duplicate rows based on the 'date' column
data_1963 = data_1963.dropDuplicates(["date"])


# Check for uniqueness in data_1963 based on the 'date' column
unique_check_data_1963_column = Check(spark, level=CheckLevel.Error, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_1963 for the 'date' column
duplicate_check_data_1963_column = Check(spark, level=CheckLevel.Error,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_1963.select(col_name).distinct().count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_1963_column = VerificationSuite(spark) \
    .onData(data_1963) \
    .addCheck(unique_check_data_1963_column) \
    .addCheck(duplicate_check_data_1963_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_1963:")
print(f"  - Has errors: {verification_result_data_1963_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_1963_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1963_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_1963_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1963_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_1963_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1963_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_1963_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1963_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_1963_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_1963_column.checkResults:
        print(check_result)




Verification Results for column date in data_1963:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [184]:
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_1963 based on the 'date' column
unique_check_data_1963_column = Check(spark, level=CheckLevel.Error, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_1963 for the 'date' column
duplicate_check_data_1963_column = Check(spark, level=CheckLevel.Error,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_1963.select(col_name).distinct().count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_1963_column = VerificationSuite(spark) \
    .onData(data_1963) \
    .addCheck(unique_check_data_1963_column) \
    .addCheck(duplicate_check_data_1963_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_1963:")
print(f"  - Has errors: {verification_result_data_1963_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_1963_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1963_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_1963_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1963_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_1963_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1963_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_1963_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1963_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_1963_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_1963_column.checkResults:
        print(check_result)




Verification Results for column date in data_1963:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [123]:
# Assuming 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")


Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available



**YEAR 1974**

In [124]:
#TODO: Write your code here
year = 1974
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 1963
data_path_1974 = "/content/stocks/1974/*/*/*"


# Read in the data for 2018 with the specified schema
data_1974 = spark.read.csv(data_path_1974, schema=data_schema)

# Show the first few rows of the DataFrame
data_1974.show()

+----------+-------------------+------------------+-------------------+-------------------+--------------------+---------+
|      date|               open|              high|                low|              close|           adj_close|   volume|
+----------+-------------------+------------------+-------------------+-------------------+--------------------+---------+
|      Date|               Open|              High|                Low|              Close|           Adj Close|   Volume|
|1974-11-06|  4.980217456817627| 5.040292263031006|  4.818015098571777| 4.8300299644470215|  1.5678892135620115| 173000.0|
|1974-11-06|                0.0|             7.875|              7.625|              7.625|   5.073933124542236|  25400.0|
|1974-11-06|              16.25|            16.625|             16.125|             16.375|  0.6937137842178345|  48600.0|
|1974-11-06| 15.372641563415527|15.372641563415527| 15.372641563415527| 15.372641563415527|    10.8353910446167|      0.0|
|1974-11-06|    

In [125]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_1974.zip"

# Replace "/content/drive/MyDrive/parquet/stocks_1974.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_1974.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1974"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_1974/content/stocks_1974.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1974/content/stocks_1974.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1974/content/stocks_1974.parquet:
._SUCCESS.crc
_SUCCESS
part-00000-dbd4d770-4b28-46a3-b35d-5a3311207199-c000.snappy.parquet
part-00001-dbd4d770-4b28-46a3-b35d-5a3311207199-c000.snappy.parquet
.part-00000-dbd4d770-4b28-46a3-b35d-5a3311207199-c000.snappy.parquet.crc
.part-00001-dbd4d770-4b28-46a3-b35d-5a3311207199-c000.snappy.parquet.crc


In [126]:
# Test 1 - Null values using Deequ Verification Suite
for col_name in data_1974.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1974).addCheck(check).run()

    # Display the results of the check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [127]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_1974.columns:
    # Convert column values to string for Zero check
    data_1974_str = data_1974.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_1974_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [128]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_1974.columns:
    # Convert column values to string
    data_1974_str = data_1974.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1974_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [185]:
# Ensure the data type is correct (assuming 'double' for numeric values)
data_1974_str = data_1974.withColumn("adj_close", col("adj_close").cast("double"))


In [186]:
# Assuming 'double' for numeric values
data_1974_str = data_1974.withColumn("adj_close", col("adj_close").cast("double"))

# Define a Check for the adj_close column
check = Check(spark, level=CheckLevel.Error, description="Negative check for column adj_close") \
    .isNonNegative("adj_close")

# Add the check to the Verification Suite
result = VerificationSuite(spark).onData(data_1974_str).addCheck(check).run()

# Display the results of the Negative check for the adj_close column
print("Negative Values Check for Column adj_close:")
print(f"  - Has errors: {result.status == 'Error'}")

# Check if there are errors before trying to access the number of errors
if 'numRecordsFailed' in result.checkResults[0]:
    print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of errors: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in result.checkResults[0]:
    print(f"  - Error details: {result.checkResults[0]['constraintMessage']}")
else:
    print("  - Error details: No constraint message available")


Negative Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available


In [131]:
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1974/content/stocks_1974.parquet"
data_1974 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_1974) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")

Maximum Values for Numeric Columns:
open: 367.5
low: 364.5833435058594
close: 367.5
volume: 75315200.0
adj_close: 292.9978332519531
high: 376.6666564941406


In [132]:
#Test 5
# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")


Stock Ticker Check Results:



In [187]:
#Test 6
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_1974 based on the 'date' column
unique_check_data_1974_column = Check(spark, level=CheckLevel.Error, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Investigate duplicate values in the 'date' column
data_1974.groupBy("date").count().filter("count > 1").show()

# Remove duplicate rows based on the 'date' column
data_1974 = data_1974.dropDuplicates(["date"])

# Check for general duplicates in data_1974 for the 'date' column
duplicate_check_data_1974_column = Check(spark, level=CheckLevel.Error,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_1974.select(col_name).distinct().count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_1974_column = VerificationSuite(spark) \
    .onData(data_1974) \
    .addCheck(unique_check_data_1974_column) \
    .addCheck(duplicate_check_data_1974_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_1974:")
print(f"  - Has errors: {verification_result_data_1974_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_1974_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1974_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_1974_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1974_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_1974_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1974_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_1974_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1974_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_1974_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_1974_column.checkResults:
        print(check_result)



Verification Results for column date in data_1974:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [190]:
#Test 6
# 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")


Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available



**YEAR 1985**

In [137]:
#TODO: Write your code here
year = 1985
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 1963
data_path_1985 = "/content/stocks/1985/*/*/*"


# Read in the data for 2018 with the specified schema
data_1985 = spark.read.csv(data_path_1985, schema=data_schema)

# Show the first few rows of the DataFrame
data_1985.show()

+----------+------------------+------------------+------------------+------------------+--------------------+----------+
|      date|              open|              high|               low|             close|           adj_close|    volume|
+----------+------------------+------------------+------------------+------------------+--------------------+----------+
|      Date|              Open|              High|               Low|             Close|           Adj Close|    Volume|
|1985-12-19|11.858804702758787|12.015000343322756|  11.8347749710083|  11.8347749710083|   6.174869060516357|  856500.0|
|1985-12-19|               0.0|12.399999618530273|12.199999809265135|12.399999618530273|  10.775887489318848|    4000.0|
|1985-12-19| 1.203703761100769| 1.203703761100769|1.1666666269302368| 1.203703761100769|-1.50225855009007...|  665500.0|
|1985-12-19|0.3973214328289032|           0.40625|0.3950892984867096|0.4017857015132904|  0.3183508515357971|67530400.0|
|1985-12-19|           71250.0| 

In [138]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_1985.zip"

# Replace "/content/drive/MyDrive/parquet/stocks_1985.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_1985.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1985"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_1985/content/stocks_1985.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1985/content/stocks_1985.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1985/content/stocks_1985.parquet:
part-00001-2ee97d16-b78c-4a20-b45f-ce48fa9d8f79-c000.snappy.parquet
.part-00000-2ee97d16-b78c-4a20-b45f-ce48fa9d8f79-c000.snappy.parquet.crc
._SUCCESS.crc
_SUCCESS
.part-00001-2ee97d16-b78c-4a20-b45f-ce48fa9d8f79-c000.snappy.parquet.crc
part-00000-2ee97d16-b78c-4a20-b45f-ce48fa9d8f79-c000.snappy.parquet


In [139]:
# Test 1 - Null values using Deequ Verification Suite
for col_name in data_1985.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1985).addCheck(check).run()

    # Display the results of the check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [140]:
# Drop rows with null values in specific columns
columns_with_nulls = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
data_1985_cleaned = data_1985.dropna(subset=columns_with_nulls)

# Display the count of null values after cleaning
for col_name in columns_with_nulls:
    null_count_after_cleaning = data_1985_cleaned.where(col(col_name).isNull()).count()
    print(f"After cleaning, Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {null_count_after_cleaning > 0}")
    print(f"  - Number of errors: {null_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")



After cleaning, Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [141]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_1985.columns:
    # Convert column values to string for Zero check
    data_1985_str = data_1985.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_1985_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")


Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [142]:
from pyspark.sql.functions import col

# Assuming 'data_1985' is your DataFrame

# Drop rows with zero values in specific columns
columns_with_zeros = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_zeros:
    data_1985 = data_1985.filter(col(col_name) != 0)

# Display the count of zero values after cleaning
for col_name in columns_with_zeros:
    zero_count_after_cleaning = data_1985.where(col(col_name) == 0).count()
    print(f"After cleaning, Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_count_after_cleaning > 0}")
    print(f"  - Number of errors: {zero_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [143]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_1985.columns:
    # Convert column values to string
    data_1985_str = data_1985.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1985_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [144]:
from pyspark.sql.functions import col

# Drop rows with negative values in specific columns
columns_with_negatives = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_negatives:
    data_1985 = data_1985.filter(col(col_name) >= 0)

# Display the count of negative values after cleaning
for col_name in columns_with_negatives:
    negative_count_after_cleaning = data_1985.where(col(col_name) < 0).count()
    print(f"After cleaning, Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {negative_count_after_cleaning > 0}")
    print(f"  - Number of errors: {negative_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Negative Values Check for Column volume:
  - Has errors: Fals

In [145]:
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1985/content/stocks_1985.parquet"
data_1985 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_1985) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")

Maximum Values for Numeric Columns:
open: 100000.0
low: 219375.0
close: 221250.0
volume: 183495200.0
adj_close: 190826.34375
high: 224062.5


In [146]:
from pydeequ.profiles import ColumnProfilerRunner
#Test 4
# Use Column Profiler Runner to generate summary statistics for metadata_df_pbd
metadata_column_profiles = ColumnProfilerRunner(spark) \
    .onData(metadata_df_pbd) \
    .run()

# Extract maximum values for all numeric columns in metadata_df_pbd
metadata_max_values = {}

for col_name, profile in metadata_column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        metadata_max_values[col_name] = profile.maximum

# Display the maximum values for metadata_df_pbd
print("Maximum Values for Numeric Columns in metadata_df_pbd:")
for col_name, max_value in metadata_max_values.items():
    print(f"{col_name}: {max_value}")


Maximum Values for Numeric Columns in metadata_df_pbd:
Round Lot Size: 100.0


In [147]:
#Test 5
# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")


Stock Ticker Check Results:



In [148]:
#Test 6
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_1985 based on the 'date' column
unique_check_data_1985_column = Check(spark, level=CheckLevel.Warning, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_1985 for the 'date' column
duplicate_check_data_1985_column = Check(spark, level=CheckLevel.Warning,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_1985.select(col_name).count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_1985_column = VerificationSuite(spark) \
    .onData(data_1985) \
    .addCheck(unique_check_data_1985_column) \
    .addCheck(duplicate_check_data_1985_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_1985:")
print(f"  - Has errors: {verification_result_data_1985_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_1985_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1985_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_1985_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1985_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_1985_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1985_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_1985_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1985_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_1985_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_1985_column.checkResults:
        print(check_result)



Verification Results for column date in data_1985:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [149]:
#Test 6
# 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")


Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available



**YEAR 1996**

In [150]:
#TODO: Write your code here
year = 1996
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 1996
data_path_1996 = "/content/stocks/1996/*/*/*"


# Read in the data for 1996 with the specified schema
data_1996 = spark.read.csv(data_path_1996, schema=data_schema)

# Show the first few rows of the DataFrame
data_1996.show()

+----------+------------------+-------------------+------------------+------------------+------------------+----------+
|      date|              open|               high|               low|             close|         adj_close|    volume|
+----------+------------------+-------------------+------------------+------------------+------------------+----------+
|      Date|              Open|               High|               Low|             Close|         Adj Close|    Volume|
|1996-12-31|  37.9433708190918|   38.1356086730957| 37.53485870361328|  38.1356086730957|  27.1257438659668| 1025800.0|
|1996-12-31|              3.25|               3.25|            3.0625|            3.0625|2.8300909996032715|   17200.0|
|1996-12-31| 3.851851940155029|  4.148148059844972| 3.851851940155029| 4.148148059844972|-147.2616424560547|    1300.0|
|1996-12-31|0.4170095920562744|0.43895748257637024|0.4170095920562744|0.4279835522174835|0.3635455369949341|  269200.0|
|1996-12-31|0.7633928656578064| 0.767857

In [151]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_1996.zip"

# Replace "/content/drive/MyDrive/parquet/stocks_1996.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_1996.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1996"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_1996/content/stocks_1996.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1996/content/stocks_1996.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_1996/content/stocks_1996.parquet:
part-00001-7de0d854-0cf6-4b1c-bbc1-d30426a01d28-c000.snappy.parquet
._SUCCESS.crc
_SUCCESS
part-00000-7de0d854-0cf6-4b1c-bbc1-d30426a01d28-c000.snappy.parquet
.part-00001-7de0d854-0cf6-4b1c-bbc1-d30426a01d28-c000.snappy.parquet.crc
.part-00000-7de0d854-0cf6-4b1c-bbc1-d30426a01d28-c000.snappy.parquet.crc


In [192]:
# Test 1 - Null values using Deequ Verification Suite
# Iterate over columns and perform null check
for col_name in data_1996.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1996).addCheck(check).run()

    # Display the results of the null check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Null Values Check for Column date:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [154]:
from pyspark.sql.functions import col

for col_name in data_1996.columns:
    # Check if the column has null values
    null_count = data_1996.filter(col(col_name).isNull()).count()
    print(f"Null count for Column {col_name}: {null_count}")


Null count for Column date: 0
Null count for Column open: 19
Null count for Column high: 19
Null count for Column low: 19
Null count for Column close: 19
Null count for Column adj_close: 19
Null count for Column volume: 19


In [155]:
from pyspark.sql.functions import col

# Drop rows with null values in specific columns
columns_with_nulls = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_nulls:
    data_1996 = data_1996.filter(col(col_name).isNotNull())

# Display the count of null values after cleaning
for col_name in columns_with_nulls:
    null_count_after_cleaning = data_1996.where(col(col_name).isNull()).count()
    print(f"After cleaning, Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {null_count_after_cleaning > 0}")
    print(f"  - Number of errors: {null_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  

In [156]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_1996.columns:
    # Convert column values to string for Zero check
    data_1996_str = data_1996.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_1996_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [157]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_1996.columns:
    # Convert column values to string
    data_1996_str = data_1996.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_1996_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [158]:
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_1996/content/stocks_1996.parquet"
data_1996 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_1996) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")

Maximum Values for Numeric Columns:
open: 498750.0
low: 490000.0
close: 507500.0
volume: 546630400.0
adj_close: 507500.0
high: 507500.0


In [159]:
from pydeequ.profiles import ColumnProfilerRunner
#Test 4
# Use Column Profiler Runner to generate summary statistics for metadata_df_pbd
metadata_column_profiles = ColumnProfilerRunner(spark) \
    .onData(metadata_df_pbd) \
    .run()

# Extract maximum values for all numeric columns in metadata_df_pbd
metadata_max_values = {}

for col_name, profile in metadata_column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        metadata_max_values[col_name] = profile.maximum

# Display the maximum values for metadata_df_pbd
print("Maximum Values for Numeric Columns in metadata_df_pbd:")
for col_name, max_value in metadata_max_values.items():
    print(f"{col_name}: {max_value}")


Maximum Values for Numeric Columns in metadata_df_pbd:
Round Lot Size: 100.0


In [160]:
#Test 5
# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")


Stock Ticker Check Results:



In [161]:
#Test 6
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_1996 based on the 'date' column
unique_check_data_1996_column = Check(spark, level=CheckLevel.Warning, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_1996 for the 'date' column
duplicate_check_data_1996_column = Check(spark, level=CheckLevel.Warning,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_1996.select(col_name).count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_1996_column = VerificationSuite(spark) \
    .onData(data_1996) \
    .addCheck(unique_check_data_1996_column) \
    .addCheck(duplicate_check_data_1996_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_1996:")
print(f"  - Has errors: {verification_result_data_1996_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_1996_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1996_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_1996_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_1996_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_1996_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1996_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_1996_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_1996_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_1996_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_1996_column.checkResults:
        print(check_result)



Verification Results for column date in data_1996:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [162]:
#Test 6
# 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")

Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available



**YEAR 2007**

In [163]:
#TODO: Write your code here
year = 2007
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 2007
data_path_2007 = "/content/stocks/2007/*/*/*"


# Read in the data for 2007 with the specified schema
data_2007 = spark.read.csv(data_path_2007, schema=data_schema)

# Show the first few rows of the DataFrame
data_2007.show()

+----------+------------------+------------------+------------------+------------------+------------------+-----------+
|      date|              open|              high|               low|             close|         adj_close|     volume|
+----------+------------------+------------------+------------------+------------------+------------------+-----------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|     Volume|
|2007-12-27| 26.67382049560547| 26.84549331665039|26.566524505615234|26.566524505615234| 24.24957847595215|  2628500.0|
|2007-12-27| 89.70398712158203| 89.77607727050781| 88.83891296386719| 88.88697052001953| 77.58712005615234|  2743600.0|
|2007-12-27| 15.43000030517578|15.619999885559082|15.079999923706055| 15.18000030517578|14.310813903808596|  1473200.0|
|2007-12-27|1.3799999952316284|               1.5|1.3799999952316284|1.4800000190734863| 1.367684841156006|    16200.0|
|2007-12-27|11.600000381469727|11.633333

In [164]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_2007.zip"

# Replace "/content/drive/MyDrive/parquet/stocks_2007.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_2007.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_2007"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_2007/content/stocks_2007.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_2007/content/stocks_2007.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_2007/content/stocks_2007.parquet:
part-00001-f35f7986-9310-44ce-a8ef-031749839bbb-c000.snappy.parquet
._SUCCESS.crc
_SUCCESS
.part-00000-f35f7986-9310-44ce-a8ef-031749839bbb-c000.snappy.parquet.crc
part-00000-f35f7986-9310-44ce-a8ef-031749839bbb-c000.snappy.parquet
.part-00001-f35f7986-9310-44ce-a8ef-031749839bbb-c000.snappy.parquet.crc


In [165]:
# Test 1 - Null values using Deequ Verification Suite
# Iterate over columns and perform null check
for col_name in data_2007.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_2007).addCheck(check).run()

    # Display the results of the null check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [194]:
from pyspark.sql.functions import col

# Drop rows with null values in specific columns
columns_with_nulls = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_nulls:
    data_1996 = data_1996.filter(col(col_name).isNotNull())

# Display the count of null values after cleaning
for col_name in columns_with_nulls:
    null_count_after_cleaning = data_1996.where(col(col_name).isNull()).count()
    print(f"After cleaning, Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {null_count_after_cleaning > 0}")
    print(f"  - Number of errors: {null_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  

In [166]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_2007.columns:
    # Convert column values to string for Zero check
    data_2007_str = data_2007.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_2007_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [195]:
from pyspark.sql.functions import col

# Drop rows with zero values in specific columns
columns_with_zeros = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_zeros:
    data_2007 = data_2007.filter(col(col_name) != 0)

# Display the count of zero values after cleaning
for col_name in columns_with_zeros:
    zero_count_after_cleaning = data_2007.where(col(col_name) == 0).count()
    print(f"After cleaning, Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_count_after_cleaning > 0}")
    print(f"  - Number of errors: {zero_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [167]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_2007.columns:
    # Convert column values to string
    data_2007_str = data_2007.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_2007_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [168]:
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_2007/content/stocks_2007.parquet"
data_2007 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_2007) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")

Maximum Values for Numeric Columns:
open: 62672400384.0
low: 61613998080.0
close: 62067601408.0
volume: 1835338900.0
adj_close: 62067601408.0
high: 63503998976.0


In [169]:
#Test 5
# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")


Stock Ticker Check Results:



In [170]:
#Test 6
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_2007 based on the 'date' column
unique_check_data_2007_column = Check(spark, level=CheckLevel.Warning, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_2007 for the 'date' column
duplicate_check_data_2007_column = Check(spark, level=CheckLevel.Warning,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_2007.select(col_name).count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_2007_column = VerificationSuite(spark) \
    .onData(data_2007) \
    .addCheck(unique_check_data_2007_column) \
    .addCheck(duplicate_check_data_2007_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_2007:")
print(f"  - Has errors: {verification_result_data_2007_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_2007_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_2007_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_2007_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_2007_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_2007_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_2007_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_2007_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_2007_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_2007_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_2007_column.checkResults:
        print(check_result)



Verification Results for column date in data_2007:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [171]:
#Test 6
# 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")

Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available



**YEAR 2018**

In [172]:
#TODO: Write your code here
year = 2018
# Define the data schema with string data types
data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("open", StringType(), True),
    StructField("high", StringType(), True),
    StructField("low", StringType(), True),
    StructField("close", StringType(), True),
    StructField("adj_close", StringType(), True),
    StructField("volume", StringType(), True),
])

# Path to the CSV files for the year 2007
data_path_2018 = "/content/stocks/2018/*/*/*"


# Read in the data for 2018 with the specified schema
data_2018 = spark.read.csv(data_path_2018, schema=data_schema)

# Show the first few rows of the DataFrame
data_2018.show()

+----------+------------------+------------------+------------------+------------------+------------------+----------+
|      date|              open|              high|               low|             close|         adj_close|    volume|
+----------+------------------+------------------+------------------+------------------+------------------+----------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|    Volume|
|2018-12-18| 68.44999694824219| 69.02999877929689| 67.38999938964844| 67.98999786376953| 67.07752990722656| 2112000.0|
|2018-12-18|  28.1200008392334| 28.65999984741211| 28.09000015258789|28.239999771118164|28.239999771118164| 2107800.0|
|2018-12-18|32.470001220703125|33.650001525878906|32.470001220703125|33.540000915527344|33.006290435791016| 9117600.0|
|2018-12-18| 41.27999877929688|              45.0| 41.27999877929688| 41.58000183105469| 41.58000183105469|    2100.0|
|2018-12-18| 2.400000095367432|2.420000076293945

In [173]:
import zipfile
# TODO: Replace with the actual path to parquet files
parquet_path = "/content/drive/MyDrive/parquet/stocks_2018.zip"

# Replace "/content/drive/MyDrive/parquet/stocks_2018.zip" with the actual path to zipped Parquet file
zipped_parquet_path = "/content/drive/MyDrive/parquet/stocks_2018.zip"
unzipped_parquet_path = "/content/drive/MyDrive/parquet/unzipped_stocks_2018"

# Create a directory to extract the contents
os.makedirs(unzipped_parquet_path, exist_ok=True)

# Unzip the Parquet file
with zipfile.ZipFile(zipped_parquet_path, 'r') as zip_ref:
    zip_ref.extractall(unzipped_parquet_path)

# List the contents of the Parquet directory
contents = os.listdir('/content/drive/MyDrive/parquet/unzipped_stocks_2018/content/stocks_2018.parquet')
print("Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_2018/content/stocks_2018.parquet:")
for item in contents:
    print(item)

Contents of the /content/drive/MyDrive/parquet/unzipped_stocks_2018/content/stocks_2018.parquet:
._SUCCESS.crc
_SUCCESS
.part-00000-cffd627a-eddb-4847-938d-16b77028af68-c000.snappy.parquet.crc
.part-00001-cffd627a-eddb-4847-938d-16b77028af68-c000.snappy.parquet.crc
part-00001-cffd627a-eddb-4847-938d-16b77028af68-c000.snappy.parquet
part-00000-cffd627a-eddb-4847-938d-16b77028af68-c000.snappy.parquet


In [174]:
# Test 1 - Null values using Deequ Verification Suite
# Iterate over columns and perform null check
for col_name in data_2018.columns:
    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Null check for column {col_name}") \
        .isComplete(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_2018).addCheck(check).run()

    # Display the results of the null check for each column
    print(f"Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Null Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [197]:
from pyspark.sql.functions import col

# Drop rows with null values in specific columns
columns_with_nulls = ['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_nulls:
    data_2018 = data_2018.filter(col(col_name).isNotNull())

# Display the count of null values after cleaning
for col_name in columns_with_nulls:
    null_count_after_cleaning = data_2018.where(col(col_name).isNull()).count()
    print(f"After cleaning, Null Values Check for Column {col_name}:")
    print(f"  - Has errors: {null_count_after_cleaning > 0}")
    print(f"  - Number of errors: {null_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Null Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Null Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  

In [175]:
# Test 2 - Zero values using Deequ Verification Suite
for col_name in data_2018.columns:
    # Convert column values to string for Zero check
    data_2018_str = data_2018.withColumn(col_name, F.col(col_name).cast("string"))

    # Define a Check for each column for Zero values
    zero_check = Check(spark, level=CheckLevel.Error, description=f"Zero values check for column {col_name}") \
        .isComplete(col_name)

    # Run Zero check
    zero_result = VerificationSuite(spark).onData(data_2018_str).addCheck(zero_check).run()

    # Display the results of the Zero check for each column
    print(f"Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in zero_result.checkResults[0]:
        print(f"  - Number of errors: {zero_result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in zero_result.checkResults[0]:
        print(f"  - Error details: {zero_result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Zero Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column open:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column high:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column low:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column adj_close:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available

Zero Values Check for Column volume:
  - Has errors: True
  - Number of errors: 0
  - Error details: No constraint message available



In [196]:
from pyspark.sql.functions import col

# Drop rows with zero values in specific columns
columns_with_zeros = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
for col_name in columns_with_zeros:
    data_2018 = data_2018.filter(col(col_name) != 0)

# Display the count of zero values after cleaning
for col_name in columns_with_zeros:
    zero_count_after_cleaning = data_2018.where(col(col_name) == 0).count()
    print(f"After cleaning, Zero Values Check for Column {col_name}:")
    print(f"  - Has errors: {zero_count_after_cleaning > 0}")
    print(f"  - Number of errors: {zero_count_after_cleaning}")
    print("  - Error details: No constraint message available\n")


After cleaning, Zero Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

After cleaning, Zero Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [176]:
# Test 3 - Negative values using Deequ Verification Suite
table_negative_values = []

for col_name in data_2018.columns:
    # Convert column values to string
    data_2018_str = data_2018.withColumn(col_name, col(col_name).cast("string"))

    # Define a Check for each column
    check = Check(spark, level=CheckLevel.Error, description=f"Negative check for column {col_name}") \
        .isNonNegative(col_name)

    # Add the check to the Verification Suite
    result = VerificationSuite(spark).onData(data_2018_str).addCheck(check).run()

    # Display the results of the Negative check for each column
    print(f"Negative Values Check for Column {col_name}:")
    print(f"  - Has errors: {result.status == 'Error'}")

    # Check if there are errors before trying to access the number of errors
    if 'numRecordsFailed' in result.checkResults[0]:
        print(f"  - Number of errors: {result.checkResults[0]['numRecordsFailed']}")
    else:
        print("  - Number of errors: 0")

    # Check if 'constraintMessage' is present before accessing it
    if 'constraintMessage' in result.checkResults[0]:
        print(f"  - Error details: {result.checkResults[0]['constraintMessage']}\n")
    else:
        print("  - Error details: No constraint message available\n")

Negative Values Check for Column date:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column open:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column high:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column low:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column adj_close:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available

Negative Values Check for Column volume:
  - Has errors: False
  - Number of errors: 0
  - Error details: No constraint message available



In [177]:
#Test 4
# Create a Spark session
spark = SparkSession.builder.appName("MaxValuesTest").getOrCreate()

# Load data
data_path = "/content/drive/MyDrive/parquet/unzipped_stocks_2018/content/stocks_2018.parquet"
data_2018 = spark.read.parquet(data_path)

# Use Column Profiler Runner to generate summary statistics
column_profiles = ColumnProfilerRunner(spark) \
    .onData(data_2018) \
    .run()

# Extract maximum values for all numeric columns
max_values = {}

for col_name, profile in column_profiles.profiles.items():
    if profile.dataType == "Integral" or profile.dataType == "Fractional":
        max_values[col_name] = profile.maximum

# Display the maximum values
print("Maximum Values for Numeric Columns:")
for col_name, max_value in max_values.items():
    print(f"{col_name}: {max_value}")

Maximum Values for Numeric Columns:
open: 117187.5
low: 109375.0
close: 109375.0
volume: 358775700.0
adj_close: 109375.0
high: 125000.0


In [178]:
#Test 5
# Assuming the column containing stock tickers is named 'Symbol' in metadata_df_pbd
stock_ticker_col = 'Symbol'

# Extract distinct stock tickers from the metadata
distinct_stock_tickers_metadata = metadata_df_pbd.select(col(stock_ticker_col)).distinct()

# Convert the distinct stock tickers to a list
distinct_tickers_list_metadata = [row[stock_ticker_col] for row in distinct_stock_tickers_metadata.collect()]

# Define a check to verify if stock tickers appear in the metadata
ticker_check = Check(spark, level=CheckLevel.Warning, description="Stock Ticker check") \
    .isContainedIn(stock_ticker_col, distinct_tickers_list_metadata)

# Run the check using the Verification Suite
verification_result = VerificationSuite(spark).onData(metadata_df_pbd).addCheck(ticker_check).run()

# Display the results of the stock ticker check
print("Stock Ticker Check Results:")
print(f"  - Has warnings: {verification_result.status == 'Warning'}")

# Check if there are warnings before trying to access the number of warnings
if 'numRecordsFailed' in verification_result.checkResults[0]:
    print(f"  - Number of warnings: {verification_result.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of warnings: 0")

# Check if 'constraintMessage' is present before accessing it
if 'constraintMessage' in verification_result.checkResults[0]:
    print(f"  - Warning details: {verification_result.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Warning details: No constraint message available\n")

Stock Ticker Check Results:



In [200]:
#Test 6
# Specify the column to check
col_name = 'date'

# Check for uniqueness in data_2007 based on the 'date' column
unique_check_data_2018_column = Check(spark, level=CheckLevel.Warning, description=f"Uniqueness check for {col_name}") \
    .isUnique(col_name)

# Check for general duplicates in data_2007 for the 'date' column
duplicate_check_data_2018_column = Check(spark, level=CheckLevel.Warning,
                                          description=f"Duplicate check for {col_name}") \
    .hasSize(lambda x: x == data_2018.select(col_name).count(),
             f"Duplicate check for {col_name}")

# Run checks for the 'date' column using the Verification Suite
verification_result_data_2018_column = VerificationSuite(spark) \
    .onData(data_2018) \
    .addCheck(unique_check_data_2018_column) \
    .addCheck(duplicate_check_data_2018_column) \
    .run()

# Display results for the 'date' column
print(f"\nVerification Results for column {col_name} in data_2018:")
print(f"  - Has errors: {verification_result_data_2018_column.status == 'Error'}")
print(f"  - Number of uniqueness errors: {verification_result_data_2018_column.checkResults[0]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_2018_column.checkResults[0] else "0")
print(f"  - Number of duplicate errors: {verification_result_data_2018_column.checkResults[1]['numRecordsFailed']}"
      if 'numRecordsFailed' in verification_result_data_2018_column.checkResults[1] else "0")
print(f"  - Uniqueness error details: {verification_result_data_2018_column.checkResults[0]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_2018_column.checkResults[0] else "No constraint message available")
print(f"  - Duplicate error details: {verification_result_data_2018_column.checkResults[1]['constraintMessage']}"
      if 'constraintMessage' in verification_result_data_2018_column.checkResults[1] else "No constraint message available")

# Print the content of the column with errors
if verification_result_data_2018_column.status == 'Error':
    print(f"  - Content of the column with errors for {col_name}:")
    for check_result in verification_result_data_2018_column.checkResults:
        print(check_result)



Verification Results for column date in data_2018:
  - Has errors: False
0
0
No constraint message available
No constraint message available


In [180]:
#Test 6
# 'Symbol' is the column representing stock tickers in metadata_df_pbd
stock_ticker_col_metadata = 'Symbol'

# Check for uniqueness in metadata_df_pbd based on 'Symbol'
unique_check_metadata = Check(spark, level=CheckLevel.Error, description="Uniqueness check for stock tickers") \
    .isUnique(stock_ticker_col_metadata)

# Check for general duplicates in metadata_df_pbd
duplicate_check_metadata = Check(spark, level=CheckLevel.Error, description="Duplicate check for entire metadata") \
    .hasSize(lambda x: x == metadata_df_pbd.count(), "Duplicate check for entire metadata")

# Run both checks using the Verification Suite
verification_result_metadata = VerificationSuite(spark) \
    .onData(metadata_df_pbd) \
    .addCheck(unique_check_metadata) \
    .addCheck(duplicate_check_metadata) \
    .run()

# Display the results of the checks for metadata_df_pbd
print("Verification Results for metadata_df_pbd:")
print(f"  - Has errors: {verification_result_metadata.status == 'Error'}")

# Check if there are errors in the uniqueness check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[0]:
    print(f"  - Number of uniqueness errors: {verification_result_metadata.checkResults[0]['numRecordsFailed']}")
else:
    print("  - Number of uniqueness errors: 0")

# Check if there are errors in the duplicate check before trying to access the number of errors
if 'numRecordsFailed' in verification_result_metadata.checkResults[1]:
    print(f"  - Number of duplicate errors: {verification_result_metadata.checkResults[1]['numRecordsFailed']}")
else:
    print("  - Number of duplicate errors: 0")

# Check if 'constraintMessage' is present before accessing it for uniqueness check
if 'constraintMessage' in verification_result_metadata.checkResults[0]:
    print(f"  - Uniqueness error details: {verification_result_metadata.checkResults[0]['constraintMessage']}\n")
else:
    print("  - Uniqueness error details: No constraint message available\n")

# Check if 'constraintMessage' is present before accessing it for duplicate check
if 'constraintMessage' in verification_result_metadata.checkResults[1]:
    print(f"  - Duplicate error details: {verification_result_metadata.checkResults[1]['constraintMessage']}\n")
else:
    print("  - Duplicate error details: No constraint message available\n")

Verification Results for metadata_df_pbd:
  - Has errors: False
  - Number of uniqueness errors: 0
  - Number of duplicate errors: 0
  - Uniqueness error details: No constraint message available

  - Duplicate error details: No constraint message available

