Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional parsing of query result into objects #78

Merged
merged 17 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ksql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function

from ksql.api import SimplifiedAPI
from ksql.utils import process_query_result


class KSQLAPI(object):
Expand Down Expand Up @@ -41,15 +42,17 @@ def get_properties(self):
def ksql(self, ksql_string, stream_properties=None):
    # Thin delegate: forward a KSQL statement (e.g. CREATE STREAM / DDL)
    # to the SimplifiedAPI client and return whatever it returns.
    return self.sa.ksql(ksql_string, stream_properties=stream_properties)

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
    """Run a ksqlDB query and yield its streaming result chunks.

    By default each raw text chunk from the underlying streaming endpoint
    is yielded unchanged (backward compatible).  Passing
    ``return_objects=True`` instead parses the first chunk as the schema
    header and yields one dict per data row (see
    ``ksql.utils.process_query_result``).

    NOTE(review): this span of the scraped diff contained both the old and
    the new version of the method interleaved; this is the merged final
    version from the pull request.
    """
    results = self.sa.query(
        query_string=query_string,
        encoding=encoding,
        chunk_size=chunk_size,
        stream_properties=stream_properties,
        idle_timeout=idle_timeout,
    )

    # Delegate raw-vs-parsed handling to the shared utility generator.
    yield from process_query_result(results, return_objects)

