In [1]:
import findspark

In [2]:
findspark.init('/home/ec2-user/spark-2.4.7-bin-hadoop2.7')

In [3]:
from pyspark.sql import SparkSession, Row, DataFrame
import pydeequ

In [24]:
##wrapper function for pydeequ

import pyspark.sql.functions as F
from pydeequ.pandas_utils import ensure_pyspark_df
import mysql.connector
from mysql.connector import Error
#from pydeequ.verification import VerificationResult


class MetricsRepository(pydeequ.repository.MetricsRepository):
    
    @classmethod
    def helper_metrics_file(cls, spark_session: SparkSession, filename: str = "metrics.json", path: str = None):
        """
        Helper method to create the metrics file for storage
        """
        if path is None:
            path = spark_session._jvm.com.google.common.io.Files.createTempDir()          
        f = spark_session._jvm.java.io.File(path, filename)
        f_path = f.getAbsolutePath()
        return f_path
    
    @classmethod
    def create_db_connection(cls):
        conn = None
        try:
            conn = mysql.connector.connect(host='host.us-east-1.rds.amazonaws.com',
                                           port='3306',
                                           database='pydeequ_metrics',
                                           user='<dbuser>',
                                           password='<dbpassword>')
            if conn.is_connected():
                return conn
        except Error as e:
            print(e)
            return None
        
    @classmethod
    def close_db_connection(cls, con):
        if con is not None and con.is_connected():
            con.close()
            

class FileSystemMetricsRepository(pydeequ.repository.FileSystemMetricsRepository,MetricsRepository):
    
    def __init__(self, spark_session: SparkSession, path: str = None):

        self._spark_session = spark_session
        self._jvm = spark_session._jvm
        self._jspark_session = spark_session._jsparkSession
        if not path: path = self.helper_metrics_file(self._spark_session)
        self.path = path
        self.deequFSmetRep = spark_session._jvm.com.amazon.deequ.repository.fs.FileSystemMetricsRepository
        self.repository = self.deequFSmetRep(self._jspark_session, path)
        super().__init__(spark_session, path)

class DatabaseMetricsRepository(MetricsRepository):
    
    def __init__(self, con):
        self.con = con
        #TODO create connection and store it it self_con
        cursor = self.con.cursor()
        self.metric_table =cursor.execute("CREATE TABLE IF NOT EXISTS " \
                                          "`metrics`(`entity` varchar(255), `instance` varchar(255), " \
                                          "`name` varchar(255), `value` float, `tag` varchar(255), " \
                                          "`timestamp` varchar(255) ) ")

        

class AnalysisRunner:
    """
    Runs a set of analyzers on the data at hand and optimizes the resulting computations to minimize
    the number of scans over the data. Additionally, the internal states of the computation can be
    stored and aggregated with existing states to enable incremental computations.
    :param spark_session SparkSession: SparkSession
    """

    def __init__(self, spark_session: SparkSession):
        self._spark_session = spark_session

    def onData(self, df):
        """
        Starting point to construct an AnalysisRun.
        :param dataFrame df: tabular data on which the checks should be verified
        :return: new AnalysisRunBuilder object
        """
        df = ensure_pyspark_df(self._spark_session, df)
        return AnalysisRunBuilder(self._spark_session, df)
    
class AnalyzerContext(pydeequ.analyzers.AnalyzerContext):
    
    @classmethod
    def addUpdateRepository(self,spark_session: SparkSession, tag_value: str ='my new column',df_metrics: DataFrame= None):
               
        df_metrics = df_metrics.withColumn('tag',F.lit(tag_value)) \
                               .withColumn('timestamp',F.current_timestamp().cast(StringType()))
        df_metrics.write.format('jdbc').options(
        url='jdbc:mysql://host.us-east-1.rds.amazonaws.com:3306/pydeequ_metrics',
        driver='com.mysql.jdbc.Driver',
        dbtable='metrics',
        user='<dbuser>',
        password='<dbpassword>').mode('append').save()
        return self

class AnalysisRunBuilder(pydeequ.analyzers.AnalysisRunBuilder):
    
    def __init__(self, spark_session: SparkSession, df: DataFrame ):
        self._spark_session = spark_session
        self._jvm = spark_session._jvm
        self._jspark_session = spark_session._jsparkSession
        self._df = df
        self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
        super().__init__(spark_session, df)
        
    
