## Módulo: Analytics Engineering
    
## Aula 4 - Parte 1

### Programação da Aula 4:

> ### 1. **Projeto com o "Great Expectations" e o PostgreSQL**;
> ### 2. **Desenvolvimento do projeto final**.

#### Link para o formulário para informar os integrantes do grupo do projeto:
https://forms.gle/xgT27QH81dxSrZ1w7

#### Link para o formulário de Feedback da aula:
https://forms.gle/xf4JGTTZrHTSwDEq7

### Instalação da biblioteca "great_expectations"

In [None]:
pip install great_expectations

In [None]:
pip show great_expectations

### Chamada do "contexto"

In [None]:
import great_expectations as gx

context = gx.get_context()
print(context)

### No primeiro momento o "contexto" não possuí nenhuma fonte de dados

In [None]:
context.list_datasources()

### Configuração de uma nova fonte de dados do PostgreSQL

In [4]:
my_connection_string = (
    #"postgresql+psycopg2://<username>:<password>@<host>:<port>/<database>"
    "postgresql+psycopg2://postgres:ada@localhost:5432/ada"
)

In [5]:
datasource = context.sources.add_postgres(name="ge_datasource", connection_string=my_connection_string)

### Agora a lista de fonte de dados possui o Postgres

In [None]:
context.list_datasources()

### Adicionando um "data asset" na fonte de dados adicionada, no caso abaixo, a tabela "ibm_prices_silver" do banco de dados

In [7]:
asset_name = "silver"
asset_table_nome = "ibm_prices_silver"

table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_nome)

### Adicionando mais um "data asset" na fonte de dados, mas agora ao invés de passar a tabela, será passado a query:

In [8]:
asset_name = "gold_filter"
asset_query = """
SELECT *
FROM ibm_prices_gold
WHERE date > '2024-03-03'
"""

query_asset = datasource.add_query_asset(name=asset_name, query=asset_query)

### Resultado final com os "data assets" criados:

In [None]:
context.list_datasources()

### Agora que existe uma fonte de dados e seus componetes ("datasource" e "data asset"), pode-se adquirir uma amostra desses dados chamado de "Batch":

In [10]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="silver")
batch_request = my_table_asset.build_batch_request()

### Adiciona um novo conjunto de expectativas ou "Expectation Suite"

In [None]:
context.add_or_update_expectation_suite("suite_silver")

### A partir da amostra "Batch" e do conjunto de expectativas "Expectation Suite" cria um validador:

In [12]:
validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = "suite_silver",
)

In [13]:
checkpoint = context.add_or_update_checkpoint(
    name="checkpoint_silver",
    validator=validator
)

In [None]:
checkpoint_result = checkpoint.run()

In [None]:
validator.head(10)

### Exemplo com o outro "asset" da tabela Gold

In [16]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="gold_filter")
batch_request = my_table_asset.build_batch_request()

context.add_or_update_expectation_suite("suite_gold_filter")

validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = "suite_gold_filter",
)

In [None]:
validator.head(10)

### Adiciona uma expectativa nova no conjunto:

In [None]:
validator.expect_column_values_to_not_be_null(column="mean_diff_high_low")

### Salva o conjunto de expectativas

In [19]:
validator.save_expectation_suite(discard_failed_expectations=False)

### A partir do validador, cria um novo checkpoint e processa o mesmo

In [20]:
checkpoint = context.add_or_update_checkpoint(name="checkpoint_gold_filter", validator=validator)

In [None]:
checkpoint_result = checkpoint.run()

### Repete todo o processo com o "asset" da camada "silver"

In [None]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="silver")
batch_request = my_table_asset.build_batch_request()

context.add_or_update_expectation_suite("silver_suite")

validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = "silver_suite",
)
validator.head()

### Adiciona novas expectativas

In [None]:
validator.expect_column_values_to_be_of_type(column="1__open", type_='REAL')
validator.expect_column_values_to_be_of_type(column="5__volume", type_='INTEGER')

validator.expect_column_values_to_be_between(column="diff_high_low", min_value=1, max_value=2)

