Commit 81c48b0

jiaoqingbo authored and pan3793 committed
[KYUUBI #2663] Kyuubi Spark TPC-H Connector - Initial implementation
### _Why are the changes needed?_

Fix #2663.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run tests](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2731 from jiaoqingbo/kyuubi-2663.

Closes #2663

1553c59 [jiaoqingbo] code review
beb7275 [jiaoqingbo] [KYUUBI #2663] Kyuubi Spark TPC-H Connector - Initial implementation

Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 3b81a49 commit 81c48b0
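
The commit adds a DataSource V2 connector that generates TPC-H rows on the fly rather than reading them from storage. As a rough usage sketch (the `TPCHCatalog` class name and the `tpch.sf1` namespace are assumptions inferred from the module's package name; neither appears in the files shown below), the connector would be registered as a Spark catalog and queried directly:

```scala
// Hypothetical usage sketch. The TPCHCatalog class name and the tpch.sf1 namespace
// are assumptions based on the module's package name; they are not part of this diff.
import org.apache.spark.sql.SparkSession

object TpchConnectorUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Register the connector as a Spark catalog named "tpch".
      .config("spark.sql.catalog.tpch", "org.apache.kyuubi.spark.connector.tpch.TPCHCatalog")
      .getOrCreate()

    // Rows are generated on the fly by the connector; nothing is read from storage.
    spark.sql("SELECT count(*) FROM tpch.sf1.customer").show()

    spark.stop()
  }
}
```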

9 files changed: +698 −0

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.apache.kyuubi</groupId>
        <artifactId>kyuubi-parent</artifactId>
        <version>1.6.0-SNAPSHOT</version>
        <relativePath>../../../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kyuubi-spark-connector-tpch_2.12</artifactId>
    <name>Kyuubi Spark TPC-H Connector</name>
    <packaging>jar</packaging>
    <url>https://kyuubi.apache.org/</url>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.trino.tpch</groupId>
            <artifactId>tpch</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.scalatestplus</groupId>
            <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kyuubi</groupId>
            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-api</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-runtime</artifactId>
            <scope>test</scope>
        </dependency>

        <!--
          Spark requires `commons-collections` and `commons-io` but gets them from transitive
          dependencies of `hadoop-client`. As we are using the Hadoop Shaded Client, we need to
          add them explicitly. See more details at SPARK-33212.
        -->
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>jakarta.xml.bind</groupId>
            <artifactId>jakarta.xml.bind-api</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <shadedArtifactAttached>false</shadedArtifactAttached>
                    <artifactSet>
                        <includes>
                            <include>io.trino.tpch:tpch</include>
                            <include>com.google.guava:guava</include>
                        </includes>
                    </artifactSet>
                    <relocations>
                        <relocation>
                            <pattern>com.google.common</pattern>
                            <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
                            <includes>
                                <include>com.google.common.**</include>
                            </includes>
                        </relocation>
                        <relocation>
                            <pattern>io.trino.tpch</pattern>
                            <shadedPattern>${kyuubi.shade.packageName}.io.trino.tpch</shadedPattern>
                            <includes>
                                <include>io.trino.tpch.**</include>
                            </includes>
                        </relocation>
                    </relocations>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <executions>
                    <execution>
                        <id>prepare-test-jar</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>test-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
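
The shade plugin above bundles and relocates only `io.trino.tpch:tpch` and Guava, so the data generator travels inside the connector jar. As a minimal standalone sketch of the generator API this module depends on (the scale factor, part numbers, and printed fields are illustrative choices; only the `io.trino.tpch` coordinates come from the pom above):

```scala
// Minimal standalone sketch of the io.trino.tpch generator API that this module
// shades (see the artifactSet above). The scale factor and the three printed rows
// are illustrative assumptions, not values taken from the commit.
import scala.collection.JavaConverters._

import io.trino.tpch.TpchTable

object TpchGeneratorSketch {
  def main(args: Array[String]): Unit = {
    // Generate part 1 of 1 for the CUSTOMER table at scale factor 1 and print a few rows.
    TpchTable.CUSTOMER.createGenerator(1.0, 1, 1).asScala.take(3).foreach { c =>
      println(s"${c.getCustomerKey}\t${c.getName}\t${c.getNationKey}")
    }
  }
}
```

The connector's `TPCHPartitionReader` below drives the same `createGenerator(scale, part, partCount)` call once per input partition, which is why no pre-generated dataset is needed.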
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.tpch

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import scala.collection.mutable.ArrayBuffer

import io.trino.tpch._
import io.trino.tpch.GenerateUtils.formatDate
import io.trino.tpch.TpchColumnType.Base._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

case class TPCHTableChuck(table: String, scale: Int, parallelism: Int, index: Int)
  extends InputPartition

class TPCHBatchScan(
    @transient table: TpchTable[_],
    scale: Int,
    schema: StructType) extends ScanBuilder
  with Scan with Batch with Serializable {

  private val _numRows: Long = TPCHStatisticsUtils.numRows(table, scale)

  private val rowCountPerTask: Int = 1000000

  private val parallelism: Int =
    if (table.equals(TpchTable.NATION) || table.equals(TpchTable.REGION)) 1
    else math.max(
      SparkSession.active.sparkContext.defaultParallelism,
      (_numRows / rowCountPerTask.toDouble).ceil.toInt)

  override def build: Scan = this

  override def toBatch: Batch = this

  override def description: String =
    s"Scan TPC-H sf$scale.${table.getTableName}, count: ${_numRows}, parallelism: $parallelism"

  override def readSchema: StructType = schema

  override def planInputPartitions: Array[InputPartition] =
    (1 to parallelism).map { i =>
      TPCHTableChuck(table.getTableName, scale, parallelism, i)
    }.toArray

  def createReaderFactory: PartitionReaderFactory = (partition: InputPartition) => {
    val chuck = partition.asInstanceOf[TPCHTableChuck]
    new TPCHPartitionReader(chuck.table, chuck.scale, chuck.parallelism, chuck.index, schema)
  }

}

class TPCHPartitionReader(
    table: String,
    scale: Int,
    parallelism: Int,
    index: Int,
    schema: StructType) extends PartitionReader[InternalRow] {

  private val tpchTable = TpchTable.getTable(table)

  private val columns = tpchTable.getColumns
    .asInstanceOf[java.util.List[TpchColumn[TpchEntity]]]

  private lazy val dateFmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  private val iterator = tpchTable.createGenerator(scale, index, parallelism).iterator

  private var currentRow: InternalRow = _

  override def next(): Boolean = {
    val hasNext = iterator.hasNext
    if (hasNext) currentRow = {
      val row = iterator.next().asInstanceOf[TpchEntity]
      val rowValue = new ArrayBuffer[String]()
      columns.stream().forEach(column => {
        val baseType = column.getType.getBase
        var value: String = ""
        baseType match {
          case IDENTIFIER => value += column.getIdentifier(row)
          case INTEGER => value += column.getInteger(row)
          case DATE => value += column.getDate(row)
          case DOUBLE => value += column.getDouble(row)
          case VARCHAR => value += column.getString(row)
        }
        rowValue += value
      })
      val rowAny = new ArrayBuffer[Any]()
      rowValue.zipWithIndex.map { case (value, i) =>
        (value, schema(i).dataType) match {
          case (null, _) => null
          case ("", _) => null
          case (value, IntegerType) => rowAny += value.toInt
          case (value, LongType) => rowAny += value.toLong
          case (value, DoubleType) => rowAny += value.toDouble
          case (value, DateType) => rowAny += LocalDate.parse(formatDate(value.toInt), dateFmt)
            .toEpochDay.toInt
          case (value, StringType) => rowAny += UTF8String.fromString(value)
          case (value, CharType(_)) => rowAny += UTF8String.fromString(value)
          case (value, VarcharType(_)) => rowAny += UTF8String.fromString(value)
          case (value, DecimalType()) => rowAny += Decimal(value)
          case (value, dt) => throw new IllegalArgumentException(s"value: $value, type: $dt")
        }
      }
      InternalRow.fromSeq(rowAny)
    }
    hasNext
  }

  override def get(): InternalRow = currentRow

  override def close(): Unit = {}

}
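
For reference, the partition count chosen by `TPCHBatchScan` follows the rule above: a single partition for the tiny NATION and REGION tables, otherwise the larger of Spark's default parallelism and `ceil(numRows / 1,000,000)`. A small worked example (the row counts and default parallelism below are illustrative assumptions):

```scala
// Worked example of the partitioning rule in TPCHBatchScan (standalone sketch;
// the numRows and defaultParallelism values are illustrative assumptions).
object TpchParallelismExample {
  private val rowCountPerTask = 1000000

  def partitions(numRows: Long, defaultParallelism: Int, singlePartition: Boolean): Int =
    if (singlePartition) 1
    else math.max(defaultParallelism, (numRows / rowCountPerTask.toDouble).ceil.toInt)

  def main(args: Array[String]): Unit = {
    // LINEITEM at sf10 has roughly 60 million rows -> 60 partitions when default parallelism is 8.
    println(partitions(numRows = 59986052L, defaultParallelism = 8, singlePartition = false)) // 60
    // NATION (25 rows) and REGION (5 rows) always generate in a single partition.
    println(partitions(numRows = 25L, defaultParallelism = 8, singlePartition = true)) // 1
  }
}
```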
