KYLIN-3597 Close resources after they are used.
matribots authored and shaofengshi committed Oct 9, 2018
1 parent 46c9f55 commit ee93848
Showing 2 changed files with 53 additions and 50 deletions.
@@ -442,9 +442,11 @@ public void execute(Connection connection) throws SQLException {
     }
 
     private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
-        final PreparedStatement ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
-        final ResultSet rs = ps.executeQuery();
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        try {
+            ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+            rs = ps.executeQuery();
         while (rs.next()) {
             if (tableName.equals(rs.getString(1))) {
                 return true;
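The hunk above is cut off before the method's cleanup code. Below is a minimal sketch of the close-in-finally shape that declaring ps and rs outside the try block enables; it is illustrative only, not the file's actual continuation. The enclosing class name, the SQL returned by getCheckTableExistsSql, and the quiet-close handling are assumptions for the example.

// Illustrative sketch only -- not the actual continuation of the truncated hunk above.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class TableExistenceCheck {

    private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
            rs = ps.executeQuery();
            while (rs.next()) {
                if (tableName.equals(rs.getString(1))) {
                    return true;
                }
            }
            return false;
        } finally {
            // Close in reverse order of acquisition. Closing "quietly" keeps a
            // failure on rs.close() from skipping ps.close().
            if (rs != null) {
                try { rs.close(); } catch (SQLException e) { /* log and ignore */ }
            }
            if (ps != null) {
                try { ps.close(); } catch (SQLException e) { /* log and ignore */ }
            }
        }
    }

    // Placeholder for the helper referenced in the diff; the real SQL lives in the Kylin source.
    private String getCheckTableExistsSql(String tableName) {
        return "SHOW TABLES LIKE '" + tableName + "'";
    }
}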
@@ -180,58 +180,59 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
 
         //HBase conf
         logger.info("Loading HBase configuration from:{}", hbaseConfFile);
-        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
-
-        Configuration hbaseJobConf = new Configuration();
-        hbaseJobConf.addResource(confInput);
-        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
-
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
-
-        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-        if (quickPath) {
-            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
-                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
-                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
-                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                }
-            });
-        } else {
-            hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
-                        throws Exception {
-
-                    List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
-                    Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
-                    inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
-                            inputMeasures);
-
-                    for (int i = 0; i < cfNum; i++) {
-                        KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                        result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                outputValue));
-                    }
-
-                    return result.iterator();
-                }
-            });
-        }
-
-        hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                RowKeyWritable.RowKeyComparator.INSTANCE)
-                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
-                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                rowKeyWritableKeyValueTuple2._2);
-                    }
-                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+        try (FSDataInputStream confInput = fs.open(new Path(hbaseConfFile))) {
+            Configuration hbaseJobConf = new Configuration();
+            hbaseJobConf.addResource(confInput);
+            hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+            Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+            JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+            final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+            if (quickPath) {
+                hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                        KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                                textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                        return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
+                    }
+                });
+            } else {
+                hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                            throws Exception {
+
+                        List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                        Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                        inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                                inputMeasures);
+
+                        for (int i = 0; i < cfNum; i++) {
+                            KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                            result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                    outputValue));
+                        }
+
+                        return result.iterator();
+                    }
+                });
+            }
+
+            hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                    RowKeyWritable.RowKeyComparator.INSTANCE)
+                    .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                        @Override
+                        public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                                Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                            return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                    rowKeyWritableKeyValueTuple2._2);
+                        }
+                    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+        }
 
         logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
 
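For reference, a minimal standalone sketch of the try-with-resources pattern the hunk above introduces: the FSDataInputStream is closed automatically when the block exits, whether normally or via an exception, which is presumably why the job setup that reads the stream was moved inside the block. The class name, method, paths, and the property probed are assumptions for the example, not Kylin code.

// Illustrative sketch only -- not from the Kylin source.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HBaseConfLoader {

    public static Configuration loadHBaseConf(FileSystem fs, String confFile) throws IOException {
        Configuration conf = new Configuration();
        // The stream is closed automatically when the try block exits,
        // whether normally or via an exception.
        try (FSDataInputStream in = fs.open(new Path(confFile))) {
            conf.addResource(in);
            // Touch a property while the stream is still open, in case the
            // resource is loaded lazily on first access.
            conf.get("hbase.zookeeper.quorum");
        }
        return conf;
    }
}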
