# **Pipeline de Dados do Telegram**

## **Tópicos**


0. Contexto
1. Arquitetura;
2. Análise exploratória de dados;
3. Resultado.


 ## **0\. Contexto**

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/images-notebook/chatbot.jpg" width="280" />

<font size="1"><a href="http://www.freepik.com">Designed by stories / Freepik</a></font>






Todos nós temos contato com o mundo digital e suas facilidades. Uma dessas tecnologias que chegaram para ficar são os chatbots, sistemas de interação automática de mensagens. Normalmente, o chatbot ajuda a direcionar o cliente para um atendimento mais dinâmico, diminuindo a necessidade do atendente humano.

Porém, ele também pode ser um aliado em compreensão das solicitações do cliente por meio da coleta de dados. Pense comigo: se a empresa souber qual o tipo de atendimento mais solicitado, qual a duvida mais recorrente? Qual os problemas que as pessoas mais enfrentam entre outras possibilidades? Etc.










---



<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/images-notebook/dados.png" width="300" />


Contudo, não adianta só ter o bot e coletar os dados. Primeiro, acontece essa coleta das informações por meio do chatbot no grupo, processo esse que ocorrerá de forma autônoma pelas API's do telegram e da AWS.

Depois, tem que haver toda uma segmentação analítica desses dados, fazendo toda a manipulação de forma que possamos ler, estruturar em conjuntos bem definidos para posterior análise em visualização.

Tendo em vista dos dados trabalhados, é possivel tanto gerar insights das mensagens circuladas no grupo em formato de tabela estruturada, quando gerar gráficos em dashboards e visualização.






---

Com isso, esse trabalho tem como objetivo utilizar um chatbot para coletar os dados presentes em um grupo do Telegram por meio das etapas de ingestão, ETL e Apresentação. Para isso, serão utilizados os serviços da AWS.

O resumo está presente na imagem:


<img src="https://github.com/Caiombr/ebac-pipeline/blob/main/images-notebook/arquitetura.png?raw=true" />

 ## **1\. Arquitetura**

 ### **1\.1. Sistema transacional**

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/images-notebook/botfather.jpg" width="150"/>

O sistema utilizado aqui será o **Telegram**, uma plataforma de mensagens instantâneas *freeware* (distribuído gratuitamente) e, em sua maioria, *open source*. Ele possui um sistema de bot de fácil criação por meio do BotFather. Por meio do Bot criado em um grupo do Telegram, será utilizada a API do Telegram, para que todas as mensagens enviadas no grupo possam ser arquivadas como JSON.  

