<div style="align-items: center; justify-content: space-between;">
   
   <h1>Data Pipeline | AWS - Whatsapp - Telegram</h1>
   <b> por <a href="https://www.linkedin.com/in/fernandohcarneiro/">Fernando Carneiro</a></b>
   <br><br>
   <img src="https://raw.githubusercontent.com/carneiro-fernando/EBAC/a4a2850a42490d15d266fa584c49fb3a8120fcdb/assets/Images/Projeto_Telegram_pipeline/header_pipeline_aws_chats.drawio.svg"  align="center" alt="data-pipeline" width="auto">

</div>

---
## Sumário
#### 1. [**Introdução**](#intro)  
    1.1 Objetivo  
    1.2 O que é Pipeline?  
    1.3 Pipeline do projeto  
#### 2. [**Sistema transacional**](#sistrans)  
    2.1 Ingestão de dados  
    2.2 O que é uma API?  
    2.3 Whatsapp Business API  
    2.4 Telegram Botfather  
    2.5 Webhook  
    2.6 AWS API Gateway  
#### 3. [**Sistema analítico**](#sisanal)  
    3.1 O que é ETL (Extraction, Transformation, Loading)?  
    3.2 Extração (Extraction)    
        3.2.1 AWS Lambda  
        3.2.2 AWS S3  
    3.3 Transformação (Transformation)  
    3.4 Carregamento (Loading)    
        3.4.1 AWS Event Bridge  
#### 4. [**Apresentação**](#apres)  
    4.1 AWS Athena  
    4.2 Análise de Dados
#### 4. [**Conclusão**](#conclu)  

<a id='intro'></a>
## 1. Introdução

### 1.1 Objetivo
O objetivo deste projeto é demonstrar como é feita a extração de dados provenientes do Telegram e do WhatsApp, transferir esses dados para um ***Datalake***, realizar o processamento dos dados em lote na nuvem e então fazer a análise dos dados tratados. Esses dados fornecem a base para extrair informações valiosas, abrindo possibilidades para aprimorar serviços e explorar oportunidades de monetização.

### 1.2 O que é Pipeline?
No contexto de ciência de dados, um ***pipeline*** refere-se a uma **sequência de processos automatizados** que são encadeados para realizar tarefas específicas, desde a coleta de dados até a entrega de resultados. Esse conceito é inspirado no campo da engenharia de software, onde um pipeline representa um fluxo contínuo de desenvolvimento e entrega de software.

*fonte: https://www.stitchdata.com/resources/what-is-data-pipeline/*

### 1.3 Pipeline do projeto
O pipeline de dados deste projeto se inicia pela ingestão dos dados dos usuários por meio de API que conectam as fontes com a nuvem da Amazon Web Services(AWS). Na plataforma da AWS os dados são recebidos por uma função Lambda (inserir link) que armazena esses dados de forma organizada por dias no AWS S3. Diariamente um AWS Event Bridge chama um processo em lote no Lambda que transforma os dados brutos extraindo somente o que estamos interessados (data da mensagem, nome e número do contato e a mensagem) e salva de forma organizada no AWS S3. No processo de visualização, tabelas criadas a partir dos arquivos parquet criados no processo anterior, permitem que façamos análises de todo tipo utilizando a linguagem SQL, sendo possível extrair esses dados para um dashboard posteriormente.

