Skip to content

Commit

Permalink
Merge pull request #78 from harlev/parse_query_results
Browse files Browse the repository at this point in the history
Optional parsing of query result into objects
  • Loading branch information
bryanyang0528 committed Aug 27, 2020
2 parents 8a729c1 + 0c80ddb commit 11b7525
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 2 deletions.
7 changes: 5 additions & 2 deletions ksql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function

from ksql.api import SimplifiedAPI
from ksql.utils import process_query_result


class KSQLAPI(object):
Expand Down Expand Up @@ -41,15 +42,17 @@ def get_properties(self):
def ksql(self, ksql_string, stream_properties=None):
return self.sa.ksql(ksql_string, stream_properties=stream_properties)

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
return self.sa.query(
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
    """Run a KSQL query and stream back its result chunks.

    NOTE: this is a generator function — nothing is sent to the server
    until the caller starts iterating the returned generator.

    :param query_string: the KSQL statement to execute.
    :param encoding: text encoding used to decode the response stream.
    :param chunk_size: size of the chunks read from the HTTP response.
    :param stream_properties: optional dict of ksql streams properties.
    :param idle_timeout: optional idle timeout passed through to the API.
    :param return_objects: when truthy, each data row is parsed into a
        dict keyed by column name (via ``process_query_result``); when
        ``None`` (default) the raw chunk strings are yielded unchanged.
    """
    results = self.sa.query(
        query_string=query_string,
        encoding=encoding,
        chunk_size=chunk_size,
        stream_properties=stream_properties,
        idle_timeout=idle_timeout,
    )

    # Delegate chunk post-processing (pass-through vs. object parsing).
    yield from process_query_result(results, return_objects)

def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
return self.sa.create_stream(
table_name=table_name, columns_type=columns_type, topic=topic, value_format=value_format
Expand Down
44 changes: 44 additions & 0 deletions ksql/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import ksql
import telnetlib
import json
import re


def check_kafka_available(bootstrap_servers):
Expand Down Expand Up @@ -65,3 +67,45 @@ def get_dependent_queries(api_client, stream_name):
write_queries = [query["id"] for query in stream_info["writeQueries"]]

return read_queries, write_queries


def parse_columns(columns_str):
    """Extract column names and base types from a KSQL result header chunk.

    The header chunk carries a ``schema`` string such as
    ``"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<`A` INTEGER, ...>"``;
    each top-level column appears as a backticked name followed by its type.

    :param columns_str: raw header chunk emitted by the KSQL REST API.
    :returns: list of ``{"name": ..., "type": ...}`` dicts in column order.
    """
    # Pattern notes:
    # - ``(?<!\<)`` rejects backticked names nested directly inside a
    #   STRUCT<...> definition (e.g. the `A` in STRUCT<`A` INTEGER, ...>).
    # - name: KSQL upper-cases identifiers; digits/underscores are allowed.
    # - type: the base type word only (STRUCT/MAP/ARRAY drop their <...>).
    #   ``[A-Za-z]`` fixes the former ``[A-z]`` class, which also matched
    #   the punctuation characters between 'Z' and 'a' in ASCII.
    # - the trailing ``[\<, \"]`` anchors on whatever follows the type
    #   token, and ``(?!\>)`` rejects matches sitting at the close of a
    #   nested generic (e.g. ``STRING>``).
    regex = r"(?<!\<)`(?P<name>[A-Z0-9_]+)` (?P<type>[A-Za-z]+)[\<, \"](?!\>)"

    return [
        {"name": match.group("name"), "type": match.group("type")}
        for match in re.finditer(regex, columns_str)
    ]


def process_row(row, column_names):
    """Convert one raw KSQL result chunk into a dict keyed by column name.

    :param row: raw chunk text, e.g. ``'{"row":{"columns":[...]}},\\n'``.
    :param column_names: parsed header produced by :func:`parse_columns`.
    :returns: dict mapping column name -> value, or ``None`` for the
        terminal ``finalMessage`` chunk that signals end of results.
    :raises IndexError: if the row carries more values than the header
        declared columns (same behavior as before).
    """
    # Strip the streaming framing (trailing comma / closing bracket) so
    # the remainder is a standalone JSON object.
    row = row.replace(",\n", "").replace("]\n", "")
    row_obj = json.loads(row)
    if 'finalMessage' in row_obj:
        return None

    column_values = row_obj["row"]["columns"]
    # Pair each positional value with its header entry.
    return {
        column_names[position]["name"]: value
        for position, value in enumerate(column_values)
    }


def process_query_result(results, return_objects=None):
    """Optionally parse raw KSQL query chunks into per-row dicts.

    :param results: generator of raw chunk strings from the query endpoint;
        the first chunk is the schema header, the last a ``finalMessage``.
    :param return_objects: when truthy, yield one dict per data row keyed
        by column name; otherwise pass the raw chunks through unchanged.
    """
    if return_objects is None:
        yield from results
        # BUG FIX: without this return the generator fell through to
        # next() on the already-exhausted stream, which raises
        # StopIteration inside a generator -> RuntimeError (PEP 479).
        return

    # parse rows into objects: first chunk is the header with the schema.
    header = next(results)
    columns = parse_columns(header)

    for result in results:
        row_obj = process_row(result, columns)
        if row_obj is None:
            # finalMessage chunk — end of the result stream.
            return
        yield row_obj

98 changes: 98 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,104 @@ def test_ksql_create_stream_w_properties(self):
self.assertTrue(chunk)
break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result_with_utils(self):
    """End-to-end check of manual result parsing via utils.parse_columns/process_row.

    Requires a live Kafka broker on localhost:29092 (skipped otherwise).
    """
    topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_STREAM"

    # Seed the topic with one record covering struct, map, and array types.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the topic beginning so the seeded record is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns reuse the existing stream.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
    )
    # First chunk is the schema header; parse it manually with the utils.
    header = next(chunks)
    columns = utils.parse_columns(header)

    for chunk in chunks:
        row_obj = utils.process_row(chunk, columns)
        self.assertEqual(row_obj["ORDER_ID"], 3)
        self.assertEqual(row_obj["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(row_obj["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(row_obj["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(row_obj["TOTAL_AMOUNT"], 43)
        self.assertEqual(row_obj["CUSTOMER_NAME"], "Palo Alto")
        # Stop after the first row — the EMIT CHANGES query never ends.
        break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result(self):
    """End-to-end check that query(return_objects=True) yields parsed row dicts.

    Requires a live Kafka broker on localhost:29092 (skipped otherwise).
    """
    topic = "TEST_KSQL_PARSE_QUERY_RESULT_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_RESULT_STREAM"

    # Seed the topic with one record covering struct, map, and array types.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the topic beginning so the seeded record is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns reuse the existing stream.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    # return_objects=True makes the client parse chunks into dicts itself.
    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, return_objects=True
    )

    for chunk in chunks:
        self.assertEqual(chunk["ORDER_ID"], 3)
        self.assertEqual(chunk["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(chunk["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(chunk["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(chunk["TOTAL_AMOUNT"], 43)
        self.assertEqual(chunk["CUSTOMER_NAME"], "Palo Alto")
        # Stop after the first row — the EMIT CHANGES query never ends.
        break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_final_message(self):
    """Check that a LIMIT query's terminal finalMessage chunk ends iteration cleanly.

    Unlike the other tests there is no ``break``: the loop must terminate
    on its own when the finalMessage chunk arrives.
    Requires a live Kafka broker on localhost:29092 (skipped otherwise).
    """
    topic = "TEST_KSQL_PARSE_QUERY_FINAL_MESSAGE_TOPIC"
    stream_name = "TEST_KSQL_PARSE_QUERY_FINAL_MESSAGE_STREAM"

    # Seed the topic with one record covering struct, map, and array types.
    producer = Producer({"bootstrap.servers": self.bootstrap_servers})
    producer.produce(topic, """{"order_id":3,"my_struct":{"a":1,"b":"bbb"}, "my_map":{"x":3, "y":4}, "my_array":[1,2,3], "total_amount":43,"customer_name":"Palo Alto"}""")
    producer.flush()

    ksql_string = "CREATE STREAM {} (ORDER_ID INT, MY_STRUCT STRUCT<A INT, B VARCHAR>, MY_MAP MAP<VARCHAR, INT>, MY_ARRAY ARRAY<INT>, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )
    # Read from the topic beginning so the seeded record is visible.
    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Create the stream only once; reruns reuse the existing stream.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    # LIMIT 1 makes the server close the stream with a finalMessage chunk.
    chunks = self.api_client.query(
        "select * from {} EMIT CHANGES LIMIT 1".format(stream_name), stream_properties=streamProperties, return_objects=True
    )

    for row_obj in chunks:
        self.assertEqual(row_obj["ORDER_ID"], 3)
        self.assertEqual(row_obj["MY_STRUCT"], {"A": 1, "B": "bbb"})
        self.assertEqual(row_obj["MY_MAP"], {"x": 3, "y": 4})
        self.assertEqual(row_obj["MY_ARRAY"], [1, 2, 3])
        self.assertEqual(row_obj["TOTAL_AMOUNT"], 43)
        self.assertEqual(row_obj["CUSTOMER_NAME"], "Palo Alto")

@vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml")
def test_bad_requests(self):
broken_ksql_string = "noi"
Expand Down
55 changes: 55 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,58 @@ def test_get_dependent_queries(self):
read_queries, write_queries = utils.get_dependent_queries(self.api_client, stream_name_as)
self.assertEqual(read_queries, [])
self.assertTrue(write_queries[0].startswith("CSAS_KSQL_PYTHON_TEST_TEST_GET_DEPENDENT_QUERIES_AS"))

def test_parse_columns(self):
    """parse_columns extracts each top-level column name and base type in order."""
    header_str = """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<`A` INTEGER, `B` STRING>, `MY_MAP` MAP<STRING, INTEGER>, `MY_ARRAY` ARRAY<INTEGER>, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},"""

    columns = utils.parse_columns(header_str)

    expected_columns = [
        {'name': 'ORDER_ID', 'type': 'INTEGER'},
        {'name': 'MY_STRUCT', 'type': 'STRUCT'},
        {'name': 'MY_MAP', 'type': 'MAP'},
        {'name': 'MY_ARRAY', 'type': 'ARRAY'},
        {'name': 'TOTAL_AMOUNT', 'type': 'DOUBLE'},
        {'name': 'CUSTOMER_NAME', 'type': 'STRING'},
    ]
    for position, expected in enumerate(expected_columns):
        self.assertEqual(columns[position], expected)

def test_process_row(self):
    """process_row maps each positional row value onto its header column name."""
    parsed_header = [{'name': 'ORDER_ID', 'type': 'INTEGER'}, {'name': 'MY_STRUCT', 'type': 'STRUCT'}, {'name': 'MY_MAP', 'type': 'MAP'}, {'name': 'MY_ARRAY', 'type': 'ARRAY'}, {'name': 'TOTAL_AMOUNT', 'type': 'DOUBLE'}, {'name': 'CUSTOMER_NAME', 'type': 'STRING'}]
    row_str = """{"row":{"columns":[3,{"A":1,"B":"bbb"},{"x":3,"y":4},[1,2,3],43.0,"Palo Alto"]}},\n"""

    row_obj = utils.process_row(row_str, parsed_header)

    expected_values = {
        "ORDER_ID": 3,
        "MY_STRUCT": {"A": 1, "B": "bbb"},
        "MY_MAP": {"x": 3, "y": 4},
        "MY_ARRAY": [1, 2, 3],
        "TOTAL_AMOUNT": 43,
        "CUSTOMER_NAME": "Palo Alto",
    }
    for column_name, expected in expected_values.items():
        self.assertEqual(row_obj[column_name], expected)

def test_process_query_result(self):
    """With return_objects unset the raw chunks must pass through untouched."""
    def chunk_source():
        yield from [1, 2, 3, 4, 5, 6]

    passthrough = utils.process_query_result(chunk_source())

    # Only the first element is consumed; it must be the untouched raw value.
    self.assertEqual(next(passthrough), 1)

def test_process_query_result_parse_rows(self):
    """With return_objects=True the header is parsed and rows become dicts."""
    header_str = """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `MY_STRUCT` STRUCT<`A` INTEGER, `B` STRING>, `MY_MAP` MAP<STRING, INTEGER>, `MY_ARRAY` ARRAY<INTEGER>, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},"""
    row_str = """{"row":{"columns":[3,{"A":1,"B":"bbb"},{"x":3,"y":4},[1,2,3],43.0,"Palo Alto"]}},\n"""

    def chunk_source():
        # First the schema header, then a single data row.
        yield header_str
        yield row_str

    rows = utils.process_query_result(chunk_source(), True)

    first_row = next(rows)
    expected_values = {
        "ORDER_ID": 3,
        "MY_STRUCT": {"A": 1, "B": "bbb"},
        "MY_MAP": {"x": 3, "y": 4},
        "MY_ARRAY": [1, 2, 3],
        "TOTAL_AMOUNT": 43,
        "CUSTOMER_NAME": "Palo Alto",
    }
    for column_name, expected in expected_values.items():
        self.assertEqual(first_row[column_name], expected)

0 comments on commit 11b7525

Please sign in to comment.