Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;

import java.util.Iterator;

/**
 * An {@link Iterator} that may hold resources (for example a native
 * RocksDB iterator) until it is closed.
 *
 * <p>Implementations release those resources in {@link #close()}; callers
 * should therefore iterate inside a try-with-resources statement rather
 * than discarding the iterator when exhausted.
 *
 * <p>{@code close()} narrows {@link AutoCloseable#close()} to declare no
 * checked exception, so no catch block is forced on callers.
 *
 * @param <E> the type of elements returned by this iterator
 */
public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
  // Narrowed override: releasing the underlying resource must not require
  // callers to handle a checked exception.
  @Override
  void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.utils.db.DBProfile;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
Expand All @@ -45,6 +46,8 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -92,6 +95,11 @@
@RunWith(Parameterized.class)
@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
public class TestOmSnapshot {

static {
Logger.getLogger(ManagedRocksObjectUtils.class).setLevel(Level.DEBUG);
}

private static MiniOzoneCluster cluster = null;
private static OzoneClient client;
private static String volumeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,22 @@ private void loadFromSnapshotInfoTable(OMMetadataManager metadataManager)
throws IOException {
// read from snapshotInfo table to populate
// snapshot chains - both global and local path
TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
keyIter = metadataManager.getSnapshotInfoTable().iterator();
Map<Long, SnapshotInfo> snaps = new TreeMap<>();
Table.KeyValue< String, SnapshotInfo > kv;
snapshotChainGlobal.clear();
snapshotChainPath.clear();
latestPathSnapshotID.clear();
snapshotIdToTableKey.clear();

while (keyIter.hasNext()) {
kv = keyIter.next();
snaps.put(kv.getValue().getCreationTime(), kv.getValue());
}
for (SnapshotInfo sinfo : snaps.values()) {
addSnapshot(sinfo);
try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>>
keyIter = metadataManager.getSnapshotInfoTable().iterator()) {
Map<Long, SnapshotInfo> snaps = new TreeMap<>();
Table.KeyValue<String, SnapshotInfo> kv;
snapshotChainGlobal.clear();
snapshotChainPath.clear();
latestPathSnapshotID.clear();
snapshotIdToTableKey.clear();

while (keyIter.hasNext()) {
kv = keyIter.next();
snaps.put(kv.getValue().getCreationTime(), kv.getValue());
}
for (SnapshotInfo sinfo : snaps.values()) {
addSnapshot(sinfo);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.ozone.om.snapshot;

import java.util.Iterator;
import org.apache.hadoop.util.ClosableIterator;

/**
* Define an interface for persistent list.
Expand All @@ -31,5 +31,5 @@ public interface PersistentList<E> {

E get(int index);

Iterator<E> iterator();
ClosableIterator<E> iterator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.ozone.om.snapshot;

import java.util.Iterator;
import org.apache.hadoop.util.ClosableIterator;

/**
* Define an interface for persistent set.
Expand All @@ -27,5 +27,5 @@ public interface PersistentSet<E> {

void add(E entry);

Iterator<E> iterator();
ClosableIterator<E> iterator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.hadoop.ozone.om.snapshot;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.util.ClosableIterator;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

Expand Down Expand Up @@ -63,7 +63,9 @@ public boolean add(E entry) {

@Override
public boolean addAll(PersistentList<E> from) {
from.iterator().forEachRemaining(this::add);
try (ClosableIterator<E> iterator = from.iterator()) {
iterator.forEachRemaining(this::add);
}
return true;
}

Expand All @@ -80,12 +82,12 @@ public E get(int index) {
}

@Override
public Iterator<E> iterator() {
public ClosableIterator<E> iterator() {
ManagedRocksIterator managedRocksIterator
= new ManagedRocksIterator(db.get().newIterator(columnFamilyHandle));
managedRocksIterator.get().seekToFirst();

return new Iterator<E>() {
return new ClosableIterator<E>() {
@Override
public boolean hasNext() {
return managedRocksIterator.get().isValid();
Expand All @@ -102,6 +104,11 @@ public E next() {
throw new RuntimeException(exception);
}
}

@Override
public void close() {
managedRocksIterator.close();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.hadoop.ozone.om.snapshot;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.util.ClosableIterator;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

Expand Down Expand Up @@ -60,12 +60,12 @@ public void add(E entry) {
}

@Override
public Iterator<E> iterator() {
public ClosableIterator<E> iterator() {
ManagedRocksIterator managedRocksIterator =
new ManagedRocksIterator(db.get().newIterator(columnFamilyHandle));
managedRocksIterator.get().seekToFirst();

return new Iterator<E>() {
return new ClosableIterator<E>() {
@Override
public boolean hasNext() {
return managedRocksIterator.get().isValid();
Expand All @@ -82,6 +82,11 @@ public E next() {
throw new RuntimeException(exception);
}
}

@Override
public void close() {
managedRocksIterator.close();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -51,6 +50,7 @@
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport.DiffType;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
Expand Down Expand Up @@ -528,46 +528,49 @@ private void generateDiffReport(
final PersistentList<byte[]> modifyDiffs =
createDiffReportPersistentList(modifyDiffColumnFamily);

Iterator<byte[]> objectIdsIterator = objectIDsToCheck.iterator();
while (objectIdsIterator.hasNext()) {
byte[] id = objectIdsIterator.next();
/*
* This key can be
* -> Created after the old snapshot was taken, which means it will be
* missing in oldKeyTable and present in newKeyTable.
* -> Deleted after the old snapshot was taken, which means it will be
* present in oldKeyTable and missing in newKeyTable.
* -> Modified after the old snapshot was taken, which means it will be
* present in oldKeyTable and present in newKeyTable with same
* Object ID but with different metadata.
* -> Renamed after the old snapshot was taken, which means it will be
* present in oldKeyTable and present in newKeyTable but with
* different name and same Object ID.
*/

byte[] oldKeyName = oldObjIdToKeyMap.get(id);
byte[] newKeyName = newObjIdToKeyMap.get(id);

if (oldKeyName == null && newKeyName == null) {
// This cannot happen.
throw new IllegalStateException("Old and new key name both are null");
} else if (oldKeyName == null) { // Key Created.
String key = codecRegistry.asObject(newKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.CREATE, key);
createDiffs.add(codecRegistry.asRawData(entry));
} else if (newKeyName == null) { // Key Deleted.
String key = codecRegistry.asObject(oldKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.DELETE, key);
deleteDiffs.add(codecRegistry.asRawData(entry));
} else if (Arrays.equals(oldKeyName, newKeyName)) { // Key modified.
String key = codecRegistry.asObject(newKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.MODIFY, key);
modifyDiffs.add(codecRegistry.asRawData(entry));
} else { // Key Renamed.
String oldKey = codecRegistry.asObject(oldKeyName, String.class);
String newKey = codecRegistry.asObject(newKeyName, String.class);
renameDiffs.add(codecRegistry.asRawData(
DiffReportEntry.of(DiffType.RENAME, oldKey, newKey)));
try (ClosableIterator<byte[]>
objectIdsIterator = objectIDsToCheck.iterator()) {
while (objectIdsIterator.hasNext()) {
byte[] id = objectIdsIterator.next();
/*
* This key can be
* -> Created after the old snapshot was taken, which means it will be
* missing in oldKeyTable and present in newKeyTable.
* -> Deleted after the old snapshot was taken, which means it will be
* present in oldKeyTable and missing in newKeyTable.
* -> Modified after the old snapshot was taken, which means it will
* be present in oldKeyTable and present in newKeyTable with same
* Object ID but with different metadata.
* -> Renamed after the old snapshot was taken, which means it will be
* present in oldKeyTable and present in newKeyTable but with
* different name and same Object ID.
*/

byte[] oldKeyName = oldObjIdToKeyMap.get(id);
byte[] newKeyName = newObjIdToKeyMap.get(id);

if (oldKeyName == null && newKeyName == null) {
// This cannot happen.
throw new IllegalStateException(
"Old and new key name both are null");
} else if (oldKeyName == null) { // Key Created.
String key = codecRegistry.asObject(newKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.CREATE, key);
createDiffs.add(codecRegistry.asRawData(entry));
} else if (newKeyName == null) { // Key Deleted.
String key = codecRegistry.asObject(oldKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.DELETE, key);
deleteDiffs.add(codecRegistry.asRawData(entry));
} else if (Arrays.equals(oldKeyName, newKeyName)) { // Key modified.
String key = codecRegistry.asObject(newKeyName, String.class);
DiffReportEntry entry = DiffReportEntry.of(DiffType.MODIFY, key);
modifyDiffs.add(codecRegistry.asRawData(entry));
} else { // Key Renamed.
String oldKey = codecRegistry.asObject(oldKeyName, String.class);
String newKey = codecRegistry.asObject(newKeyName, String.class);
renameDiffs.add(codecRegistry.asRawData(
DiffReportEntry.of(DiffType.RENAME, oldKey, newKey)));
}
}
}

Expand Down Expand Up @@ -655,13 +658,15 @@ private ColumnFamilyHandle createColumnFamily(String columnFamilyName)
private int addToReport(String jobId, int index,
PersistentList<byte[]> diffReportEntries)
throws IOException {
Iterator<byte[]> diffReportIterator = diffReportEntries.iterator();
while (diffReportIterator.hasNext()) {

snapDiffReportTable.put(
codecRegistry.asRawData(jobId + DELIMITER + index),
diffReportIterator.next());
index++;
try (ClosableIterator<byte[]>
diffReportIterator = diffReportEntries.iterator()) {
while (diffReportIterator.hasNext()) {

snapDiffReportTable.put(
codecRegistry.asRawData(jobId + DELIMITER + index),
diffReportIterator.next());
index++;
}
}
return index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
import org.apache.hadoop.hdds.utils.db.IntegerCodec;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -113,13 +113,14 @@ public void testRocksDBPersistentList() throws IOException, RocksDBException {
List<String> testList = Arrays.asList("e1", "e2", "e3", "e1", "e2");
testList.forEach(persistentList::add);

Iterator<String> iterator = persistentList.iterator();
int index = 0;
try (ClosableIterator<String> iterator = persistentList.iterator()) {
int index = 0;

while (iterator.hasNext()) {
assertEquals(iterator.next(), testList.get(index++));
while (iterator.hasNext()) {
assertEquals(iterator.next(), testList.get(index++));
}
assertEquals(testList.size(), index);
}
assertEquals(testList.size(), index);
} finally {
if (columnFamily != null) {
db.get().dropColumnFamily(columnFamily);
Expand Down
Loading