[KYUUBI #1094] [BUGFIX #1068] Support uploading files to HDFS
#1068
### _Why are the changes needed?_

The built-in JSON event logger resolved `kyuubi.engine.event.json.log.path` with `java.nio.file.Paths`, which always yields a local `file:` URI, so engine event logs could never be written to HDFS (#1068). Parsing the configured value as a URI preserves the scheme, letting the logger target either a local path (`file:`) or an HDFS path (`hdfs:`).

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1094 from zhang1002/branch-1.3_support-upload-file-to-hdfs.

Closes #1094

e96794e [张宇翔] Support upload file to hdfs
ff23774 [张宇翔] Support the eventLog upload to hdfs

Authored-by: 张宇翔 <zhang1002@126.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
zhang1002 authored and ulysses-you committed Sep 13, 2021
1 parent 6a183f9 commit 9b1fa5e
Showing 4 changed files with 20 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/deployment/settings.md
@@ -163,7 +163,7 @@
 kyuubi\.engine<br>\.deregister\.exception<br>\.messages|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine<br>\.deregister\.exception<br>\.ttl|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
-kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
+kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>file:/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.<ul><li>Local Path: start with 'file:'</li><li>HDFS Path: start with 'hdfs:'</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
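As a usage sketch of the new setting (not part of the patch), the logger root could be pointed at HDFS from `$KYUUBI_HOME/conf/kyuubi-defaults.conf`; the NameNode address and target directory below are hypothetical:

```
# Send engine events to the built-in JSON logger.
kyuubi.engine.event.loggers=JSON

# Local destination: the scheme is now explicit (this is the new default).
# kyuubi.engine.event.json.log.path=file:/tmp/kyuubi/events

# HDFS destination: hypothetical NameNode and directory.
kyuubi.engine.event.json.log.path=hdfs://namenode:8020/kyuubi/events
```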
3 changes: 1 addition & 2 deletions …/engine/spark/events/JsonEventLogger.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark.events

 import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
-import java.nio.file.Paths
 
 import scala.collection.mutable.HashMap

@@ -83,7 +82,7 @@ class JsonEventLogger(logName: String, hadoopConf: Configuration)
   }
 
   override def initialize(conf: KyuubiConf): Unit = synchronized {
-    logRoot = Paths.get(conf.get(ENGINE_EVENT_JSON_LOG_PATH)).toAbsolutePath.toUri
+    logRoot = URI.create(conf.get(ENGINE_EVENT_JSON_LOG_PATH))
     fs = FileSystem.get(logRoot, hadoopConf)
     requireLogRootWritable()
     super.initialize(conf)
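The one-line change above is the core of the fix. A minimal standalone sketch (not from the patch) of why the old `Paths.get` call could never reach HDFS; the `hdfs://namenode:8020` authority is made up for illustration:

```scala
import java.net.URI
import java.nio.file.Paths

object SchemeResolutionDemo extends App {
  val configured = "hdfs://namenode:8020/kyuubi/events" // hypothetical HDFS location

  // Old behavior: java.nio treats the value as a local file name, swallowing the
  // scheme; on a POSIX system this prints something like
  // file:///<cwd>/hdfs:/namenode:8020/kyuubi/events, i.e. always a file: URI.
  println(Paths.get(configured).toAbsolutePath.toUri)

  // New behavior: the value is parsed as a URI, the hdfs: scheme survives, and
  // FileSystem.get(logRoot, hadoopConf) can then select the HDFS implementation.
  println(URI.create(configured))
}
```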
23 changes: 14 additions & 9 deletions …/engine/spark/events/EventLoggingServiceSuite.scala
@@ -17,9 +17,11 @@

 package org.apache.kyuubi.engine.spark.events
 
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
+import java.io.{BufferedReader, InputStreamReader}
+import java.nio.file.Paths
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.hive.service.rpc.thrift.TExecuteStatementReq
 import org.scalatest.time.SpanSugar._

@@ -31,24 +33,27 @@ import org.apache.kyuubi.operation.{JDBCTestUtils, OperationHandle}
 class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
   import EventLoggerType._
 
-  private val logRoot = Utils.createTempDir()
+  private val logRoot = "file:" + Utils.createTempDir().toString
   private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
 
   override def withKyuubiConf: Map[String, String] = Map(
     KyuubiConf.ENGINE_EVENT_LOGGERS.key -> s"$JSON,$SPARK",
-    KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH.key -> logRoot.toString,
+    KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH.key -> logRoot,
     "spark.eventLog.enabled" -> "true",
-    "spark.eventLog.dir" -> logRoot.toString
+    "spark.eventLog.dir" -> logRoot
   )
 
   override protected def jdbcUrl: String = getJdbcUrl
 
   test("round-trip for event logging service") {
     val engineEventPath = Paths.get(
-      logRoot.toString, "engine", s"day=$currentDate", KyuubiSparkUtil.engineId + ".json")
+      logRoot, "engine", s"day=$currentDate", KyuubiSparkUtil.engineId + ".json")
     val sessionEventPath = Paths.get(
-      logRoot.toString, "session", s"day=$currentDate", KyuubiSparkUtil.engineId + ".json")
-    val engineEventReader = Files.newBufferedReader(engineEventPath, StandardCharsets.UTF_8)
+      logRoot, "session", s"day=$currentDate", KyuubiSparkUtil.engineId + ".json")
+    // val engineEventReader = Files.newBufferedReader(engineEventPath, StandardCharsets.UTF_8)
+    val fileSystem: FileSystem = FileSystem.get(new Configuration())
+    val fs: FSDataInputStream = fileSystem.open(new Path(engineEventPath.toString))
+    val engineEventReader = new BufferedReader(new InputStreamReader(fs))
 
     val readEvent = JsonProtocol.jsonToEvent(engineEventReader.readLine())
     assert(readEvent.isInstanceOf[KyuubiEvent])
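The test now reads the event file through Hadoop's `FileSystem` API rather than `java.nio.Files`, so the same assertions would work if `logRoot` pointed at HDFS. A sketch of that read pattern factored into a helper; the helper and its name are illustrative, not part of the patch:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object EventLogIO {
  // Open a text file through whatever FileSystem the path's scheme resolves to:
  // LocalFileSystem for file:, DistributedFileSystem for hdfs:, and so on.
  def openEventLog(uri: String, hadoopConf: Configuration = new Configuration()): BufferedReader = {
    val path = new Path(uri)
    val fs: FileSystem = path.getFileSystem(hadoopConf)
    new BufferedReader(new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))
  }
}
```

Note the patch's own test wraps the stream without an explicit charset; passing `StandardCharsets.UTF_8` as above avoids depending on the JVM default encoding.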
@@ -92,7 +97,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {

test("statementEvent: generate, dump and query") {
val statementEventPath = Paths.get(
logRoot.toString, "statement", s"day=$currentDate", engine.engineId + ".json")
logRoot, "statement", s"day=$currentDate", engine.engineId + ".json")
val sql = "select timestamp'2021-06-01'"
withSessionHandle { (client, handle) =>

6 changes: 4 additions & 2 deletions …/org/apache/kyuubi/config/KyuubiConf.scala
@@ -684,10 +684,12 @@ object KyuubiConf {

   val ENGINE_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
     buildConf("engine.event.json.log.path")
-      .doc("The location of all the engine events go for the builtin JSON logger")
+      .doc("The location of all the engine events go for the builtin JSON logger.<ul>" +
+        "<li>Local Path: start with 'file:'</li>" +
+        "<li>HDFS Path: start with 'hdfs:'</li></ul>")
       .version("1.3.0")
       .stringConf
-      .createWithDefault("/tmp/kyuubi/events")
+      .createWithDefault("file:/tmp/kyuubi/events")
 
   val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
     buildConf("engine.event.loggers")