def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
return self.sa.create_stream(
table_name=table_name, columns_type=columns_type, topic=topic, value_format=value_format
Expand Down
44 changes: 44 additions & 0 deletions ksql/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import ksql
import telnetlib
import json
import re


def check_kafka_available(bootstrap_servers):
Expand Down Expand Up @@ -65,3 +67,45 @@ def get_dependent_queries(api_client, stream_name):
write_queries = [query["id"] for query in stream_info["writeQueries"]]

return read_queries, write_queries


def parse_columns(columns_str):
    """Extract top-level column names and types from a ksqlDB query header.

    *columns_str* is the raw first chunk of a push-query response whose
    "schema" field looks like ``"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<...>"``.
    The negative lookbehind/lookahead keep nested fields (e.g. struct
    members written as ``<`A` INTEGER``) out of the result, so only
    top-level columns are returned.

    Returns a list of ``{"name": ..., "type": ...}`` dicts in declaration
    order.
    """
    # [A-Za-z] instead of the original [A-z]: the latter also matched the
    # ASCII characters between 'Z' and 'a' ('[', '\\', ']', '^', '_', '`').
    regex = r"(?<!\<)`(?P<name>[A-Z_]+)` (?P<type>[A-Za-z]+)[\<, \"](?!\>)"
    return [
        {"name": match.group("name"), "type": match.group("type")}
        for match in re.finditer(regex, columns_str)
    ]


def process_row(row, column_names):
    """Convert one raw text chunk of a ksqlDB query response into a dict.

    *row* is a single streamed chunk such as
    ``'{"row":{"columns":[...]}},\\n'``; *column_names* is the output of
    :func:`parse_columns` for the same query.

    Returns a dict mapping column name -> value, or ``None`` when the chunk
    is the terminating ``finalMessage`` marker (e.g. after a LIMIT is
    reached).
    """
    # Strip the chunk separators the streaming endpoint appends so the
    # remainder is a self-contained JSON object.
    row = row.replace(",\n", "").replace("]\n", "")
    row_obj = json.loads(row)
    if 'finalMessage' in row_obj:
        return None
    # Pair values with their column descriptors positionally; zip also
    # tolerates a malformed row with extra values instead of raising.
    column_values = row_obj["row"]["columns"]
    return {
        column["name"]: value
        for column, value in zip(column_names, column_values)
    }


def process_query_result(results, return_objects=None):
    """Optionally parse raw ksqlDB query chunks into row dicts.

    When *return_objects* is ``None`` (the default) the raw chunks from
    *results* are passed through unchanged.  Otherwise the first chunk is
    treated as the schema header (parsed with :func:`parse_columns`) and
    every subsequent chunk is converted to a dict via :func:`process_row`;
    iteration stops at the server's ``finalMessage`` chunk.
    """
    if return_objects is None:
        yield from results
        # Bug fix: the original fell through after exhausting `results`
        # and called next() on the spent iterator, which raises
        # RuntimeError inside a generator under PEP 479.
        return

    # Parse rows into objects: first chunk is the header with the schema.
    header = next(results)
    columns = parse_columns(header)

    for result in results:
        row_obj = process_row(result, columns)
        if row_obj is None:
            # finalMessage marker: the query is complete.
            return
        yield row_obj

98 changes: 98 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,104 @@ def test_ksql_create_stream_w_properties(self):
self.assertTrue(chunk)
break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result_with_utils(self):
    # End-to-end check of the manual parsing path: produce one record,
    # stream it back with a push query, and run the header/row chunks
    # through utils.parse_columns / utils.process_row directly.
    topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_STREAM"

    # One record exercising INT, STRUCT, MAP, ARRAY, DOUBLE and VARCHAR.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the beginning so the record produced above is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns against a live broker reuse it.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
    )
    # First chunk is the schema header; parse it into column descriptors.
    header = next(chunks)
    columns = utils.parse_columns(header)

    # Only the first data row is checked; the push query never ends on
    # its own, so break immediately after asserting.
    for chunk in chunks:
        row_obj = utils.process_row(chunk, columns)
        self.assertEqual(row_obj["ORDER_ID"], 3)
        self.assertEqual(row_obj["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(row_obj["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(row_obj["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(row_obj["TOTAL_AMOUNT"], 43)
        self.assertEqual(row_obj["CUSTOMER_NAME"], "Palo Alto")
        break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result(self):
    # Same round trip as the _with_utils test, but through the public
    # client API: query(..., return_objects=True) should yield dicts.
    topic = "TEST_KSQL_PARSE_QUERY_RESULT_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_RESULT_STREAM"

    # One record exercising INT, STRUCT, MAP, ARRAY, DOUBLE and VARCHAR.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the beginning so the record produced above is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns against a live broker reuse it.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, return_objects=True
    )

    # With return_objects=True the header is consumed internally and each
    # chunk is already a parsed dict; check the first row and stop.
    for chunk in chunks:
        self.assertEqual(chunk["ORDER_ID"], 3)
        self.assertEqual(chunk["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(chunk["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(chunk["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(chunk["TOTAL_AMOUNT"], 43)
        self.assertEqual(chunk["CUSTOMER_NAME"], "Palo Alto")
        break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_final_message(self):
    # With LIMIT 1 the server ends the stream with a finalMessage chunk;
    # the parsed iterator must terminate on its own (note: no `break`).
    topic = "TEST_KSQL_PARSE_QUERY_FINAL_MESSAGE_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_FINAL_MESSAGE_STREAM"

    # One record exercising INT, STRUCT, MAP, ARRAY, DOUBLE and VARCHAR.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the beginning so the record produced above is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns against a live broker reuse it.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES LIMIT 1".format(stream_name), stream_properties=streamProperties, return_objects=True
    )

    # The loop exits when the generator stops at the finalMessage marker;
    # if it did not, this push query would hang the test.
    for row_obj in chunks:
        self.assertEqual(row_obj["ORDER_ID"], 3)
        self.assertEqual(row_obj["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(row_obj["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(row_obj["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(row_obj["TOTAL_AMOUNT"], 43)
        self.assertEqual(row_obj["CUSTOMER_NAME"], "Palo Alto")

@vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml")
def test_bad_requests(self):
broken_ksql_string = "noi"
Expand Down
55 changes: 55 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,58 @@ def test_get_dependent_queries(self):
read_queries, write_queries = utils.get_dependent_queries(self.api_client, stream_name_as)
self.assertEqual(read_queries, [])
self.assertTrue(write_queries[0].startswith("CSAS_KSQL_PYTHON_TEST_TEST_GET_DEPENDENT_QUERIES_AS"))

def test_parse_columns(self):
    """The header schema string yields one entry per top-level column."""
    header_str = """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<`A` INTEGER, `B` STRING>, `MY_MAP` MAP<STRING, INTEGER>, `MY_ARRAY` ARRAY<INTEGER>, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},"""

    parsed = utils.parse_columns(header_str)

    # Nested struct members (`A`, `B`) must not surface as columns.
    expected = [
        {"name": "ORDER_ID", "type": "INTEGER"},
        {"name": "MY_STRUCT", "type": "STRUCT"},
        {"name": "MY_MAP", "type": "MAP"},
        {"name": "MY_ARRAY", "type": "ARRAY"},
        {"name": "TOTAL_AMOUNT", "type": "DOUBLE"},
        {"name": "CUSTOMER_NAME", "type": "STRING"},
    ]
    for index, column in enumerate(expected):
        self.assertEqual(parsed[index], column)

def test_process_row(self):
    """A raw row chunk is mapped onto the parsed header's column names."""
    parsed_header = [
        {"name": "ORDER_ID", "type": "INTEGER"},
        {"name": "MY_STRUCT", "type": "STRUCT"},
        {"name": "MY_MAP", "type": "MAP"},
        {"name": "MY_ARRAY", "type": "ARRAY"},
        {"name": "TOTAL_AMOUNT", "type": "DOUBLE"},
        {"name": "CUSTOMER_NAME", "type": "STRING"},
    ]
    row_str = """{"row":{"columns":[3,{"A":1,"B":"bbb"},{"x":3,"y":4},[1,2,3],43.0,"Palo Alto"]}},\n"""

    row_obj = utils.process_row(row_str, parsed_header)

    expected = {
        "ORDER_ID": 3,
        "MY_STRUCT": {"A": 1, "B": "bbb"},
        "MY_MAP": {"x": 3, "y": 4},
        "MY_ARRAY": [1, 2, 3],
        "TOTAL_AMOUNT": 43,
        "CUSTOMER_NAME": "Palo Alto",
    }
    for name, value in expected.items():
        self.assertEqual(row_obj[name], value)

def test_process_query_result(self):
    """Without return_objects, chunks are passed through untouched."""
    raw_chunks = (value for value in [1, 2, 3, 4, 5, 6])

    passthrough = utils.process_query_result(raw_chunks)

    self.assertEqual(next(passthrough), 1)

def test_process_query_result_parse_rows(self):
    """With return_objects=True the header chunk is consumed internally
    and each following chunk comes back as a column-name -> value dict."""
    header_str = """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<`A` INTEGER, `B` STRING>, `MY_MAP` MAP<STRING, INTEGER>, `MY_ARRAY` ARRAY<INTEGER>, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},"""
    row_str = """{"row":{"columns":[3,{"A":1,"B":"bbb"},{"x":3,"y":4},[1,2,3],43.0,"Palo Alto"]}},\n"""
    raw_chunks = (chunk for chunk in [header_str, row_str])

    rows = utils.process_query_result(raw_chunks, True)

    first_row = next(rows)
    expected = {
        "ORDER_ID": 3,
        "MY_STRUCT": {"A": 1, "B": "bbb"},
        "MY_MAP": {"x": 3, "y": 4},
        "MY_ARRAY": [1, 2, 3],
        "TOTAL_AMOUNT": 43,
        "CUSTOMER_NAME": "Palo Alto",
    }
    for name, value in expected.items():
        self.assertEqual(first_row[name], value)