Skip to content

Commit

Permalink
HBASE-17001 Enforce quota violation policies in the RegionServer
Browse files Browse the repository at this point in the history
The nuts-and-bolts of filesystem quotas. The Master must inform
RegionServers of the violation of a quota by a table. The RegionServer
must apply the violation policy as configured. Need to ensure
that the proper interfaces exist to satisfy all necessary policies.

This required a massive rewrite of the internal tracking by
the general space quota feature. Instead of tracking "violations",
we need to start tracking "usage". This allows us to make the decision
at the RegionServer level as to when the files in a bulk load request
should be accept or rejected which ultimately lets us avoid bulk loads
dramatically exceeding a configured space quota.
  • Loading branch information
joshelser committed May 22, 2017
1 parent 98b4181 commit 34ba143
Show file tree
Hide file tree
Showing 50 changed files with 6,016 additions and 1,006 deletions.
Expand Up @@ -29,4 +29,8 @@ public class QuotaExceededException extends DoNotRetryIOException {
public QuotaExceededException(String msg) {
super(msg);
}

public QuotaExceededException(String msg, Throwable cause) {
super(msg, cause);
}
}
Expand Up @@ -35,7 +35,6 @@
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -52,6 +51,7 @@
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -62,8 +62,9 @@
* <pre>
* ROW-KEY FAM/QUAL DATA
* n.&lt;namespace&gt; q:s &lt;global-quotas&gt;
* t.&lt;namespace&gt; u:p &lt;namespace-quota policy&gt;
* t.&lt;table&gt; q:s &lt;global-quotas&gt;
* t.&lt;table&gt; u:v &lt;space violation policy&gt;
* t.&lt;table&gt; u:p &lt;table-quota policy&gt;
* u.&lt;user&gt; q:s &lt;global-quotas&gt;
* u.&lt;user&gt; q:s.&lt;table&gt; &lt;table-quotas&gt;
* u.&lt;user&gt; q:s.&lt;ns&gt;: &lt;namespace-quotas&gt;
Expand All @@ -82,7 +83,9 @@ public class QuotaTableUtil {
protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v");
protected static final byte[] QUOTA_QUALIFIER_POLICY = Bytes.toBytes("p");
protected static final String QUOTA_POLICY_COLUMN =
Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_POLICY);
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
Expand Down Expand Up @@ -214,10 +217,10 @@ public static Filter makeFilter(final QuotaFilter filter) {
/**
* Creates a {@link Scan} which returns only quota violations from the quota table.
*/
public static Scan makeQuotaViolationScan() {
public static Scan makeQuotaSnapshotScan() {
Scan s = new Scan();
// Limit to "u:v" column
s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
// Limit rowspace to the "t:" prefix
s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
return s;
Expand All @@ -230,26 +233,25 @@ public static Scan makeQuotaViolationScan() {
* will throw an {@link IllegalArgumentException}.
*
* @param result A row from the quota table.
* @param policies A map of policies to add the result of this method into.
* @param snapshots A map of violations to add the result of this method into.
*/
public static void extractViolationPolicy(
Result result, Map<TableName,SpaceViolationPolicy> policies) {
public static void extractQuotaSnapshot(
Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
byte[] row = Objects.requireNonNull(result).getRow();
if (null == row) {
throw new IllegalArgumentException("Provided result had a null row");
}
final TableName targetTableName = getTableFromRowKey(row);
Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY);
if (null == c) {
throw new IllegalArgumentException("Result did not contain the expected column "
+ Bytes.toString(QUOTA_FAMILY_USAGE) + ":" + Bytes.toString(QUOTA_QUALIFIER_VIOLATION)
+ ", " + result.toString());
+ QUOTA_POLICY_COLUMN + ", " + result.toString());
}
ByteString buffer = UnsafeByteOperations.unsafeWrap(
c.getValueArray(), c.getValueOffset(), c.getValueLength());
try {
SpaceQuota quota = SpaceQuota.parseFrom(buffer);
policies.put(targetTableName, getViolationPolicy(quota));
QuotaProtos.SpaceQuotaSnapshot snapshot = QuotaProtos.SpaceQuotaSnapshot.parseFrom(buffer);
snapshots.put(targetTableName, SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot));
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
"Result did not contain a valid SpaceQuota protocol buffer message", e);
Expand Down Expand Up @@ -385,23 +387,14 @@ protected static void parseUserResult(final String userName, final Result result
/**
* Creates a {@link Put} to enable the given <code>policy</code> on the <code>table</code>.
*/
public static Put createEnableViolationPolicyUpdate(
TableName tableName, SpaceViolationPolicy policy) {
public static Put createPutSpaceSnapshot(TableName tableName, SpaceQuotaSnapshot snapshot) {
Put p = new Put(getTableRowKey(tableName));
SpaceQuota quota = getProtoViolationPolicy(policy);
p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray());
p.addColumn(
QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_POLICY,
SpaceQuotaSnapshot.toProtoSnapshot(snapshot).toByteArray());
return p;
}

/**
* Creates a {@link Delete} to remove a policy on the given <code>table</code>.
*/
public static Delete createRemoveViolationPolicyUpdate(TableName tableName) {
Delete d = new Delete(getTableRowKey(tableName));
d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
return d;
}

/* =========================================================================
* Quotas protobuf helpers
*/
Expand Down Expand Up @@ -536,4 +529,4 @@ protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
}
return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
}
}
}
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.util.Objects;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

