# Introdução

Neste projeto, será desenvolvido um pipeline de dados do Telegram. A proposta consiste em criar um grupo no Telegram cujas mensagens serão automaticamente coletadas e armazenadas em formato JSON em um bucket da AWS (Amazon Web Services).

Após o armazenamento, esses dados serão processados e tratados, permitindo a realização de consultas SQL para análise e extração de insights.

Esse pipeline serve como um exemplo prático de engenharia de dados aplicada a mensagens em tempo real, utilizando serviços em nuvem para coleta, armazenamento e análise.


## Tecnologias utilizadas:
1. AWS(Api gateway, s3, lambda...)
2. Python.
3. Sql.
4. Webhooks.

## Arquitetura do pipeline.
A arquitetura é dividia em duas parte, o sistema transacional,feito no telegram, e o sistema analítico, feito no AWS. Abaixo, um esquema visual demonstrando o fluxo de dados do projeto:

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/esquema.png?raw=true)
  

## Sistema transacional

### Grupo do Telegram
Será criado um grupo do telegram, e um bot será adicionado á ele.

## Sistema analítico

### Ingestão.

As mensagens do grupo vão ser captadas pelo Api gateway, através de um wehbook, e encaminhadas a uma função lambda. Essa função vai transformar as mensagens em arquivos json e depois salvar em um bucket s3.

### ETL.

Será configurado um EventBridge, que acionará uma nova função Lambda sempre que um novo arquivo JSON for salvo no bucket. Essa função fará a transformação dos dados, convertendo o arquivo JSON em formato Parquet,e o salvará em outro bucket S3 otimizado para análise.

### Apresentação.

Por fim, será criada uma tabela particionada com os arquivos parquet, podendo fazer consultas sql e análise no AWS Athena.




# 1.Grupo.
Nesta etapa, será criado um grupo e um bot. O bot será adicionado ao grupo como adminstrador e captará todas as mensagens enviadas. A opção de adicionar o bot a novos grupos será desabilitada e as mensagens que surgirem no grupo serão acessadas através da API (application programming interface) de bots dos Telegram (documentação neste link: https://core.telegram.org/bots/api).



# 2.Ingestão
A etapa de ingestão é responsável, como seu o próprio nome diz, pela ingestão dos dados  .De maneira geral, o dado ingerido é persistido no formato mais próximo do original, ou seja, nenhuma transformação é realizada em seu conteúdo ou estrutura (schema). Como exemplo, dados de uma API web que segue o formato REST (representational state transfer) são entregues, logo, persistidos, no formato JSON.


## 2.1.AWS Api Gateway

No AWS API gateway, será criada uma nova API HTTP, que servirá como ponte entre o Telegram e a função Lambda responsável pelo tratamento dos dados. Essa api, vai obter os dados através de um webhook

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/111.png?raw=true)



## Webhook

Agora será configurado o webhook, para isso é necessário o token do bot que está no grupo, e a url de invocação disponibilizada pela API HTTP.

### Token

Para isso obter o token, no chat do BotFather(link:), digite:
    

/token

Ele responderá com o token do seu bot, algo como:

123456789:ABCdefGhIjKlMnOpQrStUvWxYz

### URl da API
Para obter a URl de invocação da API, vá em API Gateway → [sua API] → Stages → [stage] e copie a Invoke URL mostrada no topo.


![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/113.png?raw=true)


### Configuração
Após obter o token, basta configurar o webhook digitando a seguinte URL no navegador (ou via terminal):

https://api.telegram.org/botSEU_TOKEN/setWebhook?url=URL_DA_API_GATEWAY

Se o webhook for configurado corretamente, a resposta será

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}

## 2.2. AWS Bucket S3
Nesta etapa o AWS S3 tem a função de armazenar as mensagens enviadas no grupo passiavamente, no formato JSON. Para isso será criado um bucket S3.


## 2.3. AWS lambda
Depois de criar o bucket, o AWS Lambda tem a função de ativamente persistir as mensagens captadas pelo bot do Telegram em um bucket do AWS S3. Para tanto vamos criar uma função que opera da seguinte forma:

