-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #11 from Nasdaq/ncds_pytest
Integration test top-level and util file
- Loading branch information
Showing
2 changed files
with
272 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
from ncdssdk.src.tests.utils.NCDSTestUtil import NCDSTestUtil | ||
from ncdssdk import NCDSClient | ||
import json | ||
import pytest | ||
|
||
|
||
# Module-level fixture: constructing NCDSTestUtil seeds the local Kafka broker —
# its __init__ publishes topic schemas to the control topic and pushes the MOCK
# and GIDS records — so every test below runs against pre-populated streams.
ncds_test_util = NCDSTestUtil()
|
||
|
||
def test_NCDS_client():
    """Smoke test: the client can be constructed with default (None) configs."""
    client = NCDSClient(None, None)
    assert client is not None
|
||
|
||
def test_list_topics_for_the_client():
    """The client must report exactly the topics the test util added.

    Bug fix: the original asserted ``topics.sort() == added_topics.sort()``.
    ``list.sort()`` sorts in place and returns ``None``, so that comparison
    was always ``None == None`` and the test could never fail. Use
    ``sorted()`` to compare actual contents order-independently.
    """
    ncds_client = NCDSClient(None, None)
    topics = ncds_client.list_topics_for_client()
    added_topics = ncds_test_util.get_added_topics()
    assert sorted(topics) == sorted(added_topics)
|
||
|
||
def test_get_schema_for_the_topic():
    """Schema the SDK returns for GIDS must match the local schema file."""
    client = NCDSClient(None, None)
    sdk_schema = client.get_schema_for_topic("GIDS")
    file_schema = ncds_test_util.get_schema_for_topic("testGIDS.avsc")
    assert sdk_schema == file_schema
|
||
|
||
def test_top_messages_with_timestamp():
    """Seeking from the recorded timestamp should skip the first two of the
    ten mock records, yielding the remaining eight."""
    mock_records = ncds_test_util.get_mock_messages()
    for rec in mock_records:
        rec["schema_name"] = "SeqEtpIpvValue"

    client = NCDSClient(None, None)
    # timestamp_to_seek_from was captured at offset 2 by the test util's
    # delivery callback, so consumption starts at the third record.
    kafka_records = client.top_messages("MOCK", ncds_test_util.timestamp_to_seek_from)

    received = []
    for rec in kafka_records:
        print(rec.offset(), rec.timestamp())
        received.append(json.loads(rec.value()))

    assert len(received) == 8
    assert mock_records[2:] == received
|
||
|
||
def test_insertion():
    """Every mock record pushed to the MOCK topic should come back, in order."""
    expected = ncds_test_util.get_mock_messages()
    for rec in expected:
        rec["schema_name"] = "SeqEtpIpvValue"

    client = NCDSClient(None, None)
    received = [json.loads(rec.value()) for rec in client.top_messages("MOCK")]

    assert expected == received
|
||
|
||
def test_get_sample_message():
    """A single sample message should equal the last mock record pushed."""
    mock_records = ncds_test_util.get_mock_messages()
    expected = mock_records[-1]
    expected["schema_name"] = "SeqEtpIpvValue"

    client = NCDSClient(None, None)
    # all_messages=False: the client returns one message as a string.
    sample = client.get_sample_messages("MOCK", "SeqEtpIpvValue", False)

    assert str(expected) == sample
|
||
|
||
def test_get_all_sample_messages():
    """With all_messages=True the client returns only the five records whose
    schema name matches the requested message name (SeqEtpIpvValue)."""
    gids_records = ncds_test_util.get_GIDS_messages()
    for idx, rec in enumerate(gids_records):
        # First five records belong to the equities-summary branch, the
        # rest to the ETP IPV branch of the GIDS schema union.
        rec["schema_name"] = "SeqEquitiesSummary" if idx < 5 else "SeqEtpIpvValue"

    client = NCDSClient(None, None)
    samples = client.get_sample_messages("GIDS", "SeqEtpIpvValue", True)

    print("mock records: ", gids_records)
    print("records from ncdsclient: ", samples)
    assert len(samples) == 5
    assert gids_records[5:] == samples
|
||
|
||
def test_get_sample_message_incorrect_topic():
    """Requesting a sample message from a nonexistent topic must raise."""
    mock_records = ncds_test_util.get_mock_messages()
    first = mock_records[0]
    first["schema_name"] = "SeqEtpIpvValue"

    client = NCDSClient(None, None)

    # "MUCK" is a deliberate misspelling of the real "MOCK" topic.
    with pytest.raises(Exception):
        client.get_sample_messages("MUCK", "SeqEtpIpvValue", False)
