diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h index 0f7d8a8b..b99de08c 100644 --- a/include/pulsar/Result.h +++ b/include/pulsar/Result.h @@ -92,6 +92,8 @@ enum Result ResultMemoryBufferIsFull, /// Client-wide memory limit has been reached ResultInterrupted, /// Interrupted while waiting to dequeue + + ResultNotFound /// The generic was not found }; // Return string representation of result code diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h index ad64c0ba..93dac4fb 100644 --- a/include/pulsar/Schema.h +++ b/include/pulsar/Schema.h @@ -134,6 +134,8 @@ enum SchemaType // Return string representation of result code PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType); +PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr); + class SchemaInfoImpl; typedef std::map StringMap; @@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo { private: typedef std::shared_ptr SchemaInfoImplPtr; SchemaInfoImplPtr impl_; - static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF; }; } // namespace pulsar diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index b863d529..6ead7004 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -155,6 +155,43 @@ Future BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } +Future> BinaryProtoLookupService::getSchema( + const TopicNamePtr& topicName) { + GetSchemaPromisePtr promise = std::make_shared>>(); + + if (!topicName) { + promise->setFailed(ResultInvalidTopicName); + return promise->getFuture(); + } + cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) + .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(), + std::placeholders::_1, std::placeholders::_2, promise)); + + return promise->getFuture(); +} + +void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topiName, Result result, + const ClientConnectionWeakPtr& clientCnx, + GetSchemaPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(result); + return; + } + + ClientConnectionPtr conn = clientCnx.lock(); + uint64_t requestId = newRequestId(); + LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topiName); + + conn->newGetSchema(topiName, requestId) + .addListener([promise](Result result, boost::optional schemaInfo) { + if (result != ResultOk) { + promise->setFailed(result); + return; + } + promise->setValue(schemaInfo); + }); +} + void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise) { diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 9adb6483..9740fb19 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -20,6 +20,7 @@ #define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_ #include +#include #include @@ -32,6 +33,7 @@ class ConnectionPool; class LookupDataResult; class ServiceNameResolver; using NamespaceTopicsPromisePtr = std::shared_ptr>; +using GetSchemaPromisePtr = std::shared_ptr>>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; + Future> getSchema(const TopicNamePtr& topicName) override; + private: std::mutex mutex_; uint64_t requestIdGenerator_ = 0; @@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise); + void sendGetSchemaRequest(const std::string& topiName, Result result, + const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise); + void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, NamespaceTopicsPromisePtr promise); diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 51a09f45..91d22d33 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { break; } + case BaseCommand::GET_SCHEMA_RESPONSE: { + const auto& response = incomingCmd.getschemaresponse(); + LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " + << response.request_id()); + Lock lock(mutex_); + PendingGetSchemaMap::iterator it = pendingGetSchemaRequests_.find(response.request_id()); + if (it != pendingGetSchemaRequests_.end()) { + Promise> getSchemaPromise = it->second; + pendingGetSchemaRequests_.erase(it); + lock.unlock(); + + if (response.has_error_code()) { + if (response.error_code() == proto::TopicNotFound) { + getSchemaPromise.setValue(boost::none); + } else { + Result result = getResult(response.error_code(), response.error_message()); + LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " + << result + << (response.has_error_message() + ? (" (" + response.error_message() + ")") + : "") + << " -- req_id: " << response.request_id()); + getSchemaPromise.setFailed(result); + } + return; + } + + auto schema = response.schema(); + auto properMap = schema.properties(); + StringMap properties; + for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) { + properties[kv->key()] = kv->value(); + } + SchemaInfo schemaInfo(static_cast(schema.type()), "", + schema.schema_data(), properties); + getSchemaPromise.setValue(schemaInfo); + } else { + lock.unlock(); + LOG_WARN( + "GetSchemaResponse command - Received unknown request id from " + "server: " + << response.request_id()); + } + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); @@ -1704,6 +1750,23 @@ Future ClientConnection::newGetTopicsOfNamespace(con return promise.getFuture(); } +Future> ClientConnection::newGetSchema(const std::string& topicName, + uint64_t requestId) { + Lock lock(mutex_); + Promise> promise; + if (isClosed()) { + lock.unlock(); + LOG_ERROR(cnxString_ << "Client is not connected to the broker"); + promise.setFailed(ResultNotConnected); + return promise.getFuture(); + } + + pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise)); + lock.unlock(); + sendCommand(Commands::newGetSchema(topicName, requestId)); + return promise.getFuture(); +} + void ClientConnection::closeSocket() { boost::system::error_code err; if (socket_) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 62a5e9bb..9c83fd9e 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -168,6 +168,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + Future> newGetSchema(const std::string& topicName, + uint64_t requestId); + private: struct PendingRequestData { Promise promise; @@ -320,6 +323,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; + typedef std::map>> PendingGetSchemaMap; + PendingGetSchemaMap pendingGetSchemaRequests_; + mutable std::mutex mutex_; typedef std::unique_lock Lock; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9c16536..d9520053 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, conf, callback)); + + if (conf.getSchema().getSchemaType() == AUTO_PUBLISH) { + auto self = shared_from_this(); + auto confPtr = std::make_shared(conf); + lookupServicePtr_->getSchema(topicName).addListener( + [self, topicName, confPtr, callback](Result res, boost::optional topicSchema) { + if (res != ResultOk) { + callback(res, Producer()); + } + if (topicSchema) { + confPtr->setSchema(topicSchema.get()); + } + + self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, + std::placeholders::_2, topicName, *confPtr, callback)); + }); + } else { + lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, conf, callback)); + } } void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata, diff --git a/lib/Commands.cc b/lib/Commands.cc index 5bb9587e..f22cea5d 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -157,6 +157,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat return buffer; } +SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) { + static BaseCommand cmd; + static std::mutex mutex; + std::lock_guard lock(mutex); + cmd.set_type(BaseCommand::GET_SCHEMA); + + auto getSchema = cmd.mutable_getschema(); + getSchema->set_topic(topic); + getSchema->set_request_id(requestId); + + const SharedBuffer buffer = writeMessageWithSize(cmd); + cmd.clear_getschema(); + return buffer; +} + SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) { static BaseCommand cmd; static std::mutex mutex; @@ -846,5 +861,6 @@ bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) { bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; } bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; } + } // namespace pulsar /* namespace pulsar */ diff --git a/lib/Commands.h b/lib/Commands.h index bbe96fdb..04c2813a 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -85,6 +85,8 @@ class Commands { static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId, const std::string& listenerName); + static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId); + static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, uint64_t sequenceId, ChecksumType checksumType, const proto::MessageMetadata& metadata, const SharedBuffer& payload); diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 8167b641..40abf4d5 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -27,6 +27,7 @@ #include "ExecutorService.h" #include "LogUtils.h" #include "NamespaceName.h" +#include "SchemaUtils.h" #include "ServiceNameResolver.h" #include "TopicName.h" namespace ptree = boost::property_tree; @@ -143,6 +144,25 @@ Future HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } +Future> HTTPLookupService::getSchema(const TopicNamePtr &topicName) { + Promise> promise; + std::stringstream completeUrlStream; + + const auto &url = serviceNameResolver_.resolveHost(); + if (topicName->isV2Topic()) { + completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << topicName->getProperty() << '/' + << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName() + << "/schema"; + } else { + completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << topicName->getProperty() << '/' + << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' + << topicName->getEncodedLocalName() << "/schema"; + } + executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest, + shared_from_this(), promise, completeUrlStream.str())); + return promise.getFuture(); +} + static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) { ((std::string *)responseDataPtr)->append((char *)contents, size * nmemb); return size * nmemb; @@ -263,9 +283,6 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & switch (res) { case CURLE_OK: - long response_code; - curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); - LOG_INFO("Response received for url " << completeUrl << " code " << response_code); if (response_code == 200) { retResult = ResultOk; } else if (needRedirection(response_code)) { @@ -286,7 +303,11 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & case CURLE_COULDNT_RESOLVE_HOST: case CURLE_HTTP_RETURNED_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultConnectError; + if (response_code == 404) { + retResult = ResultNotFound; + } else { + retResult = ResultConnectError; + } break; case CURLE_READ_ERROR: LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); @@ -409,4 +430,72 @@ void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std } } +void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) { + std::string responseData; + Result result = sendHTTPRequest(completeUrl, responseData); + + if (result == ResultNotFound) { + promise.setValue(boost::none); + } else if (result != ResultOk) { + promise.setFailed(result); + } else { + ptree::ptree root; + std::stringstream stream(responseData); + try { + ptree::read_json(stream, root); + } catch (ptree::json_parser_error &e) { + LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() + << "\nInput Json = " << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + const std::string defaultNotFoundString = "Not found"; + auto schemaTypeStr = root.get("type", defaultNotFoundString); + if (schemaTypeStr == defaultNotFoundString) { + LOG_ERROR("malformed json! - type not present" << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + auto schemaData = root.get("data", defaultNotFoundString); + if (schemaData == defaultNotFoundString) { + LOG_ERROR("malformed json! - data not present" << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + + auto schemaType = enumSchemaType(schemaTypeStr); + if (schemaType == KEY_VALUE) { + ptree::ptree kvRoot; + std::stringstream kvStream(schemaData); + try { + ptree::read_json(kvStream, kvRoot); + } catch (ptree::json_parser_error &e) { + LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() + << "\nInput Json = " << schemaData); + promise.setFailed(ResultInvalidMessage); + return; + } + std::stringstream keyStream; + ptree::write_json(keyStream, kvRoot.get_child("key"), false); + std::stringstream valueStream; + ptree::write_json(valueStream, kvRoot.get_child("value"), false); + auto keyData = keyStream.str(); + auto valueData = valueStream.str(); + // Remove the last line break. + keyData.pop_back(); + valueData.pop_back(); + schemaData = mergeKeyValueSchema(keyData, valueData); + } + + StringMap properties; + auto propertiesTree = root.get_child("properties"); + for (const auto &item : propertiesTree) { + properties[item.first] = item.second.get_value(); + } + + SchemaInfo schemaInfo = SchemaInfo(schemaType, "", schemaData, properties); + promise.setValue(schemaInfo); + } +} + } // namespace pulsar diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 929d7ab1..eeaadae7 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -28,6 +28,7 @@ namespace pulsar { class ServiceNameResolver; using NamespaceTopicsPromise = Promise; using NamespaceTopicsPromisePtr = std::shared_ptr; +using GetSchemaPromise = Promise>; class HTTPLookupService : public LookupService, public std::enable_shared_from_this { class CurlInitializer { @@ -62,6 +63,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t void handleLookupHTTPRequest(LookupPromise, const std::string, RequestType); void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, const std::string completeUrl); + void handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl); Result sendHTTPRequest(std::string completeUrl, std::string& responseData); @@ -72,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future getPartitionMetadataAsync(const TopicNamePtr&) override; + Future> getSchema(const TopicNamePtr& topicName) override; + Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; }; } // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index 6af290ce..9048b210 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -72,6 +73,14 @@ class LookupService { */ virtual Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0; + /** + * returns current SchemaInfo {@link SchemaInfo} for a given topic. + * + * @param topicName topic-name + * @return SchemaInfo + */ + virtual Future> getSchema(const TopicNamePtr& topicName) = 0; + virtual ~LookupService() {} }; diff --git a/lib/Result.cc b/lib/Result.cc index 3533b1ec..9d0ff106 100644 --- a/lib/Result.cc +++ b/lib/Result.cc @@ -40,6 +40,9 @@ const char* strResult(Result result) { case ResultTimeout: return "TimeOut"; + case ResultNotFound: + return "ResultNotFound"; + case ResultLookupError: return "LookupError"; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 7d704ec5..f16ff751 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -66,6 +66,12 @@ class RetryableLookupService : public LookupService, [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); }); } + Future> getSchema(const TopicNamePtr& topicName) override { + return executeAsync>( + "get-schema" + topicName->toString(), + [this, topicName] { return lookupService_->getSchema(topicName); }); + } + template Future executeAsync(const std::string& key, std::function()> f) { Promise promise; diff --git a/lib/Schema.cc b/lib/Schema.cc index 300c91d7..d77822a2 100644 --- a/lib/Schema.cc +++ b/lib/Schema.cc @@ -25,7 +25,8 @@ #include #include -#include "SharedBuffer.h" +#include "SchemaUtils.h" + using boost::property_tree::ptree; using boost::property_tree::read_json; using boost::property_tree::write_json; @@ -40,14 +41,6 @@ PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::KeyValueEncoding namespace pulsar { -static const std::string KEY_SCHEMA_NAME = "key.schema.name"; -static const std::string KEY_SCHEMA_TYPE = "key.schema.type"; -static const std::string KEY_SCHEMA_PROPS = "key.schema.properties"; -static const std::string VALUE_SCHEMA_NAME = "value.schema.name"; -static const std::string VALUE_SCHEMA_TYPE = "value.schema.type"; -static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties"; -static const std::string KV_ENCODING_TYPE = "kv.encoding.type"; - PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) { switch (encodingType) { case KeyValueEncodingType::INLINE: @@ -112,6 +105,44 @@ PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) { return "UnknownSchemaType"; } +PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) { + if (schemaTypeStr == "NONE") { + return NONE; + } else if (schemaTypeStr == "STRING") { + return STRING; + } else if (schemaTypeStr == "INT8") { + return INT8; + } else if (schemaTypeStr == "INT16") { + return INT16; + } else if (schemaTypeStr == "INT32") { + return INT32; + } else if (schemaTypeStr == "INT64") { + return INT64; + } else if (schemaTypeStr == "FLOAT") { + return FLOAT; + } else if (schemaTypeStr == "DOUBLE") { + return DOUBLE; + } else if (schemaTypeStr == "BYTES") { + return BYTES; + } else if (schemaTypeStr == "JSON") { + return JSON; + } else if (schemaTypeStr == "PROTOBUF") { + return PROTOBUF; + } else if (schemaTypeStr == "AVRO") { + return AVRO; + } else if (schemaTypeStr == "AUTO_CONSUME") { + return AUTO_CONSUME; + } else if (schemaTypeStr == "AUTO_PUBLISH") { + return AUTO_PUBLISH; + } else if (schemaTypeStr == "KEY_VALUE") { + return KEY_VALUE; + } else if (schemaTypeStr == "PROTOBUF_NATIVE") { + return PROTOBUF_NATIVE; + } else { + throw std::invalid_argument("No match schema type: " + schemaTypeStr); + } +} + class PULSAR_PUBLIC SchemaInfoImpl { public: const std::string name_; @@ -134,18 +165,6 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema, const KeyValueEncodingType &keyValueEncodingType) { - std::string keySchemaStr = keySchema.getSchema(); - std::string valueSchemaStr = valueSchema.getSchema(); - uint32_t keySize = keySchemaStr.size(); - uint32_t valueSize = valueSchemaStr.size(); - - auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize; - SharedBuffer buffer = SharedBuffer::allocate(buffSize); - buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : static_cast(keySize)); - buffer.write(keySchemaStr.c_str(), static_cast(keySize)); - buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : static_cast(valueSize)); - buffer.write(valueSchemaStr.c_str(), static_cast(valueSize)); - auto writeJson = [](const StringMap &properties) { ptree pt; for (auto &entry : properties) { @@ -167,8 +186,10 @@ SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchem properties.emplace(VALUE_SCHEMA_PROPS, writeJson(valueSchema.getProperties())); properties.emplace(KV_ENCODING_TYPE, strEncodingType(keyValueEncodingType)); - impl_ = std::make_shared(KEY_VALUE, "KeyValue", std::string(buffer.data(), buffSize), - properties); + std::string keySchemaStr = keySchema.getSchema(); + std::string valueSchemaStr = valueSchema.getSchema(); + impl_ = std::make_shared(KEY_VALUE, "KeyValue", + mergeKeyValueSchema(keySchemaStr, valueSchemaStr), properties); } SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; } diff --git a/lib/SchemaUtils.h b/lib/SchemaUtils.h new file mode 100644 index 00000000..a298a034 --- /dev/null +++ b/lib/SchemaUtils.h @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef SCHEMA_UTILS_HPP_ +#define SCHEMA_UTILS_HPP_ + +#include "SharedBuffer.h" + +namespace pulsar { + +static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF; +static const std::string KEY_SCHEMA_NAME = "key.schema.name"; +static const std::string KEY_SCHEMA_TYPE = "key.schema.type"; +static const std::string KEY_SCHEMA_PROPS = "key.schema.properties"; +static const std::string VALUE_SCHEMA_NAME = "value.schema.name"; +static const std::string VALUE_SCHEMA_TYPE = "value.schema.type"; +static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties"; +static const std::string KV_ENCODING_TYPE = "kv.encoding.type"; + +/** + * Merge keySchemaData and valueSchemaData. + * @return + */ +static std::string mergeKeyValueSchema(const std::string& keySchemaData, const std::string& valueSchemaData) { + uint32_t keySize = keySchemaData.size(); + uint32_t valueSize = valueSchemaData.size(); + + auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize; + SharedBuffer buffer = SharedBuffer::allocate(buffSize); + buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : static_cast(keySize)); + buffer.write(keySchemaData.c_str(), static_cast(keySize)); + buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : static_cast(valueSize)); + buffer.write(valueSchemaData.c_str(), static_cast(valueSize)); + + return std::string(buffer.data(), buffSize); +} + +} // namespace pulsar + +#endif /* SCHEMA_UTILS_HPP_ */ diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf index 2ee44322..2b73845c 100644 --- a/test-conf/standalone-ssl.conf +++ b/test-conf/standalone-ssl.conf @@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000 # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true +# Disable schema validation: If a producer doesn’t carry a schema, the producer is allowed to connect to the topic and produce data. +isSchemaValidationEnforced=true + # How often to check for topics that have reached the quota backlogQuotaCheckIntervalInSeconds=60 diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 5ac41224..3964b93a 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -39,6 +39,9 @@ using namespace pulsar; DECLARE_LOG_OBJECT() +static std::string binaryLookupUrl = "pulsar://localhost:6650"; +static std::string httpLookupUrl = "http://localhost:8080"; + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -274,3 +277,82 @@ TEST(LookupServiceTest, testTimeout) { ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); } + +class LookupServiceTest : public ::testing::TestWithParam { + public: + void TearDown() override { client_.close(); } + + protected: + Client client_{GetParam()}; +}; + +TEST_P(LookupServiceTest, testGetSchema) { + const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + + StringMap properties; + properties.emplace("key1", "value1"); + properties.emplace("key2", "value2"); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties)); + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional schemaInfo; + auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(jsonSchema, schemaInfo->getSchema()); + ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType()); + ASSERT_EQ(properties, schemaInfo->getProperties()); +} + +TEST_P(LookupServiceTest, testGetSchemaNotFund) { + const std::string topic = + "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional schemaInfo; + auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_TRUE(!schemaInfo); +} + +TEST_P(LookupServiceTest, testGetKeyValueSchema) { + const std::string topic = + "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + StringMap properties; + properties.emplace("key1", "value1"); + properties.emplace("key2", "value2"); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties); + SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties); + SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(keyValueSchema); + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional schemaInfo; + auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema()); + ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType()); + ASSERT_TRUE(!schemaInfo->getProperties().empty()); +} + +INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); \ No newline at end of file diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc index cf0f8c7c..af6c5f2b 100644 --- a/tests/SchemaTest.cc +++ b/tests/SchemaTest.cc @@ -24,7 +24,6 @@ using namespace pulsar; static std::string lookupUrl = "pulsar://localhost:6650"; - static const std::string exampleSchema = R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})"; @@ -50,11 +49,10 @@ TEST(SchemaTest, testSchema) { res = client.createProducer("topic-avro", producerConf, producer); ASSERT_EQ(ResultIncompatibleSchema, res); - // Creating producer with no schema on same topic should succeed - // because standalone broker is configured by default to not - // require the schema to be set + // Creating producer with no schema on same topic should failed. + // Because we set broker config isSchemaValidationEnforced=true res = client.createProducer("topic-avro", producer); - ASSERT_EQ(ResultOk, res); + ASSERT_EQ(ResultIncompatibleSchema, res); ConsumerConfiguration consumerConf; Consumer consumer; @@ -154,3 +152,28 @@ TEST(SchemaTest, testValueSchemaIsEmpty) { int valueSchemaSize = buffer.readUnsignedInt(); ASSERT_EQ(valueSchemaSize, -1); } + +TEST(SchemaTest, testAutoPublicSchema) { + const std::string topic = "testAutoPublicSchema" + std::to_string(time(nullptr)); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo schema(JSON, "test-schema", jsonSchema); + + Client client(lookupUrl); + + ConsumerConfiguration consumerConfiguration; + consumerConfiguration.setSchema(schema); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer)); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(SchemaInfo(pulsar::AUTO_PUBLISH, "", "")); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + Message msg = MessageBuilder().setContent("content").build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + + ASSERT_EQ(ResultOk, consumer.receive(msg)); + ASSERT_EQ("content", msg.getDataAsString()); +}