Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize computation client module, add some test classes for new client SDK. #974

Merged
merged 2 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,13 @@ object LinkisJobBuilder {

def setDefaultAuthToken(authTokenValue: String): Unit = this.authTokenValue = authTokenValue

private[client] def justGetDefaultUJESClient: UJESClient = ujesClient

def getDefaultUJESClient: UJESClient = {
if(ujesClient == null) synchronized {
if(clientConfig == null) buildDefaultConfig()
if(ujesClient == null) {
ujesClient = new UJESClientImpl(clientConfig)
Utils.addShutdownHook(() => ujesClient.close())
}
}
ujesClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client

import java.io.Closeable

import com.webank.wedatasphere.linkis.computation.client.interactive.InteractiveJob
import com.webank.wedatasphere.linkis.computation.client.once.OnceJob


/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and simply.
* Please keep this class lightweight enough, do not set too many field to confuse user.
*/
object LinkisJobClient extends Closeable {

val config = LinkisJobBuilder

val interactive = InteractiveJob
val once = OnceJob

override def close(): Unit = {
if(config.justGetDefaultUJESClient != null) {
config.justGetDefaultUJESClient.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,40 @@
package com.webank.wedatasphere.linkis.computation.client.interactive

import com.webank.wedatasphere.linkis.computation.client.AbstractLinkisJobBuilder
import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils
import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction
import org.apache.commons.lang.StringUtils


class InteractiveJobBuilder private[interactive]()
extends AbstractLinkisJobBuilder[SubmittableInteractiveJob] {

private var creator: String = _

override def addExecuteUser(executeUser: String): this.type = super.addExecuteUser(executeUser)

def setEngineType(engineType: String): this.type = addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, engineType)

def setCreator(creator: String): this.type = {
this.creator = creator
this
}

def setCode(code: String): this.type = addJobContent("code", code)

def setRunType(runType: RunType): this.type= addJobContent("runType", runType.toString)

def setRunTypeStr(runType: String): this.type = addJobContent("runType", runType)

override protected def validate(): Unit = {
if(labels != null && !labels.containsKey(LabelKeyUtils.USER_CREATOR_LABEL_KEY)
&& StringUtils.isNotBlank(creator))
addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, executeUser + "-" + creator)
super.validate()
}

override protected def createLinkisJob(ujesClient: UJESClient,
jobSubmitAction: JobSubmitAction): SubmittableInteractiveJob = new SubmittableInteractiveJob(ujesClient, jobSubmitAction)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.computation.client.once.action.{GetEngineConnAction, KillEngineConnAction}
import com.webank.wedatasphere.linkis.computation.client.once.simple.SimpleOnceJob


trait OnceJob extends AbstractLinkisJob {
Expand Down Expand Up @@ -57,4 +58,8 @@ trait OnceJob extends AbstractLinkisJob {

}

trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob
trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob

object OnceJob {
val simple = SimpleOnceJob
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client;

import com.webank.wedatasphere.linkis.common.conf.Configuration;
import com.webank.wedatasphere.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils;

/**
* @author enjoyyin
* @date 2021-08-25
* @since 0.5.0
*/
public class FlinkOnceJobTest {
public static void main(String[] args) {
// TODO First, set the right gateway url.
LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002");
// TODO Second, modify the sql, so Flink engineConn can run it successfully.
String sql = "CREATE TABLE mysql_binlog (\n"
+ " id INT NOT NULL,\n"
+ " name STRING,\n"
+ " age INT\n"
+ ") WITH (\n"
+ " 'connector' = 'mysql-cdc',\n"
+ " 'hostname' = 'ip',\n"
+ " 'port' = 'port',\n"
+ " 'username' = '${username}',\n"
+ " 'password' = '${password}',\n"
+ " 'database-name' = '${database}',\n"
+ " 'table-name' = '${tablename}',\n"
+ " 'debezium.snapshot.locking.mode' = 'none'\n"
+ ");\n"
+ "CREATE TABLE sink_table (\n"
+ " id INT NOT NULL,\n"
+ " name STRING,\n"
+ " age INT,\n"
+ " primary key(id) not enforced\n"
+ ") WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'url' = 'jdbc:mysql://${ip}:port/${database}',\n"
+ " 'table-name' = '${tablename}',\n"
+ " 'driver' = 'com.mysql.jdbc.Driver',\n"
+ " 'username' = '${username}',\n"
+ " 'password' = '${password}'\n"
+ ");\n"
+ "INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog";
// TODO Thirdly, please modify the user_creator label and executeUser
SubmittableSimpleOnceJob onceJob = LinkisJobClient.once().simple().builder().setCreateService("Flink-Test")
.addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "flink-1.12.2")
.addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-Streamis")
.addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")
.addStartupParam(Configuration.IS_TEST_MODE().key(), true)
.setMaxSubmitTime(300000)
.addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest")
.build();
onceJob.submit();
System.out.println(onceJob.getId());
onceJob.waitForCompleted();
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client;

import com.webank.wedatasphere.linkis.computation.client.interactive.SubmittableInteractiveJob;

/**
* A test class for submit a sql to hive engineConn.
*/
public class InteractiveJobTest {

public static void main(String[] args) {
// TODO First, set the right gateway url.
LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002");
//TODO Secondly, please modify the executeUser
SubmittableInteractiveJob job = LinkisJobClient.interactive().builder()
.setEngineType("hive").setRunTypeStr("sql").setCreator("IDE")
.setCode("show tables").addExecuteUser("hadoop").build();
// 3. Submit Job to Linkis
job.submit();
// 4. Wait for Job completed
job.waitForCompleted();
// 5. Get results from iterators.
ResultSetIterator iterator = job.getResultSetIterables()[0].iterator();
System.out.println(iterator.getMetadata());
while(iterator.hasNext()){
System.out.println(iterator.next());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.Engin
import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
import org.apache.commons.io.IOUtils

@Deprecated
object UJESClientImplTest extends App {

// Suggest to use LinkisJobClient to submit job to Linkis.
val clientConfig = DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port")
.connectionTimeout(30000).discoveryEnabled(true)
.discoveryFrequency(1, TimeUnit.MINUTES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

import java.util.concurrent.TimeUnit;


@Deprecated
public class UJESClientImplTestJ{
public static void main(String[] args){
// Suggest to use LinkisJobClient to submit job to Linkis.
DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port")
.connectionTimeout(30000).discoveryEnabled(true)
.discoveryFrequency(1, TimeUnit.MINUTES)
Expand All @@ -49,6 +50,7 @@ public static void main(String[] args){
JobStatusResult status = client.status(jobExecuteResult);
while(!status.isCompleted()) {
JobProgressResult progress = client.progress(jobExecuteResult);
System.out.println("progress: " + progress.getProgress());
Utils.sleepQuietly(500);
status = client.status(jobExecuteResult);
}
Expand All @@ -58,4 +60,5 @@ public static void main(String[] args){
System.out.println("fileContents: " + fileContents);
IOUtils.closeQuietly(client);
}

}