Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmad88me committed Mar 15, 2022
2 parents cf3d4a2 + 69e3790 commit 837914e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 37 deletions.
35 changes: 30 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
![stiqueue](stiqueue.png)
<!--![stiqueue](stiqueue.png)-->
![stiqueue](https://github.com/ahmad88me/stiqueue/raw/main/stiqueue.png)

[![Build Status](https://ahmad88me.semaphoreci.com/badges/stiqueue/branches/main.svg)](https://ahmad88me.semaphoreci.com/projects/stiqueue)
[![codecov](https://codecov.io/gh/ahmad88me/stiqueue/branch/main/graph/badge.svg?token=mfqJCVLNXc)](https://codecov.io/gh/ahmad88me/stiqueue)
Expand All @@ -7,16 +8,40 @@
# stiqueue
Stands for stick queue which is a simple messaging queue

## Usage

# Run coverage
### Methods
The followings are a set of methods supported by stiqueue
* **enq**: to add to the queue (enqueue).
* **deq**: to get a value from the queue (dequeue).
* **cnt**: number of items in the queue.


### Server
You can run the server `sqserver.py` as is.
```
python -m stiqueue.sqserver 0.0.0.0 1234
```
You can also change the port to any of your choice.
The default one used in Docker is `27017`. You can also
extend the server and add additional methods to meet your needs.

### Client
Most probably you want to extend the class `SQClient`, located in `stiqueue/sqclient.py`.
You can see an example of this in `example.client.py`.


# Development
## Run coverage
```sh run_coverage.sh```

# Run tests
## Run tests
```sh run_tests.sh```

# Run Docker
## Run Docker
Example of running the server from Docker
```docker container run --interactive -p "127.0.0.1:1234:1234" --tty --rm --name stiqueue ahmad88me/stiqueue```
```docker container run --interactive -p "1234:1234" --tty --rm --name stiqueue ahmad88me/stiqueue```


# Update Docker
For example, to update docker image with version `v1.0`
Expand Down
2 changes: 1 addition & 1 deletion run_tests.sh
Original file line number Diff line number Diff line change
@@ -1 +1 @@
python -W ignore -m unittest tests
python -W ignore -m unittest tests
16 changes: 13 additions & 3 deletions stiqueue/sqclient.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,37 @@
import socket
import sys
import time
import logging


class SQClient:

def __init__(self, host="127.0.0.1", port=1234, max_len=10240):
def __init__(self, host="127.0.0.1", port=1234, max_len=10240, logger=None):
self.host = host
self.port = port
self.max_len = max_len
if host is None:
self.host = socket.gethostname()
self.socket = None
if logger is None:
logger = logging.getLogger(__name__)
# logger.setLevel(logging.CRITICAL)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
logger.addHandler(ch)
self.logger = logger

def connect(self):
self.logger.debug("Connecting to %s %d" % (self.host, self.port))
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))

def send_with_action(self, msg, action, recv=False):
ret_val = None
req = action+msg
# print("send with action: ")
# print(req)
self.logger.debug("send with action: ")
self.logger.debug(req)
self.connect()
self.socket.send(req)
if recv:
Expand Down
58 changes: 30 additions & 28 deletions stiqueue/sqserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,29 @@
import sys
import os
from multiprocessing import Lock

import logging

class SQServer:

def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_len=3, debug=False):
def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_len=3, debug=False, logger=None):
self.lock = Lock()
self.q = []
self.action_len = action_len
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.debug = debug
if not logger:
logger = logging.getLogger(__name__)
# logger.setLevel(logging.CRITICAL)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
logger.addHandler(ch)
self.logger = logger
if host is None:
self.host = socket.gethostname()
if self.debug:
print("SERVER> self host: ")
print(self.host)
self.logger.debug("SERVER> self host: ")
self.logger.debug(self.host)
else:
self.host = host

Expand All @@ -25,22 +33,22 @@ def __init__(self, host="127.0.0.1", port=1234, wconn=5, max_len=10240, action_l
self.socket.bind((self.host, self.port))
self.max_len = max_len
if self.debug:
print("SERVER> binded to %s %d" % (self.host, self.port))
self.logger.debug("SERVER> binded to %s %d" % (self.host, self.port))

def enq(self, msg):
print("SERVER> enqueue: ")
print(msg)
if self.debug:
self.logger.debug("SERVER> enqueue: ")
self.logger.debug(msg)
self.lock.acquire()
self.q.append(msg)
self.lock.release()

def deq(self, conn):
print("dequeue: ")
l = len(self.q)
if l > 0:
self.lock.acquire()
v = self.q.pop(0)
print("SERVER> dequeue: %s" % str(v))
self.logger.debug("SERVER> dequeue: %s" % str(v))
conn.sendall(v)
self.lock.release()

Expand All @@ -57,25 +65,17 @@ def other_actions(self, action_msg):
:param action_msg:
:return:
"""
print("other_actions> "+str(action_msg))
if self.debug:
self.logger.debug("other_actions> "+str(action_msg))

def listen_single(self):
self.socket.listen(self.wconn)
if self.debug:
print("SERVER> Waiting for client...")
self.logger.debug("SERVER> Waiting for client...")
conn, addr = self.socket.accept() # Accept connection when client connects
if self.debug:
print("SERVER> Connected by %s" % str(addr))
self.logger.debug("SERVER> Connected by %s" % str(addr))
action_msg = conn.recv(self.max_len) # Receive client data
# print("DEBUG: action msg: ")
# while True:
# data = conn.recv(self.max_len) # Receive client data
# if not data:
# break # exit from loop if no data
# action_msg += data
# # conn.sendall(data) # Send the received data back to client
# print("DEBUG: action msg: ")
# print(action_msg)
action = action_msg[:self.action_len]
if len(action_msg) >= self.action_len:
msg = action_msg[self.action_len:]
Expand All @@ -87,15 +87,17 @@ def listen_single(self):
self.cnt(conn)
else:
if self.debug:
print("SERVER> other action: ")
print(action)
self.logger.debug("SERVER> other action: ")
self.logger.debug(action)
self.other_actions(action_msg)
else:
print("SERVER> Error: short action length: ")
print(action_msg)
print(action)
print(len(action_msg))
print(self.action_len)
self.logger.error("SERVER> Error: short action length")
self.logger.debug(action)
if self.debug:
self.logger.debug(action_msg)
self.logger.debug(action)
self.logger.debug(len(action_msg))
self.logger.debug(self.action_len)
conn.close()


Expand Down

0 comments on commit 837914e

Please sign in to comment.