<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Pipeline de Dados do Telegram II
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Ingestão;</li>
  <li>ETL;</li>
  <li>Apresentação;</li>
  <li>Storytelling.</li>
</ol>

---

# **Exercícios**

## 1\. Ingestão

Replique as atividades da aula 1.

1.1. Crie um `bucket` no `AWS S3` para o armazenamento de dados crus, não se esqueça de adicionar o sufixo `-raw`.

- Foi realizado a criação do bucket `modulo-42-ebac-telegram-raw`  

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/bucket-modulo-42-ebac-telegram-raw.jpg?raw=true)  

1.2. Crie uma função no `AWS Lambda` para recebimento das mensagens e armazenamento no formato JSON no `bucket` de dados crus. Não se esqueça de configurar as variáveis de ambiente e de adicionar as permissão de interação com `AWS S3` no `AWS IAM`.

- Criação da Função Lambda `modulo-42-ebac-telegram-raw`  

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Cria%C3%A7%C3%A3o%20da%20fun%C3%A7%C3%A3o%20lambda%20modulo-42-ebac-telegram-raw.jpg?raw=true)  

- Configuração das Variavéis de Ambiente `AWS_S3_BUCKET` e `TELEGRAM_CHAT_ID`  

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/variaveis%20de%20ambiente%20fun%C3%A7%C3%A3o%20lambda%20modulo-42-ebac-telegram-raw.jpg?raw=true)  

- Permissão no AWS IAM - Foi habilitada a permissão `AmazonS3FullAccess`, o que não é uma boa pratica de segurança, o correto seria a politica de menor previlégio.
Mais como estamos utilizando para estudo acadêmico, vamos realizar o fullaccess.

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/AWS%20IAM%20fun%C3%A7%C3%A3o%20lambda%20modulo-42-ebac-telegram-raw.jpg?raw=true)

> **Nota**: Para testar a função com evento do próprio `AWS Lambda`, substitua o código `message = json.loads(event["body"])` por `message = event`. Lembre-se que o primeiro só faz sentido na integração com o `AWS API Gateway`.

1.3. Crie uma API no `AWS API Gateway` a conecte a função do `AWS Lambda`, conforme apresentado na aula.

- Foi criada a AWS API Gateway `modulo-42-ebac-telegram-api`, e criado o metodo `POST`  
  
![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/modulo-42-ebac-telegram-api-gateway.jpg?raw=true)  


> **Nota**: não disponibilize o endereço da API gerada.

1.4. Configura o *webhook* do *bot* através do método `setWebhook` da API de *bots* do **Telegram**. utilize o endereço da API criada no `AWS API Gateway`. Utilize o método `getWebhookInfo` para consultar a integração.

In [None]:
from getpass import getpass

token = getpass()

··········


In [None]:
from getpass import getpass
aws_api_gateway_url = getpass()

··········


In [None]:
import json

base_url = f'https://api.telegram.org/bot{token}'

 - **setWebhook**

In [None]:
import requests

response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": true,
  "description": "Webhook was set"
}


- **getWebhookInfo**

In [None]:
response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

{
  "ok": true,
  "result": {
    "url": "https://xewka5s0z8.execute-api.sa-east-1.amazonaws.com",
    "has_custom_certificate": false,
    "pending_update_count": 0,
    "max_connections": 40,
    "ip_address": "18.230.7.189"
  }
}


> **Nota**: não disponibilize o *token* de acesso ao seu *bot* da API de *bots* do **Telegram**.

## 2\. ETL

Replique as atividades da aula 2.

2.1. Crie um `bucket` no `AWS S3` para o armazenamento de dados enriquecidos, não se esqueça de adicionar o sufixo `-enriched`.

- Foi criado o bucket  `modulo-42-ebac-telegram-enriched`

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/criacao%20do%20bucket%20modulo-42-ebac-telegram-enriched.jpg?raw=true)



2.2. Cria uma função no `AWS Lambda` para processar as mensagens JSON de uma única partição do dia anterior (D-1), armazenadas no *bucket* de dados crus. Salve o resultado em um único arquivo PARQUET, também particionado por dia. Não se esqueça de configurar as variáveis de ambiente, de adicionar as permissão de interação com `AWS S3` no `AWS IAM`, de configurar o *timeout* e de adicionar a *layer* com o código do pacote Python PyArrow.

Vamos seguir as seguintes Etapas:
 - Lista todos os arquivos JSON de uma única participação da camada crua de um *bucket* do `AWS S3`;
 - Para cada arquivo listado:
  - Faz o *download* do arquivo e carrega o conteúdo da mensagem;
  - Executa uma função de *data wrangling*;
  - Cria uma tabela do PyArrow e a contatena com as demais.
 - Persiste a tabela no formato Parquet na camada enriquecida em um *bucket* do `AWS S3`.

- O código da função:

