Skip to content

Commit

Permalink
Handled Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shardul-cr7 committed Oct 24, 2018
1 parent 6ad88cc commit 3997955
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 17 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Expand Up @@ -73,6 +73,11 @@
<artifactId>zstd-jni</artifactId>
<version>1.3.2-2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
Expand Down
Expand Up @@ -47,21 +47,22 @@ public GzipCompressor() {
*/
// Compresses the given bytes with gzip (via commons-compress) and returns the
// compressed payload.
//
// @param data uncompressed input bytes (may be empty, must not be null)
// @return gzip-compressed bytes
// @throws RuntimeException wrapping any IOException raised during compression
private byte[] compressData(byte[] data) {
  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  // try-with-resources guarantees the gzip stream is closed — and therefore
  // finished/flushed — before the backing buffer is read below.
  try (GzipCompressorOutputStream gzipCompressorOutputStream =
      new GzipCompressorOutputStream(byteArrayOutputStream)) {
    gzipCompressorOutputStream.write(data);
  } catch (IOException e) {
    // Keep the original exception as the cause instead of discarding it
    // (printStackTrace + message-only rethrow loses the stack for callers).
    throw new RuntimeException("Error during Compression step " + e.getMessage(), e);
  }
  return byteArrayOutputStream.toByteArray();
}

/*
Expand All @@ -70,23 +71,23 @@ private byte[] compressData(byte[] data) {
*/
// Decompresses gzip-compressed bytes (via commons-compress) and returns the
// original payload.
//
// @param data gzip-compressed input bytes
// @return decompressed bytes
// @throws RuntimeException wrapping any IOException raised during decompression
private byte[] decompressData(byte[] data) {
  // Renamed: this is an *input* stream — the previous name
  // "byteArrayOutputStream" for a ByteArrayInputStream was misleading.
  ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  // try-with-resources closes the gzip stream on every path; the previous
  // version never closed it (resource leak on both success and failure).
  try (GzipCompressorInputStream gzipCompressorInputStream =
      new GzipCompressorInputStream(byteArrayInputStream)) {
    byte[] buffer = new byte[1024];
    int len;
    while ((len = gzipCompressorInputStream.read(buffer)) != -1) {
      byteArrayOutputStream.write(buffer, 0, len);
    }
  } catch (IOException e) {
    // Preserve the cause for callers instead of only its message text.
    throw new RuntimeException("Error during Decompression step " + e.getMessage(), e);
  }
  return byteArrayOutputStream.toByteArray();
}

@Override public byte[] compressByte(byte[] unCompInput) {
Expand Down Expand Up @@ -179,12 +180,13 @@ private byte[] decompressData(byte[] data) {

// Off-heap (address-based) compression is not available in the gzip API used
// here, so this operation is unsupported.
//
// @throws UnsupportedOperationException always; this is a RuntimeException
//         subclass, so existing callers catching RuntimeException still work
@Override public long rawCompress(long inputAddress, int inputSize, long outputAddress)
    throws IOException {
  // gzip api doesn't have rawCompress yet. Message casing fixed
  // ("rawcompress" -> "rawCompress") to match the method name.
  throw new UnsupportedOperationException("Not implemented rawCompress for gzip yet");
}

// Raw (array-to-array, length-returning) uncompress is not available in the
// gzip API used here, so this operation is unsupported. Failing fast is safer
// than the earlier behavior of silently returning 0.
//
// @throws UnsupportedOperationException always; this is a RuntimeException
//         subclass, so existing callers catching RuntimeException still work
@Override public long rawUncompress(byte[] input, byte[] output) throws IOException {
  // gzip api doesn't have rawUncompress yet. Typo fixed in the message
  // ("rawUcompress" -> "rawUncompress").
  throw new UnsupportedOperationException("Not implemented rawUncompress for gzip yet");
}

@Override public long maxCompressedLength(long inputSize) {
Expand Down
Expand Up @@ -40,7 +40,8 @@ object CarbonSessionExample {
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
spark.sparkContext.setLogLevel("INFO")
exampleBody(spark)
exampleBody1(spark,"gzip")
exampleBody2(spark)
spark.close()
}

Expand Down Expand Up @@ -151,4 +152,67 @@ object CarbonSessionExample {
spark.sql("DROP TABLE IF EXISTS carbonsession_table")
spark.sql("DROP TABLE IF EXISTS stored_as_carbondata_table")
}

/**
 * Creates the TPC-H `lineitem` table, sets the requested column compressor,
 * and times a data load into it.
 *
 * @param spark    active Carbon-enabled SparkSession
 * @param comp     compressor name to apply for the load (e.g. "gzip")
 * @param dataPath local path of the lineitem .tbl file; defaults to the
 *                 previously hard-coded location so existing callers are
 *                 unaffected
 */
def exampleBody1(spark: SparkSession, comp: String,
    dataPath: String = "/home/root1/Downloads/2.17.3/dbgen/lineitem.tbl"): Unit = {
  spark.sql("drop table if exists lineitem")

  spark.sql(
    s"""
       | create table lineitem (
       | L_ORDERKEY long,
       | L_PARTKEY long,
       | L_SUPPKEY long,
       | L_LINENUMBER int,
       | L_QUANTITY long,
       | L_EXTENDEDPRICE long,
       | L_DISCOUNT long,
       | L_TAX long,
       | L_RETURNFLAG CHAR(1),
       | L_LINESTATUS CHAR(1),
       | L_SHIPDATE DATE,
       | L_COMMITDATE DATE,
       | L_RECEIPTDATE DATE,
       | L_SHIPINSTRUCT CHAR(25),
       | L_SHIPMODE CHAR(10),
       | L_COMMENT VARCHAR(44)
       | )
       | stored by 'carbondata'
     """.stripMargin)

  // Set the compressor before timing starts: previously the timer also
  // covered this property call, slightly skewing the measured load time.
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, comp)

  val start = System.currentTimeMillis()
  spark.sql(
    s"""
       | LOAD DATA LOCAL INPATH '$dataPath'
       | INTO TABLE lineitem
       | OPTIONS('DELIMITER'=',', 'Header'='false','BAD_RECORDS_ACTION'='FORCE')
     """.stripMargin)
  val end = System.currentTimeMillis()
  // Elapsed load time in milliseconds (same bare-number output as before).
  println(end - start)
}

// Runs a count and a full-scan over `lineitem`, printing the scan's elapsed
// time in milliseconds.
def exampleBody2(spark: SparkSession): Unit = {

import org.apache.spark.sql.CarbonSession._
// NOTE(review): `spk` is never referenced after creation. Presumably the
// side effect of getOrCreateCarbonSession — binding a Carbon session to this
// hard-coded store path — is what the later `spark.sql` calls rely on, but
// that cannot be confirmed from this file. If the side effect is not needed,
// this val (and the hard-coded absolute path) should be removed; if it is,
// the path should be parameterized. TODO confirm with the caller.
val spk = SparkSession
.builder()
.master("local")
.appName("CarbonSessionExample")
.getOrCreateCarbonSession("/home/root1/Desktop/carbon_new/carbondata/examples/spark2/target/store/default/lineitem")

spark.sql("select count(*) from lineitem").show()

// Time a full materialization of the table: `row.length` forces each row to
// be touched; its result is intentionally discarded.
val strt = System.currentTimeMillis()
val result = spark.sql("select * from lineitem").rdd
result.foreach(row => {
row.length
})
val end = System.currentTimeMillis()
// Elapsed scan time in milliseconds.
println(end-strt)
}
}

0 comments on commit 3997955

Please sign in to comment.