Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pipeline {

stage('BuildAndTest') {
options {
timeout(time: 5, unit: 'HOURS')
timeout(time: 9, unit: 'HOURS')
}
steps {
dir("HBASE_${HBASE_PROFILE}") {
Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile.yetus
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pipeline {
}

options {
timeout(time: 5, unit: 'HOURS')
timeout(time: 9, unit: 'HOURS')
}

steps {
Expand Down
15 changes: 12 additions & 3 deletions phoenix-core-client/src/main/antlr3/PhoenixSQL.g
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ tokens
PRE='pre';
POST='post';
CHANGE='change';
IDX_MUTATIONS='idx_mutations';
DATA_ROW_STATE='data_row_state';
LATEST='latest';
ALL='all';
INDEX='index';
Expand Down Expand Up @@ -161,6 +163,9 @@ tokens
REGIONS = 'regions';
NOVERIFY = 'noverify';
RETURNING = 'returning';
CONSISTENCY = 'consistency';
EVENTUAL = 'eventual';
STRONG = 'strong';
}


Expand Down Expand Up @@ -209,6 +214,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.types.IndexConsistency;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.CDCChangeScope;
Expand Down Expand Up @@ -585,7 +591,7 @@ cdc_change_scopes returns [Set<CDCChangeScope> ret]
;

cdc_change_scope returns [CDCChangeScope ret]
: v=(PRE | POST | CHANGE)
: v=(PRE | POST | CHANGE | IDX_MUTATIONS | DATA_ROW_STATE)
{
ret = CDCChangeScope.valueOf(v.getText().toUpperCase());
}
Expand Down Expand Up @@ -699,8 +705,8 @@ drop_cdc_node returns [DropCDCStatement ret]
// Parse a alter index statement
alter_index_node returns [AlterIndexStatement ret]
: ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name
((s=(USABLE | UNUSABLE | REBUILD (isRebuildAll=ALL)? | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)?)
{ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), isRebuildAll!=null, async!=null, p); }
((s=(USABLE | UNUSABLE | REBUILD (isRebuildAll=ALL)? | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)? | (CONSISTENCY EQ c=(STRONG | EVENTUAL)))
{ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, s!=null ? PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())) : null, isRebuildAll!=null, async!=null, p, c!=null ? IndexConsistency.valueOf(SchemaUtil.normalizeIdentifier(c.getText())) : null); }
;

// Parse a trace statement.
Expand Down Expand Up @@ -1318,6 +1324,9 @@ identifier returns [String ret]

parseNoReserved returns [String ret]
: n=NAME { $ret = n.getText(); }
| CONSISTENCY { $ret = "CONSISTENCY"; }
| EVENTUAL { $ret = "EVENTUAL"; }
| STRONG { $ret = "STRONG"; }
;

case_statement returns [ParseNode ret]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,8 @@ public static PTable joinProjectedTables(PTable left, PTable right, JoinType typ
.setDisableWAL(PTable.DEFAULT_DISABLE_WAL).setMultiTenant(left.isMultiTenant())
.setStoreNulls(left.getStoreNulls()).setViewType(left.getViewType())
.setViewIndexIdType(left.getviewIndexIdType()).setViewIndexId(left.getViewIndexId())
.setIndexType(left.getIndexType()).setTransactionProvider(left.getTransactionProvider())
.setIndexType(left.getIndexType()).setIndexConsistency(left.getIndexConsistency())
.setTransactionProvider(left.getTransactionProvider())
.setUpdateCacheFrequency(left.getUpdateCacheFrequency())
.setNamespaceMapped(left.isNamespaceMapped())
.setAutoPartitionSeqName(left.getAutoPartitionSeqName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 42;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 44;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_*
// constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hbase.index.metrics;

import org.apache.hadoop.hbase.metrics.BaseSource;

/**
* IndexCDCConsumer metrics for eventually consistent index updates.
*/
public interface MetricsIndexCDCConsumerSource extends BaseSource {

String METRICS_NAME = "IndexCDCConsumer";
String METRICS_CONTEXT = "phoenix";
String METRICS_DESCRIPTION = "Metrics about the Phoenix Index CDC Consumer";
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;

String CDC_BATCH_PROCESS_TIME = "cdcBatchProcessTime";
String CDC_BATCH_PROCESS_TIME_DESC =
"Histogram for the end-to-end time in milliseconds for processing one CDC batch";

String CDC_MUTATION_GENERATE_TIME = "cdcMutationGenerateTime";
String CDC_MUTATION_GENERATE_TIME_DESC =
"Histogram for the time in milliseconds to generate index mutations from data row states";

String CDC_MUTATION_APPLY_TIME = "cdcMutationApplyTime";
String CDC_MUTATION_APPLY_TIME_DESC =
"Histogram for the time in milliseconds to apply index mutations to index tables";

String CDC_BATCH_COUNT = "cdcBatchCount";
String CDC_BATCH_COUNT_DESC = "The number of CDC batches processed";

String CDC_MUTATION_COUNT = "cdcMutationCount";
String CDC_MUTATION_COUNT_DESC = "The number of individual index mutations applied through CDC";

String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing failures";

String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
String CDC_INDEX_UPDATE_LAG_DESC =
"Histogram for the lag in milliseconds between current time and the last processed CDC event";

/**
* Updates the CDC batch processing time histogram.
* @param dataTableName physical data table name
* @param t time taken in milliseconds
*/
void updateCdcBatchProcessTime(String dataTableName, long t);

/**
* Updates the CDC mutation generation time histogram.
* @param dataTableName physical data table name
* @param t time taken in milliseconds
*/
void updateCdcMutationGenerateTime(String dataTableName, long t);

/**
* Updates the CDC mutation apply time histogram.
* @param dataTableName physical data table name
* @param t time taken in milliseconds
*/
void updateCdcMutationApplyTime(String dataTableName, long t);

/**
* Increments the CDC batch count.
* @param dataTableName physical data table name
*/
void incrementCdcBatchCount(String dataTableName);

/**
* Increments the CDC mutation count by the given amount.
* @param dataTableName physical data table name
* @param count number of mutations applied
*/
void incrementCdcMutationCount(String dataTableName, long count);

/**
* Increments the CDC batch failure count.
* @param dataTableName physical data table name
*/
void incrementCdcBatchFailureCount(String dataTableName);

/**
* Updates the CDC lag histogram.
* @param dataTableName physical data table name
* @param lag lag in milliseconds between current time and last processed event
*/
void updateCdcLag(String dataTableName, long lag);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.hbase.index.metrics;

import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;

/**
* Implementation for tracking IndexCDCConsumer metrics.
*/
public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
implements MetricsIndexCDCConsumerSource {

private final MetricHistogram cdcBatchProcessTimeHisto;
private final MetricHistogram cdcMutationGenerateTimeHisto;
private final MetricHistogram cdcMutationApplyTimeHisto;
private final MutableFastCounter cdcBatchCounter;
private final MutableFastCounter cdcMutationCounter;
private final MutableFastCounter cdcBatchFailureCounter;
private final MetricHistogram cdcIndexUpdateLagHisto;

public MetricsIndexCDCConsumerSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}

public MetricsIndexCDCConsumerSourceImpl(String metricsName, String metricsDescription,
String metricsContext, String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);

cdcBatchProcessTimeHisto =
getMetricsRegistry().newHistogram(CDC_BATCH_PROCESS_TIME, CDC_BATCH_PROCESS_TIME_DESC);
cdcMutationGenerateTimeHisto = getMetricsRegistry().newHistogram(CDC_MUTATION_GENERATE_TIME,
CDC_MUTATION_GENERATE_TIME_DESC);
cdcMutationApplyTimeHisto =
getMetricsRegistry().newHistogram(CDC_MUTATION_APPLY_TIME, CDC_MUTATION_APPLY_TIME_DESC);
cdcBatchCounter = getMetricsRegistry().newCounter(CDC_BATCH_COUNT, CDC_BATCH_COUNT_DESC, 0L);
cdcMutationCounter =
getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, CDC_MUTATION_COUNT_DESC, 0L);
cdcBatchFailureCounter =
getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, CDC_BATCH_FAILURE_COUNT_DESC, 0L);
cdcIndexUpdateLagHisto =
getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG, CDC_INDEX_UPDATE_LAG_DESC);
}

