Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanyang0528 committed Aug 28, 2020
2 parents 9e704e0 + 7e107ae commit 1b64b58
Show file tree
Hide file tree
Showing 10 changed files with 430 additions and 13 deletions.
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
FROM frolvlad/alpine-python3

WORKDIR /app
COPY . /app
COPY *requirements* /app/
RUN sed -i -e 's/v3\.8/edge/g' /etc/apk/repositories \
&& apk upgrade --update-cache --available \
&& apk add --no-cache librdkafka librdkafka-dev
RUN apk add --no-cache alpine-sdk python3-dev
RUN pip install -r requirements.txt
RUN pip install -r test-requirements.txt
COPY . /app
RUN pip install -e .

52 changes: 52 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ Supported Python version: 3.5+
.. image:: https://codecov.io/gh/bryanyang0528/ksql-python/branch/master/graph/badge.svg
:target: https://codecov.io/gh/bryanyang0528/ksql-python

.. image:: https://pepy.tech/badge/ksql
:target: https://pepy.tech/project/ksql

.. image:: https://pepy.tech/badge/ksql/month
:target: https://pepy.tech/project/ksql/month

.. image:: https://img.shields.io/badge/license-MIT-yellow.svg
:target: https://github.com/bryanyang0528/ksql-python/blob/master/LICENSE

Installation
------------

Expand Down Expand Up @@ -124,6 +133,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}

Query with HTTP/2
^^^^^^^^^^^^^^^^^
Execute queries with the new ``/query-stream`` endpoint. Documented `here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>`_

To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.

.. code:: python
client.query('select * from table1', use_http2=True)
A generator is returned with the following example response

::

{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]

To terminate the query above use the ``close_query`` call.
Provide the ``queryId`` returned from the ``query`` call.

.. code:: python
client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")
Insert rows into a Stream with HTTP/2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Uses the new ``/inserts-stream`` endpoint. See `documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>`_

.. code:: python
rows = [
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
]
results = self.api_client.inserts_stream("my_stream_name", rows)
An array of object will be returned on success, with the status of each row inserted.


Simplified API
~~~~~~~~~~~~~~

Expand Down
96 changes: 95 additions & 1 deletion ksql/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import urllib
from copy import deepcopy
from requests import Timeout
from urllib.parse import urlparse
from hyper import HTTPConnection


from ksql.builder import SQLBuilder
from ksql.errors import CreateError, InvalidQueryError, KSQLError
Expand Down Expand Up @@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
res = json.loads(response)
return res

def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
"""
Process streaming incoming data with HTTP/2.
"""
parsed_uri = urlparse(self.url)

logging.debug("KSQL generated: {}".format(query_string))
sql_string = self._validate_sql_string(query_string)
body = {"sql": sql_string}
if stream_properties:
body["properties"] = stream_properties
else:
body["properties"] = {}

with HTTPConnection(parsed_uri.netloc) as connection:
streaming_response = self._request2(
endpoint="query-stream", body=body, connection=connection
)
start_idle = None

if streaming_response.status == 200:
for chunk in streaming_response.read_chunked():
if chunk != b"\n":
start_idle = None
yield chunk.decode(encoding)

