# Pipeline de dados Telegram

Por [Josué Morfim](https://www.linkedin.com/in/josu%C3%A9-morfim/)


--------

# Sumário

1.  Contexto
2.  Telegram
3.  Ingestão
4.  ETL
5.  Apresentação
-------------

# 1 - Contexto

## 1.1 - Chatbot


Um chatbot é um programa de computador projetado para simular conversas com seres humanos por meio de mensagens de texto ou voz. Os chatbots são frequentemente utilizados em diversas aplicações, como atendimento ao cliente, suporte técnico, vendas, entretenimento e educação.

Os chatbots podem ser implementados em várias plataformas, incluindo sites, aplicativos de mensagens instantâneas, redes sociais e assistentes virtuais. Eles oferecem uma maneira conveniente e eficiente de interagir com os usuários, fornecendo informações, realizando tarefas específicas e até mesmo oferecendo suporte em tempo real, sem a necessidade de intervenção humana direta.

## 1.2 - Telegram

Telegram é uma plataforma de mensagens instantâneas freeware (distribuído gratuitamente) e, em sua maioria, open source. É muito popular entre desenvolvedores por ser pioneiro na implantação da funcionalidade de criação de chatbots, que, por sua vez, permitem a criação de diversas automações.

## 1.3 - Arquitetura

Uma atividade analítica de interesse é a de realizar a análise exploratória de dados enviadas a um chatbot para responder perguntas como:

* Qual o horário que os usuários mais acionam o bot?
* Qual o problema ou dúvida mais frequente?
* O bot está conseguindo resolver os problemas ou esclarecer as dúvidas?
* Etc.


Portanto, vamos construir um pipeline de dados que ingira, processe, armazene e exponha mensagens de um grupo do Telegram para que profissionais de dados possam realizar análises. A arquitetura proposta é dividida em três: transacional, no Telegram, onde os dados são produzidos, a parte de ingestão e ETL na Amazon Web Services (AWS), e por fim  a apresentação que será usado o AWS Athena para consultas SQL e o Power BI para um Dashboard interativo.


![projeto telegram 2](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/4c08838a-44b3-4bde-8c60-d2d0f27637d4)


## Telegram
O Telegram representa a fonte de dados transacionais. Mensagens enviadas por usuários em um grupo são capturadas por um bot e redirecionadas via webhook do backend do aplicativo para um endpoint (endereço web que aceita requisições HTTP) exposto pelo AWS API Gateway. As mensagens trafegam no corpo ou payload da requisição.

## Ingestão de Dados
Uma requisição HTTP com o conteúdo da mensagem em seu payload é recebia pelo AWS API Gateway que, por sua vez, as redireciona para o AWS Lambda, servindo assim como seu gatilho. Já o AWS Lambda recebe o payload da requisição em seu parâmetro event, salva o conteúdo em um arquivo no formato JSON (original, mesmo que o payload) e o armazena no AWS S3 particionado por dia.

## Processamento ETL
Uma vez ao dia, o AWS Event Bridge aciona o AWS Lambda que processa todas as mensagens do dia anterior (atraso de um dia ou D-1), denormaliza o dado semi-estruturado típico de arquivos no formato JSON, salva o conteúdo processado em um arquivo no formato Apache Parquet e o armazena no AWS S3 particionado por dia.

## Análise e Apresentação
Na fase final, os dados processados estarão prontos para análise. Será criada uma tabela no AWS Athena para realizar consultas SQL sobre os dados armazenados no bucket S3 com sufixo "-enriched". Além disso, utilizaremos o Power BI para criar um dashboard interativo, fornecendo uma visualização intuitiva e abrangente dos insights obtidos a partir das mensagens do grupo do Telegram.

# 2 - Telegram

O Telegram representa a fonte transacional de dados do nosso pipeline de dados. Nesta etapa, vamos criar um grupo, criar um bot e adiciona-lo ao grupo recém criado. O bot então captará todas as mensagens enviadas no grupo. As mensagens pode ser acessadas através da API (application programming interface) de bots dos Telegram (documentação neste [link](https://core.telegram.org/bots/api)).

## 2.2 - Bot

Para criar um bot:

1. Abra o chat com o BotFather;
2. Digite /newbot;
3. Digite o nome do bot;
4. Digite o nome de usuário do bot (precisa terminar com sufixo _bot);

> Salve o token de acesso a API HTTP em um local seguro.

Por fim, precisamos ativar o bot.

Abra o chat com o bot;

Selecione **start**.





## 2.3 - Grupo

**Para criar um novo grupo.**

1. Aperte o botão com o ícone de um lápis;
2. Selecione New Group;
3. Busque e selecione o bot recém criado pelo seu nome;
4. Aperte o botão com o ícone de uma seta;
5. Digite o nome do grupo.


![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/c2e3d6dd-2ae5-4cb1-9010-d7d3806f261e)


**Com o grupo criado, vamos adicionar o bot como administrador para que ele possa receber todas as mensagens do grupo. Uma outra opção seria desabilitar o seu modo de privacidade.**

1. Abra o chat do grupo recém criado;
2. Abra o perfil do grupo;
3. Aperte o botão com o ícone de um lápis;
4. No campo de descrição do grupo escreva: Atenção, todas as mensagens são armazenadas pelo bot do grupo;
5. Selecione Administrators;
6. Aperte o botão com o ícone de um usuário;
7. Selecione o bot.
8. Aperte o botão com o ícone de um check.

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/b36995d2-4ac9-4917-9952-47f68ca34f16)

**Por fim, vamos configurar o bot para que ele não possa ser adicionado a outros grupos.**

1. Abra o chat com o BotFather;
2. Digite /mybots;
3. Selecione o bot pelo seu nome de usuário;
4. Selecione Bot Settings;
5. Selecione Allow Groups?;
6. Selecione Turn groups off.

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/1cceab3d-20d2-4498-ae3b-cd1c053867b9)

**Com tudo pronto, envie algumas mensagens no grupo.**

## 2.4 - Bot API

As mensagens captadas por um bot podem ser acessadas via API. A única informação necessária é o token de acesso fornecido pelo BotFather na criação do bot.

> **Nota:** A documentação completa da API pode ser encontrada neste [link](https://core.telegram.org/bots/api)

A url base pe comum a todos os métodos da API.

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/a58839da-25fe-4cb7-abdb-349e04b6945e)

- **getMe**

O método getMe retorna informações sobre o bot.

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/5c921b22-1fbb-4639-99e5-d667b001e813)

 - **getUpdates**
 
 O método getMe retorna as mensagens captadas pelo bot.
 
 ![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/901c03b9-ed04-4f0f-a1c5-33aae38fca46)

