# Implement Data Quality in Notebooks Using Soda

1. Install required dev tools
2. Install libraries:
  1. [Soda](https://www.soda.io/) - Data quality framework.
  2. [Great Expectations (GE)](https://greatexpectations.io/) - Data quality framework.
  3. [InfluxDB](https://www.influxdata.com/) - Time series database to publish and visualize measurements.
3. Read data from a sample CSV file and create a DataFrame.
4. Create functions that convert the Soda scan results to DataFrames (for easier analysis).
5. Define the Soda scan YAML and execute it on the DataFrame.
  1. The results are also published to a free trial version of Soda Cloud.
6. Display the scan results from the DataFrames.

##### TODO:
1. Explore Great Expectations for data quality.
2. Explore publishing data to InfluxDB to replicate the dashboards in Soda Cloud.

In [None]:
#Initial setup required to successfully install soda-spark
%sh
#!/bin/bash
# Informational: list any SASL-related Python packages that are already installed.
# `grep -E` replaces the obsolescent `egrep` alias (same extended-regex matching).
pip list | grep -E 'thrift-sasl|sasl'
# Upgrade the Python thrift package.
pip install --upgrade thrift
# Informational: list which of the system packages of interest are already installed.
dpkg -l | grep -E 'thrift_sasl|libsasl2-dev|gcc|python-dev'
# Install the ODBC and SASL development headers plus the compiler toolchain.
sudo apt-get -y install unixodbc-dev libsasl2-dev gcc python-dev

In [None]:
# Install the libraries used in this notebook. The idea is to explore both Great Expectations and Soda; InfluxDB is for publishing metrics as time-series data.
%pip install soda-spark
# Great Expectations, to explore it and compare it with Soda.
%pip install great-expectations
# InfluxDB client, to explore publishing metrics to InfluxDB directly from Databricks.
%pip install influxdb

In [None]:
# Column names containing a space caused the Soda scan to fail, so I renamed those columns to remove the space.

from pyspark.sql.types import StructField, StructType, StringType,IntegerType
from pyspark.sql.functions import length

# Source file and its format.
file_location = "/FileStore/tables/all_india_PO_list_without_APS_offices_ver2_lat_long.csv"
file_type = "csv"

# Reader settings; these only have an effect for CSV input.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Load the file into a DataFrame, letting Spark infer the column types.
# (An explicit schema could be passed with `.schema(...)` instead of inferring it.)
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# Remove the space from the two column names that contain one.
df = df.withColumnRenamed("Related Suboffice", "RelatedSuboffice")
df = df.withColumnRenamed("Related Headoffice", "RelatedHeadoffice")

# Keep only the rows whose Deliverystatus is "Delivery".
df = df.filter(df["Deliverystatus"] == "Delivery")

In [None]:
# Create a view or table

# temp_table_name = "allindiapo"

# df.createOrReplaceTempView(temp_table_name)

In [None]:
%sql

/* Query the created temp table in a SQL cell */

-- select * from allindiapo

In [None]:
%sql

/* Query the created temp table in a SQL cell */

--select distinct Deliverystatus from allindiapo /*Delivery, Non-Delivery*/
--select * from allindiapo where length(statename)=4

In [None]:
from sodasql.scan.test_result import TestResult
from sodasql.scan.measurement import Measurement
from typing import List, Tuple
from pyspark.sql import DataFrame, Row, SparkSession, types as T
def measurements_to_data_frame(measurements: List[Measurement]) -> DataFrame:
    """
    Build a Spark data frame from Soda scan measurements.

    Every measurement is turned into a dictionary with ``to_dict()`` and
    loaded as one row.

    Parameters
    ----------
    measurements: List[Measurement]
        The measurements to convert.

    Returns
    -------
    out : DataFrame
        One row per measurement, with the nullable columns ``metric``,
        ``columnName``, ``value`` (all strings) and ``groupValues``
        (a struct of the strings ``group`` and ``value``).
    """
    # Nested struct used for the "groupValues" column.
    group_values_type = T.StructType(
        [T.StructField(name, T.StringType(), True) for name in ("group", "value")]
    )

    # Three plain string columns followed by the nested struct column.
    fields = [
        T.StructField(name, T.StringType(), True)
        for name in ("metric", "columnName", "value")
    ]
    fields.append(T.StructField("groupValues", group_values_type, True))

    session = SparkSession.builder.getOrCreate()
    rows = [item.to_dict() for item in measurements]
    return session.createDataFrame(rows, schema=T.StructType(fields))
  
def testresults_to_data_frame(testresults: List[TestResult]) -> DataFrame:
    """
    Convert TestResults to a data frame.

    Every test result is turned into a dictionary with ``to_dict()`` and
    loaded as one row.

    Parameters
    ----------
    testresults: List[TestResult]
        The testresults.

    Returns
    -------
    out : DataFrame
        The testresults as data frame, with the nullable columns
        ``columnName``, ``description``, ``expression``, ``id``, ``passed``,
        ``skipped``, ``title`` and ``values`` (a struct of the longs
        ``expression_result`` and ``row_count``).
    """
    # Nested struct used for the "values" column.
    schema_values = T.StructType(
        [
            T.StructField("expression_result", T.LongType(), True),
            T.StructField("row_count", T.LongType(), True),
        ]
    )
    # Every Spark type is referenced through the ``T`` alias so this function
    # only relies on names imported alongside it, not on imports made elsewhere.
    schema = T.StructType(
        [
            T.StructField("columnName", T.StringType(), True),
            T.StructField("description", T.StringType(), True),
            T.StructField("expression", T.StringType(), True),
            T.StructField("id", T.StringType(), True),
            T.StructField("passed", T.BooleanType(), True),
            T.StructField("skipped", T.BooleanType(), True),
            T.StructField("title", T.StringType(), True),
            T.StructField("values", schema_values, True),
        ]
    )

    spark_session = SparkSession.builder.getOrCreate()
    out = spark_session.createDataFrame(
        [testresult.to_dict() for testresult in testresults], schema=schema
    )
    return out

In [None]:
from pyspark.sql import DataFrame, SparkSession
from sodaspark import scan
import pyodbc

import os
from sodasql.soda_server_client.soda_server_client import SodaServerClient
# Soda Cloud credentials are read from the environment so that no secret is
# stored in the notebook source.
# NOTE(review): any key pair that was ever committed in this notebook should
# be revoked in Soda Cloud and replaced.
api_key_id = os.getenv("API_PUBLIC")
api_key_secret = os.getenv("API_PRIVATE")
if not api_key_id or not api_key_secret:
    raise RuntimeError(
        "Soda Cloud credentials are missing: set the API_PUBLIC and "
        "API_PRIVATE environment variables."
    )

soda_server_client = SodaServerClient(
    host="cloud.soda.io",
    api_key_id=api_key_id,
    api_key_secret=api_key_secret,
)

# Define the Soda scan YAML.
# Documentation: https://docs.soda.io/soda-sql/sql_metrics.html#metric-groups-and-dependencies
# NOTE(review): `df` is filtered to Deliverystatus == "Delivery" before this
# cell, so the `distinct == 2` test below sees a single distinct value and
# fails. Confirm whether that failing test is intentional.
scan_definition = ("""
    table_name: podata
    metrics:
    - row_count
    - max_length
    - unique_count
    - distinct
    - duplicate_count
    samples:
      table_limit: 50
    tests:
    - row_count > 0
    columns:
      Deliverystatus:
        tests:
        - distinct == 2
    """)

# Run the scan on the DataFrame; the client is passed along so the scan can
# report to Soda Cloud.
scan_result = scan.execute(scan_definition, df, soda_server_client=soda_server_client)
display(scan_result)

# Show the collected measurements as a DataFrame.
measurement_result = measurements_to_data_frame(scan_result.measurements)
display(measurement_result)

# Show the outcome of each test as a DataFrame.
test_result = testresults_to_data_frame(scan_result.test_results)
display(test_result)

<sodasql.scan.scan_result.ScanResult at 0x7f64d460bf70>

metric,columnName,value,groupValues
schema,,"[{logicalType=text, nullable=true, semanticType=text, dataType=string, name=officename, type=string}, {logicalType=number, nullable=true, semanticType=number, dataType=int, name=pincode, type=int}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=officeType, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=Deliverystatus, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=divisionname, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=regionname, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=circlename, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=Taluk, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=Districtname, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=statename, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=Telephone, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=RelatedSuboffice, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=RelatedHeadoffice, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=longitude, type=string}, {logicalType=text, nullable=true, semanticType=text, dataType=string, name=latitude, type=string}]",
row_count,,145298,
values_count,officename,145298,
valid_count,officename,145298,
max_length,officename,50,
values_count,pincode,145298,
valid_count,pincode,145298,
values_count,officeType,145298,
valid_count,officeType,145298,
max_length,officeType,28,


columnName,description,expression,id,passed,skipped,title,values
,test(row_count > 0),row_count > 0,"{""expression"":""row_count > 0""}",True,False,test(row_count > 0),"List(145298, 145298)"
Deliverystatus,column(Deliverystatus) test(distinct == 2),distinct == 2,"{""column"":""Deliverystatus"",""expression"":""distinct == 2""}",False,False,column(Deliverystatus) test(distinct == 2),"List(1, null)"
