<a href="https://www.kaggle.com/code/gustavofloliveira/analise-de-dados-do-telegram-chatbot-ebac?scriptVersionId=213725928" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Automatização de coleta e análise de dados do Telegram: Um projeto de chatbots

## Contexto

### Chatbots e o impacto na análise de dados
Os chatbots são ferramentas cada vez mais populares para automatizar a interação entre usuários e sistemas, sendo amplamente utilizados em diversas plataformas de mensagens. Uma das plataformas que tem ganhado destaque é o Telegram, que oferece uma API robusta para criar bots personalizados.

Neste notebook, vamos explorar a coleta e análise de dados gerados por interações em um grupo de Telegram, usando um bot que captura mensagens e as armazena para análise posterior. Esse tipo de projeto é útil para entender como é possível automatizar a coleta e análise de dados em plataformas de mensageria, fornecendo insights valiosos sobre o comportamento dos usuáries do grupo.

### Dados transacionais vs. Dados analíticos
Antes de entrar na implementação do projeto, é importante compreender a diferença entre dois tipos de dados:

* Dados Transacionais: São dados gerados por transações individuais em sistemas, como o envio de uma mensagem em um grupo do Telegram. Esses dados são geralmente não estruturados e precisam passar por um processo de transformação para se tornarem úteis para análise.

* Dados Analíticos: São dados que foram processados e organizados de maneira a fornecer insights valiosos. Eles são derivados dos dados transacionais e podem ser analisados para gerar relatórios, identificar padrões e auxiliar na tomada de decisões.

A criação deste projeto justifica-se pela necessidade de transformar dados transacionais, como mensagens enviadas no Telegram, em dados analíticos que possam ser analisados e utilizados para insights sobre o comportamento dos participantes do grupo.

## Arquitetura do sistema

### Sistema transacional: Telegram e sua API de bots
O sistema transacional utilizado neste projeto é o Telegram, que é uma plataforma de mensagens com recursos avançados para criação de bots. A API de bots do Telegram permite que interajamos com mensagens de grupos, o que é ideal para a coleta de dados.

A captura das mensagens é realizada por meio de um bot criado com o BotFather, e a interação com a API é feita via chamadas POST utilizando o API Gateway da AWS, que encaminha as mensagens para uma função Lambda em Python.

In [1]:
# Exemplo de código para a função Lambda que recebe as mensagens
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3

def lambda_handler(event: dict, context: dict) -> dict:
    '''
    Recebe uma mensagem do Telegram via AWS API Gateway, verifica seu conteúdo
    e a escreve em um formato JSON em um bucket do AWS S3.
    '''
    BUCKET = os.environ['AWS_S3_BUCKET']
    TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

    tzinfo = timezone(offset=timedelta(hours=-3))
    date = datetime.now(tzinfo).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    filename = f'{timestamp}.json'
    client = boto3.client('s3')

    try:
        # Processamento da mensagem
        if "body" in event:
            message = json.loads(event["body"])  # Desserializa o corpo como JSON
        else:
            message = event  # Caso o evento já esteja em formato de dicionário

        chat_id = message["message"]["chat"]["id"]
        if chat_id == TELEGRAM_CHAT_ID:
            with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
                json.dump(message, fp)

            client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')
    except Exception as exc:
        logging.error(msg=exc)
        return dict(statusCode="500")

    return dict(statusCode="200")

### Sistema analítico: Ingestão, ETL e apresentação dos dados

O sistema analítico é composto por três etapas principais: Ingestão, ETL (extração, transformação e carga) e apresentação dos dados.



1. Ingestão: As mensagens coletadas são armazenadas em um bucket do AWS S3.
2. ETL: As mensagens são processadas em lotes, onde são transformadas em um formato mais organizado, utilizando a biblioteca PyArrow para conversão em arquivos Parquet.
3. Apresentação: Após a transformação, os dados são carregados para o Athena, onde consultas SQL podem ser realizadas para analisar as mensagens, com a utilização do comando MSCK REPAIR TABLE para garantir que as partições do banco de dados estejam sempre atualiz

