### **Introdução do Projeto de ETL**

Olá! Este notebook documenta detalhadamente todo o processo de criação de um pipeline de ETL (Extração, Transformação e Carregamento). O objetivo principal deste projeto é adquirir o máximo de experiência prática na construção de uma solução completa, explorando as diversas etapas e ferramentas envolvidas.  

O pipeline criado neste projeto terá como objetivo extrair dados por meio de uma API de um grupo de chat no Telegram. Após a extração, esses dados serão processados e armazenados para futuras análises.

---



### **1. o que é ETL ?**

O processo de **ETL (Extract, Transform, Load)** é um fluxo de trabalho utilizado para extrair dados de diversas fontes, transformá-los para atender a requisitos analíticos ou de armazenamento e carregá-los em um destino final, como um banco de dados ou data warehouse.

1. **Extract (Extração):**
   - Os dados são coletados de fontes diferentes, como APIs, bancos de dados ou arquivos.
   - Nesta etapa, o principal desafio é lidar com formatos variados e garantir a captura eficiente dos dados.

2. **Transform (Transformação):**
   - Os dados extraídos são processados para atender a um formato consistente e utilizável.
   - Isso pode incluir limpeza, normalização, agregação, ou a criação de novos campos derivados.

3. **Load (Carregamento):**
   - Os dados transformados são carregados para o destino final, como um repositório centralizado.
   - O objetivo é disponibilizar os dados para análise, relatórios ou outras aplicações.

Esse fluxo é essencial para consolidar informações e gerar insights em um ambiente corporativo ou analítico.


---

### **2. Bot no Telegram**

O Telegram permite a criação de **bots**, que são aplicações automatizadas capazes de interagir com os usuários através da sua API oficial. Os bots podem ser usados para uma ampla gama de funcionalidades, como:

- **Envio e recebimento de mensagens.**
- **Consulta de dados externos:** O bot pode consumir APIs e exibir informações, como o clima ou status de pedidos.
- **Automação de tarefas:** Por exemplo, notificações automáticas, lembretes ou coleta de feedback.

#### Etapas básicas para a criação de um bot para o projeto:
1. **Criar um bot no Telegram:**
   - Utilize o usuário **BotFather** no Telegram para registrar e configurar seu bot.
   - Após o registro, o BotFather fornece um **token de acesso** usado para autenticação nas requisições da API.

2. **Integração com a API:**
   - O bot usa o **token** para acessar a API do Telegram e realizar operações, como enviar mensagens ou gerenciar conversas.
   - aqui usaremos o bot como um leitor pondo como um Administrador de um grupo, assim poderemos extrair toda mensagen  mandada no grupo.

3. **outros:**
   - apos adicionar o bot ao grupo volte ao botfather e desligue a capacidade do bot entrar em novos grupos por segurança.

4. **processo pratico**
  - criação do bot
  1. Abra o *chat* com o `BotFather`;
  1. Digite `/newbot`;
  1. Digite o nome do *bot*;
  1. Digite o nome de usuário do *bot* (precisa terminar com sufixo `_bot`);
  1. Salve o `token` de acesso a API HTTP em um <font color='red'>local seguro</font>.
  ---
  - Integração com a API:
  1. Aperte o botão com o ícone de um lápis;
  1. Selecione `New Group`;
  1. Busque e selecione o *bot* recém criado pelo seu nome e confirme;
  1. Digite o nome do grupo.
  1. Abra o *chat* do grupo recém criado e Abra o perfil do grupo;
  1. Aperte o botão com o ícone de um lápis;
  1. Selecione Administrators;
  1. Aperte o botão com o ícone de um usuário;
  1. Selecione o *bot* e confirme;
  ---
  - outros
  1. Abra o *chat* com o `BotFather`;
  1. Digite `/mybots`;
  1. Selecione o *bot* pelo seu nome de usuário;
  1. Selecione `Bot Settings`;
  1. Selecione `Allow Groups?`;
  1. Selecione `Turn groups off`.




## **3. Conceito basico**
O conceito basico visa compreender o funcionamento da API para identificar meios de processar os dados de forma automatizada posteriormente. Por enquanto, serão realizados testes simples que servirão como base para as etapas futuras.

In [None]:
import json #json: Para trabalhar com dados no formato JSON.
import requests #requests: Para realizar requisições HTTP, útil para acessar APIs.
import pandas as pd #pandas: Para manipulação e análise de dados estruturados.
from datetime import datetime #datetime: Para trabalhar com datas e horários.