else:
if not start_idle:
start_idle = time.time()
if idle_timeout and time.time() - start_idle > idle_timeout:
print("Ending query because of time out! ({} seconds)".format(idle_timeout))
return
else:
raise ValueError("Return code is {}.".format(streaming_response.status))

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
"""
Process streaming incoming data.
Expand Down Expand Up @@ -93,6 +132,20 @@ def get_request(self, endpoint):
auth = (self.api_key, self.secret) if self.api_key or self.secret else None
return requests.get(endpoint, headers=self.headers, auth=auth)

def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)
data = json.dumps(body).encode(encoding)

headers = deepcopy(self.headers)
if self.api_key and self.secret:
base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
headers["Authorization"] = "Basic %s" % base64string

connection.request(method=method.upper(), url=url, headers=headers, body=data)
resp = connection.get_response()

return resp

def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)

Expand Down Expand Up @@ -122,10 +175,51 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
raise http_error
else:
logging.debug("content: {}".format(content))
raise KSQLError(e=content.get("message"), error_code=content.get("error_code"))
raise KSQLError(content.get("message"), content.get("error_code"), content.get("stackTrace"))
else:
return r

def close_query(self, query_id):
body = {"queryId": query_id}
data = json.dumps(body).encode("utf-8")
url = "{}/{}".format(self.url, "close-query")

response = requests.post(url=url, data=data)

if response.status_code == 200:
logging.debug("Successfully canceled Query ID: {}".format(query_id))
return True
elif response.status_code == 400:
message = json.loads(response.content)["message"]
logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
return False
else:
raise ValueError("Return code is {}.".format(response.status_code))

def inserts_stream(self, stream_name, rows):
body = '{{"target":"{}"}}'.format(stream_name)
for row in rows:
body += '\n{}'.format(json.dumps(row))

parsed_uri = urlparse(self.url)
url = "{}/{}".format(self.url, "inserts-stream")
headers = deepcopy(self.headers)
with HTTPConnection(parsed_uri.netloc) as connection:
connection.request("POST", url, bytes(body, "utf-8"), headers)
response = connection.get_response()
result = response.read()

result_str = result.decode("utf-8")
result_chunks = result_str.split("\n")
return_arr = []
for chunk in result_chunks:
try:
return_arr.append(json.loads(chunk))
except:
pass

return return_arr

@staticmethod
def retry(exceptions, delay=1, max_retries=5):
"""
Expand Down
34 changes: 26 additions & 8 deletions ksql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function

from ksql.api import SimplifiedAPI
from ksql.utils import process_query_result


class KSQLAPI(object):
Expand Down Expand Up @@ -41,14 +42,31 @@ def get_properties(self):
def ksql(self, ksql_string, stream_properties=None):
return self.sa.ksql(ksql_string, stream_properties=stream_properties)

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
return self.sa.query(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
if use_http2:
yield from self.sa.query2(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)
else:
results = self.sa.query(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)

yield from process_query_result(results, return_objects)

def close_query(self, query_id):
return self.sa.close_query(query_id)

def inserts_stream(self, stream_name, rows):
return self.sa.inserts_stream(stream_name, rows)

def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
return self.sa.create_stream(
Expand Down
44 changes: 44 additions & 0 deletions ksql/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import ksql
import telnetlib
import json
import re


def check_kafka_available(bootstrap_servers):
Expand Down Expand Up @@ -65,3 +67,45 @@ def get_dependent_queries(api_client, stream_name):
write_queries = [query["id"] for query in stream_info["writeQueries"]]

return read_queries, write_queries


def parse_columns(columns_str):
regex = r"(?<!\<)`(?P<name>[A-Z_]+)` (?P<type>[A-z]+)[\<, \"](?!\>)"
result = []

matches = re.finditer(regex, columns_str)
for matchNum, match in enumerate(matches, start=1):
result.append({"name": match.group("name"), "type": match.group("type")})

return result


def process_row(row, column_names):
row = row.replace(",\n", "").replace("]\n", "")
row_obj = json.loads(row)
if 'finalMessage' in row_obj:
return None
column_values = row_obj["row"]["columns"]
index = 0
result = {}
for column in column_values:
result[column_names[index]["name"]] = column
index += 1

return result


def process_query_result(results, return_objects=None):
if return_objects is None:
yield from results

# parse rows into objects
header = next(results)
columns = parse_columns(header)

for result in results:
row_obj = process_row(result, columns)
if row_obj is None:
return
yield row_obj

1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ wcwidth==0.2.5
wrapt==1.12.1
yarl==1.4.2
zipp==3.1.0
hyper
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requests
six
urllib3
hyper
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def get_install_requirements(path):
'install_requires': [
'requests',
'six',
'urllib3'
'urllib3',
'hyper'
],
'zip_safe': False,
}
Expand Down
Loading

1 comment on commit 1b64b58

@bryanyang0528
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#83 Add download and license badges
#82 Optimize Dockerfile
#81 fixed exception handling of ksql
#79 Support new query-stream API with HTTP/2
#78 Optional parsing of query result into objects

Please sign in to comment.