Skip to content

Commit

Permalink
Merge f810359 into b9b905f
Browse files Browse the repository at this point in the history
  • Loading branch information
crazy-canux committed Dec 20, 2018
2 parents b9b905f + f810359 commit b4129d4
Show file tree
Hide file tree
Showing 18 changed files with 289 additions and 56 deletions.
11 changes: 0 additions & 11 deletions docs/amqp.rst

This file was deleted.

20 changes: 0 additions & 20 deletions docs/ansible.rst

This file was deleted.

5 changes: 0 additions & 5 deletions docs/docker.rst

This file was deleted.

11 changes: 4 additions & 7 deletions docs/index.rst
Expand Up @@ -19,21 +19,18 @@ User Guide

robotframework
selenium
nagios
ansible
jenkins
docker
database
yaml
toml
office
ssh
snmp
ssh
ldap
http
ftp
email
amqp
rpc
rabbitmq
redis
winrm
wmi
misc
Expand Down
4 changes: 0 additions & 4 deletions docs/jenkins.rst

This file was deleted.

101 changes: 101 additions & 0 deletions docs/rabbitmq.rst
@@ -0,0 +1,101 @@
.. _amqp:

pika
====

`<https://github.com/pika/pika>`_

install
-------

install from pypi::

$ pip install pika

usage
-----

import::

import pika

Class BlockingConnection::

connection = pika.BlockingConnection(
pika.ConnectionParameters(
self.host,
self.port,
self.virtual_host,
self.credentials,
**self.kwargs
)
)
connection = pika.BlockingConnection(
pika.URLParameters(
"amqp://username:password@host:port/<virtual_host>[?query-string]"
# eg: "amqp://guest:guest@localhost:5672/%2F"
)
)

# methods
channel = connection.channel() # return BlockingChannel object.
connection.close()

# data
is_closed
is_closing
is_open

Class BasicProperties::

BasicProperties(content_type=None,content_encoding=None, headers=None,
delivery_mode=None, priority=None, correlation_id=None, reply_to=None,
expiration=None, message_id=None, timestamp=None, type=None,
user_id=None, app_id=None, cluster_id=None)

content_type:
"application/json"

delivery_mode:
2: data persistent

priority:

Class BlockingChannel::

BlockingChannel(channel_impk, connection)

# methods
basic_publish(exchange, routing_key, body, properties=None,mandatory=False, immediate=False) # producer
> properties = pika.BasicProperties(...)

basic_consume(consumer_callback, queue, no_ack=False, exclusive=False,consumer_tag=None, arguments=None) # consumer.
> consumer_callback = function_name(channel, method, properties, body)

basic_qos(callback=None, prefetch_size=0, prefetch_count=0,all_channels=False)
basic_ack(delivery_tag=0, multiple=False) # acknowledge messages.
basic_cancel(consumer_tag) # cancels consumer.
basic_get(queue=None, no_ack=False)
basic_nack(...)
basic_recover(...)
basic_reject(...)
cancel()
consume(...)
close(reply_code=0, reply_text="Normal shutdown")
confirm_delivery(callback=None, nowait=False)
exchange_declare(exchange=None, exchange_type='direct', passive=False,durable=False, auto_delete=False, internal=False, arguments=None)
exchange_bind(destination=None, source=None, routing_key='',arguments=None)
exchange_unbind(destination=None, source=None, routing_key='',arguments=None)
exchange_delete(exchange=None, if_unused=False)
publish()
queue_declare(queue='', passive=False, durable=False, exclusive=False,auto_delete=False)
queue_bind(queue, exchange, routing_key=None, arguments=None)
queue_unbind(queue='', exchange=None, routing_key='', arguments=None)
queue_delete(queue='', if_unused=False, if_empty=False)
queue_purge(queue='')
start_consuming() # consumer start consumer message.
stop_consuming(consumer_tag=None) # consumer stop consume message.

# data
channel_number

6 changes: 6 additions & 0 deletions docs/redis.rst
@@ -0,0 +1,6 @@
.. _redis:

redis
=====


5 changes: 0 additions & 5 deletions docs/rpc.rst

This file was deleted.

1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -10,6 +10,7 @@ idna==2.7
ipaddress==1.0.22
ntlm-auth==1.2.0
paramiko==2.4.2
pika==0.12.0
pyasn1==0.4.4
pycparser==2.19
pymssql==2.1.4
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion super_devops/grafana/grafana_wrapper.py
Expand Up @@ -10,7 +10,7 @@

class BaseGrafana(object):
def __init__(
self, grafana_url="http://localhost:3000",
self, grafana_url="http://localhost:3000/",
username=None, password=None, domain=None
):
self.grafana_url = grafana_url
Expand Down
File renamed without changes.
128 changes: 128 additions & 0 deletions super_devops/rabbitmq/pika_wrapper.py
@@ -0,0 +1,128 @@
import pika
import logging

