# 1. Introdução

O presente projeto foi desenvolvido como conclusão do curso **Profissão Analista de Dados** da EBAC e tem como objetivo a construção de um **pipeline de dados em nuvem**.

O pipeline de dados foi desenvolvido para ingerir, processar e armazenar mensagens de texto disparadas em um grupo do Telegram, criado exclusivamente para este trabalho.

Junto ao grupo foi adicionado um bot, criado dentro da própria plataforma por meio do *botfather*, responsável por captar as mensagens e encaminha-lás via API para a plataforma cloud Amazon Web Services (AWS), onde as mensagens foram processadas, armazenadas, limpas e analisadas.

# 2. Contexto

## 2.1 Chatbots

Um **chatbot** é um tipo de software que interage com usuários através de conversas automatizadas em plataformas de mensagens. Uma aplicação comum de chatbots é o seu uso no atendimento ao cliente, onde, de maneira geral, ajudam clientes a resolver problemas ou esclarecer dúvidas recorrentes antes mesmo que um atendente humano seja acionado.

## 2.2 Telegram

Telegram é uma plataforma de mensagens instantâneas freeware (distribuído gratuitamente) e, em sua maioria, open source. É muito popular entre desenvolvedores por ser pioneiro na implantação da funcionalidade de criação de chatbots, que, por sua vez, permitem a criação de diversas automações.

## 2.3 Dados

Os dados presentes neste projeto podem ser divididos em dois tipos: **transacionais** e **analíticos**. Os dados transacionais são representados pelas mensagens enviadas por usuários em um grupo no Telegram, e os dados analíticos são os dados provenientes dos sistemas transacionais manipulados na etapa de ETL (extraction, transformation and load) e serão armazenados em uma camada **raw** e **enriched**, respectivamente.

# 3. Arquitetura

Este projeto tem como objetivo a construção de um **pipeline de dados** que processe, armazene e exponha as mensagens e suas informações referentes de um grupo do Telegram, para que profissionais da área de Dados possam realizar as análises pertinentes.

Uma atividade de grande interesse para empresas que já utilizam ou pensam em utilizar os chatbots, é a **análise exploratória** dos dados enviados ao bot, para responder perguntas como:

* Qual horário os usuários mais acionam o bot?
* Qual dúvida ou problema mais recorrente?
* O bot está conseguindo esclarecer as dúvidas?

Para responder a essas perguntas, é necessário o desenvolvimento de uma arquitetura em duas partes: a transacional, representada pelos dados captados no grupo do Telegram e a analítica, realizada na plataforma Amazon Web Services (AWS) onde os dados são processados e analisados.

A sintese da arquitetura é representada pela imagem abaixo:

![image.png](attachment:a2a276c9-5858-416f-8f88-093ac3f71ec7.png)

## 3.1 Sistema Transacional

O **Telegram** representa a fonte de dados transacionais deste projeto. Foi criado um grupo de usuários com a presença de um bot, responsável por capturar as mensagens e redirecioná-las via **webhook do backend** do aplicativo para um endpoint, exposto pela **API Gateway** (AWS Services).

As mensagens trafegam no corpo ou payload da requisição.

## 3.2 Sistema Analítico

A etapa analítica do projeto consiste na ingestão, **ETL** (extract, transform, load) e apresentação dos dados. Ela foi realizada no Amazon Web Services (AWS) através dos serviços **API Gateway, Lambda, S3, EventBridge e Athena**.

![image.png](attachment:f0ab426c-b88d-4f13-849d-2e8e0be1ace6.png)

### 3.2.1 Ingestão

A etapa de **ingestão** é responsável por inserir os dados transacionais no ambiente analítico. Como o Telegram retem as mensagens por apenas 24 horas, a ingestão via **streaming** é a mais indicada para este projeto.

Uma requisição **HTTP** com o conteúdo da mensagem em seu payload é recebida pelo **AWS API Gateway** que, por sua vez, as redireciona para o **AWS Lambda**, servindo assim como gatilho. Já o **AWS Lambda** recebe o *payload* da requisição em seu parâmetro *event*, que salva o conteúdo em um arquivo de formato **JSON** (original) e o armazena em um bucket específico no **AWS S3** particionado por dia.

As mensagens capturadas pelo bot podem ser acessadas via **API**. A única requisição necessária é o `token` fornecido pelo **botFather** na criação do bot. Por isso, começamos com a autenticação abaixo:

In [None]:
#autenticando o token

from getpass import getpass

token = getpass()

StdinNotImplementedError: getpass was called, but this frontend does not support input requests.

In [None]:
#pacotes e bibliotecas

import os
import json
import pandas as pd

import logging
from datetime import datetime, timedelta, timezone

import boto3

import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
# A url base é comum a todos os métodos da API

base_url = f'https://api.telegram.org/bot{token}'

O código abaixo representa a **função do Lambda** que recebe as mensagens do **Telegram** via **API Gateway**, verifica no seu conteúdo se elas foram produzidas no grupo especificado e as escreve em seu formato original **JSON**, em um bucket no **AWS S3**:

In [None]:
def lambda_handler(event: dict, context: dict) -> dict:

  #variáveis de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  #variáveis lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  #código

  client = boto3.client('s3')

  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")

Para testar a integração do **AWS Lambda** com a **API**, devemos fazer a implantação da **API** e obter seu endereço web. Com a url gerada, incluímos ela na variável `aws_api_gateway_url`.

aws_api_gateway_url = getpass()

