In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Data Pipeline Telegram

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/pipeline_aws_chats.drawio.svg?raw=true)

Projeto por [Guilherme Araújo Vasconcelos](www.linkedin.com/in/guilherme-a-vasconcelos)

**Objetivo**

Vou demonstrar como é feita a extração de dados de conversas do Telegram, fazer a transferência para um datalake, processar e analisar todos os dados extraídos. Ou seja, estamos fazendo o Pipeline , que é a automatização de processos de acordo com eventos pré especificados. 

Comercialmente esse tipo de análise pode ser usada para identificar gargalos em Bots de suporte, até mesmo fazer uma automação do suporte, de empresas por exemplo - qual o tipo de erro mais recorrente com um produto ou serviço. 

# Sistemas

**Transacional**



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/transacional.png?raw=true)





**Analítico**




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/analitico.png?raw=true)





**Tabela de diferença**

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/tabela%20diferenca.png?raw=true)



Os dois sistemas se complementam no ETL(Extração, Transformação e Carga), como veremos a seguir.

# Telegram

**Criação do Chatbot**

Para nosso projeto foi necessário criar um bot no telegram, adicionar a um grupo e consumimos o conteúdo com o API de Bots do Telegram. 

O Bot foi criando abrindo um chat com o BotFather no próprio telegram:

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/criar%20bot.png?raw=true)

Com o Bot criado foi necessário seguir alguns passos: 

*Desativar a opção de adicionar o bot em outros grupos*

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/bot%20sem%20outros%20grupos.png?raw=true)



*Adicionar o Bot ao grupo*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/grupo%20bot.png?raw=true)



*Colocar o Bot como ADM do grupo*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/bot%20adm.png?raw=true)

**Bot API**

Para acessar as mensagens é necessário conectar o Bot com a API, usei o Google Collab. 

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/api%201.png?raw=true)




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/api%202.png?raw=true)







*O método getMe retorna informações sobre o bot.*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/api%203.png?raw=true)




*O método getMe retorna as mensagens captadas pelo bot.*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/api%204.png?raw=true)





*Criação API Gateway AWS*




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/api%205.png?raw=true)


# Função Lambda

Essa função Lambda é que vai ingerir todos os dados do grupo em um Bucket S3 de dados crus.



In [None]:
import os
import json
import logging
from datetime import datetime, timezone, timedelta

import boto3


def lambda_handler(event: dict, context: dict) -> dict:

  '''
  Recebe uma mensagens do Telegram via AWS API Gateway, verifica no
  seu conteúdo se foi produzida em um determinado grupo e a escreve, 
  em seu formato original JSON, em um bucket do AWS S3.
  '''

  # vars de ambiente

  BUCKET = os.environ['AWS_S3_BUCKET']
  TELEGRAM_CHAT_ID = int(os.environ['TELEGRAM_CHAT_ID'])

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = datetime.now(tzinfo).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  filename = f'{timestamp}.json'

  # código principal

  client = boto3.client('s3')
  
  try:

    message = json.loads(event["body"])
    chat_id = message["message"]["chat"]["id"]

    if chat_id == TELEGRAM_CHAT_ID:

      with open(f"/tmp/{filename}", mode='w', encoding='utf8') as fp:
        json.dump(message, fp)

      client.upload_file(f'/tmp/{filename}', BUCKET, f'telegram/context_date={date}/{filename}')

  except Exception as exc:
      logging.error(msg=exc)
      return dict(statusCode="500")

  else:
      return dict(statusCode="200")



*Bucket S3*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/s3%20raw.png?raw=true)







![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/s3%20raw%202.png?raw=true)








![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/s3%20raw%203.png?raw=true)











Como podemos ver, todas as mensagens que o bucket recebe vem em formato JSON. 


**Webhook**

A configuração do Webhook, com a URL disponibilizada pelo API Gateway, vai redirecionar as mensagens para o AWS. 




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/webhook.png?raw=true)





![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/webhook%202.png?raw=true)






![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/webhook%203.png?raw=true)

# Transformação

Agora que a parte de ingestão está configurada e funcionando, vamos iniciar a transformação dos dados recebidos. Novamente um lambda que fará a transformação dos dados para um novo bucket S3.


***Bucket enriquecido S3***




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/enriched%201.png?raw=true)




