In [1]:
import os
os.environ['SPARK_VERSION'] = '3.5.0' 

In [2]:
pip install pydeequ

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install pyspark --user

Note: you may need to restart the kernel to use updated packages.


In [4]:
import pydeequ

In [15]:
import pandas as pd

In [6]:
from pyspark.sql import SparkSession, Row

In [60]:
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.suggestions import *
from pydeequ.verification import *

In [71]:
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes

In [11]:
from py4j.java_gateway import java_import
java_import(spark._sc._jvm, "org.apache.spark.sql.api.python.*")

In [8]:
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [9]:
# File location and type
file_location = "CountryHits240901.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [10]:
display(df)

DataFrame[Rank: string, Track: string, Artist1: string, Artist2: string, Album: string, Release: string, Popularity: string, Duration: string, Danceability: string, Energy: string, Key: string, Loudness: string, Mode: string, Speechiness: string, Acousticness: string, Instrumentalness: string, Liveness: string, Valence: string, Tempo: string]

In [12]:
spark = (SparkSession
.builder
.config("spark.jars.packages", 'deequ-2.0.8-spark-3.5.jar')
.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
.getOrCreate())

In [37]:
### Data Analyzers section
analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("Track")) \
                    .addAnalyzer(CountDistinct("Rank")) \
                    .addAnalyzer(MaxLength("Album")) \
                    .addAnalyzer(Completeness("Energy")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+-------------+-----+
| entity|instance|         name|value|
+-------+--------+-------------+-----+
| Column|  Energy| Completeness|  1.0|
|Dataset|       *|         Size|100.0|
| Column|   Album|    MaxLength| 57.0|
| Column|    Rank|CountDistinct|100.0|
| Column|   Track| Completeness|  1.0|
+-------+--------+-------------+-----+



In [38]:
analysisResult_pandas_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult, pandas=True)
analysisResult_pandas_df

Unnamed: 0,entity,instance,name,value
0,Column,Energy,Completeness,1.0
1,Dataset,*,Size,100.0
2,Column,Album,MaxLength,57.0
3,Column,Rank,CountDistinct,100.0
4,Column,Track,Completeness,1.0


In [81]:
df_results.to_csv('analyzer_report.csv', index=False)

In [40]:
### Data profiling section
profiling = ColumnProfilerRunner(spark) \
            .onData(df) \
            .run()

In [47]:
data_for_df = []

In [48]:
for col, profile in result.profiles.items():
    column = col
    completeness = profile.completeness
    dist_values = profile.approximateNumDistinctValues
    data_type = profile.dataType
    data_for_df.append({
        "Column": column,
        "Completeness": completeness,
        "Approximate number of distinct values": dist_values,
        "Data type": data_type
    })

In [49]:
df_results = pd.DataFrame(data_for_df) #create the dataframe with the previous values
df_results #display the dataframe

Unnamed: 0,Column,Completeness,Approximate number of distinct values,Data type
0,Speechiness,1.0,91,Fractional
1,Duration,1.0,100,Integral
2,Acousticness,1.0,97,Fractional
3,Rank,1.0,103,Integral
4,Valence,1.0,94,Fractional
5,Release,1.0,63,String
6,Artist1,1.0,65,String
7,Popularity,1.0,44,Integral
8,Energy,1.0,86,Fractional
9,Album,1.0,95,String


In [82]:
df_results.to_csv('profiling_report.csv', index=False)

In [50]:
### Constraint Suggestions section
suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

In [51]:
data_for_df = []

In [53]:
for sugg in suggestionResult['constraint_suggestions']:
    column = sugg['column_name']
    description = sugg['description']
    py_code = sugg['code_for_constraint']
    data_for_df.append({
        "Column": column,
        "Description": description,
        "Python code": py_code
    })
    

In [54]:
df_results = pd.DataFrame(data_for_df) #create the dataframe with the previous values
df_results #display the dataframe

Unnamed: 0,Column,Description,Python code
0,Speechiness,'Speechiness' is not null,".isComplete(""Speechiness"")"
1,Speechiness,'Speechiness' has no negative values,".isNonNegative(""Speechiness"")"
2,Speechiness,'Speechiness' has type Fractional,".hasDataType(""Speechiness"", ConstrainableDataT..."
3,Duration,'Duration' is not null,".isComplete(""Duration"")"
4,Duration,'Duration' has no negative values,".isNonNegative(""Duration"")"
5,Duration,'Duration' has type Integral,".hasDataType(""Duration"", ConstrainableDataType..."
6,Duration,'Duration' is unique,".isUnique(""Duration"")"
7,Acousticness,'Acousticness' is not null,".isComplete(""Acousticness"")"
8,Acousticness,'Acousticness' has no negative values,".isNonNegative(""Acousticness"")"
9,Acousticness,'Acousticness' has type Fractional,".hasDataType(""Acousticness"", ConstrainableData..."


In [83]:
df_results.to_csv('suggestions_constraints_report.csv', index=False)

In [72]:
check = Check(spark, CheckLevel.Warning, "Spotify DB Verification")

In [76]:
check = Check(spark, CheckLevel.Warning, "DB Verification")
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.isNonNegative("Speechiness") \
        .isUnique("Duration")  \
        .isComplete("Acousticness")  \
        .isNonNegative("Rank")  \
        .isContainedIn("Mode", ["1", "0"])  \
        .hasDataType("Key", ConstrainableDataTypes.Integral) \
        .hasDataType("Loudness", ConstrainableDataTypes.Fractional)) \
    .run()

In [77]:
print(f"Verification Run Status: {checkResult.status}")
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Verification Run Status: Success
+---------------+-----------+------------+--------------------+-----------------+------------------+
|          check|check_level|check_status|          constraint|constraint_status|constraint_message|
+---------------+-----------+------------+--------------------+-----------------+------------------+
+---------------+-----------+------------+--------------------+-----------------+------------------+



In [79]:
checkResult_pandas_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_pandas_df

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,DB Verification,Warning,Success,ComplianceConstraint(Compliance(Speechiness is...,Success,
1,DB Verification,Warning,Success,UniquenessConstraint(Uniqueness(List(Duration)...,Success,
2,DB Verification,Warning,Success,CompletenessConstraint(Completeness(Acousticne...,Success,
3,DB Verification,Warning,Success,ComplianceConstraint(Compliance(Rank is non-ne...,Success,
4,DB Verification,Warning,Success,ComplianceConstraint(Compliance(Mode contained...,Success,
5,DB Verification,Warning,Success,"AnalysisBasedConstraint(DataType(Key,None),<fu...",Success,
6,DB Verification,Warning,Success,"AnalysisBasedConstraint(DataType(Loudness,None...",Success,


In [84]:
df_results.to_csv('constraint_verification_report.csv', index=False)