In [None]:
from getpass import getpass #getpass: Usado para ocultar a entrada de valores no terminal (como senhas ou tokens sensíveis).

api_telegram = getpass()


··········


O valor inserido aqui é armazenado na variável api_telegram. Este token é necessário para autenticar as requisições à API do Telegram.

In [None]:
base_url = f'https://api.telegram.org/bot{api_telegram}'

Define a URL base para as chamadas à API do Telegram.

In [None]:
# usando o getme

response = requests.get(url=f'{base_url}/getMe')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "id": 7644877241,
    "is_bot": true,
    "first_name": "0_bot",
    "username": "O_EBAC_bot",
    "can_join_groups": false,
    "can_read_all_group_messages": false,
    "supports_inline_queries": false,
    "can_connect_to_business": false,
    "has_main_web_app": false
  }
}


In [None]:
# usando o getupdate
response = requests.get(url=f'{base_url}/getUpdates')

print(json.dumps(json.loads(response.text), indent=2))
dados=json.loads(response.text)

{
  "ok": true,
  "result": [
    {
      "update_id": 484581844,
      "message": {
        "message_id": 86,
        "from": {
          "id": 6349852487,
          "is_bot": false,
          "first_name": "Phan",
          "language_code": "pt-br"
        },
        "chat": {
          "id": -1002482572030,
          "title": "grupo chave",
          "type": "supergroup"
        },
        "date": 1732728637,
        "text": "olaaa"
      }
    },
    {
      "update_id": 484581845,
      "message": {
        "message_id": 87,
        "from": {
          "id": 6349852487,
          "is_bot": false,
          "first_name": "Phan",
          "language_code": "pt-br"
        },
        "chat": {
          "id": -1002482572030,
          "title": "grupo chave",
          "type": "supergroup"
        },
        "date": 1732728641,
        "text": "gg"
      }
    }
  ]
}


In [None]:
dados=(dados['result'][0]['message'])

In [None]:

date = datetime.now().strftime('%Y-%m-%d')
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

dado_processado=dict()

