Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed Jul 3, 2023
1 parent c579faa commit a62ee0b
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 0 deletions.
6 changes: 6 additions & 0 deletions conf/cassandra-rackdc.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ rack=rack1
# If AWS IMDS of v2 is configured, ec2_metadata_token_ttl_seconds says how many seconds a token will be valid until
# it is refreshed. Defaults to 21600. Can not be smaller than 30 and bigger than 21600. Has to be an integer.
# ec2_metadata_token_ttl_seconds=21600

# AzureSnitch
# Options are:
# Version of API to talk to. When not set, AzureSnitch will dynamically fetch the lastest API version
# There is no default, the following setting is just an example of the format of API version to use.
# azure_api_version=2021-12-13
190 changes: 190 additions & 0 deletions src/java/org/apache/cassandra/locator/AzureSnitch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.locator;

import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;

import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JsonUtils;

import static java.lang.String.format;

public class AzureSnitch extends AbstractNetworkTopologySnitch
{
private static final Logger logger = LoggerFactory.getLogger(AzureSnitch.class);

private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";

private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;

String rack;
String datacenter;

public AzureSnitch() throws IOException
{
this(new SnitchProperties());
}

public AzureSnitch(SnitchProperties properties) throws IOException
{
this(new AzureCloudConnector(properties));
}

public AzureSnitch(AzureCloudConnector connector) throws IOException
{
String response = connector.getResponse();
JsonNode jsonNode = JsonUtils.JSON_OBJECT_MAPPER.readTree(response);

JsonNode location = jsonNode.get("location");
JsonNode zone = jsonNode.get("zone");

if (location == null || location.isNull() || location.asText().isEmpty())
datacenter = DEFAULT_DC;
else
datacenter = location.asText();

if (zone == null || zone.isNull() || zone.asText().isEmpty())
rack = DEFAULT_RACK;
else
rack = "zone-" + zone.asText();

logger.info(format("%s using region: %s, zone: %s, properties: %s", AzureSnitch.class.getName(), datacenter, rack, connector));
}

static class AzureCloudConnector extends AbstractCloudMetadataServiceConnector
{
private static final Logger logger = LoggerFactory.getLogger(AzureSnitch.class);
private static final Pattern API_VERSION_PATTERN = Pattern.compile("\\d\\d\\d\\d-\\d\\d-\\d\\d");

static final String METADATA_SERVICE_PROPERTY_KEY = "metadata_url";
static final String DEFAULT_METADATA_SERVICE_URL = "http://169.254.169.254";
static final String METADATA_QUERY = "/metadata/instance/compute?api-version=%s&format=json";
static final String VERSIONS_ENDPOINT = "/metadata/versions";
static final String METADATA_HEADER = "Metadata";
static final String API_VERSION_PROPERTY_KEY = "azure_api_version";

private final String apiVersion;

protected AzureCloudConnector(SnitchProperties properties) throws IOException
{
super(properties.get(METADATA_SERVICE_PROPERTY_KEY, DEFAULT_METADATA_SERVICE_URL));
apiVersion = getApiVersion(properties);
logger.debug("Resolved API version " + apiVersion);
}

String getResponse() throws IOException
{
return apiCall(metadataServiceUrl,
String.format(METADATA_QUERY, apiVersion),
"GET",
ImmutableMap.of(METADATA_HEADER, "true"),
200);
}

String getApiVersion(SnitchProperties properties) throws IOException
{
String version = properties.get(API_VERSION_PROPERTY_KEY, null);

if (version != null && API_VERSION_PATTERN.matcher(version).matches())
return version;

logger.debug(format("API version of '%s' does not follow pattern %s or is null. Going to " +
"parse the latest version from the service ...",
version, API_VERSION_PATTERN));

String response = apiCall(metadataServiceUrl,
VERSIONS_ENDPOINT,
"GET",
ImmutableMap.of("Metadata", "true"),
200);

JsonNode versions = JsonUtils.JSON_OBJECT_MAPPER.readTree(response);
JsonNode apiVersions = versions.get("apiVersions");

if (apiVersions == null)
throw new IllegalStateException("Unable to get field 'apiVersions' from the response!");

if (!(apiVersions.isArray()))
throw new IOException(format(
"Expected a list, but got a %s:%s", apiVersions.getClass().getSimpleName(), apiVersions));

String[] versionsArray = JsonUtils.JSON_OBJECT_MAPPER.convertValue(apiVersions, String[].class);

if (versionsArray != null && versionsArray.length > 0)
return versionsArray[versionsArray.length - 1];
else
throw new IllegalStateException("returned apiVersions array is empty!");
}

@Override
public String toString()
{
return format("%s{%s=%s,%s=%s}",
AzureCloudConnector.class.getName(),
METADATA_SERVICE_PROPERTY_KEY,
metadataServiceUrl,
API_VERSION_PROPERTY_KEY,
apiVersion);
}
}

public String getRack(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return rack;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("rack");
return DEFAULT_RACK;
}
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return datacenter;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
return DEFAULT_DC;
}
return state.getApplicationState(ApplicationState.DC).value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.locator;

