Trata-se de um processo de ELT (Extração, Carga e Transformação) que integra um sistema legado com um banco de dados relacional (no exemplo, um MySQL) para um banco NoSQL (ElasticSearch) sem alterações significativas nos dados transferidos.
Para implementar esse fluxo de ELT, optou-se por uma arquitetura baseada em containers para todas as aplicações. A simulação do sistema legado também foi feita por meio de containers, com um para o MySQL e outro para o sistema de ERP (uma aplicação em Python que cadastra novos clientes e vendas de maneira contínua). O servidor Apache Nifi é o responsável por se conectar ao database do sistema legado e enviar, de forma contínua, para o Elasticsearch todos os novos clientes e vendas registrados no ERP. Além disso, o container com Kibana é utilizado para visualização do resultado da integração.
Nesse exemplo específico, foram criados dois flows separados, um para capturar novos clientes e outro para novas vendas. Ambos os flows são versionados pelo Apache Nifi Registry e são executados continuamente, capturando novos dados à medida que são inseridos no sistema legado. Esses dados são capturados e enviados para o banco de dados NoSQL, sem grandes alterações em sua estrutura original. Dessa forma, garantimos uma integração eficiente entre as bases que são heterogêneas.
IMPORTANTE: Este ambiente foi configurado para fins de desenvolvimento e estudos, e algumas configurações foram simplificadas para facilitar a compreensão do ambiente como um todo. No entanto, em um ambiente de produção, é crucial tomar precauções adicionais.
Apache Nifi é uma plataforma open source que permite o gerenciamento e processamento de dados em tempo real de forma simples e escalável. Ele foi desenvolvido para lidar com fluxos de dados em ambientes distribuídos, oferecendo uma interface gráfica amigável para o desenvolvimento de pipelines de dados. O Apache Nifi suporta diversos tipos de fontes de dados, incluindo sistemas de arquivos, bancos de dados, serviços web, fluxos de dados, e muitos outros. Além disso, ele oferece integração com outras ferramentas de Big Data, como Apache Hadoop, Spark, e Hive.
O Apache Nifi Registry é um subprojeto do Apache Nifi que fornece um repositório central para gerenciamento de versionamento, controle de acesso e colaboração para flows do Apache Nifi. Isso permite que as organizações gerenciem seus flows de forma mais eficiente e compartilhem seu trabalho com outras pessoas de maneira controlada e segura.
Elasticsearch é uma ferramenta de busca e análise de dados distribuída que é amplamente utilizada para buscar, analisar e visualizar grandes conjuntos de dados em tempo real. É um banco de dados NoSQL que é otimizado para armazenar, pesquisar e analisar grandes volumes de dados não estruturados. O Elasticsearch é escalável e flexível, permitindo que os usuários realizem pesquisas e análises avançadas em seus dados em tempo real.
Kibana é uma ferramenta de visualização de dados que funciona em conjunto com o Elasticsearch, permitindo a criação de gráficos, dashboards e relatórios interativos para ajudar a entender e extrair insights a partir dos dados armazenados. Ele oferece uma interface amigável que permite a criação de painéis personalizados com diferentes tipos de visualizações, incluindo gráficos de barras, tabelas, mapas, e muitos outros. O Kibana é amplamente utilizado em ambientes de análise de dados e big data para monitoramento de sistemas, detecção de fraudes, análise de logs, e muitas outras aplicações.
Docker é uma plataforma de virtualização de aplicativos que permite que os aplicativos sejam executados em ambientes isolados e portáteis chamados contêineres. Cada contêiner inclui todos os componentes necessários para executar um aplicativo, como código, bibliotecas, dependências e configurações. Isso permite que os desenvolvedores criem, gerenciem e implantem aplicativos de forma mais rápida e consistente em diferentes ambientes.
O Docker Compose é uma ferramenta que permite que os usuários definam e executem aplicativos Docker compostos por vários contêineres. Ele usa um arquivo YAML para definir as configurações de cada contêiner e suas dependências. Isso facilita o gerenciamento de aplicativos complexos que requerem vários contêineres, permitindo que eles sejam gerenciados como uma unidade.
O Docker e o Docker Compose são amplamente usados no desenvolvimento de aplicativos modernos, especialmente em ambientes de desenvolvimento e produção baseados em nuvem. Eles permitem que os desenvolvedores criem aplicativos de forma rápida e consistente, reduzindo a complexidade do gerenciamento de dependências e configurações em diferentes ambientes. Além disso, o uso de contêineres permite que os aplicativos sejam escalonados facilmente, garantindo que as alterações feitas em um contêiner não afetem outros contêineres em execução no mesmo host.
Importante: Na implantação, todos os serviços consumiram aproximadamente 5 GB de RAM, portanto, é recomendado ter pelo menos 8 GB de memória em seu notebook/PC.
Parâmetro | Valor |
---|---|
Usuário | nifi |
Senha | kP8mDnTbXs5H7qL9vFjE3GcA4R6Z2Yy |
URL externa | https://localhost:8443/nifi/ |
- Apache Nifi Registry
Parâmetro | Valor |
---|---|
URL interna | http://nifi-registry:18080/ |
URL externa | http://localhost:18080/nifi-registry/ |
Parâmetro | Valor |
---|---|
Usuário | root |
Senha | d8Uwj1wos64h |
Database | db_erp |
Host interno | erp-database |
Host externo | localhost |
Porta | 3306 |
Database Connection URL | jdbc:mysql://erp-database:3306/db_erp |
Database Driver Class Name | com.mysql.cj.jdbc.Driver |
Database Driver Location | /opt/nifi/nifi-current/jdbc/mysql-connector-j-8.0.31.jar |
Parâmetro | Valor |
---|---|
Usuário | elastic |
Senha | nY5AQz37ZZIfMev9 |
URL interna | http://elasticsearch:9200 |
URL externa | http://localhost:9200 |
- Kibana
Parâmetro | Valor |
---|---|
URL externa | http://localhost:5601 |
Observação: A diferença entre Host/URL interno ou externo é que os internos são utilizados dentro da rede onde as aplicações estão sendo executadas, permitindo a comunicação entre elas. Já os externos são destinados aos usuários para que possam acessar as ferramentas a partir de seus computadores, onde as aplicações estão sendo executadas.
git clone https://github.com/Renatoelho/fluxo-elt.git fluxo-elt
cd fluxo-elt/
cd ERP/
docker build -f dockerfile -t erp-sistema:0.0.1 .
cd ..
docker-compose -f docker-compose.yaml --compatibility up -d
IMPORTANTE: No primeiro start dos serviços, pode ocorrer um erro no serviço nifi-registry se a permissão de acesso ao volume criado for negada. Nesse caso, desative os serviços (
docker-compose -f docker-compose.yaml --compatibility down
) e altere as permissões do volume. Use o comando (sudo chmod -R 777 volumes/nifi_registry/
) e, em seguida, suba novamente os serviços. Tudo deve funcionar corretamente.
O healthcheck é um recurso de monitoramento de estado do contêiner e saúde da aplicação em tempo real, permitindo detectar se tudo está funcionando corretamente.
Existe um healthcheck para cada contêiner do fluxo:
-
Sistema ERP:
test: curl -f http://erp-app:8888/healthcheck || exit 1
-
Database ERP:
test: mysqladmin ping -h erp-database -u root -pd8Uwj1wos64h || exit 1
-
Apache Nifi:
test: wget -q --spider http://nifi:8443/nifi-api/system-diagnostics || exit 1
-
Apache Nifi Registry:
test: wget -q --spider http://nifi-registry:18080/nifi-registry/ || exit 1
-
Elasticsearch:
test: curl -f http://elasticsearch:9200/_cluster/health || exit 1
-
Kibana:
test: curl -f http://kibana:5601/ || exit 1
Para verificar a saúde dos contêiners execute o seguinte comando e verifique no atributo STATUS.
docker ps --format "{{.ID}}\t{{.Names}}\t{{.Status}}"
Os volumes no Docker são um recurso que permite compartilhar dados entre o host e os contêineres de uma forma flexível e independente do sistema de arquivos do host. Os volumes permitem que você armazene dados persistentes fora do contêiner e acesse esses dados de forma consistente em diferentes contêineres.
Quase todos os contêineres existentes podem estar associados a um ou mais volumes, que podem ser definidos no contexto do Docker Compose ou diretamente no gerenciador de volumes do Docker. Para obter informações sobre os volumes utilizados no fluxo, consulte o arquivo docker-compose.yaml ou use os comandos de gerenciamento de volumes do Docker.
docker volume ls
Para acessar o Nifi e Nifi Registry use as seguintes URLs:
1º Passo: acesse o Registry e crie um bucket em:
Settings >> New bucket
Em Bucket Name
adicione bucket-flows-elt
e clique em CREATE
2º Passo: Acesse o Apache Nifi e execute em:
Menu >> Controller Settings >> Registry Clients >> Add Registry Client
Adicione em Name
o mesmo nome do bucket criado no Registry bucket-flows-elt
em Type
escolha NifiRegistryFlowRegistryClient
e clique em ADD
Em seguida acesse novamente e clique em Edit
e adicione na aba PROPERTIES
a URL: http://nifi-registry:18080/ clique em UPDATE
e tudo pronto. Agora seus Flows no Apache Nifi já podem ser versionados.
- Configurações:
- Versionamento:
Esta etapa não se destina apenas a visualizar os dados existentes no banco de dados do ERP, mas também a aproveitar as regras de negócio para desenvolver as consultas que serão usadas para extrair os dados do ERP. É possível utilizar qualquer gerenciador de banco de dados, ou o gerenciador exclusivo do banco em questão. Neste exemplo, utilizaremos o Dbeaver, que é adequado para qualquer banco de dados relacional.
Temos as queries originais que refletem as regras de negócio, bem como as queries ajustadas que serão utilizadas no Apache NiFi. Estas últimas possuem algumas regras para que a query seja alternada entre a primeira execução, que faz uma carga completa, e em todas as demais execuções, apenas as incrementais são executadas. Abaixo estão os exemplos de queries originais e ajustadas para serem executadas no fluxo do Apache NiFi:
As credenciais de acesso ao banco de dados você acessa aqui.
- Originais:
Clientes
SELECT
id,
nome ,
sobrenome,
email,
telefone,
DATE_FORMAT(data_nascimento, '%Y-%m-%d') data_nascimento,
cidade,
estado,
pais,
codigo_pais,
DATE_FORMAT(datahora, '%Y-%m-%d %H:%i:%s') datahora
FROM db_erp.clientes
-- WHERE datahora >= NOW() - INTERVAL 1 MINUTE # Carga Incremental
-- WHERE id >= False # Carga Completa
ORDER BY id;
Vendas
SELECT
id,
produto,
DATE_FORMAT(data_compra, '%Y-%m-%d') data_compra,
tipo_cartao,
numero_cartao,
quantidade,
valor,
DATE_FORMAT(datahora, '%Y-%m-%d %H:%i:%s') datahora
FROM db_erp.vendas
-- WHERE datahora >= NOW() - INTERVAL 1 MINUTE # Carga Incremental
-- WHERE id >= False # Carga Completa
ORDER BY id;
- Ajustadas para o Apache Nifi
Clientes
SELECT
id,
nome ,
sobrenome,
email,
telefone,
DATE_FORMAT(data_nascimento, '%Y-%m-%d') data_nascimento,
cidade,
estado,
pais,
codigo_pais,
DATE_FORMAT(datahora, '%Y-%m-%d %H:%i:%s') datahora
FROM db_erp.clientes
${clausula_where_flow}
ORDER BY id;
Vendas
SELECT
id,
produto,
DATE_FORMAT(data_compra, '%Y-%m-%d') data_compra,
tipo_cartao,
numero_cartao,
quantidade,
valor,
DATE_FORMAT(datahora, '%Y-%m-%d %H:%i:%s') datahora
FROM db_erp.vendas
${clausula_where_flow}
ORDER BY id;
As variáveis clausula_where_flow
e numero_execucao_flow
no Apache Nifi são responsáveis por definir se será uma carga inicial completa ou cargas incrementais, isso para os Flows de Clientes e Vendas, que neste caso serão feitas a cada 5 minutos. Para mais detalhes sobre essa regra, consulte o fluxo.
- Exemplo da lógica para execução das cargas
IMPORTANTE: Essa regra de captura dos registros inseridos nos últimos 5 minutos é apenas uma abordagem conceitual, podendo ocorrer falhas na recuperação dos dados do banco. Para uma execução com grande grau de precisão, a inclusão de algumas variáveis de controle são necessárias, mas não é o escopo em nosso contexto aqui.
Um dos primeiros passos no desenvolvimento de Flows no Apache NiFi é verificar se as conexões aos bancos de dados estão criadas e funcionando. Para isso, os controller services são utilizados. No nosso exemplo, teremos um controller service para o MySQL via JDBC e outro para Elasticsearch.
- Para criar um controller service, é necessário acessar:
Configuration (Engrenagem) >> Controller Services >> Create a new controller service
Depois disso, é necessário escolher e configurar um DBCPConnectionPool 1.19.0
e um JsonRecordSetWriter 1.19.0
para o MySQL, além de um ElasticSearchClientServiceImpl 1.19.0
para o Elasticsearch. No caso do MySQL, é uma configuração JDBC comum, usuário e senha, e é necessário um controller service para converter o resultado da query em um arquivo JSON. Já no caso do Elasticsearch, é necessário informar a URL HTTP, usuário e a senha.
- Controller Services
- Conexão com MySQL
- Conexão com Elasticsearch
- Configuração do JsonRecordSetWriter
- Lógica para extração contínua (a cada 5 Minutos)
- Separa o resultado da Query SQL em um arquivo Json para cada linha
- Grava no Elasticsearch em formato Json
- Ativando os Flows no Apache Nifi
IMPORTANTE: O processor EXECUTOR DO FLOW (EXECUTA UM VEZ)
deve ser executado uma única vez e em seguida desabilitado, pois ele é responsável por gerar a variável numero_execucao_flow
, que será utilizada para definir se a execução da query do MySQL será completa ou incremental (ou seja, apenas os registros dos últimos 5 minutos), pelo processor REEXECUTOR DO FLOW
.
Ative o Flow depois de executar o procedimento do processor EXECUTOR DO FLOW (EXECUTA UM VEZ)
.
Clique com botão diretito do mouse na parte em branco do Flow e em seguida em Start.
- Flow Completo e Ativo - Clientes
Obs.: Todo esse detalhamento do fluxo é para a extração de clientes. Para as vendas, há um fluxo com a mesma estrutura lógica, mas com uma query de início do MySQL e um índice de destino no Elasticsearch diferentes.
Este dashboard está sendo estruturado no Kibana a partir dos dados que foram transferidos para o Elasticsearch, apresentando uma visão dos resultados da integração com as quantidades de clientes e vendas. Esta é apenas uma amostra do que é possível fazer com os dados existentes no banco de dados NoSQL.
Crie suas próprias visualizações acessando o link: http://localhost:5601
Essa prova de conceito (POC) foi desenvolvida para demonstrar a possibilidade de integração entre tecnologias e ferramentas que possuem paradigmas diferentes em busca de um objetivo comum. Partindo de um sistema ERP com um banco de dados relacional, foi utilizada uma ferramenta de processamento de dados em tempo real que atua de maneira contínua na captura dos dados da origem, gravando em um banco de dados NoSQL. Como resultado, é possível criar diversas visualizações sobre os dados transferidos, demonstrando a flexibilidade e potencial das ferramentas utilizadas.
Apache/Nifi, Docker Hub. Disponível em: https://hub.docker.com/r/apache/nifi. Acesso em: 19 abr. 2023.
Volumes, Docker Docs. Disponível em: https://docs.docker.com/storage/volumes. Acesso em: 24 abr. 2023.
Elasticsearch, Docker Hub. Disponível em: https://hub.docker.com/_/elasticsearch. Acesso em: 25 abr. 2023.
Kibana, Docker Hub. Disponível em: https://hub.docker.com/_/kibana. Acesso em: 25 abr. 2023.
NiFi System Administrator’s Guide, Apache NiFi. Disponível em: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html. Acesso em: 22 abr. 2023.
apache/nifi-registry, Docker Hub. Disponível em: https://hub.docker.com/r/apache/nifi-registry. Acesso em: 22 abr. 2023.
Getting Started with Apache NiFi Registry, Apache NiFi Registry. Disponível em: https://nifi.apache.org/docs/nifi-registry-docs/index.html. Acesso em: 22 abr. 2023.
How to build a data lake from scratch - Part 1: The setup, Victor Seifert. Disponível em: https://towardsdatascience.com/how-to-build-a-data-lake-from-scratch-part-1-the-setup-34ea1665a06e. acesso em: 19 abr. 2023.
How to build a data lake from scratch - Part 2: Connecting the components, Victor Seifert. Disponível em: https://medium.com/towards-data-science/how-to-build-a-data-lake-from-scratch-part-2-connecting-the-components-1bc659cb3f4f. acesso em: 23 abr. 2023.
How to Successfully Implement A Healthcheck In Docker Compose, Linuxhint. Disponível em: https://linuxhint.com/how-to-successfully-implement-healthcheck-in-docker-compose. Acesso em: 24 abr. 2023.
Expression Language Guide, Apache NiFi Expression Language Guide. Disponível em: https://nifi.apache.org/docs/nifi-docs. Acesso em: 26 abr. 2023.