Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for MongoDB Replica Set URI with readPreference and host:port enum in MongoDB dictionaries #46524

Merged
merged 10 commits into from Mar 7, 2023
8 changes: 8 additions & 0 deletions base/poco/MongoDB/include/Poco/MongoDB/Connection.h
Expand Up @@ -90,6 +90,9 @@ namespace MongoDB

Poco::Net::SocketAddress address() const;
/// Returns the address of the MongoDB server.

const std::string & uri() const;
/// Returns the uri on which the connection was made.

void connect(const std::string & hostAndPort);
/// Connects to the given MongoDB server.
Expand Down Expand Up @@ -148,6 +151,7 @@ namespace MongoDB
private:
Poco::Net::SocketAddress _address;
Poco::Net::StreamSocket _socket;
std::string _uri;
};


Expand All @@ -158,6 +162,10 @@ namespace MongoDB
{
return _address;
}
inline const std::string & Connection::uri() const
{
return _uri;
}


}
Expand Down
211 changes: 149 additions & 62 deletions base/poco/MongoDB/src/Connection.cpp
Expand Up @@ -145,68 +145,155 @@ void Connection::connect(const Poco::Net::StreamSocket& socket)

void Connection::connect(const std::string& uri, SocketFactory& socketFactory)
{
Poco::URI theURI(uri);
if (theURI.getScheme() != "mongodb") throw Poco::UnknownURISchemeException(uri);

std::string userInfo = theURI.getUserInfo();
std::string host = theURI.getHost();
Poco::UInt16 port = theURI.getPort();
if (port == 0) port = 27017;

std::string databaseName = theURI.getPath();
if (!databaseName.empty() && databaseName[0] == '/') databaseName.erase(0, 1);
if (databaseName.empty()) databaseName = "admin";

bool ssl = false;
Poco::Timespan connectTimeout;
Poco::Timespan socketTimeout;
std::string authMechanism = Database::AUTH_SCRAM_SHA1;

Poco::URI::QueryParameters params = theURI.getQueryParameters();
for (Poco::URI::QueryParameters::const_iterator it = params.begin(); it != params.end(); ++it)
{
if (it->first == "ssl")
{
ssl = (it->second == "true");
}
else if (it->first == "connectTimeoutMS")
{
connectTimeout = static_cast<Poco::Timespan::TimeDiff>(1000)*Poco::NumberParser::parse(it->second);
}
else if (it->first == "socketTimeoutMS")
{
socketTimeout = static_cast<Poco::Timespan::TimeDiff>(1000)*Poco::NumberParser::parse(it->second);
}
else if (it->first == "authMechanism")
{
authMechanism = it->second;
}
}

connect(socketFactory.createSocket(host, port, connectTimeout, ssl));

if (socketTimeout > 0)
{
_socket.setSendTimeout(socketTimeout);
_socket.setReceiveTimeout(socketTimeout);
}

if (!userInfo.empty())
{
std::string username;
std::string password;
std::string::size_type pos = userInfo.find(':');
if (pos != std::string::npos)
{
username.assign(userInfo, 0, pos++);
password.assign(userInfo, pos, userInfo.size() - pos);
}
else username = userInfo;

Database database(databaseName);
if (!database.authenticate(*this, username, password, authMechanism))
throw Poco::NoPermissionException(Poco::format("Access to MongoDB database %s denied for user %s", databaseName, username));
}
std::vector<std::string> strAddresses;
std::string newURI;

if (uri.find(',') != std::string::npos)
{
size_t pos;
size_t head = 0;
if ((pos = uri.find("@")) != std::string::npos)
{
head = pos + 1;
}
else if ((pos = uri.find("://")) != std::string::npos)
{
head = pos + 3;
}

std::string tempstr;
std::string::const_iterator it = uri.begin();
it += head;
size_t tail = head;
for (;it != uri.end() && *it != '?' && *it != '/'; ++it)
{
tempstr += *it;
tail++;
}

it = tempstr.begin();
std::string token;
for (;it != tempstr.end(); ++it)
{
if (*it == ',')
{
newURI = uri.substr(0, head) + token + uri.substr(tail, uri.length());
strAddresses.push_back(newURI);
token = "";
}
else
{
token += *it;
}
}
newURI = uri.substr(0, head) + token + uri.substr(tail, uri.length());
strAddresses.push_back(newURI);
}
else
{
strAddresses.push_back(uri);
}

newURI = strAddresses.front();
Poco::URI theURI(newURI);
if (theURI.getScheme() != "mongodb") throw Poco::UnknownURISchemeException(uri);

std::string userInfo = theURI.getUserInfo();
std::string databaseName = theURI.getPath();
if (!databaseName.empty() && databaseName[0] == '/') databaseName.erase(0, 1);
if (databaseName.empty()) databaseName = "admin";

bool ssl = false;
Poco::Timespan connectTimeout;
Poco::Timespan socketTimeout;
std::string authMechanism = Database::AUTH_SCRAM_SHA1;
std::string readPreference="primary";

Poco::URI::QueryParameters params = theURI.getQueryParameters();
for (Poco::URI::QueryParameters::const_iterator it = params.begin(); it != params.end(); ++it)
{
if (it->first == "ssl")
{
ssl = (it->second == "true");
}
else if (it->first == "connectTimeoutMS")
{
connectTimeout = static_cast<Poco::Timespan::TimeDiff>(1000)*Poco::NumberParser::parse(it->second);
}
else if (it->first == "socketTimeoutMS")
{
socketTimeout = static_cast<Poco::Timespan::TimeDiff>(1000)*Poco::NumberParser::parse(it->second);
}
else if (it->first == "authMechanism")
{
authMechanism = it->second;
}
else if (it->first == "readPreference")
{
readPreference= it->second;
}
}

for (std::vector<std::string>::const_iterator it = strAddresses.cbegin();it != strAddresses.cend(); ++it)
{
newURI = *it;
theURI = Poco::URI(newURI);

std::string host = theURI.getHost();
Poco::UInt16 port = theURI.getPort();
if (port == 0) port = 27017;

connect(socketFactory.createSocket(host, port, connectTimeout, ssl));
_uri = newURI;
if (socketTimeout > 0)
{
_socket.setSendTimeout(socketTimeout);
_socket.setReceiveTimeout(socketTimeout);
}
if (strAddresses.size() > 1)
{
Poco::MongoDB::QueryRequest request("admin.$cmd");
request.setNumberToReturn(1);
request.selector().add("isMaster", 1);
kssenii marked this conversation as resolved.
Show resolved Hide resolved
Poco::MongoDB::ResponseMessage response;

sendRequest(request, response);
_uri = newURI;
if (!response.documents().empty())
{
Poco::MongoDB::Document::Ptr doc = response.documents()[0];
if (doc->get<bool>("ismaster") && readPreference == "primary")
{
break;
}
else if (!doc->get<bool>("ismaster") && readPreference == "secondary")
{
break;
}
else if (it + 1 == strAddresses.cend())
{
throw Poco::URISyntaxException(uri);
}
}
}
}
if (!userInfo.empty())
{
std::string username;
std::string password;
std::string::size_type pos = userInfo.find(':');
if (pos != std::string::npos)
{
username.assign(userInfo, 0, pos++);
password.assign(userInfo, pos, userInfo.size() - pos);
}
else username = userInfo;

Database database(databaseName);

if (!database.authenticate(*this, username, password, authMechanism))
throw Poco::NoPermissionException(Poco::format("Access to MongoDB database %s denied for user %s", databaseName, username));
}
}


Expand Down
10 changes: 5 additions & 5 deletions src/Dictionaries/MongoDBDictionarySource.cpp
Expand Up @@ -114,7 +114,11 @@ MongoDBDictionarySource::MongoDBDictionarySource(
{
if (!uri.empty())
{
Poco::URI poco_uri(uri);
// Connect with URI.
Poco::MongoDB::Connection::SocketFactory socket_factory;
connection->connect(uri, socket_factory);

Poco::URI poco_uri(connection->uri());

// Parse database from URI. This is required for correctness -- the
// cursor is created using database name and collection name, so we have
Expand All @@ -134,10 +138,6 @@ MongoDBDictionarySource::MongoDBDictionarySource(
{
user.resize(separator);
}

// Connect with URI.
Poco::MongoDB::Connection::SocketFactory socket_factory;
connection->connect(uri, socket_factory);
}
else
{
Expand Down