validator.expect_column_values_to_be_between(column="5__volume", min_value=1, max_value=100000)

### Salva o novo conjunto de expectativas e processa o novo checkpoint

In [24]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [25]:
checkpoint = context.add_or_update_checkpoint(name="checkpoint_silver", validator=validator)

In [None]:
checkpoint_result = checkpoint.run()

In [None]:
validator.head()

### Divide os dados em vários "batchs" ou amostras, por ano, mês e dia

In [28]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="silver")
my_table_asset.add_splitter_year_and_month_and_day(column_name="datetime")

TableAsset(name='silver', type='table', id=None, order_by=[], batch_metadata={}, splitter=SplitterYearAndMonthAndDay(column_name='datetime', method_name='split_on_year_and_month_and_day'), table_name='ibm_prices_silver', schema_name=None)

In [None]:
my_table_asset = my_datasource.get_asset(asset_name="silver")
batch_request = my_table_asset.build_batch_request()
batches = my_table_asset.get_batch_list_from_batch_request(batch_request)
batches

### Gera o resultado do perfil das amostras ou batchs

In [None]:
data_assistant_result = context.assistants.onboarding.run(batch_request=batch_request)

### Plota os resultados do perfil

In [None]:
data_assistant_result.plot_metrics()

### Usa as amostras para gerar um novo validador

In [None]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="silver")
batch_request = my_table_asset.build_batch_request()

context.add_or_update_expectation_suite("silver_suite_multiple")

validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = "silver_suite_multiple",
)
validator.head()

### Cria as expectativas

In [None]:
validator.expect_column_values_to_be_of_type(column="1__open", type_='REAL')
validator.expect_column_values_to_be_of_type(column="5__volume", type_='INTEGER')

validator.expect_column_values_to_be_between(column="diff_high_low", min_value=0, max_value=1000)

validator.expect_column_values_to_be_between(column="5__volume", min_value=0, max_value=100000)

### Process os novos resultados

In [34]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [35]:
checkpoint = context.add_or_update_checkpoint(
    name="checkpoint_silver_multiple",
    validator=validator
)

In [None]:
checkpoint_result = checkpoint.run()

### Existe a possibilidade de filtrar os dados a partir da divisão feita anteriormente

In [37]:
silver_asset = my_datasource.get_asset(asset_name="silver")

silver_asset.batch_request_options

('year', 'month', 'day')

### Cria um validador apenas com o ano de 2024, mês 3 e dia 3 e gera um novo checkpoint com apenas essa amostra

In [None]:
my_datasource = context.get_datasource("ge_datasource")
my_table_asset = my_datasource.get_asset(asset_name="silver")
batch_request = my_table_asset.build_batch_request(options={'year': 2024, 'month': 4, 'day': 12})

context.add_or_update_expectation_suite("silver_suite_multiple_filter")

validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = "silver_suite_multiple",
)
validator.head()

In [None]:
#expectativa de que a coluna '1__open' precisa ser do tipo 'Real'
validator.expect_column_values_to_be_of_type(column='1__open', type_='REAL') 
#expectativa de que a coluna '5__volume' precisa ser do tipo 'Integer'
validator.expect_column_values_to_be_of_type(column='5__volume', type_='INTEGER')

#expectativa de valores esperados entre 0 e 1000 para a coluna 'diff_high_low'
validator.expect_column_values_to_be_between(
    column="diff_high_low",
    min_value=0,
    max_value=1,
)

#expectativa de valores esperados entre 0 e 100000 para a coluna '5__volume'
validator.expect_column_values_to_be_between(
    column="5__volume",
    min_value=0,
    max_value=1,
)

In [42]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [43]:
checkpoint = context.add_or_update_checkpoint(
    name="checkpoint_silver_multiple_query",
    validator=validator
)

In [None]:
checkpoint_result = checkpoint.run()

In [45]:
context.build_data_docs()

{'local_site': 'file://C:\\Users\\ljsmo\\AppData\\Local\\Temp\\tmpyrdsa0jg\\index.html'}