In [1]:
import great_expectations as gx
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

In [2]:
# Great Expectations data context used by every later cell.
context = gx.get_context()

# Spark session registered under the application name "klaus_session";
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("klaus_session")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/19 15:02:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the Parquet data stored under /opt/airflow/silver/ into a Spark DataFrame.
# This DataFrame is what the data-quality expectations below are evaluated against.
df = spark.read.parquet("/opt/airflow/silver/")

In [4]:
# Register the Spark datasource "spark_memory" in the GX context.
# add_or_update keeps the cell safe to re-run: an existing datasource with
# this name is updated instead of raising a duplicate-name error.
dataframe_datasource = context.sources.add_or_update_spark(name="spark_memory")

In [5]:
# Expose the in-memory DataFrame `df` to Great Expectations as the
# dataframe asset "dq_test" of the Spark datasource.
data_asset = dataframe_datasource.add_dataframe_asset(
    name="dq_test",
    dataframe=df,
)

In [6]:
# Build the batch request for the "dq_test" asset; it is passed to
# context.get_validator() further down to select the data to validate.
my_batch_request = data_asset.build_batch_request()

In [7]:
# Name shared by the expectation suite and the validator created later.
expectation_name = "data_quality_expectations"

# Create the suite (or update it when it already exists). The call is kept as
# the cell's last expression so the notebook displays the returned suite.
context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_name,
)

{
  "expectations": [],
  "meta": {
    "great_expectations_version": "0.18.18"
  },
  "data_asset_type": null,
  "ge_cloud_id": null,
  "expectation_suite_name": "data_quality_expectations"
}

In [8]:
# Validator tying the batch request (the data) to the expectation suite
# (the rules); expectations are registered on this object below.
validator = context.get_validator(batch_request=my_batch_request, expectation_suite_name=expectation_name)

In [9]:
# Columns that receive a not-null expectation in the validation cell.
columns_to_check_null = ["id_brewery"]

In [11]:
# Run the data-quality expectations against the validator and write a
# plain-text report with one entry per expectation plus the overall
# success percentage.
output_dir = "/opt/airflow/notebooks/"
output_file = os.path.join(output_dir, "validation_results.txt")

os.makedirs(output_dir, exist_ok=True)

# --- Register the expectations on the validator -----------------------------

# Every column listed in columns_to_check_null must be free of nulls.
for column in columns_to_check_null:
    validator.expect_column_values_to_not_be_null(column=column)

# 'phone_brewery' values must have a length of exactly 11 (the expected standard format).
validator.expect_column_value_lengths_to_equal(column="phone_brewery", value=11)

# 'latitude_brewery' and 'longitude_brewery' must fall inside a reasonable range.
validator.expect_column_values_to_be_between(column="latitude_brewery", min_value=-90.0, max_value=90.0)
validator.expect_column_values_to_be_between(column="longitude_brewery", min_value=-180.0, max_value=180.0)

# 'postal_code_brewery' values must have a length between 5 and 9.
validator.expect_column_value_lengths_to_be_between(column="postal_code_brewery", min_value=5, max_value=9)

# --- Run the validations ----------------------------------------------------

# Validation happens BEFORE the report file is opened: opening with 'w'
# truncates the file, so a failure here must not wipe a previous report.
results = validator.validate()
total_validations = len(results['results'])
successful_validations = sum(1 for result in results['results'] if result['success'])

# Guard the division so an empty result set reports 0% instead of raising
# ZeroDivisionError.
if total_validations:
    success_percentage = (successful_validations / total_validations) * 100
else:
    success_percentage = 0.0

# --- Write the report -------------------------------------------------------

# Explicit UTF-8: the report contains non-ASCII text, so relying on the
# platform default encoding could fail or produce a garbled file.
with open(output_file, 'w', encoding='utf-8') as f:

    # One entry per expectation result.
    for result in results['results']:
        expectation_type = result['expectation_config']['expectation_type']
        # Table-level expectations carry no 'column' kwarg; report them as N/A.
        column = result['expectation_config']['kwargs'].get('column', 'N/A')
        success = result['success']

        f.write(f"Expectativa: {expectation_type} | Coluna: {column} | Sucesso: {success}\n")

        # On failure, include the raw result payload to help diagnosis.
        if not success:
            f.write(f"Detalhes da falha: {result['result']}\n")

        # Separator between entries.
        f.write('-' * 60 + '\n')

    # Overall success percentage at the end of the report.
    f.write(f"Percentual de validações bem-sucedidas: {success_percentage:.2f}%\n")

# Echo the summary in the notebook output.
print(f"Percentual de validações bem-sucedidas: {success_percentage:.2f}%")
print(f"Resultados salvos em: {output_file}")

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

                                                                                

Calculating Metrics:   0%|          | 0/12 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/12 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/38 [00:00<?, ?it/s]

Percentual de validações bem-sucedidas: 60.00%
Resultados salvos em: /opt/airflow/notebooks/validation_results.txt
