# Pycture Producer
Captures images from a webcam and sends them to Kafka.

In [1]:
from kafka import KafkaProducer
import cv2
import base64
import json
import time
import datetime as dt
%matplotlib inline

### Class containing all the producer functionality 

In [2]:
class pycture_producer():
    """Stream webcam (or video-file) frames to a Kafka topic as JSON messages.

    Constructing an instance connects to Kafka and immediately enters the
    endless capture loop of ``stream_video``, so the constructor does not
    return normally.

    Every published message is a JSON object with the keys ``image``
    (base64-encoded PNG), ``timestamp`` (naive UTC time in ISO 8601 format)
    and ``camera_id``.

    Keyword arguments:
    interval  -- Interval for capturing images in seconds (default 5 sec)
    source    -- Index of Video Device or Filename of Video-File (default 0)
    camera_id -- Unique Identifier for this Camera
    topic     -- receiving Kafka Topic
    server    -- Host + Port of Kafka Endpoint (default '127.0.0.1:9092')
    """

    def __init__(self,
                 interval: int = 5,
                 source=0,
                 camera_id: str = 'camera_generic',
                 topic: str = 'wordcounttopic',
                 server: str = '127.0.0.1:9092'):

        print(f'Initialized camera "{camera_id}" with source {source}.')
        print(f'Send to "{topic}" on "{server}" every {interval} sec.')

        # Instance configuration
        self.interval = interval  # Interval for Photos in Seconds
        self.video_source = source
        self.camera_id = camera_id
        self.server = server  # Host + Port of Kafka Endpoint
        self.topic = topic
        self.img_file = './frame.jpg'  # Debug copy of the last sent frame

        # Connection to Kafka Endpoint. Every value handed to send() goes
        # through value_serializer, so payloads must be JSON-serializable.
        # No raw-bytes probe message is sent here: json.dumps() rejects
        # bytes with a TypeError, and a probe record would not match the
        # image message schema anyway.
        self.producer = KafkaProducer(bootstrap_servers=self.server,
                                      value_serializer=lambda m: json.dumps(m).encode('utf8'))

        # Start Streaming... (blocks: stream_video loops forever)
        self.stream_video()

    def stream_video(self):
        """Start streaming video frames to Kafka forever.

        Each iteration grabs one frame, publishes it as a JSON message,
        stores a debug copy on disk and then sleeps ``self.interval``
        seconds. Failed reads or encodes are reported on stdout and the
        loop carries on with the next iteration.
        """
        print(f'Start capturing frames every {self.interval} sec.')
        while True:
            success, image, timestamp = self._grab_frame()
            if success:
                # Base64 encode image for transfer in json
                encoded, png = cv2.imencode('.png', image)
                if encoded:
                    png_as_text = base64.b64encode(png).decode('utf-8')
                    # Build object and send to Kafka
                    result = {
                        'image': png_as_text,
                        'timestamp': timestamp,
                        'camera_id': self.camera_id
                    }
                    self.send_to_kafka(result)
                    self.save_image(image)
                else:
                    print(f'Could not encode image from source {self.video_source}!')
            else:
                print(f'Could not read image from source {self.video_source}!')
            # Sleep interval, before next capture
            time.sleep(self.interval)

    def _grab_frame(self):
        """Open the source, read a single frame and release the source again.

        Returns a ``(success, image, timestamp)`` tuple: ``success`` and
        ``image`` are the values returned by ``VideoCapture.read()`` and
        ``timestamp`` is the naive UTC time, taken right after the read,
        as an ISO 8601 string.
        """
        # NOTE(review): the source is re-opened on every call, so a
        # video-file source presumably yields its first frame each time --
        # confirm whether that is intended.
        vidcap = cv2.VideoCapture(self.video_source)
        try:
            success, image = vidcap.read()
            timestamp = dt.datetime.utcnow().isoformat()
        finally:
            # Always give the device/file handle back, even if read() raised.
            vidcap.release()
        return success, image, timestamp

    def send_to_kafka(self, data):
        """Send JSON payload to topic in Kafka.

        ``data`` must be JSON-serializable; it is handed to the producer,
        which was configured with a JSON value serializer.
        """
        self.producer.send(self.topic, data)

    def save_image(self, image):
        """Save frame as JPEG file, for debugging purpose only.

        The frame is written to ``self.img_file``, i.e. the same path is
        overwritten on every call.
        """
        cv2.imwrite(self.img_file, image)

### Initialize and start producer

In [None]:
# Start the producer; the call blocks while frames are being streamed.
# `source` is the video device index. To stream a video file instead,
# pass its path, e.g. source='../producer/demo.mp4'.
pycture_producer(
    camera_id='holger',
    source=0,
    interval=2,
    topic='pycturestream',
    server='127.0.0.1:9092',
)

Initialized camera "holger" with source 0.
Send to "pycturestream" on "127.0.0.1:9092" every 2 sec.
Start capturing frames every 2 sec.
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
Could not read image from source 0!
