In [1]:
import great_expectations as gx
import pandas as pd
import dotenv
import shutil
import os


def load_postgres_instance_datasource_asset(context):
    """Create or load the Postgres Datasource and table Data Asset, then build a Batch Request.

    This is the first step of the Great Expectations workflow: defining the data
    that will be validated. Connection settings are read from the process
    environment after loading a ``.env`` file with ``dotenv``.

    Args:
        context (gx.Context): Configured project Data Context object.

    Returns:
        gx.Batch_Request: batch request built from the ``listings_asset`` table asset.

    Raises:
        ValueError: If the datasource has to be created and one of the required
            POSTGRES_* environment variables is not set.
    """
    # Environment
    dotenv.load_dotenv()
    env_values = {
        env_key: os.environ.get(env_key)
        for env_key in ('POSTGRES_USER', 'POSTGRES_PASSWORD', 'POSTGRES_PORT_CONTAINER', 'POSTGRES_HOST')
    }

    # Datasource - engine
    datasource_name = "postgres_src"
    # Look the datasource up under the same name it is registered with below;
    # any other key would never find it and would try to re-create it on every call.
    datasource = context.datasources.get(datasource_name, None)
    if datasource is None:
        # The URI is only needed when the datasource must be created, so the
        # environment is validated here rather than unconditionally.
        missing = [env_key for env_key, value in env_values.items() if value is None]
        if missing:
            raise ValueError(
                "Missing environment variables for the Postgres connection: " + ", ".join(missing)
            )
        postgres_uri = (
            "postgresql+psycopg2://"
            f"{env_values['POSTGRES_USER']}:{env_values['POSTGRES_PASSWORD']}"
            f"@{env_values['POSTGRES_HOST']}:{env_values['POSTGRES_PORT_CONTAINER']}/postgres"
        )
        datasource = context.sources.add_postgres(name=datasource_name, connection_string=postgres_uri)

    # Data Asset - connection
    asset_name = 'listings_asset'
    asset_table_name = "g1_listings"  # SQL table
    existing_asset_names = [asset_obj.name for asset_obj in datasource.assets]
    if asset_name in existing_asset_names:
        # Fetch the asset whose presence was just checked.
        table_asset = datasource.get_asset(asset_name)
    else:
        table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name, schema_name='raw')

    return table_asset.build_batch_request()



def load_csv_datasource_asset_raw(context, datasource_name: str, asset_name:str, layer_name='raw'):
    """Create or load a pandas-filesystem Datasource and CSV Data Asset, then build a Batch Request.

    This is the first step of the Great Expectations workflow: defining the data
    that will be validated. The asset matches files named ``<asset_name>.csv.gz``
    under the ``./data`` base directory.

    Args:
        context (gx.Context): Configured project Data Context object.
        datasource_name (str): Name of the Datasource to load or create.
        asset_name (str): Name of the Asset; also the file-name stem used in the batching regex.
        layer_name (str, optional): Name of the data layer (raw, trusted or specs).
            Accepted for interface compatibility; it does not affect the result. Defaults to 'raw'.

    Returns:
        gx.Batch_Request: batch request describing the data to be processed.
    """
    # Raw string so that `\.` reaches the regex engine as an escaped dot instead of
    # being an invalid Python string escape.
    file_data_regex = asset_name + r'\.csv\.gz'

    # Datasource - engine
    datasource = context.datasources.get(datasource_name, None)
    if datasource is None:
        datasource = context.sources.add_pandas_filesystem(datasource_name, base_directory='./data')

    # Data Asset - connection
    existing_asset_names = [asset_obj.name for asset_obj in datasource.assets]
    if asset_name in existing_asset_names:
        table_asset = datasource.get_asset(asset_name)
    else:
        table_asset = datasource.add_csv_asset(asset_name, batching_regex=file_data_regex)

    return table_asset.build_batch_request()

In [2]:
# Monitoramento RAW - Reviews
def suite_monitoring_execution(asset_name: str, datasource_name='airbnb', data_context_path = '.'):
    """Run the data-quality monitoring checkpoint for one raw CSV asset.

    Ensures an expectation suite named ``raw_<asset_name>`` exists (creating it
    only when it is not already listed), registers a checkpoint of the same name
    that validates the asset's batch request against that suite, and runs it.

    Args:
        asset_name (str): Name of the asset.
        datasource_name (str, optional): Name of the Datasource to create or reuse. Defaults to 'airbnb'.
        data_context_path (str, optional): Directory of the Data Context. Defaults to '.'.

    Returns:
        checkpoint_result: Result of running the checkpoint.
    """
    target_suite = f'raw_{asset_name}'

    # Data Context
    ctx = gx.data_context.FileDataContext.create(project_root_dir=data_context_path)

    # Batch request for the raw-layer CSV asset
    request = load_csv_datasource_asset_raw(
        context=ctx,
        datasource_name=datasource_name,
        asset_name=asset_name,
        layer_name='raw',
    )

    # Suite: only add it when it is not already registered
    known_suites = ctx.list_expectation_suite_names()
    if not (target_suite in known_suites):
        ctx.add_or_update_expectation_suite(target_suite)

    # Checkpoint validation: one validation pairing the batch request with the suite
    validation_entry = {
        "batch_request": request,
        "expectation_suite_name": target_suite,
    }
    monitoring_checkpoint = ctx.add_or_update_checkpoint(
        name=target_suite,
        validations=[validation_entry],
    )

    return monitoring_checkpoint.run(run_name=target_suite)