/**
* A point-in-time view of a space quota on a table.
*/
@InterfaceAudience.Private
public class SpaceQuotaSnapshot {
private static final SpaceQuotaSnapshot NO_SUCH_SNAPSHOT = new SpaceQuotaSnapshot(
SpaceQuotaStatus.notInViolation(), 0, Long.MAX_VALUE);
private final SpaceQuotaStatus quotaStatus;
private final long usage;
private final long limit;

/**
* Encapsulates the state of a quota on a table. The quota may or may not be in violation.
* If it is in violation, there will be a non-null violation policy.
*/
@InterfaceAudience.Private
public static class SpaceQuotaStatus {
private static final SpaceQuotaStatus NOT_IN_VIOLATION = new SpaceQuotaStatus(null, false);
final SpaceViolationPolicy policy;
final boolean inViolation;

public SpaceQuotaStatus(SpaceViolationPolicy policy) {
this.policy = Objects.requireNonNull(policy);
this.inViolation = true;
}

private SpaceQuotaStatus(SpaceViolationPolicy policy, boolean inViolation) {
this.policy = policy;
this.inViolation = inViolation;
}

/**
* The violation policy which may be null. Is guaranteed to be non-null if
* {@link #isInViolation()} is <code>true</code>, and <code>false</code>
* otherwise.
*/
public SpaceViolationPolicy getPolicy() {
return policy;
}

/**
* <code>true</code> if the quota is being violated, <code>false</code> otherwise.
*/
public boolean isInViolation() {
return inViolation;
}

/**
* Returns a singleton referring to a quota which is not in violation.
*/
public static SpaceQuotaStatus notInViolation() {
return NOT_IN_VIOLATION;
}

@Override
public int hashCode() {
return new HashCodeBuilder().append(policy == null ? 0 : policy.hashCode())
.append(inViolation).toHashCode();
}

@Override
public boolean equals(Object o) {
if (o instanceof SpaceQuotaStatus) {
SpaceQuotaStatus other = (SpaceQuotaStatus) o;
return Objects.equals(policy, other.policy) && inViolation == other.inViolation;
}
return false;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName());
sb.append("[policy=").append(policy);
sb.append(", inViolation=").append(inViolation).append("]");
return sb.toString();
}

public static QuotaProtos.SpaceQuotaStatus toProto(SpaceQuotaStatus status) {
QuotaProtos.SpaceQuotaStatus.Builder builder = QuotaProtos.SpaceQuotaStatus.newBuilder();
builder.setInViolation(status.inViolation);
if (status.isInViolation()) {
builder.setPolicy(ProtobufUtil.toProtoViolationPolicy(status.getPolicy()));
}
return builder.build();
}

public static SpaceQuotaStatus toStatus(QuotaProtos.SpaceQuotaStatus proto) {
if (proto.getInViolation()) {
return new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(proto.getPolicy()));
} else {
return NOT_IN_VIOLATION;
}
}
}

public SpaceQuotaSnapshot(SpaceQuotaStatus quotaStatus, long usage, long limit) {
this.quotaStatus = Objects.requireNonNull(quotaStatus);
this.usage = usage;
this.limit = limit;
}

/**
* Returns the status of the quota.
*/
public SpaceQuotaStatus getQuotaStatus() {
return quotaStatus;
}

/**
* Returns the current usage, in bytes, of the target (e.g. table, namespace).
*/
public long getUsage() {
return usage;
}

/**
* Returns the limit, in bytes, of the target (e.g. table, namespace).
*/
public long getLimit() {
return limit;
}

@Override
public int hashCode() {
return new HashCodeBuilder()
.append(quotaStatus.hashCode())
.append(usage)
.append(limit)
.toHashCode();
}

@Override
public boolean equals(Object o) {
if (o instanceof SpaceQuotaSnapshot) {
SpaceQuotaSnapshot other = (SpaceQuotaSnapshot) o;
return quotaStatus.equals(other.quotaStatus) && usage == other.usage && limit == other.limit;
}
return false;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(32);
sb.append("SpaceQuotaSnapshot[policy=").append(quotaStatus).append(", use=");
sb.append(usage).append("bytes/").append(limit).append("bytes]");
return sb.toString();
}

// ProtobufUtil is in hbase-client, and this doesn't need to be public.
public static SpaceQuotaSnapshot toSpaceQuotaSnapshot(QuotaProtos.SpaceQuotaSnapshot proto) {
return new SpaceQuotaSnapshot(SpaceQuotaStatus.toStatus(proto.getStatus()),
proto.getUsage(), proto.getLimit());
}

public static QuotaProtos.SpaceQuotaSnapshot toProtoSnapshot(SpaceQuotaSnapshot snapshot) {
return QuotaProtos.SpaceQuotaSnapshot.newBuilder()
.setStatus(SpaceQuotaStatus.toProto(snapshot.getQuotaStatus()))
.setUsage(snapshot.getUsage()).setLimit(snapshot.getLimit()).build();
}

/**
* Returns a singleton that corresponds to no snapshot information.
*/
public static SpaceQuotaSnapshot getNoSuchSnapshot() {
return NO_SUCH_SNAPSHOT;
}
}

0 comments on commit 34ba143

Please sign in to comment.