Skip to content

A simple unified PubSub messaging module that supports Kafka and Google PubSub as backend.

Notifications You must be signed in to change notification settings

AliAbdelaal/pubsub-lib

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Simple Unified PubSub module

A unified PubSub messaging module that supports Kafka and Google PubSub as backend.

Installation

Python environment

You need to clone the repo and create a python3.7 virtualenv as follows.

$git clone https://github.com/AliAbdelaal/pubsub-lib.git
$cd pubsub-lib/
$python -m venv venv
$source venv/bin/activate
$pip install .

Setup configurations

Create a configs.json file for the corresponding backend that you want to use, and set an environment variable with the path of the file as follows.

$export PUBSUB_CONFIG_PATH='path/to/configs.json'

Kafka installation

kafka

To use Kafka as a backend service, you will need to install Kafka, you can follow these blogs to install it.

Then you will need to setup Kafka configurations file as follows.

  • Kafka config file:

    {
        "kafka_servers": [
                "localhost:9092"
            ]
    }

Google PubSub Installation

gcp-pubsub

To use Google PubSub you will need to setup a GCP account, create a new PubSub topic and a subscription with a publishing and subscribing permissions.

You can follow this video from the creators of Google PubSub to create a topic and a subscription.

You will need to setup the gcp configurations as follows.

  • Google PubSub config file:

    {
        "type": "YOUR_VALUE_GOES_HERE",
        "project_id": "YOUR_VALUE_GOES_HERE",
        "private_key_id": "YOUR_VALUE_GOES_HERE",
        "private_key": "YOUR_VALUE_GOES_HERE",
        "client_email": "YOUR_VALUE_GOES_HERE",
        "client_id": "YOUR_VALUE_GOES_HERE",
        "auth_uri": "YOUR_VALUE_GOES_HERE",
        "token_uri": "YOUR_VALUE_GOES_HERE",
        "auth_provider_x509_cert_url": "YOUR_VALUE_GOES_HERE",
        "client_x509_cert_url": "YOUR_VALUE_GOES_HERE"
    }

Then you will need to export a system environment variables, GOOGLE_CLOUD_PROJECT which will include the project-ID.

Also working with GCP as backend you will need to provide an additional argument gcp_subscription_id to the PubSubFactory.create_consumer() which is the subscription-id.

export GOOGLE_CLOUD_PROJECT='PROJECT_ID'

Usage

The library supports sending and receiving messages in bytes format with an optional string key, here is an example for a producer that sends images to a topic and a consumer that saves these images.

Producer

You will need to install Pillow in order to try this example using pip install Pillow

# import necessary libs
import io
from PIL import Image
from PubSub import PubSubFactory

# initiate a pubsub object with the backend that you would like to use ['kafka', 'gcp']
pubsub = PubSubFactory('gcp')
# if you have a topic named: images-topic
producer = pubsub.create_producer("images-topic")

try:
    while True:
        # keep on accepting key, value pairs of keys and it's corresponding image path
        key, value = input("enter key, image path:").strip().split(",")
        # read the image and encode it in bytes
        image = Image.open(value)
        with io.BytesIO() as output:
            image.save(output, image.format)
            # send the image via the publisher
            producer.push_msg(output.getvalue(), key=key)
except KeyboardInterrupt:
    print("Okay man, shutting down !")

Consumer

# import libraries
import io
import os
from datetime import datetime
from PIL import Image
from PubSub import PubSubFactory

# Create callback function that accepts key and value
def callback(key:str, value:bytes):
    # create a download file to save the images in
    if not os.path.exists('downloads'):
        os.mkdir("downloads")
    image = Image.open(io.BytesIO(value))
    file_name = f'{datetime.now().isoformat().replace(":", "-").split(".")[0]}.{image.format}'
    # Save the image
    image.save(os.path.join("downloads", file_name))
    print(f"I got an Image !\n\tkey: {key}\n\tsaved to: {file_name}")

# initiate a pubsub object with the backend that you would like to use ['kafka', 'gcp']
pubsub = PubSubFactory('gcp')
# if you have a topic named: images-topic
pubsub.create_consumer("images-topic", callback)
# the consumer will run in a separate thread in the background.
print("A new consumer is running.")

About

A simple unified PubSub messaging module that supports Kafka and Google PubSub as backend.

Topics

Resources

Stars

Watchers

Forks

Languages