class VerificationResult(pydeequ.verification.VerificationResult):
    
    @classmethod
    def addUpdateRepository(self,spark_session: SparkSession, tag_value: str ='my new column',df_metrics: DataFrame= None):
               
        df_metrics = df_metrics.withColumn('tag',F.lit(tag_value)) \
                               .withColumn('timestamp',F.current_timestamp().cast(StringType()))
        df_metrics.write.format('jdbc').options(
        url='jdbc:mysql://host.us-east-1.rds.amazonaws.com:3306/pydeequ_metrics',
        driver='com.mysql.jdbc.Driver',
        dbtable='metrics',
        user='<dbuser>',
        password='<dbpassword>').mode('append').save()
        return df_metrics
    

In [5]:
##create spark session
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [6]:
#load data
df = spark.read.csv('/home/ec2-user/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/1000 Records.csv',header=True)

In [7]:
df.printSchema()
df.show(1)

root
 |-- Emp ID: string (nullable = true)
 |-- Name Prefix: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Middle Initial: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- E Mail: string (nullable = true)
 |-- Father's Name: string (nullable = true)
 |-- Mother's Name: string (nullable = true)
 |-- Mother's Maiden Name: string (nullable = true)
 |-- Date of Birth: string (nullable = true)
 |-- Time of Birth: string (nullable = true)
 |-- Age in Yrs.: string (nullable = true)
 |-- Weight in Kgs.: string (nullable = true)
 |-- Date of Joining: string (nullable = true)
 |-- Quarter of Joining: string (nullable = true)
 |-- Half of Joining: string (nullable = true)
 |-- Year of Joining: string (nullable = true)
 |-- Month of Joining: string (nullable = true)
 |-- Month Name of Joining: string (nullable = true)
 |-- Short Month: string (nullable = true)
 |-- Day of Joining: string (nullable = true)
 |-- DO

In [8]:
from pyspark.sql.functions import col
from pyspark.sql.types import (StructField, IntegerType,DoubleType,
                               StringType,StructType)

df =df.withColumn("Salary",col("Salary").cast(DoubleType()))
df =df.withColumn("Date of Joining",col("Date of Joining").cast(IntegerType()))
df =df.withColumn('Age in Company (Years)',col('Age in Company (Years)').cast(IntegerType()))
df=df.withColumnRenamed('Weight in Kgs.','Weight in Kgs') \
     .withColumnRenamed('Age in Yrs.','Age in Yrs') \
     .withColumnRenamed('Phone No. ','Phone No')
df=df.withColumn('Weight in Kgs',col('Weight in Kgs').cast(DoubleType())) \
     .withColumn('Age in Yrs',col('Age in Yrs').cast(DoubleType())) \
     .withColumn('Month of Joining',col('Month of Joining').cast(IntegerType())) \
     .withColumn('Phone No',col('Phone No').cast(IntegerType()))


In [9]:
df.printSchema()

root
 |-- Emp ID: string (nullable = true)
 |-- Name Prefix: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Middle Initial: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- E Mail: string (nullable = true)
 |-- Father's Name: string (nullable = true)
 |-- Mother's Name: string (nullable = true)
 |-- Mother's Maiden Name: string (nullable = true)
 |-- Date of Birth: string (nullable = true)
 |-- Time of Birth: string (nullable = true)
 |-- Age in Yrs: double (nullable = true)
 |-- Weight in Kgs: double (nullable = true)
 |-- Date of Joining: integer (nullable = true)
 |-- Quarter of Joining: string (nullable = true)
 |-- Half of Joining: string (nullable = true)
 |-- Year of Joining: string (nullable = true)
 |-- Month of Joining: integer (nullable = true)
 |-- Month Name of Joining: string (nullable = true)
 |-- Short Month: string (nullable = true)
 |-- Day of Joining: string (nullable = true)
 |-- DO

In [10]:
#store data in database
con = DatabaseMetricsRepository.create_db_connection()
db_repo = DatabaseMetricsRepository(con)

In [11]:
#store data in file
from pydeequ.analyzers import ResultKey

metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'verify_newfile.json','/home/ec2-user/')
repository = FileSystemMetricsRepository(spark, metrics_file)
key_tags = {'tag': 'analyzer'}

resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)

In [12]:
from pydeequ.analyzers import (Completeness, Compliance, ApproxCountDistinct,
                               Size,Mean, Correlation, MutualInformation, PatternMatch)
