From e2e92fca3f4b909be03cb2340d8bb143b4e2dbe8 Mon Sep 17 00:00:00 2001 From: "Jack Li(Analytics Engineering)" Date: Thu, 22 Jul 2021 16:05:03 -0700 Subject: [PATCH] Support data preprocessing in Spark framework --- .../job/HadoopSegmentPreprocessingJob.java | 174 +---------------- .../job/mappers/SegmentCreationMapper.java | 2 +- .../HadoopAvroDataPreprocessingHelper.java | 67 +++++++ ...ava => HadoopDataPreprocessingHelper.java} | 174 ++++++----------- .../HadoopDataPreprocessingHelperFactory.java | 60 ++++++ .../job/preprocess/HadoopJobPreparer.java | 28 +++ .../HadoopOrcDataPreprocessingHelper.java | 66 +++++++ ...=> HadoopDataPreprocessingHelperTest.java} | 38 +++- .../pinot-ingestion-common/pom.xml | 9 + .../jobs/SegmentPreprocessingJob.java | 180 +++++++++++++++++- .../AvroDataPreprocessingHelper.java | 45 ++--- .../preprocess/DataPreprocessingHelper.java | 126 ++++++++++++ .../DataPreprocessingHelperFactory.java | 4 +- .../OrcDataPreprocessingHelper.java | 46 ++--- .../SampleTimeColumnExtractable.java | 28 +++ .../mappers/AvroDataPreprocessingMapper.java | 6 +- .../mappers/OrcDataPreprocessingMapper.java | 8 +- .../mappers/SegmentPreprocessingMapper.java | 4 +- .../AvroDataPreprocessingPartitioner.java | 4 +- .../partitioners/GenericPartitioner.java | 4 +- .../OrcDataPreprocessingPartitioner.java | 6 +- .../PartitionFunctionFactory.java | 2 +- .../AvroDataPreprocessingReducer.java | 4 +- .../reducers/OrcDataPreprocessingReducer.java | 4 +- .../utils}/DataPreprocessingUtils.java | 2 +- .../utils}/InternalConfigConstants.java | 2 +- .../utils/preprocess/DataFileUtils.java | 2 +- .../utils/preprocess/HadoopUtils.java | 41 ++++ .../ingestion}/utils/preprocess/OrcUtils.java | 2 +- .../utils/preprocess/TextComparator.java | 2 +- .../v0_deprecated/pinot-spark/pom.xml | 28 ++- .../jobs/SparkSegmentPreprocessingJob.java | 104 ++++++++++ .../SparkAvroDataPreprocessingHelper.java | 33 ++++ .../SparkDataPreprocessingComparator.java | 48 +++++ 
.../SparkDataPreprocessingHelper.java | 163 ++++++++++++++++ .../SparkDataPreprocessingHelperFactory.java | 60 ++++++ .../SparkDataPreprocessingJobKey.java | 37 ++++ .../SparkDataPreprocessingPartitioner.java | 64 +++++++ .../SparkOrcDataPreprocessingHelper.java | 33 ++++ .../pinot/spark/utils}/HadoopUtils.java | 2 +- 40 files changed, 1307 insertions(+), 405 deletions(-) create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/{DataPreprocessingHelper.java => HadoopDataPreprocessingHelper.java} (50%) create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/{DataPreprocessingHelperTest.java => HadoopDataPreprocessingHelperTest.java} (78%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/preprocess/AvroDataPreprocessingHelper.java (74%) create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => 
pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/preprocess/DataPreprocessingHelperFactory.java (95%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/preprocess/OrcDataPreprocessingHelper.java (82%) create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/mappers/AvroDataPreprocessingMapper.java (95%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/mappers/OrcDataPreprocessingMapper.java (94%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/mappers/SegmentPreprocessingMapper.java (98%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/partitioners/AvroDataPreprocessingPartitioner.java (97%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/partitioners/GenericPartitioner.java (95%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/partitioners/OrcDataPreprocessingPartitioner.java (96%) rename 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/partitioners/PartitionFunctionFactory.java (98%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/reducers/AvroDataPreprocessingReducer.java (96%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess}/reducers/OrcDataPreprocessingReducer.java (96%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils}/DataPreprocessingUtils.java (98%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils}/InternalConfigConstants.java (98%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/utils/preprocess/DataFileUtils.java (97%) create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/utils/preprocess/OrcUtils.java (98%) rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop => pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion}/utils/preprocess/TextComparator.java (96%) create 
mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java create mode 100644 pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java rename pinot-plugins/pinot-batch-ingestion/v0_deprecated/{pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess => pinot-spark/src/main/java/org/apache/pinot/spark/utils}/HadoopUtils.java (96%) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java index 85f5c6cdd288..a4b008696f2b 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java @@ -18,33 +18,19 @@ */ package org.apache.pinot.hadoop.job; -import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory; import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; -import org.apache.pinot.spi.config.table.ColumnPartitionConfig; -import org.apache.pinot.spi.config.table.FieldConfig; -import org.apache.pinot.spi.config.table.IndexingConfig; -import org.apache.pinot.spi.config.table.SegmentPartitionConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableCustomConfig; -import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-60,23 +46,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); - private String _partitionColumn; - private int _numPartitions; - private String _partitionFunction; - private String _partitionColumnDefaultNullValue; - - private String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - private String _sortingColumnDefaultNullValue; - - private int _numOutputFiles; - private int _maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private org.apache.pinot.spi.data.Schema _pinotTableSchema; - - private Set _preprocessingOperations; - public HadoopSegmentPreprocessingJob(final Properties properties) { super(properties); } @@ -99,8 +68,8 @@ public void run() // Cleans up preprocessed output dir if exists cleanUpPreprocessedOutputs(_preprocessedOutputDir); - DataPreprocessingHelper dataPreprocessingHelper = - DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + HadoopDataPreprocessingHelper dataPreprocessingHelper = + HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); dataPreprocessingHelper .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue, @@ -130,130 +99,6 @@ public void run() LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); } - private void fetchPreProcessingOperations() { - _preprocessingOperations = new HashSet<>(); - TableCustomConfig customConfig = _tableConfig.getCustomConfig(); - if (customConfig != null) { - Map customConfigMap = customConfig.getCustomConfigs(); - if (customConfigMap != null && !customConfigMap.isEmpty()) { - String preprocessingOperationsString = - 
customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); - DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); - } - } - } - - private void fetchPartitioningConfig() { - // Fetch partition info from table config. - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { - LOGGER.info("Partitioning is disabled."); - return; - } - SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); - if (segmentPartitionConfig != null) { - Map columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); - Preconditions - .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); - if (columnPartitionMap.size() == 1) { - _partitionColumn = columnPartitionMap.keySet().iterator().next(); - _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); - _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); - _partitionColumnDefaultNullValue = - _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString(); - } - } else { - LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); - } - } - - private void fetchSortingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { - LOGGER.info("Sorting is disabled."); - return; - } - // Fetch sorting info from table config first. 
- List sortingColumns = new ArrayList<>(); - List fieldConfigs = _tableConfig.getFieldConfigList(); - if (fieldConfigs != null && !fieldConfigs.isEmpty()) { - for (FieldConfig fieldConfig : fieldConfigs) { - if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { - sortingColumns.add(fieldConfig.getName()); - } - } - } - if (!sortingColumns.isEmpty()) { - Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table."); - _sortingColumn = sortingColumns.get(0); - return; - } - - // There is no sorted column specified in field configs, try to find sorted column from indexing config. - IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); - List sortedColumns = indexingConfig.getSortedColumn(); - if (sortedColumns != null) { - Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); - if (sortedColumns.size() == 1) { - _sortingColumn = sortedColumns.get(0); - FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); - Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); - Preconditions - .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); - _sortingColumnType = fieldSpec.getDataType(); - Preconditions - .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, - _sortingColumn); - LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); - } - } - if (_sortingColumn != null) { - _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString(); - } - } - - private void fetchResizingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { - LOGGER.info("Resizing is disabled."); - return; - } - TableCustomConfig tableCustomConfig = 
_tableConfig.getCustomConfig(); - if (tableCustomConfig == null) { - _numOutputFiles = 0; - return; - } - Map customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { - _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); - Preconditions.checkState(_numOutputFiles > 0, - String.format("The value of %s should be positive! Current value: %s", - InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); - } else { - _numOutputFiles = 0; - } - - if (customConfigsMap != null) { - int maxNumRecords; - if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { - LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", - InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, - InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); - maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); - } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { - maxNumRecords = - Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); - } else { - return; - } - // TODO: add a in-built maximum value for this config to avoid having too many small files. - // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from - // generating too many small files. - Preconditions.checkArgument(maxNumRecords > 0, - "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE - + " should be positive. 
Current value: " + maxNumRecords); - LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); - _maxNumRecordsPerFile = maxNumRecords; - } - } - @Override protected Schema getSchema() throws IOException { @@ -275,15 +120,6 @@ protected Schema getSchema() protected void addAdditionalJobProperties(Job job) { } - private void setTableConfigAndSchema() - throws IOException { - _tableConfig = getTableConfig(); - _pinotTableSchema = getSchema(); - - Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); - Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); - } - /** * Cleans up outputs in preprocessed output directory. */ diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 42917903dd30..ac8e2fddd6f1 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -35,9 +35,9 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.hadoop.job.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentCreationJob; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig; import 
org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java new file mode 100644 index 000000000000..adaef88c2e16 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.avro.mapreduce.AvroMultipleOutputs; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopAvroDataPreprocessingHelper extends HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class); + + public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Schema avroSchema = (Schema) getSchema(_dataPreprocessingHelper._sampleRawDataPath); + LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); + validateConfigsAgainstSchema(avroSchema); + + job.setInputFormatClass(AvroKeyInputFormat.class); + job.setMapperClass(AvroDataPreprocessingMapper.class); + + job.setReducerClass(AvroDataPreprocessingReducer.class); + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. 
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroJob.setInputKeySchema(job, avroSchema); + AvroJob.setMapOutputValueSchema(job, avroSchema); + AvroJob.setOutputKeySchema(job, avroSchema); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java similarity index 50% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java index dc51e71e66c8..bf89c62f178a 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java @@ -19,7 +19,6 @@ package org.apache.pinot.hadoop.job.preprocess; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; @@ -31,72 +30,30 @@ import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; -import org.apache.pinot.hadoop.utils.preprocess.TextComparator; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.utils.preprocess.TextComparator; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DataPreprocessingHelper { - private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); +public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class); - String _partitionColumn; - int _numPartitions; - String _partitionFunction; - String _partitionColumnDefaultNullValue; + protected DataPreprocessingHelper _dataPreprocessingHelper; - String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - String _sortingColumnDefaultNullValue; - - private int _numOutputFiles; - private int _maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private Schema _pinotTableSchema; - - List _inputDataPaths; - Path _sampleRawDataPath; - Path 
_outputPath; - - public DataPreprocessingHelper(List inputDataPaths, Path outputPath) { - _inputDataPaths = inputDataPaths; - _sampleRawDataPath = inputDataPaths.get(0); - _outputPath = outputPath; - } - - public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, - String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn, - FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles, - int maxNumRecordsPerFile) { - _tableConfig = tableConfig; - _pinotTableSchema = tableSchema; - _partitionColumn = partitionColumn; - _numPartitions = numPartitions; - _partitionFunction = partitionFunction; - _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue; - - _sortingColumn = sortingColumn; - _sortingColumnType = sortingColumnType; - _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue; - - _numOutputFiles = numOutputFiles; - _maxNumRecordsPerFile = maxNumRecordsPerFile; + public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + _dataPreprocessingHelper = dataPreprocessingHelper; } public Job setUpJob() @@ -105,22 +62,23 @@ public Job setUpJob() Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); Configuration jobConf = job.getConfiguration(); // Input and output paths. 
- int numInputPaths = _inputDataPaths.size(); + int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size(); jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); - setValidationConfigs(job, _sampleRawDataPath); - for (Path inputFile : _inputDataPaths) { + _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath); + for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) { FileInputFormat.addInputPath(job, inputFile); } setHadoopJobConfigs(job); // Sorting column - if (_sortingColumn != null) { - LOGGER.info("Adding sorting column: {} to job config", _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name()); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue); - - switch (_sortingColumnType) { + if (_dataPreprocessingHelper._sortingColumn != null) { + LOGGER.info("Adding sorting column: {} to job config", _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _dataPreprocessingHelper._sortingColumnType.name()); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, + _dataPreprocessingHelper._sortingColumnDefaultNullValue); + + switch (_dataPreprocessingHelper._sortingColumnType) { case INT: job.setMapOutputKeyClass(IntWritable.class); break; @@ -146,28 +104,29 @@ public Job setUpJob() // Partition column int numReduceTasks = 0; - if (_partitionColumn != null) { - numReduceTasks = _numPartitions; + if (_dataPreprocessingHelper._partitionColumn != null) { + numReduceTasks = _dataPreprocessingHelper._numPartitions; jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); job.setPartitionerClass(GenericPartitioner.class); - 
jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); - if (_partitionFunction != null) { - jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _dataPreprocessingHelper._partitionColumn); + if (_dataPreprocessingHelper._partitionFunction != null) { + jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction); } - jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, + _dataPreprocessingHelper._partitionColumnDefaultNullValue); jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); } else { - if (_numOutputFiles > 0) { - numReduceTasks = _numOutputFiles; + if (_dataPreprocessingHelper._numOutputFiles > 0) { + numReduceTasks = _dataPreprocessingHelper._numOutputFiles; } else { // default number of input paths - numReduceTasks = _inputDataPaths.size(); + numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size(); } } - job.setPartitionerClass(getPartitioner()); + job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner()); // Maximum number of records per output file - jobConf - .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile)); + jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, + Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile)); // Number of reducers LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); job.setNumReduceTasks(numReduceTasks); @@ -177,54 +136,10 @@ public Job setUpJob() return job; } - abstract Class getPartitioner(); - - abstract void setUpMapperReducerConfigs(Job job) - throws IOException; - - abstract String getSampleTimeColumnValue(String timeColumnName) - throws IOException; - - private 
void setValidationConfigs(Job job, Path path) - throws IOException { - SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); - - // TODO: Serialize and deserialize validation config by creating toJson and fromJson - // If the use case is an append use case, check that one time unit is contained in one file. If there is more - // than one, - // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name - // and value - if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { - job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); - String timeColumnName = validationConfig.getTimeColumnName(); - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); - if (timeColumnName != null) { - DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); - if (dateTimeFieldSpec != null) { - DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); - String sdfPattern = formatSpec.getSDFPattern(); - if (sdfPattern != null) { - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); - } - } - } - job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, - IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); - - String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); - if (sampleTimeColumnValue != null) { - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); - } - } - } - private void setHadoopJobConfigs(Job job) { job.setJarByClass(HadoopSegmentPreprocessingJob.class); 
job.setJobName(getClass().getName()); - FileOutputFormat.setOutputPath(job, _outputPath); + FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath); job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); // Turn this on to always firstly use class paths that user specifies. job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); @@ -236,4 +151,23 @@ private void setHadoopJobConfigs(Job job) { job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); } } + + public Object getSchema(Path inputPathDir) + throws IOException { + return _dataPreprocessingHelper.getSchema(inputPathDir); + } + + public void validateConfigsAgainstSchema(Object schema) { + _dataPreprocessingHelper.validateConfigsAgainstSchema(schema); + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn, + FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, + partitionColumnDefaultNullValue, + sortingColumn, sortingColumnType, sortingColumnDefaultNullValue, numOutputFiles, maxNumRecordsPerFile); + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java new file mode 100644 index 000000000000..c8dec825461b --- /dev/null +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopDataPreprocessingHelperFactory { + private HadoopDataPreprocessingHelperFactory() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class); + + public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + throws IOException { + final List avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); + final List orcFiles = DataFileUtils.getDataFiles(inputPaths, 
DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles, + inputPaths); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + inputPaths); + + if (numAvroFiles > 0) { + LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); + return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); + } else { + LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); + return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java new file mode 100644 index 000000000000..c4834cfe5b50 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.Job; + + +public interface HadoopJobPreparer { + + void setUpMapperReducerConfigs(Job job) throws IOException; +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java new file mode 100644 index 000000000000..0d9ee78f02b6 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.orc.OrcConf; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.orc.mapreduce.OrcInputFormat; +import org.apache.orc.mapreduce.OrcOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopOrcDataPreprocessingHelper extends HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class); + + public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath); + String orcSchemaString = orcSchema.toString(); + LOGGER.info("Orc schema is: {}", orcSchemaString); + validateConfigsAgainstSchema(orcSchema); + + job.setInputFormatClass(OrcInputFormat.class); + job.setMapperClass(OrcDataPreprocessingMapper.class); + job.setMapOutputValueClass(OrcValue.class); + Configuration jobConf = job.getConfiguration(); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + + job.setReducerClass(OrcDataPreprocessingReducer.class); + // Use LazyOutputFormat to avoid creating empty files. 
+ LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcStruct.class); + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java similarity index 78% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java index 2f97f720f823..5f7464690956 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperTest.java @@ -19,13 +19,13 @@ package org.apache.pinot.hadoop.job.preprocess; import com.google.common.base.Preconditions; +import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -34,6 +34,8 @@ import 
org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -41,18 +43,34 @@ import static org.testng.Assert.assertNull; -public class DataPreprocessingHelperTest { +public class HadoopDataPreprocessingHelperTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "HadoopDataPreprocessingHelperTest"); - @Test - public void testDataPreprocessingHelper() + @BeforeClass + public static void setUp() throws IOException { - List inputPaths = new ArrayList<>(); String pathString = Preconditions - .checkNotNull(DataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro")) + .checkNotNull( + HadoopDataPreprocessingHelperTest.class.getClassLoader().getResource("data/test_sample_data.avro")) .getPath(); - inputPaths.add(new Path(pathString)); + + // Copy the input path to a temp directory. 
+ FileUtils.deleteQuietly(TEMP_DIR); + FileUtils.forceMkdir(TEMP_DIR); + FileUtils.copyFileToDirectory(new File(pathString), TEMP_DIR); + } + + @AfterClass + public static void tearDown() { + FileUtils.deleteQuietly(TEMP_DIR); + } + + @Test + public void testDataPreprocessingHelper() + throws IOException { Path outputPath = new Path("mockOutputPath"); - DataPreprocessingHelper dataPreprocessingHelper = new AvroDataPreprocessingHelper(inputPaths, outputPath); + HadoopDataPreprocessingHelper dataPreprocessingHelper = + HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(new Path(TEMP_DIR.toString()), outputPath); BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, "APPEND", "DAILY"); IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null, null); diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml index 965a98a42799..3d7b489f4e16 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml @@ -117,5 +117,14 @@ + + org.apache.avro + avro-mapred + provided + + + org.apache.orc + orc-mapreduce + diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java index 70c71e3854d0..df6d46253857 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java @@ -21,11 
+21,25 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableCustomConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +53,7 @@ * * enable.preprocessing: false by default. Enables preprocessing job. 
*/ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { - private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingJob.class); protected final Path _schemaFile; protected final Path _inputSegmentDir; protected final Path _preprocessedOutputDir; @@ -47,6 +61,23 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { protected final Path _pathToDependencyJar; protected boolean _enablePreprocessing; + protected String _partitionColumn; + protected int _numPartitions; + protected String _partitionFunction; + protected String _partitionColumnDefaultNullValue; + + protected String _sortingColumn; + protected FieldSpec.DataType _sortingColumnType; + protected String _sortingColumnDefaultNullValue; + + protected int _numOutputFiles; + protected int _maxNumRecordsPerFile; + + protected TableConfig _tableConfig; + protected org.apache.pinot.spi.data.Schema _pinotTableSchema; + + protected Set _preprocessingOperations; + public SegmentPreprocessingJob(final Properties properties) { super(properties); @@ -60,13 +91,13 @@ public SegmentPreprocessingJob(final Properties properties) { _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR); _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA); - _logger.info("*********************************************************************"); - _logger.info("enable.preprocessing: {}", _enablePreprocessing); - _logger.info("path.to.input: {}", _inputSegmentDir); - _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir); - _logger.info("path.to.deps.jar: {}", _pathToDependencyJar); - _logger.info("push.locations: {}", _pushLocations); - _logger.info("*********************************************************************"); + LOGGER.info("*********************************************************************"); + 
LOGGER.info("enable.preprocessing: {}", _enablePreprocessing); + LOGGER.info("path.to.input: {}", _inputSegmentDir); + LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir); + LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar); + LOGGER.info("push.locations: {}", _pushLocations); + LOGGER.info("*********************************************************************"); } protected abstract void run() @@ -91,4 +122,137 @@ protected boolean isDataFile(String fileName) { // TODO: support orc format in the future. return fileName.endsWith(".avro"); } + + protected void setTableConfigAndSchema() + throws IOException { + _tableConfig = getTableConfig(); + _pinotTableSchema = getSchema(); + + Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); + Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); + } + + protected void fetchPreProcessingOperations() { + _preprocessingOperations = new HashSet<>(); + TableCustomConfig customConfig = _tableConfig.getCustomConfig(); + if (customConfig != null) { + Map customConfigMap = customConfig.getCustomConfigs(); + if (customConfigMap != null && !customConfigMap.isEmpty()) { + String preprocessingOperationsString = + customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); + DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); + } + } + } + + protected void fetchPartitioningConfig() { + // Fetch partition info from table config. 
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { + LOGGER.info("Partitioning is disabled."); + return; + } + SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig != null) { + Map columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); + Preconditions + .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); + if (columnPartitionMap.size() == 1) { + _partitionColumn = columnPartitionMap.keySet().iterator().next(); + _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); + _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); + _partitionColumnDefaultNullValue = + _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString(); + } + } else { + LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); + } + } + + protected void fetchSortingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { + LOGGER.info("Sorting is disabled."); + return; + } + // Fetch sorting info from table config first. + List sortingColumns = new ArrayList<>(); + List fieldConfigs = _tableConfig.getFieldConfigList(); + if (fieldConfigs != null && !fieldConfigs.isEmpty()) { + for (FieldConfig fieldConfig : fieldConfigs) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { + sortingColumns.add(fieldConfig.getName()); + } + } + } + if (!sortingColumns.isEmpty()) { + Preconditions.checkArgument(sortingColumns.size() <= 1, "There should be at most 1 sorted column in the table."); + _sortingColumn = sortingColumns.get(0); + return; + } + + // There is no sorted column specified in field configs, try to find sorted column from indexing config. 
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); + List sortedColumns = indexingConfig.getSortedColumn(); + if (sortedColumns != null) { + Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); + if (sortedColumns.size() == 1) { + _sortingColumn = sortedColumns.get(0); + FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); + Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); + Preconditions + .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); + _sortingColumnType = fieldSpec.getDataType(); + Preconditions + .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, + _sortingColumn); + LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); + } + } + if (_sortingColumn != null) { + _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString(); + } + } + + protected void fetchResizingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { + LOGGER.info("Resizing is disabled."); + return; + } + TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); + if (tableCustomConfig == null) { + _numOutputFiles = 0; + return; + } + Map customConfigsMap = tableCustomConfig.getCustomConfigs(); + if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { + _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); + Preconditions.checkState(_numOutputFiles > 0, String + .format("The value of %s should be positive! 
Current value: %s", + InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); + } else { + _numOutputFiles = 0; + } + + if (customConfigsMap != null) { + int maxNumRecords; + if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { + LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); + maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); + } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { + maxNumRecords = + Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); + } else { + return; + } + // TODO: add a in-built maximum value for this config to avoid having too many small files. + // E.g. if the config is set to 1 which is smaller than this in-built value, + // the job should be abort from generating too many small files. + Preconditions.checkArgument(maxNumRecords > 0, + "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE + + " should be positive. 
Current value: " + maxNumRecords); + LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); + _maxNumRecordsPerFile = maxNumRecords; + } + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java similarity index 74% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java index 9e5f5f20d8a2..3be4768b6a0a 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; @@ -26,23 +26,13 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.avro.mapreduce.AvroMultipleOutputs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,30 +51,19 @@ public Class getPartitioner() { } @Override - public void setUpMapperReducerConfigs(Job job) + public Object getSchema(Path inputPathDir) throws IOException { - Schema avroSchema = getAvroSchema(_sampleRawDataPath); - LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); - validateConfigsAgainstSchema(avroSchema); - - job.setInputFormatClass(AvroKeyInputFormat.class); - job.setMapperClass(AvroDataPreprocessingMapper.class); - - 
job.setReducerClass(AvroDataPreprocessingReducer.class); - AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); - AvroMultipleOutputs.setCountersEnabled(job, true); - // Use LazyOutputFormat to avoid creating empty files. - LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); - job.setOutputKeyClass(AvroKey.class); - job.setOutputValueClass(NullWritable.class); + return getAvroSchema(inputPathDir); + } - AvroJob.setInputKeySchema(job, avroSchema); - AvroJob.setMapOutputValueSchema(job, avroSchema); - AvroJob.setOutputKeySchema(job, avroSchema); + @Override + public void validateConfigsAgainstSchema(Object schema) { + Schema avroSchema = (Schema) schema; + validateConfigsAgainstSchema(avroSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { String sampleTimeColumnValue; try (DataFileStream dataStreamReader = getAvroReader(_sampleRawDataPath)) { @@ -99,7 +78,7 @@ String getSampleTimeColumnValue(String timeColumnName) * @return Input schema * @throws IOException exception when accessing to IO */ - private Schema getAvroSchema(Path inputPathDir) + protected Schema getAvroSchema(Path inputPathDir) throws IOException { Schema avroSchema = null; for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java new file mode 100644 index 000000000000..6ef165fa0755 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java @@ -0,0 +1,164 @@ +/** + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+package org.apache.pinot.ingestion.preprocess; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory;
+ + +/** + * Base class for the data preprocessing helpers in pinot-ingestion-common. + * + * <p>Holds the table config and schema, the partitioning, sorting and output-sizing settings stored by + * {@link #registerConfigs}, and the input and output paths. Subclasses supply the partitioner class, the + * format-specific schema lookup and validation, and the sample time column value. + */ +public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); + + public String _partitionColumn; + public int _numPartitions; + public String _partitionFunction; + public String _partitionColumnDefaultNullValue; + + public String _sortingColumn; + public FieldSpec.DataType _sortingColumnType; + public String _sortingColumnDefaultNullValue; + + public int _numOutputFiles; + public int _maxNumRecordsPerFile; + + public TableConfig _tableConfig; + public Schema _pinotTableSchema; + + public List<Path> _inputDataPaths; + public Path _sampleRawDataPath; + public Path _outputPath; +
+ /** + * Creates a helper for the given input files. + * + * @param inputDataPaths input data files; must not be empty, the first entry becomes the sample raw data file + * @param outputPath path stored as the output path of this helper + */ + public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) { + _inputDataPaths = inputDataPaths; + _sampleRawDataPath = inputDataPaths.get(0); + _outputPath = outputPath; + } +
+ /** + * Stores the table config, table schema and the partitioning, sorting and output-sizing settings on this helper. + */ + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn, + FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles, + int maxNumRecordsPerFile) { + _tableConfig = tableConfig; + _pinotTableSchema = tableSchema; + _partitionColumn = partitionColumn; + _numPartitions = numPartitions; + _partitionFunction = partitionFunction; + _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue; + + _sortingColumn = sortingColumn; + _sortingColumnType = sortingColumnType; + _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue; + + _numOutputFiles = numOutputFiles; + _maxNumRecordsPerFile = maxNumRecordsPerFile; + } +
+ /** Returns the partitioner class to use for this helper's input format. */ + public abstract Class<? extends Partitioner> getPartitioner(); + + /** + * Returns the format-specific schema object for the data under the given path. + * + * @throws IOException declared so that implementations may read from the file system + */ + public abstract Object getSchema(Path inputPathDir) + throws IOException; + + /** Validates the registered configs against the given format-specific schema object. */ + public abstract void validateConfigsAgainstSchema(Object schema); +
+ /** + * Copies the time-related validation settings of the table into the job configuration. + * + * <p>Nothing is written unless the table's batch segment ingestion type is APPEND (compared case-insensitively). + * For APPEND tables the IS_APPEND flag and the segment push frequency are always written. The time column name, + * the segment time type, format and SDF pattern, and the sample time column value are written only when present. + * + * @param job job whose configuration receives the settings + * @param path not used by this method + * @throws IOException if reading the sample time column value fails + */ + public void setValidationConfigs(Job job, Path path) + throws IOException { + SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); + + // TODO: Serialize and deserialize validation config by creating toJson and fromJson + // If the use case is an append use case, check that one time unit is contained in one file. + // If there is more than one, the job should be disabled, as we should not resize for these use cases. + // Therefore, setting the time column name and value.
+ if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { + job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); + String timeColumnName = validationConfig.getTimeColumnName(); + if (timeColumnName != null) { + // Configuration#set rejects null values, so the column name is only written when the table defines one.
+ job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); + DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); + if (dateTimeFieldSpec != null) { + DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); + job.getConfiguration() + .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); + String sdfPattern = formatSpec.getSDFPattern(); + if (sdfPattern != null) { + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, sdfPattern); + } + } + } + job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, + IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); + + // Without a time column name there is nothing to look up in the sample file.
+ if (timeColumnName != null) { + String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); + if (sampleTimeColumnValue != null) { + job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); + } + } + } + } +}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java rename to
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java index 43ee97b75714..c89e30d6b296 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java similarity index 82% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java index 24c0473fe7d2..4371cdaae1eb 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -30,23 +29,13 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; -import org.apache.orc.mapred.OrcStruct; -import org.apache.orc.mapred.OrcValue; -import org.apache.orc.mapreduce.OrcInputFormat; -import org.apache.orc.mapreduce.OrcOutputFormat; -import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,33 +51,24 @@ public OrcDataPreprocessingHelper(List inputDataPaths, Path outputPath) { } @Override - Class getPartitioner() { + public Class getPartitioner() { return OrcDataPreprocessingPartitioner.class; } @Override - void setUpMapperReducerConfigs(Job job) { - TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath); - String orcSchemaString = orcSchema.toString(); - LOGGER.info("Orc schema is: {}", orcSchemaString); - validateConfigsAgainstSchema(orcSchema); - - job.setInputFormatClass(OrcInputFormat.class); - job.setMapperClass(OrcDataPreprocessingMapper.class); - job.setMapOutputValueClass(OrcValue.class); - Configuration jobConf = job.getConfiguration(); - OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + public Object getSchema(Path inputPathDir) + throws IOException { + return getOrcSchema(inputPathDir); + } - job.setReducerClass(OrcDataPreprocessingReducer.class); - // Use LazyOutputFormat to avoid creating empty files. 
- LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(OrcStruct.class); - OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + @Override + public void validateConfigsAgainstSchema(Object schema) { + TypeDescription orcSchema = (TypeDescription) schema; + validateConfigsAgainstSchema(orcSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { try (Reader reader = OrcFile .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java new file mode 100644 index 000000000000..feaceee3903d --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.preprocess; + +import java.io.IOException; + +/** Implemented by DataPreprocessingHelper; the Avro and ORC helpers read the value from the sample raw data. */ +public interface SampleTimeColumnExtractable { + /** Returns a value of {@code timeColumnName} as a String; DataPreprocessingHelper skips a null result. */ + String getSampleTimeColumnValue(String timeColumnName) + throws IOException; +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java index d9f17accfcb0..0e5e33b3d705 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,8 +27,8 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java similarity index 94% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java index 8ad9d84a9fc8..9180286b7f32 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,9 +27,9 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java index 5ae89669c4c8..3ddd2551aa20 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -30,8 +30,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java index 5f2ae4d37949..a19d24cade03 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import java.util.concurrent.atomic.AtomicInteger; @@ -27,7 +27,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java index 118d3106b06d..0a066c22c911 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java @@ -16,14 +16,14 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroValue; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java index 853502c0f4de..cd36bdf5322e 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import java.util.List; @@ -28,8 +28,8 @@ import org.apache.hadoop.mapreduce.Partitioner; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java index 0ba57dba77b9..1826bdc40519 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import java.util.HashMap; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java index 62ed4a3a73bc..f9a046c7128f 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.avro.generic.GenericRecord; @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java index a3387a2e9740..10d3f106231b 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.commons.lang3.RandomStringUtils; @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java index 1998399f3d96..0e60d2d15176 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils; import java.util.Set; import org.apache.hadoop.io.DoubleWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java index b26fc388a443..b8fa59a55022 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job; +package org.apache.pinot.ingestion.utils; /** * Internal-only constants for Hadoop MapReduce jobs. 
These constants are propagated across different segment creation diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java index 58e1c1d70ccb..a6e4ca793399 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java new file mode 100644 index 000000000000..e4cdf5e2a092 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.utils.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + + +public class HadoopUtils { + private HadoopUtils() { + } + + public static final Configuration DEFAULT_CONFIGURATION; + public static final FileSystem DEFAULT_FILE_SYSTEM; + + static { + DEFAULT_CONFIGURATION = new Configuration(); + try { + DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); + } catch (IOException e) { + throw new IllegalStateException("Failed to get the default file system", e); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java index dcfc3b5e5ae9..09eb2189e3d0 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.io.BooleanWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java index 4def5bd72c80..9e5295727bea 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml index 64275015c75f..20a5734c23b3 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml @@ -106,6 +106,10 @@ org.apache.pinot pinot-common + + com.esotericsoftware + kryo-shaded + @@ -282,6 +286,10 @@ provided + + org.apache.avro + avro-mapred + @@ -379,6 +387,14 @@ commons-pool commons-pool + + org.apache.avro + avro-mapred + + + org.eclipse.jetty + jetty-util + test @@ -463,8 +479,16 @@ org.apache.orc orc-core + + org.apache.orc + orc-mapreduce + + + org.apache.parquet + parquet-hadoop + - test + provided org.apache.spark @@ -480,7 +504,7 @@ objenesis - test + provided org.apache.derby diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java new file mode 100644 index 000000000000..4743d1fd0ff8 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs; + +import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory; +import org.apache.pinot.spark.utils.HadoopUtils; +import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A Spark job which provides partitioning, sorting, and resizing against the input files, + * which is raw data in either Avro or Orc format. + * Thus, the output files are partitioned, sorted, resized after this job. + * In order to run this job, the following configs need to be specified in job properties: + * * enable.preprocessing: false by default. Enables preprocessing job. 
+ */ +public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class); + + public SparkSegmentPreprocessingJob(Properties properties) { + super(properties); + } + + @Override + protected void run() + throws Exception { + if (!_enablePreprocessing) { + LOGGER.info("Pre-processing job is disabled."); + return; + } else { + LOGGER.info("Starting {}", getClass().getSimpleName()); + } + + setTableConfigAndSchema(); + fetchPreProcessingOperations(); + fetchPartitioningConfig(); + fetchSortingConfig(); + fetchResizingConfig(); + + // Cleans up preprocessed output dir if exists + cleanUpPreprocessedOutputs(_preprocessedOutputDir); + + SparkDataPreprocessingHelper dataPreprocessingHelper = + SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + dataPreprocessingHelper + .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, + _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue, + _numOutputFiles, _maxNumRecordsPerFile); + + // Set up and execute spark job. + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); + addDepsJarToDistributedCache(javaSparkContext); + + SparkSession sparkSession = + SparkSession.builder().appName(SparkSegmentPreprocessingJob.class.getSimpleName()).getOrCreate(); + + dataPreprocessingHelper.setUpAndExecuteJob(sparkSession); + } + + /** + * Cleans up outputs in preprocessed output directory. 
+ */ + public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir) + throws IOException { + if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) { + LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir); + HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true); + } + } + + protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext) + throws IOException { + if (_pathToDependencyJar != null) { + PinotSparkJobPreparationHelper + .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, _pathToDependencyJar); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java new file mode 100644 index 000000000000..23f17e8a4306 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; + + +public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public String getDataFormat() { + return "avro"; + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java new file mode 100644 index 000000000000..7a7a56478b5e --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spark.jobs.preprocess; + +import com.google.common.collect.Ordering; +import java.io.Serializable; + + +public class SparkDataPreprocessingComparator extends Ordering implements Serializable { + @Override + public int compare(Object left, Object right) { + Object value1 = ((SparkDataPreprocessingJobKey) left).getSortedColumn(); + Object value2 = ((SparkDataPreprocessingJobKey) right).getSortedColumn(); + if (value1 == null) { + return 0; + } + if (value1 instanceof Integer) { + return Integer.compare((int) value1, (int) value2); + } else if (value1 instanceof Long) { + return Long.compare((long) value1, (long) value2); + } else if (value1 instanceof Float) { + return Float.compare((float) value1, (float) value2); + } else if (value1 instanceof Double) { + return Double.compare((double) value1, (double) value2); + } else if (value1 instanceof Short) { + return Short.compare((short) value1, (short) value2); + } else if (value1 instanceof String) { + return ((String) value1).compareTo((String) value2); + } + throw new RuntimeException("Unsupported Data type: " + value1.getClass().getName()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java new file mode 100644 index 000000000000..a73d9a677b4a --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.partitioners.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + + +public abstract class SparkDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelper.class); + + protected DataPreprocessingHelper _dataPreprocessingHelper; + + public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + 
_dataPreprocessingHelper = dataPreprocessingHelper; + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn, + FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, + partitionColumnDefaultNullValue, sortingColumn, sortingColumnType, sortingColumnDefaultNullValue, + numOutputFiles, maxNumRecordsPerFile); + } + + public void setUpAndExecuteJob(SparkSession sparkSession) { + // Read data into data frame. + Dataset dataFrame = sparkSession.read().format(getDataFormat()) + .load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths)); + JavaRDD javaRDD = dataFrame.javaRDD(); + + // Find positions of partition column and sorting column if specified. 
+ StructType schema = dataFrame.schema(); + StructField[] fields = schema.fields(); + int partitionColumnPosition = -1; + int sortingColumnPosition = -1; + PartitionFunction partitionFunction = null; + String partitionColumnDefaultNullValue = null; + for (int i = 0; i <= fields.length; i++) { + StructField field = fields[i]; + if (_dataPreprocessingHelper._partitionColumn != null && _dataPreprocessingHelper._partitionColumn + .equalsIgnoreCase(field.name())) { + partitionColumnPosition = i; + partitionFunction = PartitionFunctionFactory + .getPartitionFunction(_dataPreprocessingHelper._partitionFunction, _dataPreprocessingHelper._numPartitions); + partitionColumnDefaultNullValue = + _dataPreprocessingHelper._pinotTableSchema.getFieldSpecFor(_dataPreprocessingHelper._partitionColumn) + .getDefaultNullValueString(); + } + if (_dataPreprocessingHelper._sortingColumn != null && _dataPreprocessingHelper._sortingColumn + .equalsIgnoreCase(field.name())) { + sortingColumnPosition = i; + } + } + int numPartitions; + if (partitionColumnPosition == -1) { + if (_dataPreprocessingHelper._numOutputFiles > 0) { + numPartitions = _dataPreprocessingHelper._numOutputFiles; + } else { + numPartitions = javaRDD.getNumPartitions(); + } + } else { + numPartitions = _dataPreprocessingHelper._numPartitions; + } + final String finalPartitionColumn = _dataPreprocessingHelper._partitionColumn; + final int finalNumPartitions = numPartitions; + final int finalPartitionColumnPosition = partitionColumnPosition; + final int finalSortingColumnPosition = sortingColumnPosition; + LOGGER.info("Partition column: " + finalPartitionColumn); + LOGGER.info("Number of partitions: " + finalNumPartitions); + LOGGER.info("Position of partition column (if specified): " + finalPartitionColumnPosition); + LOGGER.info("Position of sorting column (if specified): " + finalSortingColumnPosition); + LOGGER.info("Default null value for partition column: " + partitionColumnDefaultNullValue); + 
SparkDataPreprocessingPartitioner sparkPartitioner = + new SparkDataPreprocessingPartitioner(finalPartitionColumn, finalNumPartitions, partitionFunction, + partitionColumnDefaultNullValue); + + // Convert to java pair rdd. + JavaPairRDD pairRDD = javaRDD.mapToPair((PairFunction) row -> { + Object partitionColumnValue = null; + Object sortingColumnValue = null; + + if (_dataPreprocessingHelper._partitionColumn != null) { + partitionColumnValue = row.get(finalPartitionColumnPosition); + } + int partitionId = sparkPartitioner.generatePartitionId(partitionColumnValue); + if (_dataPreprocessingHelper._sortingColumn != null) { + sortingColumnValue = row.get(finalSortingColumnPosition); + } + return new Tuple2<>(new SparkDataPreprocessingJobKey(partitionId, sortingColumnValue), row); + }); + + // Repartition and sort within partitions. + Comparator comparator = new SparkDataPreprocessingComparator(); + JavaPairRDD partitionedSortedPairRDD = + pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator); + + // TODO: support preprocessing.max.num.records.per.file before writing back to storage + // Write to output path. 
+ partitionedSortedPairRDD.values().saveAsTextFile(_dataPreprocessingHelper._outputPath.toString()); + } + + private Seq convertPathsToStrings(List paths) { + List stringList = new ArrayList<>(); + for (Path path : paths) { + stringList.add(path.toString()); + } + return toSeq(stringList); + } + + /** + * Helper to wrap a Java collection into a Scala Seq + * + * @param collection java collection + * @param collection item type + * @return Scala Seq of type T + */ + public static Seq toSeq(Collection collection) { + return JavaConverters.asScalaBufferConverter(new ArrayList<>(collection)).asScala().toSeq(); + } + + public abstract String getDataFormat(); +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java new file mode 100644 index 000000000000..cb64bc6ad739 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SparkDataPreprocessingHelperFactory { + private SparkDataPreprocessingHelperFactory() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class); + + public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + throws IOException { + final List avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); + final List orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles, + inputPaths); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + inputPaths); + + if (numAvroFiles > 0) { + LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); + return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); + } else { + LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); + return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); + } + } +} diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java new file mode 100644 index 000000000000..d4e76a2e2fb7 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Immutable key pairing a partition id with the value of the sorted column.
 *
 * <p>The class is {@link java.io.Serializable} so instances can be written by Java serialization; this only
 * works when the two wrapped values are themselves serializable.
 */