In [2]:
# Exemplo de código para processamento das mensagens e carga no Athena
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def lambda_handler(event: dict, context: dict) -> bool:
    client = boto3.client('s3')
    RAW_BUCKET = os.environ['AWS_S3_BUCKET']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']
    
    table = None
    try:
        # Listar arquivos no bucket RAW
        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix='telegram/context_date=2023-12-01')
        for content in response.get('Contents', []):
            key = content['Key']
            file_path = f"/tmp/{key.split('/')[-1]}"

            # Baixar e processar o arquivo JSON
            client.download_file(RAW_BUCKET, key, file_path)
            with open(file_path, mode='r', encoding='utf8') as fp:
                data = json.load(fp).get("message", {})

            parsed_data = parse_data(data)
            if parsed_data:
                iter_table = pa.Table.from_pydict(parsed_data)
                table = pa.concat_tables([table, iter_table]) if table else iter_table

        # Se houver dados, gravar no formato Parquet
        if table:
            parquet_file_path = f'/tmp/{timestamp}.parquet'
            pq.write_table(table, parquet_file_path)
            client.upload_file(parquet_file_path, ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")
    except Exception as exc:
        logging.error(f"Erro inesperado: {exc}")
        return False

    return True

## Análise exploratória de dados


### Fonte: Coleta de dados com a API de Bots do Telegram
O dado é originado diretamente das mensagens enviadas pelos usuários no Telegram, sendo capturado em formato JSON através do uso do método getUpdates da API de Bots. Essas mensagens são armazenadas em arquivos JSON no S3 para processamento posterior.

A análise exploratória começa com a revisão dos dados brutos para verificar os campos que podem ser extraídos, como message_id, user_id, chat_id, text, entre outros.

### Destino: Análise com SQL no Athena

Após o processamento e transformação dos dados em arquivos Parquet, podemos utilizar o Amazon Athena para realizar consultas SQL sobre esses dados. O Athena permite consultar dados diretamente de arquivos armazenados no S3, tornando a análise altamente eficiente.

```sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket-s3>'
```

Exemplo de consultas de SQL para analisar os dados:

- Quantidade de mensagens por dia.

```sql
SELECT
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```

Resultado esperado da consulta:

In [3]:
# Código Python com um arquivo real gerado pelo Athena



import pandas as pd

# Caminho do arquivo CSV no Kaggle
query_1 = '/kaggle/input/querys-chatbot-telegram/query_1.csv'

# Carregando o arquivo CSV no DataFrame
df_query_1 = pd.read_csv(query_1)

# Exibindo o CSV
df_query_1

Unnamed: 0,context_date,message_amount
0,2024-12-08,4
1,2024-12-06,5
2,2024-12-05,5


- Quantidade de mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

In [4]:
# Código Python com um arquivo real gerado pelo Athena



import pandas as pd

# Caminho do arquivo CSV no Kaggle
query_2 = '/kaggle/input/querys-chatbot-telegram/query_2.csv'

# Carregando o arquivo CSV no DataFrame
df_query_2 = pd.read_csv(query_2)

# Exibindo o CSV
df_query_2

Unnamed: 0,user_id,user_first_name,context_date,message_amount
0,7176817083,G,2024-12-08,4
1,7176817083,G,2024-12-06,5
2,7176817083,G,2024-12-05,5


- Média do tamanho das mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

In [5]:
# Código Python com um arquivo real gerado pelo Athena



import pandas as pd

# Caminho do arquivo CSV no Kaggle
query_3 = '/kaggle/input/querys-chatbot-telegram/query_3.csv'

# Carregando o arquivo CSV no DataFrame
df_query_3 = pd.read_csv(query_3)

# Exibindo o CSV
df_query_3

Unnamed: 0,user_id,user_first_name,context_date,average_message_length
0,7176817083,G,2024-12-08,9
1,7176817083,G,2024-12-06,6
2,7176817083,G,2024-12-05,24


- Quantidade de mensagens por hora por dia da semana por número da semana.

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday
```

In [6]:
# Código Python com um arquivo real gerado pelo Athena



import pandas as pd

# Caminho do arquivo CSV no Kaggle
query_4 = '/kaggle/input/querys-chatbot-telegram/query_4.csv'

# Carregando o arquivo CSV no DataFrame
df_query_4 = pd.read_csv(query_4)

# Exibindo o CSV
df_query_4

Unnamed: 0,parsed_date_hour,parsed_date_weekday,parsed_date_weeknum,message_amount
0,21,4,49,1
1,13,5,49,4
2,0,5,49,5
3,1,1,50,4


## Conclusão e resultados


Neste projeto, exploramos o processo de coleta, transformação e análise de dados de interações em grupos de Telegram. A automação desse processo com a AWS facilita a ingestão de dados em grande escala e a análise eficiente, proporcionando insights sobre os usuários de forma dinâmica.

Ao implementar este projeto, você terá uma solução robusta para coletar e analisar dados de plataformas de mensageria, aplicável em diversos cenários de automação e análise de dados.