######Profiling#############
analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("Emp ID")) \
                    .addAnalyzer(ApproxCountDistinct('Emp ID')) \
                    .addAnalyzer(Mean('Salary')) \
                    .addAnalyzer(Completeness('Month of Joining')) \
                    .addAnalyzer(Compliance("Salary greater than 10000","Salary >=55000")) \
                    .addAnalyzer(Correlation('Age in Company (Years)','Age in Yrs')) \
                    .addAnalyzer(Correlation('Age in Yrs','Salary')) \
                    .addAnalyzer(Correlation('Age in Yrs','Weight in Kgs')) \
                    .addAnalyzer(MutualInformation(['Age in Yrs','Weight in Kgs'])) \
                    .addAnalyzer(MutualInformation(['Age in Yrs','Age in Company (Years)'])) \
                    .useRepository(repository) \
                    .saveOrAppendResult(resultKey) \
                    .run()

In [13]:
repository.path

'/home/ec2-user/verify_newfile.json'

In [14]:
##refactor analysis result as dataframe
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)

In [15]:
analysisResult_df.show(truncate=False)

+-----------+---------------------------------+-------------------+--------------------+
|entity     |instance                         |name               |value               |
+-----------+---------------------------------+-------------------+--------------------+
|Column     |Emp ID                           |Completeness       |1.0                 |
|Column     |Emp ID                           |ApproxCountDistinct|990.0               |
|Column     |Month of Joining                 |Completeness       |1.0                 |
|Mutlicolumn|Age in Yrs,Age in Company (Years)|MutualInformation  |3.129138342929379   |
|Mutlicolumn|Age in Yrs,Weight in Kgs         |MutualInformation  |3.6304269674121072  |
|Mutlicolumn|Age in Yrs,Salary                |Correlation        |0.041421648278586576|
|Dataset    |*                                |Size               |1000.0              |
|Column     |Salary                           |Mean               |120288.528          |
|Column     |Salary g

In [16]:
#add analysisresult dataframe to database
AnalyzerContext.addUpdateRepository(spark,'analyzer',analysisResult_df)

__main__.AnalyzerContext

In [17]:
#########verification and check



In [18]:
#get data from database table; to be passed on to verificationsuite
def getMonths_df():
    months_df =spark.read.format('jdbc').options(
        url='jdbc:mysql://host.us-east-1.rds.amazonaws.com:3306/pydeequ_metrics',
        driver='com.mysql.jdbc.Driver',
        dbtable='months',
        user='<dbuser>',
        password='<dbpassword>').load()
    print(months_df)
    return months_df
    
mylist=getMonths_df().select('months_val').rdd.map(lambda row : row[0]).collect()
mylist1=list(getMonths_df().select('months_val').toPandas()['months_val'])
print(mylist1)


DataFrame[months_val: string, months_val_short: string]
DataFrame[months_val: string, months_val_short: string]
['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']


In [19]:
from pydeequ.verification import VerificationSuite, VerificationResult
from pydeequ.checks import *

key_tags = {'tag': 'verification_Check'}
resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags)


_check = Check(spark, CheckLevel.Error, "Review Check") \
         .hasSize(lambda x: x>= 1000) \
         .isComplete("First Name") \
         .isComplete("Month of Joining") \
         .isContainedIn("Month of Joining",mylist) \
         .isComplete("Phone No") \
         .containsEmail("E Mail") \
         .containsSocialSecurityNumber("SSN") \
         .isUnique("Emp ID")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(_check) \
    .useRepository(repository) \
    .saveOrAppendResult(resultKey) \
    .run()

Python Callback server started!


In [20]:
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult, pandas=True)
checkResult_df

