
Commit 3ecdd42

pan3793 authored and yaooqinn committed
[KYUUBI #2439] Using Pure Java TPC-DS generator
### _Why are the changes needed?_

This PR changes the Kyuubi TPC-DS generator from the original C binary to a pure Java implementation. The new pure Java TPC-DS generator is under the Apache License; in fact, we don't know the license of the original C binary, which is why we excluded it from releases in the past. Since this change removes the license issue from the Kyuubi TPC-DS module, we can bundle the TPC-DS tool in future releases. And after the migration, I haven't seen the "error=26, Text file busy" failure described in #2439 any more.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate

1. Use the old C-binary-based TPC-DS generator to generate 1GB of data under database `tpcds_s1`
2. Use the new pure Java TPC-DS generator to generate 1GB of data under database `new_tpcds_sf1`
3. Compare the results of `select count(*)` and `select sum(hash(*))`

```
spark-sql> select count(*) from tpcds_s1.inventory;
11745000
Time taken: 0.161 seconds, Fetched 1 row(s)
spark-sql> select count(*) from new_tpcds_sf1.inventory;
11745000
Time taken: 0.141 seconds, Fetched 1 row(s)
spark-sql> select sum(hash(*)) from tpcds_s1.inventory;
-556768665838
Time taken: 0.252 seconds, Fetched 1 row(s)
spark-sql> select sum(hash(*)) from new_tpcds_sf1.inventory;
-556768665838
Time taken: 0.232 seconds, Fetched 1 row(s)
```

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2449 from pan3793/tpcds.

Closes #2439

a270bcb [Cheng Pan] Remove the exclusion in source release
7c8d327 [Cheng Pan] [KYUUBI #2439] Using Pure Java TPC-DS generator

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent eeb8a94 commit 3ecdd42
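As a cross-check of the manual verification above, here is a minimal Spark sketch of the same count-plus-checksum comparison (the `fingerprint` helper is hypothetical; table names are taken from the commit message). Equal row counts and an equal `sum(hash(*))` over all columns give strong evidence that the two generators produced identical data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, sum}

val spark = SparkSession.builder().appName("tpcds-verify").getOrCreate()

// Row count plus an order-insensitive checksum over every column.
def fingerprint(table: String): (Long, Long) = {
  val df = spark.table(table)
  val hashSum = df.agg(sum(hash(df.columns.map(col): _*))).head().getLong(0)
  (df.count(), hashSum)
}

// The old C-binary output and the new pure Java output should match.
assert(fingerprint("tpcds_s1.inventory") == fingerprint("new_tpcds_sf1.inventory"))
```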

File tree

10 files changed (+59, -119 lines)


.gitattributes

Lines changed: 0 additions & 1 deletion
```diff
@@ -21,7 +21,6 @@
 .travis.yml export-ignore
 _config.yml export-ignore
 codecov.yml export-ignore
-dev/kyuubi-tpcds/ export-ignore
 licenses-binary/ export-ignore
 LICENSE-binary export-ignore
 NOTICE-binary export-ignore
```

dev/kyuubi-tpcds/pom.xml

Lines changed: 26 additions & 2 deletions
```diff
@@ -51,8 +51,13 @@
     </dependency>

     <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
+      <groupId>io.trino.tpcds</groupId>
+      <artifactId>tpcds</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
   </dependencies>

@@ -72,6 +77,25 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <artifactSet>
+            <includes>
+              <include>com.github.scopt:scopt_${scala.binary.version}</include>
+              <include>io.trino.tpcds:tpcds</include>
+              <include>com.google.guava:guava</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+              <includes>
+                <include>com.google.common.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
         <executions>
           <execution>
             <phase>package</phase>
```
4 binary files deleted (file names not shown): -494 KB, -626 KB, -306 KB, -626 KB

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala

Lines changed: 15 additions & 75 deletions
```diff
@@ -17,24 +17,17 @@

 package org.apache.kyuubi.tpcds

-import java.io.InputStream
-import java.lang.ProcessBuilder.Redirect
-import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions._
+import scala.collection.JavaConverters._

-import scala.io.Source
-
-import org.apache.spark.{KyuubiSparkUtils, SparkEnv}
+import io.trino.tpcds.{Options, Results}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.slf4j.{Logger, LoggerFactory}

 case class TableGenerator(
     name: String,
     partitionCols: Seq[String],
     fields: StructField*) {
-  @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

   private val schema: StructType = StructType(fields)
   private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType)))
@@ -54,72 +47,19 @@ case class TableGenerator(
   private def radix: Int = (scaleFactor / 100) max 5 min parallelism

   private def toDF: DataFrame = {
-    val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
-      val os = System.getProperty("os.name").split(' ')(0).toLowerCase
-      val loader = Thread.currentThread().getContextClassLoader
-
-      val tempDir = KyuubiSparkUtils.createTempDir(SparkEnv.get.conf)
-      tempDir.toPath
-      val dsdgen = Paths.get(tempDir.toString, "dsdgen")
-      val idx = Paths.get(tempDir.toString, "tpcds.idx")
-
-      Seq(dsdgen, idx).foreach { file =>
-        val in: InputStream = loader.getResourceAsStream(s"bin/$os/${file.toFile.getName}")
-        Files.createFile(file, asFileAttribute(fromString("rwx------")))
-        val outputStream = Files.newOutputStream(file)
-        try {
-          val buffer = new Array[Byte](8192)
-          var bytesRead = 0
-          val canRead = () => {
-            bytesRead = in.read(buffer)
-            bytesRead != -1
-          }
-          while (canRead()) {
-            outputStream.write(buffer, 0, bytesRead)
-          }
-        } finally {
-          outputStream.flush()
-          outputStream.close()
-          in.close()
-        }
-      }
-
-      val cmd = s"./dsdgen" +
-        s" -TABLE $name" +
-        s" -SCALE $scaleFactor" +
-        s" -PARALLEL $parallelism" +
-        s" -child $i" +
-        s" -DISTRIBUTIONS tpcds.idx" +
-        s" -FORCE Y" +
-        s" -QUIET Y"
-
-      val builder = new ProcessBuilder(cmd.split(" "): _*)
-      builder.directory(tempDir)
-      builder.redirectError(Redirect.INHERIT)
-      logger.info(s"Start $cmd at ${builder.directory()}")
-      val process = builder.start()
-      val res = process.waitFor()
-
-      logger.info(s"Finish w/ $res $cmd")
-      val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
-      val iterator =
-        if (Files.exists(data)) {
-          // The data generated by `dsdgen` encoding in "Cp1252".
-          // See detail at https://github.com/databricks/spark-sql-perf/pull/104
-          // noinspection SourceNotClosed
-          Source.fromFile(data.toFile, "cp1252", 8192).getLines
-        } else {
-          logger.warn(s"No data generated in child $i")
-          Nil
-        }
-      iterator
-    }
-
-    val rowRDD = rawRDD.mapPartitions { iter =>
-      iter.map { line =>
-        val v = line.split("\\|", -1).dropRight(1).map(Option(_).filter(_.nonEmpty).orNull)
-        Row.fromSeq(v)
-      }
+    val rowRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
+      val opt = new Options
+      opt.table = name
+      opt.scale = scaleFactor
+      opt.parallelism = parallelism
+
+      val session = opt.toSession.withChunkNumber(i)
+      val table = session.getOnlyTableToGenerate
+
+      Results.constructResults(table, session).iterator.asScala
+        .map { _.get(0).asScala } // 1st row is specific table row
+        .map { row => row.map { v => if (v == Options.DEFAULT_NULL_STRING) null else v } }
+        .map { row => Row.fromSeq(row) }
     }

     val columns = fields.map { f => col(f.name).cast(f.dataType).as(f.name) }
```
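For orientation, the `io.trino.tpcds` API adopted above can also be driven standalone, outside Spark. A minimal sketch against the same tpcds 1.4 API calls that appear in this diff; the table name, scale, and chunk number are arbitrary example values:

```scala
import scala.collection.JavaConverters._

import io.trino.tpcds.{Options, Results}

object DsdgenSketch {
  def main(args: Array[String]): Unit = {
    val opt = new Options
    opt.table = "call_center" // any TPC-DS table name
    opt.scale = 1             // scale factor
    opt.parallelism = 2       // total number of chunks

    // Each chunk yields a disjoint slice of the table's rows; Kyuubi maps
    // chunk numbers 1..parallelism onto Spark partitions.
    val session = opt.toSession.withChunkNumber(1)
    val table = session.getOnlyTableToGenerate

    Results.constructResults(table, session).iterator.asScala
      .take(3)
      .map(_.get(0).asScala) // index 0 holds the requested table's row
      .foreach(row => println(row.mkString("|")))
  }
}
```

Null column values come back as `Options.DEFAULT_NULL_STRING`, which is why the Spark code above maps them to `null` before building each `Row`.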

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -17,9 +17,7 @@

 package org.apache.kyuubi.tpcds.benchmark

-import java.nio.charset.StandardCharsets
-
-import org.apache.commons.io.IOUtils
+import scala.io.{Codec, Source}

 /**
  * This implements the official TPCDS v2.4 queries with only cosmetic modifications.
@@ -135,9 +133,10 @@ trait TPCDS_2_4_Queries extends Benchmark {
     "ss_max")

   val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
-    val queryContent: String = IOUtils.toString(
-      getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql"),
-      StandardCharsets.UTF_8)
+    val in = getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
+    val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
+    in.close()
+
     Query(
       queryName + "-v2.4",
       queryContent,
```
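In isolation, the new commons-io-free resource-loading pattern looks like this minimal sketch (the helper name is hypothetical). Wrapping the read in try/finally closes the stream even if reading throws, a small hardening over the sequential close in the diff:

```scala
import scala.io.{Codec, Source}

// Read a classpath resource fully into a UTF-8 string, then release the stream.
def readResourceAsString(path: String): String = {
  val in = getClass.getClassLoader.getResourceAsStream(path)
  try Source.fromInputStream(in)(Codec.UTF8).mkString
  finally in.close()
}
```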

dev/kyuubi-tpcds/src/main/scala/org/apache/spark/KyuubiSparkUtils.scala

Lines changed: 0 additions & 35 deletions
This file was deleted.

pom.xml

Lines changed: 13 additions & 0 deletions
```diff
@@ -157,6 +157,7 @@
     <swagger.version>2.1.11</swagger.version>
     <testcontainers.version>0.39.12</testcontainers.version>
     <trino.client.version>363</trino.client.version>
+    <trino.tpcds.version>1.4</trino.tpcds.version>
     <zookeeper.version>3.4.14</zookeeper.version>

     <!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
@@ -518,6 +519,18 @@
         <version>${trino.client.version}</version>
       </dependency>

+      <dependency>
+        <groupId>io.trino.tpcds</groupId>
+        <artifactId>tpcds</artifactId>
+        <version>${trino.tpcds.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>*</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <dependency>
         <groupId>com.dimafeng</groupId>
         <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
```