In [None]:
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:
    RAW_BUCKET = os.environ['AWS_S3_BUCKET']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

    tzinfo = timezone(offset=timedelta(hours=-3))
    date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    table = None
    client = boto3.client('s3')

    try:
        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

        for content in response['Contents']:
            key = content['Key']
            client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

            # Log para mostrar o conteúdo do arquivo baixado
            logging.info(f"Content of {key}:")
            with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:
                logging.info(fp.read())

            with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:
                data = json.load(fp)
                data = data.get("message")

            if not data:
                logging.warning(f"Empty 'message' key in {key}")
                continue

            parsed_data = parse_data(data=data)
            iter_table = pa.Table.from_pydict(mapping=parsed_data)

            if table:
                table = pa.concat_tables([table, iter_table])
            else:
                table = iter_table
                iter_table = None

        if table is not None:
            pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
            client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

            return True
        else:
            logging.warning("No data to process.")
            return False

    except Exception as exc:
        logging.error(f"Error: {exc}")
        return False


- O código da função de *data wrangling*:

In [None]:
def parse_data(data: dict) -> dict:
    date = datetime.now().strftime('%Y-%m-%d')
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    parsed_data = dict()

    for key, value in data.items():
        if key == 'from':
            for k, v in data[key].items():
                if k in ['id', 'is_bot', 'first_name']:
                    parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]
        elif key == 'chat':
            for k, v in data[key].items():
                if k in ['id', 'type']:
                    parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]
        elif key in ['message_id', 'date', 'text']:
            parsed_data[key] = [value]

    if 'text' not in parsed_data.keys():
        parsed_data['text'] = [None]

    return parsed_data


> **Nota**: Para testar a função, substitua o código `date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')` por `date = (datetime.now(tzinfo) - timedelta(days=0)).strftime('%Y-%m-%d')`, permitindo assim o processamento de mensagens de um mesmo dia.

2.3. Crie uma regra no `AWS Event Bridge` para executar a função do `AWS Lambda` todo dia a meia noite no horário de Brasília (GMT-3).

- Foi criada a regra para executar a função lambda automaticamente, todos os dias as 00:00.  

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Amazon%20EventBridge%20%20modulo-42-ebac-telegram-enriched.jpg?raw=true)

## 3\. Apresentação

Replique as atividades da aula 3.

3.1. Cria a tabela no `AWS Athena` que aponte para os dados armazenados no bucket enriquecido do `AWS S3`.

```sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://modulo-42-ebac-telegram-enriched/telegram/'
```

- AWS Athena  

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Cria%C3%A7%C3%A3o%20de%20tabela%20externa%20_telegram_%20no%20AWS%20Athena.png?raw=true)  

3.2. Execute o comando `MSCK REPAIR TABLE <nome-tabela>` para carregar as partições.

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/carregando%20particao%20no%20athena.jpg?raw=true)

3.3. Execute as consultas SQL para explorar os dados.

```sql
SELECT * FROM `telegram` LIMIT 10;
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/SELECT%20%20FROM%20telegram%20LIMIT%2010.png?raw=true)  

- Quantidade de mensagens por dia.

```sql
SELECT
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Quantidade%20de%20mensagens%20por%20dia.jpg?raw=true)  

- Quantidade de mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Quantidade%20de%20mensagens%20por%20usu%C3%A1rio%20por%20dia.jpg?raw=true)

- Média do tamanho das mensagens por usuário por dia.

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/M%C3%A9dia%20do%20tamanho%20das%20mensagens%20por%20usu%C3%A1rio%20por%20dia.jpg?raw=true)

- Quantidade de mensagens por hora por dia da semana por número da semana.

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Quantidade%20de%20mensagens%20por%20hora%20por%20dia%20da%20semana%20por%20n%C3%BAmero%20da%20semana1.1.jpg?raw=true)

- `parsed_date_hour`: representa a horas do tipo int, no caso acima temos as horas 08 e 11 da noite e 2 da tarde.  
- `parsed_date_weekday`: representa o dia da semana no tipo int, da forma 0 = Domingo, 1 = Segunda-Feira sucesivamente até o 6 = Sabádo.  
- `parsed_date_weeknum`: representa o numero da semana do ano, no nosso exemplo a 3 semana do ano de 2024.
- `message_amount`: representa a quantidade total de mensagens.

- Palavras mais frequentes nas mensagens no chat

```sql
SELECT palavra, COUNT(*) AS frequencia
FROM (
  SELECT TRIM(REGEXP_REPLACE(word, '[^a-zA-Z0-9]', '')) AS palavra
  FROM telegram
  CROSS JOIN UNNEST(SPLIT(text, ' ')) AS t(word)
) palavras
WHERE palavra <> ''
GROUP BY palavra
ORDER BY frequencia DESC
LIMIT 10;


```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Palavras%20mais%20frequentes%20nas%20mensagens%20no%20chat.jpg?raw=true)

**Acima visualizamos as top 10 palavras mais utilizadas no chat.**

- Análise de users bots vs. usuários humanos no chat, e o total de mensagens no chat.







