Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, Fil
SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws IOException {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
consumer, includes, false, counters, ioMetrics);
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, includes, counters, ioMetrics);
SerDeFileMetadata fm;
try {
fm = new SerDeFileMetadata(sourceSerDe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
private final Configuration conf;
private boolean _skipCorrupt; // TODO: get rid of this
private LlapDaemonCacheMetrics cacheMetrics;
private LlapDaemonIOMetrics ioMetrics;
// TODO: if using in multiple places, e.g. SerDe cache, pass this in.
Expand All @@ -73,7 +72,6 @@ public OrcColumnVectorProducer(MetadataCache metadataCache,
this.lowLevelCache = lowLevelCache;
this.bufferManager = bufferManager;
this.conf = conf;
this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
this.cacheMetrics = cacheMetrics;
this.ioMetrics = ioMetrics;
this.tracePool = tracePool;
Expand All @@ -90,8 +88,7 @@ public ReadPipeline createReadPipeline(
InputFormat<?, ?> unused0, Deserializer unused1, Reporter reporter, JobConf job,
Map<Path, PartitionDesc> parts) throws IOException {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
consumer, includes, _skipCorrupt, counters, ioMetrics);
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, includes, counters, ioMetrics);
OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool, parts);
edc.init(reader, reader, reader.getTrace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,16 @@ public class OrcEncodedDataConsumer
private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file.
private CompressionCodec codec;
private List<ConsumerStripeMetadata> stripes;
private final boolean skipCorrupt; // TODO: get rid of this
private SchemaEvolution evolution;
private IoTrace trace;
private final Includes includes;
private TypeDescription[] batchSchemas;
private boolean useDecimal64ColumnVectors;

public OrcEncodedDataConsumer(
Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
public OrcEncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, Includes includes,
QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics, counters);
this.includes = includes;
// TODO: get rid of this
this.skipCorrupt = skipCorrupt;
if (includes.isProbeDecodeEnabled()) {
LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
Expand Down Expand Up @@ -225,7 +221,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
private void createColumnReaders(OrcEncodedColumnBatch batch,
ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException {
TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext()
.setSchemaEvolution(evolution).skipCorrupt(skipCorrupt)
.setSchemaEvolution(evolution)
.writerTimeZone(stripeMetadata.getWriterTimezone())
.fileFormat(fileMetadata == null ? null : fileMetadata.getFileVersion())
.useUTCTimestamp(true)
Expand Down