Skip to content

Commit

Permalink
[CASSANDRA-16928] CEP-10 Phase 1: InetAddressAndPort extends InetSock…
Browse files Browse the repository at this point in the history
…etAddress
  • Loading branch information
belliottsmith committed Oct 7, 2021
1 parent 767f98e commit e2721b0
Show file tree
Hide file tree
Showing 47 changed files with 200 additions and 173 deletions.
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/audit/AuditLogEntry.java
Expand Up @@ -76,10 +76,10 @@ String getLogString()
StringBuilder builder = new StringBuilder(100);
builder.append("user:").append(user)
.append("|host:").append(host)
.append("|source:").append(source.address);
if (source.port > 0)
.append("|source:").append(source.getAddress());
if (source.getPort() > 0)
{
builder.append("|port:").append(source.port);
builder.append("|port:").append(source.getPort());
}

builder.append("|timestamp:").append(timestamp)
Expand Down
37 changes: 19 additions & 18 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Expand Up @@ -20,6 +20,7 @@
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
Expand All @@ -36,7 +37,7 @@
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -716,9 +717,9 @@ public static synchronized void updateTokens(InetAddressAndPort ep, Collection<T
return;

String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), tokensAsSet(tokens));
req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), tokensAsSet(tokens));
}

public static synchronized boolean updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
Expand All @@ -727,9 +728,9 @@ public static synchronized boolean updatePreferredIP(InetAddressAndPort ep, Inet
return false;

String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), preferred_ip.getAddress());
req = "INSERT INTO system.%s (peer, peer_port, preferred_ip, preferred_port) VALUES (?, ?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, preferred_ip.address, preferred_ip.port);
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), preferred_ip.getAddress(), preferred_ip.getPort());
forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
return true;
}
Expand All @@ -740,14 +741,14 @@ public static synchronized void updatePeerInfo(InetAddressAndPort ep, String col
return;

String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.address, value);
executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.getAddress(), value);
//This column doesn't match across the two tables
if (columnName.equals("rpc_address"))
{
columnName = "native_address";
}
req = "INSERT INTO system.%s (peer, peer_port, %s) VALUES (?, ?, ?)";
executeInternal(String.format(req, PEERS_V2, columnName), ep.address, ep.port, value);
executeInternal(String.format(req, PEERS_V2, columnName), ep.getAddress(), ep.getPort(), value);
}

public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, InetAddressAndPort address)
Expand All @@ -756,19 +757,19 @@ public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, I
return;

String req = "INSERT INTO system.%s (peer, rpc_address) VALUES (?, ?)";
executeInternal(String.format(req, LEGACY_PEERS), ep.address, address.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress(), address.getAddress());
req = "INSERT INTO system.%s (peer, peer_port, native_address, native_port) VALUES (?, ?, ?, ?)";
executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, address.address, address.port);
executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort(), address.getAddress(), address.getPort());
}


public static synchronized void updateHintsDropped(InetAddressAndPort ep, UUID timePeriod, int value)
{
// with 30 day TTL
String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.address);
executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.getAddress());
req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ? AND peer_port = ?";
executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.address, ep.port);
executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.getAddress(), ep.getPort());
}

