In [1]:
from pyspark.sql import SparkSession
import os

# Export the Spark version before pydeequ is imported in a later cell
# (presumably pydeequ reads it to choose a matching Deequ build -- confirm
# against the pydeequ docs if the version is ever bumped).
os.environ["SPARK_VERSION"] = "3.3"

In [2]:
!pip install pydeequ==1.5.0



In [3]:
import pydeequ
from pydeequ.analyzers import *

In [4]:
# Build (or reuse) the Spark session used for all data-quality jobs.
# - spark.jars          : local PostgreSQL JDBC driver needed for the JDBC read.
# - spark.jars.packages : Deequ artifact matching the configured Spark version.
# - spark.jars.excludes : PyDeequ's setup instructions exclude the f2j artifact
#                         that Deequ pulls in transitively; without the
#                         exclusion, dependency resolution can fail.
spark = (
    SparkSession.builder
    .appName("acura_dq")
    .master("spark://spark-master:7077")
    .config("spark.jars", "/opt/spark/jars/postgresql-42.7.5.jar")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

# Connection settings for the PostgreSQL datastore that holds the reviews.
jdbc_url = "jdbc:postgresql://postgres-datastore:5432/acura_db"

# Credentials are taken from the environment so real secrets never have to be
# committed with the notebook. The fallbacks reproduce the previous hard-coded
# values and are only meant for the local development stack.
properties = {
    "user": os.environ.get("ACURA_DB_USER", "acura_user"),
    "password": os.environ.get("ACURA_DB_PASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}

# Spark's JDBC reader accepts a parenthesised sub-query with an alias in place
# of a table name.
query = "(SELECT * FROM public_integration.dev_itg_dataeng_cc_review_master) AS reviews"

# Lazily defines the DataFrame; rows are only fetched when an action runs.
df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties)

In [5]:
# Show the column names of the reviews DataFrame loaded over JDBC.
df.columns

['card_name',
 'platform_name',
 'review_timestamp',
 'review_id',
 'review_customer_id',
 'review_customer_name',
 'review_rating_maximum',
 'review_rating_given',
 'review_raw_text']

In [6]:
from pydeequ.profiles import *

In [7]:
# Compute a set of data-quality metrics over the reviews DataFrame.
#
# Changes from the previous version:
# - The Compliance predicate referenced `star_rating`, which is not a column of
#   `df`; it now uses `review_rating_given`.
# - Correlation needs two numeric columns. `platform_name` looks like a text
#   column, so the metric could not be computed. It is now measured between
#   the maximum and the given rating.
#   NOTE(review): assumes `review_rating_maximum` is numeric and not constant;
#   confirm, or pick another numeric pair.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("review_id"))
    .addAnalyzer(Distinctness("review_id"))
    .addAnalyzer(Mean("review_rating_given"))
    .addAnalyzer(Compliance("top review_rating_given", "review_rating_given >= 4.0"))
    .addAnalyzer(Correlation("review_rating_maximum", "review_rating_given"))
    .run()
)

# Only successfully computed metrics appear here; an analyzer whose
# precondition fails (e.g. a missing column) is silently absent.
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult).toPandas()

# The frame holds one row per metric (six requested), so display it in full
# rather than truncating to the first five rows with .head().
analysisResult_df

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
from pydeequ.checks import *
from pydeequ.verification import *

from pyspark.sql import functions as F

# `df` has no `review_year` column, so the year constraints below could never
# be evaluated. Derive it from `review_timestamp` on a separate DataFrame so
# the original `df` stays untouched.
# NOTE(review): the cast assumes `review_timestamp` is a timestamp, an
# ISO-formatted string, or epoch seconds -- confirm against the table schema.
df_with_year = df.withColumn(
    "review_year",
    F.year(F.col("review_timestamp").cast("timestamp")),
)

check = Check(spark, CheckLevel.Warning, "Rating Metadata")

# NOTE(review): the size threshold (3,000,000) and the year bounds (1996/2017)
# look like they were carried over from another dataset -- confirm they are
# the intended expectations for this reviews table.
checkResult = (
    VerificationSuite(spark)
    .onData(df_with_year)
    .addCheck(
        check.hasSize(lambda x: x >= 3000000)
        .hasMin("review_rating_given", lambda x: x == 1.0)
        .hasMax("review_rating_given", lambda x: x == 5.0)
        .isComplete("review_id")
        .isUnique("review_id")
        .isComplete("card_name")
        # Min/max metrics are numeric, so compare against numbers; the
        # previous string comparisons ('1996' / '2017') could never be true.
        .hasMin("review_year", lambda x: x == 1996)
        .hasMax("review_year", lambda x: x == 2017)
    )
    .run()
)

# One row per constraint, with its status and failure message, as pandas.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)

In [None]:
# Preview the first rows of the verification results (a pandas DataFrame).
checkResult_df.head()