
DRILL-2813: Update Hive statistics to use long instead of int for rowcount per split.
jacques-n committed Apr 18, 2015
1 parent 238399d commit cf15546
Showing 1 changed file with 49 additions and 49 deletions.
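
The change below touches the point where HiveScan parses the Hive metastore's "numRows" statistic. Integer.valueOf throws a NumberFormatException for any value above Integer.MAX_VALUE (2,147,483,647), so a table or partition with more than ~2.1 billion rows could not report its row count at all. A minimal standalone sketch of the difference (illustrative only, not code from this commit; the property value is hypothetical):

// Illustrative sketch: how a "numRows" value above 2^31 - 1 behaves
// under the old int parse versus the new long parse.
public class NumRowsParseDemo {
  public static void main(String[] args) {
    final String numRowsProp = "3000000000"; // hypothetical large row count

    try {
      // Old behavior: Integer.valueOf rejects values beyond Integer.MAX_VALUE.
      final int asInt = Integer.valueOf(numRowsProp);
      System.out.println("parsed as int: " + asInt);
    } catch (final NumberFormatException e) {
      System.out.println("int parse failed: " + e.getMessage());
    }

    // New behavior: a long holds row counts up to 9,223,372,036,854,775,807.
    final long asLong = Long.valueOf(numRowsProp);
    System.out.println("parsed as long: " + asLong);
  }
}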
@@ -94,10 +94,10 @@ public class HiveScan extends AbstractGroupScan {
private long rowCount = 0;

@JsonCreator
-public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
-    @JsonProperty("storage-plugin") String storagePluginName,
-    @JsonProperty("columns") List<SchemaPath> columns,
-    @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
+    @JsonProperty("storage-plugin") final String storagePluginName,
+    @JsonProperty("columns") final List<SchemaPath> columns,
+    @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.storagePluginName = storagePluginName;
this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +106,7 @@ public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
endpoints = storagePlugin.getContext().getBits();
}

-public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
+public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.storagePlugin = storagePlugin;
@@ -115,7 +115,7 @@ public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, Li
this.storagePluginName = storagePlugin.getName();
}

-private HiveScan(HiveScan that) {
+private HiveScan(final HiveScan that) {
this.columns = that.columns;
this.endpoints = that.endpoints;
this.hiveReadEntry = that.hiveReadEntry;
@@ -133,14 +133,14 @@ public List<SchemaPath> getColumns() {

private void getSplits() throws ExecutionSetupException {
try {
-List<Partition> partitions = hiveReadEntry.getPartitions();
-Table table = hiveReadEntry.getTable();
+final List<Partition> partitions = hiveReadEntry.getPartitions();
+final Table table = hiveReadEntry.getTable();
if (partitions == null || partitions.size() == 0) {
-Properties properties = MetaStoreUtils.getTableMetadata(table);
+final Properties properties = MetaStoreUtils.getTableMetadata(table);
splitInput(properties, table.getSd(), null);
} else {
-for (Partition partition : partitions) {
-Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+for (final Partition partition : partitions) {
+final Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
splitInput(properties, partition.getSd(), partition);
}
}
@@ -150,35 +150,35 @@ private void getSplits() throws ExecutionSetupException {
}

/* Split the input given in StorageDescriptor */
-private void splitInput(Properties properties, StorageDescriptor sd, Partition partition)
+private void splitInput(final Properties properties, final StorageDescriptor sd, final Partition partition)
throws ReflectiveOperationException, IOException {
-JobConf job = new JobConf();
-for (Object obj : properties.keySet()) {
+final JobConf job = new JobConf();
+for (final Object obj : properties.keySet()) {
job.set((String) obj, (String) properties.get(obj));
}
-for (Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+for (final Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
job.set(entry.getKey(), entry.getValue());
}
InputFormat<?, ?> format = (InputFormat<?, ?>)
Class.forName(sd.getInputFormat()).getConstructor().newInstance();
job.setInputFormat(format.getClass());
-Path path = new Path(sd.getLocation());
-FileSystem fs = path.getFileSystem(job);
+final Path path = new Path(sd.getLocation());
+final FileSystem fs = path.getFileSystem(job);

// Use new JobConf that has FS configuration
-JobConf jobWithFsConf = new JobConf(fs.getConf());
+final JobConf jobWithFsConf = new JobConf(fs.getConf());
if (fs.exists(path)) {
FileInputFormat.addInputPath(jobWithFsConf, path);
format = jobWithFsConf.getInputFormat();
-for (InputSplit split : format.getSplits(jobWithFsConf, 1)) {
+for (final InputSplit split : format.getSplits(jobWithFsConf, 1)) {
inputSplits.add(split);
partitionMap.put(split, partition);
}
}
final String numRowsProp = properties.getProperty("numRows");
logger.trace("HiveScan num rows property = {}", numRowsProp);
if (numRowsProp != null) {
-final int numRows = Integer.valueOf(numRowsProp);
+final long numRows = Long.valueOf(numRowsProp);
// starting from hive-0.13, when no statistics are available, this property is set to -1
// it's important to note that the value returned by hive may not be up to date
if (numRows > 0) {
@@ -188,33 +188,33 @@ private void splitInput(Properties properties, StorageDescriptor sd, Partition p
}

@Override
-public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = Lists.newArrayList();
for (int i = 0; i < endpoints.size(); i++) {
mappings.add(new ArrayList<InputSplit>());
}
-int count = endpoints.size();
+final int count = endpoints.size();
for (int i = 0; i < inputSplits.size(); i++) {
mappings.get(i % count).add(inputSplits.get(i));
}
}

-public static String serializeInputSplit(InputSplit split) throws IOException {
-ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+public static String serializeInputSplit(final InputSplit split) throws IOException {
+final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
split.write(byteArrayOutputStream);
-String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
logger.debug("Encoded split string for split {} : {}", split, encoded);
return encoded;
}

@Override
-public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
-List<InputSplit> splits = mappings.get(minorFragmentId);
+final List<InputSplit> splits = mappings.get(minorFragmentId);
List<HivePartition> parts = Lists.newArrayList();
-List<String> encodedInputSplits = Lists.newArrayList();
-List<String> splitTypes = Lists.newArrayList();
-for (InputSplit split : splits) {
+final List<String> encodedInputSplits = Lists.newArrayList();
+final List<String> splitTypes = Lists.newArrayList();
+for (final InputSplit split : splits) {
HivePartition partition = null;
if (partitionMap.get(split) != null) {
partition = new HivePartition(partitionMap.get(split));
@@ -226,7 +226,7 @@ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupExcepti
if (parts.contains(null)) {
parts = null;
}
-HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
+final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
@@ -240,22 +240,22 @@ public int getMaxParallelizationWidth() {

@Override
public List<EndpointAffinity> getOperatorAffinity() {
-Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
-for (DrillbitEndpoint endpoint : endpoints) {
+final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+for (final DrillbitEndpoint endpoint : endpoints) {
endpointMap.put(endpoint.getAddress(), endpoint);
logger.debug("endpoint address: {}", endpoint.getAddress());
}
-Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
try {
long totalSize = 0;
-for (InputSplit split : inputSplits) {
+for (final InputSplit split : inputSplits) {
totalSize += Math.max(1, split.getLength());
}
-for (InputSplit split : inputSplits) {
-float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
-for (String loc : split.getLocations()) {
+for (final InputSplit split : inputSplits) {
+final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+for (final String loc : split.getLocations()) {
logger.debug("split location: {}", loc);
-DrillbitEndpoint endpoint = endpointMap.get(loc);
+final DrillbitEndpoint endpoint = endpointMap.get(loc);
if (endpoint != null) {
if (affinityMap.containsKey(endpoint)) {
affinityMap.get(endpoint).addAffinity(affinity);
@@ -265,13 +265,13 @@ public List<EndpointAffinity> getOperatorAffinity() {
}
}
}
-} catch (IOException e) {
+} catch (final IOException e) {
throw new DrillRuntimeException(e);
}
-for (DrillbitEndpoint ep : affinityMap.keySet()) {
+for (final DrillbitEndpoint ep : affinityMap.keySet()) {
Preconditions.checkNotNull(ep);
}
-for (EndpointAffinity a : affinityMap.values()) {
+for (final EndpointAffinity a : affinityMap.values()) {
Preconditions.checkNotNull(a.getEndpoint());
}
return Lists.newArrayList(affinityMap.values());
@@ -281,7 +281,7 @@ public List<EndpointAffinity> getOperatorAffinity() {
public ScanStats getScanStats() {
try {
long data = 0;
-for (InputSplit split : inputSplits) {
+for (final InputSplit split : inputSplits) {
data += split.getLength();
}

@@ -292,13 +292,13 @@ public ScanStats getScanStats() {
}
logger.debug("estimated row count = {}, stats row count = {}", estRowCount, rowCount);
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
-} catch (IOException e) {
+} catch (final IOException e) {
throw new DrillRuntimeException(e);
}
}

@Override
-public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException {
return new HiveScan(this);
}

@@ -316,20 +316,20 @@ public String toString() {
}

@Override
-public GroupScan clone(List<SchemaPath> columns) {
-HiveScan newScan = new HiveScan(this);
+public GroupScan clone(final List<SchemaPath> columns) {
+final HiveScan newScan = new HiveScan(this);
newScan.columns = columns;
return newScan;
}

@Override
-public boolean canPushdownProjects(List<SchemaPath> columns) {
+public boolean canPushdownProjects(final List<SchemaPath> columns) {
return true;
}

// Return true if the current table is partitioned, false otherwise
public boolean supportsPartitionFilterPushdown() {
-List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
+final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys();
if (partitionKeys == null || partitionKeys.size() == 0) {
return false;
}
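
The rowCount field that collects these per-split values is itself a long (visible as context at the top of the first hunk), which matters once several large splits are summed: with an int accumulator the total wraps around silently even when each individual value fits in an int. A self-contained sketch of that failure mode (illustrative only; the split sizes are made up):

// Illustrative sketch: summing per-split row counts. An int accumulator
// wraps past 2^31 - 1 and goes negative; a long keeps the true total.
import java.util.Arrays;
import java.util.List;

public class RowCountSumDemo {
  public static void main(String[] args) {
    final List<Long> perSplitRows = Arrays.asList(1_500_000_000L, 1_500_000_000L);

    int intTotal = 0;
    long longTotal = 0;
    for (final long rows : perSplitRows) {
      intTotal += (int) rows; // each value fits in int, but the sum overflows
      longTotal += rows;
    }
    System.out.println("int total:  " + intTotal);  // prints -1294967296
    System.out.println("long total: " + longTotal); // prints 3000000000
  }
}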