for key , valor in dados.items():

  if key == 'from':
    for k, v in dados[key].items():
        if k in ['id', 'is_bot', 'first_name']:
            dado_processado[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

  elif key == 'chat':
    for k , v in dados[key].items():
      if k in ["id","type"]:
        dado_processado[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
        dado_processado[key] = [value]

if not 'text' in dado_processado.keys():
  dado_processado['text'] = [None]

dado_processado['context_date'] = [date]
dado_processado['context_timestamp'] = [timestamp]


In [None]:
df=pd.DataFrame(dado_processado)
df

Unnamed: 0,user_id,user_is_bot,user_first_name,chat_id,chat_type,text,context_date,context_timestamp
0,6349852487,False,Phan,-1002482572030,supergroup,,2024-11-27,2024-11-27 17:43:39


## **4. Preparação no AWS**

A AWS (Amazon Web Services) é uma plataforma robusta para construir soluções escaláveis e eficientes. Para este projeto, os seguintes serviços foram configurados:

#### **1. Buckets no S3**
Os **buckets** são repositórios de armazenamento no serviço Amazon S3 (Simple Storage Service). Eles são usados para armazenar e organizar arquivos, como os resultados de dados processados no ETL.

- **Criação do bucket:**
  - Acesse o console da AWS e navegue até o serviço S3.
  - Crie um novo bucket, fornecendo um nome único e especificando a região onde será armazenado.
  - Configure permissões de acesso apropriadas (por exemplo, somente leitura/escrita para usuários autorizados).

- **Uso no ETL:**
  - O bucket pode ser usado para armazenar dados extraídos ou transformados antes do carregamento final.
- **buckets criados:**
  - bucket_raw : contem os arquivos da operação de extração.
  - bucket_enriched : contem os arquivos da operação de trasformação e load.
---
#### **2. API no AWS Gateway**
O **API Gateway** é um serviço da AWS para criar e gerenciar APIs seguras e escaláveis.

- **Objetivo:**
  - Criar uma API que atue como interface para receber ou enviar dados ao longo do processo ETL.
  - Por exemplo, um endpoint que o bot Telegram pode chamar para enviar ou obter informações.

- **Configuração:**
  1. Acesse o serviço e clique em *Create API* -> *REST API*.  
  2. Insira um nome com o sufixo `-api`.  
  3. Selecione *Actions* -> *Create Method* -> *POST*.  
  4. Na tela de configuração, escolha *Integration type* como *Lambda Function*.  
  5. Ative a opção *Use Lambda Proxy integration*.  
  6. Localize a função do `AWS Lambda` pelo nome.  
  7. Selecione *Actions* -> *Deploy API*.  
  8. Escolha *New Stage* para *Deployment stage*.  
  9. Adicione *dev* como `Stage name`.
- copie o url para criação de um webhook
---
#### **3. Cronograma no AWS EventBridge**
O **Amazon EventBridge** é um serviço usado para criar regras baseadas em eventos, como agendamentos.

- **Objetivo:**
  - Configurar um agendamento para automatizar o processo ETL.
  - Por exemplo, executar o ETL diariamente às 00:00.

- **Configuração:**
  - Acesse o console do EventBridge.
  - Crie uma nova regra com a opção "Agenda" (Schedule).
  - Especifique a frequência usando expressões cron ou taxas simples (ex.: a cada 24 horas).
  - Configure o destino da regra, como uma função Lambda que realiza o ETL.

  expresão cron = 0 3 * * ? *

OBS:Esta etapa deve ser realizada no final do projeto.


## **4.1 criação de um webhook**

Um webhook é uma maneira de um sistema enviar notificações ou informações para outro sistema em tempo real, sem a necessidade de consultas frequentes (polling). Ele funciona como uma URL personalizada configurada para receber dados automaticamente sempre que um evento específico ocorre.

Copie o a `url` gerada na variável `aws_api_gateway_url`.

In [None]:
aws_api_gateway_url = getpass()

··········


 - **setWebhook**

O método `setWebhook` configura o redirecionamento das mensagens captadas pelo *bot* para o endereço *web* do paramametro `url`.

> **Nota**: os métodos `getUpdates` e `setWebhook` são mutualmente exclusivos, ou seja, enquanto o *webhook* estiver ativo, o método `getUpdates` não funcionará. Para desativar o *webhook*, basta utilizar o método `deleteWebhook`.

In [None]:
response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}


 - **getWebhookInfo**

O método `getWebhookInfo` retorna as informações sobre o *webhook* configurado.

In [None]:
response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

 - **deleteWebhook**

O método `deleteWebhook` serve para deletar o *Webhook*.

In [None]:

response = requests.get(f'{base_url}/deleteWebhook')


print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": true,
  "description": "Webhook was deleted"
}


---
## **5. Funções lambda**

### **5.1 Função Lambda no Processo Extract**

A função Lambda no processo **Extract** desempenha o papel de **extrair dados** de fontes externas para iniciar o fluxo ETL. Sua principal funcionalidade é acessar uma ou mais fontes de dados, como APIs, bancos de dados ou sistemas de arquivos, e coletar os dados brutos para posterior processamento.

#### **Funcionalidades da Função Lambda no Extract:**
1. **Conexão com a Fonte de Dados:**
   - Estabelece a conexão com a fonte externa (por exemplo, uma API REST).
   - Envia requisições para obter os dados necessários, utilizando parâmetros apropriados.

2. **Manipulação e Validação Inicial:**
   - Verifica a integridade e o formato dos dados extraídos.
   - Converte os dados para um formato estruturado, como JSON, se necessário.

3. **Armazenamento Temporário:**
   - Opcionalmente, salva os dados extraídos em um bucket S3 ou em outra solução de armazenamento intermediário para facilitar o processamento posterior.

4. **Automação:**
   - A função Lambda pode ser acionada automaticamente por eventos, como regras no Amazon EventBridge, para garantir que os dados sejam extraídos de forma periódica e sem intervenção manual.

Essa etapa é crucial para coletar os dados brutos que serão transformados e carregados nas próximas fases do ETL, garantindo que a base de informações esteja disponível para análises posteriores.


In [None]:
import os
import json
import logging
from datetime import datetime, timezone , timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve,
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')

  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")

  

### **5.2 Carregamento dos Dados Enriquecidos (Load)**  
- Após a transformação, os dados compactados em Parquet são enviados para outro bucket S3, dedicado ao armazenamento de dados enriquecidos.  
- O arquivo Parquet é organizado no bucket com base na data de processamento, permitindo uma estrutura hierárquica que facilita futuras consultas.  