import java.util.Properties;

import org.junit.Rule;
import org.junit.Test;

import com.github.tomakehurst.wiremock.client.MappingBuilder;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.apache.cassandra.locator.AzureSnitch.AzureCloudConnector;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.cassandra.locator.AzureSnitch.AzureCloudConnector.METADATA_HEADER;
import static org.apache.cassandra.locator.AzureSnitch.AzureCloudConnector.METADATA_QUERY;
import static org.apache.cassandra.locator.AzureSnitch.AzureCloudConnector.METADATA_SERVICE_PROPERTY_KEY;
import static org.apache.cassandra.locator.AzureSnitch.AzureCloudConnector.VERSIONS_ENDPOINT;
import static org.junit.Assert.assertEquals;

public class AzureConnectorMockingTest
{
private static final String API_VERSIONS = "{\"apiVersions\":[\"2017-03-01\",\"2017-04-02\",\"2017-08-01\",\"2017-10-01\",\"2017-12-01\",\"2018-02-01\"," +
"\"2018-04-02\",\"2018-10-01\",\"2019-02-01\",\"2019-03-11\",\"2019-04-30\",\"2019-06-01\",\"2019-06-04\"," +
"\"2019-08-01\",\"2019-08-15\",\"2019-11-01\",\"2020-06-01\",\"2020-07-15\",\"2020-09-01\",\"2020-10-01\"," +
"\"2020-12-01\",\"2021-01-01\",\"2021-02-01\",\"2021-03-01\",\"2021-05-01\",\"2021-08-01\",\"2021-10-01\"," +
"\"2021-11-01\",\"2021-11-15\",\"2021-12-13\"]}";

private static final String RESPONSE = "{\"location\": \"PolandCentral\",\"zone\": \"1\"}";

@Rule
public final WireMockRule service = new WireMockRule(wireMockConfig().bindAddress("127.0.0.1").port(8080));

@Test
public void testConnector() throws Throwable
{
service.stubFor(versionRequest());
service.stubFor(metadataRequest());

Properties p = new Properties();
p.setProperty(METADATA_SERVICE_PROPERTY_KEY, "http://127.0.0.1:8080");

AzureSnitch azureSnitch = new AzureSnitch(new AzureCloudConnector(new SnitchProperties(p)));

assertEquals("zone-1", azureSnitch.rack);
assertEquals("PolandCentral", azureSnitch.datacenter);
}

private MappingBuilder versionRequest()
{
return get(urlEqualTo(VERSIONS_ENDPOINT)).withHeader(METADATA_HEADER, equalTo("true"))
.willReturn(aResponse().withBody(API_VERSIONS)
.withStatus(200)
.withHeader("Content-Type", "application/json; charset=utf-8")
.withHeader("Content-Length", String.valueOf(API_VERSIONS.getBytes(UTF_8).length)));
}

private MappingBuilder metadataRequest()
{
return get(urlEqualTo(format(METADATA_QUERY, "2021-12-13"))).withHeader(METADATA_HEADER, equalTo("true"))
.willReturn(aResponse().withBody(RESPONSE)
.withStatus(200)
.withHeader("Content-Type", "application/json; charset=utf-8")
.withHeader("Content-Length", String.valueOf(RESPONSE.getBytes(UTF_8).length)));
}
}

0 comments on commit a62ee0b

Please sign in to comment.