Making it compatible with Spark 2.4 #1

Open · wants to merge 1 commit into base: master
6 changes: 3 additions & 3 deletions build.sbt
@@ -10,10 +10,10 @@ organization := "com.hortonworks.hive"
scalaVersion := "2.11.8"
val scalatestVersion = "2.2.6"

sparkVersion := sys.props.getOrElse("spark.version", "2.3.1")
sparkVersion := sys.props.getOrElse("spark.version", "2.4.0")

val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.1.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.0.0")
val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.2.0")
val hiveVersion = sys.props.getOrElse("hive.version", "3.1.1")
val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
val tezVersion = sys.props.getOrElse("tez.version", "0.9.1")
val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")
CountDataReader.java
@@ -3,11 +3,11 @@
import java.io.IOException;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class CountDataReader implements DataReader<ColumnarBatch> {
public class CountDataReader implements InputPartitionReader<ColumnarBatch> {
private long numRows;

public CountDataReader(long numRows) {
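For context, Spark 2.4 renamed the DataSourceV2 scan interface DataReader<T> to InputPartitionReader<T> (still in org.apache.spark.sql.sources.v2.reader) while keeping the same next()/get()/close() contract, which is why only the implements clause and imports change above. Below is a minimal sketch of the renamed contract; FixedRowCountReader is a hypothetical class, not part of this PR.

package com.hortonworks.spark.sql.hive.llap;

import java.io.IOException;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical example: emits `numRows` long values in batches of up to 1024 rows.
public class FixedRowCountReader implements InputPartitionReader<ColumnarBatch> {
  private long remaining;

  public FixedRowCountReader(long numRows) {
    this.remaining = numRows;
  }

  @Override
  public boolean next() throws IOException {
    return remaining > 0; // at least one more batch to hand out
  }

  @Override
  public ColumnarBatch get() {
    int batchSize = (int) Math.min(remaining, 1024);
    OnHeapColumnVector col = new OnHeapColumnVector(batchSize, DataTypes.LongType);
    for (int i = 0; i < batchSize; i++) {
      col.putLong(i, i); // fill the single long column
    }
    ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
    batch.setNumRows(batchSize);
    remaining -= batchSize;
    return batch;
  }

  @Override
  public void close() throws IOException {
    // nothing to release in this sketch
  }
}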
CountDataReaderFactory.java
@@ -1,18 +1,16 @@
package com.hortonworks.spark.sql.hive.llap;

import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class CountDataReaderFactory implements DataReaderFactory<ColumnarBatch> {
public class CountDataReaderFactory implements InputPartition<ColumnarBatch> {
private long numRows;

public CountDataReaderFactory(long numRows) {
this.numRows = numRows;
}

@Override
public DataReader<ColumnarBatch> createDataReader() {
public InputPartitionReader<ColumnarBatch> createPartitionReader() {
return new CountDataReader(numRows);
}
}
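Likewise, the 2.3 DataReaderFactory<T> becomes InputPartition<T> in 2.4, with createDataReader() renamed to createPartitionReader(); preferredLocations() keeps its default. A rough harness showing how the renamed pair is exercised, reusing the classes from this diff (Spark itself serializes the InputPartition to an executor and runs an equivalent loop inside a scan task):

package com.hortonworks.spark.sql.hive.llap;

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Illustrative harness only, not part of this PR.
public class CountScanDemo {
  public static void main(String[] args) throws Exception {
    InputPartition<ColumnarBatch> partition = new CountDataReaderFactory(1000L);
    try (InputPartitionReader<ColumnarBatch> reader = partition.createPartitionReader()) {
      while (reader.next()) {                 // more batches available?
        ColumnarBatch batch = reader.get();   // fetch the next ColumnarBatch
        System.out.println("rows in batch: " + batch.numRows());
      }
    }
  }
}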
HWConf.java
@@ -53,7 +53,8 @@ static String warehouseKey(String keySuffix) {

private static Logger LOG = LoggerFactory.getLogger(HWConf.class);
public static final String HIVESERVER2_CREDENTIAL_ENABLED = "spark.security.credentials.hiveserver2.enabled";
public static final String HIVESERVER2_JDBC_URL_PRINCIPAL = "spark.sql.hive.hiveserver2.jdbc.url.principal";
public static final String HIVESERVER2_JDBC_URL_PRINCIPAL = "" +
"spark.sql.hive.hiveserver2.jdbc.url.principal";
public static final String HIVESERVER2_JDBC_URL = "spark.sql.hive.hiveserver2.jdbc.url";

public void setString(HiveWarehouseSessionState state, String value) {
HiveStreamingDataSourceWriter.java
@@ -4,13 +4,13 @@

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow {
public class HiveStreamingDataSourceWriter implements DataSourceWriter {
private static Logger LOG = LoggerFactory.getLogger(HiveStreamingDataSourceWriter.class);

private String jobId;
@@ -23,7 +23,7 @@ public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow {
private String metastoreKrbPrincipal;

public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commitIntervalRows, String db,
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.commitIntervalRows = commitIntervalRows;
@@ -35,9 +35,9 @@ public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commi
}

@Override
public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
public DataWriterFactory<InternalRow> createWriterFactory() {
return new HiveStreamingDataWriterFactory(jobId, schema, commitIntervalRows, db, table, partition, metastoreUri,
metastoreKrbPrincipal);
metastoreKrbPrincipal);
}

@Override
@@ -49,5 +49,4 @@ public void commit(WriterCommitMessage[] messages) {
public void abort(WriterCommitMessage[] messages) {
LOG.info("Abort job {}", jobId);
}
}

}
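On the write side, Spark 2.4 dropped the SupportsWriteInternalRow mix-in: DataSourceWriter now works with InternalRow directly and declares createWriterFactory() itself, which is the rename applied above. A minimal sketch of the 2.4 shape, using a hypothetical no-op writer with the factory construction elided:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical skeleton, not part of this PR.
public class NoOpDataSourceWriter implements DataSourceWriter {
  @Override
  public DataWriterFactory<InternalRow> createWriterFactory() {
    // In 2.3 this was createInternalRowWriterFactory() on SupportsWriteInternalRow;
    // in 2.4 the method lives directly on DataSourceWriter.
    throw new UnsupportedOperationException("sketch only: return the real factory here");
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // make the written data visible atomically
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // roll back any partially written data
  }
}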
HiveStreamingDataWriter.java
@@ -25,7 +25,7 @@ public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
private String jobId;
private StructType schema;
private int partitionId;
private int attemptNumber;
private long attemptNumber;
private String db;
private String table;
private List<String> partition;
@@ -35,7 +35,7 @@ public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
private long rowsWritten = 0;
private String metastoreKrbPrincipal;

public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfterNRows, int partitionId, int
public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfterNRows, int partitionId, long
attemptNumber, String db, String table, List<String> partition, final String metastoreUri,
final String metastoreKrbPrincipal) {
this.jobId = jobId;
HiveStreamingDataWriterFactory.java
@@ -21,7 +21,7 @@ public class HiveStreamingDataWriterFactory implements DataWriterFactory<Interna
private String metastoreKrbPrincipal;

public HiveStreamingDataWriterFactory(String jobId, StructType schema, long commitIntervalRows, String db,
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
this.jobId = jobId;
this.schema = schema;
this.db = db;
@@ -33,16 +33,15 @@ public HiveStreamingDataWriterFactory(String jobId, StructType schema, long comm
}

@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumber) {
public DataWriter<InternalRow> createDataWriter(int partitionId, long attemptNumber, long epochId) {
ClassLoader restoredClassloader = Thread.currentThread().getContextClassLoader();
ClassLoader isolatedClassloader = HiveIsolatedClassLoader.isolatedClassLoader();
try {
Thread.currentThread().setContextClassLoader(isolatedClassloader);
return new HiveStreamingDataWriter(jobId, schema, commitIntervalRows, partitionId, attemptNumber, db,
table, partition, metastoreUri, metastoreKrbPrincipal);
table, partition, metastoreUri, metastoreKrbPrincipal);
} finally {
Thread.currentThread().setContextClassLoader(restoredClassloader);
}
}
}
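The factory signature changes with it: Spark 2.3's createDataWriter(int partitionId, int attemptNumber) became a three-argument method in 2.4 that passes a long task id plus an epoch id (only meaningful for continuous streaming), which is why the extra long parameter is threaded through above. A bare-bones sketch of the new signature, using a hypothetical class:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

// Hypothetical skeleton, not part of this PR.
public class NoOpDataWriterFactory implements DataWriterFactory<InternalRow> {
  @Override
  public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
    // 2.3 passed (int partitionId, int attemptNumber); 2.4 passes a long task id
    // and an epoch id used by continuous streaming.
    throw new UnsupportedOperationException("sketch only: build the real DataWriter here");
  }
}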

HiveWarehouseDataReader.java
@@ -7,7 +7,7 @@
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -28,7 +28,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;

public class HiveWarehouseDataReader implements DataReader<ColumnarBatch> {
public class HiveWarehouseDataReader implements InputPartitionReader<ColumnarBatch> {

private RecordReader<?, ArrowWrapperWritable> reader;
private ArrowWrapperWritable wrapperWritable = new ArrowWrapperWritable();
@@ -67,7 +67,7 @@ protected RecordReader<?, ArrowWrapperWritable> getRecordReader(LlapInputSplit s
attemptId,
childAllocatorReservation,
arrowAllocatorMax);
LlapBaseInputFormat input = new LlapBaseInputFormat(true, allocator);
LlapBaseInputFormat input = new LlapBaseInputFormat(true, arrowAllocatorMax);
return input.getRecordReader(split, conf, null);
}

HiveWarehouseDataReaderFactory.java
@@ -3,16 +3,16 @@
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class HiveWarehouseDataReaderFactory implements DataReaderFactory<ColumnarBatch> {
public class HiveWarehouseDataReaderFactory implements InputPartition<ColumnarBatch> {
private byte[] splitBytes;
private byte[] confBytes;
private transient InputSplit split;
@@ -51,7 +51,7 @@ public String[] preferredLocations() {
}

@Override
public DataReader<ColumnarBatch> createDataReader() {
public InputPartitionReader<ColumnarBatch> createPartitionReader() {
LlapInputSplit llapInputSplit = new LlapInputSplit();
ByteArrayInputStream splitByteArrayStream = new ByteArrayInputStream(splitBytes);
ByteArrayInputStream confByteArrayStream = new ByteArrayInputStream(confBytes);
@@ -67,7 +67,7 @@ public DataReader<ColumnarBatch> createDataReader() {
}
}

protected DataReader<ColumnarBatch> getDataReader(LlapInputSplit split, JobConf jobConf, long arrowAllocatorMax)
protected InputPartitionReader<ColumnarBatch> getDataReader(LlapInputSplit split, JobConf jobConf, long arrowAllocatorMax)
throws Exception {
return new HiveWarehouseDataReader(split, jobConf, arrowAllocatorMax);
}
HiveWarehouseDataSourceReader.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
@@ -44,7 +44,7 @@
* 5. Spark pulls factories, where factory/task are 1:1 -> createBatchDataReaderFactories(..)
*/
public class HiveWarehouseDataSourceReader
implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {
implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {

//The pruned schema
StructType schema = null;
@@ -95,27 +95,27 @@ protected StructType getTableSchema() throws Exception {
replaceSparkHiveDriver();

StatementType queryKey = getQueryType();
String query;
if (queryKey == StatementType.FULL_TABLE_SCAN) {
String dbName = HWConf.DEFAULT_DB.getFromOptionsMap(options);
SchemaUtil.TableRef tableRef = SchemaUtil.getDbTableNames(dbName, options.get("table"));
query = selectStar(tableRef.databaseName, tableRef.tableName);
} else {
query = options.get("query");
}
LlapBaseInputFormat llapInputFormat = null;
try {
JobConf conf = JobUtil.createJobConf(options, query);
llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
InputSplit[] splits = llapInputFormat.getSplits(conf, 0);
LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
Schema schema = schemaSplit.getSchema();
return SchemaUtil.convertSchema(schema);
} finally {
if(llapInputFormat != null) {
close();
}
String query;
if (queryKey == StatementType.FULL_TABLE_SCAN) {
String dbName = HWConf.DEFAULT_DB.getFromOptionsMap(options);
SchemaUtil.TableRef tableRef = SchemaUtil.getDbTableNames(dbName, options.get("table"));
query = selectStar(tableRef.databaseName, tableRef.tableName);
} else {
query = options.get("query");
}
LlapBaseInputFormat llapInputFormat = null;
try {
JobConf conf = JobUtil.createJobConf(options, query);
llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
InputSplit[] splits = llapInputFormat.getSplits(conf, 0);
LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
Schema schema = schemaSplit.getSchema();
return SchemaUtil.convertSchema(schema);
} finally {
if(llapInputFormat != null) {
close();
}
}
}

@Override public StructType readSchema() {
@@ -134,12 +134,12 @@ protected StructType getTableSchema() throws Exception {
//"returns unsupported filters."
@Override public Filter[] pushFilters(Filter[] filters) {
pushedFilters = Arrays.stream(filters).
filter((filter) -> FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
toArray(Filter[]::new);
filter((filter) -> FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
toArray(Filter[]::new);

return Arrays.stream(filters).
filter((filter) -> !FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
toArray(Filter[]::new);
filter((filter) -> !FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
toArray(Filter[]::new);
}

@Override public Filter[] pushedFilters() {
@@ -150,11 +150,11 @@ protected StructType getTableSchema() throws Exception {
this.schema = requiredSchema;
}

@Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
public List<InputPartition<ColumnarBatch>> createBatchDataReaderFactories() {
try {
boolean countStar = this.schema.length() == 0;
String queryString = getQueryString(SchemaUtil.columnNames(schema), pushedFilters);
List<DataReaderFactory<ColumnarBatch>> factories = new ArrayList<>();
List<InputPartition<ColumnarBatch>> factories = new ArrayList<>();
if (countStar) {
LOG.info("Executing count with query: {}", queryString);
factories.addAll(getCountStarFactories(queryString));
@@ -167,8 +167,12 @@ protected StructType getTableSchema() throws Exception {
}
}

protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query) {
List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>();
@Override public List<InputPartition<ColumnarBatch>> planBatchInputPartitions(){
return createBatchDataReaderFactories();
}

protected List<InputPartition<ColumnarBatch>> getSplitsFactories(String query) {
List<InputPartition<ColumnarBatch>> tasks = new ArrayList<>();
try {
JobConf jobConf = JobUtil.createJobConf(options, query);
LlapBaseInputFormat llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
@@ -184,12 +188,12 @@ protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query
return tasks;
}

protected DataReaderFactory<ColumnarBatch> getDataReaderFactory(InputSplit split, JobConf jobConf, long arrowAllocatorMax) {
protected InputPartition<ColumnarBatch> getDataReaderFactory(InputSplit split, JobConf jobConf, long arrowAllocatorMax) {
return new HiveWarehouseDataReaderFactory(split, jobConf, arrowAllocatorMax);
}

private List<DataReaderFactory<ColumnarBatch>> getCountStarFactories(String query) {
List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>(100);
private List<InputPartition<ColumnarBatch>> getCountStarFactories(String query) {
List<InputPartition<ColumnarBatch>> tasks = new ArrayList<>(100);
long count = getCount(query);
String numTasksString = HWConf.COUNT_TASKS.getFromOptionsMap(options);
int numTasks = Integer.parseInt(numTasksString);
@@ -205,7 +209,7 @@ private List<DataReaderFactory<ColumnarBatch>> getCountStarFactories(String quer
protected long getCount(String query) {
try(Connection conn = getConnection()) {
DriverResultSet rs = DefaultJDBCWrapper.executeStmt(conn, HWConf.DEFAULT_DB.getFromOptionsMap(options), query,
Long.parseLong(HWConf.MAX_EXEC_RESULTS.getFromOptionsMap(options)));
Long.parseLong(HWConf.MAX_EXEC_RESULTS.getFromOptionsMap(options)));
return rs.getData().get(0).getLong(0);
} catch (SQLException e) {
LOG.error("Failed to connect to HS2", e);
@@ -239,4 +243,4 @@ public void close() {
}
}

}
}
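The reader above keeps its old createBatchDataReaderFactories() body and adds planBatchInputPartitions() as a thin delegate, because in Spark 2.4 SupportsScanColumnarBatch asks for planBatchInputPartitions() returning a List<InputPartition<ColumnarBatch>>. A stripped-down sketch of the 2.4 columnar-scan entry points, reusing CountDataReaderFactory from this diff (CountScanReader itself is hypothetical, with a single fixed partition):

package com.hortonworks.spark.sql.hive.llap;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical reader exposing one count-only partition, not part of this PR.
public class CountScanReader implements SupportsScanColumnarBatch {
  @Override
  public StructType readSchema() {
    return new StructType().add("count", DataTypes.LongType);
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    List<InputPartition<ColumnarBatch>> partitions = new ArrayList<>();
    partitions.add(new CountDataReaderFactory(1000L)); // the partition class updated earlier in this diff
    return partitions;
  }
}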
HiveWarehouseDataSourceWriter.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
@@ -35,7 +35,7 @@

import static com.hortonworks.spark.sql.hive.llap.util.HiveQlUtil.loadInto;

public class HiveWarehouseDataSourceWriter implements SupportsWriteInternalRow {
public class HiveWarehouseDataSourceWriter implements DataSourceWriter {
protected String jobId;
protected StructType schema;
protected Path path;
@@ -52,7 +52,7 @@ public HiveWarehouseDataSourceWriter(Map<String, String> options, String jobId,
this.conf = conf;
}

@Override public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
@Override public DataWriterFactory<InternalRow> createWriterFactory() {
return new HiveWarehouseDataWriterFactory(jobId, schema, path, new SerializableHadoopConfiguration(conf));
}

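Once the reader and writer classes compile against the 2.4 interfaces, a quick smoke test is to run a query through the connector from a Spark 2.4 application. The snippet below is illustrative only: the data source name (com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector) and the table are assumptions, and the usual connector settings (the spark.sql.hive.hiveserver2.jdbc.url and principal keys referenced in HWConf above) still have to be supplied as Spark configuration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConnectorSmokeTest {
  public static void main(String[] args) {
    // Assumes the HiveServer2 JDBC URL (and, if kerberized, the principal) are set
    // via spark-defaults.conf or --conf; see HWConf above for the key names.
    SparkSession spark = SparkSession.builder().appName("hwc-spark-2.4-smoke-test").getOrCreate();

    Dataset<Row> df = spark.read()
        .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") // assumed entry point
        .option("query", "SELECT COUNT(*) AS cnt FROM default.sample_table")  // hypothetical table
        .load();

    df.show();
    spark.stop();
  }
}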