@Override
public void updateCdcBatchProcessTime(String dataTableName, long t) {
incrementTableSpecificHistogram(CDC_BATCH_PROCESS_TIME, dataTableName, t);
cdcBatchProcessTimeHisto.add(t);
}

@Override
public void updateCdcMutationGenerateTime(String dataTableName, long t) {
incrementTableSpecificHistogram(CDC_MUTATION_GENERATE_TIME, dataTableName, t);
cdcMutationGenerateTimeHisto.add(t);
}

@Override
public void updateCdcMutationApplyTime(String dataTableName, long t) {
incrementTableSpecificHistogram(CDC_MUTATION_APPLY_TIME, dataTableName, t);
cdcMutationApplyTimeHisto.add(t);
}

@Override
public void incrementCdcBatchCount(String dataTableName) {
incrementTableSpecificCounter(CDC_BATCH_COUNT, dataTableName);
cdcBatchCounter.incr();
}

@Override
public void incrementCdcMutationCount(String dataTableName, long count) {
MutableFastCounter tableCounter =
getMetricsRegistry().getCounter(getMetricName(CDC_MUTATION_COUNT, dataTableName), 0);
tableCounter.incr(count);
cdcMutationCounter.incr(count);
}

@Override
public void incrementCdcBatchFailureCount(String dataTableName) {
incrementTableSpecificCounter(CDC_BATCH_FAILURE_COUNT, dataTableName);
cdcBatchFailureCounter.incr();
}

@Override
public void updateCdcLag(String dataTableName, long lag) {
incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag);
cdcIndexUpdateLagHisto.add(lag);
}

private void incrementTableSpecificCounter(String baseName, String tableName) {
MutableFastCounter tableCounter =
getMetricsRegistry().getCounter(getMetricName(baseName, tableName), 0);
tableCounter.incr();
}

private void incrementTableSpecificHistogram(String baseName, String tableName, long t) {
MetricHistogram tableHistogram =
getMetricsRegistry().getHistogram(getMetricName(baseName, tableName));
tableHistogram.add(t);
}

private String getMetricName(String baseName, String tableName) {
return baseName + "." + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class MetricsIndexerSourceFactory {
private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory();
private volatile MetricsIndexerSource indexerSource;
private GlobalIndexCheckerSource globalIndexCheckerSource;
private MetricsIndexCDCConsumerSource indexCDCConsumerSource;

private MetricsIndexerSourceFactory() {
}
Expand All @@ -45,4 +46,11 @@ public synchronized GlobalIndexCheckerSource getGlobalIndexCheckerSource() {
}
return INSTANCE.globalIndexCheckerSource;
}

public synchronized MetricsIndexCDCConsumerSource getIndexCDCConsumerSource() {
if (INSTANCE.indexCDCConsumerSource == null) {
INSTANCE.indexCDCConsumerSource = new MetricsIndexCDCConsumerSourceImpl();
}
return INSTANCE.indexCDCConsumerSource;
}
}
Loading