public class SparkDataPreprocessingJobKey implements java.io.Serializable {
  private static final long serialVersionUID = 1L;

  private final Object _partitionId;
  private final Object _sortedColumn;

  /**
   * @param partitionId id of the partition the record belongs to
   * @param sortedColumn value of the sorted column for the record; may be {@code null}
   */
  public SparkDataPreprocessingJobKey(Object partitionId, Object sortedColumn) {
    _partitionId = partitionId;
    _sortedColumn = sortedColumn;
  }

  /**
   * @return the partition id given at construction
   */
  public Object getPartitionId() {
    return _partitionId;
  }

  /**
   * @return the sorted-column value given at construction, possibly {@code null}
   */
  public Object getSortedColumn() {
    return _sortedColumn;
  }
}
+ */ +package org.apache.pinot.spark.jobs.preprocess; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.spark.Partitioner; + + +public class SparkDataPreprocessingPartitioner extends Partitioner { + private final String _partitionColumn; + private final int _numPartitions; + private final PartitionFunction _partitionFunction; + private final String _partitionColumnDefaultNullValue; + private final AtomicInteger _counter = new AtomicInteger(0); + + public SparkDataPreprocessingPartitioner(String partitionColumn, int numPartitions, + PartitionFunction partitionFunction, String partitionColumnDefaultNullValue) { + _partitionColumn = partitionColumn; + _numPartitions = numPartitions; + _partitionFunction = partitionFunction; + _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue; + } + + @Override + public int numPartitions() { + return _numPartitions; + } + + @Override + public int getPartition(Object key) { + SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key; + return (int) jobKey.getPartitionId(); + } + + public int generatePartitionId(Object key) { + if (_partitionColumn == null) { + // Need to distribute evenly for data with the default partition key value. + // We may want to partition and sort on a non-primary key. 
+ return Math.abs(_counter.getAndIncrement()) % _numPartitions; + } + Object keyToPartition = _partitionColumnDefaultNullValue; + if (key != null) { + keyToPartition = key; + } + return _partitionFunction.getPartition(keyToPartition); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java new file mode 100644 index 000000000000..3f84aad34385 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spark.jobs.preprocess; + +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; + + +public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public String getDataFormat() { + return "orc"; + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java index 0596259021f7..8ee89cfe7726 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.utils; import java.io.IOException; import org.apache.hadoop.conf.Configuration;