Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adjust gateway and add a ha module to adapt context service #436

Merged
merged 11 commits into from
Jun 4, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.webank.wedatasphere.linkis.cs.common.entity.source.ContextKeyValue;
import com.webank.wedatasphere.linkis.cs.common.entity.source.ContextValue;
import com.webank.wedatasphere.linkis.cs.listener.ListenerBus.ContextAsyncListenerBus;
import com.webank.wedatasphere.linkis.cs.listener.callback.imp.ClientSource;
import com.webank.wedatasphere.linkis.cs.listener.callback.imp.ContextKeyValueBean;
import com.webank.wedatasphere.linkis.cs.listener.callback.imp.DefaultContextIDCallbackEngine;
import com.webank.wedatasphere.linkis.cs.listener.callback.imp.DefaultContextKeyCallbackEngine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.webank.wedatasphere.linkis.gateway.config

import com.webank.wedatasphere.linkis.gateway.security.{LDAPUserRestful, SecurityHook, SecurityFilter, UserRestful}
import com.webank.wedatasphere.linkis.gateway.security.{LDAPUserRestful, SecurityFilter, SecurityHook, UserRestful}
import javax.annotation.PostConstruct
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.webank.wedatasphere.linkis.gateway.http

import java.util

import com.webank.wedatasphere.linkis.server.JMap

