From cf1554641c07359e91a2836d245b0b07438d0cd5 Mon Sep 17 00:00:00 2001
From: Jacques Nadeau
Date: Thu, 16 Apr 2015 09:08:00 -0700
Subject: [PATCH] DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.

---
 .../drill/exec/store/hive/HiveScan.java | 98 +++++++++----
 1 file changed, 49 insertions(+), 49 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index b96fda4a69e..92635a8f1d9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -94,10 +94,10 @@ public class HiveScan extends AbstractGroupScan {
   private long rowCount = 0;
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
-                  @JsonProperty("storage-plugin") String storagePluginName,
-                  @JsonProperty("columns") List<SchemaPath> columns,
-                  @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+  public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+                  @JsonProperty("storage-plugin") final String storagePluginName,
+                  @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.storagePluginName = storagePluginName;
     this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +106,7 @@ public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
     endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
+  public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.storagePlugin = storagePlugin;
@@ -115,7 +115,7 @@ public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, Li
     this.storagePluginName = storagePlugin.getName();
   }
 
-  private HiveScan(HiveScan that) {
+  private HiveScan(final HiveScan that) {
     this.columns = that.columns;
     this.endpoints = that.endpoints;
     this.hiveReadEntry = that.hiveReadEntry;
@@ -133,14 +133,14 @@ public List<SchemaPath> getColumns() {
 
   private void getSplits() throws ExecutionSetupException {
     try {
-      List<Partition> partitions = hiveReadEntry.getPartitions();
-      Table table = hiveReadEntry.getTable();
+      final List<Partition> partitions = hiveReadEntry.getPartitions();
+      final Table table = hiveReadEntry.getTable();
       if (partitions == null || partitions.size() == 0) {
-        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        final Properties properties = MetaStoreUtils.getTableMetadata(table);
         splitInput(properties, table.getSd(), null);
       } else {
-        for (Partition partition : partitions) {
-          Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+        for (final Partition partition : partitions) {
+          final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
           splitInput(properties, partition.getSd(), partition);
         }
       }
@@ -150,27 +150,27 @@ private void getSplits() throws ExecutionSetupException {
   }
 
   /* Split the input given in StorageDescriptor */
-  private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+  private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
       throws ReflectiveOperationException, IOException {
-    JobConf job = new JobConf();
-    for (Object obj : properties.keySet()) {
+    final JobConf job = new JobConf();
+    for (final Object obj : properties.keySet()) {
       job.set((String) obj, (String) properties.get(obj));
     }
-    for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+    for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
       job.set(entry.getKey(), entry.getValue());
     }
 
     InputFormat format = (InputFormat) Class.forName(sd.getInputFormat()).getConstructor().newInstance();
     job.setInputFormat(format.getClass());
-    Path path = new Path(sd.getLocation());
-    FileSystem fs = path.getFileSystem(job);
+    final Path path = new Path(sd.getLocation());
+    final FileSystem fs = path.getFileSystem(job);
 
     // Use new JobConf that has FS configuration
-    JobConf jobWithFsConf = new JobConf(fs.getConf());
+    final JobConf jobWithFsConf = new JobConf(fs.getConf());
     if (fs.exists(path)) {
       FileInputFormat.addInputPath(jobWithFsConf, path);
       format = jobWithFsConf.getInputFormat();
-      for (InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+      for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
         inputSplits.add(split);
         partitionMap.put(split, partition);
       }
@@ -178,7 +178,7 @@ private void splitInput(Properties properties, StorageDescriptor sd, Partition p
     final String numRowsProp = properties.getProperty("numRows");
     logger.trace("HiveScan num rows property = {}", numRowsProp);
     if (numRowsProp != null) {
-      final int numRows = Integer.valueOf(numRowsProp);
+      final long numRows = Long.valueOf(numRowsProp);
       // starting from hive-0.13, when no statistics are available, this property is set to -1
       // it's important to note that the value returned by hive may not be up to date
       if (numRows > 0) {
@@ -188,33 +188,33 @@ private void splitInput(Properties properties, StorageDescriptor sd, Partition p
   }
 
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+  public void applyAssignments(final List<DrillbitEndpoint> endpoints) {
     mappings = Lists.newArrayList();
     for (int i = 0; i < endpoints.size(); i++) {
       mappings.add(new ArrayList<InputSplit>());
     }
-    int count = endpoints.size();
+    final int count = endpoints.size();
     for (int i = 0; i < inputSplits.size(); i++) {
       mappings.get(i % count).add(inputSplits.get(i));
     }
   }
 
-  public static String serializeInputSplit(InputSplit split) throws IOException {
-    ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+  public static String serializeInputSplit(final InputSplit split) throws IOException {
+    final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
     split.write(byteArrayOutputStream);
-    String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+    final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
     logger.debug("Encoded split string for split {} : {}", split, encoded);
     return encoded;
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+  public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
-      List<InputSplit> splits = mappings.get(minorFragmentId);
+      final List<InputSplit> splits = mappings.get(minorFragmentId);
       List<HivePartition> parts = Lists.newArrayList();
-      List<String> encodedInputSplits = Lists.newArrayList();
-      List<String> splitTypes = Lists.newArrayList();
-      for (InputSplit split : splits) {
+      final List<String> encodedInputSplits = Lists.newArrayList();
+      final List<String> splitTypes = Lists.newArrayList();
+      for (final InputSplit split : splits) {
        HivePartition partition = null;
        if (partitionMap.get(split) != null) {
          partition = new HivePartition(partitionMap.get(split));
@@ -226,7 +226,7 @@ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupExcepti
       if (parts.contains(null)) {
         parts = null;
       }
-      HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
+      final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
       return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
@@ -240,22 +240,22 @@ public int getMaxParallelizationWidth() {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-    for (DrillbitEndpoint endpoint : endpoints) {
+    final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+    for (final DrillbitEndpoint endpoint : endpoints) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+    final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
         totalSize += Math.max(1, split.getLength());
       }
-      for (InputSplit split : inputSplits) {
-        float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
-        for (String loc : split.getLocations()) {
+      for (final InputSplit split : inputSplits) {
+        final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+        for (final String loc : split.getLocations()) {
           logger.debug("split location: {}", loc);
-          DrillbitEndpoint endpoint = endpointMap.get(loc);
+          final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
               affinityMap.get(endpoint).addAffinity(affinity);
@@ -265,13 +265,13 @@ public List<EndpointAffinity> getOperatorAffinity() {
           }
         }
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (DrillbitEndpoint ep : affinityMap.keySet()) {
+    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
       Preconditions.checkNotNull(ep);
     }
-    for (EndpointAffinity a : affinityMap.values()) {
+    for (final EndpointAffinity a : affinityMap.values()) {
       Preconditions.checkNotNull(a.getEndpoint());
     }
     return Lists.newArrayList(affinityMap.values());
@@ -281,7 +281,7 @@ public List<EndpointAffinity> getOperatorAffinity() {
   public ScanStats getScanStats() {
     try {
       long data =0;
-      for (InputSplit split : inputSplits) {
+      for (final InputSplit split : inputSplits) {
         data += split.getLength();
       }
 
@@ -292,13 +292,13 @@ public ScanStats getScanStats() {
       }
       logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
       return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
     return new HiveScan(this);
   }
 
@@ -316,20 +316,20 @@ public String toString() {
   }
 
   @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    HiveScan newScan = new HiveScan(this);
+  public GroupScan clone(final List<SchemaPath> columns) {
+    final HiveScan newScan = new HiveScan(this);
     newScan.columns = columns;
     return newScan;
   }
 
   @Override
-  public boolean canPushdownProjects(List<SchemaPath> columns) {
+  public boolean canPushdownProjects(final List<SchemaPath> columns) {
     return true;
   }
 
   // Return true if the current table is partitioned false otherwise
   public boolean supportsPartitionFilterPushdown() {
-    List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+    final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
     if (partitionKeys == null || partitionKeys.size() == 0) {
       return false;
     }
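
Note (illustration only, not part of the patch): the functional change above is that
HiveScan.splitInput() now parses the Hive "numRows" table/partition property with
Long.valueOf instead of Integer.valueOf. A minimal standalone sketch of why that
matters follows; the value "3000000000" and the class name NumRowsParsingDemo are
invented for the example, while the parsing behaviour shown is standard java.lang.

public class NumRowsParsingDemo {
  public static void main(String[] args) {
    // Hypothetical "numRows" property for a table with more rows than Integer.MAX_VALUE.
    final String numRowsProp = "3000000000";

    try {
      // Old behaviour: int parsing cannot represent values above 2147483647
      // and throws NumberFormatException for this input.
      final int numRows = Integer.valueOf(numRowsProp);
      System.out.println("int parse succeeded: " + numRows);
    } catch (NumberFormatException e) {
      System.out.println("int parse failed: " + e.getMessage());
    }

    // Patched behaviour: parse as long, then apply the same guard the patch keeps,
    // since hive-0.13 reports -1 when no statistics are available.
    final long numRows = Long.valueOf(numRowsProp);
    if (numRows > 0) {
      System.out.println("long parse succeeded: " + numRows);
    }
  }
}

So for a table whose statistics report more than Integer.MAX_VALUE rows, the old code
could not read the property at all; with long-based parsing the row count can feed
into the scan's estimate as intended.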