# Data Profiling using Apache Spark

##### _"Ingesting quality data from external sources is really challenging, particularly when you’re not aware of what the data looks like or are unsure about its contents. That’s where data profiling comes in. You can use data profiling to get insights into the data source and figure out what the data looks like."_

## What is Data Profiling?
Data profiling is the process of analyzing source data to understand its structure and content. Profiling a new dataset gives you the following insights:
* __Structure Discovery:__ Number of columns and their names in the source data
* __Content Discovery:__ Data types of the columns and which columns are nullable
* __Cardinality of Data:__ Number of Distinct Values in each column
* __Statistical Analysis:__ Min / Max / Mean / Std Dev of numerical columns
* __Value Histograms:__ Frequency of values in low-cardinality columns
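
To make these metrics concrete, here is a minimal plain-Scala sketch that computes them by hand for a single toy column. It does not use Spark or Deequ; the values are made up for illustration, and the population standard deviation is an assumption about which variant is meant.

```scala
// Toy column with one missing value (None); the numbers are made up for illustration
val column: Seq[Option[Int]] = Seq(Some(52), Some(48), None, Some(55), Some(48))
val present: Seq[Int] = column.flatten

// Completeness: fraction of rows that hold a value
val completeness = present.size.toDouble / column.size

// Cardinality: exact distinct count (Deequ reports an approximation instead)
val distinctCount = present.distinct.size

// Statistical analysis: mean and population standard deviation (assumed variant)
val mean = present.sum.toDouble / present.size
val stdDev = math.sqrt(present.map(v => math.pow(v - mean, 2)).sum / present.size)

// Value histogram: how often each distinct value occurs
val histogram: Map[Int, Int] =
  present.groupBy(identity).map { case (value, occurrences) => value -> occurrences.size }

println(s"completeness=$completeness distinct=$distinctCount " +
  s"min=${present.min} max=${present.max} mean=$mean stdDev=$stdDev")
println(s"histogram=$histogram")
```

On a real dataset the same quantities are computed per column and in a distributed fashion, which is what the profiler used later in this notebook does.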


<br>
This notebook follows Sajjad Sarwar's Deequ implementation: <br>
https://medium.com/@sajjadsarwar1/data-profiling-using-spark-deequ-46ad8dbe3ed1

This notebook was created by Bruno Nogueira Renzo <br>
bruno.nog@hotmail.com


## Amazon Deequ 

This notebook uses Deequ, an open-source Spark library developed and used at Amazon. Deequ lets you calculate data quality metrics on your dataset, define and verify data quality constraints, and stay informed about changes in the data distribution.

https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/



In [1]:
spark

org.apache.spark.sql.SparkSession@1f562227

In [29]:
// Using magics in Apache Toree
// https://github.com/apache/incubator-toree/blob/master/etc/examples/notebooks/magic-tutorial.ipynb

// %LsMagic lists all the available magics
%LsMagic
// %%dataframe converts a Spark SQL DataFrame into various formats
// %%sql runs SQL queries against tables saved in Spark

// %AddDeps adds dependencies from a Maven repository
%AddDeps com.amazon.deequ deequ 1.0.5

Available line magics:
%lsmagic %showtypes %showoutput %adddeps %truncation %addjar

Available cell magics:
%%sql %%html %%javascript %%dataframe %%scala

Type %<magic_name> for usage info.
         
Marking com.amazon.deequ:deequ:1.0.5 for download
Obtained 2 files


In [3]:
import com.amazon.deequ.profiles.{ColumnProfilerRunner, NumericColumnProfile}
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

In [4]:
val inputDataPaths = "hdfs:///user/willporto/bruno/data/weather.csv"

inputDataPaths = hdfs:///user/willporto/bruno/data/weather.csv



In [12]:
// Optionally tune parallelism (commented out below), then inspect the active Spark configuration
//spark.conf.set("spark.default.parallelism",20)
//spark.conf.set("spark.sql.shuffle.partitions",20)

spark.sparkContext.getConf.getAll

Array((spark.history.kerberos.keytab,none), (spark.eventLog.enabled,true), (spark.dynamicAllocation.initialExecutors,2), (spark.repl.class.outputDir,/tmp/spark-09203711-2172-4ff7-9002-0a8d9a6fb407/repl-a9afd631-bb78-4801-931e-e78c815ae74e), (spark.shuffle.service.enabled,true), (spark.history.ui.port,18081), (spark.dynamicAllocation.maxExecutors,10), (spark.driver.extraLibraryPath,/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64), (spark.yarn.queue,default), (spark.executor.id,driver), (spark.jars,file:/usr/local/share/jupyter/kernels/apache_toree_scala/lib/toree-assembly-0.3.0-incubating.jar), (spark.app.name,Apache Toree), (spark.driver.host,gw03.itversity.com), (spark.executor.extraLibraryPath,/usr/hdp...

## Importing Sample Dataset

We’ll use a sample CSV file of weather data from the Kaggle competition <br>
"Walmart Recruiting II: Sales in Stormy Weather".

In [16]:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

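// Read every column as a string (inferSchema = false) so the profiler can infer the types later
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "false")
  .option("header", "true")
  .load(inputDataPaths)
  // Drop rows whose snowfall value contains "T" (trace amounts)
  .filter("snowfall not like '%T%'")
  // "M" marks a missing measurement; replace it with null
  .withColumn("tmax", when(col("tmax").equalTo("M"), null).otherwise(col("tmax")))
  // NOTE: the next two lines read from tmax, so tmin and tavg end up as copies of tmax
  // (visible in the output below); reference col("tmin") / col("tavg") to keep their own values
  .withColumn("tmin", when(col("tmax").equalTo("M"), null).otherwise(col("tmax")))
  .withColumn("tavg", when(col("tmax").equalTo("M"), null).otherwise(col("tmax")))
  // No-op after the filter above, which has already removed every "T" row
  .withColumn("snowfall", when(col("snowfall").equalTo("T"), 0.1).otherwise(col("snowfall")))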



println("Total Rows: " + df.count())
println("Distinct Rows: " + df.dropDuplicates.count)

Total Rows: 20206
Distinct Rows: 20206


df = [station_nbr: string, date: string ... 18 more fields]



In [28]:
%%dataframe
df.limit(5)

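| station_nbr | date | tmax | tmin | tavg | depart | dewpoint | wetbulb | heat | cool | sunrise | sunset | codesum | snowfall | preciptotal | stnpressure | sealevel | resultspeed | resultdir | avgspeed |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 2012-01-01 | 52 | 52 | 52 | M | 36 | 40 | 23 | 0 | - | - | RA FZFG BR | M | 0.05 | 29.78 | 29.92 | 3.6 | 20 | 4.6 |
| 2 | 2012-01-01 | 48 | 48 | 48 | 16 | 37 | 39 | 24 | 0 | 0716 | 1626 | RA | 0.0 | 0.07 | 28.82 | 29.91 | 9.1 | 23 | 11.3 |
| 3 | 2012-01-01 | 55 | 55 | 55 | 9 | 24 | 36 | 20 | 0 | 0735 | 1720 |  | 0.0 | 0.0 | 29.77 | 30.47 | 9.9 | 31 | 10.0 |
| 4 | 2012-01-01 | 63 | 63 | 63 | 4 | 28 | 43 | 10 | 0 | 0728 | 1742 |  | 0.0 | 0.0 | 29.79 | 30.48 | 8.0 | 35 | 8.2 |
| 6 | 2012-01-01 | 63 | 63 | 63 | 0 | 31 | 43 | 16 | 0 | 0727 | 1742 |  | 0.0 | 0.0 | 29.95 | 30.47 | 14.0 | 36 | 13.8 |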


## Running the Data Profiler 

In [30]:
// Run column profiler

val result = ColumnProfilerRunner()
  .onData(df)
  .run()

result = ColumnProfiles(Map(tmax -> NumericColumnProfile(tmax,0.9552113233692963,124,Integral,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 19301, Unknown -> 905, String -> 0),None,None,Some(72.15102844412206),Some(114.0),Some(-11.0),Some(1392587.0),Some(19.475529292818884),None), dewpoint -> StandardColumnProfile(dewpoint,1.0,98,String,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 19543, Unknown -> 0, String -> 663),Some(Distribution(Map(45 -> DistributionValue(253,0.012521033356428783), 34 -> DistributionValue(232,0.011481738097594774), 67 -> DistributionValue(505,0.0249925764624369), -19 -> DistributionValue(2,9.898050084133426E-5), -10 -> DistributionValue(5,2.4745125210333564E-4), 12 -> DistributionValue(103,0.005097495793328714...



## Print Columns Profile

In [34]:
// Collect one summary row per column:
// (name, completeness, approx. distinct values, inferred type, numeric stats)
import spark.implicits._ // enables toDF on a local Seq; harmless if already in scope

var profileArray = Seq[(String, String, Int, String, String)]()

result.profiles.foreach { case (columnName, profile) =>
  // Min / max / mean / std dev are only available on numeric (Integral or Fractional) profiles
  val stats = profile match {
    case numericProfile: NumericColumnProfile =>
      Seq(numericProfile.minimum, numericProfile.maximum, numericProfile.mean, numericProfile.stdDev)
        .map(_.map(_.toString).getOrElse("")) // avoid Option.get, which throws on a missing statistic
        .mkString(" / ")
    case _ => ""
  }

  profileArray :+= ((
    columnName,
    profile.completeness.toString,
    profile.approximateNumDistinctValues.toInt,
    profile.dataType.toString,
    stats
  ))
}

val profileDf = profileArray.toDF("Column Name", "Completeness", "Approx Distinct Values", "Data Type", "Stats (Min / Max / Mean / Std Dev)")


profileArray = List((tmax,0.9552113233692963,124,Integral,-11.0 / 114.0 / 72.15102844412206 / 19.475529292818884), (dewpoint,1.0,98,String,""), (avgspeed,1.0,265,String,""), (resultdir,1.0,38,String,""), (resultspeed,1.0,264,String,""), (stnpressure,1.0,333,String,""), (snowfall,1.0,79,String,""), (codesum,1.0,438,String,""), (sealevel,1.0,157,String,""), (date,1.0,1017,String,""), (cool,1.0,38,String,""), (sunrise,1.0,226,Integral,406.0 / 740.0 / 583.2591089295828 / 90.79831774323632), (depart,1.0,64,String,""), (sunset,1.0,228,Integral,1611.0 / 1949.0 / 1825.806116669813 / 91.0557981088056), (heat,1.0,78,String,""), (wetbulb,1.0,93,String,""), (tavg,0.9552113233692963,124,Integral,-11.0 / 114.0 / 72.15102844412206 / 19.475529292818884), (pre...



In [35]:
%%dataframe
profileDf.orderBy(asc("Approx Distinct Values")).limit(1000)

| Column Name | Completeness | Approx Distinct Values | Data Type | Stats (Min / Max / Mean / Std Dev) |
|---|---|---|---|---|
| station_nbr | 1.0 | 20 | Integral | 1.0 / 20.0 / 10.543551420370187 / 5.747125833616759 |
| resultdir | 1.0 | 38 | String | |
| cool | 1.0 | 38 | String | |
| depart | 1.0 | 64 | String | |
| heat | 1.0 | 78 | String | |
| snowfall | 1.0 | 79 | String | |
| wetbulb | 1.0 | 93 | String | |
| dewpoint | 1.0 | 98 | String | |
| tmax | 0.9552113233692964 | 124 | Integral | -11.0 / 114.0 / 72.15102844412206 / 19.475529292818884 |
| tavg | 0.9552113233692964 | 124 | Integral | -11.0 / 114.0 / 72.15102844412206 / 19.475529292818884 |


## Automated Constraint Suggestion
If you own a large number of datasets, or if your dataset has many columns, defining appropriate constraints by hand can be challenging. Deequ can automatically suggest useful constraints based on the data distribution: it first runs a data profiling step and then applies a set of rules to the result. The profiling step is the same one shown in the "Running the Data Profiler" section above.

In [36]:
val suggestionResult = ConstraintSuggestionRunner()
  .onData(df)
  .addConstraintRules(Rules.DEFAULT)
  .run()

suggestionResult.constraintSuggestions.foreach { case (_, suggestions) =>
  suggestions.foreach { suggestion =>
    println(s"# ${suggestion.description}")
    // Uncomment to also print the generated Scala code for each constraint:
    // println(s"The corresponding scala code is ${suggestion.codeForConstraint}\n")
  }
}


# 'tmax' has less than 5% missing values
# 'tmax' has type Integral
# 'dewpoint' is not null
# 'dewpoint' has value range 'M', '68', '69', '65', '66', '67', '63', '64', '70', '71', '72', '59', '62', '61', '60', '55', '56', '58', '54', '73', '52', '57', '51', '50', '46', '53', '48', '49', '33', '47', '29', '43', '28', '44', '40', '45', '27', '37', '39', '42', '31', '26', '38', '36', '41', '34', '25', '30', '22', '35', '32', '24', '74', '23', '21', '20', '17', '18', '19', '16', '75', '15', '12', '11', '14', '13', '10', '9', '7', '0', '8', '4', '6', '3', '2', '5', '1', '-3', '-1', '76', '-5', '-2', '-7', '-6', '-4', '-8', '-9', '-11', '-12', '-13', '-10', '-14', '-15', '77', '-19', '-17', '-24', '-16'
# 'dewpoint' has value range 'M', '68', '69', '65', '66', '67', '63', '64', '70', '71', '72', '59', '62', '61', '60', '55', '56', '58', '54', '73', '52', '57', '51', '50', '46', '53', '48', '49', '33', '47', '29', '43', '28', '44', '40', '45', '27', '37', '39', '42', '31', '26', '38', '36', 

suggestionResult = ConstraintSuggestionResult(Map(tmax -> NumericColumnProfile(tmax,0.9552113233692963,124,Integral,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 19301, Unknown -> 905, String -> 0),None,None,Some(72.15102844412206),Some(114.0),Some(-11.0),Some(1392587.0),Some(19.475529292818884),None), dewpoint -> StandardColumnProfile(dewpoint,1.0,98,String,true,Map(Boolean -> 0, Fractional -> 0, Integral -> 19543, Unknown -> 0, String -> 663),Some(Distribution(Map(45 -> DistributionValue(253,0.012521033356428783), 34 -> DistributionValue(232,0.011481738097594774), 67 -> DistributionValue(505,0.0249925764624369), -19 -> DistributionValue(2,9.898050084133426E-5), -10 -> DistributionValue(5,2.4745125210333564E-4), 12 -> Distr...
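
Suggested constraints only become useful once they are checked against the data. The following is a minimal sketch of how two of the suggestions printed above could be verified with Deequ's `VerificationSuite`. It assumes the `df` loaded earlier in this notebook; the check name and the 0.95 threshold are illustrative choices derived from the suggestion text, not values produced by Deequ.

```scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.ConstraintStatus

// "weather sanity checks" is a hypothetical name chosen for this sketch
val weatherCheck = Check(CheckLevel.Error, "weather sanity checks")
  .isComplete("dewpoint")                // 'dewpoint' is not null
  .hasCompleteness("tmax", _ >= 0.95)    // 'tmax' has less than 5% missing values

val verificationResult = VerificationSuite()
  .onData(df)
  .addCheck(weatherCheck)
  .run()

if (verificationResult.status == CheckStatus.Success) {
  println("All constraints hold on this dataset")
} else {
  // Print only the constraints that did not hold, with Deequ's explanation
  verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }
    .filter(_.status != ConstraintStatus.Success)
    .foreach { constraintResult =>
      println(s"${constraintResult.constraint}: ${constraintResult.message.getOrElse("")}")
    }
}
```

Running such a check on every new delivery of the file is one way to notice when the data no longer matches the profile observed here.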



## Show Histograms of Low-Cardinality Columns

In addition to the insights above, you can spot potential skew in the data by looking at the value histograms of low-cardinality columns.

In [37]:
// Show histograms of low-cardinality columns

result.profiles.foreach { case (columnName, profile) =>
  // Only columns with few distinct values carry a histogram
  profile.histogram.foreach { histogram =>
    println(s"Column Name: $columnName, Distinct Vals: ${profile.approximateNumDistinctValues}")
    println("%30s%30s%30s\t\t%-30s".format("Value", "Percentage", "Count", "Plot"))

    // Most frequent values first
    histogram.values.toSeq.sortBy { case (_, entry) => -entry.absolute }.foreach { case (key, entry) =>
      // Percentage rounded to three decimal places
      val percentage = (entry.ratio * 100000).round / 1000.0
      // Bar length: one '|' per 5% of rows (rounded), printed after a leading '|'
      val plotString = "|" * (entry.ratio * 20).round.toInt
      println("%30s%30s%30s\t\t|%-30s".format(key, percentage.toString, entry.absolute.toString, plotString))
    }
    println("\n")
  }
}

Column Name: dewpoint, Distinct Vals: 98
                         Value                    Percentage                         Count		Plot                          
                             M                         3.281                           663		||                             
                            68                         2.801                           566		||                             
                            69                         2.638                           533		||                             
                            65                         2.578                           521		||                             
                            66                         2.514                           508		||                             
                            67                         2.499                           505		|                              
                            63                         2.425                           490		

## Spark StructType code proposed by the Data Profiler 

In [38]:
// Proposed schema: map each profiled type to a Spark SQL type and mark a
// column as nullable when its completeness is below 1.0

var proposedSchemaText = "val proposedSchema = new StructType(Array( \n"

df.schema.foreach { originalColSchema =>
  val columnName = originalColSchema.name
  val profile = result.profiles(columnName)
  val isNullable = profile.completeness < 1.0

  val sparkType = profile.dataType.toString.toLowerCase match {
    case "integral"   => "IntegerType"
    case "fractional" => "DoubleType"
    case _            => "StringType"
  }

  proposedSchemaText += "\tStructField(\"" + columnName + "\", " + sparkType + ", " + isNullable + "), \n"
}

// Drop the trailing ", \n" left after the last field and close the expression
proposedSchemaText = proposedSchemaText.dropRight(3) + "\n))"
println(proposedSchemaText + "\n")

val proposedSchema = new StructType(Array( 
	StructField("station_nbr", IntegerType, false), 
	StructField("date", StringType, false), 
	StructField("tmax", IntegerType, true), 
	StructField("tmin", IntegerType, true), 
	StructField("tavg", IntegerType, true), 
	StructField("depart", StringType, false), 
	StructField("dewpoint", StringType, false), 
	StructField("wetbulb", StringType, false), 
	StructField("heat", StringType, false), 
	StructField("cool", StringType, false), 
	StructField("sunrise", IntegerType, false), 
	StructField("sunset", IntegerType, false), 
	StructField("codesum", StringType, false), 
	StructField("snowfall", StringType, false), 
	StructField("preciptotal", StringType, false), 
	StructField("stnpressure", StringType, false), 
	StructField("sealevel", StringType, false), 
	StructField("resultspeed", StringType, false), 
	StructField("resultdir", StringType, false), 
	StructField("avgspeed", StringType, false)
))
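
One way to act on the proposed schema without re-reading the file is to cast the affected columns in place. The sketch below assumes the `df` loaded earlier in this notebook and takes the column list from the `IntegerType` entries printed above; `integralColumns` and `typedDf` are names introduced here for illustration. With Spark's default (non-ANSI) cast behavior, values that cannot be parsed as integers become null.

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Columns the proposed schema above reports as IntegerType
val integralColumns = Seq("station_nbr", "tmax", "tmin", "tavg", "sunrise", "sunset")

// Cast each listed column in turn; all other columns stay strings
val typedDf = integralColumns.foldLeft(df) { (current, name) =>
  current.withColumn(name, col(name).cast(IntegerType))
}

typedDf.printSchema()
```

Re-running the profiler on `typedDf` is a quick way to confirm that the casts did not turn an unexpected number of values into nulls.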