# 3 - Ingestão

A etapa de **ingestão** é responsável, como seu o próprio nome diz, pela ingestão dos dados transacionais em ambientes analíticos. De maneira geral, o dado ingerido é persistido no formato mais próximo do original, ou seja, nenhuma transformação é realizada em seu conteúdo ou estrutura (*schema*). Como exemplo, dados de uma API *web* que segue o formato REST (*representational state transfer*) são entregues, logo, persistidos, no formato JSON.

> Persistir os dados em seu formato original trás muitas vantagens, como a possibilidade de reprocessamento.

Pode ser conduzida de duas formas:

 - **Batch**: blocos de dados são ingeridos em uma frequência bem definida, geralmente na escala de horas ou dias;
 - **Streaming**: dados são ingeridos conforme são produzidos e disponibilizados.
 
 No projeto, as mensagens capturadas pelo *bot* podem ser ingeridas através da API *web* de *bots* do **Telegram**, portanto são fornecidos no formato JSON. Como o **Telegram** retem mensagens por apenas 24h em seus servidores, a ingestão via **streaming** é a mais indicada. Para que seja possível esse tipo de **ingestão** seja possível, vamos utilizar um *webhook* (gancho *web*), ou seja, vamos redirecionar as mensagens automaticamente para outra API *web*.
 
 
 Sendo assim, precisamos de um serviço da AWS que forneça um API *web* para receber os dados redirecionados, o `AWS API Gateway` (documentação neste [link](https://docs.aws.amazon.com/pt_br/apigateway/latest/developerguide/welcome.html)). Dentre suas diversas funcionalidades, o `AWS API Gateway` permite o redirecionamento do dado recebido para outros serviços da AWS. Logo, vamos conecta-lo ao `AWS Lambda`, que pode sua vez, irá armazenar o dado em seu formato original (JSON) em um *bucket* do `AWS S3`.
 
 > Sistemas que reagem a eventos são conhecidos como *event-driven*.
 
 Portanto, precisamos:
 
 - Criar um *bucket* no `AWS S3`;
 - Criar uma função no `AWS Lambda`;
 - Criar uma API *web* no `AWS API Gateway`;
 - Configurar o *webhook* da API de *bots* do **Telegram**.
 
 

## 3.1 - AWS S3

Na etapa de **ingestão**, o `AWS S3` tem a função de passivamente armazenar as mensagens captadas pelo *bot* do **Telegram** no seu formato original: JSON. Para tanto, basta a criação de um *bucket*. Como padrão, vamos adicionar o sufixo `-raw` ao seu nome (vamos seguir esse padrão para todos os serviços desta camada).

> **Nota**: um `datalake` é o nome dado a um repositório de um grande volume dados. É organizado em zonas que armazenam replicadas dos dados em diferentes níveis de processamento. A nomenclatura das zonas varia, contudo, as mais comuns são: *raw* e *enriched* ou *bronze*, *silver* e *gold*.



## 3.2. AWS Lambda

Na etapa de **ingestão**, o `AWS Lambda` tem a função de ativamente persistir as mensagens captadas pelo *bot* do **Telegram** em um *bucket* do `AWS S3`. Para tanto vamos criar uma função que opera da seguinte forma:

 - Recebe a mensagem no parâmetro `event`;
 - Verifica se a mensagem tem origem no grupo do **Telegram** correto;
 - Persiste a mensagem no formato JSON no *bucket* do `AWS S3`;
 - Retorna uma mensagem de sucesso (código de retorno HTTP igual a 200) a API de *bots* do **Telegram**.
 
> **Nota**: No **Telegram**, restringimos a opção de adicionar o *bot* a grupos, contudo, ainda é possível iniciar uma conversa em um *chat* privado.
 
 O código da função no **[Link](https://github.com/JosueMorfim/pipeline-dados-telegram/blob/main/AWS%20Lambda%20raw.py)**

Para que a função funcione corretamente, algumas configurações precisam ser realizadas.

 - **Variáveis de ambiente**
 
 Note que o código exige a configuração de duas variáveis de ambiente: `AWS_S3_BUCKET` com o nome do *bucket* do `AWS S3` e `TELEGRAM_CHAT_ID` com o id do *chat* do grupo do **Telegram**. Para adicionar variáveis de ambiente em uma função do `AWS Lambda`, basta acessar configurações -> variáveis de ambiente no console da função.
 
> **Nota**: Variáveis de ambiente são excelentes formas de armazenar informações sensíveis.
 
 - **Permissão**
  
 Por fim, precisamos adicionar a permissão de escrita no *bucket* do `AWS S3` para a função do `AWS Lambda` no `AWS IAM`.

### **3.4 - AWS API Gateway**

Na etapa de **ingestão**, o `AWS API Gateway` tem a função de receber as mensagens captadas pelo *bot* do **Telegram**, enviadas via *webhook*, e iniciar uma função do `AWS Lambda`, passando o conteúdo da mensagem no seu parâmetro *event*. Para tanto vamos criar uma API e configurá-la como gatilho da função do `AWS Lambda`:

 - Acesse o serviço e selecione: *Create API* -> *REST API*;
 - Insira um nome, como padrão, um que termine com o sufixo `-api`;
 - Selecione: *Actions* -> *Create Method* -> *POST*;
 - Na tela de *setup*:
     - Selecione *Integration type* igual a *Lambda Function*;
     - Habilite o *Use Lambda Proxy integration*;
     - Busque pelo nome a função do `AWS Lambda`.


## 3.4 - Telegram

Vamos configurar o *webhook* para redirecionar as mensagens para a `url` do `AWS API Gateway`.

 - **setWebhook**
 
O método `setWebhook` configura o redirecionamento das mensagens captadas pelo *bot* para o endereço *web* do paramametro `url`.

> **Nota**: os métodos `getUpdates` e `setWebhook` são mutualmente exclusivos, ou seja, enquanto o *webhook* estiver ativo, o método `getUpdates` não funcionará. Para desativar o *webhook*, basta utilizar o método `deleteWebhook`.


![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/ddd7fed7-a0e5-4664-a9ef-3ace126705e3)



 - **getWebhookInfo**
 
 O método getWebhookInfo retorna as informações sobre o webhook configurado.
 
 
![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/6510e6b1-0569-40d0-8ae8-4cb57c676d5e)
 

# 4 - ETL

A etapa de **extração, transformação e carregamento** (do inglês *extraction, transformation and load* ou **ETL**) é uma etapa abrangente responsável pela manipulação dos dados ingeridos de sistemas transacionais, ou seja, já persistidos em camadas cruas ou *raw* de sistemas analíticos. Os processos conduzidos nesta etapa variam bastante de acordo com a área da empresa, do volume/variedade/velocidade do dado consumido, etc. Contudo, em geral, o dado cru ingerido passa por um processo recorrente de *data wrangling* onde o dado é limpo, deduplicado, etc. e persistido com técnicas de particionamento, orientação a coluna e compressão. Por fim, o dado processado está pronto para ser analisado por profissionais de dados.

No projeto, as mensagens de um único dia, persistidas na camada cru, serão compactas em um único arquivo, orientado a coluna e comprimido, que será persistido em uma camada enriquecida. Além disso, durante este processo, o dado também passará por etapas de *data wrangling*.

Para isso, vamos utilizar uma função do `AWS Lambda` como motor de processamento e um *bucket* do `AWS S3` como camada enriquecida para a persistência do dado processado. Para garantir a recorrência, vamos configurar uma regra do `AWS Event Bridge` como gatilho diáro da função.

## 4.1 - AWS S3

Na etapa de **ETL**, o `AWS S3` tem a função de passivamente armazenar as mensagens processadas de um dia em um único arquivo no formato Parquet. Para tanto, basta a criação de um *bucket*. Como padrão, vamos adicionar o sufixo `-enriched` ao seu nome (vamos seguir esse padrão para todos os serviços desta camada).


## 4.2 - AWS Lambda

Na etapa de **ETL**, o `AWS Lambda` tem a função de ativamente processar as mensagens captadas pelo *bot* do **Telegram**, persistidas na camada cru no *bucket* do `AWS S3`, e persisti-las na camada enriquecida, também em um *bucket* do `AWS S3`. Logo, vamos criar uma função que opera da seguinte forma:

 - Lista todos os arquivos JSON de uma única participação da camada crua de um *bucket* do `AWS S3`;
 - Para cada arquivo listado:
  - Faz o *download* do arquivo e carrega o conteúdo da mensagem;
  - Executa uma função de *data wrangling*;
  - Cria uma tabela do PyArrow e a contatena com as demais.
 - Persiste a tabela no formato Parquet na camada enriquecida em um *bucket* do `AWS S3`.
 
> **Nota**: O fato de utilizarmos duas camadas de armazenamento e processamento, permite que possamos reprocessar os dados crus de diversas maneiras, quantas vezes forem preciso.


O código da função no **[Link](https://github.com/JosueMorfim/pipeline-dados-telegram/blob/main/AWS%20Lambda%20enriched.py)**


Para que a função funcione corretamente, algumas configurações precisam ser realizadas.

 - **Variáveis de ambiente**
 
O código exige a configuração de duas variáveis de ambiente: `AWS_S3_BUCKET` e `AWS_S3_ENRICHED` com os nomes dos *bucket* do `AWS S3` da camada cru e enriquecida, respectivamente. Para adicionar variáveis de ambiente em uma função do `AWS Lambda`, basta acessar configurações -> variáveis de ambiente no console da função.

 - **Permissão**
 
Precisamos adicionar a permissão de escrita nos *buckets* do `AWS S3` para a função do `AWS Lambda` no `AWS IAM`.

 - **Recursos**
 
O *timeout* padrão de funcões do `AWS Lambda` é de 3 segundos. Para a função, vamos aumentar o tempo para 5 minutos, principalmente para lidar com o IO (*input/output*) de arquivos do `AWS S3`.

 - **Camadas**
 
Por fim, note que o código da função utiliza o pacote Python PyArrow. Contudo, o ambiente padrão do `AWS Lambda` possui poucos pacotes externos instalado, como o pacote Python boto3, logo o PyArrow não será encontrado e a execução da função falhará. Existem algumas formas de adicionar pacotes externos no ambiente de execução do AWS Lambda, um deles é a criação de camadas ou *layers*, onde podemos fazer o *upload* dos pacotes Python direto na plataforma ou através de um *bucket* do `AWS S3`. Vamos então seguir com a última opção, onde teremos que:

 - Criar um *bucket* no `AWS S3`;
 - Fazer o *upload* do código do pacote Python do PyArrow (*download* neste **[link](https://github.com/awslabs/aws-data-wrangler/releases)**);
 - Criar *layer* e conectar na função.
 
 

## 4.3 - AWS Event Bridge

Na etapa de **ETL**, o `AWS Event Bridge` tem a função de ativar diariamente a função de **ETL** do `AWS Lambda`, funcionando assim como um *scheduler*.




# 5 - Apresentação

A etapa de apresentação é reponsável por entregar o dado para os usuários (analistas, cientistas, etc.) e sistemas (dashboards, motores de consultas, etc.), idealmente através de uma interface de fácil uso, como o SQL, logo, essa é a única etapa que a maioria dos usuários terá acesso. Além disso, é importante que as ferramentas da etapa entregem dados armazenados em camadas refinadas, pois assim as consultas são mais baratas e o dados mais consistentes.



## 5.1 - AWS Athena

Na etapa de **apresentação**, o `AWS Athena` tem função de entregar o dados através de uma interface SQL para os usuários do sistema analítico. Para criar a interface, basta criar uma tabela externa sobre o dado armazenado na camada mais refinada da arquitetura, a camada enriquecida.



![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/5d34995a-d34a-4c0c-ae1c-5e45fd108227)

Por fim, adicione as partições disponíveis.

> **Importante**: Toda vez que uma nova partição é adicionada ao repositório de dados, é necessário informar o `AWS Athena` para que a ela esteja disponível via SQL. Para isso, use o comando SQL `MSCK REPAIR TABLE <nome-tabela>` para todas as partições (mais caro) ou `ALTER TABLE <nome-tabela> ADD PARTITION <coluna-partição> = <valor-partição>` para uma única partição (mais barato), documentação neste [link](https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html)).



![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/42df0e6f-e817-452a-b158-3707b7c71c22)

## 5.2 - AWS Glue


Por fim foi criado um Job no AWS Glue para ativar diariamente a SQL Query 'MSCK REPAIR TABLE telegram' , para atualizar os dados de nossa tabela automaticamente todos os dias.

Codigo no **[Link](https://github.com/JosueMorfim/pipeline-dados-telegram/blob/main/AWS%20Glue%20-%20msck%20repair%20table.py)**

Não esqueça de criar e configurar o schedules conforme necessário.

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/7233263a-9359-45d0-bca8-53075ffb33b0)

> **Nota** -  Para quem estiver usando a cota free da AWS, atenção, pois o AWS Glue gera custos adicionais.

## 5.3 Analytics

Com a tabela ja configurada, vamos realizar algumas consultas para entender melhor os dados coletados.

* **A primeira consulta realizada sera das 10 primeiras linhas, ordenadas pelo message_id.**

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/953f410a-e121-44d8-b81c-60565fe1b44b)




![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/a4b781cb-1e79-4932-90b7-c0de631a3772)


* **Quantidade de mensagens por dia.**



![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/f333fffd-2045-40bd-bcbf-fdd170d98af2)

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/693fce60-6c2e-4bd2-94a6-e7222c3c42d5)




Percebemos que nos dias 21 e 22 tivemos os maiores volumes de mensagens no grupo, em contra partida, no dia 25 houve apenas uma mensagem no grupo, o que pode indicar uma falta de interesse dos participantes no grupo e sua finalidade.

* **Quantidade de mensagens por usuário por dia.**



![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/0ba47cc4-0c66-4f13-93c2-1719e641f953)

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/c3c91e38-a9bb-41f9-b51a-8eaae097980f)

* **Média do tamanho das mensagens por usuário por dia.**



![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/12695124-5a97-456a-a8c4-d707b2ab7b8b)

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/d64edb89-0b32-49a5-8286-6c166ca6e364)

Fica claro que o usuário Gabe usa as frases mais longas para se comunicar.

* **Quantidade de mensagens por hora por dia da semana por número da semana.**

![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/1058a2ad-9971-4ee7-ae56-e0e36dac1081)


![image](https://github.com/JosueMorfim/Analise_Credito_SQL/assets/141301164/8360d95e-5359-40a0-be24-fdc1ad6b422c)

Os usuários do grupo se comunicam mais na parte da noite, entre 21:00h e 22:00h.


## 5.4 - Power BI

Por fim, usamos o Power Bi para criar um Dashboard interativo, para uma visualização melhor dos dados.

* **Nota** -  No Power BI no momento de obter os dados, procuramos por "**Script do Python** " e usamos o código do [Link](https://github.com/JosueMorfim/pipeline-dados-telegram/blob/main/Power%20BI%20importar%20S3.py) para acessar os dados diretamente do nosso bucket-enriched no AWS S3, para isso também precisamos criar um usuário no AWS IAM com permissão de read no AWS S3 e alterar as informações no código.

> **[Link do Dashboard](https://app.powerbi.com/view?r=eyJrIjoiOGNmYThhY2YtMzk0Zi00YmFmLTk1YjItNGYzYzVkNTQ0NmNkIiwidCI6ImEwZWJjZDRhLTg0N2ItNDFjMC1iYmYyLWUzNjNkZGMzN2Y5MiJ9)**

![image](https://github.com/user-attachments/assets/b75988fd-2067-418f-b9ce-900badfcb797)

Após a análise do dashboard, observamos que:

1. As quintas-feiras e sextas-feiras são os dias com maior atividade no grupo;
2. O intervalo das 19h às 23h marca os momentos de maior interação entre os membros;
3. Predominantemente, os usuários optam por enviar mensagens de texto para se comunicarem.

Essas descobertas nos permitem compreender as preferências dos membros do grupo quanto a dias, horários e modos de comunicação mais frequentes. Em um contexto de grupo de vendas, isso nos orienta a direcionar esforços e campanhas publicitárias para esses períodos específicos e a alocar um número maior de atendentes nos horários de pico. Assim, podemos reduzir custos operacionais e otimizar o uso de recursos em momentos de baixa demanda, aprimorando o processo.