In [19]:
from faker import Faker
from kafka import KafkaProducer
from json import dumps
from datetime import datetime, timezone, timedelta
import time
import os

PATH_S3 = "images_for_processing"

topic = "imageForChecking"
bootstrap_servers = "127.0.0.1:9092"

kafka_producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: dumps(x).encode("utf-8"),
)

In [20]:
class ImageProcessor:
    def __init__(self, folder_path):
        self.folder_path = folder_path
        self.processed_files = set()

    def get_list_new_images(self):
        new_images = []
        for file in os.listdir(self.folder_path):
            if file.endswith(".jpg") or file.endswith(".png") or file.endswith(".jpeg"):  # Укажите нужные расширения файлов
                full_path = os.path.join(self.folder_path, file)
                if os.path.isfile(full_path) and full_path not in self.processed_files:
                    new_images.append(full_path)
        return new_images

    def mark_as_processed(self, file_path):
        self.processed_files.add(file_path)

class ImageSender:
    def __init__(self, folder_path, kafka_producer, topic):
        self.image_processor = ImageProcessor(folder_path)
        self.kafka_producer = kafka_producer
        self.topic = topic
        self.fake = Faker()

    def send_images_to_kafka(self):
        current_time = time.time()
        list_files = self.image_processor.get_list_new_images()
        if list_files:
            for full_path in list_files:
                data = {
                    "taskId": str(self.fake.uuid4()),
                    "url": full_path,
                    "timestamp_find_image": current_time
                }
                self.kafka_producer.send(self.topic, value=data)
                print("Sent data:", data)
                self.image_processor.mark_as_processed(full_path)



In [47]:
script_dir = os.getcwd()  # Получаем путь до текущего скрипта
images_dir = os.path.join(script_dir, PATH_S3)  # Папка с изображениями полный путь
image_sender = ImageSender(images_dir, kafka_producer, topic)

while True:
    image_sender.send_images_to_kafka()
    time.sleep(5)

Sent data: {'taskId': 'cea4e6ae-6ede-4335-a78c-38e4388e316d', 'url': 'd:\\MAIN\\Для души (полезный контент)\\Дополнительное\\ml-diploma\\images_for_processing\\2aba8accc49f0318_jpg.rf.f4c9be7d920e096d5758dece5ce13c34.jpg', 'timestamp_find_image': 1710776123.9011474}
Sent data: {'taskId': '590a0df1-8b6c-4046-8f41-1f5ede0716da', 'url': 'd:\\MAIN\\Для души (полезный контент)\\Дополнительное\\ml-diploma\\images_for_processing\\4c965b917f477943_jpg.rf.2170f806f83e3a63a2655138c7fd7a7f.jpg', 'timestamp_find_image': 1710776123.9011474}
Sent data: {'taskId': '1ff41ee9-4980-46bb-a345-45193ffbac29', 'url': 'd:\\MAIN\\Для души (полезный контент)\\Дополнительное\\ml-diploma\\images_for_processing\\4d5bb651b8accaf1_jpg.rf.8e2c4e07ba804b07495749b10d946dba.jpg', 'timestamp_find_image': 1710776123.9011474}


KeyboardInterrupt: 