```sql
SELECT user_is_bot, user_first_name, COUNT(*) AS total_mensagens
FROM telegram
GROUP BY user_is_bot, user_first_name;

```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/An%C3%A1lise%20de%20users%20bots%20vs.%20usu%C3%A1rios%20humanos%20no%20chat,%20e%20o%20total%20de%20mensagens%20no%20chat.jpg?raw=true)

No caso acima, observamos que o usuário `Aurora`, enviou `12` mensagens no chat e não é bot, pois a coluna `user_is_bot` = false.

- Média de Palavras por Mensagem e organizado por data

```sql
SELECT context_date, AVG(CARDINALITY(SPLIT(text, ' '))) AS media_palavras_por_mensagem
FROM telegram
WHERE text IS NOT NULL
GROUP BY context_date;
```

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/M%C3%A9dia%20de%20Palavras%20por%20Mensagem%20e%20organizado%20por%20data.jpg?raw=true)

Media de palavras por mensagem no dia `15-01-2024` = `2.83`

## 4\. Storytelling

 - **Arquitetura**

![](https://github.com/Genesio-Moreira87/Pipeline-de-Dados-do-Telegram/blob/main/images/Profissao%20Analista%20de%20dados%20M42%20Material%20de%20apoio%20arch%20.png?raw=true)

**Contexto:**

No cenário atual, a integração de chatbots tornou-se essencial para a interação online. Os chatbots, programas de computador que simulam a conversa humana, geram dados cruciais que demandam uma gestão eficiente. A distinção entre dados transacionais e analíticos, sendo os primeiros gerados em tempo real e os segundos analisados para insights, justifica a criação do projeto.

**Arquitetura:**

- **Sistema Transacional:**

    Utilizamos o Telegram como plataforma para chatbots, aproveitando sua API eficaz para desenvolvimento e gerenciamento. A interação em tempo real gera dados transacionais, fornecendo insights valiosos sobre o comportamento dos usuários.

**Sistema Analítico:**

- **Ingestão:**

  O processo de ingestão coleta dados diretamente da API do Telegram, usando métodos como getUpdates, garantindo a captura de todos os dados transacionais.  

**ETL (Extract, Transform, Load):**

A fase de ETL transforma os dados brutos em formato analítico. Manipulações e limpezas são realizadas antes de carregar os dados em um repositório analítico.  
**Apresentação:**  

Utilizando consultas SQL e visualizações gráficas, apresentamos os dados analíticos de forma compreensível. Ferramentas como AWS Athena facilitam a execução de consultas diretamente no repositório analítico.  

**Lambda:**

O AWS Lambda, parte integrante do projeto, desempenha um papel crucial na execução de código serverless. Pode ser empregado para automatizar tarefas, como a execução de ETLs em resposta a eventos específicos, garantindo eficiência e escalabilidade.

**Análise Exploratória de Dados:**

- **Fonte:**

  Os dados são analisados na fonte original, a API de bots do Telegram através do método getUpdates. Esta fonte oferece detalhes sobre interações de usuários, incluindo identificadores, mensagens e carimbos de data/hora.

**Destino:**

Na fase de apresentação, consultas SQL são utilizadas para explorar dados em profundidade. Potenciais visualizações incluem gráficos de tendências, contagem de mensagens por usuário e análise de palavras-chave mais frequentes, facilitando a compreensão dos padrões de interação dos usuários.

Essa abordagem de análise exploratória de dados fornece insights valiosos, essenciais para aprimorar a experiência do usuário no chatbot do Telegram. O AWS Lambda, ao integrar-se ao projeto, assegura a eficiência e automação contínua das tarefas.

# **Melhorias**

- **Infraestrutura**

A tabela do `AWS Athena` são contem atualização automática de novas partiçoes. Uma melhoria seria adicionar essa funcionalidade no `AWS Lambda`.

- **Dados**

No projeto, estamos transformando e apresentando apenas os dados referentes a mensagens de texto. Contudo, estamos ingerindo todas as mensagens que são enviadas ao grupo do **Telegram**: vídeo, áudio, arquivos, etc. Uma melhoria seria adicionar novos campos na função do `AWS Lambda` da etapa de ETL e na tabela do `AWS Athena` da etapa de ingestão, ou ainda criar uma novas funções de ETL e tabelas de apresentação.

 - **Consultas**

A tabela do `AWS Athena` da camada de apresentação contem dados que apresentam diversas oportunidades de criação de novas consultas analíticas. Uma melhoria seria a criação de novas consultas SQL, respondendo novas perguntas e trazendo novos insights.

# Autoria

Autor: Genésio Moreira Coutinho  
Linkedin: [link do Linkedin](https://www.linkedin.com/in/genesio-coutinho/)  
GitHub: [Link do GitHub](https://github.com/Genesio-Moreira87)  
Kaggle: [Link do Kaggle](https://www.kaggle.com/genesiomoreira)  
Email: genesiomoreiracoutinho@gmail.com


Duvidas, sugestões e melhoria, sinta-se a vontade de me contatar para um bate papo. Obrigado.