![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/enriched%202.png?raw=true)



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/enriched%203.png?raw=true)




Dessa vez o conteúdo salvo no bucket é no formato Parquet, que faremos a análise no AWS Athena

In [None]:
import os
import json
import logging
from datetime import datetime, timedelta, timezone

import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def lambda_handler(event: dict, context: dict) -> bool:

  '''
  Diariamente é executado para compactar as diversas mensagensm, no formato
  JSON, do dia anterior, armazenadas no bucket de dados cru, em um único
  arquivo no formato PARQUET, armazenando-o no bucket de dados enriquecidos
  '''

  # vars de ambiente

  RAW_BUCKET = os.environ['AWS_S3_BUCKET']
  ENRICHED_BUCKET = os.environ['AWS_S3_ENRICHED']

  # vars lógicas

  tzinfo = timezone(offset=timedelta(hours=-3))
  date = (datetime.now(tzinfo) - timedelta(days=1)).strftime('%Y-%m-%d')
  timestamp = datetime.now(tzinfo).strftime('%Y%m%d%H%M%S%f')

  # código principal

  table = None
  client = boto3.client('s3')

  try:

      response = client.list_objects_v2(Bucket=RAW_BUCKET, Prefix=f'telegram/context_date={date}')

      for content in response['Contents']:

        key = content['Key']
        client.download_file(RAW_BUCKET, key, f"/tmp/{key.split('/')[-1]}")

        with open(f"/tmp/{key.split('/')[-1]}", mode='r', encoding='utf8') as fp:

          data = json.load(fp)
          data = data["message"]

        parsed_data = parse_data(data=data)
        iter_table = pa.Table.from_pydict(mapping=parsed_data)

        if table:

          table = pa.concat_tables([table, iter_table])

        else:

          table = iter_table
          iter_table = None

      pq.write_table(table=table, where=f'/tmp/{timestamp}.parquet')
      client.upload_file(f"/tmp/{timestamp}.parquet", ENRICHED_BUCKET, f"telegram/context_date={date}/{timestamp}.parquet")

      return True

  except Exception as exc:
      logging.error(msg=exc)
      return False

def parse_data(data: dict) -> dict:

  date = datetime.now().strftime('%Y-%m-%d')
  timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

  parsed_data = dict()

  for key, value in data.items():

      if key == 'from':
          for k, v in data[key].items():
              if k in ['id', 'is_bot', 'first_name']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key == 'chat':
          for k, v in data[key].items():
              if k in ['id', 'type']:
                parsed_data[f"{key if key == 'chat' else 'user'}_{k}"] = [v]

      elif key in ['message_id', 'date', 'text']:
          parsed_data[key] = [value]

  if not 'text' in parsed_data.keys():
    parsed_data['text'] = [None]

  return parsed_data

**Foi necessário configurar variáveis de ambiente para o Lambda extrair o conteúdo do bucket cru e enviar o dado transformado para o bucket enriquecido**


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/ambiente.png?raw=true)

***EventBridge***

Por fim para automatizar todo o processo, foi criado um cronograma no AWS EventBridge para que o Lambda seja executado automaticamente uma vez por dia. 


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/event.png?raw=true)

# Apresentação


Por fim para analisarmos os dados, automatizados, extraídos vamos utilizar o AWS Athena para carregar os arquivos Parquet que foram gerados para então analisarmos o conteúdo. 




In [None]:
CREATE EXTERNAL TABLE `telegram`(
  `message_id` bigint,
  `user_id` bigint,
  `user_is_bot` boolean,
  `user_first_name` string,
  `chat_id` bigint,
  `chat_type` string,
  `text` string,
  `date` bigint)
PARTITIONED BY (
  `context_date` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://ultimo-modulo43-datalake-enriched/telegram/'

**É necessário carregar as partições de todas as tabelas geradas que possuam partições**


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/particao.png?raw=true)

**Vamos verificar se a tabela foi criada corretamente**

SELECT * FROM "default"."telegram" limit 10;

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/10%20primeiros.png?raw=true)

*Agora que foi possível confirmar que a tabela foi criada com sucesso, vamos continuar fazendo algumas consultas no SQL junto com gráficos gerados pelo CSV do resultado*

![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql.png?raw=true)