Recebe a mensagem no parâmetro event;
Verifica se a mensagem tem origem no grupo do Telegram correto;
Persiste a mensagem no formato JSON no bucket do AWS S3;
Retorna uma mensagem de sucesso (código de retorno HTTP igual a 200) a API de bots do Telegram.

O código da função lambda:

In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta
import boto3

# Configura logs (para aparecer no CloudWatch)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Variáveis de ambiente (definidas na AWS Lambda)
BUCKET = os.environ['AWS_S3_BUCKET']
TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

# Cliente do S3
s3_client = boto3.client('s3')

def lambda_handler(event, context):
    """
    Recebe mensagens do Telegram via API Gateway,
    verifica se vêm do chat correto e salva no S3 em formato JSON.
    """
    try:
        # Lê o corpo da requisição (mensagem do Telegram)
        message = json.loads(event["body"])
        chat_id = message.get("message", {}).get("chat", {}).get("id")

        # Ignora mensagens de outros grupos
        if chat_id != TELEGRAM_CHAT_ID:
            logger.info(f"Mensagem ignorada (chat_id: {chat_id})")
            return {"statusCode": 200, "body": "Ignored"}

        # Cria nome de arquivo com data e hora
        tzinfo = timezone(timedelta(hours=-3))
        date = datetime.now(tzinfo).strftime('%Y-%m-%d')
        timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')
        key = f"telegram/context_date={date}/{timestamp}.json"

        # Envia a mensagem diretamente pro S3
        s3_client.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(message, ensure_ascii=False).encode("utf-8")
        )

        logger.info(f"Mensagem salva em {key}")
        return {"statusCode": 200, "body": "OK"}

    except Exception as exc:
        logger.exception("Erro ao processar mensagem")
        return {"statusCode": 500, "body": "Internal Server Error"}

### Variaveis de ambiente

Nesta etapa, serão criadas variáveis de ambiente, que são estruturas no formato chave:valor.
Elas servem para armazenar informações sensíveis ou de configuração que a função Lambda precisa durante sua execução — sem que esses dados fiquem expostos diretamente no código-fonte.

As variáveis de ambiente são definidas no ambiente de execução da função Lambda, dentro do console da AWS, na aba Configuration → Environment variables.

Esse recurso é essencial para manter segurança e boas práticas, evitando a exposição de credenciais, nomes de bucket, tokens de acesso ou URLs internas.

Note que o código exige a configuração de duas variáveis de ambiente: AWS_S3_BUCKET com o nome do bucket do AWS S3 e TELEGRAM_CHAT_ID com o id do chat do grupo do Telegram.

Exemplo de variáveis:

AWS_S3_BUCKET: BUCKET-DATALAKE-RAW

TELEGRAM_CHAT_ID:-123456789

### Permissão
Por fim foi permitido acesso total ao  AWS S3 para a função do AWS Lambda no AWS IAM.

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/110.png?raw=true)


# 3.ETL

Após a fase de ingestão, em que os dados foram armazenados em formato JSON em um bucket S3, será executado um processo de ETL (Extract, Transform, Load). Nessa etapa, os arquivos JSON passam por um processo de wrangling realizado por uma função Lambda, responsável por limpar, transformar e padronizar os dados. Em seguida, os resultados são convertidos para o formato Parquet e armazenados em um bucket S3 específico, organizados em uma única pasta com orientação a colunas, visando melhor desempenho em consultas e otimização de armazenamento.

## 3.1.AWS S3
Na etapa de ETL, o AWS S3 tem a função de passivamente armazenar as mensagens processadas de um dia em um único arquivo no formato Parquet. Para tanto, basta a criação de um bucket.

## 3.2. AWS Lambda

Agora o AWS tem a função de tratar os dados e fazer um processo de wrangling nos arquivos, após isso eles serão salvos em uma única pasta, orientada a coluna dentro de um bucket S3. Esse arquivos serão salvos no formato parquet. Para realizar todo esse processo, será criada uma função lambda.

O código da função:

In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone
import boto3