logger = logging.getLogger(__file__)


class BaseRabbitmq(object):
def __init__(
self, host="localhost", port=5672, virtual_host='/',
username='guest', password='guest', channel_max=65535,
frame_max=131072, **kwargs
):
self.host = host
self.port = port
self.virtual_host = virtual_host
self.credentials = pika.credentials.PlainCredentials(
username, password)

self.channel_max = channel_max
self.frame_max = frame_max

# self.heartbeat = None
# self.ssl = False
# self.ssl_options = None
# self.connection_attempts = 1
# self.retry_delay = 2.0
# self.socket_timeout = 10.0
# self.locale = 'en_US'
# self.backpressure_detection = False
# self.blocked_connection_timeout = None
# self.client_properties = None
# self.tcp_options = None
self.kwargs = kwargs

self.connection = None
self.channel = None

def __enter__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
self.host,
self.port,
self.virtual_host,
self.credentials,
**self.kwargs
)
)
self.channel = self.connection.channel()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.close()

def purge_queue(self, name):
try:
self.channel.queue_purge(queue=name)
except Exception as e:
logger.error(
"Purge queue {} failed: {}.".format(name, e.message)
)
raise

def create_exchange(
self, exchange, exchange_type='direct', passive=False,
durable=False, auto_delete=False, internal=False, arguments=None
):
try:
self.channel.exchange_declare(
exchange=exchange, exchange_type=exchange_type,
passive=passive, durable=durable, auto_delete=auto_delete,
internal=internal, arguments=arguments
)
except Exception:
raise

def create_queue(
self, queue, passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None
):
try:
self.channel.queue_declare(
queue=queue, passive=passive, durable=durable,
exclusive=exclusive, auto_delete=auto_delete,
arguments=arguments
)
except Exception:
raise

def bind(self, queue, exchange='', routing_key=None, arguments=None):
try:
self.channel.queue_bind(
queue=queue, exchange=exchange, routing_key=routing_key,
arguments=arguments
)
self.channel.confirm_delivery()
except Exception:
raise

def remove_exchange(self, exchange):
try:
self.channel.exchange_delete(
exchange=exchange, if_unused=False)
except Exception:
raise

def remove_queue(self, queue):
try:
self.channel.queue_delete(
queue=queue, if_unused=False, if_empty=False
)
except Exception:
raise


if __name__ == "__main__":
with BaseRabbitmq(
host="10.103.239.61", username="sandbox", password="password"
) as mq:
mq.create_exchange(exchange="test", durable=True)
mq.create_queue(
queue="canux", durable=True, arguments={"x-max-priority": 255}
)
mq.bind("canux", "test")
mq.remove_exchange("test")
mq.remove_queue("canux")



44 changes: 44 additions & 0 deletions super_devops/rabbitmq/rabbitmq_wrapper.py
@@ -0,0 +1,44 @@
import logging
import urlparse

from super_devops.http.requests_wrapper import BaseRequests

logger = logging.getLogger(__name__)


class BaseRabbitmq(object):
def __init__(
self, url="http://localhost:15672/",
username=None, password=None, domain=None
):
self.base_url = url
self.username = username
self.password = password
self.domain = domain

def purge_queue(self, name, vhost="%2F"):
try:
url = urlparse.urljoin(
self.base_url,
"/api/queues/{}/{}/contents".format(vhost, name)
)
with BaseRequests(
self.username, self.password, self.domain
) as req:
res = req.delete(url)
logger.debug("purge queue res: {}".format(res.content))

if res.status_code == 204:
logger.debug("purge queue {} succeed.".format(name))
return True
else:
logger.error("purge queue {} failed.".format(name))
return False
except Exception:
raise


if __name__ == "__main__":
rabbitmq = BaseRabbitmq(
"http://10.103.239.61:15672", "sandbox", "password")
rabbitmq.purge_queue(name="vmray_cloud")
Empty file added super_devops/redis/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion super_devops/tick_stack/influxdb_wrapper.py
Expand Up @@ -9,7 +9,7 @@

class BaseInfluxdb(object):
def __init__(
self, influxdb_url="http://localhost:8086",
self, influxdb_url="http://localhost:8086/",
username=None, password=None, domain=None
):
self.influxdb_url = influxdb_url
Expand Down
2 changes: 1 addition & 1 deletion super_devops/tick_stack/kapacitor_wrapper.py
Expand Up @@ -9,7 +9,7 @@

class BaseKapacitor(object):
def __init__(
self, kapacitor_url="http://localhost:9092",
self, kapacitor_url="http://localhost:9092/",
username=None, password=None, domain=None
):
self.kapacitor_url = kapacitor_url
Expand Down

0 comments on commit b4129d4

Please sign in to comment.