## A Infraestrutura na Nuvem: Ingestão de Dados

A próxima etapa foi configurar a infraestrutura na AWS para receber e armazenar os dados do Telegram. Criei um bucket no S3 chamado `ebac-42-exercicio-datalake-raw`, onde os dados brutos seriam armazenados em formato JSON.

Em seguida, criei uma função no Lambda em Python para receber as mensagens do Telegram e salvá-las no bucket S3. O código da função era o seguinte:

``` python
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve,
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente criada nas configurações do lambda

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')
  
  try:

    message = json.loads(event["body"])
    #message = event
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")
```

Configurei as permissões do IAM para que a função Lambda pudesse acessar o bucket S3.

Para conectar o Telegram à função Lambda, criei uma API no API Gateway. Configurei o webhook do bot para enviar as mensagens para a API, utilizando o método POST.

<img width='800px' src='https://github.com/alexmdebarros/Pipeline-aws-telegram/blob/main/api-gateway.png?raw=true'>

# A Transformação dos Dados: ETL

Com os dados brutos (JSON) armazenados no S3, era hora de transformá-los em um formato mais adequado para análise. Criei outro bucket no S3 chamado `ebac-42-exercicio-datalake-enriched`, onde os dados enriquecidos seriam armazenados em formato `PARQUET`.

Criei outra função no Lambda em Python para processar os dados brutos e transformá-los em formato `PARQUET`. O código da função era o seguinte:

```PYTHON
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event: dict, context: dict) -> bool:
    RAW_BUCKET = os.environ['AWS_S3_BUCKET']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

    tzinfo = timezone(offset=timedelta(hours=-3))
    date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    table = None
    client = boto3.client('s3')

    try:
        prefix = f'telegram/context_date={date}'
        logger.info(f"Listando objetos em {RAW_BUCKET}/{prefix}")
        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=prefix)

        if 'Contents' not in response:
            logger.info(f"Nenhum arquivo encontrado para {prefix}")
            return True

        for content in response['Contents']:
            key = content['Key']
            local_file = f"/tmp/{key.split('/')[-1]}"
            logger.info(f"Baixando {key} para {local_file}")
            client.download_file(RAW_BUCKET, key, local_file)

            try:
                with open(local_file, mode='r', encoding='utf8') as fp:
                    data = json.load(fp)
                    if "message" in data:
                        parsed_data = parse_data(data=data["message"])
                        iter_table = pa.Table.from_pydict(mapping=parsed_data)

                        if table:
                            table = pa.concat_tables([table, iter_table])
                        else:
                            table = iter_table

                    else:
                        logger.warning(f"Arquivo {key} não possui a chave 'message'.")

            except json.JSONDecodeError as e:
                logger.error(f"Erro ao decodificar JSON de {key}: {e}")
            except Exception as e:
                logger.error(f"Erro ao processar arquivo {key}: {e}")

        if table:
            parquet_file = f'/tmp/{timestamp}.parquet'
            pq.write_table(table=table, where=parquet_file)
            s3_key = f"telegram/context_date={date}/{timestamp}.parquet"
            logger.info(f"Enviando {parquet_file} para {ENRICHED_BUCKET}/{s3_key}")
            client.upload_file(parquet_file, ENRICHED_BUCKET, s3_key)
            return True
        else:
            logger.info("Nenhum dado processado.")
            return True

    except Exception as exc:
        logger.error(f"Erro geral: {exc}")
        return False

def parse_data(data: dict) -> dict:
    parsed_data = {}
    for key, value in data.items():
        if key == 'from':
            for k, v in data[key].items():
                if k in ['id', 'is_bot', 'first_name']:
                    parsed_data[f"user_{k}"] = [v]
        elif key == 'chat':
            for k, v in data[key].items():
                if k in ['id', 'type']:
                    parsed_data[f"chat_{k}"] = [v]
        elif key in ['message_id', 'date', 'text']:
            parsed_data[key] = [value]

    # Garante que a coluna 'text' sempre exista e seja uma lista de strings.
    if 'text' not in parsed_data:
        parsed_data['text'] = [None]
    else:
        if parsed_data['text'][0] is None:
            parsed_data['text'] = [None]
        else:
            if not isinstance(parsed_data['text'][0], str):
                parsed_data['text'] = [str(parsed_data['text'][0])]

    return parsed_data

```

Configurei as permissões do IAM para que a função Lambda pudesse acessar os buckets S3. Também configurei o timeout da função para 5 minutos e adicionei a layer com o pacote `PyArrow`.

Para automatizar o processo, criei uma regra no EventBridge, agendando a execução da função Lambda diariamente, à meia-noite.

<img width='800px' src='https://github.com/alexmdebarros/Pipeline-aws-telegram/blob/main/eventbridge.png?raw=true'>

# A Descoberta dos Insights: Análise e Visualização

Com os dados enriquecidos e armazenados no S3, era hora de explorar os insights. Criei uma tabela no Athena que apontava para os dados no bucket `ebac-42-exercicio-datalake-enriched`.

Executei o comando `MSCK REPAIR TABLE telegram` para carregar as partições da tabela.

Utilizei as seguintes queries SQL para explorar os dados:

**Contagem de mensagens por dia:**

`SELECT context_date, count(*) FROM telegram
GROUP BY context_date
ORDER BY context_date;`

Para entender a atividade do grupo ao longo do tempo, analisei a quantidade de mensagens enviadas por dia. A query agrupa os dados pela data de contexto (context_date), contando o total de mensagens por dia e ordenando os resultados em ordem crescente. A tabela gerada mostra a variação no volume de mensagens, permitindo identificar picos de interação e dias com menor atividade.

<img width='800px' src='https://github.com/alexmdebarros/Pipeline-aws-telegram/blob/main/total-mens-dia.png?raw=true'>

**Frequência de palavras:**

`SELECT word, count(*) FROM telegram, UNNEST(SPLIT(text, ' ')) AS t(word) WHERE text NOT IN ('', '') GROUP BY word ORDER BY count(*) DESC LIMIT 10;`

Uma análise interessante foi a contagem das palavras mais utilizadas nas mensagens. Para isso, utilizei a função `UNNEST` para dividir o texto das mensagens em palavras individuais e contar a frequência de cada uma. O resultado revela os termos mais recorrentes nas conversas do grupo, o que pode ser útil para entender padrões de comunicação e temas mais discutidos.

<img width='800px' src='https://github.com/alexmdebarros/Pipeline-aws-telegram/blob/main/frequencia-palavras-grafico.png?raw=true'>

**Horário com mais mensagens enviadas:**

`SELECT HOUR(FROM_UNIXTIME(date)) AS hora, COUNT(*) AS total_mensagens
FROM telegram
GROUP BY HOUR(FROM_UNIXTIME(date))
ORDER BY total_mensagens DESC;`

Para identificar os horários de maior atividade no grupo, extraí a hora das mensagens a partir da coluna date, convertendo o timestamp para um formato legível. Em seguida, agrupei os dados por hora e contei a quantidade de mensagens enviadas em cada período. A tabela mostra claramente os horários de pico, ajudando a entender quando há maior engajamento no grupo.

<img width='800px' src='https://github.com/alexmdebarros/Pipeline-aws-telegram/blob/main/qtd-mensagens-hora.png?raw=true'>