/**
* created by cooperyang on 2019/1/9.
*/
Expand All @@ -32,13 +36,16 @@ trait GatewayContext {
def setGatewayRoute(gatewayRoute: GatewayRoute): Unit
def getGatewayRoute: GatewayRoute

def getParams: JMap[String, String]
}
class BaseGatewayContext extends GatewayContext {
private var request: GatewayHttpRequest = _
private var response: GatewayHttpResponse = _
private var webSocketRequest: Boolean = false
private var gatewayRoute: GatewayRoute = _

private val props: JMap[String, String] = new util.HashMap[String, String]()

override def getRequest: GatewayHttpRequest = request

override def setRequest(request: GatewayHttpRequest): Unit = this.request = request
Expand All @@ -54,4 +61,6 @@ class BaseGatewayContext extends GatewayContext {
override def setGatewayRoute(gatewayRoute: GatewayRoute): Unit = this.gatewayRoute = gatewayRoute

override def getGatewayRoute: GatewayRoute = gatewayRoute

override def getParams: JMap[String, String] = this.props
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package com.webank.wedatasphere.linkis.gateway.security
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.gateway.http.{GatewayContext, GatewayHttpRequest}
import com.webank.wedatasphere.linkis.server.exception.LoginExpireException
import com.webank.wedatasphere.linkis.server.security.{SSOUtils, ServerSSOUtils}
import com.webank.wedatasphere.linkis.server.security.SecurityFilter._
import com.webank.wedatasphere.linkis.server.security.{SSOUtils, ServerSSOUtils}
import javax.servlet.http.Cookie

import scala.collection.JavaConversions._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package com.webank.wedatasphere.linkis.gateway.security

import java.util.Random

import com.google.gson.{Gson, JsonObject}
import com.google.gson.Gson
import com.webank.wedatasphere.linkis.common.utils.{Logging, RSAUtils, Utils}
import com.webank.wedatasphere.linkis.gateway.config.GatewayConfiguration
import com.webank.wedatasphere.linkis.gateway.http.GatewayContext
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway-httpclient-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.7.0</version>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.text.SimpleDateFormat
import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.ning.http.client.Response
import com.webank.wedatasphere.linkis.common.io.{Fs, FsPath}
import com.webank.wedatasphere.linkis.httpclient.AbstractHttpClient
import com.webank.wedatasphere.linkis.httpclient.discovery.Discovery
Expand All @@ -37,14 +36,15 @@ import com.webank.wedatasphere.linkis.httpclient.response.{HttpResult, ListResul
import com.webank.wedatasphere.linkis.storage.FSFactory
import org.apache.commons.beanutils.BeanUtils
import org.apache.commons.lang.ClassUtils
import org.apache.http.HttpResponse

import scala.collection.JavaConversions.mapAsJavaMap

/**
* created by cooperyang on 2019/5/20.
*/
class DWSHttpClient(clientConfig: DWSClientConfig, clientName: String)
extends AbstractHttpClient(clientConfig, clientName) {
extends AbstractHttpClient(clientConfig, clientName) {

override protected def createDiscovery(): Discovery = new DWSGatewayDiscovery

Expand All @@ -57,21 +57,25 @@ class DWSHttpClient(clientConfig: DWSClientConfig, clientName: String)
requestAction
}

override protected def httpResponseToResult(response: Response, requestAction: HttpAction): Option[Result] = {
val url = requestAction.getURL
override protected def httpResponseToResult(response: HttpResponse, requestAction: HttpAction, responseBody: String): Option[Result] = {
var entity = response.getEntity
val statusCode: Int = response.getStatusLine.getStatusCode
val url: String = requestAction.getURL
val contentType: String = entity.getContentType.getValue
DWSHttpMessageFactory.getDWSHttpMessageResult(url).map { case DWSHttpMessageResultInfo(_, clazz) =>
clazz match {
case c if ClassUtils.isAssignable(c, classOf[DWSResult]) =>
val dwsResult = clazz.getConstructor().newInstance().asInstanceOf[DWSResult]
dwsResult.set(response.getResponseBody, response.getStatusCode, response.getUri.toString, response.getContentType)
dwsResult.set(responseBody, statusCode, url, contentType)
BeanUtils.populate(dwsResult, dwsResult.getData)
return Some(dwsResult)
case _ =>
}

def transfer(value: Result, map: Map[String, Object]): Unit = {
value match {
case httpResult: HttpResult =>
httpResult.set(response.getResponseBody, response.getStatusCode, response.getUri.toString, response.getContentType)
httpResult.set(responseBody, statusCode, url, contentType)
case _ =>
}
val javaMap = mapAsJavaMap(map)
Expand All @@ -89,17 +93,18 @@ class DWSHttpClient(clientConfig: DWSClientConfig, clientName: String)
transfer(value, map)
value
}.toArray
new ListResult(response.getResponseBody, results)
new ListResult(responseBody, results)
}
}.orElse(nonDWSResponseToResult(response, requestAction))
}

protected def nonDWSResponseToResult(response: Response, requestAction: HttpAction): Option[Result] = None
protected def nonDWSResponseToResult(response: HttpResponse, requestAction: HttpAction): Option[Result] = None

protected def fillResultFields(responseMap: util.Map[String, Object], value: Result): Unit = {}

//TODO Consistent with workspace, plus expiration time(与workspace保持一致,加上过期时间)
override protected def getFsByUser(user: String, path: FsPath): Fs = FSFactory.getFsByProxyUser(path, user)

}
object DWSHttpClient {
val jacksonJson = new ObjectMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.webank.wedatasphere.linkis.httpclient.dws.authentication

import com.ning.http.client.Response
import com.webank.wedatasphere.linkis.common.utils.ByteTimeUtils
import com.webank.wedatasphere.linkis.httpclient.authentication.{AbstractAuthenticationStrategy, AuthenticationAction, AuthenticationResult}
import com.webank.wedatasphere.linkis.httpclient.dws.exception.AuthenticationFailedException
import com.webank.wedatasphere.linkis.httpclient.dws.request.DWSAuthenticationAction
import com.webank.wedatasphere.linkis.httpclient.dws.response.DWSAuthenticationResult
import com.webank.wedatasphere.linkis.httpclient.request.{Action, UserAction, UserPwdAction}
import org.apache.commons.lang.StringUtils
import org.apache.http.HttpResponse

/**
* created by cooperyang on 2019/5/22.
Expand Down Expand Up @@ -51,7 +51,7 @@ class StaticAuthenticationStrategy(override protected val sessionMaxAliveTime: L
action
}

override def getAuthenticationResult(response: Response, requestAction: AuthenticationAction): AuthenticationResult = {
override def getAuthenticationResult(response: HttpResponse, requestAction: AuthenticationAction): AuthenticationResult = {
new DWSAuthenticationResult(response, requestAction.serverUrl)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package com.webank.wedatasphere.linkis.httpclient.dws.authentication

import java.util

import com.ning.http.client.Response
import com.ning.http.client.cookie.Cookie
import com.webank.wedatasphere.linkis.httpclient.authentication._
import com.webank.wedatasphere.linkis.httpclient.dws.exception.AuthenticationFailedException
import com.webank.wedatasphere.linkis.httpclient.request.{Action, UserAction}
import org.apache.http.HttpResponse
import org.apache.http.cookie.Cookie

/**
* Created by enjoyyin on 2019/10/4.
Expand All @@ -35,6 +35,7 @@ class TokenAuthenticationStrategy(override protected val sessionMaxAliveTime: Lo
case _: AuthenticationAction => null
case action: UserAction => new HttpAuthentication {
import TokenAuthenticationStrategy._

import scala.collection.JavaConversions._
override def authToCookies: Array[Cookie] = Array.empty

Expand All @@ -51,7 +52,7 @@ class TokenAuthenticationStrategy(override protected val sessionMaxAliveTime: Lo

override protected def getAuthenticationAction(requestAction: Action, serverUrl: String): AuthenticationAction = null

override def getAuthenticationResult(response: Response, requestAction: AuthenticationAction): AuthenticationResult = null
override def getAuthenticationResult(response: HttpResponse, requestAction: AuthenticationAction): AuthenticationResult = null

}
object TokenAuthenticationStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.webank.wedatasphere.linkis.httpclient.dws.config

import com.webank.wedatasphere.linkis.httpclient.config.{ClientConfig, ClientConfigBuilder}
import com.webank.wedatasphere.linkis.httpclient.config.ClientConfigBuilder
import com.webank.wedatasphere.linkis.httpclient.dws.exception.UnknownVersionException
import org.apache.commons.lang.StringUtils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

package com.webank.wedatasphere.linkis.httpclient.dws.discovery

import com.ning.http.client.Response
import com.webank.wedatasphere.linkis.httpclient.discovery.{AbstractDiscovery, HeartbeatAction, HeartbeatResult}
import com.webank.wedatasphere.linkis.httpclient.dws.request.DWSHeartbeatAction
import com.webank.wedatasphere.linkis.httpclient.dws.response.DWSHeartbeatResult
import org.apache.http.HttpResponse

import scala.util.Random

Expand All @@ -43,7 +43,7 @@ class DWSGatewayDiscovery extends AbstractDiscovery {

override protected def getHeartbeatAction(serverUrl: String): HeartbeatAction = new DWSHeartbeatAction(serverUrl)

override def getHeartbeatResult(response: Response, requestAction: HeartbeatAction): HeartbeatResult = requestAction match {
override def getHeartbeatResult(response: HttpResponse, requestAction: HeartbeatAction): HeartbeatResult = requestAction match {
case h: DWSHeartbeatAction => new DWSHeartbeatResult(response, h.serverUrl)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,37 @@ package com.webank.wedatasphere.linkis.httpclient.dws.response

import java.util

import com.ning.http.client.Response
import com.ning.http.client.cookie.Cookie
import com.webank.wedatasphere.linkis.httpclient.authentication.{Authentication, AuthenticationResult, HttpAuthentication}
import com.webank.wedatasphere.linkis.httpclient.exception.HttpMessageParseException

import scala.collection.JavaConversions
import org.apache.http.HttpResponse
import org.apache.http.cookie.Cookie
import org.apache.http.util.EntityUtils

/**
* created by cooperyang on 2019/5/22.
*/
class DWSAuthenticationResult(response: Response, serverUrl: String) extends AuthenticationResult with DWSResult {
class DWSAuthenticationResult(response: HttpResponse, serverUrl: String) extends AuthenticationResult with DWSResult {

setResponse()

private def setResponse(): Unit = {
val entity = response.getEntity
val responseBody: String = if (entity != null) {
EntityUtils.toString(entity, "UTF-8")
} else {
null
}
val statusCode: Int = response.getStatusLine.getStatusCode
val url: String = serverUrl
val contentType: String = entity.getContentType.getValue
set(responseBody, statusCode, url, contentType)
}

set(response.getResponseBody, response.getStatusCode, response.getUri.toString, response.getContentType)
override def getAuthentication: Authentication = if(getStatus == 0) new HttpAuthentication {

override def getAuthentication: Authentication = if (getStatus == 0) new HttpAuthentication {
private var lastAccessTime: Long = System.currentTimeMillis
override def authToCookies: Array[Cookie] = JavaConversions.asScalaBuffer(response.getCookies).toArray

override def authToCookies: Array[Cookie] = Array.empty

override def authToHeaders: util.Map[String, String] = new util.HashMap[String, String]()

Expand All @@ -44,4 +59,5 @@ class DWSAuthenticationResult(response: Response, serverUrl: String) extends Aut
override def updateLastAccessTime(): Unit = lastAccessTime = System.currentTimeMillis
} else throw new HttpMessageParseException(s"login to gateway $serverUrl failed! Reason: " + getMessage)


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,33 @@ package com.webank.wedatasphere.linkis.httpclient.dws.response

import java.util

import com.ning.http.client.Response
import com.webank.wedatasphere.linkis.httpclient.discovery.HeartbeatResult
import org.apache.http.HttpResponse
import org.apache.http.util.EntityUtils

/**
* created by cooperyang on 2019/5/22.
*/
class DWSHeartbeatResult(response: Response, serverUrl: String) extends HeartbeatResult with DWSResult {
class DWSHeartbeatResult(response: HttpResponse, serverUrl: String) extends HeartbeatResult with DWSResult {

set(response.getResponseBody, response.getStatusCode, response.getUri.toString, response.getContentType)
if(getStatus != 0) warn(s"heartbeat to gateway $serverUrl failed! message: $getMessage.")

setResponse()

private def setResponse(): Unit = {
val entity = response.getEntity
val responseBody: String = if (entity != null) {
EntityUtils.toString(entity, "UTF-8")
} else {
null
}
val statusCode: Int = response.getStatusLine.getStatusCode
val url: String = serverUrl
val contentType: String = entity.getContentType.getValue
set(responseBody, statusCode, url, contentType)
}


if (getStatus != 0) warn(s"heartbeat to gateway $serverUrl failed! message: $getMessage.")
override val isHealthy: Boolean = getData.get("isHealthy") match {
case b: java.lang.Boolean => b
case s if s != null => s.toString.toBoolean
Expand Down
6 changes: 6 additions & 0 deletions gateway/gateway-ujes-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<version>2.1</version>
</dependency>

<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-cs-common</artifactId>
<version>${linkis.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package com.webank.wedatasphere.linkis.gateway.ujes.parser
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.gateway.http.GatewayContext
import com.webank.wedatasphere.linkis.gateway.parser.AbstractGatewayParser
import com.webank.wedatasphere.linkis.gateway.ujes.parser.EntranceExecutionGatewayParser._
import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
import com.webank.wedatasphere.linkis.protocol.utils.ZuulEntranceUtils
import org.springframework.stereotype.Component

import EntranceExecutionGatewayParser._

/**
* created by cooperyang on 2019/5/15.
*/
Expand Down