logger = logging.getLogger()
logger.setLevel(logging.INFO)

# ----------------------------------------------------------------------
# FUNÇÃO AUXILIAR: Transforma o dicionário JSON em um formato plano
# ----------------------------------------------------------------------


def parse_data(raw_update_data: dict, date_to_process: str) -> dict:
  """
  Extrai os campos relevantes da estrutura JSON da mensagem.
  """

  # A mensagem pode estar aninhada em 'message' ou ser o objeto raiz
  data_message = raw_update_data.get("message", raw_update_data)

  parsed_data = {
    'message_id': data_message.get('message_id'),
    'date': data_message.get('date'),
    'text': data_message.get('text', None),
    'user_id': data_message.get('from', {}).get('id'),
    'user_is_bot': data_message.get('from', {}).get('is_bot', False),
    'user_first_name': data_message.get('from', {}).get('first_name'),
    'chat_id': data_message.get('chat', {}).get('id'),
    'chat_type': data_message.get('chat', {}).get('type'),
    # Coluna de partição para o S3
    'context_date': date_to_process
  }
  return parsed_data

# ----------------------------------------------------------------------
# FUNÇÃO PRINCIPAL
# ----------------------------------------------------------------------

def lambda_handler(event: dict, context: dict) -> bool:
  # IMPORTAÇÃO LENTA (Lazy Import): Crucial para evitar o erro de 1.92 ms
  try:
    import awswrangler as wr
    import pandas as pd # Adicionado pandas para criar o DataFrame
    logger.info("[INFO] awswrangler e pandas importados com sucesso.")
  except Exception as exc:
    logger.error(f"[ERRO CRÍTICO] Falha ao importar bibliotecas: {exc}", exc_info=True)
    return False


  logger.info("--- INICIANDO ETL com duas funções ---")


  # 1. Leitura das Variáveis de Ambiente
  try:
    RAW_BUCKET = os.environ['AWS_S3_BUCKET']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']
  except KeyError as exc:
    logger.error(f"Erro de Variável de Ambiente: {exc}")
    return False



  # 2. Definição da Data de Processamento (Lógica Corrigida)
  tzinfo = timezone(offset=timedelta(hours=-3))
  date_to_process = event.get("date")


  if not date_to_process:
    # date_to_process = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
    date_to_process = (datetime.now(tzinfo) - timedelta(days=0)).strftime('%Y-%m-%d')

  logger.info(f"Processando dados para a partição: {date_to_process}")


  # 3. Listar e processar arquivos JSON
  s3_path_prefix = f'telegram/context_date={date_to_process}'
  client = boto3.client('s3')
  lista_de_mensagens = []


  try:
    response = client.list_objects_v2(
      Bucket=RAW_BUCKET,
      Prefix=s3_path_prefix
    )


    if 'Contents' not in response:
      logger.warning(f"Nenhum arquivo JSON encontrado para {date_to_process}")
      return False


    for content in response['Contents']:
      key = content['Key']
      file_name = key.split('/')[-1]
      tmp_path = f"/tmp/{file_name}"


      try:
        # Baixa e lê o arquivo JSON
        client.download_file(RAW_BUCKET, key, tmp_path)
        with open(tmp_path, mode='r', encoding='utf8') as fp:
          raw_update_data = json.load(fp)


        # Usa a função auxiliar para extrair os dados
        parsed_data = parse_data(raw_update_data, date_to_process)
        lista_de_mensagens.append(parsed_data)

      except Exception as exc:
        logger.error(f"Erro ao processar arquivo {key}. Pulando. Detalhe: {exc}", exc_info=True)
        continue


    # 4. Converte e Salva em Parquet
    if lista_de_mensagens:
      df = pd.DataFrame(lista_de_mensagens)

      s3_path_write = f's3://{ENRICHED_BUCKET}/telegram_enriched/'

      # Escreve o DataFrame como Parquet no S3
      wr.s3.to_parquet(
        df=df,
        path=s3_path_write,
        dataset=True,
        partition_cols=['context_date'],
        mode="append"
      )

      logger.info(f"ETL concluído. {len(df)} linhas salvas em Parquet.")
      return True
    else:
      logger.warning(f"Nenhum dado processado para {date_to_process}")
      return False


  except Exception as exc:
    logger.error(f"Erro fatal: {exc}", exc_info=True)
    return False

