# Data Profiler and Schema Validation

Profiles the given input data using the custom queries you provide, and validates its schema against the schema repository.
See README.md for how to insert a schema into the schema repository.

In [None]:
%%help

## Spark job configuration parameters, such as memory and cores, may vary from one job to another

In [None]:
%%configure -f
{"name":"data-profiler", 
 "executorMemory": "2GB", 
 "executorCores": 4, 
 "conf": {"spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0,com.github.gphat:censorinus_2.11:2.1.13"} 
}

## Set parameters that will be overwritten by values passed externally

In [None]:
// Placeholder parameters: every value in this cell is meant to be overwritten
// by a value passed in from outside the notebook before the job runs.

// Input data: format ("parquet", "json" or "csv"), CSV delimiter and location.
val dataFormat = "data-format"
val delimiter = ""
val inputDataLocation = "input-data-location"
// Application name; used to tag the published metrics.
val appName = "app-name"
// Schema repository coordinates used to build the schema URL.
val schemaRepoUrl = "schema-repo-url"
val scheRepoSubjectName = "subject-name"
val schemaVersionId = "schema-version"
// Custom checks: each one is a SQL query, a comparison operator and a threshold.
val customQ1 = "custom-query-1"
val customQ1ResultThreshold = 0
val customQ1Operator = "custom-operator-1"
val customQ2 = "custom-query-2"
val customQ2ResultThreshold = 0
val customQ2Operator = "custom-operator-2"
val customQ3 = "custom-query-3"
val customQ3ResultThreshold = 0
// The operator placeholder follows the same naming pattern as checks 1 and 2.
val customQ3Operator = "custom-operator-3"

## Setup datadog statsd interface

In [None]:
import github.gphat.censorinus.DogStatsDClient

// DogStatsD client used to publish the metrics in the cells below.
val statsd = new DogStatsDClient(
    hostname = "localhost",
    port = 8125,
    prefix = "mlp.validator"
)

## Read data. If the data being read is CSV, it needs to have a header

In [None]:
// Load the input with the reader that matches `dataFormat`.
// CSV input is read with a header row, the configured delimiter and malformed
// rows dropped; any format other than parquet/json/csv is rejected.
val df = {
    def reader = spark.read
    dataFormat match {
        case "csv" =>
            reader
                .option("mode", "DROPMALFORMED")
                .option("header", "true")
                .option("delimiter", delimiter)
                .csv(inputDataLocation)
        case "json" => reader.json(inputDataLocation)
        case "parquet" => reader.parquet(inputDataLocation)
        case _ => throw new Exception(s"$dataFormat, as a dataformat is not supported ")
    }
}

### Publish some basic stats about the data. This can be extended further

In [None]:
// Basic profile of the input: number of rows and number of top-level columns,
// each sent to statsd as a histogram value tagged with the application name.
val recordCount = df.count()
val numColumns = df.columns.length
statsd.histogram(
    name = "recordCount",
    value = recordCount,
    tags = Seq(s"appName:$appName", "data-validation", "env:dev")
)
statsd.histogram(
    name = "numColumns",
    value = numColumns,
    tags = Seq(s"appName:$appName", "data-validation", "env:dev")
)

## Read registered schema from schema repository

### Utility method to call rest endpoint for schema

In [None]:
import java.io.IOException;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

/**
 * Issues an HTTP GET for `url` and returns the response body as a string.
 *
 * The body is decoded with the charset declared by the response; when the
 * response declares none, UTF-8 is used.
 *
 * @param url address to request
 * @return the response body, or `null` when a successful response has no entity
 * @throws ClientProtocolException when the response status is outside 200-299
 */
def getSchema(url: String) : String = {
    val httpclient: CloseableHttpClient = HttpClients.createDefault()
    try {
      val httpget: HttpGet = new HttpGet(url)
      println("Executing request " + httpget.getRequestLine)
      val responseHandler: ResponseHandler[String] =
        new ResponseHandler[String]() {
          override def handleResponse(response: HttpResponse): String = {
            val status: Int = response.getStatusLine.getStatusCode
            if (status >= 200 && status < 300) {
              // getEntity may return null; keep returning null in that case so
              // the String contract seen by existing callers is unchanged.
              Option(response.getEntity)
                .map(entity => EntityUtils.toString(entity, "UTF-8"))
                .orNull
            } else {
              throw new ClientProtocolException(
                "Unexpected response status: " + status)
            }
          }
        }
      httpclient.execute(httpget, responseHandler)
    } finally {
      // Always release the client, whether the request succeeded or threw.
      httpclient.close()
    }
}

#### Create the URL from the input parameters and fetch the schema for the specified version

In [None]:
// Endpoint layout: <repo url>/schema-repo/<subject>/id/<version>
val schema_url =
    Seq(schemaRepoUrl, "schema-repo", scheRepoSubjectName, "id", schemaVersionId).mkString("/")
// Response body returned by the schema repository for that URL.
val publishedSchema = getSchema(schema_url)

### Convert Avro schema registered to Spark SQL Schema.

In [None]:
import com.databricks.spark.avro._
import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema.Parser

