# Full Data Process | Data Engineer and Data Analysis

**Objetivo:** Criar um processo completo de dados com pipeline, tabelas e vizualizador 

![dagster logo](resources/images/6282a6a9dd6a38c040c0caea_dagster.png)
## Dagster - Orquestrador de Dados

### Pipeline
- covid19_vac_pipe

### Tasks
- get_data_vac_covid19
- modify_covid19_vac_df
- load_covid19_vac_to_s3

![dagster | pipe](resources/images/dagit_pt_11.png)
***

![dagster | lauch](resources/images/dagit_pt_2.png)
***

![dagster | run](resources/images/dagit_pt_3.png)

### Code

### get_data_vac_covid19:

~~~python
@op(config_schema={
    "size": Field(int, is_required=False, default_value=10000),
    "scroll": Field(bool, is_required=False, default_value=True, description="Use Paginator"),
    "UF": str,
    "date": Field(
        str,
        is_required=False,
        description="Ex. 2022-11-08"
    )
})
def get_data_vac_covid19(context):
    size = context.op_config['size']
    scroll = context.op_config['scroll']
    UF = context.op_config.get('UF') # noqa
    date = context.op_config.get('date')

    url = "https://imunizacao-es.saude.gov.br/_search"
    auth = ("imunizacao_public", "qlto5t&7r_@+#Tlstigi")  # noqa
    headers = {'Content-Type': 'application/json'}
    params = {"scroll": "1m"}

    if date is None:
        date = dt.date.today().strftime("%Y-%m-%d")

    data = json.dumps(
        {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"estabelecimento_uf": UF.upper()}},
                        {"match": {"@timestamp": date}}
                    ],
                }
            },
            "sort": [
                {"@timestamp": {"order": "asc", "format": "epoch_millis"}}
            ],
            "size": size
        }
    )

    lst_dfs = []
    while True:
        data_json = requests.request('POST', url=url, auth=auth, data=data, params=params, headers=headers).json()

        if scroll:
            scroll_id = data_json.get('_scroll_id')
            data = json.dumps({"scroll_id": scroll_id, "scroll": "1m"})
            params = {}
            url += "/scroll"
        else:
            return pd.json_normalize(data_json['hits'], sep='_', record_path='hits').astype("object")

        if data_json.get('hits', None) is None:
            break

        df = pd.json_normalize(data_json['hits'], sep='_', record_path='hits')
        lst_dfs.append(df)

    try:
        df_concated = pd.concat(lst_dfs).reset_index(drop=True).astype("object")
    except ValueError:
        raise "No objects to concatenate, Verify scroll option in config schema"

    return df_concated
~~~

### modify_covid19_vac_df:
~~~python
@op
def modify_covid19_vac_df(df):
    columns_names = list(df)
    refactor_columns = [
        *map(
            lambda string: string[1:].replace("source_", "").replace("@", "")
            if string.startswith("_") else string, columns_names
        )]

    df.columns = refactor_columns

    df['uf'] = df['estabelecimento_uf']
    df['paciente_idade'] = df['paciente_idade'].astype('float')

    date = dt.datetime.strptime(df['timestamp'][0], "%Y-%m-%dT%H:%M:%S.%fZ")
    df['year'] = date.date().strftime("%Y")
    df['month'] = date.date().strftime("%m")
    df['day'] = date.date().strftime("%d")
    return df
~~~

### load_covid19_vac_to_s3:
~~~python
@op(required_resource_keys={"s3_resource"})
def load_covid19_vac_to_s3(context, df):
    s3 = context.resources.s3_resource()

    date = dt.datetime.strptime(
        df['timestamp'][0], "%Y-%m-%dT%H:%M:%S.%fZ"
    ).date()

    parquet_buffer = io.BytesIO()
    df.to_parquet(parquet_buffer, engine='pyarrow', index=False)
    buffer = io.BytesIO(parquet_buffer.getvalue())

    s3.upload_file(
        file=buffer,
        key=f'covid19-vac/'
            f'uf={df["uf"][0]}/'
            f'year={date.year}/'
            f'month={date.strftime("%m")}/'
            f'day={date.strftime("%d")}/'
            f'data.parquet'
    )

    yield Output(None)
