Skip to content

Commit

Permalink
Automatic conversion of query/ingestion urls in ingest clients (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Sep 13, 2022
1 parent cf21875 commit e2059e7
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 221 deletions.
Original file line number Diff line number Diff line change
@@ -1,78 +1,31 @@
package com.microsoft.azure.kusto.ingest;

import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;

public abstract class IngestClientBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final String INGEST_PREFIX = "ingest-";
static final String PROTOCOL_SUFFIX = "://";

String connectionDataSource;
private String endpointServiceType;
private String suggestedEndpointUri;
public static final String INGEST_PREFIX = "ingest-";
protected static final String WRONG_ENDPOINT_MESSAGE = "Ingestion failed likely because the wrong endpoint, whose ServiceType %s, was configured, which isn't compatible with the client of type '%s' being used. Initialize the client with the appropriate endpoint URL";
protected static final String CONFIGURED_ENDPOINT_MESSAGE = "is '%s'";
protected static final String INDETERMINATE_CONFIGURED_ENDPOINT_MESSAGE = "couldn't be determined";

protected void validateEndpointServiceType(String endpoint, String expectedServiceType)
throws IngestionClientException {
if (StringUtils.isBlank(endpointServiceType)) {
endpointServiceType = retrieveServiceType();
}
if (!expectedServiceType.equals(endpointServiceType)) {
String message;
if (StringUtils.isNotBlank(endpointServiceType)) {
String configuredEndpointMessage = String.format(CONFIGURED_ENDPOINT_MESSAGE, endpointServiceType);
message = String.format(WRONG_ENDPOINT_MESSAGE, configuredEndpointMessage, expectedServiceType);
} else {
message = String.format(WRONG_ENDPOINT_MESSAGE, INDETERMINATE_CONFIGURED_ENDPOINT_MESSAGE, expectedServiceType);
}
suggestedEndpointUri = generateEndpointSuggestion(suggestedEndpointUri, endpoint);
if (StringUtils.isNotBlank(suggestedEndpointUri)) {
message = String.format("%s, which is likely '%s'.", message, suggestedEndpointUri);
} else {
message += ".";
}
throw new IngestionClientException(message);
}
static boolean shouldCompress(CompressionType sourceCompressionType, IngestionProperties.DataFormat dataFormat) {
return (sourceCompressionType == null) && (dataFormat == null || dataFormat.isCompressible());
}

protected String generateEndpointSuggestion(String existingSuggestedEndpointUri, String dataSource) {
if (existingSuggestedEndpointUri != null) {
return existingSuggestedEndpointUri;
}
// The default is not passing a suggestion to the exception
String endpointUriToSuggestStr = "";
if (StringUtils.isNotBlank(dataSource)) {
URIBuilder existingEndpoint;
try {
existingEndpoint = new URIBuilder(dataSource);
endpointUriToSuggestStr = emendEndpointUri(existingEndpoint);
} catch (URISyntaxException e) {
log.warn(
"Since the wrong endpoint was used, we attempted to suggest the correct endpoint. However, we couldn't parse dataSource '{}', so no suggestion can be made.",
dataSource, e);
} catch (IllegalArgumentException e) {
log.warn(
"Since the wrong endpoint was used, we attempted to suggest the correct endpoint. However, the URL is already in the correct format '{}', so no suggestion can be made.",
dataSource, e);
}
static String getIngestionEndpoint(String clusterUrl) {
if (clusterUrl.contains(INGEST_PREFIX)) {
return clusterUrl;
} else {
return clusterUrl.replaceFirst(PROTOCOL_SUFFIX, PROTOCOL_SUFFIX + INGEST_PREFIX);
}

return endpointUriToSuggestStr;
}

protected abstract String retrieveServiceType();

protected abstract String emendEndpointUri(URIBuilder existingEndpoint);

static boolean shouldCompress(CompressionType sourceCompressionType, IngestionProperties.DataFormat dataFormat) {
return (sourceCompressionType == null) && (dataFormat == null || dataFormat.isCompressible());
static String getQueryEndpoint(String clusterUrl) {
if (clusterUrl.contains(INGEST_PREFIX)) {
return clusterUrl.replaceFirst(INGEST_PREFIX, "");
} else {
return clusterUrl;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,62 @@ public static StreamingIngestClient createStreamingIngestClient(ConnectionString

/**
* Creates a new managed streaming ingest client, with default http client properties.
* @param dmConnectionStringBuilder connection string builder for the data management endpoint
* @param engineConnectionStringBuilder connection string builder for the engine endpoint
* This method should only be used for advanced cases. If your endpoints are standard, or you do not know, use {@link #createManagedStreamingIngestClient(ConnectionStringBuilder)} instead.
* @param ingestionEndpointConnectionStringBuilder connection string builder for the data management endpoint
* @param queryEndpointConnectionStringBuilder connection string builder for the engine endpoint
* @return a new managed streaming ingest client
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder dmConnectionStringBuilder,
ConnectionStringBuilder engineConnectionStringBuilder)
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder)
throws URISyntaxException {
return createManagedStreamingIngestClient(dmConnectionStringBuilder, engineConnectionStringBuilder, null);
return createManagedStreamingIngestClient(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null);
}

/**
* Creates a new managed streaming ingest client.
* @param dmConnectionStringBuilder connection string builder for the data management endpoint
* @param engineConnectionStringBuilder connection string builder for the engine endpoint
* This method should only be used for advanced cases. If your endpoints are standard, or you do not know, use
* {@link #createManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* @param ingestionEndpointConnectionStringBuilder connection string builder for the data management endpoint
* @param queryEndpointConnectionStringBuilder connection string builder for the engine endpoint
* @param properties additional properties to configure the http client
* @return a new managed streaming ingest client
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder dmConnectionStringBuilder,
ConnectionStringBuilder engineConnectionStringBuilder, @Nullable HttpClientProperties properties)
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder, @Nullable HttpClientProperties properties)
throws URISyntaxException {
return new ManagedStreamingIngestClient(dmConnectionStringBuilder, engineConnectionStringBuilder, properties);
return new ManagedStreamingIngestClient(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, properties);
}

/**
* Creates a new managed streaming ingest client, with default http client properties.
* This method supports both an ingestion and query endpoint, and deduces the other endpoint from the given one.
* @param connectionStringBuilder connection string builder for the client
* @return a new managed streaming ingest client
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder)
throws URISyntaxException {
return createManagedStreamingIngestClient(connectionStringBuilder, (HttpClientProperties) null);
}

/**
* Creates a new managed streaming ingest client.
* This method supports both an ingestion and query endpoint, and deduces the other endpoint from the given one.
* @param connectionStringBuilder connection string builder for the client
* @param properties additional properties to configure the http client
* @return a new managed streaming ingest client
* @throws URISyntaxException if the connection string is invalid
*/
public static ManagedStreamingIngestClient createManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder,
@Nullable HttpClientProperties properties)
throws URISyntaxException {
return new ManagedStreamingIngestClient(connectionStringBuilder, properties);
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #createManagedStreamingIngestClient(ConnectionStringBuilder)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string, with default http client properties.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -99,6 +129,7 @@ public static ManagedStreamingIngestClient createManagedStreamingIngestClientFro
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #createManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -114,6 +145,7 @@ public static ManagedStreamingIngestClient createManagedStreamingIngestClientFro
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #createManagedStreamingIngestClient(ConnectionStringBuilder)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string, with default http client properties.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -127,6 +159,7 @@ public static ManagedStreamingIngestClient createManagedStreamingIngestClientFro
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #createManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ManagedStreamingIngestClient implements IngestClient {
private final ExponentialRetry exponentialRetryTemplate;

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string, with default http client properties.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -62,6 +63,7 @@ public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStri
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from a DM connection string.
* This method infers the engine connection string from the DM connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -74,11 +76,12 @@ public static ManagedStreamingIngestClient fromDmConnectionString(ConnectionStri
@Nullable HttpClientProperties properties)
throws URISyntaxException {
ConnectionStringBuilder engineConnectionString = new ConnectionStringBuilder(dmConnectionString);
engineConnectionString.setClusterUrl(StreamingIngestClient.generateEngineUriSuggestion(new URIBuilder(dmConnectionString.getClusterUrl())));
engineConnectionString.setClusterUrl(IngestClientBase.getQueryEndpoint(engineConnectionString.getClusterUrl()));
return new ManagedStreamingIngestClient(dmConnectionString, engineConnectionString, properties);
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string, with default http client properties.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -91,6 +94,7 @@ public static ManagedStreamingIngestClient fromEngineConnectionString(Connection
}

/**
* @deprecated - Ingest clients now automatically deduce the endpoint, use {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)} instead.
* Creates a new ManagedStreamingIngestClient from an engine connection string.
* This method infers the DM connection string from the engine connection string.
* For advanced usage, use {@link ManagedStreamingIngestClient#ManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
Expand All @@ -103,24 +107,51 @@ public static ManagedStreamingIngestClient fromEngineConnectionString(Connection
@Nullable HttpClientProperties properties)
throws URISyntaxException {
ConnectionStringBuilder dmConnectionString = new ConnectionStringBuilder(engineConnectionString);
dmConnectionString.setClusterUrl(QueuedIngestClientImpl.generateDmUriSuggestion(new URIBuilder(engineConnectionString.getClusterUrl())));
dmConnectionString.setClusterUrl(IngestClientBase.getIngestionEndpoint(engineConnectionString.getClusterUrl()));
return new ManagedStreamingIngestClient(dmConnectionString, engineConnectionString, properties);
}

public ManagedStreamingIngestClient(ConnectionStringBuilder dmConnectionStringBuilder,
ConnectionStringBuilder engineConnectionStringBuilder) throws URISyntaxException {
this(dmConnectionStringBuilder, engineConnectionStringBuilder, null);
/**
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder)}
* instead.
*/
public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder) throws URISyntaxException {
this(ingestionEndpointConnectionStringBuilder, queryEndpointConnectionStringBuilder, null);
}

/**
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder, ConnectionStringBuilder, HttpClientProperties)} instead.
* This constructor should only be used for advanced cases. If your endpoints are standard, or you do not know, use
* {@link #ManagedStreamingIngestClient(ConnectionStringBuilder, HttpClientProperties)})} instead.
* @param ingestionEndpointConnectionStringBuilder - Endpoint for ingesting data, usually starts with "https://ingest-"
* @param queryEndpointConnectionStringBuilder - Endpoint for querying data, does not include "ingest-"
* @param properties - Additional properties to configure the http client
* @throws URISyntaxException if the connection string is invalid
*/
public ManagedStreamingIngestClient(ConnectionStringBuilder ingestionEndpointConnectionStringBuilder,
ConnectionStringBuilder queryEndpointConnectionStringBuilder,
@Nullable HttpClientProperties properties) throws URISyntaxException {
log.info("Creating a new ManagedStreamingIngestClient from connection strings");
queuedIngestClient = new QueuedIngestClientImpl(ingestionEndpointConnectionStringBuilder, properties);
streamingIngestClient = new StreamingIngestClient(queryEndpointConnectionStringBuilder, properties);
exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);
}

public ManagedStreamingIngestClient(ConnectionStringBuilder dmConnectionStringBuilder,
ConnectionStringBuilder engineConnectionStringBuilder,
ManagedStreamingIngestClient(ConnectionStringBuilder connectionStringBuilder,
@Nullable HttpClientProperties properties) throws URISyntaxException {
log.info("Creating a new ManagedStreamingIngestClient from connection strings");
queuedIngestClient = new QueuedIngestClientImpl(dmConnectionStringBuilder, properties);
streamingIngestClient = new StreamingIngestClient(engineConnectionStringBuilder, properties);
queuedIngestClient = new QueuedIngestClientImpl(connectionStringBuilder, properties);
streamingIngestClient = new StreamingIngestClient(connectionStringBuilder, properties);
exponentialRetryTemplate = new ExponentialRetry(ATTEMPT_COUNT);
}

/**
* @deprecated - This method is slated to be private. Use
* {@link IngestClientFactory#createManagedStreamingIngestClient(ConnectionStringBuilder)} instead.
*/
public ManagedStreamingIngestClient(ResourceManager resourceManager,
AzureStorageClient storageClient,
StreamingClient streamingClient) {
Expand Down
Loading

0 comments on commit e2059e7

Please sign in to comment.