O método `setWebhook` configura o redirecionamento das mensagens captadas pelo bot para o endereço web do parâmetro url, e o método `getWebhookInfo` retorna as informações sobre o webhook configurado.

In [None]:
#setWebhook

response = requests.get(url=f'{base_url}/setWebhook?url={aws_api_gateway_url}')

print(json.dumps(json.loads(response.text), indent=2))

In [None]:
#getWebhookInfo

response = requests.get(url=f'{base_url}/getWebhookInfo')

print(json.dumps(json.loads(response.text), indent=2))

### 3.2.2 ETL

A etapa de **extração, transformação e carregamento** é a etapa responsável pela manipulação dos dados extraídos na etapa transacional, ou seja, persistidos em camadas cruas (*raw*) dos sistemas analíticos. O dado cru armazenado passa por um processo recorrente onde ele é limpo, duplicado e persistido com técnicas de particionamento, orientado a coluna e compressão. Ao final, o dado está pronto para ser analisado por profissionais da área.

Uma vez ao dia, o **AWS Event Brigde** aciona o **AWS Lambda** que, por sua vez, processa todas as mensagens geradas no dia anterior (D-1), desnormaliza o dado semi-estruturado típico de arquivos **JSON**, salva o conteúdo processado em um arquivo no formato Apache Parquet e o armazena no **AWS S3** particionado por dia.

O código abaixo é **executado diariamente pelo AWS Lambda** - acionado pelo AWS Event Bridge - para compactar as diversas mensagens que chegam no grupo (do dia anterior), no formato JSON armazenadas no bucket de dados cru (raw), em um único arquivo no formato Parquet. Após o processamento, as mensagens são armazenadas no bucket de dados enriquecidos (enriched).

In [None]:
def lambda_handler(event: dict, context: dict) -> bool:

  #variáveis de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  #variáveis lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  #código

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None

      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True

  except Exception as exc:
      logging.error(msg=exc)
      return False

def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

Note que código acima possui uma função chamada `parse_data`, que executa um laço de repetição para varrer todas as chaves do arquivo e selecionar apenas a text, que é a única de interesse deste projeto. Caso a mensagem não possua a chave text, ela será criada com o valor `None`.

### 3.2.3 Apresentação dos Dados

Nesta etapa, o **AWS Athena** tem como função **apresentar os dados aos usuários e sistemas**, através de uma interface **SQL**. Portanto, é necessário a criação de uma tabela externa sobre o dado armazenado na camada mais refinada da arquitetura, a camada enriquecida (enriched) - entregando assim dados mais consistentes e com consultas mais baratas.

Para criar a tabela, executamos as seguintes *queries*:

```sql
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket-enriquecido>/'
```

Para adicionar as partições em uma única tabela:

```sql
MSCK REPAIR TABLE `telegram`;
```

E para confirmar que os dados foram carregados, vamos executar uma query para exibir 10 valores:

```sql
SELECT * FROM `telegram` LIMIT 10;
```

![image.png](attachment:01eb2cbf-ffc7-47e5-aa79-e8390d2629f1.png)

# 4. Análise Exploratória de Dados

Agora, com as funções criadas e agendadas e com as tabelas alimentadas, podemos executar algumas análises exploratórias, como exemplo:

**Quantidade de mensagens por dia:**

```sql
SELECT
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY context_date
ORDER BY context_date DESC
```

![image.png](attachment:993fbe2b-8fa4-451a-9728-2014cfa7fcb4.png)

**Quantidade de mensagens por usuários por dia**

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  count(1) AS "message_amount"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

![image.png](attachment:6095f439-134b-4bee-958d-e9faba483f71.png)

**Média do tamanho das mensagens de cada usuário por dia**

```sql
SELECT
  user_id,
  user_first_name,
  context_date,
  CAST(AVG(length(text)) AS INT) AS "average_message_length"
FROM "telegram"
GROUP BY
  user_id,
  user_first_name,
  context_date
ORDER BY context_date DESC
```

![image.png](attachment:2d10fb2b-5408-41d9-8247-5646a22c46b0.png)

**Quantidade de mensagens por hora por dia da semana por número da semana**

```sql
WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday
```

![image.png](attachment:8aa84cf9-8426-4b0f-8dc6-d1ec99097e08.png)

# 5. Conclusão

A utilização de *chatbots* juntamente com a **análise dos dados extraídos através de um pipeline**, são de grande utilidade para empresas de todo porte, como demonstrado acima. Os *chatbots* facilitam a interação dos clientes com a empresa, uma vez que podem resolver problemas e tirar dúvidas pelo seu smarphone/web sem precisar ir até uma loja ou entrar em contato diretamente com algum funcionário. Os dados extraídos dessas interações fornecem *insights* de grande valor para os tomadores de decisão, uma vez que as análises personalizadas de acordo com suas demandas, fornecem informações que vão desde o comportamento dos seus clientes, até esclarecimentos e feedbacks sobre os produtos oferecidos.

Este projeto **permitiu conhecer os serviços oferecidos pela Amazon Web Services** e entender todo o potencial que os mesmos possuem para o analista de dados.

Também foi possível perceber como o formato **parquet** melhorou o desempenho das consultas.

Foi extremamente fácil fazer a **API do Telegram** funcionar com os serviços da AWS. Uma vez criada a API ela funcionou corretamente e se manteve em funcionamento sem problemas.

Ficará para um próximo projeto, testar a comunicação entre o AWS Athena e o **Microsoft Power BI**.