Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old compaction code #4083

Merged
85 changes: 29 additions & 56 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,40 @@ public enum Property {
+ "A new external compaction service would be defined like the following:\n"
+ "`compaction.service.newService.planner="
+ "\"org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner\".`\n"
+ "`compaction.service.newService.opts.queues=\""
+ "`compaction.service.newService.opts.groups=\""
+ "[{\"name\": \"small\", \"maxSize\":\"32M\"},"
+ "{ \"name\":\"medium\", \"maxSize\":\"512M\"},{\"name\":\"large\"}]`\n"
+ "`compaction.service.newService.opts.maxOpen=50`.\n"
+ "Additional options can be defined using the `compaction.service.<service>.opts.<option>` property.",
"3.1.0"),
COMPACTION_SERVICE_ROOT_PLANNER(COMPACTION_SERVICE_PREFIX + "root.planner",
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for root tablet service.", "4.0.0"),
COMPACTION_SERVICE_ROOT_MAX_OPEN(COMPACTION_SERVICE_PREFIX + "root.planner.opts.maxOpen", "30",
PropertyType.COUNT, "The maximum number of files a compaction will open.", "4.0.0"),
COMPACTION_SERVICE_ROOT_GROUPS(COMPACTION_SERVICE_PREFIX + "root.planner.opts.groups",
"[{'name':'accumulo_meta'}]".replaceAll("'", "\""), PropertyType.STRING,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"4.0.0"),
COMPACTION_SERVICE_META_PLANNER(COMPACTION_SERVICE_PREFIX + "meta.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for metadata table.", "4.0.0"),
COMPACTION_SERVICE_META_MAX_OPEN(COMPACTION_SERVICE_PREFIX + "meta.planner.opts.maxOpen", "30",
PropertyType.COUNT, "The maximum number of files a compaction will open.", "4.0.0"),
COMPACTION_SERVICE_META_GROUPS(COMPACTION_SERVICE_PREFIX + "meta.planner.opts.groups",
"[{'name':'accumulo_meta'}]".replaceAll("'", "\""), PropertyType.JSON,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"4.0.0"),
COMPACTION_SERVICE_DEFAULT_PLANNER(COMPACTION_SERVICE_PREFIX + "default.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Planner for default compaction service.", "4.0.0"),
COMPACTION_SERVICE_DEFAULT_MAX_OPEN(COMPACTION_SERVICE_PREFIX + "default.planner.opts.maxOpen",
"10", PropertyType.COUNT, "The maximum number of files a compaction will open.", "4.0.0"),
COMPACTION_SERVICE_DEFAULT_GROUPS(COMPACTION_SERVICE_PREFIX + "default.planner.opts.groups",
("[{'name':'user_small','maxSize':'128M'}, {'name':'user_large'}]").replaceAll("'", "\""),
PropertyType.STRING,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"4.0.0"),
COMPACTION_WARN_TIME(COMPACTION_PREFIX + "warn.time", "10m", PropertyType.TIMEDURATION,
"When a compaction has not made progress for this time period, a warning will be logged.",
"3.1.0"),
Expand Down Expand Up @@ -605,63 +633,8 @@ public enum Property {
"2.1.0"),
TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", PropertyType.COUNT,
"The maximum number of concurrent tablet migrations for a tablet server.", "1.3.5"),
@Deprecated(since = "3.1")
@ReplacedBy(property = COMPACTION_SERVICE_PREFIX)
TSERV_COMPACTION_SERVICE_PREFIX("tserver.compaction.major.service.", null, PropertyType.PREFIX,
"Prefix for compaction services.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for root tablet service.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN(
"tserver.compaction.major.service.root.planner.opts.maxOpen", "30", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS(
"tserver.compaction.major.service.root.planner.opts.executors",
"[{'name':'all','type':'external','group':'accumulo_meta'}]".replaceAll("'", "\""),
PropertyType.STRING,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Compaction planner for metadata table.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_META_MAX_OPEN(
"tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_META_EXECUTORS(
"tserver.compaction.major.service.meta.planner.opts.executors",
"[{'name':'all','type':'external','group':'accumulo_meta'}]".replaceAll("'", "\""),
PropertyType.JSON,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner",
DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
"Planner for default compaction service.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN(
"tserver.compaction.major.service.default.planner.opts.maxOpen", "10", PropertyType.COUNT,
"The maximum number of files a compaction will open.", "2.1.0"),
@Deprecated(since = "3.1")
TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS(
"tserver.compaction.major.service.default.planner.opts.executors",
("[{'name':'small','type':'external','maxSize':'128M','group':'user_small'}, {'name':'large','type':'external','group':'user_large'}]")
.replaceAll("'", "\""),
PropertyType.STRING,
"See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.",
"2.1.0"),
TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
"The maximum number of concurrent minor compactions for a tablet server.", "1.3.5"),
@Deprecated(since = "3.1")
@ReplacedBy(property = COMPACTION_WARN_TIME)
TSERV_COMPACTION_WARN_TIME("tserver.compaction.warn.time", "10m", PropertyType.TIMEDURATION,
"When a compaction has not made progress for this time period, a warning will be logged.",
"1.6.0"),
TSERV_BLOOM_LOAD_MAXCONCURRENT("tserver.bloom.load.concurrent.max", "4", PropertyType.COUNT,
"The number of concurrent threads that will load bloom filters in the background. "
+ "Setting this to zero will make bloom filters load in the foreground.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ public static void selected(KeyExtent extent, CompactionKind kind,
public static void compacting(KeyExtent extent, CompactionJob job, CompactionConfig config) {
if (fileLog.isDebugEnabled()) {
if (config == null) {
fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getExecutor(),
fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getGroup(),
job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()));
} else {
fileLog.debug("Compacting {} on {} for {} from {} size {} config {}", extent,
job.getExecutor(), job.getKind(), asMinimalString(job.getFiles()),
getSize(job.getFiles()), config);
job.getGroup(), job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()),
config);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ interface TabletUpdates<T> {

T deleteSuspension();

T putExternalCompaction(ExternalCompactionId ecid, ExternalCompactionMetadata ecMeta);
T putExternalCompaction(ExternalCompactionId ecid, CompactionMetadata ecMeta);

T deleteExternalCompaction(ExternalCompactionId ecid);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@

import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;

public class ExternalCompactionMetadata {
public class CompactionMetadata {

private final Set<StoredTabletFile> jobFiles;
private final ReferencedTabletFile compactTmpName;
private final String compactorId;
private final CompactionKind kind;
private final short priority;
private final CompactionExecutorId ceid;
private final CompactorGroupId cgid;
private final boolean propagateDeletes;
private final Long fateTxId;

public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles,
ReferencedTabletFile compactTmpName, String compactorId, CompactionKind kind, short priority,
CompactionExecutorId ceid, boolean propagateDeletes, Long fateTxId) {
public CompactionMetadata(Set<StoredTabletFile> jobFiles, ReferencedTabletFile compactTmpName,
String compactorId, CompactionKind kind, short priority, CompactorGroupId ceid,
boolean propagateDeletes, Long fateTxId) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.compactTmpName = Objects.requireNonNull(compactTmpName);
this.compactorId = Objects.requireNonNull(compactorId);
this.kind = Objects.requireNonNull(kind);
this.priority = priority;
this.ceid = Objects.requireNonNull(ceid);
this.cgid = Objects.requireNonNull(ceid);
this.propagateDeletes = propagateDeletes;
this.fateTxId = fateTxId;
}
Expand All @@ -76,8 +76,8 @@ public short getPriority() {
return priority;
}

public CompactionExecutorId getCompactionExecutorId() {
return ceid;
public CompactorGroupId getCompactionGroupId() {
return cgid;
}

public boolean getPropagateDeletes() {
Expand All @@ -95,7 +95,7 @@ private static class GSonData {
String tmp;
String compactor;
String kind;
String executorId;
String groupId;
short priority;
boolean propDels;
Long fateTxId;
Expand All @@ -108,21 +108,20 @@ public String toJson() {
jData.tmp = compactTmpName.insert().getMetadata();
jData.compactor = compactorId;
jData.kind = kind.name();
jData.executorId = ((CompactionExecutorIdImpl) ceid).getExternalName();
jData.groupId = cgid.toString();
jData.priority = priority;
jData.propDels = propagateDeletes;
jData.fateTxId = fateTxId;
return GSON.get().toJson(jData);
}

public static ExternalCompactionMetadata fromJson(String json) {
public static CompactionMetadata fromJson(String json) {
GSonData jData = GSON.get().fromJson(json, GSonData.class);

return new ExternalCompactionMetadata(
jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
return new CompactionMetadata(jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
StoredTabletFile.of(jData.tmp).getTabletFile(), jData.compactor,
CompactionKind.valueOf(jData.kind), jData.priority,
CompactionExecutorIdImpl.externalId(jData.executorId), jData.propDels, jData.fateTxId);
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, jData.fateTxId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public class TabletMetadata {
private OptionalLong flush = OptionalLong.empty();
private List<LogEntry> logs;
private Double splitRatio = null;
private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions;
private Map<ExternalCompactionId,CompactionMetadata> extCompactions;
private boolean merged;
private TabletHostingGoal goal = TabletHostingGoal.ONDEMAND;
private boolean onDemandHostingRequested = false;
Expand Down Expand Up @@ -414,7 +414,7 @@ public SortedMap<Key,Value> getKeyValues() {
return keyValues;
}

public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() {
public Map<ExternalCompactionId,CompactionMetadata> getExternalCompactions() {
ensureFetched(ColumnType.ECOMP);
return extCompactions;
}
Expand Down Expand Up @@ -449,8 +449,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
final var filesBuilder = ImmutableMap.<StoredTabletFile,DataFileValue>builder();
final var scansBuilder = ImmutableList.<StoredTabletFile>builder();
final var logsBuilder = ImmutableList.<LogEntry>builder();
final var extCompBuilder =
ImmutableMap.<ExternalCompactionId,ExternalCompactionMetadata>builder();
final var extCompBuilder = ImmutableMap.<ExternalCompactionId,CompactionMetadata>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,Long>builder();
final var compactedBuilder = ImmutableSet.<Long>builder();
ByteSequence row = null;
Expand Down Expand Up @@ -542,8 +541,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
logsBuilder.add(LogEntry.fromMetaWalEntry(kv));
break;
case ExternalCompactionColumnFamily.STR_NAME:
extCompBuilder.put(ExternalCompactionId.of(qual),
ExternalCompactionMetadata.fromJson(val));
extCompBuilder.put(ExternalCompactionId.of(qual), CompactionMetadata.fromJson(val));
break;
case MergedColumnFamily.STR_NAME:
te.merged = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public TabletMetadataBuilder deleteSuspension() {

@Override
public TabletMetadataBuilder putExternalCompaction(ExternalCompactionId ecid,
ExternalCompactionMetadata ecMeta) {
CompactionMetadata ecMeta) {
fetched.add(ECOMP);
internalBuilder.putExternalCompaction(ecid, ecMeta);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public T deleteSuspension() {
}

@Override
public T putExternalCompaction(ExternalCompactionId ecid, ExternalCompactionMetadata ecMeta) {
public T putExternalCompaction(ExternalCompactionId ecid, CompactionMetadata ecMeta) {
mutation.put(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical(), ecMeta.toJson());
return getThis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public interface CompactionJob {
short getPriority();

/**
* @return The executor to run the job.
* @return The group to run the job.
*/
CompactionExecutorId getExecutor();
CompactorGroupId getGroup();

/**
* @return The files to compact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ interface Builder {
* @param priority This determines the order in which the job is taken off the execution queue.
* Larger numbers are taken off the queue first. If two jobs are on the queue, one with a
* priority of 4 and another with 5, then the one with 5 will be taken first.
* @param executor Where the job should run.
* @param group The files to compact.
* @param group Where the job should run.
* @param files The files to compact.
* @return this
*/
Builder addJob(short priority, CompactionExecutorId executor,
Collection<CompactableFile> group);
Builder addJob(short priority, CompactorGroupId group, Collection<CompactableFile> files);

CompactionPlan build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ public interface InitParameters {
String getFullyQualifiedOption(String key);

/**
* @return an execution manager that can be used to created thread pools within a compaction
* service.
* @return a group manager that can be used to create groups for a compaction service.
*/
ExecutorManager getExecutorManager();
GroupManager getGroupManager();
}

public void init(InitParameters params);
Expand Down Expand Up @@ -156,8 +155,8 @@ public interface PlanningParameters {
* {@code [F3,F4,F5]} and it must eventually compact those three files to one.
*
* <p>
* When a planner returns a compactions plan, task will be queued on executors. Previously queued
* task that do not match the latest plan are removed. The planner is called periodically,
* When a planner returns a compactions plan, task will be queued on a compactor group. Previously
* queued task that do not match the latest plan are removed. The planner is called periodically,
* whenever a new file is added, and whenever a compaction finishes.
*
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
import org.apache.accumulo.core.data.AbstractId;

/**
* A unique identifier for a a compaction executor that a {@link CompactionPlanner} can schedule
* A unique identifier for a compactor group that a {@link CompactionPlanner} can schedule
* compactions on using a {@link CompactionJob}.
*
* @since 2.1.0
* @since 3.1.0
* @see org.apache.accumulo.core.spi.compaction
*/
public class CompactionExecutorId extends AbstractId<CompactionExecutorId> {
public class CompactorGroupId extends AbstractId<CompactorGroupId> {
// ELASTICITY_TODO make this cache ids like TableId. This will help save manager memory.
private static final long serialVersionUID = 1L;

protected CompactionExecutorId(String canonical) {
protected CompactorGroupId(String canonical) {
super(canonical);
}
}