### Variaveis de ambiente

Nesta etapa, serão criadas variáveis de ambiente, que são estruturas no formato chave:valor.
Elas servem para armazenar informações sensíveis ou de configuração que a função Lambda precisa durante sua execução — sem que esses dados fiquem expostos diretamente no código-fonte.

As variáveis de ambiente são definidas no ambiente de execução da função Lambda, dentro do console da AWS, na aba Configuration → Environment variables.

Esse recurso é essencial para manter segurança e boas práticas, evitando a exposição de credenciais, nomes de bucket, tokens de acesso ou URLs internas.

Note que o código exige a configuração de duas variáveis de ambiente: AWS_S3_BUCKET com o nome do bucket com os arquivos json  e AWS_S3_ENRICHED com o nome do bucket aonde serão salvos os arquivos parquet.



### Permissão
Por fim foi permitido acesso total ao  AWS S3 para a função do AWS Lambda no AWS IAM.

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/114.png?raw=true)


## 3.3. AWS Event Bridge
Na etapa de ETL, o AWS Event Bridge tem a função de ativar diariamente a função de ETL do AWS Lambda, funcionando assim como um scheduler.

# 4. Apresentação
Nesta etapa, os dados ficam disponibilizados tanto para os usuários finais — como analistas e cientistas de dados — quanto para sistemas de visualização e consulta, como dashboards e mecanismos de busca. A principal forma de acesso a essas informações se dá por meio de ferramentas de consulta, especialmente SQL, que é amplamente utilizada por grande parte dos usuários. Nesse cenário, utilizanos AWS Athena é empregado como solução de leitura, oferecendo um motor de consulta SQL que facilita a exploração e visualização dos dados já processados na camada ETL, promovendo análises mais eficientes e acessíveis.

## 4.1 AWS Athena
Na etapa da apresentação o AWS Athena tem a função de apresentar os dados, no formato de tabela em sql. Para isso, vamos usar um código em sql para criar a tabela

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/115.png?raw=true)


Toda vez que uma nova partição é adicionada ao repositório de dados, é necessário informar o AWS Athena para que a ela esteja disponível via SQL. Para isso, usamos o comando SQL abaxio:

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/116.png?raw=true)


Por fim, é feito um consulta para verififcar se a tabela foi criada com êxito

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/117.png?raw=true)


Resposta:

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/118.png?raw=true)


## 4.2. Análise Exploratória
Com os dados disponíveis, podemos executar as mais variadas consultas analíticas. Mas por se tratar de um teste inicial que compreendeu apenas o perído de 30-10 até 06-11-2025, teríamos que continuar acompanhando a frequência de mensagens num período maior para assim conseguirmos chegar a uma melhor análise de dados.

Seguem abaixo alguns exemplos de consultas SQL, seus resultados e alguns gráficos gerados para a melhor visualização dos resultados:

### **Quantas mensagens foram enviadas por dia?**

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/119.png?raw=true)

### **Resposta:**

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/120.png?raw=true)



### **Houve mensagens repitidas?**

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/121.png?raw=true)


### **Resposta:**

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/122.png?raw=true)


### **Gráfico com a quantidade de mensagens por usuário**



In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

mensagens = pd.read_csv('/kaggle/input/mensagensusuario/other/default/1/mensagensporusuario.csv')

In [None]:
plt.figure(figsize=(8,5))
sns.barplot(
    data=mensagens,
    x='nome_usuario',
    y='qut_mensagem',
    palette='viridis'
)
plt.title('Quantidade de mensagens por usuário', fontsize=14, weight='bold')
plt.xlabel('Usuário', fontsize=12)
plt.ylabel('Quantidade de Mensagens', fontsize=12)

![Minha imagem](https://github.com/WlandGLL/imagens/blob/main/123.png?raw=true)
