Cassandra 7544 rebased2 #184
Conversation
OK, first round of review complete. On the whole, this looks great. I want to do a second round on selected stuffs, so look for that in the next day.
```java
@Override
public int compareTo(InetAddressAndPort o)
{
    int retval = ByteBuffer.wrap(address.getAddress()).compareTo(ByteBuffer.wrap(o.address.getAddress()));
```
I'm not sure if `compareTo` is on a hot path anywhere, but I think we can avoid allocating the two `ByteBuffer`s by using `FastByteOperations#compareUnsigned`.
Yeah, it's pretty bad: `getAddress()` allocates the address byte array every time. Maybe I should have these store the address array so I don't have to go through `address` to get it.
`getHostAddress()` is the same thing. I think the `InetAddress` implementations, for whatever reason, chose to trade footprint for allocations. For us it's fine to just store the array; we don't plan on storing a ton of these, I think.
Well, it's not "for whatever reason": they are defensive copies. We should consider the correctness implications of not making defensive copies here.
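The allocation-free comparison suggested above boils down to an unsigned lexicographic compare over the cached address bytes. A minimal, self-contained sketch of that idea (this is an illustration, not Cassandra's actual `FastByteOperations` code; the class name `AddressBytes` is made up here):

```java
// Sketch of an unsigned lexicographic byte[] comparison, the idea behind the
// FastByteOperations#compareUnsigned suggestion: compare cached address bytes
// directly instead of allocating two ByteBuffers on every compareTo call.
public class AddressBytes
{
    public static int compareUnsigned(byte[] a, byte[] b)
    {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++)
        {
            // mask to 0..255 so 0x80.. sorts after 0x7f.. (unsigned order)
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0)
                return cmp;
        }
        // shorter array sorts first, e.g. a 4-byte IPv4 address before a 16-byte IPv6 one
        return a.length - b.length;
    }
}
```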
```java
 * Can't just add the additional columns because they are primary key columns and C* doesn't support changing
 * key columns even if it's just clustering columns.
 */
public class LegacySystemKeyspaceMigrator
```
trivial nit: maybe rename to `SystemKeyspaceMigrator40` to indicate this is for 3.0/3.x -> 4.0 only, and that we drop it after 4.0. I guess it's 'legacy' that's a little vague to me.
```java
    logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeerEventsName, peerEventsName);
}

static void migrateLegacyTransferredRanges()
```
pretty naming nit: remove 'legacy' from the method name, as you didn't do that on the other methods.
```java
int transferred = 0;
for (UntypedResultSet.Row row : rows)
{
    logger.debug("Transferring row {}", transferred);
```
trivial nit: all three `migrate*` methods have this same log line. Can you add the table name or something more unique?
Do we really want to repeat the table name on every line? There is a log line at the end that lists the count of rows transferred plus the source and destination. I could repeat that, without the count, before the transfer starts.
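The compromise proposed above could look something like this; the helper and the table names in the test are placeholders for illustration, not the actual migrator code:

```java
// Hypothetical sketch of the compromise: log source and destination tables once
// before the transfer starts, keeping the per-row debug line count-only.
public class MigrationLogLine
{
    public static String header(String legacyTable, String newTable)
    {
        return String.format("Migrating rows from legacy %s to %s", legacyTable, newTable);
    }
}
```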
```java
    + "PRIMARY KEY ((id)))")
    .partitioner(new LocalPartitioner(TimeUUIDType.instance))
    .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
    .gcGraceSeconds(0)
```
Is this supposed to be here? It's not on trunk.
I haven't rebased onto trunk in a while. Aleksey landed af3fe39#diff-ce3f6856b405c96859d9a50d9977e0b9L115 which is when it was removed on trunk. I'll get it when I rebase. And probably a lot of other pain as well. Looks like it's going to be 100% conflicts.
```
@@ -19,6 +19,7 @@
 package org.apache.cassandra.streaming;

 import java.io.IOException;
 import java.net.Socket;
```
nit: unused imports
```java
Set<InetAddress> ignores = new HashSet<>();
Set<InetAddress> hostsArg = new HashSet<>();
Set<InetAddress> ignoresArg = new HashSet<>();
Set<InetSocketAddress> hosts = new HashSet<>();
```
Why use `InetSocketAddress` here instead of `InetAddressWithPort`?
The loader is a client, not really part of the database, and it interacts with the Java driver using `InetSocketAddress`. The rule is that an `InetSocketAddress` should never refer to the storage port; `InetAddressAndPort` might refer to the client port in some of the internal server code, though.
```java
Collection<String> leavingNodes = probe.getLeavingNodes();
Collection<String> movingNodes = probe.getMovingNodes();
Map<String, String> loadMap = probe.getLoadMap();
Collection<String> liveNodes = probe.getLiveNodes(false);
```
I'm not thrilled with the two versions of `printDc`, but given the objects they depend on (like `SetHostStat`), refactoring that is probably too much for this patch.
I know it's ugly, but eventually we can just get rid of the old way. It's going to converge on what we want soon enough.
+1
```java
private final Set<InetAddress> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();

private static final InetAddress bindAll;
```
I'm not sure you should delete this `bindAll` check. There is a jira for it, CASSANDRA-5227. Jira is down right now, so I can't see what the rationale behind this is.
I looked up the JIRA and it's not necessary anymore. The comment says:

```java
// Note that after all nodes are running a version that includes CASSANDRA-5899, rpcAddress should
// never be 0.0.0.0, so this can eventually be removed.
```

So we can never hit that path anyway. I think it was useful when introduced, but it's not anymore.
hmm, looks like I didn't read that comment. But yes, you are correct, so +1 here
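As an aside, the condition the removed `bindAll` check guarded against (a peer gossiping 0.0.0.0 as its rpc address) is directly detectable via the standard library. A minimal sketch, not the original Cassandra code:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch: detect a wildcard ("bind all") address, i.e. 0.0.0.0 or ::.
public class WildcardCheck
{
    public static boolean isBindAll(InetAddress addr)
    {
        return addr.isAnyLocalAddress();
    }

    public static void main(String[] args) throws UnknownHostException
    {
        System.out.println(isBindAll(InetAddress.getByName("0.0.0.0")));   // wildcard
        System.out.println(isBindAll(InetAddress.getByName("127.0.0.1"))); // loopback, not wildcard
    }
}
```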
```
@@ -531,7 +530,7 @@ private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
 @Test
 public void testPrepareWithBatchLWT() throws Throwable
 {
-    testPrepareWithBatchLWT(ProtocolVersion.V4);
+    // testPrepareWithBatchLWT(ProtocolVersion.V4);
```
Should this be commented out?
No I think I was debugging and commented it out.
```java
InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
// Need to turn connecting into an InetAddressAndPort with the correct port. I think getting the port from "from"
// will work since we don't actually have ports diverge across network interfaces
StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
```
well, because I didn't document this (at all), it's hard to know why the hell I have to check the `addr` coming from the `channel`. Shame on me.

In the case of unit tests, if you use the `EmbeddedChannel`, `channel.remoteAddress()` does not return an `InetSocketAddress`, but an `EmbeddedSocketAddress`. I think the best thing to do here is:
```java
private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
{
    SocketAddress addr = channel.remoteAddress();
    final InetAddressAndPort connecting;
    if (addr instanceof InetSocketAddress)
    {
        InetSocketAddress address = (InetSocketAddress) addr;
        connecting = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort());
    }
    else
    {
        // presumably the addr is an EmbeddedSocketAddress, and we only get that when running unit tests
        // where channel is an instance of EmbeddedChannel. In that case, it's safe to simply use the
        // "from" parameter.
        connecting = from;
    }
    StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
}
```
This doesn't look right? You're using the ephemeral port from `SocketChannel.remoteAddress()`. That's not a useful port number for anything.
Yes, you are correct; I made a mistake. We should keep your code, but can you add my comment: in the case of unit tests, if you use the `EmbeddedChannel`, `channel.remoteAddress()` does not return an `InetSocketAddress`, but an `EmbeddedSocketAddress`. Hence the type check here.
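The fix the thread converges on (keep the original port-preserving logic, add the comment about `EmbeddedChannel`) can be reduced to a self-contained sketch over plain `java.net` types. Here `InetSocketAddress` stands in for Cassandra's `InetAddressAndPort`, and a generic `SocketAddress` stands in for the `EmbeddedSocketAddress` unit-test case; this is an illustration, not the committed code:

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

// Sketch: pick the "connecting" address for a stream session.
public class ConnectingAddress
{
    public static InetSocketAddress resolve(SocketAddress addr, InetSocketAddress from)
    {
        // In unit tests EmbeddedChannel.remoteAddress() returns an
        // EmbeddedSocketAddress, not an InetSocketAddress; hence the type check.
        if (addr instanceof InetSocketAddress)
        {
            InetAddress connecting = ((InetSocketAddress) addr).getAddress();
            // Use the address from the channel, but the port from "from":
            // remoteAddress() reports an ephemeral port, which is not useful,
            // and ports don't diverge across network interfaces.
            return new InetSocketAddress(connecting, from.getPort());
        }
        // Unit-test path: fall back to "from" entirely.
        return from;
    }
}
```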
conf/cassandra.yaml
```
@@ -960,6 +967,7 @@ server_encryption_options:
 # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
 # require_client_auth: false
 # require_endpoint_verification: false
 # outgoing_encrypted_port_source: yaml
```
This field is not referenced anywhere in the code (not in `ServerEncryptionOptions`). Is this just incomplete?
It's out of date. I implemented this functionality, but then you implemented and merged competing functionality. I'll remove it.
```java
X1,
X2,
INTERNAL_ADDRESS_AND_PORT, // Replacement for INTERNAL_IP with up to two ports
NATIVE_ADDRESS_AND_PORT,   // Replacement for RPC_ADDRESS
```
It's unclear to me why you renamed `RPC_ADDRESS` to `NATIVE_ADDRESS`. That makes it really easy to confuse "the addr/port to be used between peers" with "the addr/port open for client apps/drivers". Perhaps a better name is `INTERNODE_ADDRESS_AND_PORT`? (or INTERNAL_ or PEER_ or ...)
Native address is what it is now because thrift RPC is gone. What I remember is that I did this because I thought we had stopped using rpc_address in the yaml and renamed it to native_address. And now I am very confused, because that doesn't seem to be the case. I was trying to consistently use native_address instead of rpc_address now that thrift is gone.
Not sure what the optimal thing to do here is. I didn't do this out of the blue and I didn't really want to, but I felt guilty about continuing to call it RPC. It's a lot of code changes to stop using native because there are many places where I made it consistent.
ughh, so I was completely wrong wrt the RPC_ADDRESS/NATIVE_ADDRESS thing. I misread this line:

```java
appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
```

as this:

```java
appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastAddress()));
```

(`getBroadcastRpcAddress` vs `getBroadcastAddress`) when we do the assignment in StorageService.
…we always pay the overhead anyways usually multiple times.
… ids aren't the same size.
… the list was made bigger than necessary.
```java
    + "mean_partition_size bigint,"
    + "partitions_count bigint,"
    + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
    .gcGraceSeconds(0)
```
This is also not supposed to be here anymore.
I can't close this, but we should close it in favor of #188, which is this rebased yet again.
Closing as 7544 has been committed.