Unnamed: 0,check,check_level,check_status,constraint,constraint_status,constraint_message
0,Review Check,Error,Error,SizeConstraint(Size(None)),Success,
1,Review Check,Error,Error,CompletenessConstraint(Completeness(First Name...,Success,
2,Review Check,Error,Error,CompletenessConstraint(Completeness(Month of J...,Success,
3,Review Check,Error,Error,ComplianceConstraint(Compliance(Month of Joini...,Failure,Value: 0.0 does not meet the constraint requir...
4,Review Check,Error,Error,"CompletenessConstraint(Completeness(Phone No,N...",Failure,Value: 0.0 does not meet the constraint requir...
5,Review Check,Error,Error,containsEmail(E Mail),Success,
6,Review Check,Error,Error,containsSocialSecurityNumber(SSN),Success,
7,Review Check,Error,Error,"UniquenessConstraint(Uniqueness(List(Emp ID),N...",Success,


In [21]:
checkResult_df1 = VerificationResult.successMetricsAsDataFrame(spark, checkResult)


In [22]:
checkResult_df1.show()

+-------+--------------------+------------+------+
| entity|            instance|        name| value|
+-------+--------------------+------------+------+
| Column|    Month of Joining|Completeness|   1.0|
| Column|                 SSN|PatternMatch|   1.0|
| Column|              E Mail|PatternMatch|   1.0|
| Column|Month of Joining ...|  Compliance|   0.0|
|Dataset|                   *|        Size|1000.0|
| Column|              Emp ID|  Uniqueness|   1.0|
| Column|            Phone No|Completeness|   0.0|
| Column|          First Name|Completeness|   1.0|
+-------+--------------------+------------+------+



In [25]:
##insert verification analysis in database
VerificationResult.addUpdateRepository(spark,'verification_Check',checkResult_df1)

DataFrame[entity: string, instance: string, name: string, value: double, tag: string, timestamp: string]

In [26]:
#load the metrics file

result_metrep_df = repository.load() \
    .before(ResultKey.current_milli_time()) \
    .getSuccessMetricsAsDataFrame()
result_metrep_df.show()

+-----------+--------------------+-------------------+--------------------+-------------+------------------+
|     entity|            instance|               name|               value| dataset_date|               tag|
+-----------+--------------------+-------------------+--------------------+-------------+------------------+
|     Column|              Emp ID|       Completeness|                 1.0|1617948736024|          analyzer|
|     Column|              Emp ID|ApproxCountDistinct|               990.0|1617948736024|          analyzer|
|     Column|    Month of Joining|       Completeness|                 1.0|1617948736024|          analyzer|
|Mutlicolumn|Age in Yrs,Age in...|  MutualInformation|   3.129138342929379|1617948736024|          analyzer|
|Mutlicolumn|Age in Yrs,Weight...|  MutualInformation|  3.6304269674121072|1617948736024|          analyzer|
|Mutlicolumn|   Age in Yrs,Salary|        Correlation|0.041421648278586576|1617948736024|          analyzer|
|    Dataset|      

In [27]:
####constraint suggestion
from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

{
  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(Month of Joining,None))",
      "column_name": "Month of Joining",
      "current_value": "Completeness: 1.0",
      "description": "'Month of Joining' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"Month of Joining\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('Month of Joining' has no negative values,Month of Joining >= 0,None))",
      "column_name": "Month of Joining",
      "current_value": "Minimum: 1.0",
      "description": "'Month of Joining' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"Month of 

In [28]:
##show suggestion matrix as dataframe
for key, val in suggestionResult.items():
    print(key)

k =suggestionResult.get('constraint_suggestions')
#print(k)


constraint_suggestions


In [29]:
import pandas as pd

df_suggestion = pd.DataFrame.from_dict(k)
df_suggestion.head(5)

Unnamed: 0,constraint_name,column_name,current_value,description,suggesting_rule,rule_description,code_for_constraint
0,CompletenessConstraint(Completeness(Month of J...,Month of Joining,Completeness: 1.0,'Month of Joining' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""Month of Joining"")"
1,ComplianceConstraint(Compliance('Month of Join...,Month of Joining,Minimum: 1.0,'Month of Joining' has no negative values,NonNegativeNumbersRule(),If we see only non-negative numbers in a colum...,".isNonNegative(""Month of Joining"")"
2,CompletenessConstraint(Completeness(First Name...,First Name,Completeness: 1.0,'First Name' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""First Name"")"
3,ComplianceConstraint(Compliance('Short Month' ...,Short Month,Compliance: 1,"'Short Month' has value range 'Apr', 'Oct', 'M...",CategoricalRangeRule(),"If we see a categorical range for a column, we...",".isContainedIn(""Short Month"", [""Apr"", ""Oct"", ""..."
4,CompletenessConstraint(Completeness(Short Mont...,Short Month,Completeness: 1.0,'Short Month' is not null,CompleteIfCompleteRule(),"If a column is complete in the sample, we sugg...",".isComplete(""Short Month"")"