In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagensm, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None

      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True

  except Exception as exc:
      logging.error(msg=exc)
      return False






### **5.3 Transformação dos Dados (Transform)**  
- Cada arquivo JSON é processado localmente na Lambda para extrair e organizar as informações importantes das mensagens.  
- A transformação é realizada através da função auxiliar `parse_data`, que filtra os campos essenciais (como ID do usuário, tipo de chat e texto da mensagem) e estrutura os dados para análise.  
- Todos os dados processados são convertidos para o formato **Parquet**, utilizando a biblioteca **PyArrow**. Este formato é altamente eficiente para armazenamento e análise devido à sua compactação e estruturação columnar.

In [None]:
#parte 2

def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

### **5.4 Organização e Integração no Athena**  
- Os dados no formato Parquet são consumidos por uma tabela externa configurada no **Amazon Athena**.  
- A tabela é particionada por data, otimizando a performance das consultas SQL e reduzindo custos de análise.  
- Para atualizar as partições após a adição dos novos dados, o comando `MSCK REPAIR TABLE` é executado automaticamente, integrando os dados ao esquema existente.  

In [None]:
#parte 3

  from botocore.exceptions import ClientError

def lambda_handler(event, context) -> bool:


  # -- setup


  query = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://modulo42-ebac-etl-enriched/telegram/'
 """


  client = boto3.client('athena')

  # -- create

  try:
    client.start_query_execution(
      QueryString=query,
      ResultConfiguration={'OutputLocation': 's3://ebac-miguel-results-query/'}
    )
  except ClientError as exc:
    raise exc

  # -- update

  try:
    client.start_query_execution(
      QueryString='MSCK REPAIR TABLE telegram',
      ResultConfiguration={'OutputLocation': 's3://ebac-miguel-results-query/'}
    )
  except ClientError as exc:
    raise exc

  return json.dumps(dict(status=True))

## **6. Consultas no AWS Athena**

- visão geral dos dados

```sql
SELECT * FROM telegram limit 10;
```
Resultado:

| message_id | user_id    | user_is_bot | user_first_name | chat_id        | chat_type  | text                 | date       | context_date |
|------------|------------|-------------|-----------------|----------------|------------|----------------------|------------|--------------|
| 80         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | ola                  | 1732653315 | 2024-11-27   |
| 81         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | around               | 1732725202 | 2024-11-27   |
| 82         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | tempo                | 1732726362 | 2024-11-27   |
| 83         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | cavalo               | 1732726365 | 2024-11-27   |
| 84         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | animal               | 1732726367 | 2024-11-27   |
| 85         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | espaço               | 1732726370 | 2024-11-27   |
| 73         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | Arthur               | 1732645040 | 2024-11-26   |
| 74         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | I                    | 1732645043 | 2024-11-26   |
| 75         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | Have a god danm plan | 1732645060 | 2024-11-26   |
| 76         | 6349852487 | false       | Phan            | -1002482572030 | supergroup | ARTHUR               | 1732645069 | 2024-11-26   |


- Quantidade de mensagens por dia.

```sql
SELECT
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```
Resultado:

| context_date | message_amount |
|--------------|----------------|
| 2024-11-29   | 11             |
| 2024-11-27   | 6              |
| 2024-11-26   | 7              |


- Quantidade de mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```
Resultado:

| user_id    | user_first_name | context_date | message_amount |
|------------|-----------------|--------------|----------------|
| 6349852487 | Phan            | 2024-11-29   | 11             |
| 6349852487 | Phan            | 2024-11-27   | 6              |
| 6349852487 | Phan            | 2024-11-26   | 7              |


- Média do tamanho das mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```
Resultado:

| user_id    | user_first_name | context_date | average_message_length |
|------------|-----------------|--------------|------------------------|
| 6349852487 | Phan            | 2024-11-29   | 7                      |
| 6349852487 | Phan            | 2024-11-27   | 5                      |
| 6349852487 | Phan            | 2024-11-26   | 14                     |


- Quantidade de mensagens por hora por dia da semana por número da semana.

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday
```
Resultado:

| parsed_date_hour | parsed_date_weekday | parsed_date_weeknum | message_amount |
|------------------|---------------------|---------------------|----------------|
| 18               | 2                   | 48                  | 7              |
| 20               | 2                   | 48                  | 1              |
| 16               | 3                   | 48                  | 5              |
| 20               | 5                   | 48                  | 11             |