// Parse the published schema text into an Avro schema ...
val schema = (new Parser).parse(publishedSchema)

// ... and convert it to the corresponding Spark SQL data type.
val structSchema = SchemaConverters.toSqlType(schema).dataType

### Utility method to traverse schema tree and find the leaf node names

In [None]:
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types._

/**
 * Walks a Spark SQL data type and appends the dotted path of every leaf field
 * to `columnNames`.
 *
 * Struct fields extend the path with ".<field name>"; arrays are traversed
 * through their element type without extending the path; every other type is
 * recorded as a leaf.
 *
 * @param path        path accumulated so far; pass "" for the root
 * @param dt          data type to traverse
 * @param columnNames buffer that receives the leaf paths (mutated in place)
 */
def findFields(path: String, dt: DataType, columnNames: ListBuffer[String]): Unit = dt match {
    case struct: StructType =>
      struct.fields.foreach(f => findFields(path + "." + f.name, f.dataType, columnNames))
    case array: ArrayType => findFields(path, array.elementType, columnNames)
    case _ =>
      // Remove only the leading separator. The earlier substring(1) threw on an
      // empty path (non-struct root) and cut the first character of any root
      // path that did not start with ".".
      columnNames += path.stripPrefix(".")
}

In [None]:
// Leaf field paths found in the schema of the input DataFrame.
var dfColumnNames = ListBuffer.empty[String]
findFields("", df.schema, dfColumnNames)

print(dfColumnNames.toList)

In [None]:
// Leaf field paths found in the schema fetched from the schema repository.
var publishedSchemaDataColumnNames = ListBuffer.empty[String]
findFields("", structSchema, publishedSchemaDataColumnNames)

print(publishedSchemaDataColumnNames.toList)

In [None]:
// Columns present in the published schema but absent from the input data;
// their count is printed and sent to statsd as a histogram value.
val sourceColumns = dfColumnNames.toSet
val publishedColumns = publishedSchemaDataColumnNames.toSet
val differenceColumns = publishedColumns -- sourceColumns
val numDiffColumns = differenceColumns.size
print(s"Number of columns not matching the schema are: $numDiffColumns")
statsd.histogram(
    name = "numDiffColumns",
    value = numDiffColumns,
    tags = Seq(s"appName:$appName", "data-validation", "env:dev")
)

### Custom data quality checks

#### Utility function to assert results

In [None]:
/**
 * Compares `val1` with `threshold` using `operator`. When the comparison does
 * not hold, prints the failure and terminates the JVM with exit code 1.
 *
 * @param val1      value produced by a custom query
 * @param operator  one of ">", ">=", "=", "<", "<="
 * @param threshold value to compare against
 * @throws IllegalArgumentException when `operator` is not one of the supported operators
 */
def customCheck(val1 : Long, operator : String, threshold : Long) : Unit = {
    val holds = operator match {
        case ">" => val1 > threshold
        case ">=" => val1 >= threshold
        case "=" => val1 == threshold
        case "<" => val1 < threshold
        case "<=" => val1 <= threshold
        // Name the offending operator instead of failing with a bare MatchError.
        case other =>
            throw new IllegalArgumentException(
                s"Unsupported operator '$other'; expected one of >, >=, =, <, <=")
    }
    if (!holds) {
        // Keeps the "java.lang.AssertionError: assertion failed" prefix that was
        // printed before, and adds the comparison that failed.
        print(new AssertionError(s"assertion failed: $val1 $operator $threshold"))
        System.exit(1)
    }
}

#### Create a temporary view. Each SQL statement must return a Long value; to be sure, cast the result to Long in the query

In [None]:
// Expose the input data to the custom SQL checks under the view name "dataset".
df.createOrReplaceTempView("dataset")

/**
 * Runs `query` and returns the first column of its first row as a Long.
 *
 * Only one row is fetched to the driver. Integral results (Long, Int, Short,
 * Byte) are widened to Long, so a query that yields an Int no longer fails.
 * A null value is returned as 0, which is what reading the value with
 * getAs[Long] produced before.
 *
 * @param query SQL statement to execute
 * @return the first value of the result as a Long
 * @throws NoSuchElementException when the query returns no rows
 * @throws ClassCastException when the first value is not an integral number
 */
def firstLongResult(query: String): Long = {
    val firstRow = spark.sql(query).take(1).headOption.getOrElse(
        throw new NoSuchElementException(s"Custom query returned no rows: $query"))
    firstRow.get(0) match {
        case null => 0L
        case v: java.lang.Long => v.longValue()
        case v: java.lang.Integer => v.longValue()
        case v: java.lang.Short => v.longValue()
        case v: java.lang.Byte => v.longValue()
        case other =>
            throw new ClassCastException(
                s"Custom query must return an integral value but returned ${other.getClass.getName}: $query")
    }
}

val res1 = firstLongResult(customQ1)
customCheck(res1, customQ1Operator, customQ1ResultThreshold)

val res2 = firstLongResult(customQ2)
customCheck(res2, customQ2Operator, customQ2ResultThreshold)

val res3 = firstLongResult(customQ3)
customCheck(res3, customQ3Operator, customQ3ResultThreshold)