diff --git a/README.md b/README.md index 6a74639..b8b8283 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -![stiqueue](stiqueue.png) + +![stiqueue](https://github.com/ahmad88me/stiqueue/raw/main/stiqueue.png) [![Build Status](https://ahmad88me.semaphoreci.com/badges/stiqueue/branches/main.svg)](https://ahmad88me.semaphoreci.com/projects/stiqueue) [![codecov](https://codecov.io/gh/ahmad88me/stiqueue/branch/main/graph/badge.svg?token=mfqJCVLNXc)](https://codecov.io/gh/ahmad88me/stiqueue) @@ -7,16 +8,40 @@ # stiqueue Stands for stick queue which is a simple messaging queue +## Usage -# Run coverage +### Methods +The followings are a set of methods supported by stiqueue +* **enq**: to add to the queue (enqueue). +* **deq**: to get a value from the queue (dequeue). +* **cnt**: number of items in the queue. + + +### Server +You can run the server `sqserver.py` as is. +``` +python -m stiqueue.sqserver 0.0.0.0 1234 +``` +You can also change the port to any of your choice. +The default one used in Docker is `27017`. You can also +extend the server and add additional methods to meet your needs. + +### Client +Most probably you want to extend the class `SQClient`, located in `stiqueue/sqclient.py`. +You can see an example of this in `example.client.py`. + + +# Development +## Run coverage ```sh run_coverage.sh``` -# Run tests +## Run tests ```sh run_tests.sh``` -# Run Docker +## Run Docker Example of running the server from Docker -```docker container run --interactive -p "127.0.0.1:1234:1234" --tty --rm --name stiqueue ahmad88me/stiqueue``` +```docker container run --interactive -p "1234:1234" --tty --rm --name stiqueue ahmad88me/stiqueue``` + # Update Docker For example, to update docker image with version `v1.0` diff --git a/run_tests.sh b/run_tests.sh index 98f44ef..abb917d 100644 --- a/run_tests.sh +++ b/run_tests.sh @@ -1 +1 @@ -python -W ignore -m unittest tests \ No newline at end of file +python -W ignore -m unittest tests diff --git a/stiqueue/sqclient.py b/stiqueue/sqclient.py index ea042ee..8e9bfda 100644 --- a/stiqueue/sqclient.py +++ b/stiqueue/sqclient.py @@ -1,27 +1,37 @@ import socket import sys import time +import logging class SQClient: - def __init__(self, host="127.0.0.1", port=1234, max_len=10240): + def __init__(self, host="127.0.0.1", port=1234, max_len=10240, logger=None): self.host = host self.port = port self.max_len = max_len if host is None: self.host = socket.gethostname() self.socket = None + if logger is None: + logger = logging.getLogger(__name__) + # logger.setLevel(logging.CRITICAL) + # create console handler and set level to debug + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + logger.addHandler(ch) + self.logger = logger def connect(self): + self.logger.debug("Connecting to %s %d" % (self.host, self.port)) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.host, self.port)) def send_with_action(self, msg, action, recv=False): ret_val = None req = action+msg - # print("send with action: ") - # print(req) + self.logger.debug("send with action: ") + self.logger.debug(req) self.connect() self.socket.send(req) if recv: diff --git a/stiqueue/sqserver.py b/stiqueue/sqserver.py index 2f76529..978e3a2 100644 --- a/stiqueue/sqserver.py +++ b/stiqueue/sqserver.py @@ -2,21 +2,29 @@ import sys import os from multiprocessing import Lock - +import logging class SQServer: - def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_len=3, debug=False): + def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_len=3, debug=False, logger=None): self.lock = Lock() self.q = [] self.action_len = action_len self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.debug = debug + if not logger: + logger = logging.getLogger(__name__) + # logger.setLevel(logging.CRITICAL) + # create console handler and set level to debug + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + logger.addHandler(ch) + self.logger = logger if host is None: self.host = socket.gethostname() if self.debug: - print("SERVER> self host: ") - print(self.host) + self.logger.debug("SERVER> self host: ") + self.logger.debug(self.host) else: self.host = host @@ -25,22 +33,22 @@ def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_l self.socket.bind((self.host, self.port)) self.max_len = max_len if self.debug: - print("SERVER> binded to %s %d" % (self.host, self.port)) + self.logger.debug("SERVER> binded to %s %d" % (self.host, self.port)) def enq(self, msg): - print("SERVER> enqueue: ") - print(msg) + if self.debug: + self.logger.debug("SERVER> enqueue: ") + self.logger.debug(msg) self.lock.acquire() self.q.append(msg) self.lock.release() def deq(self, conn): - print("dequeue: ") l = len(self.q) if l > 0: self.lock.acquire() v = self.q.pop(0) - print("SERVER> dequeue: %s" % str(v)) + self.logger.debug("SERVER> dequeue: %s" % str(v)) conn.sendall(v) self.lock.release() @@ -57,25 +65,17 @@ def other_actions(self, action_msg): :param action_msg: :return: """ - print("other_actions> "+str(action_msg)) + if self.debug: + self.logger.debug("other_actions> "+str(action_msg)) def listen_single(self): self.socket.listen(self.wconn) if self.debug: - print("SERVER> Waiting for client...") + self.logger.debug("SERVER> Waiting for client...") conn, addr = self.socket.accept() # Accept connection when client connects if self.debug: - print("SERVER> Connected by %s" % str(addr)) + self.logger.debug("SERVER> Connected by %s" % str(addr)) action_msg = conn.recv(self.max_len) # Receive client data - # print("DEBUG: action msg: ") - # while True: - # data = conn.recv(self.max_len) # Receive client data - # if not data: - # break # exit from loop if no data - # action_msg += data - # # conn.sendall(data) # Send the received data back to client - # print("DEBUG: action msg: ") - # print(action_msg) action = action_msg[:self.action_len] if len(action_msg) >= self.action_len: msg = action_msg[self.action_len:] @@ -87,15 +87,17 @@ def listen_single(self): self.cnt(conn) else: if self.debug: - print("SERVER> other action: ") - print(action) + self.logger.debug("SERVER> other action: ") + self.logger.debug(action) self.other_actions(action_msg) else: - print("SERVER> Error: short action length: ") - print(action_msg) - print(action) - print(len(action_msg)) - print(self.action_len) + self.logger.error("SERVER> Error: short action length") + self.logger.debug(action) + if self.debug: + self.logger.debug(action_msg) + self.logger.debug(action) + self.logger.debug(len(action_msg)) + self.logger.debug(self.action_len) conn.close()