[feature] Support Spark3.2 compilation (#24)
* support spark3.2
cxzl25 committed May 7, 2022
1 parent 094d76f commit 32fe7da7baf50fe5df6a6cc083793b659fb697ee
Showing 10 changed files with 61 additions and 22 deletions.
.gitignore
@@ -2,3 +2,4 @@ custom_env.sh
 spark-doris-connector/dependency-reduced-pom.xml
 spark-doris-connector/output/
 spark-doris-connector/target/
+spark-doris-connector/.idea/
.licenserc.yaml
@@ -32,5 +32,6 @@ header:
   - '**/*.patch'
   - '**/*.log'
   - 'custom_env.sh.tpl'
+  - '**/*.csv'

 comment: on-failure
README.md
@@ -24,7 +24,7 @@ under the License.

 ### Spark Doris Connector

-More information about compilation and usage, please visit [Spark Doris Connector](https://doris.apache.org/extending-doris/spark-doris-connector.html)
+More information about compilation and usage, please visit [Spark Doris Connector](https://doris.apache.org/ecosystem/spark-doris-connector.html)

 ## License

build.sh
@@ -35,9 +35,11 @@ usage() {
 Usage:
   $0 --spark version --scala version # specify spark and scala version
   $0 --tag # this is a build from tag
+  $0 --mvn-args -Dxx=yy -Pxx # specify maven arguments
 e.g.:
   $0 --spark 2.3.4 --scala 2.11
   $0 --spark 3.1.2 --scala 2.12
+  $0 --spark 3.2.0 --scala 2.12 --mvn-args \"-Dnetty.version=4.1.68.Final -Dfasterxml.jackson.version=2.12.3\"
   $0 --tag
 "
 exit 1
@@ -49,6 +51,7 @@ OPTS=$(getopt \
   -o 'h' \
   -l 'spark:' \
   -l 'scala:' \
+  -l 'mvn-args:' \
   -l 'tag' \
   -- "$@")

@@ -68,10 +71,12 @@ fi
 BUILD_FROM_TAG=0
 SPARK_VERSION=0
 SCALA_VERSION=0
+MVN_ARGS=""
 while true; do
     case "$1" in
         --spark) SPARK_VERSION=$2 ; shift 2 ;;
         --scala) SCALA_VERSION=$2 ; shift 2 ;;
+        --mvn-args) MVN_ARGS=$2 ; shift 2 ;;
         --tag) BUILD_FROM_TAG=1 ; shift ;;
         --) shift ; break ;;
         *) echo "Internal error" ; exit 1 ;;
@@ -91,7 +96,7 @@ if [[ ${BUILD_FROM_TAG} -eq 1 ]]; then
     ${MVN_BIN} clean package
 else
     rm -rf ${ROOT}/output/
-    ${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.minor.version=${SPARK_MINOR_VERSION}
+    ${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.minor.version=${SPARK_MINOR_VERSION} $MVN_ARGS
 fi

 mkdir ${ROOT}/output/
spark-doris-connector/pom.xml
@@ -73,6 +73,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <thrift.binary>${env.THRIFT_BIN}</thrift.binary>
         <project.scm.id>github</project.scm.id>
+        <netty.version>4.1.27.Final</netty.version>
+        <fasterxml.jackson.version>2.10.0</fasterxml.jackson.version>
     </properties>
     <profiles>
         <!-- for custom internal repository -->
@@ -132,6 +134,12 @@
         </profile>
     </profiles>
     <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.version}</artifactId>
@@ -200,18 +208,17 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.10.0</version>
+            <version>${fasterxml.jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <version>2.10.0</version>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${fasterxml.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-            <version>4.1.27.Final</version>
-            <scope>provided</scope>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${fasterxml.jackson.version}</version>
         </dependency>
     </dependencies>
     <build>
spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -51,6 +51,9 @@
 import java.util.HashSet;
 import java.util.stream.Collectors;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
@@ -72,9 +75,6 @@
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.StringEntity;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;

 import com.google.common.annotations.VisibleForTesting;
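Taken together with the three com.fasterxml imports added a hunk earlier, the removals above migrate RestService off Jackson 1.x (org.codehaus.jackson), which the Spark 3.2 dependency tree no longer provides, onto Jackson 2.x (com.fasterxml.jackson). Below is a minimal sketch of the 2.x API behind the new imports; the parseStatus helper and its payload shape are illustrative assumptions, not code from this commit:

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object JacksonMigrationSketch {
  // Jackson 2.x ObjectMapper (com.fasterxml.jackson.databind.ObjectMapper)
  // stands in for the removed org.codehaus.jackson.map.ObjectMapper.
  private val mapper = new ObjectMapper()

  // readTree can fail with com.fasterxml.jackson.core.JsonParseException or
  // com.fasterxml.jackson.databind.JsonMappingException -- the Jackson 2.x
  // homes of the two exception types whose imports this commit rewrites.
  def parseStatus(json: String): String =
    mapper.readTree(json).get("status").asText()
}
```

With jackson-databind on the classpath (now pinned via the new fasterxml.jackson.version property), JacksonMigrationSketch.parseStatus("""{"status":"OK"}""") returns "OK".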
spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -44,14 +44,13 @@
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import jdk.nashorn.internal.ir.annotations.Ignore;
-
 public class TestRestService {
     private final static Logger logger = LoggerFactory.getLogger(TestRestService.class);

spark-doris-connector/src/test/resources/data.csv
@@ -0,0 +1,3 @@
+name,gender,age
+A,Male,16
+B,Female,12
spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -1,8 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package org.apache.doris.spark.sql

 import org.apache.spark.sql.SparkSession
-import org.junit.Test
+import org.junit.{Ignore, Test}

+// This test need real connect info to run.
+// Set the connect info before comment out this @Ignore
+@Ignore
 class TestConnectorWriteDoris {

   val dorisFeNodes = "127.0.0.1:8030"
@@ -36,21 +56,23 @@ class TestConnectorWriteDoris {

   @Test
   def csvDataWriteTest(): Unit = {
+    val csvFile =
+      Thread.currentThread().getContextClassLoader.getResource("data.csv").toString
     val spark = SparkSession.builder().master("local[*]").getOrCreate()
     val df = spark.read
       .option("header", "true") // uses the first line as names of columns
       .option("inferSchema", "true") // infers the input schema automatically from data
-      .csv("data.csv")
+      .csv(csvFile)
     df.createTempView("tmp_tb")
     val doris = spark.sql(
-      """
-        |create TEMPORARY VIEW test_lh
+      s"""
+        |CREATE TEMPORARY VIEW test_lh
         |USING doris
         |OPTIONS(
         | "table.identifier"="test.test_lh",
-        | "fenodes"="127.0.0.1:8030",
-        | "user"="root",
-        | "password"=""
+        | "fenodes"="${dorisFeNodes}",
+        | "user"="${dorisUser}",
+        | "password"="${dorisPwd}"
         |);
         |""".stripMargin)
     spark.sql(
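Assembled end to end, the patched test resolves data.csv from the test classpath and interpolates the connect info into the view DDL instead of hardcoding it. Below is a standalone sketch under the test's own defaults (FE at 127.0.0.1:8030, table test.test_lh); the final INSERT statement is an assumption, since the hunk is truncated at the trailing spark.sql( call:

```scala
import org.apache.spark.sql.SparkSession

// A sketch of the patched csvDataWriteTest, assuming a Doris FE reachable at
// 127.0.0.1:8030 and an existing table test.test_lh (the defaults the test
// itself uses). The closing INSERT is assumed: the diff truncates there.
object CsvToDorisSketch {
  def main(args: Array[String]): Unit = {
    val dorisFeNodes = "127.0.0.1:8030"
    val dorisUser = "root"
    val dorisPwd = ""

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Resolve data.csv from the test classpath rather than the working
    // directory -- the fix this hunk makes to the original test.
    val csvFile = Thread.currentThread().getContextClassLoader
      .getResource("data.csv").toString

    val df = spark.read
      .option("header", "true")      // first line supplies column names
      .option("inferSchema", "true") // infer column types from the data
      .csv(csvFile)
    df.createTempView("tmp_tb")

    // s-interpolation feeds the connect info into the DDL, replacing the
    // hardcoded literals the commit removes.
    spark.sql(
      s"""
         |CREATE TEMPORARY VIEW test_lh
         |USING doris
         |OPTIONS(
         | "table.identifier"="test.test_lh",
         | "fenodes"="$dorisFeNodes",
         | "user"="$dorisUser",
         | "password"="$dorisPwd"
         |)
         |""".stripMargin)
    spark.sql("INSERT INTO test_lh SELECT name, gender, age FROM tmp_tb")
    spark.stop()
  }
}
```

Going through the context ClassLoader is what lets the test run from any working directory; it is also why the commit adds '**/*.csv' to the license-check ignore list for the new test resource.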
spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -19,7 +19,7 @@ package org.apache.doris.spark.sql

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Ignore;
+import org.junit.Ignore
 import org.junit.Test

 // This test need real connect info to run.
@@ -113,6 +113,7 @@ class TestSparkConnector {
       .option("sink.batch.size",2)
       .option("sink.max-retries",2)
       .start().awaitTermination()
+    spark.stop()
   }
 }