public static synchronized void updateSchemaVersion(UUID version)
Expand Down Expand Up @@ -800,12 +801,12 @@ private static Collection<Token> deserializeTokens(Collection<String> tokensStri
/**
* Remove stored tokens being used by another node
*/
public static synchronized void removeEndpoint(InetAddressAndPort ep)
public static synchronized void removeEndpoint(InetSocketAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
executeInternal(String.format(req, LEGACY_PEERS), ep.address);
executeInternal(String.format(req, LEGACY_PEERS), ep.getAddress());
req = String.format("DELETE FROM system.%s WHERE peer = ? AND peer_port = ?", PEERS_V2);
executeInternal(req, ep.address, ep.port);
executeInternal(req, ep.getAddress(), ep.getPort());
forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
}

Expand Down Expand Up @@ -887,7 +888,7 @@ public static Map<InetAddressAndPort, UUID> loadHostIds()
public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
{
String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
if (!result.isEmpty() && result.one().has("preferred_ip"))
{
UntypedResultSet.Row row = result.one();
Expand Down Expand Up @@ -934,7 +935,7 @@ public static CassandraVersion getReleaseVersion(InetAddressAndPort ep)
return CURRENT_VERSION;
}
String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?";
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.getAddress(), ep.getPort());
if (result != null && result.one().has("release_version"))
{
return new CassandraVersion(result.one().getString("release_version"));
Expand Down Expand Up @@ -1428,9 +1429,9 @@ public static synchronized void updateTransferredRanges(StreamOperation streamOp
{
rangesToUpdate.add(rangeToBytes(range));
}
executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.address, keyspace);
executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.getAddress(), keyspace);
cql = "UPDATE system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND peer_port = ? AND keyspace_name = ?";
executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.address, peer.port, keyspace);
executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.getAddress(), peer.getPort(), keyspace);
}

public static synchronized Map<InetAddressAndPort, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)
Expand Down
Expand Up @@ -114,7 +114,7 @@ private void addRow(SimpleDataSet dataSet, InetAddressAndPort addressAndPort, In
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(addressAndPort);
String rack = DatabaseDescriptor.getEndpointSnitch().getRack(addressAndPort);
dataSet.row(addressAndPort.address, addressAndPort.port, dc, rack)
dataSet.row(addressAndPort.getAddress(), addressAndPort.getPort(), dc, rack)
.column(USING_BYTES, handlers.usingCapacity())
.column(USING_RESERVE_BYTES, handlers.usingEndpointReserveCapacity())
.column(CORRUPT_FRAMES_RECOVERED, handlers.corruptFramesRecovered())
Expand Down
Expand Up @@ -115,7 +115,7 @@ private void addRow(SimpleDataSet dataSet, InetAddressAndPort addressAndPort, Ou
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(addressAndPort);
String rack = DatabaseDescriptor.getEndpointSnitch().getRack(addressAndPort);
long pendingBytes = sum(connections, OutboundConnection::pendingBytes);
dataSet.row(addressAndPort.address, addressAndPort.port, dc, rack)
dataSet.row(addressAndPort.getAddress(), addressAndPort.getPort(), dc, rack)
.column(USING_BYTES, pendingBytes)
.column(USING_RESERVE_BYTES, connections.usingReserveBytes())
.column(PENDING_COUNT, sum(connections, OutboundConnection::pendingCount))
Expand Down
Expand Up @@ -98,7 +98,7 @@ public FakeNode(InetAddressAndPort address, Integer rackId, Collection<Token> to

public int nodeId()
{
return fakeAddressAndPort.port;
return fakeAddressAndPort.getPort();
}

public int rackId()
Expand Down
Expand Up @@ -320,7 +320,7 @@ private void reset()

public Map<InetAddress, Double> getScores()
{
return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().address, Map.Entry::getValue));
return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().getAddress(), Map.Entry::getValue));
}

public Map<String, Double> getScoresWithPort()
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void gossiperStarting()
throw new RuntimeException(e);
}
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(address));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(address.address));
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(address.getAddress()));
Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
}
}
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/locator/IEndpointSnitch.java
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.locator;

import java.net.InetSocketAddress;
import java.util.Set;

import org.apache.cassandra.utils.FBUtilities;
Expand Down Expand Up @@ -55,6 +56,11 @@ default public String getLocalDatacenter()
return getDatacenter(FBUtilities.getBroadcastAddressAndPort());
}

default String getDatacenter(InetSocketAddress endpoint)
{
return getDatacenter(InetAddressAndPort.getByAddress(endpoint));
}

default public String getDatacenter(Replica replica)
{
return getDatacenter(replica.endpoint());
Expand Down

0 comments on commit e2721b0

Please sign in to comment.