![Pipeline do projeto](https://raw.githubusercontent.com/carneiro-fernando/EBAC/db382413378537f201bed37cc51b427c83da5a5e/assets/Images/Projeto_Telegram_pipeline/pipeline_aws_chats.drawio.svg)

<a id='sistrans'></a>
## 2. Sistema transacional

Um sistema transacional é uma estrutura tecnológica e de dados que suporta as transações diárias de uma fonte de dados. Ele é responsável por coletar, registrar e transmitir dados para uma cadeia de processos que tratarão os dados para posterior análise. Em resumo, os sistemas transacionais são projetados para ingerir dados criados diariamente e salvá-los em um banco de dados ou *DataLake*.

*fonte: https://insightsoftware.com/encyclopedia/transactional-systems/*

### 2.1 Ingestão de dados
Neste projeto a ingestão dos dados consiste na captura de mensagens de texto enviadas pelas plataformas Whatsapp e Telegram em tempo real, fornecidas por meio de uma API.

#### 2.2 O que é uma API?
Uma API, ou Interface de Programação de Aplicações, é um componente de software dentro de um sistema que fornece um mecanismo para invocar uma tarefa em outro sistema. Ela serve como uma ponte que facilita a comunicação e integração entre sistemas ou componentes de software diversos, até de tipos diferentes.

*fonte: https://www.3pillarglobal.com/insights/a-simple-api-definition-and-how-apis-work/*

#### 2.3 Whatsapp Business API
É a API da empresa que permite a conexão em escala entre diversos tipos de sistemas e os usuários da plataforma. Neste projeto iremos usá-la para receber as mensagens enviadas para um número de telefone específico configurado. Embora seja possível extrair uma variedade de informações, me concentrei em capturar dados essenciais, como as informações de contato e a mensagem enviada pelo usuário.

*Documentação: https://developers.facebook.com/docs/whatsapp/cloud-api/overview*

##### 2.3.1 Criação da aplicação
Na página *dashboard* do [Meta for developers]('https://developers.facebook.com/') foi criado um aplicativo chamado `chat_to_analysis`.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_app_creation.png?raw=true">

Adicionado ao aplicativo criado os serviços necessários para o recebimento de mensagens na plataforma Whatsapp, Whatsapp e Webhooks.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_app_integration.png?raw=true">


#### 2.4 Telegram Botfather
Um chatbot é um programa de computador projetado para simular a interação humana em conversas. Neste projeto iremos usar um chatbot para capturar as mensagens enviadas dentro de um grupo. O BotFather é um bot especial no Telegram que faz a criação e gerenciamento de bots na plataforma. Ele nos permitirá desenvolver e configurar o bot de maneira eficiente e interativa.

*Documentação: https://core.telegram.org/bots/features#botfather*

##### 2.4.1 Criação do bot
No plataforma *web* do *Telegram* foi criado um bot a partir do *Botfather* e dado o nome de `chat_monitor_ro_bot.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/bot_creation.png.png?raw=true" alt="Bot Creation Image">

Após a criação do bot, criei um grupo ao qual tenho interesse em captar as interações entre os integrantes. 

<img src='https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/group_creation.png?raw=true'>

E foi configurado o bot como administrador, permitindo dessa maneira o bot poderá ouvir a conversa de todos os integrantes deste grupo.

<img src='https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/bot_admin.png?raw=true'>

#### 2.5 AWS API Gateway
O primeiro serviço em nuvem na sequência do pipeline, faz parte dos serviços em nuvem da Amazon Web Services (AWS). Este serviço centraliza o gerenciamento das APIs inclusive o recebimento, como no caso deste projeto, em que servirá como porta de entrada para o recebimento do *payload* advindo das plataformas de mensagens. Ele oferece recursos como autenticação, autorização, monitoramento e escalabilidade, simplificando o processo de construção e administração de APIs de forma eficiente na infraestrutura da AWS.

*fonte: https://aws.amazon.com/pt/api-gateway/*

##### 2.5.1 Criação da API para Whatsapp
Na plataforma, foi criada uma nova API com protocolo REST com um método GET e um método POST configurado com integração do tipo proxy com o serviço Lambda.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_api_methods.png?raw=true">

##### 2.5.2 Criação da API para Telegram
Na plataforma *Telegram*, foi repetido o procedimento e criado uma nova API com protocolo REST porém apenas o método POST, também configurado com integração do tipo proxy com o serviço Lambda.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/tgm_api_methods.png?raw=true">

#### 2.6 Webhook
Um webhook é uma ferramenta de comunicação automatizada que permite que sistemas online, geralmente APIs, enviem informações em tempo real assim que eventos específicos ocorrem. Com uma abordagem assíncrona baseada em eventos, opera por meio de protocolos HTTP ou HTTPS, fornecendo uma maneira ágil e eficiente de integrar plataformas e serviços online. No escopo deste projeto, o webhook será acionado pelo recebimento de uma nova mensagem, sendo responsável por encaminhar os dados úteis, também chamado de *payload*,  de forma instantânea aos serviços em nuvem.

*fonte: https://www.redhat.com/en/topics/automation/what-is-a-webhook*

##### 2.6.1 Configuração do Whatsapp
Na dashboard *Meta for Developers* foi configurado o *webhook* com a API criada no *AWS API Gateway*, para isso foi passado a URL disponibilizada implantar a API e configurado um script na plataforma Lambda para fazer a verificação. O script que criei pode ser encontrado neste link <LINK PARA O SCRIPT>

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_webhooks_config.png?raw=true">

Após configurado o *webhook* foi possível selecionar quais os campos de interesse à ser recebidos pelo aplicativo.

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_webhook_fields.png?raw=true">

##### 2.6.2 Configuração do Telegram
O procedimento na plataforma *Telegram* é completamente diferente, nela foi preciso enviar uma requisição GET para acionar o método `setWebhook` e injetando nele o endereço URL adquirido na criação da API do *AWS API Gateway*. Abaixo segue a captura da execução da requisição:

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/webhook_creation.png?raw=true">

Após configurado o *webhook* foi possível verificar seu funcionamento com o método `getWebhookInfo`:

<img src="https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/webhook_info.png?raw=true">

<a id='sisanal'></a>
## 3. Sistema analítico
Esses sistemas apoiam a tomada de decisões, relatórios, consultas e análises. São projetados para lidar com consultas complexas em grandes volumes de dados vindos dos sistemas transacionais, organizam esses dados e os processam de maneira a criar insights úteis. Neste projeto o sistema compreende a retirada dos dados brutos (*raw*) do datalake, a transformação deles em informação e a análise em busca de padrões e insights.

*fonte: http://bi-insider.com/posts/types-of-enterprise-data-transactional-analytical-master/*

### 3.1 O que é ETL (Extraction, Transformation, Loading)?
É o processo de extrair, limpar e organizar os dados de uma origem para serem carregados, ou armazenados, em um local específico. Essa etapa é crucial para assegurar que os dados estejam em uma forma apropriada e prontos para análise antes de serem empregados em relatórios, visualizações ou em outros procedimentos analíticos.

 *fonte: https://www.ibm.com/topics/etl*

### 3.2 Extração (Extraction)
Envolve a extração, ou coleta, de dados brutos de diferentes fontes, como as APIs neste caso.

#### 3.2.1 AWS Lambda
O AWS Lambda é um serviço de computação que permite a execução de código sem a necessidade de gerenciar servidores, também chamado de *serverless*. Nela é possível criar funções Lambda, carregar seu código e definir eventos que acionarão a execução da função, sem se preocupar com a escala automática e infraestrutura subjacente. Neste projeto ela se encarregará de executar scripts na linguagem *Python* para manuseio e transformação dos dados.

*fonte: https://docs.aws.amazon.com/lambda/latest/dg/welcome.html*

#### 3.2.2 AWS S3
O Amazon S3, ou Simple Storage Service, é um serviço de armazenamento que permite armazenar e recuperar dados de maneira fácil e escalável. É amplamente utilizado para armazenar arquivos, fazer backup e hospedar sistemas online, proporcionando uma solução eficiente e confiável para necessidades de armazenamento na nuvem. Neste projeto, ele armazenará tanto os dados brutos servindo como *Datalake*, quanto os dados transformados e prontos para análise.

*fonte: https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html*

##### 3.2.3 Extração Whatsapp
Foi desenvolvido um script para a extração dos dados do Whatsapp de acordo com a sua [documentação]('https://developers.facebook.com/docs/graph-api/webhooks/getting-started/webhooks-for-whatsapp?locale=pt_BR') como segue no trecho de código em Python abaixo. Nele é possível notar a verificação que a empresa impõem para estabelecer a conexão e o retorno dos dados na variável `response_body` que contém os dados no formato *JSON*.

In [1]:
### Desenvolvido por: Fernando Carneiro 

import json
import os
import logging
import boto3
from datetime import datetime, timedelta, timezone

def lambda_handler(event, context):
   
# Variáveis de ambiente
    BUCKET = os.environ['AWS_S3_BUCKET']
    
# Declaração de variáveis
    tzinfo = timezone(offset=timedelta(hours=-3))
    date = datetime.now(tzinfo).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')
    filename = f'{timestamp}.json'
    
# Instanciando client S3
    client = boto3.client('s3')
    
    try:
        message = json.loads(event["body"])
        
        with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
            json.dump(message, fp)
        
        client.upload_file(f'/tmp/{filename}', BUCKET, f'whatsapp/context_date={date}/{filename}')
    
    except Exception as exc:
        logging.error(msg=exc)
        return dict(statusCode="500")
    

# Verificação do Webhook
    VERIFY_TOKEN = os.environ['VERIFY_TOKEN']
    
    # Checando 'queryStringParameters' para verificar se é um webhook
    if event.get('queryStringParameters'):
        mode = event['queryStringParameters'].get('hub.mode')
        token_sent = event['queryStringParameters'].get('hub.verify_token')
        
        if mode == 'subscribe' and token_sent == VERIFY_TOKEN:
            return {
                'statusCode': 200,
                'body': event['queryStringParameters']['hub.challenge']
            }
        else:
            return {
                'statusCode': 403,
                'body': 'Nao corresponde'
            }

    response_body = {
        'status': 'success',
        'message': 'Success, but queryStringParameters is not present',
    }

    return {
        'statusCode': 200,
        'body': json.dumps(response_body)
    }

##### 3.2.4 Extração Telegram
Abaixo segue o script em Python usado para a extração dos dados do Telegram.

In [2]:
import os
import json
import logging
from datetime import datetime, timezone

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve, 
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')
  
  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")

Note que em ambas as extrações os dados provenientes da *API* em formato *JSON* são direcionados para um endereço na *web* armazenado na variável `BUCKET`. Este é o nome do espaço de armazenamento do serviço *AWS S3* para onde estamos enviando os dados brutos. Na sequência será explanado a etapa seguinte, a de transformação dos dados extraídos.

### 3.3 Transformação (Transformation)
Nesta etapa, os dados coletados são processados e transformados para atender aos requisitos do destino. Isso inclui limpeza, filtragem, agregação e qualquer manipulação necessária.

#### 3.3.1 Transformação Whatsapp (Lambda)
Essa foi a etapa ais difícil e demorada de desenvolver, tando em vista tanto os requisitos do sistema de API da empresa, nem sempre com documentação disponível de forma detalhada ou clara, como também quanto à abstração lógica necessária. O script desenvolvido abaixo recebe os dados brutos de um *bucket* e retira dele somente as informações de interesse, devolvendo o dado estruturado no formato de tabela *parquet*.

In [3]:
### Criado por: Fernando Carneiro

# Importação de bibliotecas
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import pandas as pd
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

# Função padrão do AWS Lambda
def lambda_handler(event: dict, context: dict) -> bool:
    """
    Função de manipulação do AWS Lambda para processar o evento e o contexto.

    Parâmetros:
    event (dict): O evento do AWS Lambda.
    context (dict): O contexto do AWS Lambda.

    Retorna:
    bool: Verdadeiro se a operação foi bem-sucedida, Falso caso contrário.
    """
    # Recuperar variáveis de ambiente
    RAW_BUCKET = os.environ['AWS_S3_RAW']
    ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

    # Inicializar variáveis lógicas
    tzinfo = timezone(offset=timedelta(hours=-3))
    date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
    timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

    table = None
    client = boto3.client('s3')

    try:
        # Listar objetos no bucket raw com o prefixo especificado
        response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'whatsapp/context_date={date}')
        df = pd.DataFrame()

        # Percorrer o conteúdo da resposta
        for content in response['Contents']:
            key = content['Key']
            # Baixar o arquivo do bucket raw para um local temporário
            client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

            # Abrir o arquivo baixado e carregar seu conteúdo JSON
            with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:
                data = json.load(fp)

            # Analisar os dados e anexá-los ao dataframe
            if df.empty:
                df = parse_data(data=data)
            else:
                new_df = parse_data(data=data)
                df = pd.concat([df, new_df], axis=0, join='outer')

        # Converter o dataframe para uma Tabela PyArrow
        table = pa.Table.from_pandas(df)
        # Escrever a tabela em um arquivo Parquet
        pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
        # Carregar o arquivo Parquet no bucket enriquecido
        client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"whatsapp/context_date={date}/{timestamp}.parquet")

        return True

    except Exception as exc:
        # Registrar o erro e imprimi-lo
        logging.error(msg=exc)
        print(f'Erro: {exc}')

        return False 
        
# Função para extrair os dados desejados e transformar o dicionário (JSON) em um DataFrame      
def parse_data(data: dict) -> pd.DataFrame:
    """
    Analisa os dados fornecidos em um DataFrame do pandas.

    Parâmetros:
    data (dict): Os dados para analisar.

    Retorna:
    pd.DataFrame: Os dados analisados.
    """
    # Inicializa uma lista vazia para armazenar os dados
    data_list = []

    # Verifica se os dados são um dicionário e contém 'entry'
    if isinstance(data, dict) and "entry" in data:
        # Percorre a lista 'entry'
        for i in range(len(data["entry"])):
            entry = data["entry"][i]

            # Verifica se a entrada é um dicionário e contém 'changes'
            if isinstance(entry, dict) and "changes" in entry:
                # Percorre a lista 'changes'
                for j in range(len(entry["changes"])):
                    change = entry["changes"][j]

                    # Verifica se a alteração é um dicionário e contém 'value'
                    if isinstance(change, dict) and "value" in change:
                        # Obtém o dicionário 'value'
                        value = change["value"]

                        # Verifica se o valor é um dicionário e contém 'contacts' que deve ser uma lista
                        if isinstance(value, dict) and "contacts" in value and isinstance(value["contacts"], list):
                            # Percorre a lista 'contacts'
                            for k in range(len(value["contacts"])):
                                # Obtém o dicionário 'contact'
                                contact = value["contacts"][k]

                                # Verifica se o valor é um dicionário e contém 'messages' que deve ser uma lista
                                if isinstance(value, dict) and "messages" in value and isinstance(value["messages"], list):
                                    # Percorre a lista 'messages'
                                    for l in range(len(value["messages"])):
                                        # Obtém o dicionário 'message'
                                        message = value["messages"][l]

                                        # Cria uma linha com as colunas necessárias
                                        row = {
                                            "name": contact["profile"]["name"],
                                            "from": message["from"],
                                            "body": message["text"]["body"],
                                            "timestamp": message["timestamp"],
                                            "type": message["type"]
                                        }

                                        # Anexa a linha à lista de dados
                                        data_list.append(row)

    # Cria um dataframe a partir da lista de dados
    parsed_data = pd.DataFrame(data_list)

    return parsed_data

#### 3.3.1 Transformação Telegram (Lambda)
O script desenvolvido abaixo de forma similar recebe os dados brutos de um *bucket* e retira dele somente as informações de interesse, devolvendo o dado estruturado no formato de tabela *parquet*. A diferença entre os scripts da plataforma *Whatsapp* e *Telegram* é basicamente a estrutura em que o dado é recebido via *API*, sendo o do *Whatsapp* maior e com mais aninhamento entre itens.

In [4]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagensm, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único 
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None
          
      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True
  
  except Exception as exc:
      logging.error(msg=exc)
      return False
  
def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

Ambos os scripts retornam a um *bucket* uma tabela no formato *parquet* organizada com os seguintes dados:

* Whatsapp Schema

| Column Name | Data Type |
|-------------|-----------|
| name        | object    |
| number      | int64     |
| message     | object    |
| date        | int64     |
| type        | object    |

* Telegram Schema

| Column Name      | Data Type |
|------------------|-----------|
| message_id       | int64     |
| user_id          | int64     |
| user_is_bot      | bool      |
| user_first_name  | object    |
| chat_id          | int64     |
| chat_type        | object    |
| date             | int64     |
| text             | object    |

### 3.4 Carregamento (Loading)

#### 3.4 AWS Event Bridge
É um serviço que permite a criação de regras baseadas em cronogramas para acionar eventos em horários específicos, facilitando a automação de tarefas recorrentes. Essa funcionalidade é útil para programar a execução de determinadas ações, como a ativação das funções Lambda deste projeto, conforme a agenda predefinida.

*fonte: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-what-is.html*

Para este projeto, o *AWS Event Bridge* foi responsável por inicializar o processo de transformação, uma vez ao dia às 00:00h. Ambos os scripts Python de transformação são executados, tanto do Whatsapp quanto do Telegram.

![Eventos agendados](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/schedules.png?raw=true)

<a id='apres'></a>
## 4. Apresentação
Nesta fase os dados são disponibilizados para os usuários finais, como analistas e cientistas de dados, e para sistemas de consulta, como dashboards e motores de consulta. Geralmente, as informações são acessadas por meio de ferramentas de consulta, como *SQL*, sendo esta a principal interface para a maioria dos usuários. Nesse contexto, a etapa de apresentação utiliza o *AWS Athena*, uma ferramenta com motor de consulta *SQL*, simplificando a leitura e visualização dos dados armazenados na camada *ETL* para análises eficazes.

### 4.1 AWS Athena
O AWS Athena é um serviço de consulta interativa que permite analisar dados armazenados no Amazon S3 usando SQL padrão. Ele elimina a necessidade de carregar dados para um banco de dados permitindo explorar grandes conjuntos de dados de maneira fácil e flexível, obtendo insights valiosos sem a necessidade de infraestrutura prévia ou complexos processos de gerenciamento de dados. É especialmente útil em cenários de *Big Data* e *Data Lakes* como no nosso caso, proporcionando uma abordagem ágil para análise de dados na nuvem. É o meio escolhido para este projeto, para visualizar e analisar as informações armazenadas.

*fonte: https://docs.aws.amazon.com/athena/latest/ug/what-is.html*


Na etapa de **apresentação**, o `AWS Athena` tem função de entregar o dados através de uma interface SQL para os usuários do sistema analítico. Para o projeto foram criadas duas tabelas, uma para cada plataforma e depois analisados brevemente as informações obtidas. Para a criação das tabelas foi usado a linguagem SQL, veja abaixo alguns exemplos começando pelas queries de criação de ambas as tabelas:

Criação da tabela SQL - Whatsapp

In [5]:
CREATE EXTERNAL TABLE `whatsapp`(
  `name` string, 
  `number` bigint, 
  `message` string, 
  `date` bigint, 
  `type` string)
PARTITIONED BY ( 
  `context_date` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket>/whatsapp'

SyntaxError: invalid syntax (1697826171.py, line 1)

Criação da tabela SQL - Telegram

In [None]:
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint, 
  `user_id` bigint, 
  `user_is_bot` boolean, 
  `user_first_name` string, 
  `chat_id` bigint, 
  `chat_type` string, 
  `text` string, 
  `date` bigint)
PARTITIONED BY ( 
  `context_date` date)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<bucket>/telegram/'

A consulta SQL a seguir foi projetada para validar a criação bem-sucedida de uma das tabelas em nosso banco de dados. Além disso, ela confirma se os dados particionados foram corretamente capturados, armazenados e depois recuperados no serviço S3. Esta verificação é crucial para garantir a integridade dos dados antes de prosseguirmos com qualquer análise adicional.

In [None]:
SELECT * FROM "whatsapp";

O resultado da consulta é exibido na página do serviço *Athena* da seguinte maneira:

![Consulta SQL](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/wpp_sql_querie0.png?raw=true)

### 4.2 Análise Exploratória de Dados
Nesta seção, aprofundamos a análise dos dados enriquecidos por meio de consultas, buscando entender melhor as nuances e informações presentes. As consultas SQL feitas no serviço *Athena* nos possibilitam a criar gráficos para análise exploratória e extração de insights, com essas conclusões podemos entender melhor o comportamento dos usuários do grupo e, a partir disso, criar estratégias para melhorar a experiência deles.

Utilizando ainda a consulta de teste anterior, as informações consultadas foram visualizadas por meio da plotagem gráfica das colunas de dados consultadas. Abaixo, apresenta-se a quantidade de mensagens por usuário, fornecendo insights sobre os principais contatos na plataforma *WhatsApp*.

![Gráfico de Frequência por Usuário](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/graphs_freq_name.png?raw=true)

Observa-se que os usuários mais ativos na plataforma *WhatsApp* são o Daniel e a Fernanda. Com base nessa informação, torna-se viável implementar um tratamento personalizado para cada usuário, priorizando a resposta aos contatos mais frequentes. Além disso, é possível adotar medidas para envolver os usuários que interagem menos, buscando aumentar a participação e fortalecer o relacionamento com esses contatos na plataforma. Essa abordagem personalizada contribui para uma gestão mais eficiente e orientada ao engajamento, promovendo uma experiência mais satisfatória para todos os usuários envolvidos.

A seguir, apresentamos a visualização da contagem de repetições para a coluna de conteúdo das mensagens.

![Gráfico de Frequência por Conteúdo](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/graph_freq_msg.png?raw=true)

A análise revelou que a mensagem mais comum é "Bom dia", indicando que as interações dos usuários nesta plataforma tendem a se iniciar durante o período matutino. Essa percepção oferece insights valiosos para compreender o comportamento dos usuários ao longo do dia, possibilitando estratégias mais alinhadas com os padrões de engajamento observados.

A seguir, uma consulta SQL realizada para uma análise dos dados enriquecidos da tabela `whatsapp`. Inicialmente, converte o campo de data/hora (`timestamp`) para um formato legível de carimbo de data/hora. Posteriormente, extrai informações como a hora do dia, o dia da semana e o número da semana a partir do carimbo de data/hora. Por fim, agrupa os dados, contabilizando o número de mensagens para cada combinação de hora, dia da semana e número da semana. Isso proporciona um resumo da frequência das mensagens, destacando padrões temporais relevantes.

In [None]:
WITH 
parsed_date_cte AS (
    SELECT 
        *, 
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "whatsapp" 
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS hora_do_dia,
        EXTRACT(dow FROM parsed_date) AS dia_da_semana,
        EXTRACT(week FROM parsed_date) AS semana_do_ano
    FROM parsed_date_cte
)
SELECT
    hora_do_dia,
    dia_da_semana,
    semana_do_ano,
    count(1) AS "qtd_de_mensagens" 
FROM hour_week_cte
GROUP BY
    hora_do_dia,
    dia_da_semana,
    semana_do_ano
ORDER BY
    semana_do_ano,
    dia_da_semana

O resultado dessa consulta é uma tabela semelhante a abaixo:

| hora_do_dia | dia_da_semana | semana_do_ano | qtd_de_mensagens |
|-------------------|---------------------|---------------------|----------------|
| 15                | 2                   | 46                  | 10              |
| 16                | 6                   | 46                  | 13              |
| 10                | 5                   | 47                  | 16              |
| 12                | 1                   | 47                  | 31              |
| 13                | 4                   | 47                  | 13              |
| 14                | 5                   | 47                  | 25              |
| 10                | 7                   | 47                  | 37              |
| 18                | 6                   | 47                  | 13              |
| 16                | 1                   | 48                  | 24              |
| 23                | 4                   | 48                  | 22              |
| 14                | 6                   | 48                  | 28              |
| 11                | 3                   | 48                  | 44              |
| 14                | 3                   | 48                  | 16              |
| 19                | 3                   | 49                  | 38              |
...

Cada gráfico atua como uma ferramenta para extrair informações significativas, contribuindo para uma análise mais aprofundada dos padrões e tendências presentes nos dados. Esse processo é essencial para identificar nuances que podem passar despercebidas em análises mais superficiais, melhorando assim a interpretação dos dados. Inicialmente, foi plotado um gráfico de mensagens por hora.

![Gráfico de Mensagens por Hora](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/graph_msgs_hr.png?raw=true)

A análise da quantidade de mensagens por hora oferece insights sobre os períodos de maior atividade no grupo. Nota-se que os horários de pico ocorrem às 11 da manhã, ao meio-dia e às 19 horas. Esses momentos coincidem com os horários de refeições, antes do almoço e antes do jantar, indicando uma pausa nas atividades para verificar mensagens. Essa informação valiosa possibilita a adaptação de abordagens mais eficazes para usuários, visando resultados aprimorados.


![Gráfico de Mensagens por Dia da Semana](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/graph_msgs_dia_sem.png?raw=true)

A análise da quantidade de mensagens por dia da semana revela os dias de maior movimento no grupo. Observa-se que domingo (7) lidera, seguido por segunda-feira (1) e quarta-feira (3). Esses resultados indicam, conforme esperado, uma atividade mais intensa no domingo, um dia de descanso. O sábado, também considerado dia de descanso, apresenta um menor número de mensagens, sugerindo que os usuários possivelmente se envolvem em outras atividades, caracterizando-o como o "dia de sair à noite". O aumento nas mensagens na segunda-feira e quarta-feira pode refletir a relutância em retomar o trabalho na primeira semana e a monotonia do meio da semana, respectivamente.

![Gráfico de Mensagens por Dia do Ano](https://github.com/carneiro-fernando/EBAC/blob/main/assets/Images/Projeto_Telegram_pipeline/graph_msgs_dia_ano.png?raw=true)

A análise da quantidade de mensagens por dia do ano oferece insights sobre os períodos de maior atividade no grupo ao longo do ano. Destaca-se a semana 49, correspondente ao início de dezembro, como o período de maior movimento. Isso sugere que, ao se aproximar o fim do ano, a interação dos usuários com a plataforma de mensagens aumenta significativamente. Essa compreensão temporal é valiosa para ajustar estratégias e alocar recursos de forma mais eficaz durante períodos de maior demanda.

<a id='conclu'></a>
## 5. Conclusão

A convergência entre as plataformas WhatsApp e Telegram, análise de dados, e o uso de plataformas em nuvem, como a AWS neste projeto, otimiza de forma expressiva as operações empresariais. Essas plataformas, com interfaces ágeis disponíveis 24 horas por dia, simplificam a interação do cliente. O emprego da AWS intensifica essa interação ao oferecer escalabilidade, flexibilidade e confiabilidade, com ajustes dinâmicos de recursos conforme a demanda.

A análise de dados, impulsionada por pipelines na nuvem, torna-se ainda mais eficiente com essa abordagem, proporcionando processamento em grande escala e armazenamento seguro. Isso possibilita a extração de insights detalhados a partir de volumes substanciais de informações geradas pelas interações dos usuários nas plataformas. 

As vantagens incluem também a redução de custos pela nuvem, eliminando a necessidade de investir em infraestrutura física, e a agilidade na adaptação a mudanças nas demandas operacionais, garantindo eficiência contínua. A segurança não pode ser deixada de lado, especialmente ao lidar com informações do cliente em interações nas plataformas. Plataformas em nuvem, como a AWS, implementam protocolos rigorosos, protegendo dados sensíveis contra ameaças cibernéticas. 

Em resumo, essa integração proporciona uma abordagem sinérgica que aprimora a experiência do cliente nas plataformas WhatsApp e Telegram, otimiza a eficiência operacional e oferece suporte estratégico para tomada de decisões. A aplicação de um pipeline de análise de dados desempenha um papel fundamental no cenário corporativo, fornecendo recursos para impulsionar inovação e excelência.