In [3]:
# Validation of the RAW datasets as a whole

# Suites environment: create/load the file Data Context, then copy the stored
# suites from utils/data-quality-profilers into gx/expectations.
context = gx.data_context.FileDataContext.create(project_root_dir='.')
src_suites = os.path.join('.', 'utils', 'data-quality-profilers')
dst_suites = os.path.join('.', 'gx', 'expectations')
shutil.copytree(src_suites, dst_suites, dirs_exist_ok=True)

# One result variable per asset, so no result overwrites another.
checkpoint_result_raw_reviews = suite_monitoring_execution(asset_name='reviews')
checkpoint_result_raw_calendar = suite_monitoring_execution(asset_name='calendar')
checkpoint_result_raw_listings = suite_monitoring_execution(asset_name='listings')

Calculating Metrics:   0%|          | 0/48 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/107 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/886 [00:00<?, ?it/s]

In [None]:
# Report persistence: source and destination paths for the rendered report.
# Built with os.path.join because a plain '.\gx\uncommitted...' literal contains
# '\u', which Python rejects as a truncated unicode escape (SyntaxError).
src_path = os.path.join('.', 'gx', 'uncommitted', 'data_docs', 'local_site', 'index.html')
dst_folder = os.path.join('.', 'doc', 'data_quality')

In [None]:
# import great_expectations as gx
# from great_expectations.exceptions import DataContextError
# import pandas as pd
# import dotenv
# import os

# # Environment
# dotenv.load_dotenv()
# POSTGRES_USER = os.environ.get('POSTGRES_USER')
# POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD')
# POSTGRES_PORT = os.environ.get('POSTGRES_PORT_CONTAINER')
# POSTGRES_HOST = os.environ.get('POSTGRES_HOST')

# POSTGRES_URI = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/postgres"


# # Data Context
# data_context_path = '.'
# context = gx.data_context.FileDataContext.create(project_root_dir=data_context_path)

# # Datasource - engine
# datasource_name = "postgres_src"
# try:
#     datasource = context.sources.add_postgres(name=datasource_name, connection_string=POSTGRES_URI, )
# except DataContextError:
#     pass

# # Data Asset - connection
# asset_name = 'listings_asset_11'
# asset_table_name = "g1_listings"  # SQL table
# table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name, schema_name='raw')
# # add_query_asset
# # (name=asset_name, table_name=asset_table_name)
# data_asset = context.get_datasource( datasource_name ).get_asset( asset_name )
# batch_request = table_asset.build_batch_request()

# # Suite
# context.add_or_update_expectation_suite("my_expectation_suite")

In [None]:
# # Data Context
# asset_name='reviews'
# layer_name='raw'
# datasource_name='airbnb'
# suite_name = f'raw_{asset_name}'


# context = gx.data_context.FileDataContext.create(project_root_dir='.')

# file_data_regex = asset_name + '\.csv\.gz'
# expectation_suite_name_str = f'{layer_name}_{asset_name}'
# run_name = f'{layer_name}.{asset_name}'

# # Datasource - engine
# datasource = context.datasources.get(datasource_name, None)
# if datasource == None:
#     datasource = context.sources.add_pandas_filesystem(datasource_name, base_directory='./data')


# # Data Asset - connection
# list_asset_names = [asset_obj.name for asset_obj in datasource.assets]
# if asset_name in list_asset_names:
#     table_asset = datasource.get_asset('reviews')
# else:
#     table_asset = datasource.add_csv_asset(asset_name, batching_regex=file_data_regex)

# batch_request = table_asset.build_batch_request()

# # Suite

# if suite_name not in context.list_expectation_suite_names():
#     context.add_or_update_expectation_suite(suite_name)

# # Checkpoint Validation
# checkpoint = context.add_or_update_checkpoint(
#     name=f"{suite_name}",
#     validations=[{
#         "batch_request": batch_request,
#         "expectation_suite_name": suite_name,
#         }])

# checkpoint_result = checkpoint.run(run_name=suite_name)

In [None]:
# Data Context
data_context_path = '.'
context = gx.data_context.FileDataContext.create(project_root_dir=data_context_path)

# Batch request
# batch_request = load_postgres_instance_datasource_asset(context=context)
batch_request_raw_reviews = load_csv_datasource_asset_raw(context=context, datasource_name='airbnb', asset_name='reviews', layer_name='raw')
# batch_request_raw_listings = load_csv_datasource_asset_raw(context=context, datasource_name='airbnb', asset_name='listings', layer_name='raw')
# batch_request_raw_calendar = load_csv_datasource_asset_raw(context=context, datasource_name='airbnb', asset_name='calendar', layer_name='raw')

# Suite
context.add_or_update_expectation_suite("my_expectation_suite")


# Validator
# Use the batch request actually built in this cell; a bare `batch_request`
# is not defined here and would raise NameError on a fresh kernel.
validator = context.get_validator(
    batch_request=batch_request_raw_reviews,
    expectation_suite_name="my_expectation_suite",
)
validator.head()

# validator.expect_column_values_to_not_be_null(column="vendor_id")
# validator.save_expectation_suite(discard_failed_expectations=False)

In [None]:
# Target variables (VARIÁVEIS ALVOS):
# - host_neighbourhood
# - neighbourhood_cleansed
# - host_is_superhost