~~~

![s3 logo](resources/images/s3_logo.png)

## AWS S3 - Object Storage | Datalake

![s3 datalake](resources/images/s3_datalake.png)

![athena logo](resources/images/athena_logo.png)

## AWS Athena - Data Catalog

- **Databases:** staging e final
- **Table:** covid19_vac_raw
- **View:** covid19_vac_sp_view

**Queries**

**Create Table**
~~~sql
CREATE EXTERNAL TABLE covid19_vac_raw(
  `index` string,
  `type` string,
  `id` string,
  `score` string,
  `sort` string,
  `estalecimento_nofantasia` string,
  `vacina_lote` string,
  `estabelecimento_municipio_codigo` string,
  `estabelecimento_valor` string,
  `vacina_nome` string,
  `ds_condicao_maternal` string,
  `paciente_endereco_copais` string,
  `version` string,
  `document_id` string,
  `paciente_nacionalidade_enumnacionalidade` string,
  `vacina_categoria_codigo` string,
  `vacina_fabricante_referencia` string,
  `paciente_id` string,
  `paciente_idade` double,
  `vacina_descricao_dose` string,
  `paciente_endereco_coibgemunicipio` string,
  `vacina_grupoatendimento_codigo` string,
  `data_importacao_datalake` string,
  `paciente_racacor_codigo` string,
  `estabelecimento_uf` string,
  `estabelecimento_razaosocial` string,
  `vacina_numdose` string,
  `timestamp` string,
  `sistema_origem` string,
  `paciente_datanascimento` string,
  `paciente_endereco_uf` string,
  `vacina_fabricante_nome` string,
  `paciente_endereco_cep` string,
  `id_sistema_origem` string,
  `status` string,
  `paciente_endereco_nmpais` string,
  `vacina_categoria_nome` string,
  `paciente_endereco_nmmunicipio` string,
  `estabelecimento_municipio_nome` string,
  `vacina_codigo` string,
  `paciente_enumsexobiologico` string,
  `dt_deleted` string,
  `co_condicao_maternal` string,
  `vacina_grupoatendimento_nome` string,
  `paciente_racacor_valor` string,
  `vacina_dataaplicacao` string,
  `data_importacao_rnds` string
  )
partitioned by (uf string, year string, month string, day string)
STORED AS parquet
LOCATION 's3://guhls-lake/covid19-vac'
~~~

**Create metadados of partitions**
~~~sql
MSCK REPAIR TABLE staging.covid19_vac_raw;
~~~

**Create View**
~~~sql
CREATE OR REPLACE VIEW "covid19_vac_sp_view" AS 
SELECT
  vacina_nome
, vacina_fabricante_nome
, "date"("date_parse"(vacina_dataaplicacao, '%Y-%m-%dT%T.%fZ')) vacina_dataaplicacao
, estalecimento_noFantasia
, estabelecimento_razaosocial
, estabelecimento_municipio_nome
, paciente_idade
, TRY_CAST(paciente_dataNascimento AS date) paciente_dataNascimento
, CAST(paciente_enumSexoBiologico AS char(1)) paciente_enumSexoBiologico
, paciente_racaCor_valor
, paciente_endereco_uf
, paciente_endereco_nmMunicipio
FROM
  staging.covid19_vac_raw
WHERE (estabelecimento_uf = 'SP')
~~~

![superset logo](resources/images/superset_logo.png)

## Apache superset - Vizualizador de dados

**Aplicação em produção:**  <a href='superset.guhls.com.br:8088' target='blank'> superset.guhls.com.br </a>   

- **Dashboard:** covid19 | insights
- **Charts:** 
    - % de vacinas aplicadas por nome da vacina | covid19
    - soma das vacinas aplicadas por idade | covid19
    - quantidade de vacinas aplicadas por município - top 10 | covid19

![dash covid19 | insights filter](resources/images/superset_dash_pt_1.png)

![dash covid19 | insights pt.1](resources/images/superset_dash_pt_2.png)
    
![dash covid19 | insights pt.2](resources/images/superset_dash_pt_3.png)
