From 11ea2f5eb685616ce16ea2927d369f1d07a4ece5 Mon Sep 17 00:00:00 2001 From: Abhinay Nagpal Date: Thu, 16 May 2013 15:34:55 -0700 Subject: [PATCH] Added new end to end test for verifying the atomic update is consistent on bootstrap cleaned up code based off last code review --- .../client/protocol/admin/AdminClient.java | 14 +- .../client/protocol/pb/ProtoUtils.java | 12 + .../client/protocol/pb/VAdminProto.java | 474 +++++++++--------- .../admin/AdminServiceRequestHandler.java | 4 +- .../server/rebalance/Rebalancer.java | 21 +- .../store/metadata/MetadataStore.java | 11 + .../store/rebalancing/RedirectingStore.java | 14 +- src/proto/voldemort-admin.proto | 2 +- .../RebalanceMetadataConsistencyTest.java | 4 +- 9 files changed, 286 insertions(+), 270 deletions(-) diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 499982f5a1..00b977650c 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -874,14 +874,10 @@ public void updateRemoteMetadata(int remoteNodeId, String key, Versioned public void updateRemoteMetadata(int remoteNodeId, HashMap> keyValueMap) { - Iterator it = keyValueMap.entrySet().iterator(); - ArrayList allKeyVersions = new ArrayList(); - while(it.hasNext()) { - Map.Entry pairs = (Map.Entry) it.next(); - - String key = (String) pairs.getKey(); - - Versioned value = (Versioned) pairs.getValue(); + ArrayList allKeyVersions = new ArrayList(); + for(Entry> entry: keyValueMap.entrySet()) { + String key = entry.getKey(); + Versioned value = entry.getValue(); ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8")); Versioned valueBytes = new Versioned(ByteUtils.getBytes(value.getValue(), @@ -897,7 +893,7 @@ public void updateRemoteMetadata(int remoteNodeId, VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder() .setType(VAdminProto.AdminRequestType.UPDATE_METADATA) .setUpdateMetadata(VAdminProto.UpdateMetadataRequest.newBuilder() - .addAllKeyValue(allKeyVersions) + .addAllMetadataEntry(allKeyVersions) .build()) .build(); diff --git a/src/java/voldemort/client/protocol/pb/ProtoUtils.java b/src/java/voldemort/client/protocol/pb/ProtoUtils.java index 7eef3359b0..ede67945a5 100644 --- a/src/java/voldemort/client/protocol/pb/ProtoUtils.java +++ b/src/java/voldemort/client/protocol/pb/ProtoUtils.java @@ -31,6 +31,7 @@ import voldemort.client.protocol.pb.VAdminProto.PerStorePartitionTuple; import voldemort.client.protocol.pb.VAdminProto.ROStoreVersionDirMap; import voldemort.client.protocol.pb.VAdminProto.RebalancePartitionInfoMap; +import voldemort.client.protocol.pb.VProto.KeyedVersions; import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.store.ErrorCodeMapper; import voldemort.utils.ByteArray; @@ -198,6 +199,17 @@ public static Versioned decodeVersioned(VProto.Versioned versioned) { decodeClock(versioned.getVersion())); } + /** + * Given a list of value versions for the metadata keys we are just + * interested in the value at index 0 This is because even if we have to + * update the cluster.xml we marshall a single key into a versioned list + * Hence we just look at the value at index 0 + * + */ + public static Versioned decodeVersionedMetadataKeyValue(KeyedVersions keyValue) { + return decodeVersioned(keyValue.getVersions(0)); + } + public static List> decodeVersions(List versioned) { List> values = new ArrayList>(versioned.size()); for(VProto.Versioned v: versioned) diff --git a/src/java/voldemort/client/protocol/pb/VAdminProto.java b/src/java/voldemort/client/protocol/pb/VAdminProto.java index c124feb9c0..cca0257143 100644 --- a/src/java/voldemort/client/protocol/pb/VAdminProto.java +++ b/src/java/voldemort/client/protocol/pb/VAdminProto.java @@ -821,22 +821,22 @@ public UpdateMetadataRequest getDefaultInstanceForType() { return voldemort.client.protocol.pb.VAdminProto.internal_static_voldemort_UpdateMetadataRequest_fieldAccessorTable; } - // repeated .voldemort.KeyedVersions keyValue = 1; - public static final int KEYVALUE_FIELD_NUMBER = 1; - private java.util.List keyValue_ = + // repeated .voldemort.KeyedVersions metadataEntry = 1; + public static final int METADATAENTRY_FIELD_NUMBER = 1; + private java.util.List metadataEntry_ = java.util.Collections.emptyList(); - public java.util.List getKeyValueList() { - return keyValue_; + public java.util.List getMetadataEntryList() { + return metadataEntry_; } - public int getKeyValueCount() { return keyValue_.size(); } - public voldemort.client.protocol.pb.VProto.KeyedVersions getKeyValue(int index) { - return keyValue_.get(index); + public int getMetadataEntryCount() { return metadataEntry_.size(); } + public voldemort.client.protocol.pb.VProto.KeyedVersions getMetadataEntry(int index) { + return metadataEntry_.get(index); } private void initFields() { } public final boolean isInitialized() { - for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getKeyValueList()) { + for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getMetadataEntryList()) { if (!element.isInitialized()) return false; } return true; @@ -845,7 +845,7 @@ public final boolean isInitialized() { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); - for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getKeyValueList()) { + for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getMetadataEntryList()) { output.writeMessage(1, element); } getUnknownFields().writeTo(output); @@ -857,7 +857,7 @@ public int getSerializedSize() { if (size != -1) return size; size = 0; - for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getKeyValueList()) { + for (voldemort.client.protocol.pb.VProto.KeyedVersions element : getMetadataEntryList()) { size += com.google.protobuf.CodedOutputStream .computeMessageSize(1, element); } @@ -1003,9 +1003,9 @@ public voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest buildParti throw new IllegalStateException( "build() has already been called on this Builder."); } - if (result.keyValue_ != java.util.Collections.EMPTY_LIST) { - result.keyValue_ = - java.util.Collections.unmodifiableList(result.keyValue_); + if (result.metadataEntry_ != java.util.Collections.EMPTY_LIST) { + result.metadataEntry_ = + java.util.Collections.unmodifiableList(result.metadataEntry_); } voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest returnMe = result; result = null; @@ -1023,11 +1023,11 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest other) { if (other == voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest.getDefaultInstance()) return this; - if (!other.keyValue_.isEmpty()) { - if (result.keyValue_.isEmpty()) { - result.keyValue_ = new java.util.ArrayList(); + if (!other.metadataEntry_.isEmpty()) { + if (result.metadataEntry_.isEmpty()) { + result.metadataEntry_ = new java.util.ArrayList(); } - result.keyValue_.addAll(other.keyValue_); + result.metadataEntry_.addAll(other.metadataEntry_); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -1057,7 +1057,7 @@ public Builder mergeFrom( case 10: { voldemort.client.protocol.pb.VProto.KeyedVersions.Builder subBuilder = voldemort.client.protocol.pb.VProto.KeyedVersions.newBuilder(); input.readMessage(subBuilder, extensionRegistry); - addKeyValue(subBuilder.buildPartial()); + addMetadataEntry(subBuilder.buildPartial()); break; } } @@ -1065,54 +1065,54 @@ public Builder mergeFrom( } - // repeated .voldemort.KeyedVersions keyValue = 1; - public java.util.List getKeyValueList() { - return java.util.Collections.unmodifiableList(result.keyValue_); + // repeated .voldemort.KeyedVersions metadataEntry = 1; + public java.util.List getMetadataEntryList() { + return java.util.Collections.unmodifiableList(result.metadataEntry_); } - public int getKeyValueCount() { - return result.getKeyValueCount(); + public int getMetadataEntryCount() { + return result.getMetadataEntryCount(); } - public voldemort.client.protocol.pb.VProto.KeyedVersions getKeyValue(int index) { - return result.getKeyValue(index); + public voldemort.client.protocol.pb.VProto.KeyedVersions getMetadataEntry(int index) { + return result.getMetadataEntry(index); } - public Builder setKeyValue(int index, voldemort.client.protocol.pb.VProto.KeyedVersions value) { + public Builder setMetadataEntry(int index, voldemort.client.protocol.pb.VProto.KeyedVersions value) { if (value == null) { throw new NullPointerException(); } - result.keyValue_.set(index, value); + result.metadataEntry_.set(index, value); return this; } - public Builder setKeyValue(int index, voldemort.client.protocol.pb.VProto.KeyedVersions.Builder builderForValue) { - result.keyValue_.set(index, builderForValue.build()); + public Builder setMetadataEntry(int index, voldemort.client.protocol.pb.VProto.KeyedVersions.Builder builderForValue) { + result.metadataEntry_.set(index, builderForValue.build()); return this; } - public Builder addKeyValue(voldemort.client.protocol.pb.VProto.KeyedVersions value) { + public Builder addMetadataEntry(voldemort.client.protocol.pb.VProto.KeyedVersions value) { if (value == null) { throw new NullPointerException(); } - if (result.keyValue_.isEmpty()) { - result.keyValue_ = new java.util.ArrayList(); + if (result.metadataEntry_.isEmpty()) { + result.metadataEntry_ = new java.util.ArrayList(); } - result.keyValue_.add(value); + result.metadataEntry_.add(value); return this; } - public Builder addKeyValue(voldemort.client.protocol.pb.VProto.KeyedVersions.Builder builderForValue) { - if (result.keyValue_.isEmpty()) { - result.keyValue_ = new java.util.ArrayList(); + public Builder addMetadataEntry(voldemort.client.protocol.pb.VProto.KeyedVersions.Builder builderForValue) { + if (result.metadataEntry_.isEmpty()) { + result.metadataEntry_ = new java.util.ArrayList(); } - result.keyValue_.add(builderForValue.build()); + result.metadataEntry_.add(builderForValue.build()); return this; } - public Builder addAllKeyValue( + public Builder addAllMetadataEntry( java.lang.Iterable values) { - if (result.keyValue_.isEmpty()) { - result.keyValue_ = new java.util.ArrayList(); + if (result.metadataEntry_.isEmpty()) { + result.metadataEntry_ = new java.util.ArrayList(); } - super.addAll(values, result.keyValue_); + super.addAll(values, result.metadataEntry_); return this; } - public Builder clearKeyValue() { - result.keyValue_ = java.util.Collections.emptyList(); + public Builder clearMetadataEntry() { + result.metadataEntry_ = java.util.Collections.emptyList(); return this; } @@ -23164,198 +23164,198 @@ public Builder clearReserveMemory() { "emort-client.proto\"!\n\022GetMetadataRequest" + "\022\013\n\003key\030\001 \002(\014\"]\n\023GetMetadataResponse\022%\n\007" + "version\030\001 \001(\0132\024.voldemort.Versioned\022\037\n\005e" + - "rror\030\002 \001(\0132\020.voldemort.Error\"C\n\025UpdateMe" + - "tadataRequest\022*\n\010keyValue\030\001 \003(\0132\030.voldem" + - "ort.KeyedVersions\"9\n\026UpdateMetadataRespo" + - "nse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"7\n\t" + - "FileEntry\022\021\n\tfile_name\030\001 \002(\t\022\027\n\017file_siz" + - "e_bytes\030\002 \002(\003\"F\n\016PartitionEntry\022\013\n\003key\030\001", - " \002(\014\022\'\n\tversioned\030\002 \002(\0132\024.voldemort.Vers" + - "ioned\"\216\001\n\035UpdatePartitionEntriesRequest\022" + - "\r\n\005store\030\001 \002(\t\0222\n\017partition_entry\030\002 \002(\0132" + - "\031.voldemort.PartitionEntry\022*\n\006filter\030\003 \001" + - "(\0132\032.voldemort.VoldemortFilter\"A\n\036Update" + - "PartitionEntriesResponse\022\037\n\005error\030\001 \001(\0132" + - "\020.voldemort.Error\"-\n\017VoldemortFilter\022\014\n\004" + - "name\030\001 \002(\t\022\014\n\004data\030\002 \002(\014\"\257\001\n\030UpdateSlopE" + - "ntriesRequest\022\r\n\005store\030\001 \002(\t\022\013\n\003key\030\002 \002(" + - "\014\022\'\n\007version\030\003 \002(\0132\026.voldemort.VectorClo", - "ck\022,\n\014request_type\030\004 \002(\0162\026.voldemort.Req" + - "uestType\022\r\n\005value\030\005 \001(\014\022\021\n\ttransform\030\006 \001" + - "(\014\"<\n\031UpdateSlopEntriesResponse\022\037\n\005error" + - "\030\001 \001(\0132\020.voldemort.Error\"d\n\032FetchPartiti" + - "onFilesRequest\022\r\n\005store\030\001 \002(\t\0227\n\024replica" + - "_to_partition\030\002 \003(\0132\031.voldemort.Partitio" + - "nTuple\"\244\002\n\034FetchPartitionEntriesRequest\022" + - "7\n\024replica_to_partition\030\001 \003(\0132\031.voldemor" + - "t.PartitionTuple\022\r\n\005store\030\002 \002(\t\022*\n\006filte" + - "r\030\003 \001(\0132\032.voldemort.VoldemortFilter\022\024\n\014f", - "etch_values\030\004 \001(\010\022*\n\"OBSOLETE__DO_NOT_US" + - "E__skip_records\030\005 \001(\003\022\027\n\017initial_cluster" + - "\030\006 \001(\t\022\026\n\016fetch_orphaned\030\007 \001(\010\022\035\n\025record" + - "s_per_partition\030\010 \001(\003\"\201\001\n\035FetchPartition" + - "EntriesResponse\0222\n\017partition_entry\030\001 \001(\013" + - "2\031.voldemort.PartitionEntry\022\013\n\003key\030\002 \001(\014" + - "\022\037\n\005error\030\003 \001(\0132\020.voldemort.Error\"\254\001\n\035De" + - "letePartitionEntriesRequest\022\r\n\005store\030\001 \002" + - "(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.volde" + - "mort.PartitionTuple\022*\n\006filter\030\003 \001(\0132\032.vo", - "ldemort.VoldemortFilter\022\027\n\017initial_clust" + - "er\030\004 \001(\t\"P\n\036DeletePartitionEntriesRespon" + - "se\022\r\n\005count\030\001 \001(\003\022\037\n\005error\030\002 \001(\0132\020.volde" + - "mort.Error\"\317\001\n\035InitiateFetchAndUpdateReq" + - "uest\022\017\n\007node_id\030\001 \002(\005\022\r\n\005store\030\002 \002(\t\022*\n\006" + - "filter\030\003 \001(\0132\032.voldemort.VoldemortFilter" + - "\0227\n\024replica_to_partition\030\004 \003(\0132\031.voldemo" + - "rt.PartitionTuple\022\027\n\017initial_cluster\030\005 \001" + - "(\t\022\020\n\010optimize\030\006 \001(\010\"1\n\033AsyncOperationSt" + - "atusRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031Async", - "OperationStopRequest\022\022\n\nrequest_id\030\001 \002(\005" + - "\"=\n\032AsyncOperationStopResponse\022\037\n\005error\030" + - "\001 \001(\0132\020.voldemort.Error\"2\n\031AsyncOperatio" + - "nListRequest\022\025\n\rshow_complete\030\002 \002(\010\"R\n\032A" + - "syncOperationListResponse\022\023\n\013request_ids" + - "\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"" + - ":\n\016PartitionTuple\022\024\n\014replica_type\030\001 \002(\005\022" + - "\022\n\npartitions\030\002 \003(\005\"e\n\026PerStorePartition" + - "Tuple\022\022\n\nstore_name\030\001 \002(\t\0227\n\024replica_to_" + - "partition\030\002 \003(\0132\031.voldemort.PartitionTup", - "le\"\370\001\n\031RebalancePartitionInfoMap\022\022\n\nstea" + - "ler_id\030\001 \002(\005\022\020\n\010donor_id\030\002 \002(\005\022\017\n\007attemp" + - "t\030\003 \002(\005\022C\n\030replica_to_add_partition\030\004 \003(" + - "\0132!.voldemort.PerStorePartitionTuple\022F\n\033" + - "replica_to_delete_partition\030\005 \003(\0132!.vold" + - "emort.PerStorePartitionTuple\022\027\n\017initial_" + - "cluster\030\006 \002(\t\"f\n\034InitiateRebalanceNodeRe" + - "quest\022F\n\030rebalance_partition_info\030\001 \002(\0132" + - "$.voldemort.RebalancePartitionInfoMap\"m\n" + - "#InitiateRebalanceNodeOnDonorRequest\022F\n\030", - "rebalance_partition_info\030\001 \003(\0132$.voldemo" + - "rt.RebalancePartitionInfoMap\"\212\001\n\034AsyncOp" + - "erationStatusResponse\022\022\n\nrequest_id\030\001 \001(" + - "\005\022\023\n\013description\030\002 \001(\t\022\016\n\006status\030\003 \001(\t\022\020" + - "\n\010complete\030\004 \001(\010\022\037\n\005error\030\005 \001(\0132\020.voldem" + - "ort.Error\"\'\n\026TruncateEntriesRequest\022\r\n\005s" + - "tore\030\001 \002(\t\":\n\027TruncateEntriesResponse\022\037\n" + - "\005error\030\001 \001(\0132\020.voldemort.Error\"*\n\017AddSto" + - "reRequest\022\027\n\017storeDefinition\030\001 \002(\t\"3\n\020Ad" + - "dStoreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemor", - "t.Error\"\'\n\022DeleteStoreRequest\022\021\n\tstoreNa" + - "me\030\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005error" + - "\030\001 \001(\0132\020.voldemort.Error\"P\n\021FetchStoreRe" + - "quest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002" + - " \002(\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapStoreR" + - "equest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030" + - "\002 \002(\t\"P\n\021SwapStoreResponse\022\037\n\005error\030\001 \001(" + - "\0132\020.voldemort.Error\022\032\n\022previous_store_di" + - "r\030\002 \001(\t\"@\n\024RollbackStoreRequest\022\022\n\nstore" + - "_name\030\001 \002(\t\022\024\n\014push_version\030\002 \002(\003\"8\n\025Rol", - "lbackStoreResponse\022\037\n\005error\030\001 \001(\0132\020.vold" + - "emort.Error\"&\n\020RepairJobRequest\022\022\n\nstore" + - "_name\030\001 \001(\t\"4\n\021RepairJobResponse\022\037\n\005erro" + - "r\030\001 \001(\0132\020.voldemort.Error\"=\n\024ROStoreVers" + - "ionDirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_d" + - "ir\030\002 \002(\t\"/\n\031GetROMaxVersionDirRequest\022\022\n" + - "\nstore_name\030\001 \003(\t\"y\n\032GetROMaxVersionDirR" + - "esponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.vol" + - "demort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001" + - "(\0132\020.voldemort.Error\"3\n\035GetROCurrentVers", - "ionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"}\n\036Get" + - "ROCurrentVersionDirResponse\022:\n\021ro_store_" + - "versions\030\001 \003(\0132\037.voldemort.ROStoreVersio" + - "nDirMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error" + - "\"/\n\031GetROStorageFormatRequest\022\022\n\nstore_n" + - "ame\030\001 \003(\t\"y\n\032GetROStorageFormatResponse\022" + - ":\n\021ro_store_versions\030\001 \003(\0132\037.voldemort.R" + - "OStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132\020.vol" + - "demort.Error\"@\n\027FailedFetchStoreRequest\022" + - "\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\";", - "\n\030FailedFetchStoreResponse\022\037\n\005error\030\001 \001(" + - "\0132\020.voldemort.Error\"\375\001\n\033RebalanceStateCh" + - "angeRequest\022K\n\035rebalance_partition_info_" + - "list\030\001 \003(\0132$.voldemort.RebalancePartitio" + - "nInfoMap\022\026\n\016cluster_string\030\002 \002(\t\022\025\n\rstor" + - "es_string\030\003 \002(\t\022\017\n\007swap_ro\030\004 \002(\010\022\037\n\027chan" + - "ge_cluster_metadata\030\005 \002(\010\022\036\n\026change_reba" + - "lance_state\030\006 \002(\010\022\020\n\010rollback\030\007 \002(\010\"?\n\034R" + - "ebalanceStateChangeResponse\022\037\n\005error\030\001 \001" + - "(\0132\020.voldemort.Error\"G\n DeleteStoreRebal", - "anceStateRequest\022\022\n\nstore_name\030\001 \002(\t\022\017\n\007" + - "node_id\030\002 \002(\005\"D\n!DeleteStoreRebalanceSta" + - "teResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Er" + - "ror\"h\n\023NativeBackupRequest\022\022\n\nstore_name" + - "\030\001 \002(\t\022\022\n\nbackup_dir\030\002 \002(\t\022\024\n\014verify_fil" + - "es\030\003 \002(\010\022\023\n\013incremental\030\004 \002(\010\">\n\024Reserve" + - "MemoryRequest\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nsiz" + - "e_in_mb\030\002 \002(\003\"8\n\025ReserveMemoryResponse\022\037" + - "\n\005error\030\001 \001(\0132\020.voldemort.Error\"\360\016\n\025Vold" + - "emortAdminRequest\022)\n\004type\030\001 \002(\0162\033.voldem", - "ort.AdminRequestType\0223\n\014get_metadata\030\002 \001" + - "(\0132\035.voldemort.GetMetadataRequest\0229\n\017upd" + - "ate_metadata\030\003 \001(\0132 .voldemort.UpdateMet" + - "adataRequest\022J\n\030update_partition_entries" + - "\030\004 \001(\0132(.voldemort.UpdatePartitionEntrie" + - "sRequest\022H\n\027fetch_partition_entries\030\005 \001(" + - "\0132\'.voldemort.FetchPartitionEntriesReque" + - "st\022J\n\030delete_partition_entries\030\006 \001(\0132(.v" + - "oldemort.DeletePartitionEntriesRequest\022K" + - "\n\031initiate_fetch_and_update\030\007 \001(\0132(.vold", - "emort.InitiateFetchAndUpdateRequest\022F\n\026a" + - "sync_operation_status\030\010 \001(\0132&.voldemort." + - "AsyncOperationStatusRequest\022H\n\027initiate_" + - "rebalance_node\030\t \001(\0132\'.voldemort.Initiat" + - "eRebalanceNodeRequest\022B\n\024async_operation" + - "_stop\030\n \001(\0132$.voldemort.AsyncOperationSt" + - "opRequest\022B\n\024async_operation_list\030\013 \001(\0132" + - "$.voldemort.AsyncOperationListRequest\022;\n" + - "\020truncate_entries\030\014 \001(\0132!.voldemort.Trun" + - "cateEntriesRequest\022-\n\tadd_store\030\r \001(\0132\032.", - "voldemort.AddStoreRequest\0223\n\014delete_stor" + - "e\030\016 \001(\0132\035.voldemort.DeleteStoreRequest\0221" + - "\n\013fetch_store\030\017 \001(\0132\034.voldemort.FetchSto" + - "reRequest\022/\n\nswap_store\030\020 \001(\0132\033.voldemor" + - "t.SwapStoreRequest\0227\n\016rollback_store\030\021 \001" + - "(\0132\037.voldemort.RollbackStoreRequest\022D\n\026g" + - "et_ro_max_version_dir\030\022 \001(\0132$.voldemort." + - "GetROMaxVersionDirRequest\022L\n\032get_ro_curr" + - "ent_version_dir\030\023 \001(\0132(.voldemort.GetROC" + - "urrentVersionDirRequest\022D\n\025fetch_partiti", - "on_files\030\024 \001(\0132%.voldemort.FetchPartitio" + - "nFilesRequest\022@\n\023update_slop_entries\030\026 \001" + - "(\0132#.voldemort.UpdateSlopEntriesRequest\022" + - ">\n\022failed_fetch_store\030\030 \001(\0132\".voldemort." + - "FailedFetchStoreRequest\022C\n\025get_ro_storag" + - "e_format\030\031 \001(\0132$.voldemort.GetROStorageF" + - "ormatRequest\022F\n\026rebalance_state_change\030\032" + - " \001(\0132&.voldemort.RebalanceStateChangeReq" + - "uest\022/\n\nrepair_job\030\033 \001(\0132\033.voldemort.Rep" + - "airJobRequest\022X\n initiate_rebalance_node", - "_on_donor\030\034 \001(\0132..voldemort.InitiateReba" + - "lanceNodeOnDonorRequest\022Q\n\034delete_store_" + - "rebalance_state\030\035 \001(\0132+.voldemort.Delete" + - "StoreRebalanceStateRequest\0225\n\rnative_bac" + - "kup\030\036 \001(\0132\036.voldemort.NativeBackupReques" + - "t\0227\n\016reserve_memory\030\037 \001(\0132\037.voldemort.Re" + - "serveMemoryRequest*\310\005\n\020AdminRequestType\022" + - "\020\n\014GET_METADATA\020\000\022\023\n\017UPDATE_METADATA\020\001\022\034" + - "\n\030UPDATE_PARTITION_ENTRIES\020\002\022\033\n\027FETCH_PA" + - "RTITION_ENTRIES\020\003\022\034\n\030DELETE_PARTITION_EN", - "TRIES\020\004\022\035\n\031INITIATE_FETCH_AND_UPDATE\020\005\022\032" + - "\n\026ASYNC_OPERATION_STATUS\020\006\022\033\n\027INITIATE_R" + - "EBALANCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_STOP\020" + - "\010\022\030\n\024ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUNCATE_" + - "ENTRIES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_STORE" + - "\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016\022\022\n\016R" + - "OLLBACK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSION_DI" + - "R\020\020\022\036\n\032GET_RO_CURRENT_VERSION_DIR\020\021\022\031\n\025F" + - "ETCH_PARTITION_FILES\020\022\022\027\n\023UPDATE_SLOP_EN" + - "TRIES\020\024\022\026\n\022FAILED_FETCH_STORE\020\026\022\031\n\025GET_R", - "O_STORAGE_FORMAT\020\027\022\032\n\026REBALANCE_STATE_CH" + - "ANGE\020\030\022\016\n\nREPAIR_JOB\020\031\022$\n INITIATE_REBAL" + - "ANCE_NODE_ON_DONOR\020\032\022 \n\034DELETE_STORE_REB" + - "ALANCE_STATE\020\033\022\021\n\rNATIVE_BACKUP\020\034\022\022\n\016RES" + - "ERVE_MEMORY\020\035B-\n\034voldemort.client.protoc" + - "ol.pbB\013VAdminProtoH\001" + "rror\030\002 \001(\0132\020.voldemort.Error\"H\n\025UpdateMe" + + "tadataRequest\022/\n\rmetadataEntry\030\001 \003(\0132\030.v" + + "oldemort.KeyedVersions\"9\n\026UpdateMetadata" + + "Response\022\037\n\005error\030\001 \001(\0132\020.voldemort.Erro" + + "r\"7\n\tFileEntry\022\021\n\tfile_name\030\001 \002(\t\022\027\n\017fil" + + "e_size_bytes\030\002 \002(\003\"F\n\016PartitionEntry\022\013\n\003", + "key\030\001 \002(\014\022\'\n\tversioned\030\002 \002(\0132\024.voldemort" + + ".Versioned\"\216\001\n\035UpdatePartitionEntriesReq" + + "uest\022\r\n\005store\030\001 \002(\t\0222\n\017partition_entry\030\002" + + " \002(\0132\031.voldemort.PartitionEntry\022*\n\006filte" + + "r\030\003 \001(\0132\032.voldemort.VoldemortFilter\"A\n\036U" + + "pdatePartitionEntriesResponse\022\037\n\005error\030\001" + + " \001(\0132\020.voldemort.Error\"-\n\017VoldemortFilte" + + "r\022\014\n\004name\030\001 \002(\t\022\014\n\004data\030\002 \002(\014\"\257\001\n\030Update" + + "SlopEntriesRequest\022\r\n\005store\030\001 \002(\t\022\013\n\003key" + + "\030\002 \002(\014\022\'\n\007version\030\003 \002(\0132\026.voldemort.Vect", + "orClock\022,\n\014request_type\030\004 \002(\0162\026.voldemor" + + "t.RequestType\022\r\n\005value\030\005 \001(\014\022\021\n\ttransfor" + + "m\030\006 \001(\014\"<\n\031UpdateSlopEntriesResponse\022\037\n\005" + + "error\030\001 \001(\0132\020.voldemort.Error\"d\n\032FetchPa" + + "rtitionFilesRequest\022\r\n\005store\030\001 \002(\t\0227\n\024re" + + "plica_to_partition\030\002 \003(\0132\031.voldemort.Par" + + "titionTuple\"\244\002\n\034FetchPartitionEntriesReq" + + "uest\0227\n\024replica_to_partition\030\001 \003(\0132\031.vol" + + "demort.PartitionTuple\022\r\n\005store\030\002 \002(\t\022*\n\006" + + "filter\030\003 \001(\0132\032.voldemort.VoldemortFilter", + "\022\024\n\014fetch_values\030\004 \001(\010\022*\n\"OBSOLETE__DO_N" + + "OT_USE__skip_records\030\005 \001(\003\022\027\n\017initial_cl" + + "uster\030\006 \001(\t\022\026\n\016fetch_orphaned\030\007 \001(\010\022\035\n\025r" + + "ecords_per_partition\030\010 \001(\003\"\201\001\n\035FetchPart" + + "itionEntriesResponse\0222\n\017partition_entry\030" + + "\001 \001(\0132\031.voldemort.PartitionEntry\022\013\n\003key\030" + + "\002 \001(\014\022\037\n\005error\030\003 \001(\0132\020.voldemort.Error\"\254" + + "\001\n\035DeletePartitionEntriesRequest\022\r\n\005stor" + + "e\030\001 \002(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031." + + "voldemort.PartitionTuple\022*\n\006filter\030\003 \001(\013", + "2\032.voldemort.VoldemortFilter\022\027\n\017initial_" + + "cluster\030\004 \001(\t\"P\n\036DeletePartitionEntriesR" + + "esponse\022\r\n\005count\030\001 \001(\003\022\037\n\005error\030\002 \001(\0132\020." + + "voldemort.Error\"\317\001\n\035InitiateFetchAndUpda" + + "teRequest\022\017\n\007node_id\030\001 \002(\005\022\r\n\005store\030\002 \002(" + + "\t\022*\n\006filter\030\003 \001(\0132\032.voldemort.VoldemortF" + + "ilter\0227\n\024replica_to_partition\030\004 \003(\0132\031.vo" + + "ldemort.PartitionTuple\022\027\n\017initial_cluste" + + "r\030\005 \001(\t\022\020\n\010optimize\030\006 \001(\010\"1\n\033AsyncOperat" + + "ionStatusRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031", + "AsyncOperationStopRequest\022\022\n\nrequest_id\030" + + "\001 \002(\005\"=\n\032AsyncOperationStopResponse\022\037\n\005e" + + "rror\030\001 \001(\0132\020.voldemort.Error\"2\n\031AsyncOpe" + + "rationListRequest\022\025\n\rshow_complete\030\002 \002(\010" + + "\"R\n\032AsyncOperationListResponse\022\023\n\013reques" + + "t_ids\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemort.E" + + "rror\":\n\016PartitionTuple\022\024\n\014replica_type\030\001" + + " \002(\005\022\022\n\npartitions\030\002 \003(\005\"e\n\026PerStorePart" + + "itionTuple\022\022\n\nstore_name\030\001 \002(\t\0227\n\024replic" + + "a_to_partition\030\002 \003(\0132\031.voldemort.Partiti", + "onTuple\"\370\001\n\031RebalancePartitionInfoMap\022\022\n" + + "\nstealer_id\030\001 \002(\005\022\020\n\010donor_id\030\002 \002(\005\022\017\n\007a" + + "ttempt\030\003 \002(\005\022C\n\030replica_to_add_partition" + + "\030\004 \003(\0132!.voldemort.PerStorePartitionTupl" + + "e\022F\n\033replica_to_delete_partition\030\005 \003(\0132!" + + ".voldemort.PerStorePartitionTuple\022\027\n\017ini" + + "tial_cluster\030\006 \002(\t\"f\n\034InitiateRebalanceN" + + "odeRequest\022F\n\030rebalance_partition_info\030\001" + + " \002(\0132$.voldemort.RebalancePartitionInfoM" + + "ap\"m\n#InitiateRebalanceNodeOnDonorReques", + "t\022F\n\030rebalance_partition_info\030\001 \003(\0132$.vo" + + "ldemort.RebalancePartitionInfoMap\"\212\001\n\034As" + + "yncOperationStatusResponse\022\022\n\nrequest_id" + + "\030\001 \001(\005\022\023\n\013description\030\002 \001(\t\022\016\n\006status\030\003 " + + "\001(\t\022\020\n\010complete\030\004 \001(\010\022\037\n\005error\030\005 \001(\0132\020.v" + + "oldemort.Error\"\'\n\026TruncateEntriesRequest" + + "\022\r\n\005store\030\001 \002(\t\":\n\027TruncateEntriesRespon" + + "se\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"*\n\017A" + + "ddStoreRequest\022\027\n\017storeDefinition\030\001 \002(\t\"" + + "3\n\020AddStoreResponse\022\037\n\005error\030\001 \001(\0132\020.vol", + "demort.Error\"\'\n\022DeleteStoreRequest\022\021\n\tst" + + "oreName\030\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005" + + "error\030\001 \001(\0132\020.voldemort.Error\"P\n\021FetchSt" + + "oreRequest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_" + + "dir\030\002 \002(\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapS" + + "toreRequest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore" + + "_dir\030\002 \002(\t\"P\n\021SwapStoreResponse\022\037\n\005error" + + "\030\001 \001(\0132\020.voldemort.Error\022\032\n\022previous_sto" + + "re_dir\030\002 \001(\t\"@\n\024RollbackStoreRequest\022\022\n\n" + + "store_name\030\001 \002(\t\022\024\n\014push_version\030\002 \002(\003\"8", + "\n\025RollbackStoreResponse\022\037\n\005error\030\001 \001(\0132\020" + + ".voldemort.Error\"&\n\020RepairJobRequest\022\022\n\n" + + "store_name\030\001 \001(\t\"4\n\021RepairJobResponse\022\037\n" + + "\005error\030\001 \001(\0132\020.voldemort.Error\"=\n\024ROStor" + + "eVersionDirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tst" + + "ore_dir\030\002 \002(\t\"/\n\031GetROMaxVersionDirReque" + + "st\022\022\n\nstore_name\030\001 \003(\t\"y\n\032GetROMaxVersio" + + "nDirResponse\022:\n\021ro_store_versions\030\001 \003(\0132" + + "\037.voldemort.ROStoreVersionDirMap\022\037\n\005erro" + + "r\030\002 \001(\0132\020.voldemort.Error\"3\n\035GetROCurren", + "tVersionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"}" + + "\n\036GetROCurrentVersionDirResponse\022:\n\021ro_s" + + "tore_versions\030\001 \003(\0132\037.voldemort.ROStoreV" + + "ersionDirMap\022\037\n\005error\030\002 \001(\0132\020.voldemort." + + "Error\"/\n\031GetROStorageFormatRequest\022\022\n\nst" + + "ore_name\030\001 \003(\t\"y\n\032GetROStorageFormatResp" + + "onse\022:\n\021ro_store_versions\030\001 \003(\0132\037.voldem" + + "ort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132" + + "\020.voldemort.Error\"@\n\027FailedFetchStoreReq" + + "uest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 ", + "\002(\t\";\n\030FailedFetchStoreResponse\022\037\n\005error" + + "\030\001 \001(\0132\020.voldemort.Error\"\375\001\n\033RebalanceSt" + + "ateChangeRequest\022K\n\035rebalance_partition_" + + "info_list\030\001 \003(\0132$.voldemort.RebalancePar" + + "titionInfoMap\022\026\n\016cluster_string\030\002 \002(\t\022\025\n" + + "\rstores_string\030\003 \002(\t\022\017\n\007swap_ro\030\004 \002(\010\022\037\n" + + "\027change_cluster_metadata\030\005 \002(\010\022\036\n\026change" + + "_rebalance_state\030\006 \002(\010\022\020\n\010rollback\030\007 \002(\010" + + "\"?\n\034RebalanceStateChangeResponse\022\037\n\005erro" + + "r\030\001 \001(\0132\020.voldemort.Error\"G\n DeleteStore", + "RebalanceStateRequest\022\022\n\nstore_name\030\001 \002(" + + "\t\022\017\n\007node_id\030\002 \002(\005\"D\n!DeleteStoreRebalan" + + "ceStateResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" + + "rt.Error\"h\n\023NativeBackupRequest\022\022\n\nstore" + + "_name\030\001 \002(\t\022\022\n\nbackup_dir\030\002 \002(\t\022\024\n\014verif" + + "y_files\030\003 \002(\010\022\023\n\013incremental\030\004 \002(\010\">\n\024Re" + + "serveMemoryRequest\022\022\n\nstore_name\030\001 \002(\t\022\022" + + "\n\nsize_in_mb\030\002 \002(\003\"8\n\025ReserveMemoryRespo" + + "nse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"\360\016\n" + + "\025VoldemortAdminRequest\022)\n\004type\030\001 \002(\0162\033.v", + "oldemort.AdminRequestType\0223\n\014get_metadat" + + "a\030\002 \001(\0132\035.voldemort.GetMetadataRequest\0229" + + "\n\017update_metadata\030\003 \001(\0132 .voldemort.Upda" + + "teMetadataRequest\022J\n\030update_partition_en" + + "tries\030\004 \001(\0132(.voldemort.UpdatePartitionE" + + "ntriesRequest\022H\n\027fetch_partition_entries" + + "\030\005 \001(\0132\'.voldemort.FetchPartitionEntries" + + "Request\022J\n\030delete_partition_entries\030\006 \001(" + + "\0132(.voldemort.DeletePartitionEntriesRequ" + + "est\022K\n\031initiate_fetch_and_update\030\007 \001(\0132(", + ".voldemort.InitiateFetchAndUpdateRequest" + + "\022F\n\026async_operation_status\030\010 \001(\0132&.volde" + + "mort.AsyncOperationStatusRequest\022H\n\027init" + + "iate_rebalance_node\030\t \001(\0132\'.voldemort.In" + + "itiateRebalanceNodeRequest\022B\n\024async_oper" + + "ation_stop\030\n \001(\0132$.voldemort.AsyncOperat" + + "ionStopRequest\022B\n\024async_operation_list\030\013" + + " \001(\0132$.voldemort.AsyncOperationListReque" + + "st\022;\n\020truncate_entries\030\014 \001(\0132!.voldemort" + + ".TruncateEntriesRequest\022-\n\tadd_store\030\r \001", + "(\0132\032.voldemort.AddStoreRequest\0223\n\014delete" + + "_store\030\016 \001(\0132\035.voldemort.DeleteStoreRequ" + + "est\0221\n\013fetch_store\030\017 \001(\0132\034.voldemort.Fet" + + "chStoreRequest\022/\n\nswap_store\030\020 \001(\0132\033.vol" + + "demort.SwapStoreRequest\0227\n\016rollback_stor" + + "e\030\021 \001(\0132\037.voldemort.RollbackStoreRequest" + + "\022D\n\026get_ro_max_version_dir\030\022 \001(\0132$.volde" + + "mort.GetROMaxVersionDirRequest\022L\n\032get_ro" + + "_current_version_dir\030\023 \001(\0132(.voldemort.G" + + "etROCurrentVersionDirRequest\022D\n\025fetch_pa", + "rtition_files\030\024 \001(\0132%.voldemort.FetchPar" + + "titionFilesRequest\022@\n\023update_slop_entrie" + + "s\030\026 \001(\0132#.voldemort.UpdateSlopEntriesReq" + + "uest\022>\n\022failed_fetch_store\030\030 \001(\0132\".volde" + + "mort.FailedFetchStoreRequest\022C\n\025get_ro_s" + + "torage_format\030\031 \001(\0132$.voldemort.GetROSto" + + "rageFormatRequest\022F\n\026rebalance_state_cha" + + "nge\030\032 \001(\0132&.voldemort.RebalanceStateChan" + + "geRequest\022/\n\nrepair_job\030\033 \001(\0132\033.voldemor" + + "t.RepairJobRequest\022X\n initiate_rebalance", + "_node_on_donor\030\034 \001(\0132..voldemort.Initiat" + + "eRebalanceNodeOnDonorRequest\022Q\n\034delete_s" + + "tore_rebalance_state\030\035 \001(\0132+.voldemort.D" + + "eleteStoreRebalanceStateRequest\0225\n\rnativ" + + "e_backup\030\036 \001(\0132\036.voldemort.NativeBackupR" + + "equest\0227\n\016reserve_memory\030\037 \001(\0132\037.voldemo" + + "rt.ReserveMemoryRequest*\310\005\n\020AdminRequest" + + "Type\022\020\n\014GET_METADATA\020\000\022\023\n\017UPDATE_METADAT" + + "A\020\001\022\034\n\030UPDATE_PARTITION_ENTRIES\020\002\022\033\n\027FET" + + "CH_PARTITION_ENTRIES\020\003\022\034\n\030DELETE_PARTITI", + "ON_ENTRIES\020\004\022\035\n\031INITIATE_FETCH_AND_UPDAT" + + "E\020\005\022\032\n\026ASYNC_OPERATION_STATUS\020\006\022\033\n\027INITI" + + "ATE_REBALANCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_" + + "STOP\020\010\022\030\n\024ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUN" + + "CATE_ENTRIES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_" + + "STORE\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016" + + "\022\022\n\016ROLLBACK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSI" + + "ON_DIR\020\020\022\036\n\032GET_RO_CURRENT_VERSION_DIR\020\021" + + "\022\031\n\025FETCH_PARTITION_FILES\020\022\022\027\n\023UPDATE_SL" + + "OP_ENTRIES\020\024\022\026\n\022FAILED_FETCH_STORE\020\026\022\031\n\025", + "GET_RO_STORAGE_FORMAT\020\027\022\032\n\026REBALANCE_STA" + + "TE_CHANGE\020\030\022\016\n\nREPAIR_JOB\020\031\022$\n INITIATE_" + + "REBALANCE_NODE_ON_DONOR\020\032\022 \n\034DELETE_STOR" + + "E_REBALANCE_STATE\020\033\022\021\n\rNATIVE_BACKUP\020\034\022\022" + + "\n\016RESERVE_MEMORY\020\035B-\n\034voldemort.client.p" + + "rotocol.pbB\013VAdminProtoH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23383,7 +23383,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_voldemort_UpdateMetadataRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_voldemort_UpdateMetadataRequest_descriptor, - new java.lang.String[] { "KeyValue", }, + new java.lang.String[] { "MetadataEntry", }, voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest.class, voldemort.client.protocol.pb.VAdminProto.UpdateMetadataRequest.Builder.class); internal_static_voldemort_UpdateMetadataResponse_descriptor = diff --git a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java index 6078d3a02c..db0313e8fb 100644 --- a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java @@ -1146,13 +1146,13 @@ public VAdminProto.UpdateMetadataResponse handleUpdateMetadata(VAdminProto.Updat metadataStore.writeLock.lock(); try { - for(KeyedVersions keyValue: request.getKeyValueList()) { + for(KeyedVersions keyValue: request.getMetadataEntryList()) { try { ByteArray key = ProtoUtils.decodeBytes(keyValue.getKey()); String keyString = ByteUtils.getString(key.get(), "UTF-8"); if(MetadataStore.METADATA_KEYS.contains(keyString)) { - Versioned versionedValue = ProtoUtils.decodeVersioned(keyValue.getVersions(0)); + Versioned versionedValue = ProtoUtils.decodeVersionedMetadataKeyValue(keyValue); logger.info("Updating metadata for key '" + keyString + "'"); metadataStore.put(new ByteArray(ByteUtils.getBytes(keyString, "UTF-8")), versionedValue, diff --git a/src/java/voldemort/server/rebalance/Rebalancer.java b/src/java/voldemort/server/rebalance/Rebalancer.java index c8a722ffc0..c3cb4f221e 100644 --- a/src/java/voldemort/server/rebalance/Rebalancer.java +++ b/src/java/voldemort/server/rebalance/Rebalancer.java @@ -140,8 +140,8 @@ public synchronized void releaseRebalancingPermit(int nodeId) { * @param cluster Cluster metadata to change * @param rebalancePartitionsInfo List of rebalance partitions info * @param swapRO Boolean to indicate swapping of RO store - * @param changeClusterMetadata Boolean to indicate a change of cluster - * metadata + * @param changeClusterAndStoresMetadata Boolean to indicate a change of + * cluster metadata * @param changeRebalanceState Boolean to indicate a change in rebalance * state * @param rollback Boolean to indicate that we are rolling back or not @@ -150,21 +150,21 @@ public void rebalanceStateChange(Cluster cluster, List storeDefs, List rebalancePartitionsInfo, boolean swapRO, - boolean changeClusterMetadata, + boolean changeClusterAndStoresMetadata, boolean changeRebalanceState, boolean rollback) { Cluster currentCluster = metadataStore.getCluster(); List currentStoreDefs = metadataStore.getStoreDefList(); logger.info("Server doing rebalance state change with options [ cluster metadata change - " - + changeClusterMetadata + " ], [ changing rebalancing state - " + + changeClusterAndStoresMetadata + " ], [ changing rebalancing state - " + changeRebalanceState + " ], [ changing swapping RO - " + swapRO + " ], [ rollback - " + rollback + " ]"); // Variables to track what has completed List completedRebalancePartitionsInfo = Lists.newArrayList(); List swappedStoreNames = Lists.newArrayList(); - boolean completedClusterChange = false; + boolean completedClusterAndStoresChange = false; boolean completedRebalanceSourceClusterChange = false; Cluster previousRebalancingSourceCluster = null; List previousRebalancingSourceStores = null; @@ -239,7 +239,7 @@ public void rebalanceStateChange(Cluster cluster, } // CHANGE CLUSTER METADATA AND STORE METADATA - if(changeClusterMetadata) { + if(changeClusterAndStoresMetadata) { logger.info("Switching cluster metadata from " + currentCluster + " to " + cluster); logger.info("Switching stores metadata from " + currentStoreDefs + " to " + storeDefs); @@ -248,7 +248,8 @@ public void rebalanceStateChange(Cluster cluster, MetadataStore.STORES_KEY, storeDefs); - completedClusterChange = true; + completedClusterAndStoresChange = true; + } // SWAP RO DATA FOR ALL STORES @@ -261,7 +262,7 @@ public void rebalanceStateChange(Cluster cluster, logger.error("Got exception while changing state, now rolling back changes", e); // ROLLBACK CLUSTER AND STORES CHANGE - if(completedClusterChange) { + if(completedClusterAndStoresChange) { try { logger.info("Rolling back cluster.xml to " + currentCluster); logger.info("Rolling back stores.xml to " + currentStoreDefs); @@ -270,8 +271,8 @@ public void rebalanceStateChange(Cluster cluster, MetadataStore.STORES_KEY, currentStoreDefs); } catch(Exception exception) { - logger.error("Error while rolling back cluster metadata to " + currentCluster, - exception); + logger.error("Error while rolling back cluster metadata to " + currentCluster + + " Stores metadata to " + currentStoreDefs, exception); } } diff --git a/src/java/voldemort/store/metadata/MetadataStore.java b/src/java/voldemort/store/metadata/MetadataStore.java index 1f77e0a951..b72afc78e0 100644 --- a/src/java/voldemort/store/metadata/MetadataStore.java +++ b/src/java/voldemort/store/metadata/MetadataStore.java @@ -413,6 +413,17 @@ public StoreDefinition getStoreDef(String storeName) { } } + public VoldemortState getServerStateLocked() { + // acquire read lock + readLock.lock(); + try { + return VoldemortState.valueOf(metadataCache.get(SERVER_STATE_KEY).getValue().toString()); + } finally { + readLock.unlock(); + + } + } + public VoldemortState getServerState() { // acquire read lock readLock.lock(); diff --git a/src/java/voldemort/store/rebalancing/RedirectingStore.java b/src/java/voldemort/store/rebalancing/RedirectingStore.java index b311270f3c..f7770e5c83 100644 --- a/src/java/voldemort/store/rebalancing/RedirectingStore.java +++ b/src/java/voldemort/store/rebalancing/RedirectingStore.java @@ -463,10 +463,7 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan, } if(sourceStoreDefs == null) { /* - * This is more for defensive coding purposes. The update of the - * source stores key happens before the server is put in REBALANCING - * mode and is reset to null after the server goes back to NORMAL - * mode. + * similar to the above for stores xml */ if(logger.isTraceEnabled()) { @@ -477,15 +474,12 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan, } StoreDefinition sourceStoreDef = null; - for(StoreDefinition sourceStoreDefIt: sourceStoreDefs) { - if(storeDef.getName().equals(sourceStoreDefIt.getName())) - sourceStoreDef = storeDef; - } + sourceStoreDef = StoreUtils.getStoreDef(sourceStoreDefs, storeDef.getName()); + Integer nodeId = metadata.getNodeId(); Integer zoneId = currentRoutingPlan.getCluster().getNodeById(nodeId).getZoneId(); - // Use the old store definition to get the routing object instead of the - // new one + // Use the old store definition to get the routing object StoreRoutingPlan oldRoutingPlan = new StoreRoutingPlan(sourceCluster, sourceStoreDef); // Check the current node's relationship to the key. int zoneReplicaType = currentRoutingPlan.getZoneReplicaType(zoneId, nodeId, key); diff --git a/src/proto/voldemort-admin.proto b/src/proto/voldemort-admin.proto index 01b6a28719..59613c6fb1 100644 --- a/src/proto/voldemort-admin.proto +++ b/src/proto/voldemort-admin.proto @@ -21,7 +21,7 @@ message GetMetadataResponse { } message UpdateMetadataRequest { - repeated KeyedVersions keyValue = 1; + repeated KeyedVersions metadataEntry = 1; } message UpdateMetadataResponse { diff --git a/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java b/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java index 09e0249bbe..00e9ec277e 100644 --- a/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java +++ b/test/unit/voldemort/client/rebalance/RebalanceMetadataConsistencyTest.java @@ -123,7 +123,7 @@ public void setUp() { @Test public void testThreading() { - for(int i = 0; i < 30; i++) { + for(int i = 0; i < 3000; i++) { Cluster cluster; StoreDefinition storeDef; @@ -189,8 +189,10 @@ class ThreadReader implements Runnable { @Override public void run() { + metadataStore.readLock.lock(); checkCluster = metadataStore.getCluster(); checkstores = metadataStore.getStoreDefList(); + metadataStore.readLock.unlock(); if(checkCluster.equals(currentCluster)) { Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication);