Skip to content

Commit

Permalink
Merge pull request #974 from wushengyeyouya/dev-1.0.2
Browse files Browse the repository at this point in the history
Optimize computation client module, add some test classes for new client SDK.
  • Loading branch information
peacewong committed Aug 26, 2021
2 parents 93e6cb9 + fc0b272 commit 057f220
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,13 @@ object LinkisJobBuilder {

def setDefaultAuthToken(authTokenValue: String): Unit = this.authTokenValue = authTokenValue

private[client] def justGetDefaultUJESClient: UJESClient = ujesClient

def getDefaultUJESClient: UJESClient = {
if(ujesClient == null) synchronized {
if(clientConfig == null) buildDefaultConfig()
if(ujesClient == null) {
ujesClient = new UJESClientImpl(clientConfig)
Utils.addShutdownHook(() => ujesClient.close())
}
}
ujesClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client

import java.io.Closeable

import com.webank.wedatasphere.linkis.computation.client.interactive.InteractiveJob
import com.webank.wedatasphere.linkis.computation.client.once.OnceJob


/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and simply.
* Please keep this class lightweight enough, do not set too many field to confuse user.
*/
object LinkisJobClient extends Closeable {

val config = LinkisJobBuilder

val interactive = InteractiveJob
val once = OnceJob

override def close(): Unit = {
if(config.justGetDefaultUJESClient != null) {
config.justGetDefaultUJESClient.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,40 @@
package com.webank.wedatasphere.linkis.computation.client.interactive

import com.webank.wedatasphere.linkis.computation.client.AbstractLinkisJobBuilder
import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils
import com.webank.wedatasphere.linkis.manager.label.entity.engine.RunType.RunType
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.JobSubmitAction
import org.apache.commons.lang.StringUtils


class InteractiveJobBuilder private[interactive]()
extends AbstractLinkisJobBuilder[SubmittableInteractiveJob] {

private var creator: String = _

override def addExecuteUser(executeUser: String): this.type = super.addExecuteUser(executeUser)

def setEngineType(engineType: String): this.type = addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, engineType)

def setCreator(creator: String): this.type = {
this.creator = creator
this
}

def setCode(code: String): this.type = addJobContent("code", code)

def setRunType(runType: RunType): this.type= addJobContent("runType", runType.toString)

def setRunTypeStr(runType: String): this.type = addJobContent("runType", runType)

override protected def validate(): Unit = {
if(labels != null && !labels.containsKey(LabelKeyUtils.USER_CREATOR_LABEL_KEY)
&& StringUtils.isNotBlank(creator))
addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, executeUser + "-" + creator)
super.validate()
}

override protected def createLinkisJob(ujesClient: UJESClient,
jobSubmitAction: JobSubmitAction): SubmittableInteractiveJob = new SubmittableInteractiveJob(ujesClient, jobSubmitAction)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.computation.client.once.action.{GetEngineConnAction, KillEngineConnAction}
import com.webank.wedatasphere.linkis.computation.client.once.simple.SimpleOnceJob


trait OnceJob extends AbstractLinkisJob {
Expand Down Expand Up @@ -57,4 +58,8 @@ trait OnceJob extends AbstractLinkisJob {

}

trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob
trait SubmittableOnceJob extends OnceJob with SubmittableLinkisJob

object OnceJob {
val simple = SimpleOnceJob
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client;

import com.webank.wedatasphere.linkis.common.conf.Configuration;
import com.webank.wedatasphere.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
import com.webank.wedatasphere.linkis.computation.client.utils.LabelKeyUtils;

/**
* @author enjoyyin
* @date 2021-08-25
* @since 0.5.0
*/
public class FlinkOnceJobTest {
public static void main(String[] args) {
// TODO First, set the right gateway url.
LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002");
// TODO Second, modify the sql, so Flink engineConn can run it successfully.
String sql = "CREATE TABLE mysql_binlog (\n"
+ " id INT NOT NULL,\n"
+ " name STRING,\n"
+ " age INT\n"
+ ") WITH (\n"
+ " 'connector' = 'mysql-cdc',\n"
+ " 'hostname' = 'ip',\n"
+ " 'port' = 'port',\n"
+ " 'username' = '${username}',\n"
+ " 'password' = '${password}',\n"
+ " 'database-name' = '${database}',\n"
+ " 'table-name' = '${tablename}',\n"
+ " 'debezium.snapshot.locking.mode' = 'none'\n"
+ ");\n"
+ "CREATE TABLE sink_table (\n"
+ " id INT NOT NULL,\n"
+ " name STRING,\n"
+ " age INT,\n"
+ " primary key(id) not enforced\n"
+ ") WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'url' = 'jdbc:mysql://${ip}:port/${database}',\n"
+ " 'table-name' = '${tablename}',\n"
+ " 'driver' = 'com.mysql.jdbc.Driver',\n"
+ " 'username' = '${username}',\n"
+ " 'password' = '${password}'\n"
+ ");\n"
+ "INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog";
// TODO Thirdly, please modify the user_creator label and executeUser
SubmittableSimpleOnceJob onceJob = LinkisJobClient.once().simple().builder().setCreateService("Flink-Test")
.addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY(), "flink-1.12.2")
.addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY(), "hadoop-Streamis")
.addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY(), "once")
.addStartupParam(Configuration.IS_TEST_MODE().key(), true)
.setMaxSubmitTime(300000)
.addExecuteUser("hadoop").addJobContent("runType", "sql").addJobContent("code", sql).addSource("jobName", "OnceJobTest")
.build();
onceJob.submit();
System.out.println(onceJob.getId());
onceJob.waitForCompleted();
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client;

import com.webank.wedatasphere.linkis.computation.client.interactive.SubmittableInteractiveJob;

/**
* A test class for submit a sql to hive engineConn.
*/
public class InteractiveJobTest {

public static void main(String[] args) {
// TODO First, set the right gateway url.
LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9002");
//TODO Secondly, please modify the executeUser
SubmittableInteractiveJob job = LinkisJobClient.interactive().builder()
.setEngineType("hive").setRunTypeStr("sql").setCreator("IDE")
.setCode("show tables").addExecuteUser("hadoop").build();
// 3. Submit Job to Linkis
job.submit();
// 4. Wait for Job completed
job.waitForCompleted();
// 5. Get results from iterators.
ResultSetIterator iterator = job.getResultSetIterables()[0].iterator();
System.out.println(iterator.getMetadata());
while(iterator.hasNext()){
System.out.println(iterator.next());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.Engin
import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
import org.apache.commons.io.IOUtils

@Deprecated
object UJESClientImplTest extends App {

// Suggest to use LinkisJobClient to submit job to Linkis.
val clientConfig = DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port")
.connectionTimeout(30000).discoveryEnabled(true)
.discoveryFrequency(1, TimeUnit.MINUTES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

import java.util.concurrent.TimeUnit;


@Deprecated
public class UJESClientImplTestJ{
public static void main(String[] args){
// Suggest to use LinkisJobClient to submit job to Linkis.
DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder().addServerUrl("http://localhost:port")
.connectionTimeout(30000).discoveryEnabled(true)
.discoveryFrequency(1, TimeUnit.MINUTES)
Expand All @@ -49,6 +50,7 @@ public static void main(String[] args){
JobStatusResult status = client.status(jobExecuteResult);
while(!status.isCompleted()) {
JobProgressResult progress = client.progress(jobExecuteResult);
System.out.println("progress: " + progress.getProgress());
Utils.sleepQuietly(500);
status = client.status(jobExecuteResult);
}
Expand All @@ -58,4 +60,5 @@ public static void main(String[] args){
System.out.println("fileContents: " + fileContents);
IOUtils.closeQuietly(client);
}

}

0 comments on commit 057f220

Please sign in to comment.