Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
support for Hive 2.x (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
fermich authored and Paul Yang committed Jan 6, 2017
1 parent f39db61 commit d2958b9
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
Expand Up @@ -13,20 +13,23 @@
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.session.SessionState;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;


public class AuditLogHookUtils {

private static final Log LOG = LogFactory.getLog(AuditLogHookUtils.class);
Expand Down Expand Up @@ -167,7 +170,6 @@ public static void insertAuditLogEntry(
}

SessionState sessionState = new SessionState(hiveConf);
sessionState.setCmd(command);

// Map the HiveOperation to the ...ql.plan.HiveOperation when possible.
// ALTERTABLE_EXCHANGEPARTITION may be the only one that can't be mapped.
Expand All @@ -187,11 +189,24 @@ public static void insertAuditLogEntry(
}
}

sessionState.setCommandType(commandType);
sessionState.setMapRedStats(mapRedStatsPerStage);
SessionState.setCurrentSessionState(sessionState);

// Run the hook
cliAuditLogHook.run(sessionState, readEntities, writeEntities, null, null);
SemanticAnalyzer semanticAnalyzer = new SemanticAnalyzer(hiveConf);
QueryPlan queryPlan = new QueryPlan(
command,
semanticAnalyzer,
null,
commandType != null ? commandType.getOperationName() : null
);

HookContext hookContext = new HookContext(queryPlan, null);
hookContext.setInputs(readEntities);
hookContext.setOutputs(writeEntities);
hookContext.setConf(hiveConf);

cliAuditLogHook.run(hookContext);
}

/**
Expand Down
@@ -1,17 +1,14 @@
package com.airbnb.reair.hive.hooks;

import com.airbnb.reair.common.Command;
import com.airbnb.reair.db.DbCredentials;
import com.airbnb.reair.utils.RetryableTask;
import com.airbnb.reair.utils.RetryingTaskRunner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

Expand All @@ -26,7 +23,7 @@
* generates entries in the output objects table, and the map reduce stats
* tables.
*/
public class CliAuditLogHook implements PostExecute {
public class CliAuditLogHook implements ExecuteWithHookContext {

public static Logger LOG = Logger.getLogger(CliAuditLogHook.class);

Expand Down Expand Up @@ -60,14 +57,27 @@ protected DbCredentials getDbCreds(Configuration conf) {
}

@Override
public void run(final SessionState sessionState,
public void run(HookContext hookContext) throws Exception {
Set<ReadEntity> inputs = hookContext.getInputs();
Set<WriteEntity> outputs = hookContext.getOutputs();
UserGroupInformation ugi = hookContext.getUgi();

run(hookContext, inputs, outputs, ugi);
}

/**
*
* @param hookContext
* The hook context passed to each hooks.
* @throws Exception if there's an error
*/
public void run(final HookContext hookContext,
final Set<ReadEntity> readEntities,
final Set<WriteEntity> writeEntities,
final LineageInfo lineageInfo,
final UserGroupInformation userGroupInformation)
throws Exception {
HiveConf conf = sessionState.getConf();
SessionStateLite sessionStateLite = new SessionStateLite(sessionState);
HiveConf conf = hookContext.getConf();
SessionStateLite sessionStateLite = new SessionStateLite(hookContext.getQueryPlan());

final DbCredentials dbCreds = getDbCreds(conf);

Expand Down
Expand Up @@ -2,6 +2,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.session.SessionState;

import java.util.HashMap;
Expand Down Expand Up @@ -44,13 +45,16 @@ public SessionStateLite(
/**
* Creates a lightweight representation of the session state.
*
* @param sessionState The Hive session state
* @param plan The Hive query plan
*/
public SessionStateLite(SessionState sessionState) {
this.cmd = sessionState.getCmd();
this.commandType = sessionState.getCommandType();
this.queryId = sessionState.getQueryId();
public SessionStateLite(QueryPlan plan) {

SessionState sessionState = SessionState.get();

this.conf = new HiveConf(sessionState.getConf());
this.cmd = plan.getQueryStr();
this.commandType = plan.getOperationName();
this.queryId = plan.getQueryId();
this.mapRedStats = new HashMap<>(sessionState.getMapRedStats());
}

Expand Down
Expand Up @@ -105,6 +105,7 @@ public void testAuditLogTable() throws Exception {
"test_db",
"test_output_table");
outputTable.setCreateTime(0);
outputTable.setOwner("table.owner");

List<org.apache.hadoop.hive.ql.metadata.Table> outputTables =
new ArrayList<>();
Expand Down Expand Up @@ -167,8 +168,9 @@ public void testAuditLogTable() throws Exception {
"test_db.test_output_table",
"TABLE",
"{\"1\":{\"str\":\"test_"
+ "output_table\"},\"2\":{\"str\":\"test_db\"},\"4\":"
+ "{\"i32\":0},\"5\":{\"i32\":0},\"6\":{\"i3"
+ "output_table\"},\"2\":{\"str\":\"test_db\"},"
+ "\"3\":{\"str\":\"table.owner\"},"
+ "\"4\":{\"i32\":0},\"5\":{\"i32\":0},\"6\":{\"i3"
+ "2\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",0]}"
+ ",\"3\":{\"str\":\"org.apache.hadoop.mapred.Sequenc"
+ "eFileInputFormat\"},\"4\":{\"str\":\"org.apache.ha"
Expand Down

0 comments on commit d2958b9

Please sign in to comment.