<a href="https://colab.research.google.com/github/b-guedes/Projetos-Portfolio/blob/main/C%C3%B3pia_de_Project_%7C_Data_Pipeline_with_Telegram_AWS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![](https://community.nasscom.in/sites/default/files/styles/960_x_600/public/media/images/Nasscom%20poster%20%284%29.png?itok=b81vGQap)

<a id = "tabela-conteudo"></a>
# ÍNDICE

- [INTRODUÇÃO](#intro)
    - [1. Um projeto de Pipeline de dados](#headings)
- [CONTEXTO](#contexto)
    - [2. Pipeline de dados](#pipeline-de-dados)
    - [2.1. Extração dos dados](#extracao-dos-dados)
    - [2.2. Dados transacionais e analíticos](#dados-trans-e-ana)
- [ARQUITETURA](#arquitetura)
    - [3. Ingestão de dados](#ingestao-de-dados)
        - [3.1. Amazon Web Services](#amazon-web-services)
            - [3.1.1. AWS S3](#aws-s3)
            - [3.1.2. AWS Lambda](#aws-lambda)
            - [3.1.3. AWS API Gateway](#aws-api-gateway)
        - [3.2. Telegram](#telegram)
        - [3.3 ETL](#etl)
            - [3.3.1. AWS S3](#aws-s3-2)
            - [3.3.2. AWS Lambda](#aws-lambda-2)
            - [3.3.3. AWS Event Bridge](#aws-event-bridge)
- [4. Apresentação dos dados](#apresentacao-dos-dados)
    - [4.1. AWS Athena](#aws-athena)
    - [4.2. Análise dos dados](#analise-dos-dados)
- [CONSIDERAÇÕES FINAIS](#consideracoes-finais)


<a id = "intro"></a>
# **INTRODUÇÃO**

<a id = "headings"></a>
## 1. **Um projeto de Pipeline de dados**

Este trabalho se propõe a executar um pipeline de dados do início ao final deste processo, ou seja, desde montar o meio de coleta dos dados até a sua transformação e análise.

Esse projeto é parte da formação de Analista de Dados da EBAC, e tem por objetivo agregar todos os conhecimentos do curso em um único projeto robusto. Ele irá abranger tanto as habilidades de um analista dos dados, como a modelagem de dados ou a organização de insights, quanto a construção de um ambiente em nuvem (através da AWS) capaz de coletar, armazenar e transformar dados, um papel comumente atribuído ao engenheiro de dados.

Por tanto, a proposta deste projeto não é propriamente esgotar este conhecimento, mas demonstrar capacidade em dialogar com diferentes níveis de uma organização que se proponha a trabalhar com dados.

<a id = "contexto"></a>
# **CONTEXTO**

![](https://www.collidu.com/media/catalog/product/img/5/b/5bb50fefef2484e06f095938298d0ebbd4714c9527ee027038704300392e1029/data-flow-pipeline-slide1.png)

<a id = "pipeline-de-dados"></a>
## 2. Pipeline de dados

Antes de detalhar o projeto, vamos retomar alguns conceitos para que os conceitos fiquem explicitados a todos os públicos. Começando com o que é pipeline de dados e por que ele é importante para o cenário do analista de dados e as organizações atualmente. Em uma definição simples sobre o pipeline, temos que nele

> (...) ocorre a ingestão de dados brutos de diversas origens de dados e, em seguida, o transporte deles para o armazenamento, que pode ser um data lake ou um data warehouse, com a finalidade de realizar análises. Antes de serem enviados a um repositório, os dados geralmente passam por algum tipo de processamento. Isso inclui transformações de dados, como filtragem, mascaramento e agregações, que garantem a integração e a padronização de dados apropriadas. Isso é particularmente importante quando o destino do conjunto de dados é um banco de dados relacional. Esse tipo de repositório de dados tem um esquema definido que requer alinhamento, ou seja, colunas e tipos de dados correspondentes, para atualizar os dados existentes com novos dados.
[IBM, 2023](https://www.ibm.com/br-pt/topics/data-pipeline)

De tal forma, pode ser entendido como os pipeline de dados vêm sendo uma solução cada vez mais utilizada para automatizar, assegurar escala repetitiva de dados, ingestão, transformação e atividades de integração organizacionais. Uma arquitetura bem projetada e adequada de um pipeline de dados pode qualificar e acelerar significativamente a disponibilidade de dados para o consumo das equipes especializadas em uma organização.

Assim sendo, vamos aproximar o foco para o nosso projeto e detalhar alguns pontos que dizem respeito ao pipeline que iremos construir aqui. Como vimos acima, os dados podem ser coletados de diversas formas, como, por exemplo, a raspagem de dados em sites, análise de documentos, extração de texto, extração baseada em API, etc.

<a id = "extracao-dos-dados"></a>
### 2.1. Extração dos dados

Neste projeto será utilizada a técnica de extração baseada em uma API. Esta será toda construída no ambiente da ***Amazon Web Services*** **(AWS)** e integrada a um chatbot do aplicativo de mensagens ***Telegram***.

O uso do chatbot se justifica por ser uma ferramenta bastante utilizada atualmente no âmbito empresarial, principalmente para a comunicação com os consumidores. Essa interface permite agilizar processos com protocolos bem estabelecidos em que um aplicativo simula e processa uma conversação humana para diversos fins.

> Os chatbots podem ser tão simples quanto programas rudimentares que respondem a uma consulta simples com uma resposta de linha única ou tão sofisticados quanto assistentes digitais que aprendem e evoluem para fornecer níveis crescentes de personalização à medida que coletam e processam informações.[Oracle, 2023](https://www.oracle.com/br/chatbots/what-is-a-chatbot/)

A utilização de chatbots traz diversos benefícios e usabilidades bastante abrangentes e importantes para os dias atuais. Eles podem aumentar a eficiência operacional e a economia de custos, ao mesmo tempo que simplificam algumas interações mais básicas que clientes possam ter. Além disso, a possibilidade de escalar, personalizar e resolver problemas é importante para o tempo atual.

Neste sentido, o projeto usa desta ferramenta para coletar mensagens de um grupo do Telegram para simular uma interação e a capacidade de gravar e transmitir as informações para as próximas etapas.

<a id = "dados-trans-e-ana"></a>
## 2.2. Dados transacionais e analíticos

Uma vez que foi determinado como o projeto irá coletar os dados, é, então, importante distinguir duas etapas que decorrem após este processo, que são os *dados transacionais* e os *dados analíticos*.

O primeiro tem como definição acompanhar as interações relacionadas às atividades de uma organização que podem ser
> (...) transações comerciais, como pagamentos recebidos de clientes, pagamentos feitos a fornecedores e movimentação de produtos pelo inventário, pedidos feitos ou serviços entregues. Eventos transacionais, que representam as transações em si, normalmente contêm uma dimensão temporal, alguns valores numéricos e referências a outros dados. [Microsoft, 2023](https://learn.microsoft.com/pt-br/azure/architecture/data-guide/relational-data/online-transaction-processing)

Assim, esses dados geralmente apresentam transições diárias e em um alto volume, por estas características os dados transacionais podem ser vistos como os dados brutos de uma operação, ou seja, são eles que contêm toda a gama de informações presente em uma organização - dos produtos e inventário à venda e recebimento.

Por sua vez, os dados analíticos podem ser entendidos como derivados dos transacionais, isto porque esses dados são um conjunto de informações coletadas e organizadas de forma estruturada para análises que permitam serem criados insights valiosos para as organizações.

Este trabalho irá se ater a duas frentes, primeiro consumindo os dados transacionais do chatbot e, posteriormente, transformando e analisando os dados na plataforma da AWS para, assim, apresentar alguns possíveis insights extraídos desses dados.

<a id = "arquitetura"></a>
# ARQUITETURA

![](https://github.com/andre-marcos-perez/data-pipeline-demo/blob/main/docs/image/architecture.png?raw=true)

A forma mais simples e comum de explicar a arquitetura de um pipeline de dados é justamente com a comparação ao sistema de tubulação de água (pipeline em inglês!). A ideia por trás do sistema montado é que, assim como uma tubulação leva a água de um local (um reservatório) para o outro (torneira de uma casa) por meio de um caminho determinado, fazer com que os dados sigam esse mesmo padrão, ou seja, os dados são levados de um reservatório (local onde os dados brutos são coletados), sofrem alterações e são salvos em um destino escolhido. A seguir serão explicitados os principais componentes da arquitetura aqui planejada.

<a id = "ingestao-de-dados"></a>
## 3. Ingestão de dados

A etapa de ingestão dos dados acontece quando os dados transacionais passam por ambientes analíticos. De maneira geral, o dado ingerido é transportado no formato mais próximo do original, ou seja, nenhuma transformação é realizada em seu conteúdo ou estrutura (schema).
Neste projeto, as mensagens capturadas pelo bot serão ingeridas através da API web de bots do Telegram, as quais são fornecidas no formato JSON. Como o Telegram retem mensagens por apenas 24h em seus servidores, a ingestão via [streaming](https://aws.amazon.com/pt/what-is/streaming-data/) é a mais indicada.
Para que esse tipo de ingestão seja possível, vamos utilizar um webhook (gancho web), ou seja, vamos redirecionar as mensagens automaticamente para outra API web - o [AWS API Gateway](https://docs.aws.amazon.com/pt_br/apigateway/latest/developerguide/welcome.html).

<a id = "amazon-web-services"></a>
### 3.1. Amazon Web Services

Todos os dados devem ser armazenados em algum lugar e a escolha de como isso deve ser feita também passa pelo projeto, existem hoje diversos serviços capazes de oferecer armazenamento de dados como, por exemplo, [Oracle Cloud](https://www.oracle.com/br/cloud/storage/), [Google Cloud](https://cloud.google.com/products/storage?hl=pt-br), [IBM Cloud Storage](https://www.ibm.com/br-pt/storage?lnk=flathl), [Microsoft Azure](https://azure.microsoft.com/pt-br/products/azure-sql/database/), [AWS Storage](https://aws.amazon.com/pt/products/storage/?nc2=h_ql_prod_st_s), entre outros. Todas essas plataformas oferecem serviços robustos e escaláveis para implementar um projeto de pipeline neles, por isso a escolha de cada um depende da organização e seus objetivos. Neste projeto, utilizaremos a plataforma da Amazon, a **AWS**.

Tendo definido onde os dados serão armazenados e ingeridos, devemos configurar o serviço para recebê-los.

<a id = "aws-s3"></a>
#### 3.1.1. AWS S3

Tendo definido onde os dados serão armazenados e ingeridos, devemos configurar o serviço para recebê-los.
Na plataforma da AWS os locais de armazenamento de dados são conhecidos como buckets e se encontram no serviço denominado AWS S3, neles podemos particionar os dados coletados conforme seja do interesse da organização como, por exemplo, pela fonte dos dados ou os dias em que foram coletados.
Durante a etapa de ingestão, o AWS S3 tem a função de armazenar passivamente as mensagens captadas pelo bot do Telegram no seu formato original: JSON, ou seja, elas não passam por transformações para serem armazenadas (são os dados transacionais do projeto). Dessa forma, a organização pode atualizar quaisquer dados históricos caso precise fazer ajustes nas tarefas de processamento de dados no futuro.

<a id = "aws-lambda"></a>
#### 3.1.2 AWS Lambda

Tendo um local para armazenar os dados, na AWS é necessário criar uma função na ferramenta *AWS Lambda* com um código capaz de persistir os dados do bot do Telegram para os buckets. Para tanto vamos criar uma função que opera da seguinte forma:

 - Recebe a mensagem no parâmetro `event`;
 - Verifica se a mensagem tem origem no grupo do **Telegram** correto;
 - Persiste a mensagem no formato JSON no *bucket* do `AWS S3`;
 - Retorna uma mensagem de sucesso (código de retorno HTTP igual a 200) a API de *bots* do **Telegram**.


Esses dados serão recebidos e armazenados no formato JSON dentro dos *buckets* de destino. Além disso, são criadas variáveis de ambiente ("AWS_S3_BUCKET" e "TELEGRAM_CHAT_ID") e adicionadas permissões (no *AWS IAM*) de escrita para a interação com *AWS S3* e *AWS Lambda*.
O código para escrever a função no ***AWS Lambda*** é o que segue:


In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve,
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')

  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")

<a id = "aws-api-gateway"></a>
#### 3.1.3. AWS API Gateway

Com a função Lambda capaz de ingerir os dados, a próxima etapa é criar uma API no *AWS API Gateway* que a conecte à função do AWS Lambda.
Na etapa de ingestão, o *AWS API Gateway* tem a função de receber as mensagens captadas pelo bot do Telegram, enviadas via webhook, e iniciar uma função do *AWS Lambda*, passando o conteúdo da mensagem no seu parâmetro event. Para tanto, vamos criar uma API e configurá-la como gatilho da função do *AWS Lambda*:

 - Acesse o serviço e selecione: *Create API* -> *REST API*;
 - Insira um nome, como padrão, um que termine com o sufixo `-api`;
 - Selecione: *Actions* -> *Create Method* -> *POST*;
 - Na tela de *setup*:
  - Selecione *Integration type* igual a *Lambda Function*;
  - Habilite o *Use Lambda Proxy integration*;
  - Busque pelo nome a função do `AWS Lambda`.
Isso gerara uma URL que usaremos na variável "aws_api_gateway_url" no código utilizado na etapa de ingestão.

<a id = "telegram"></a>
### 3.2. Telegram

As fontes de dados para um projeto de pipeline podem ser encontradas em muitos lugares, e os próprios dados podem ser de muitas formas. Neste projeto, como foi supracitado, utilizaremos as mensagens trocadas entre usuários e um chatbot do aplicativo de mensagens [Telegram](https://telegram.org/) - a escolha por esse aplicativo se deve pela facilidade com que se pode criar e gerenciar um bot de forma segura e interativa nele.

Para criar o bot no aplicativo, o projeto se baseou na documentação de API do Telegram que pode ser acessada [neste](https://core.telegram.org/bots/api) link. A seguir serão detalhados alguns pontos dessa configuração:

![The Botfather](https://core.telegram.org/file/811140934/1/tbDSLHSaijc/fdcc7b6d5fb3354adf)
* O bot do Telegram é chamado de *The BotFather*;
* Através dele é possível criar um bot com poucos e simples comandos, como mostrado a seguir e utilizado neste projeto:
  * 1) No chat com o BotFather digitar: /start
  * 2) Depois digitar: /newbot
  * 3) Então digitar o nome do bot: < nome_do_bot >
  * 4) Após, digitar o nome de usuário do bot: < nickname_para_o_bot >
  * 5) E por último ativamos o bot abrindo o chat com ele e digitando: /start
* Uma vez criado, o bot gera uma chave única de acesso a ele, a qual será utilizada para a etapa de ingestão de dados e acesso ao conteúdo em que o bot estará responsável por registrar;
* Ainda dentro do programa Telegram, foram feitas configurações para garantir as boas práticas no uso de bots. Primeiro deixando exposto na descrição do grupo que todas as mensagens são gravadas, depois tornando o bot um administrador do grupo do qual ele seja designado e, por fim, configuramos o bot para que ele não possa ser adicionado a outros grupos.

Tendo sido explicitada a configuração utilizada do bot é possível acessar o conteúdo que ele capta via API, para isso o projeto monta uma URL base e usa alguns métodos URL para interagir com o bot - configurar um webhook através do método *setWebhook* da API do bot do **Telegram** para interagir com os serviços **AWS** que serão detalhados posteriormente. A seguir são apresentados os códigos em Python para isto:

In [None]:
# informando a chave única gerada pelo bot no Telegram
token = getpass()

In [None]:
#criando uma URL para o bot - comum a todos os métodos do API
base_url = f'https://api.telegram.org/bot{token}'

In [None]:
#criando a URL criada na AWS
aws_api_gateway_url = getpass()

In [None]:
#algumas informações sobre a configuração do bot
response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

In [None]:
#mostrar as informações das mensagens gravadas pelo bot, em formato JSON
response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

Após essas etapas, temos os arquivos brutos coletados do Telegram armazenados e particionados no serviço de nuvem da AWS e podem, então, seguir para a próxima etapa do projeto: extração, transformação e carregamento.

<a id = "table-of-content"></a>
### 3.3 ETL

Após coletar os dados e tê-los armazenados em um repositório central (também conhecido como data warehouse), é o momento de extrair esses dados brutos, depois limpá-los e organizá-los para serem mais adequados às análises e, então, carregá-los em um banco de dados de destino. A sigla **[ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load)** representa estas etapas e significa *Extract, Transform and Load* (em português, Extração, Transformação e Carregamento). Abaixo temos graficamente como esse processo ocorre:

![Como funciona o processo de ETL?](https://d1.awsstatic.com/whatisimg/Fig1-etljob.feff8a73afe5fbbdb8ebb2f8255c1147deda6106.png)


Neste projeto, as mensagens captadas pelo bot em um único dia, serão persistidas na camada crua (como explicitado anteriormente), após isto, elas serão compactadas em um único arquivo. Este será orientado à coluna e comprimido, por fim, ele será persistido em uma camada incremental.

<a id = "aws-s3-2"></a>
### 3.3.1. AWS S3

Devemos novamente criar um bucket na AWS S3, mas desta vez ele irá receber os arquivos que forem transformados na etapa de ETL. Como boas práticas, esses locais são chamados de *raw* e *enriched* ou *bronze, silver e gold*, aqui utilizaremos raw e enriched.

<a id = "aws-lambda-2"></a>
#### 3.3.2. AWS Lambda

Este Lambda irá ativamente processar as mensagens coletadas pelo bot e irá persistir para os buckets enriched e fazer a etapa de data wrangling e manipulação dos dados. Para que os processos tenham uma [melhor performasse computacional](https://www.databricks.com/br/glossary/what-is-parquet), os dados serão transformados e armazenados em uma tabela no PyArrow no formato Parquet - do projeto [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Parquet). OS códigos para isso são os que seguem:

Criando a função Lambda da segunda camada dos dados (enriched) que irá ingerir os dados na AWS.

In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagensm, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None

      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True

  except Exception as exc:
      logging.error(msg=exc)
      return False

Criando a função do data wrangling usada na função anterior.

In [None]:
def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

<a id = "aws-event-bridge"></a>
#### 3.3.3. AWS Event Bridge

Por fim, configuramos o AWS Event Bridge para que ele carregue todos os dias em determinado horário e o código do Lambda seja executado, funcionando assim como um [scheduler](https://aws.amazon.com/pt/eventbridge/scheduler/). O funcionamento do AWS Event Bridge é representado conforme a seguinte imagem:
![Event Bridge](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Scheduler.ab2cc1a1c0f233a4e1e4c5829a0d6c5fc23d9586.png)

<a id = "apresentacao-dos-dados"></a>
## 4. Apresentação dos dados

A última etapa do projeto é onde os dados coletados são entregues para os usuários (analistas, cientistas, etc) e sistemas (dashboards, motores de consulta, etc) para que eles sejam utilizados. Este projeto opta por utilizar a linguagem em [SQL](https://pt.wikipedia.org/wiki/SQL) pelo seu fácil e abrangente uso dentro do mercado de dados. Além disso, nesta etapa, é importante entregar os dados de forma refinada, para que as consultas não precisem ser muito custosas, assim os dados são, também, mais consistentes - por terem de lidar com menos dados desnecessários.
Nesta etapa, iremos construir uma tabela com os dados de maior interesse na ferramenta AWS Athena e, após esta etapa, poderão ser realizadas as consultas analíticas desejadas. A seguir será demonstrado como isto foi feito.

<a id = "aws-athena"></a>
### 4.1. AWS Athena

Na etapa de apresentação, o AWS Athena tem função de entregar os dados por uma interface SQL para os usuários do sistema analítico. Para criar a interface, basta criar uma tabela externa sobre o dado armazenado na camada mais refinada da arquitetura, a camada enriquecida. A construção da tabela foi feita como segue:

```sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket-enriquecido>/'
```

Como o monitoramento foi feito em mais de um dia, devemos adicionar as diferentes partições dos dados a uma única tabela, para isso é preciso usar:

Para adicionar todas as partições:

```sql
MSCK REPAIR TABLE `telegram`;
```

Ou para determinar quais partições serão adicionadas:

```sql
ALTER TABLE <nome-tabela> ADD PARTITION <coluna-partição> = <valor-partição>
```

Com a tabela construida podemos ver um pouco melhor como são os dados utilizados, eles podem ser descritos desta maneira:

| chave | tipo valor | opcional | descrição |
| -- | -- | -- | -- |
| message_id | int | não | id da mensagem enviada ao grupo |
| updated_id | int | não | id da mensagem enviada ao **bot** |
| user_id | int | sim | id do usuário que enviou a mensagem |
| user_is_bot | bool | sim | se o usuário que enviou a mensagem é um **bot** |
| user_first_name | str | sim | primeiro nome do usário que enviou a mensagem |
| chat_id | int | não | id do *chat* em que a mensagem foi enviada |
| chat_type | str | não | tipo do *chat*: private, group, supergroup ou channel |
| text | str | sim | texto da mensagem |
| date | int | não | data de envio da mensagem no formato unix |


<a id = "analise-dos-dados"></a>
### 4.2. Análise dos dados

Neste momento temos duas opções para seguir com o projeto, sendo elas também cumulativas, continuamos na linguagem SQL e realizamos consultas através disto, ou criamos um arquivo para analisar em outro local, como um ambiente em Python ou de visualização de dados, como o Power BI. Não tendo aqui o objetivo de esgotar as possibilidades, colocaremos apenas alguns exemplos de como isso pode ser feito.

Utilizando consultas básicas como:
* O total de mensagens por usuário

```sql
SELECT user_id, COUNT(1) AS amount_msg
FROM telegram
GROUP BY user_id;
```

Utilizando função e agregação de datas como:
* A quantidade de dias entre a primeira e a última mensagem de cada usuário

```sql
SELECT user_id, EXTRACT(DAY FROM (MAX(context_date) - MIN(context_date))) AS "days_between"
FROM telegram
GROUP BY user_id;
```


* O total de mensagens de cada usuário, agrupado por dia

```sql
SELECT
  user_id,
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY
  user_id,
  context_date
ORDER BY context_date DESC
```

Usando agrupamento e agregação como:
* O total de caracteres das mensagens por dia, de cada usuário.

```sql
SELECT
  user_id,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  context_date
ORDER BY context_date DESC
```

Usando Common Table Expression (CTE) e extração de data/hora:
* Mensagens agrupadas por hora e dias da semana

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday
ORDER BY
    parsed_date_weekday
```

Enfim, esses são apenas alguns exemplos de como podem ser extraídas informações valiosas de dados através da consulta com SQL, as possibilidades são infinitas e devem ser adequadas aos objetivos dos usuários e das organizações.

É interessante destacar ainda que é possível extrair qualquer resultado para o formato CSV e assim utilizá-lo em outras plataformas também.

Como exemplo, podemos demonstrar que, ao consultarmos a tabela inteira através da seguinte expressão:

```sql
SELECT *
FROM telegram;
```

Com a consulta feita, podemos salvar o arquivo no formato CSV e assim importá-lo para outra plataforma, como, por exemplo, o Power BI - como no modelo semântico seguir:

![](https://github.com/b-guedes/ebacFIGS/blob/main/consulta8.png?raw=true)

Um exemplo de dashboard que poderia ser criado:

![image](https://github.com/b-guedes/ebacFIGS/blob/main/powerbi_exemplo.png?raw=true)


O objetivo deste projeto não é se aprofundar nas possibilidades de extração dos insights ou apresentação dos dados, mas demonstrar como eles poderiam seguir por esse caminho e gerar resultados que interessem os usuários e organizações do pipeline de dados.

<a id = "consideracoes-finais"></a>
# Considerações finais

Ao final desse projeto, ficou demonstrado como os dados podem ser usados hoje em dia pelas organizações e como eles estão cada vez mais acessíveis, tanto com a profusão de ferramentas e plataformas existentes como com a redução da complexidade em implementar essas estruturas.

Em um contexto de organizações que o volume de dados seja muito grande, é de se pressupor que a complexidade desses projetos cresça e também o envolvimento de mais especialistas em cada área, como os engenheiros de dados, cientistas de dados e os analistas de dados, cada um com uma função específica e necessária em uma grande organização. Ainda assim, é importante que alguém na ponta desse processo tenha em mente como, em linhas gerais, o dado chega para ser analisado e este era o objetivo deste projeto.

Por fim, é de se destacar como a documentação para aplicação de cada etapa é muito divulgada e de fácil acesso, disponibilizada pelas próprias plataformas para que os usuários possam implementar as tecnologias da melhor forma possível. A intuitividade na montagem de projetos assim parece ser uma tendência em um mundo de crescente aperfeiçoamento das IAs ou de montagem de sistemas complexos de Machine Learn com técnicas simples, como no projeto [Visual Blocks](https://visualblocks.withgoogle.com/) da Google.