Skip to content

Commit

Permalink
added logging and ViewTTLJobState to distinguish diff logging so that…
Browse files Browse the repository at this point in the history
… end users can have customized metrics based on the STATE
  • Loading branch information
yanxinyi committed Apr 21, 2020
1 parent 2074ba6 commit 71f4758
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
import org.apache.phoenix.mapreduce.util.ViewInfoTracker.ViewTTLJobState;
import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
import org.apache.phoenix.mapreduce.util.PhoenixViewTtlUtil;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.PhoenixRuntime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,35 +85,35 @@ protected void map(NullWritable key, ViewInfoTracker value,
}
}

private void deletingExpiredRows(PhoenixConnection connection, ViewInfoTracker value, Configuration config)
private void deletingExpiredRows(PhoenixConnection connection, ViewInfoTracker view, Configuration config)
throws SQLException {

String deleteIfExpiredStatement = "DELETE FROM " + value.getViewName() +
" WHERE TO_NUMBER(NOW()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) > " + value.getViewTtl();

PTable view = PhoenixRuntime.getTable(connection, value.getViewName());

deletingExpiredRows(connection, view, deleteIfExpiredStatement, config);
}
String deleteIfExpiredStatement = "DELETE FROM " + view.getViewName() +
" WHERE TO_NUMBER(NOW()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) > " + view.getViewTtl();

private void deletingExpiredRows(PhoenixConnection connection, PTable view, String deleteIfExpiredStatement,
Configuration config) throws SQLException {
try {
this.multiViewJobStatusTracker.updateJobStatus(view, 0,
JobStatus.State.PREP.getValue(), config, 0);
ViewTTLJobState.PREP.getValue(), config, 0);

long startTime = System.currentTimeMillis();
this.multiViewJobStatusTracker.updateJobStatus(view, 0,
JobStatus.State.RUNNING.getValue(), config, 0 );
ViewTTLJobState.RUNNING.getValue(), config, 0 );
connection.setAutoCommit(true);
int numberOfDeletedRows = connection.createStatement().executeUpdate(deleteIfExpiredStatement);

this.multiViewJobStatusTracker.updateJobStatus(view, numberOfDeletedRows,
JobStatus.State.SUCCEEDED.getValue(), config, System.currentTimeMillis() - startTime);
ViewTTLJobState.SUCCEEDED.getValue(), config, System.currentTimeMillis() - startTime);
} catch (Exception e) {
this.multiViewJobStatusTracker.updateJobStatus(view, 0,
JobStatus.State.FAILED.getValue(), config, 0);
LOGGER.info("Deleting Expired Rows has an exception for : " + e.getMessage());
int state;
if (e instanceof SQLException && ((SQLException) e).getErrorCode() == SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
LOGGER.info("View has been deleted : " + e.getMessage());
state = ViewTTLJobState.DELETED.getValue();
} else {
LOGGER.info("Deleting Expired Rows has an exception for : " + e.getMessage());
state = ViewTTLJobState.FAILED.getValue();
}

this.multiViewJobStatusTracker.updateJobStatus(view, 0, state, config, 0);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@
*/
package org.apache.phoenix.mapreduce.util;

import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.mapreduce.util.ViewInfoTracker.ViewTTLJobState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultMultiViewJobStatusTracker implements MultiViewJobStatusTracker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMultiViewJobStatusTracker.class);

public void updateJobStatus(PTable view, long numberOfDeletedRows, int state, Configuration config, long duration) {
if (state == JobStatus.State.SUCCEEDED.getValue()) {
LOGGER.debug(String.format("Number of deleted rows from view %s, TenantID %s : %d, duration : %d.",
view.getTableName(), view.getTenantId(), numberOfDeletedRows, duration));
public void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
Configuration config, long duration) {
if (state == ViewTTLJobState.SUCCEEDED.getValue()) {
LOGGER.debug(String.format("Number of deleted rows from view %s, TenantID %s : " +
"number of deleted row %d, duration : %d.",
view.getViewName(), view.getTenantId(), numberOfDeletedRows, duration));
} else if (state == ViewTTLJobState.DELETED.getValue()) {
LOGGER.debug(String.format("View has been deleted, view info : view %s, TenantID %s : %d.",
view.getViewName(), view.getTenantId()));
} else {
LOGGER.debug(String.format("Job is in state %d for view %s, TenantID %s : %d.",
state, view.getTableName(), view.getTenantId(), numberOfDeletedRows, state));
LOGGER.debug(String.format("Job is in state %d for view %s, TenantID %s, and duration : %d ",
state, view.getViewName(), view.getTenantId(), duration));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,9 @@ public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuratio
viewInfoWritables.add(viewInfoTracker);
}
}
} catch (SQLException e ) {
} catch (Exception e) {
LOGGER.error("Getting view info failed with: " + e.getMessage());
} catch (Exception e) {

}

return viewInfoWritables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
*/
package org.apache.phoenix.mapreduce.util;

import org.apache.phoenix.schema.PTable;
import org.apache.hadoop.conf.Configuration;


public interface MultiViewJobStatusTracker {
void updateJobStatus(PTable view, long numberOfDeletedRows, int state, Configuration config, long duration);
void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state, Configuration config, long duration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@
import java.io.IOException;

public class ViewInfoTracker implements ViewInfoWritable {
public enum ViewTTLJobState {
RUNNING(1),
SUCCEEDED(2),
FAILED(3),
PREP(4),
KILLED(5),
DELETED(6);

int value;

ViewTTLJobState(int value) {
this.value = value;
}

public int getValue() {
return this.value;
}
}

String tenantId;
String viewName;
long viewTtl;
Expand Down

0 comments on commit 71f4758

Please sign in to comment.