O número de mensagens é grande demais para caber na mesma tela, contuo já podemos verificar a extistência de outras mensagens, inclusive de outro usuário. No total foram 60 mensagens enviadas.

*Vamos checar o número de mensagens por dia*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql%202.png?raw=true)



O dia com mais mensagens foi o dia 20/11/2014


Gráfico do dia com mais mensagem:



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/msgdia.png?raw=true)

*Agora a quantidade de mensagens por usuário por dia.*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql%203.png?raw=true)


Tanto no dia 26 quanto no dia 27 o usuário Guilherme, eu, foi quem mais mandou mensagens no grupo. 

Gráfico de quem enviou mais mensagens no grupo:



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/qtdmsg.png?raw=true)

*Vamos verificar qual mensagem foi dita mais vezes*


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql%206.png?raw=true)


As mensagens "teste", "tudo bem?" e "bom dia" foram as mais enviadas 

In [None]:
# devido ao zoom para caber todo o conteúdo na tela o código ficou pequeno:

select text, count(1) as "text_amount"
from "telegram"
group by text
order by text_amount desc 

Graficamente assim ficou a distribuição das mensagens


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/grafico%20text%20amount.png?raw=true)


Apenas uma observação, apesar da mensagem "e o nosso botafogo", **não sou botafoguense.**

*A seguida a média do tamanho das mensagens por usuário por dia.*



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql%204.png?raw=true)




Nos dias 26 e 27 o usuário I foi quem teve as maiores médias de mensagens nesses dias. 

Gráfico média do tamanho da mensagem por usuário:


![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/tamanho.png?raw=true)

*Agora a quantidade de mensagens por hora por dia da semana por número da semana.*



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/sql%205.png?raw=true)



Os dias da semana com mais mensagens foram Terça Feira e Quarta Feira. Um dado importante é o período de maior atividade foi quase sempre no período da tarde. Graficamente falando talvez seja mais fácil visualizar:



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/datehour.png?raw=true)



![](https://github.com/guiaraujo017/ImagensProjetoFinal/blob/main/weekdaymessage.png?raw=true)

In [None]:
# Como essa consulta SQL é maior que as outras, vou colocar o código abaixo:


WITH
parsed_date_cte AS (
    SELECT
        *,
        CAST(date_format(from_unixtime("date"),'%Y-%m-%d %H:%i:%s') AS timestamp) AS parsed_date
    FROM "telegram"
),
hour_week_cte AS (
    SELECT
        *,
        EXTRACT(hour FROM parsed_date) AS parsed_date_hour,
        EXTRACT(dow FROM parsed_date) AS parsed_date_weekday,
        EXTRACT(week FROM parsed_date) AS parsed_date_weeknum
    FROM parsed_date_cte
)
SELECT
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum,
    count(1) AS "message_amount"
FROM hour_week_cte
GROUP BY
    parsed_date_hour,
    parsed_date_weekday,
    parsed_date_weeknum
ORDER BY
    parsed_date_weeknum,
    parsed_date_weekday

# Conclusão 


Podemos observar qão poderoso é um Pipelines de dados de um Chat Bot, no nosso caso Telegram. Com ele foi possível analisarmos qual mensagem apareceu mais vezes, quem falou mais e qual período/dia/semana teve mais mensagens.

Essas informações, no nosso caso de estudo, podem parecer inofensivas, contudo para uma empresa é uma arma poderosíssima tanto para cortar custos quanto para aumentar a receita, como?

Bem, de início a Empresa X pode criar um Chat Bot com respostas automáticas para as dúvidas e solicitações mais recorrentes, como solicitar o boleto de pagamento por exemplo. A criação desse Pipeline nos daria meios de verificar as principais dúvidas e solicitações para que então fosse configurado um Bot para responder essas questões. Consequentemente quanto mais demandas forem resolvidas pelo Bot, menor é o custo de contratação com pessoal para resolver demandas. 

Outro ponto seria aumentar as receitas mirando clientes mais ativos em alguma plataforma, como forma de recompensa, por exemplo, poderia ser disponibilizado alguma oferta especial para algum horário ou dia que, historicamente, tem menos movimento. Nós vimos que o Pipeline foi capaz de captar essas interações de quem interage mais e até mesmo o período de interação.

a.