<h1>Udacity Data Engineering Nanodegree Program - The Capstone Project</h1>
<h2>Project Summary</h2>

The goal is to read the validation log files produced by the Great Expectations library, which runs daily in Airflow, load them into a model in BigQuery, and apply the transformations needed to build a product that monitors the data quality executions recorded in those logs. The processing is done with PySpark (Spark/Hadoop).

The data is stored in a star schema (two dimension tables and one fact table) and is ready for analysis.


The project follows these steps:

1 - Scope the Project and Gather Data

2 - Explore and Assess the Data

3 - Define the Data Model

4 - Run ETL to Model the Data

5 - Deal with Data Quality




<h4>Imports</h4>

In [57]:
import datetime

from google.cloud import storage
from pyspark.sql.functions import lit, unix_timestamp

# Load timestamp stamped on every fact row (dt_insert_fato)
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

<h4>Build SparkSession</h4>

In [58]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('Pipeline BQ data quality')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()

<h4>Reading the file from the raw bucket and turning it into a dataframe</h4>

In [59]:
json_file_path = 'gs://bucket-dataproc-ge/great_expectations/validations/teste_tabela_ge/__none__/20220318T172550.578808Z/b82d2b0bc0d5deaf8922db55075f898b.json'

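# The JSON reader always infers the schema (inferSchema is a CSV option);
# multiLine is required because each validation result is one pretty-printed JSON document
df2 = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json(json_file_path)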
df2.printSchema()

root
 |-- meta: struct (nullable = true)
 |    |-- active_batch_definition: struct (nullable = true)
 |    |    |-- batch_identifiers: struct (nullable = true)
 |    |    |    |-- batch_id: string (nullable = true)
 |    |    |-- data_asset_name: string (nullable = true)
 |    |    |-- data_connector_name: string (nullable = true)
 |    |    |-- datasource_name: string (nullable = true)
 |    |-- batch_markers: struct (nullable = true)
 |    |    |-- ge_load_time: string (nullable = true)
 |    |-- batch_spec: struct (nullable = true)
 |    |    |-- batch_data: string (nullable = true)
 |    |    |-- data_asset_name: string (nullable = true)
 |    |-- expectation_suite_name: string (nullable = true)
 |    |-- great_expectations_version: string (nullable = true)
 |    |-- run_id: struct (nullable = true)
 |    |    |-- run_name: string (nullable = true)
 |    |    |-- run_time: string (nullable = true)
 |    |-- validation_time: string (nullable = true)
 |-- results: array (nullable = t

<h4>Converting the file from JSON to Parquet and appending it to the raw bucket</h4>

In [60]:
path = 'gs://bucket-raw-ge/raw-ge-files/'
# Append mode adds new part files next to the existing ones instead of overwriting the folder
df2.write.mode('append').parquet(path)

<h4>Finding the most recently appended file in the bucket</h4>

In [61]:
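client = storage.Client()
bucket = client.get_bucket("bucket-raw-ge")
blobs = client.list_blobs(bucket, prefix='raw-ge-files/')

# blob.updated is a timezone-aware UTC datetime, so compare it with today's date in UTC
today = datetime.datetime.now(datetime.timezone.utc).date()

# Keep only the Parquet part files written today (this skips the _SUCCESS marker Spark writes)
todays_files = [b for b in blobs if b.name.endswith('.parquet') and b.updated.date() == today]
if not todays_files:
    raise FileNotFoundError('No Parquet file was appended to gs://bucket-raw-ge/raw-ge-files/ today')

# The most recently updated file is the one appended by the previous cell
latest_blob = max(todays_files, key=lambda b: b.updated)
path_parquet = f'gs://bucket-raw-ge/{latest_blob.name}'
print(path_parquet)

gs://bucket-raw-ge/raw-ge-files/part-00000-cef7c3a1-cfeb-4ee3-b2fa-d2da74a77832-c000.snappy.parquet


<h4>Reading the latest file into a dataframe</h4>

In [62]:
df3 = spark.read.parquet(path_parquet)
df3.printSchema()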

root
 |-- meta: struct (nullable = true)
 |    |-- active_batch_definition: struct (nullable = true)
 |    |    |-- batch_identifiers: struct (nullable = true)
 |    |    |    |-- batch_id: string (nullable = true)
 |    |    |-- data_asset_name: string (nullable = true)
 |    |    |-- data_connector_name: string (nullable = true)
 |    |    |-- datasource_name: string (nullable = true)
 |    |-- batch_markers: struct (nullable = true)
 |    |    |-- ge_load_time: string (nullable = true)
 |    |-- batch_spec: struct (nullable = true)
 |    |    |-- batch_data: string (nullable = true)
 |    |    |-- data_asset_name: string (nullable = true)
 |    |-- expectation_suite_name: string (nullable = true)
 |    |-- great_expectations_version: string (nullable = true)
 |    |-- run_id: struct (nullable = true)
 |    |    |-- run_name: string (nullable = true)
 |    |    |-- run_time: string (nullable = true)
 |    |-- validation_time: string (nullable = true)
 |-- results: array (nullable = t
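Selecting the newest file can be tested without touching GCS. The sketch below is a hypothetical pure-Python helper, `pick_latest_parquet`, that works on plain `(name, updated)` pairs standing in for `storage.Blob` objects. It assumes that only `.parquet` part files matter (Spark also writes a `_SUCCESS` marker into the output folder) and that timestamps are timezone-aware UTC, as `Blob.updated` is; the listing itself is made up for illustration.

```python
from datetime import date, datetime, timezone
from typing import Iterable, Optional, Tuple


def pick_latest_parquet(files: Iterable[Tuple[str, datetime]], day: date) -> Optional[str]:
    """Return the name of the newest .parquet file updated on `day` (UTC), or None."""
    # Skip non-data objects such as Spark's _SUCCESS marker and files from other days
    candidates = [
        (name, updated)
        for name, updated in files
        if name.endswith('.parquet') and updated.astimezone(timezone.utc).date() == day
    ]
    if not candidates:
        return None
    # Compare by timestamp rather than relying on listing order
    name, _ = max(candidates, key=lambda item: item[1])
    return name


# Hypothetical bucket listing
listing = [
    ('raw-ge-files/_SUCCESS', datetime(2022, 3, 18, 17, 30, tzinfo=timezone.utc)),
    ('raw-ge-files/part-00000-a.snappy.parquet', datetime(2022, 3, 18, 17, 26, tzinfo=timezone.utc)),
    ('raw-ge-files/part-00000-b.snappy.parquet', datetime(2022, 3, 18, 17, 29, tzinfo=timezone.utc)),
    ('raw-ge-files/part-00000-c.snappy.parquet', datetime(2022, 3, 17, 9, 0, tzinfo=timezone.utc)),
]
print(pick_latest_parquet(listing, date(2022, 3, 18)))  # raw-ge-files/part-00000-b.snappy.parquet
```

Taking the maximum by timestamp, instead of the last blob returned by the listing, keeps the choice correct even when several part files land in the folder on the same day.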

<h4>The function below reads the nested struct/array structure of the dataframe and flattens it into a tabular format</h4>

In [63]:
from pyspark.sql.types import ArrayType, StructType
# Explicit imports: a star import from pyspark.sql.functions shadows builtins such as max and sum
from pyspark.sql.functions import col, explode_outer


def flatten(df):
    """Repeatedly expand struct columns and explode array columns until none remain."""
    def complex_fields(frame):
        # Map of column name -> data type for every struct or array column
        return {f.name: f.dataType for f in frame.schema.fields
                if isinstance(f.dataType, (ArrayType, StructType))}

    fields = complex_fields(df)
    while fields:
        col_name, data_type = next(iter(fields.items()))
        if isinstance(data_type, StructType):
            # Promote each struct member to a top-level column named <parent>_<child>
            expanded = [col(f"{col_name}.{f.name}").alias(f"{col_name}_{f.name}")
                        for f in data_type.fields]
            df = df.select("*", *expanded).drop(col_name)
        else:
            # One row per array element; explode_outer keeps rows whose array is null or empty
            df = df.withColumn(col_name, explode_outer(col_name))
        fields = complex_fields(df)
    return df


df = flatten(df3)
df.printSchema()

root
 |-- success: boolean (nullable = true)
 |-- meta_expectation_suite_name: string (nullable = true)
 |-- meta_great_expectations_version: string (nullable = true)
 |-- meta_validation_time: string (nullable = true)
 |-- results_success: boolean (nullable = true)
 |-- statistics_evaluated_expectations: long (nullable = true)
 |-- statistics_success_percent: double (nullable = true)
 |-- statistics_successful_expectations: long (nullable = true)
 |-- statistics_unsuccessful_expectations: long (nullable = true)
 |-- meta_active_batch_definition_data_asset_name: string (nullable = true)
 |-- meta_active_batch_definition_data_connector_name: string (nullable = true)
 |-- meta_active_batch_definition_datasource_name: string (nullable = true)
 |-- meta_batch_markers_ge_load_time: string (nullable = true)
 |-- meta_batch_spec_batch_data: string (nullable = true)
 |-- meta_batch_spec_data_asset_name: string (nullable = true)
 |-- meta_run_id_run_name: string (nullable = true)
 |-- meta_run_
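To see what `flatten` does to a single record, here is a small pure-Python analogue that operates on a parsed JSON dict instead of a Spark dataframe; it is an illustration only, not part of the pipeline. Nested objects become `parent_child` keys, and each list is exploded into one row per element, with an empty list yielding a single row holding `None`, as `explode_outer` does. The function name `flatten_record` and the sample record are hypothetical; the record only borrows field names from the schema printed above.

```python
from typing import Any, Dict, List


def flatten_record(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten nested dicts into parent_child keys and explode lists into rows."""
    rows = [dict(record)]
    while True:
        # First key (in row and key order) that still holds a dict or a list
        key = next((k for row in rows for k, v in row.items()
                    if isinstance(v, (dict, list))), None)
        if key is None:
            return rows
        new_rows = []
        for row in rows:
            value = row.pop(key, None)
            if isinstance(value, dict):
                # Struct: promote every member to a top-level key
                row.update({f"{key}_{k}": v for k, v in value.items()})
                new_rows.append(row)
            elif isinstance(value, list) and value:
                # Array: one output row per element
                new_rows.extend({**row, key: item} for item in value)
            else:
                # Empty list or null: keep the row with a null value, like explode_outer
                row[key] = None
                new_rows.append(row)
        rows = new_rows


# Hypothetical, heavily trimmed validation result
record = {
    "success": False,
    "meta": {"expectation_suite_name": "suite_a", "run_id": {"run_name": "run_1"}},
    "results": [
        {"success": True, "expectation_config": {"expectation_type": "expect_column_values_to_not_be_null"}},
        {"success": False, "expectation_config": {"expectation_type": "expect_column_values_to_be_unique"}},
    ],
}
for row in flatten_record(record):
    print(row)
```

The two printed rows correspond to the two entries of `results`, each carrying the suite-level fields alongside it, which is why the flattened dataframe has one row per expectation result.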

<h4>Applying the necessary transformations and cleanup to the dataframe</h4>

In [64]:
df_cleaner2 = (df.withColumn("expectations_sucesso", col("success").cast("string"))
              .withColumn("resultado_sucesso", col("results_success").cast("string"))
              .withColumn("qtd_inesperado", col("results_result_unexpected_count").cast("int"))
              .withColumnRenamed("meta_expectation_suite_name","nm_suite")
              .withColumnRenamed("meta_active_batch_definition_data_asset_name", "nm_tabela")
              .withColumnRenamed("meta_active_batch_definition_data_connector_name", "nm_conexao_batch")
              .withColumnRenamed("meta_active_batch_definition_datasource_name","nm_source_execucao")
              .withColumnRenamed("meta_run_id_run_name","nm_execucao_expectation")
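              # This field was only returning null. run_time is assumed to be an ISO 8601 string
              # (for example '2022-03-18T17:25:50.578808+00:00'), which the 'yyyy-MM-dd HH:mm:ss'
              # pattern cannot parse; a plain cast to timestamp accepts ISO 8601 input
              .withColumn("timestamp_execucao_expectation", col("meta_run_id_run_time").cast("timestamp"))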
              .withColumnRenamed("results_exception_info_exception_message","resultado_info_excecao")
              .withColumnRenamed("results_exception_info_exception_traceback","resultado_info_traceback")
              .withColumn("resultado_info_raised", col("results_exception_info_raised_exception").cast("string"))
              .withColumnRenamed("results_expectation_config_expectation_type","ft_nm_regra_expectation")
              .withColumn("qtd_elemento", col("results_result_element_count").cast("int"))
              .withColumn("qtd_elemento_faltante", col("results_result_missing_count").cast("int"))
              .withColumn("porcentagem_faltante", col("results_result_missing_percent").cast("string"))
              .withColumn("porcentagem_nao_faltante", col("results_result_unexpected_percent_nonmissing").cast("string"))
              .withColumnRenamed("results_result_unexpected_percent","porcentagem_inesperada")
              .withColumnRenamed("results_expectation_config_kwargs_batch_id", "id_batch_coluna")
              .withColumnRenamed("results_expectation_config_kwargs_column", "nm_coluna")
              .withColumnRenamed("results_result_observed_value", "resultado_geral")
              .withColumn('dt_insert_fato',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
    
              .drop("meta_great_expectations_version" , "meta_batch_markers_ge_load_time", "meta_run_id_run_time",
                    "meta_batch_spec_batch_data", "meta_batch_spec_data_asset_name",
                    "meta_active_batch_definition_batch_identifiers_batch_id", "results_result_element_count",
                    "results_result_missing_count", "results_expectation_config_expectation_context_description",
                    "results_result_partial_unexpected_counts_count", "success", "results_exception_info_raised_exception",
                    "results_success", "results_result_missing_percent", "results_result_unexpected_percent_nonmissing", "results_result_unexpected_percent_total", 
                    "results_expectation_config_kwargs_mostly", "meta_validation_time", "statistics_evaluated_expectations",
                    "statistics_success_percent", "statistics_successful_expectations", "statistics_unsuccessful_expectations", 
                    "results_result_observed_value", "results_expectation_config_kwargs_column_list", "results_expectation_config_kwargs_value", 
                    "results_expectation_config_kwargs_value_set", "results_result_partial_unexpected_index_list", "results_result_partial_unexpected_list", "results_result_partial_unexpected_counts_value",
                   "results_result_unexpected_count", "results_expectation_config_kwargs_parse_strings_as_datetimes",
                   "results_expectation_config_kwargs_strictly", "results_result_details_mismatched", "results_expectation_config_kwargs_column_set",
                   "results_result_details_value_counts_count","results_result_details_value_counts_value"))

df_cleaner2.printSchema()

root
 |-- nm_suite: string (nullable = true)
 |-- nm_tabela: string (nullable = true)
 |-- nm_conexao_batch: string (nullable = true)
 |-- nm_source_execucao: string (nullable = true)
 |-- nm_execucao_expectation: string (nullable = true)
 |-- resultado_info_excecao: string (nullable = true)
 |-- resultado_info_traceback: string (nullable = true)
 |-- ft_nm_regra_expectation: string (nullable = true)
 |-- resultado_geral: string (nullable = true)
 |-- porcentagem_inesperada: double (nullable = true)
 |-- id_batch_coluna: string (nullable = true)
 |-- nm_coluna: string (nullable = true)
 |-- results_result_details_mismatched_unexpected: string (nullable = true)
 |-- expectations_sucesso: string (nullable = true)
 |-- resultado_sucesso: string (nullable = true)
 |-- qtd_inesperado: integer (nullable = true)
 |-- timestamp_execucao_expectation: timestamp (nullable = true)
 |-- resultado_info_raised: string (nullable = true)
 |-- qtd_elemento: integer (nullable = true)
 |-- qtd_elemento_fa
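The null `timestamp_execucao_expectation` values are consistent with a format mismatch. Assuming Great Expectations writes `run_time` as an ISO 8601 string (the sample value below is hypothetical), this stdlib comparison shows that a `'%Y-%m-%d %H:%M:%S'` pattern rejects it while an ISO 8601 parser accepts it; Spark's `unix_timestamp` with `'yyyy-MM-dd HH:mm:ss'` likewise yields null for a string that does not match the pattern. The helper `parse_run_time` is hypothetical.

```python
from datetime import datetime
from typing import Optional


def parse_run_time(value: str, pattern: Optional[str] = None) -> Optional[datetime]:
    """Parse with an explicit strptime pattern, or as ISO 8601 when no pattern is given.

    Returns None on failure, mirroring how Spark produces null for unparseable strings.
    """
    try:
        return datetime.strptime(value, pattern) if pattern else datetime.fromisoformat(value)
    except ValueError:
        return None


# Hypothetical run_time value in ISO 8601 form
run_time = "2022-03-18T17:25:50.578808+00:00"

print(parse_run_time(run_time, "%Y-%m-%d %H:%M:%S"))  # None: 'T', fraction and offset do not match
print(parse_run_time(run_time))  # 2022-03-18 17:25:50.578808+00:00
```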

<h4>Reading the existing dim_dimensoes table from BigQuery into a dataframe</h4>

In [65]:
table = "sandbox-coe.trusted_ge_dataquality.dim_dimensoes"

df_dim_dimensoes = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

df_dim_dimensoes.printSchema()

root
 |-- id_dimensao: long (nullable = true)
 |-- nm_dimensao: string (nullable = true)
 |-- tipo_dimensao: string (nullable = true)
 |-- desc_dimensao: string (nullable = true)
 |-- dt_insert: string (nullable = true)



<h4>Reading the existing dim_regras_expectations table from BigQuery into a dataframe</h4>

In [81]:
table = "sandbox-coe.trusted_ge_dataquality.dim_regras_expectations"
df_dim_regras_ge = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

df_dim_regras_ge.printSchema()

root
 |-- id_regra_expectation: long (nullable = true)
 |-- id_dimensao: long (nullable = true)
 |-- nm_regra_expectation: string (nullable = true)
 |-- desc_regra_expectation: string (nullable = true)
 |-- tipo_regra_expectation: string (nullable = true)
 |-- dt_insert: string (nullable = true)



<h4>Joining the fact data with dim_regras_expectations to retrieve the relationship keys (id_regra_expectation and id_dimensao)</h4>

In [82]:
df_final = (df_cleaner2.join(df_dim_regras_ge, df_cleaner2.ft_nm_regra_expectation == df_dim_regras_ge.nm_regra_expectation,how='left')
             .drop('tipo_regra_expectation', 'desc_regra_expectation', 'dt_insert', 'nm_regra_expectation')
             .withColumn("id_regra_expectation", col("id_regra_expectation").cast("int"))
             .withColumn("id_dimensao", col("id_dimensao").cast("int")))

df_final.printSchema()


root
 |-- nm_suite: string (nullable = true)
 |-- nm_tabela: string (nullable = true)
 |-- nm_conexao_batch: string (nullable = true)
 |-- nm_source_execucao: string (nullable = true)
 |-- nm_execucao_expectation: string (nullable = true)
 |-- resultado_info_excecao: string (nullable = true)
 |-- resultado_info_traceback: string (nullable = true)
 |-- ft_nm_regra_expectation: string (nullable = true)
 |-- resultado_geral: string (nullable = true)
 |-- porcentagem_inesperada: double (nullable = true)
 |-- id_batch_coluna: string (nullable = true)
 |-- nm_coluna: string (nullable = true)
 |-- results_result_details_mismatched_unexpected: string (nullable = true)
 |-- expectations_sucesso: string (nullable = true)
 |-- resultado_sucesso: string (nullable = true)
 |-- qtd_inesperado: integer (nullable = true)
 |-- timestamp_execucao_expectation: timestamp (nullable = true)
 |-- resultado_info_raised: string (nullable = true)
 |-- qtd_elemento: integer (nullable = true)
 |-- qtd_elemento_fa
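Because the join is a left join, an expectation type that is missing from `dim_regras_expectations` still reaches the fact table, only with null `id_regra_expectation` and `id_dimensao`. A minimal in-memory sketch of that lookup makes the behaviour explicit; the helper `attach_rule_keys` and the rule names and ids are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]


def attach_rule_keys(fact_rows: List[Row],
                     rules: Dict[str, Tuple[int, int]]) -> Tuple[List[Row], List[str]]:
    """Left-join fact rows to the rule dimension on the expectation type.

    `rules` maps nm_regra_expectation -> (id_regra_expectation, id_dimensao).
    Returns the enriched rows plus the expectation types that found no match.
    """
    enriched: List[Row] = []
    unmatched: List[str] = []
    for row in fact_rows:
        name = str(row["ft_nm_regra_expectation"])
        match: Optional[Tuple[int, int]] = rules.get(name)
        if match is None:
            # Left join semantics: keep the row, but its keys stay null
            unmatched.append(name)
            enriched.append({**row, "id_regra_expectation": None, "id_dimensao": None})
        else:
            enriched.append({**row, "id_regra_expectation": match[0], "id_dimensao": match[1]})
    return enriched, unmatched


rules = {"expect_column_values_to_not_be_null": (1, 10)}
facts = [{"ft_nm_regra_expectation": "expect_column_values_to_not_be_null"},
         {"ft_nm_regra_expectation": "expect_column_values_to_be_unique"}]
rows, missing = attach_rule_keys(facts, rules)
print(missing)  # ['expect_column_values_to_be_unique']
```

In the pipeline itself, the equivalent check would be counting rows of `df_final` where `id_regra_expectation` is null (for example `df_final.filter(col("id_regra_expectation").isNull()).count()`), which flags expectation types that still need to be registered in the dimension.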

<h4>Data Quality</h4>

In [83]:
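# Expected columns of dim_dimensoes, hardcoded so the check fails if the table schema changes
check_columns = ['id_dimensao', 'nm_dimensao', 'tipo_dimensao', 'desc_dimensao', 'dt_insert']
print(check_columns)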

['id_dimensao', 'nm_dimensao', 'tipo_dimensao', 'desc_dimensao', 'dt_insert']


In [84]:
if check_columns == df_dim_dimensoes.columns:
    print("Number of Columns Test Passed!")
else:
    raise ValueError("Columns do not match the expected schema!")

Number of Columns Test Passed!


In [85]:
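# Expected columns of dim_regras_expectations, hardcoded so the check fails if the table schema changes
check_columns2 = ['id_regra_expectation', 'id_dimensao', 'nm_regra_expectation', 'desc_regra_expectation', 'tipo_regra_expectation', 'dt_insert']
print(check_columns2)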

['id_regra_expectation', 'id_dimensao', 'nm_regra_expectation', 'desc_regra_expectation', 'tipo_regra_expectation', 'dt_insert']


In [86]:
if check_columns2 == df_dim_regras_ge.columns:
    print("Number of Columns Test Passed!")
else:
    raise ValueError("Columns do not match the expected schema!")

Number of Columns Test Passed!


In [87]:
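# Expected columns of the fact dataframe, hardcoded so the check fails if the transformations change
check_columns3 = ['nm_suite', 'nm_tabela', 'nm_conexao_batch', 'nm_source_execucao', 'nm_execucao_expectation',
                  'resultado_info_excecao', 'resultado_info_traceback', 'ft_nm_regra_expectation', 'resultado_geral',
                  'porcentagem_inesperada', 'id_batch_coluna', 'nm_coluna', 'results_result_details_mismatched_unexpected',
                  'expectations_sucesso', 'resultado_sucesso', 'qtd_inesperado', 'timestamp_execucao_expectation',
                  'resultado_info_raised', 'qtd_elemento', 'qtd_elemento_faltante', 'porcentagem_faltante',
                  'porcentagem_nao_faltante', 'dt_insert_fato', 'id_regra_expectation', 'id_dimensao']
print(check_columns3)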

['nm_suite', 'nm_tabela', 'nm_conexao_batch', 'nm_source_execucao', 'nm_execucao_expectation', 'resultado_info_excecao', 'resultado_info_traceback', 'ft_nm_regra_expectation', 'resultado_geral', 'porcentagem_inesperada', 'id_batch_coluna', 'nm_coluna', 'results_result_details_mismatched_unexpected', 'expectations_sucesso', 'resultado_sucesso', 'qtd_inesperado', 'timestamp_execucao_expectation', 'resultado_info_raised', 'qtd_elemento', 'qtd_elemento_faltante', 'porcentagem_faltante', 'porcentagem_nao_faltante', 'dt_insert_fato', 'id_regra_expectation', 'id_dimensao']


In [88]:
if check_columns3 == df_final.columns:
    print("Number of Columns Test Passed!")
else:
    raise ValueError("Columns do not match the expected schema!")

Number of Columns Test Passed!
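A column check only catches regressions when the expected list is fixed in advance; comparing `df.columns` with itself always passes. A hypothetical helper, `check_columns_match`, that also reports which columns are missing or unexpected could look like the sketch below. It works on plain lists, so `df.columns` can be passed straight in.

```python
from typing import List, Sequence


def check_columns_match(actual: Sequence[str], expected: Sequence[str]) -> None:
    """Raise ValueError describing how `actual` differs from `expected`."""
    if list(actual) == list(expected):
        return
    missing: List[str] = [c for c in expected if c not in actual]
    extra: List[str] = [c for c in actual if c not in expected]
    if not missing and not extra:
        # Same names, but a different order (or duplicated names)
        raise ValueError("Columns have the same names but differ in order or duplicates")
    raise ValueError(f"Schema mismatch: missing={missing}, unexpected={extra}")


expected = ['id_dimensao', 'nm_dimensao', 'tipo_dimensao', 'desc_dimensao', 'dt_insert']
check_columns_match(expected, expected)  # passes silently
try:
    check_columns_match(['id_dimensao', 'nm_dimensao'], expected)
except ValueError as err:
    print(err)  # Schema mismatch: missing=['tipo_dimensao', 'desc_dimensao', 'dt_insert'], unexpected=[]
```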


<h4>Writing the fact table to BigQuery, using the temporary GCS bucket for staging</h4>

In [90]:
gcs_bucket = 'bucket-raw-ge/temp-files'
bq_dataset = 'trusted_ge_dataquality'
bq_table = 'fato_ge_dataquality' 

df_final.write \
  .format("bigquery") \
  .option("table","{}.{}".format(bq_dataset, bq_table)) \
  .option("temporaryGcsBucket", gcs_bucket) \
  .mode('append') \
  .save()

In [91]:
spark.stop()