diff --git a/docs/amqp.rst b/docs/amqp.rst deleted file mode 100644 index f6a4a40..0000000 --- a/docs/amqp.rst +++ /dev/null @@ -1,11 +0,0 @@ -.. _amqp: - -pika -==== - -``_ - -celery -====== - -``_ diff --git a/docs/ansible.rst b/docs/ansible.rst deleted file mode 100644 index 9e3d44e..0000000 --- a/docs/ansible.rst +++ /dev/null @@ -1,20 +0,0 @@ -.. _ansible: - -ansible -======= - - -install -------- - -install from pypi:: - - $ pip install ansible - -usage ------ - -import:: - - import ansible - diff --git a/docs/docker.rst b/docs/docker.rst deleted file mode 100644 index 328f14f..0000000 --- a/docs/docker.rst +++ /dev/null @@ -1,5 +0,0 @@ -.. _docker: - -docker -====== - diff --git a/docs/index.rst b/docs/index.rst index 427e3ca..d16f9ec 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,21 +19,18 @@ User Guide robotframework selenium - nagios - ansible - jenkins - docker database yaml + toml office - ssh snmp + ssh ldap http ftp email - amqp - rpc + rabbitmq + redis winrm wmi misc diff --git a/docs/jenkins.rst b/docs/jenkins.rst deleted file mode 100644 index 135a175..0000000 --- a/docs/jenkins.rst +++ /dev/null @@ -1,4 +0,0 @@ -.. _jenkins: - -jenkins -======= \ No newline at end of file diff --git a/docs/rabbitmq.rst b/docs/rabbitmq.rst new file mode 100644 index 0000000..a2a32e2 --- /dev/null +++ b/docs/rabbitmq.rst @@ -0,0 +1,101 @@ +.. _amqp: + +pika +==== + +``_ + +install +------- + +install from pypi:: + + $ pip install pika + +usage +----- + +import:: + + import pika + +Class BlockingConnection:: + + connection = pika.BlockingConnection( + pika.ConnectionParameters( + self.host, + self.port, + self.virtual_host, + self.credentials, + **self.kwargs + ) + ) + connection = pika.BlockingConnection( + pika.URLParameters( + "amqp://username:password@host:port/[?query-string]" + # eg: "amqp://guest:guest@localhost:5672/%2F" + ) + ) + + # methods + channel = connection.channel() # return BlockingChannel object. + connection.close() + + # data + is_closed + is_closing + is_open + +Class BasicProperties:: + + BasicProperties(content_type=None,content_encoding=None, headers=None, + delivery_mode=None, priority=None, correlation_id=None, reply_to=None, + expiration=None, message_id=None, timestamp=None, type=None, + user_id=None, app_id=None, cluster_id=None) + + content_type: + "application/json" + + delivery_mode: + 2: data persistent + + priority: + +Class BlockingChannel:: + + BlockingChannel(channel_impk, connection) + + # methods + basic_publish(exchange, routing_key, body, properties=None,mandatory=False, immediate=False) # producer + > properties = pika.BasicProperties(...) + + basic_consume(consumer_callback, queue, no_ack=False, exclusive=False,consumer_tag=None, arguments=None) # consumer. + > consumer_callback = function_name(channel, method, properties, body) + + basic_qos(callback=None, prefetch_size=0, prefetch_count=0,all_channels=False) + basic_ack(delivery_tag=0, multiple=False) # acknowledge messages. + basic_cancel(consumer_tag) # cancels consumer. + basic_get(queue=None, no_ack=False) + basic_nack(...) + basic_recover(...) + basic_reject(...) + cancel() + consume(...) + close(reply_code=0, reply_text="Normal shutdown") + confirm_delivery(callback=None, nowait=False) + exchange_declare(exchange=None, exchange_type='direct', passive=False,durable=False, auto_delete=False, internal=False, arguments=None) + exchange_bind(destination=None, source=None, routing_key='',arguments=None) + exchange_unbind(destination=None, source=None, routing_key='',arguments=None) + exchange_delete(exchange=None, if_unused=False) + publish() + queue_declare(queue='', passive=False, durable=False, exclusive=False,auto_delete=False) + queue_bind(queue, exchange, routing_key=None, arguments=None) + queue_unbind(queue='', exchange=None, routing_key='', arguments=None) + queue_delete(queue='', if_unused=False, if_empty=False) + queue_purge(queue='') + start_consuming() # consumer start consumer message. + stop_consuming(consumer_tag=None) # consumer stop consume message. + + # data + channel_number + diff --git a/docs/redis.rst b/docs/redis.rst new file mode 100644 index 0000000..43d37b5 --- /dev/null +++ b/docs/redis.rst @@ -0,0 +1,6 @@ +.. _redis: + +redis +===== + + diff --git a/docs/rpc.rst b/docs/rpc.rst deleted file mode 100644 index c558e50..0000000 --- a/docs/rpc.rst +++ /dev/null @@ -1,5 +0,0 @@ -.. _rpc: - -grpc -==== - diff --git a/requirements.txt b/requirements.txt index 052db37..86c64fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ idna==2.7 ipaddress==1.0.22 ntlm-auth==1.2.0 paramiko==2.4.2 +pika==0.12.0 pyasn1==0.4.4 pycparser==2.19 pymssql==2.1.4 diff --git a/super_devops/amqp/__init__.py b/super_devops/gitlab/__init__.py similarity index 100% rename from super_devops/amqp/__init__.py rename to super_devops/gitlab/__init__.py diff --git a/super_devops/grafana/grafana_wrapper.py b/super_devops/grafana/grafana_wrapper.py index 7bdfab5..e3dc968 100644 --- a/super_devops/grafana/grafana_wrapper.py +++ b/super_devops/grafana/grafana_wrapper.py @@ -10,7 +10,7 @@ class BaseGrafana(object): def __init__( - self, grafana_url="http://localhost:3000", + self, grafana_url="http://localhost:3000/", username=None, password=None, domain=None ): self.grafana_url = grafana_url diff --git a/super_devops/docker/__init__.py b/super_devops/rabbitmq/__init__.py similarity index 100% rename from super_devops/docker/__init__.py rename to super_devops/rabbitmq/__init__.py diff --git a/super_devops/rabbitmq/pika_wrapper.py b/super_devops/rabbitmq/pika_wrapper.py new file mode 100644 index 0000000..afd1c6a --- /dev/null +++ b/super_devops/rabbitmq/pika_wrapper.py @@ -0,0 +1,128 @@ +import pika +import logging + +logger = logging.getLogger(__file__) + + +class BaseRabbitmq(object): + def __init__( + self, host="localhost", port=5672, virtual_host='/', + username='guest', password='guest', channel_max=65535, + frame_max=131072, **kwargs + ): + self.host = host + self.port = port + self.virtual_host = virtual_host + self.credentials = pika.credentials.PlainCredentials( + username, password) + + self.channel_max = channel_max + self.frame_max = frame_max + + # self.heartbeat = None + # self.ssl = False + # self.ssl_options = None + # self.connection_attempts = 1 + # self.retry_delay = 2.0 + # self.socket_timeout = 10.0 + # self.locale = 'en_US' + # self.backpressure_detection = False + # self.blocked_connection_timeout = None + # self.client_properties = None + # self.tcp_options = None + self.kwargs = kwargs + + self.connection = None + self.channel = None + + def __enter__(self): + self.connection = pika.BlockingConnection( + pika.ConnectionParameters( + self.host, + self.port, + self.virtual_host, + self.credentials, + **self.kwargs + ) + ) + self.channel = self.connection.channel() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.connection.close() + + def purge_queue(self, name): + try: + self.channel.queue_purge(queue=name) + except Exception as e: + logger.error( + "Purge queue {} failed: {}.".format(name, e.message) + ) + raise + + def create_exchange( + self, exchange, exchange_type='direct', passive=False, + durable=False, auto_delete=False, internal=False, arguments=None + ): + try: + self.channel.exchange_declare( + exchange=exchange, exchange_type=exchange_type, + passive=passive, durable=durable, auto_delete=auto_delete, + internal=internal, arguments=arguments + ) + except Exception: + raise + + def create_queue( + self, queue, passive=False, durable=False, + exclusive=False, auto_delete=False, arguments=None + ): + try: + self.channel.queue_declare( + queue=queue, passive=passive, durable=durable, + exclusive=exclusive, auto_delete=auto_delete, + arguments=arguments + ) + except Exception: + raise + + def bind(self, queue, exchange='', routing_key=None, arguments=None): + try: + self.channel.queue_bind( + queue=queue, exchange=exchange, routing_key=routing_key, + arguments=arguments + ) + self.channel.confirm_delivery() + except Exception: + raise + + def remove_exchange(self, exchange): + try: + self.channel.exchange_delete( + exchange=exchange, if_unused=False) + except Exception: + raise + + def remove_queue(self, queue): + try: + self.channel.queue_delete( + queue=queue, if_unused=False, if_empty=False + ) + except Exception: + raise + + +if __name__ == "__main__": + with BaseRabbitmq( + host="10.103.239.61", username="sandbox", password="password" + ) as mq: + mq.create_exchange(exchange="test", durable=True) + mq.create_queue( + queue="canux", durable=True, arguments={"x-max-priority": 255} + ) + mq.bind("canux", "test") + mq.remove_exchange("test") + mq.remove_queue("canux") + + + diff --git a/super_devops/rabbitmq/rabbitmq_wrapper.py b/super_devops/rabbitmq/rabbitmq_wrapper.py new file mode 100644 index 0000000..b3ca2c8 --- /dev/null +++ b/super_devops/rabbitmq/rabbitmq_wrapper.py @@ -0,0 +1,44 @@ +import logging +import urlparse + +from super_devops.http.requests_wrapper import BaseRequests + +logger = logging.getLogger(__name__) + + +class BaseRabbitmq(object): + def __init__( + self, url="http://localhost:15672/", + username=None, password=None, domain=None + ): + self.base_url = url + self.username = username + self.password = password + self.domain = domain + + def purge_queue(self, name, vhost="%2F"): + try: + url = urlparse.urljoin( + self.base_url, + "/api/queues/{}/{}/contents".format(vhost, name) + ) + with BaseRequests( + self.username, self.password, self.domain + ) as req: + res = req.delete(url) + logger.debug("purge queue res: {}".format(res.content)) + + if res.status_code == 204: + logger.debug("purge queue {} succeed.".format(name)) + return True + else: + logger.error("purge queue {} failed.".format(name)) + return False + except Exception: + raise + + +if __name__ == "__main__": + rabbitmq = BaseRabbitmq( + "http://10.103.239.61:15672", "sandbox", "password") + rabbitmq.purge_queue(name="vmray_cloud") diff --git a/super_devops/redis/__init__.py b/super_devops/redis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/super_devops/tick_stack/influxdb_wrapper.py b/super_devops/tick_stack/influxdb_wrapper.py index 74ea41e..33079d9 100644 --- a/super_devops/tick_stack/influxdb_wrapper.py +++ b/super_devops/tick_stack/influxdb_wrapper.py @@ -9,7 +9,7 @@ class BaseInfluxdb(object): def __init__( - self, influxdb_url="http://localhost:8086", + self, influxdb_url="http://localhost:8086/", username=None, password=None, domain=None ): self.influxdb_url = influxdb_url diff --git a/super_devops/tick_stack/kapacitor_wrapper.py b/super_devops/tick_stack/kapacitor_wrapper.py index a2e4206..da866a5 100644 --- a/super_devops/tick_stack/kapacitor_wrapper.py +++ b/super_devops/tick_stack/kapacitor_wrapper.py @@ -9,7 +9,7 @@ class BaseKapacitor(object): def __init__( - self, kapacitor_url="http://localhost:9092", + self, kapacitor_url="http://localhost:9092/", username=None, password=None, domain=None ): self.kapacitor_url = kapacitor_url diff --git a/tests/test_smtp_wrapper.py b/tests/test_smtp_wrapper.py index 0f3b82d..168bfe5 100644 --- a/tests/test_smtp_wrapper.py +++ b/tests/test_smtp_wrapper.py @@ -30,11 +30,12 @@ def test_plain(self): with BaseEmail(host='mail.domain.com', port=25) as email: email.sendmail( frm="super-devops@domain.com", - to=['wcheng@domain.com'], + to=['canuxcheng@gmail.com', 'canuxcheng@163.com'], subject='test subject', text='text body' ) + @unittest.skip('ignore') def test_html(self): with BaseEmail(host='mail.domain.com', port=25) as email: email.sendmail(