É possivel coletar informações diversas por meio do JSON, como conteúdo da mensagem, remetente, dia de envio etc., redirecionado via *webhook* do *backend* do aplicativo para um *endpoint* (endereço *web* que aceita requisições HTTP) exposto pelo `AWS API Gateway`, utilizando o `AWS lambda` com o código Python nesse [link](https://github.com/Caiombr/ebac-pipeline/blob/main/ebac-caiombr-datalake-raw.py) e salvando tudo em um bucket no `AWS S3`.

Também houve a padronização das permissões no `AWS IAM`, mas foram todas informadas como full access ao S3, apenas para fins da construção do projeto.

> Para facilitar o processo, o Bot está presente exclusivamente para um grupo teste especifico, mas poderia ser aplicado em mais grupos com uma adaptação.

 ### **1\.2. Sistema analítico**

<img src="https://github.com/Caiombr/ebac-pipeline/blob/main/images-notebook/ETL.jpg?raw=true" width=250/>

Essa etapa envolve o ETL (extração, transformação e carregamento) para que o dado coletado na etapa anterior, reservado em um bucket raw do `AWS S3` seja adaptado para um formato de melhor utilidade.

No projeto, as mensagens de um único dia (sendo esse dia o dia anterior), persistidas na camada cru, serão compactas em um único arquivo, orientado a coluna e comprimido, que será persistido em uma camada enriquecida. Além disso, durante este processo, o dado também passará por etapas de data wrangling.

O código Python utilizado no `AWS Lambda` para resgatar os dados das mensagens no `AWS S3` está nesse [link](https://github.com/Caiombr/ebac-pipeline/blob/main/ebac-caiombr-datalake-enriched.py). Também foi feito um código Python focado apenas para fazer o `MSCK REPAIR TABLE` no `AWS Athena` que está presente neste [link](https://github.com/Caiombr/ebac-pipeline/blob/main/ebac-caiombr-athena-msck.py);


<img src='https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/images-notebook/stepfunctions_graph.png' width=250>

Importante ressaltar que foi utilizado em conjunto o `AWS Step Functions` (código do JSON [aqui](https://github.com/Caiombr/ebac-pipeline/blob/main/ebac-stepfunctions.json) ) e o `AWS EventBrigde` como cronograma agendado de ativação do código Python e `AWS IAM` para permissões de acesso, extraindo as mensagens, fazendo a transformação diariamente para salvamento posterior no bucket enriched do `AWS S3` e rodando o `MSCK REPAIR TABLE`.

 ## **2\. Análise exploratória dos dados**

 ### **2.1. Coleta dos Dados**

A primeira coisa a ser feita aqui envolve saber como que é o dado bruto JSON de cada interação no grupo do telegram que o Bot coleta. Para isso, foi utilizado o método *getUpdates* do Telegram, apenas para chegar as informações como exemplo. Com o projeto ativado, foi utilizado o método *webhook* que, quando conectado a API do AWS se torna uma forma eficaz de puxar os dados.



A estrutura base dele é a seguinte:

In [None]:
exemplo = {"update_id": 856310491,
          "message":
            {"message_id": 11,
              "from": {"id": 1748419651, "is_bot": False, "first_name": "Caio", "last_name": "Marques"},
              "chat": {"id": -1001850427822, "title": "Analise de Dados", "type": "supergroup"},
              "date": 1690477846,
              "text": "hey jude"}
        }

Para possibilitar a utilização dos dados da forma que quisermos, ser o risco de perdemos nada, salvaremos as mensagens em seu formado mais cru, como apresentado acima. Assim, caso queiramos alterar a o processamento, a base de dados das mensagens se manterá inalterada.

Considerando isso, apenas algumas dessas informações delas serão utilizadas neste projeto:


*   message_id: O número ordenal das mensagens;
*   user_id: ID do usuário da mensagem;
*   user_is_bot: Booleano para verificar se o usuário é bot;
*   user_first_name: Primeiro nome do usuário;
*   chat_type: Tipo do chat (padrão é o supergroup);
*   date: Número indicativo da hora e data do envio;
*   data_type: Qual é o tipo do dado (texto, audio, imagem, etc.)
*   text: Qual a mensagem de texto enviada (em caso de texto);





Ainda nessa etapa, os dados serão salvos em um bucket do `AWS S3`e particionados por data de envio. Isso facilita tando o processamento com velocidade, economia no `AWS Athena` e acesso ao armazenamento.

Com esses dados salvos, podemos seguir para a próxima etapa.

 ### **2.2. Limpeza e transformação**

O foco aqui é só tratar essas mensagens que já estão no `AWS S3` para que elas possam serem transformadas em um formato utilizável no `AWS Athena`. Para isso, foi feito um código que recupera apenas as informações desejadas.



Essa é a função de transformação dentro do `AWS Lambda` no ebac-caiombr-datalake-enriched:

In [None]:
import json
from datetime import datetime, timedelta, timezone

def parse_data(data: dict) -> dict:

    # O datetime converte a data presente no arquivo para um valor utilizável
    date = datetime.now().strftime('%Y-%m-%d')
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    parsed_data = dict()

    for key, value in data.items():

        if key == 'from':
            for k, v in data[key].items():
                if k in ['id', 'is_bot', 'first_name']:
                    parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

        elif key == 'chat':
            for k, v in data[key].items():
                if k in ['id', 'type']:
                    parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

        elif key in ['message_id', 'date']:
            parsed_data[key] = [value]

        elif key == 'text':
            parsed_data['data_type'] = [key]
            parsed_data[key] = [value]

        elif key in ['voice', 'photo', 'video', 'new_chat_participant']:
            parsed_data['data_type'] = [key]

  # Verifica se a chave 'data_type' não foi adicionada no loop e define o valor padrão
    if 'data_type' not in parsed_data:
        parsed_data['data_type'] = ['unknown']

    if not 'text' in parsed_data.keys():
        parsed_data['text'] = ['sem mensagem']

    return parsed_data

Chamando a função para o exemplo do 2.1. e fazendo o print, obtemos o seguinte resultado:

In [None]:
parsed_data = parse_data(data=exemplo["message"])

for key, value in parsed_data.items():
    print(f"{key}: {value}")

message_id: [11]
user_id: [1748419651]
user_is_bot: [False]
user_first_name: ['Caio']
chat_id: [-1001850427822]
chat_type: ['supergroup']
date: [1690477846]
data_type: ['text']
text: ['hey jude']


Como a ideia desse projeto é trabalhar com a tabulação no SQL, a conversão será feita em parquet para particionamento e economia no gasto de dados no `AWS Athena`.

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq

table = None

iter_table = pa.Table.from_pydict(mapping=parsed_data)

if table:

  table = pa.concat_tables([table, iter_table])

else:

    table = iter_table
    iter_table = None

> Lembrando que os detalhes de todo o procedimento do código em relação as aplicações da AWS estão presentes nos links da seção 1.2.

Agora, podemos seguir para a próxima etapa.

### **2.3. Visualização**

O primeiro passo é criar a tabela que terão os dados importados. O comando de criação usado, considerando as partições em `context_date`:

``` sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `data_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://ebac-caiombr-datalake-enriched/telegram/'
```

O código gerado no `AWS Athena` fará o carregamento das partições automaticamente todos os dias. Mas, para exemplificar:

``` sql
MSCK REPAIR TABLE `default`.`telegram`;
```

``` sql
SELECT * FROM "default"."telegram" limit 10;;
```

Sendo assim, temos a tabela a seguir de resultado:

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/select_all.jpg" width=1000>


Podemos fazer mais consultas alias, como por exemplo:

- Quantidade de mensagens por dia.

```sql
SELECT
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/msgs_por_dia.jpg" width=1000>


- Qual o tipo de arquivo mais enviado pelos usuários.

```sql
SELECT
  data_type,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY data_type
ORDER BY message_amount DESC
```

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/data_type.jpg" width=1000>


- Quantas mensagens cada usuário enviou.

```sql
SELECT
  user_first_name,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY user_first_name
ORDER BY message_amount DESC
```

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/msgs_por_pessoa.jpg" width=1000>


- Média do tamanho das mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/avg_msg_dia_user.jpg" width=1000>


- Quantidade de mensagens por hora por dia da semana e por mês.

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(month FROM parsed_date) AS parsed_date_month
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_month,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_month
ORDER BY
    parsed_date_month,
    parsed_date_weekday
```

<img src="https://raw.githubusercontent.com/Caiombr/ebac-pipeline/main/tabelas/hour_week_month.jpg" width=1000>


 ## **3\. Resultado**

O primeiro resultado aparente:



*   Todas as mensagens no grupo do Telegram foram devidamente importadas, salvas em seu arquivo original, transformadas para um formato melhor para análise e tabelados.





Porém, em relação aos dados apresentados, podemos trazer algumas conclusões:

*   O tipo de mensagem mais enviado até então é o de texto;
*   O Caio é o usuário mais ativo no grupo atualmente, com 10 mensagens enviadas;
*   As mensagens enviadas durante os dias se mantém muito próximo a média geral;
*   O peso das mensagens também não é muito grande;
*   O mês que foram enviadas as mensagens foi em Julho.

É possivel encontrar ainda mais informações ao passar do tempo e da constante de conversas sendo feitas no grupo. Como é apenas um projeto, serve como exemplo do que pode ser feito de fato.

 ## **4\. Contato**



*   LinkedIn: https://www.linkedin.com/in/caiombr/
*   GitHub do projeto: https://github.com/Caiombr/ebac-pipeline
*   Email: caiombr@gmail.com