|
||
|
||
def test_get_schema_for_the_incorrect_topic():
    """Negative check: the schema the SDK returns for the MOCK topic must not
    match the GIDS schema file (i.e. topics do not share schemas).

    Cleanup: removed the commented-out ``pytest.raises`` block that trailed
    this test — dead code that suggested an exception path the test does not
    actually exercise.
    """
    ncds_client = NCDSClient(None, None)
    schema_from_sdk = ncds_client.get_schema_for_topic("MOCK")
    schema_from_file = ncds_test_util.get_schema_for_topic("testGIDS.avsc")
    assert schema_from_sdk != schema_from_file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
import uuid | ||
import avro | ||
import logging | ||
from importlib import resources | ||
import ncdssdk.src.tests.resources as testresources | ||
from confluent_kafka import SerializingProducer | ||
import time | ||
from ncdssdk.src.tests.utils.AvroMocker import AvroMocker | ||
from ncdssdk.src.tests.utils.AvroSerializer import AvroSerializer | ||
|
||
|
||
class NCDSTestUtil:
    """Integration-test harness that seeds a local Kafka broker
    (localhost:9092) with Avro-encoded control, MOCK, and GIDS messages.

    Constructing an instance immediately (1) publishes each test topic's
    schema to the control topic, (2) pushes 10 mock records to MOCK.stream,
    and (3) pushes 10 GIDS records (5 per schema branch) to GIDS.stream.
    The pushed records are kept in memory so tests can compare what the
    NCDS client reads back against what was produced.
    """

    def __init__(self):
        # Placeholders for an embedded test broker; unused in this file.
        self.kafka_test_server = None
        self.kafka_test_utils = None
        self.config = {}
        # Topic names whose schemas were pushed to the control topic.
        self.topics_on_stream = []
        # Records produced to MOCK.stream / GIDS.stream, in produce order.
        self.mock_messages_on_stream = []
        self.GIDS_messages_on_stream = []
        self.ctrl_topic = "control"
        self.mock_data_stream = "MOCK.stream"
        self.GIDS_data_stream = "GIDS.stream"
        # Set by on_delivery() to the broker timestamp of the mock record at
        # offset 2; tests use it to seek from the third record onward.
        self.timestamp_to_seek_from = 0

        self.add_schemas_to_control_topic()
        self.push_mock_messages()
        self.push_gids_messages()

    def get_producer_config(self, schema_file):
        """Build a SerializingProducer config whose value serializer encodes
        with the Avro schema loaded from *schema_file* in the test resources.

        NOTE(review): this mutates and returns the shared ``self.config``
        dict, so each call overwrites the previous serializer — fine while
        producers are created and flushed sequentially, as they are here.
        """
        msg_str = resources.read_text(
            testresources, schema_file)
        schema = avro.schema.parse(msg_str)
        message_serializer = AvroSerializer(schema)

        kafka_connect_string = "localhost:9092"
        self.config["bootstrap.servers"] = kafka_connect_string
        # NOTE(review): confluent-kafka generally expects string config
        # values; uuid.uuid4() is a UUID object — confirm it is accepted
        # or whether it should be str(uuid.uuid4()).
        self.config["client.id"] = uuid.uuid4()
        self.config["request.timeout.ms"] = 15000
        self.config["value.serializer"] = message_serializer.encode

        return self.config

    def add_schemas_to_control_topic(self):
        """Publish one control message per test topic (NLSUTP, GIDS, Mock),
        each carrying that topic's Avro schema text and upper-cased name."""
        try:
            # define our topics
            nls_key = "NLSUTP"
            gids_key = "GIDS"
            mock_key = "Mock"

            all_topics = [nls_key, gids_key, mock_key]

            all_records = []

            ctrl_schema_file = 'ControlMessageSchema.avsc'
            ctrl_msg_str = resources.read_text(testresources, ctrl_schema_file)
            ctrl_schema = avro.schema.parse(ctrl_msg_str)
            # schemas[1] — presumably the control-record branch of the schema
            # union; confirm against ControlMessageSchema.avsc.
            mocker = AvroMocker(ctrl_schema.schemas[1], 1)

            for topic in all_topics:
                # Resource files follow the test<Topic>.avsc naming scheme.
                topic_string = "test"+topic+".avsc"
                msg_str = resources.read_text(
                    testresources, topic_string)
                record = mocker.create_message()
                record["schema"] = str(msg_str)
                record["name"] = topic.upper()
                all_records.append(record)
                self.topics_on_stream.append(topic)

            # creating producer
            producer_config = self.get_producer_config(ctrl_schema_file)

            try:
                producer = SerializingProducer(producer_config)

                for record in all_records:
                    producer.produce(
                        self.ctrl_topic, value=record)

                producer.flush()

            # close producer
            except Exception as e:
                logging.exception(e)

        except Exception as e:
            logging.exception(e)

    def get_added_topics(self):
        """Return the topic names whose schemas were pushed to the control topic."""
        return self.topics_on_stream

    def get_mock_messages(self):
        """Return the records that were produced to MOCK.stream."""
        return self.mock_messages_on_stream

    def get_GIDS_messages(self):
        """Return the records that were produced to GIDS.stream."""
        return self.GIDS_messages_on_stream

    def get_schema_for_topic(self, schema_file):
        """Parse and return the Avro schema stored in the *schema_file* resource."""
        schema_msg_str = resources.read_text(testresources, schema_file)
        return avro.schema.parse(schema_msg_str)

    def on_delivery(self, err, msg):
        """Kafka delivery callback: capture the broker timestamp of the
        record delivered at offset 2 for timestamp-seek tests."""
        if (msg.offset() == 2):
            # msg.timestamp() returns (timestamp_type, value); keep the value.
            self.timestamp_to_seek_from = msg.timestamp()[1]

    def push_mock_messages(self):
        """Generate 10 mock records, produce them to MOCK.stream, and record
        each one locally for later comparison by the tests."""
        records = self.get_mocker_generic_record(10)
        mock_schema_file = 'testMock.avsc'
        producer_config = self.get_producer_config(mock_schema_file)
        try:
            # create producer
            mocker_producer = SerializingProducer(producer_config)

            # add mock records to topic
            for record in records:
                self.mock_messages_on_stream.append(record)
                mocker_producer.produce(
                    self.mock_data_stream, value=record, on_delivery=self.on_delivery)
                # One-second pause gives each record a distinct broker
                # timestamp, which the timestamp-seek test relies on.
                time.sleep(1)
            mocker_producer.flush()

        except Exception as e:
            logging.exception(e)

    def get_mocker_generic_record(self, number_of_records):
        """Generate *number_of_records* random records from testMock.avsc."""
        mock_msg_str = resources.read_text(testresources, 'testMock.avsc')
        mocker_schema = avro.schema.parse(mock_msg_str)

        avro_mocker = AvroMocker(mocker_schema, number_of_records)
        return avro_mocker.generate_mock_messages()

    def push_gids_messages(self):
        """Generate GIDS records (5 per schema branch) and produce them to
        GIDS.stream, recording each one locally."""
        records = self.get_GIDS_generic_record(5)
        GIDS_schema_file = 'testGIDS.avsc'
        producer_config = self.get_producer_config(GIDS_schema_file)
        try:
            # create producer
            GIDS_producer = SerializingProducer(producer_config)
            # add GIDS records to topic
            for record in records:
                self.GIDS_messages_on_stream.append(record)
                GIDS_producer.produce(self.GIDS_data_stream, value=record)

            GIDS_producer.flush()

        except Exception as e:
            logging.exception(e)

    def get_GIDS_generic_record(self, number_of_records):
        """Generate *number_of_records* records from each of two branches of
        the testGIDS.avsc schema union and return them concatenated
        (branch 0 first, then branch 3)."""
        mock_msg_str = resources.read_text(testresources, 'testGIDS.avsc')
        GIDS_schema = avro.schema.parse(mock_msg_str)

        # Union indices 0 and 3 — presumed commodities and equities branches
        # from the variable names below; confirm against testGIDS.avsc.
        avro_mocker = AvroMocker(GIDS_schema.schemas[0], number_of_records)
        avro_mocker2 = AvroMocker(GIDS_schema.schemas[3], number_of_records)

        commodities_msgs = avro_mocker.generate_mock_messages()
        equities_msgs = avro_mocker2.generate_mock_messages()

        GIDS_messages = commodities_msgs + equities_msgs

        return GIDS_messages