In [18]:
from pyspark.sql import SparkSession, Row
import pydeequ
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

In [9]:
df = spark.read.csv('train.csv',header=True)

In [10]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
from pydeequ.profiles import *

In [12]:
result = ColumnProfilerRunner(spark) \
            .onData(df) \
            .run()

In [13]:
result.columnProfileClasses['StandardColumnProfile']

pydeequ.profiles.StandardColumnProfile

In [14]:
result.columnProfileClasses['NumericColumnProfile']

pydeequ.profiles.NumericColumnProfile

In [15]:
for col, profile in result.profiles.items():
    print(f'Column \'{col}\'')
    print('\t',f'completeness: {profile.completeness}')
    print('\t',f'approximate number of distinct values: {profile.approximateNumDistinctValues}')
    print('\t',f'datatype: {profile.dataType}')
    if profile.dataType == 'Integral' or profile.dataType == 'Fractional':
        print('\t',f"minimum: {profile.minimum}")
        print('\t',f"maximum: {profile.maximum}")
        print('\t',f"mean: {profile.mean}")
        print('\t',f"standard deviation: {profile.stdDev}")

Column 'PassengerId'
	 completeness: 1.0
	 approximate number of distinct values: 888
	 datatype: Integral
	 minimum: 1.0
	 maximum: 891.0
	 mean: 446.0
	 standard deviation: 257.20938292890224
Column 'Name'
	 completeness: 1.0
	 approximate number of distinct values: 936
	 datatype: String
Column 'Ticket'
	 completeness: 1.0
	 approximate number of distinct values: 710
	 datatype: String
Column 'Pclass'
	 completeness: 1.0
	 approximate number of distinct values: 3
	 datatype: Integral
	 minimum: 1.0
	 maximum: 3.0
	 mean: 2.308641975308642
	 standard deviation: 0.8356019334795166
Column 'Parch'
	 completeness: 1.0
	 approximate number of distinct values: 7
	 datatype: Integral
	 minimum: 0.0
	 maximum: 6.0
	 mean: 0.38159371492704824
	 standard deviation: 0.8056047612452213
Column 'Embarked'
	 completeness: 0.9977553310886644
	 approximate number of distinct values: 3
	 datatype: String
Column 'Age'
	 completeness: 0.8013468013468014
	 approximate number of distinct values: 83
	 data

In [16]:
from pydeequ.analyzers import *


analysisResult = AnalysisRunner(spark).onData(df)
analysisResult= analysisResult.addAnalyzer(Completeness('Survived'))
analysisResult= analysisResult.addAnalyzer(CountDistinct('Survived'))
analysisResult= analysisResult.addAnalyzer(Maximum('Survived'))
analysisResult= analysisResult.addAnalyzer(DataType('Survived'))
analysisResult= analysisResult.run()
a_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult, pandas = True)
print(a_df)

    entity  instance                        name  value
0   Column  Survived                Completeness    1.0
1   Column  Survived              Histogram.bins    5.0
2   Column  Survived       Histogram.abs.Boolean    0.0
3   Column  Survived     Histogram.ratio.Boolean    0.0
4   Column  Survived    Histogram.abs.Fractional    0.0
5   Column  Survived  Histogram.ratio.Fractional    0.0
6   Column  Survived      Histogram.abs.Integral  891.0
7   Column  Survived    Histogram.ratio.Integral    1.0
8   Column  Survived       Histogram.abs.Unknown    0.0
9   Column  Survived     Histogram.ratio.Unknown    0.0
10  Column  Survived        Histogram.abs.String    0.0
11  Column  Survived      Histogram.ratio.String    0.0
12  Column  Survived               CountDistinct    2.0


In [17]:
spark.stop()

In [45]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat,lit
import os
from pydeequ.analyzers import Mean,Maximum
import pydeequ

In [36]:
df = spark.read.format('csv').option('header','true').load('Data_Profile.trg')
source_name_file_prefix_list = df.select('source_name_file_prefix').collect()
source_name_file_prefix_list = [i.source_name_file_prefix for i in source_name_file_prefix_list]

In [37]:
landing_zone_files = os.listdir()
feed_files = dict()
for i in source_name_file_prefix_list:
    feed_files[i] = [file_name for file_name in landing_zone_files if file_name.startswith(i)]

In [38]:
df = spark.read.format('csv').option('header','true').load('Data_Profile_Config.csv')

In [39]:
df = df.select("*",concat(col("Source"),lit('_'),col('File Prefix')).alias("source_name_file_prefix"))

In [46]:
for i in feed_files:
    list_of_analyzers = df.where(df.source_name_file_prefix == i).rdd.collect()
    for file in feed_files[i]:
        feed_df = spark.read.format('csv').option('header','true').load(file)
        analysisResult = AnalysisRunner(spark).onData(feed_df)
        for row in list_of_analyzers:
            if row['Column Name'] is not None:
                list_of_columns = list(map(lambda x:x.strip(),row['Column Name'].split(',')))
                for i in list_of_columns:
                    if row['Profile Function'] == "Completeness":
                        analysisResult = analysisResult.addAnalyzer(Completeness(i))
                    elif row['Profile Function'] == "CountDistinct":
                        analysisResult = analysisResult.addAnalyzer(CountDistinct(i))
                    elif row['Profile Function'] == "DataType":
                        analysisResult = analysisResult.addAnalyzer(DataType(i))
                    elif row['Profile Function'] == "Maximum":
                        analysisResult = analysisResult.addAnalyzer(Maximum(i))
                    elif row['Profile Function'] == "Mean":
                        analysisResult = analysisResult.addAnalyzer(Mean(i))
            else:
                list_of_columns = None
                if row['Profile Function'] == "Size":
                    analysisResult = analysisResult.addAnalyzer(Size())
        analysisResult = analysisResult.run()
        analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
        analysisResult_df.show()

+------+--------+-------------+-------------------+
|entity|instance|         name|              value|
+------+--------+-------------+-------------------+
|Column|   Cabin| Completeness|0.21658986175115208|
|Column|     Sex|CountDistinct|                2.0|
|Column|     Age| Completeness| 0.7995391705069125|
|Column|Survived|CountDistinct|                2.0|
+------+--------+-------------+-------------------+

+------+--------+-------------+-------------------+
|entity|instance|         name|              value|
+------+--------+-------------+-------------------+
|Column|   Cabin| Completeness|0.24070021881838075|
|Column|     Sex|CountDistinct|                2.0|
|Column|     Age| Completeness| 0.8030634573304157|
|Column|Survived|CountDistinct|                2.0|
+------+--------+-------------+-------------------+

+-------+--------+-------------+-----+
| entity|instance|         name|value|
+-------+--------+-------------+-----+
| Column| Species|CountDistinct|  1.0|
|Dataset| 