Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,100 @@
import java.util.Objects;
import java.util.UUID;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import org.apache.accumulo.core.util.UuidUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

public class ScanServerRefTabletFile extends TabletFile {

@SuppressWarnings("deprecation")
private static final String OLD_PREFIX = OldScanServerFileReferenceSection.getRowPrefix();
private final String prefix;
private final Value NULL_VALUE = new Value(new byte[0]);
private final Text colf;
private final Text colq;
private final Text serverAddress;
private final Text uuid;

public ScanServerRefTabletFile(String file, String serverAddress, UUID serverLockUUID) {
public ScanServerRefTabletFile(UUID serverLockUUID, String serverAddress, String file) {
super(new Path(URI.create(file)));
this.colf = new Text(serverAddress);
this.colq = new Text(serverLockUUID.toString());
// For new data, always use the current prefix
prefix = ScanServerFileReferenceSection.getRowPrefix();
this.serverAddress = new Text(serverAddress);
uuid = new Text(serverLockUUID.toString());
}

public ScanServerRefTabletFile(String file, Text colf, Text colq) {
super(new Path(URI.create(file)));
this.colf = colf;
this.colq = colq;
public ScanServerRefTabletFile(Key k) {
super(new Path(URI.create(extractFile(k))));
serverAddress = k.getColumnFamily();
if (isOldPrefix(k)) {
prefix = OLD_PREFIX;
uuid = new Text(k.getColumnQualifier().toString());
} else {
prefix = ScanServerFileReferenceSection.getRowPrefix();
var row = k.getRow().toString();
Preconditions.checkArgument(row.startsWith(prefix), "Unexpected row prefix %s ", row);
var uuidStr = row.substring(prefix.length());
Preconditions.checkArgument(UuidUtil.isUUID(uuidStr, 0), "Row suffix is not uuid %s", row);
uuid = new Text(uuidStr);
}
}

public Mutation putMutation() {
// Only write scan refs in the new format
Mutation mutation = new Mutation(prefix + uuid.toString());
mutation.put(serverAddress, getFilePath(), getValue());
return mutation;
}

public Mutation putDeleteMutation() {
Mutation mutation;
if (Objects.equals(prefix, OLD_PREFIX)) {
mutation = new Mutation(prefix + this.getPath().toString());
mutation.putDelete(serverAddress, uuid);
} else {
mutation = new Mutation(prefix + uuid.toString());
mutation.putDelete(serverAddress, getFilePath());
}
return mutation;
}

private static String extractFile(Key k) {
if (isOldPrefix(k)) {
return k.getRow().toString().substring(OLD_PREFIX.length());
} else {
return k.getColumnQualifier().toString();
}
}

/**
* Returns the correctly formatted range for a unique uuid
*
* @param uuid ServerLockUUID of a Scan Server
* @return Range for a single scan server
*/
public static Range getRange(UUID uuid) {
Objects.requireNonNull(uuid);
return new Range(MetadataSchema.ScanServerFileReferenceSection.getRowPrefix() + uuid);
}

public String getRowSuffix() {
return this.getPathStr();
private static boolean isOldPrefix(Key k) {
return k.getRow().toString().startsWith(OLD_PREFIX);
}

public Text getServerAddress() {
return this.colf;
public UUID getServerLockUUID() {
return UUID.fromString(uuid.toString());
}

public Text getServerLockUUID() {
return this.colq;
public Text getFilePath() {
return new Text(this.getPath().toString());
}

public Value getValue() {
Expand All @@ -64,8 +126,8 @@ public Value getValue() {
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((colf == null) ? 0 : colf.hashCode());
result = prime * result + ((colq == null) ? 0 : colq.hashCode());
result = prime * result + ((serverAddress == null) ? 0 : serverAddress.hashCode());
result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
return result;
}

Expand All @@ -81,13 +143,13 @@ public boolean equals(Object obj) {
return false;
}
ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj;
return Objects.equals(colf, other.colf) && Objects.equals(colq, other.colq);
return Objects.equals(serverAddress, other.serverAddress) && Objects.equals(uuid, other.uuid);
}

@Override
public String toString() {
return "ScanServerRefTabletFile [file=" + this.getRowSuffix() + ", server address=" + colf
+ ", server lock uuid=" + colq + "]";
return "ScanServerRefTabletFile [file=" + this.getPath().toString() + ", server address="
+ serverAddress + ", server lock uuid=" + uuid + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,20 @@ public static String getRowPrefix() {
}

public static class ScanServerFileReferenceSection {
private static final Section section =
new Section(RESERVED_PREFIX + "scanfileref", true, RESERVED_PREFIX + "scanfilereg", false);

public static Range getRange() {
return section.getRange();
}

public static String getRowPrefix() {
return section.getRowPrefix();
}
}

@Deprecated(since = "2.1")
public static class OldScanServerFileReferenceSection {
private static final Section section =
new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX + "sserx", false);

Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/util/UuidUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.util;

public class UuidUtil {
/**
* A fast method for verifying a suffix of a string looks like a uuid.
*
* @param offset location where the uuid starts. Its expected the uuid occupies the rest of the
* string.
*/
public static boolean isUUID(String uuid, int offset) {
if (uuid.length() - offset != 36) {
return false;
}
for (int i = 0; i < 36; i++) {
var c = uuid.charAt(i + offset);
if (i == 8 || i == 13 || i == 18 || i == 23) {
if (c != '-') {
// expect '-' char at above positions, did not see it
return false;
}
} else if (c < '0' || (c > '9' && c < 'A') || (c > 'F' && c < 'a') || c > 'f') {
// expected hex at all other positions, did not see hex chars
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.OldScanServerFileReferenceSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -345,11 +347,8 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
@Override
public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) {
try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
String prefix = ScanServerFileReferenceSection.getRowPrefix();
for (ScanServerRefTabletFile ref : scanRefs) {
Mutation m = new Mutation(prefix + ref.getRowSuffix());
m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue());
writer.addMutation(m);
writer.addMutation(ref.putMutation());
}
} catch (MutationsRejectedException | TableNotFoundException e) {
throw new IllegalStateException(
Expand All @@ -358,14 +357,15 @@ public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scan
}

@Override
@SuppressWarnings("deprecation")
public Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
try {
Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY);
scanner.setRange(ScanServerFileReferenceSection.getRange());
int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
BatchScanner scanner =
context.createBatchScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY);
scanner.setRanges(Set.of(ScanServerFileReferenceSection.getRange(),
OldScanServerFileReferenceSection.getRange()));
return StreamSupport.stream(scanner.spliterator(), false)
.map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
.map(e -> new ScanServerRefTabletFile(e.getKey()));
} catch (TableNotFoundException e) {
throw new IllegalStateException(DataLevel.USER.metaTable() + " not found!", e);
}
Expand All @@ -377,14 +377,10 @@ public void deleteScanServerFileReferences(String serverAddress, UUID scanServer
Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied");
try (
Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY)) {
scanner.setRange(ScanServerFileReferenceSection.getRange());
scanner.fetchColumn(new Text(serverAddress), new Text(scanServerLockUUID.toString()));
scanner.setRange(ScanServerRefTabletFile.getRange(scanServerLockUUID));

int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
Set<ScanServerRefTabletFile> refsToDelete = StreamSupport.stream(scanner.spliterator(), false)
.map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()))
.collect(Collectors.toSet());
.map(e -> new ScanServerRefTabletFile(e.getKey())).collect(Collectors.toSet());

if (!refsToDelete.isEmpty()) {
this.deleteScanServerFileReferences(refsToDelete);
Expand All @@ -397,11 +393,8 @@ public void deleteScanServerFileReferences(String serverAddress, UUID scanServer
@Override
public void deleteScanServerFileReferences(Collection<ScanServerRefTabletFile> refsToDelete) {
try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
String prefix = ScanServerFileReferenceSection.getRowPrefix();
for (ScanServerRefTabletFile ref : refsToDelete) {
Mutation m = new Mutation(prefix + ref.getRowSuffix());
m.putDelete(ref.getServerAddress(), ref.getServerLockUUID());
writer.addMutation(m);
writer.addMutation(ref.putDeleteMutation());
}
log.debug("Deleted scan server file reference entries for files: {}", refsToDelete);
} catch (MutationsRejectedException | TableNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void clean(ServerContext context) {

// collect all uuids that are currently in the metadata table
context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {
uuidsToDelete.add(UUID.fromString(ssrtf.getServerLockUUID().toString()));
uuidsToDelete.add(ssrtf.getServerLockUUID());
});

// gather the list of current live scan servers, its important that this is done after the above
Expand All @@ -56,8 +56,7 @@ public static void clean(ServerContext context) {
final Set<ScanServerRefTabletFile> refsToDelete = new HashSet<>();

context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {

var uuid = UUID.fromString(ssrtf.getServerLockUUID().toString());
var uuid = ssrtf.getServerLockUUID();

if (uuidsToDelete.contains(uuid)) {
refsToDelete.add(ssrtf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,13 @@ public void run() {
LOG.info("Stopping Thrift Servers");
address.server.stop();

LOG.info("Removing server scan references");
this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
serverLockUUID);
try {
LOG.info("Removing server scan references");
this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
serverLockUUID);
} catch (Exception e) {
LOG.warn("Failed to remove scan server refs from metadata location", e);
}

try {
LOG.debug("Closing filesystems");
Expand Down Expand Up @@ -625,7 +629,7 @@ private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> ex

for (StoredTabletFile file : allFiles.keySet()) {
if (!reservedFiles.containsKey(file)) {
refs.add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID));
refs.add(new ScanServerRefTabletFile(serverLockUUID, serverAddress, file.getPathStr()));
filesToReserve.add(file);
tabletsToCheck.add(Objects.requireNonNull(allFiles.get(file)));
LOG.trace("RFFS {} need to add scan ref for file {}", myReservationId, file);
Expand Down Expand Up @@ -829,7 +833,7 @@ private void cleanUpReservedFiles(long expireTimeMs) {
influxFiles.add(file);
confirmed.add(file);
refsToDelete
.add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID));
.add(new ScanServerRefTabletFile(serverLockUUID, serverAddress, file.getPathStr()));

// remove the entry from the map while holding the write lock ensuring no new
// reservations are added to the map values while the metadata operation to delete is
Expand Down
Loading