In [1]:
pip install watchdog

Note: you may need to restart the kernel to use updated packages.


In [2]:
#Importar as diferenças que são criadas nos dados da pasta /exercicios/municipios-estados/streaming/. A cada vez que um novo arquivo for adicionado,
#o mesmo deve ser importado para a pasta da UF correspondente e adicionado ao fim do arquivo cidades.csv.

In [None]:
import os
import pandas as pd
from unidecode import unidecode
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from kafka import KafkaProducer

# Primeiro temos a criação de uma instância da classe Watcher, que inicializa um objeto Observer que irá monitorar o diretório especificado no método run.
class Watcher:
    def __init__(self):
        self.observer = Observer()
        
# A definição da classe Handler, que herda de FileSystemEventHandler e implementa o método on_any_event. 
# Esse método é chamado toda vez que um evento é disparado pelo Observer, e a implementação aqui faz o tratamento do evento apenas se for do tipo created.

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, 'exercicios/municipios-estados/streaming', recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(5)
        except:
            self.observer.stop()
            print("Observer stopped")

# Dentro do método on_any_event, é feita a leitura do arquivo criado e o merge com o arquivo de estados. O arquivo final é então salvo em um diretório com o nome do estado correspondente.

class Handler(FileSystemEventHandler):
    @staticmethod
    def on_any_event(event):
        if event.is_directory:
            return None
        elif event.event_type == 'created':
            print(f"New file detected: {event.src_path}")
            estados_df = pd.read_csv('exercicios/municipios-estados/csv/estados.csv', sep=',')
            municipios_df = pd.read_csv(event.src_path, sep=',')
            municipios_df.rename(columns={'uf': 'codigo_uf'}, inplace=True)
            df_cidades = pd.merge(estados_df, municipios_df, on='codigo_uf')
            estado_formatado = unidecode(df_cidades['nome_estado'].iloc[0]).replace(' ', '_').lower()
            if not os.path.exists(estado_formatado):
                os.mkdir(estado_formatado)
            file_path = f"{estado_formatado}/cidades.csv"
            if os.path.exists(file_path):
                df_cidades.to_csv(file_path, mode='a', header=False, index=False)
            else:
                df_cidades.to_csv(file_path, index=False)

if __name__ == '__main__':
    w = Watcher()
    w.run()


In [None]:
# Agora vamos utilizar Apache Kafka para fazer o mesmo processo, mas de maneira automatizada.

# Cria um producer, responsavel por enviar mensagens ao Kafka
producer = KafkaProducer(bootstrap_servers=['awari-kafka:9093'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))

# O código é projetado para rodar infinitamente. A classe Watcher usa o método Observer.schedule() para monitorar o diretório 
# 'exercicios/municipios-estados/streaming' em busca de novos arquivos.
class Watcher:
    def __init__(self):
        self.observer = Observer()
        
# Se um novo arquivo for criado, o método Handler.on_any_event() é chamado e a mensagem é enviada ao Kafka através do KafkaProducer.
# loop while True: é utilizado para manter a aplicação em execução indefinidamente enquanto o monitoramento do diretório é realizado.
# exceção é capturada pelo bloco except: e o método Observer.stop() é chamado para finalizar a execução do monitoramento do diretório.

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, 'exercicios/municipios-estados/streaming', recursive=True)
        self.observer.start()
        try:
            while True:
                pass
        except:
            self.observer.stop()
            print("Observer stopped")

class Handler(FileSystemEventHandler):
    @staticmethod
    def on_any_event(event):
        if event.is_directory:
            return None
        elif event.event_type == 'created':
            print(f"New file detected: {event.src_path}")
            producer.send('novo_arquivo', event.src_path.encode())

if __name__ == '__main__':
    w = Watcher()
    w.run()