Skip to content

Commit

Permalink
Support Rsync to transfer clickhouse data file (#2074)
Browse files Browse the repository at this point in the history
* Support Rsync to transfer clickhouse data file

* replace rsync4j with buildin package scala.sys.process

* resolve scalastyle problem
  • Loading branch information
Emor-nj committed Jun 29, 2022
1 parent 8737474 commit 858c5a8
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
import org.apache.seatunnel.spark.clickhouse.sink.Table
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, ScpFileTransfer}
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, RsyncFileTransfer, ScpFileTransfer}
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.TransferMethod.{RSYNC, SCP, TransferMethod, getCopyMethod}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.{Dataset, Encoders, Row}
Expand Down Expand Up @@ -165,21 +164,25 @@ class ClickhouseFile extends SparkBatchSink {
private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {

var fileTransfer: FileTransfer = null
if (this.copyFileMethod == SCP) {
var scpFileTransfer: ScpFileTransfer = null
if (nodePass.contains(shard.hostAddress)) {
scpFileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
} else {
scpFileTransfer = new ScpFileTransfer(shard.hostAddress)
this.copyFileMethod match {
case SCP => {
if (freePass || !nodePass.contains(shard.hostAddress)) {
fileTransfer = new ScpFileTransfer(shard.hostAddress)
} else {
fileTransfer = new ScpFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
}
}
scpFileTransfer.init()
fileTransfer = scpFileTransfer
} else if (this.copyFileMethod == RSYNC) {
throw new UnsupportedOperationException(s"not support copy file method: '$copyFileMethod' yet")
} else {
throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
case RSYNC => {
if (freePass || !nodePass.contains(shard.hostAddress)) {
fileTransfer = new RsyncFileTransfer(shard.hostAddress)
} else {
fileTransfer = new RsyncFileTransfer(shard.hostAddress, nodePass(shard.hostAddress))
}
}
case _ => throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
s"scp/rsync instead")
}
fileTransfer.init()
fileTransfer.transferAndChown(paths, s"${this.table.getLocalDataPath(shard).head}detached/")

fileTransfer.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,76 @@

package org.apache.seatunnel.spark.clickhouse.sink.filetransfer

object RsyncFileTransfer extends FileTransfer {
import org.apache.sshd.client.SshClient
import org.apache.sshd.client.session.ClientSession
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.sys.process.Process

class RsyncFileTransfer(host: String) extends FileTransfer {

private val LOGGER = LoggerFactory.getLogger(classOf[RsyncFileTransfer])
var password: String = _

def this(host: String, password: String) {
this(host)
this.password = password
}

private var session: ClientSession = _
private var client: SshClient = _

override def transferAndChown(sourcePath: String, targetPath: String): Unit = {
throw new UnsupportedOperationException("not support rsync file transfer yet")

try {
// we use sshpass to support non-interactive password authentication
val sshParameter = if (password!=null) s"sshpass -p $password ssh -o StrictHostKeyChecking=no -p 22" else "ssh -o StrictHostKeyChecking=no -p 22"
val exec = mutable.ListBuffer[String]()
exec.append("rsync")
// recursive
exec.append("-r")
// compress during transfer file with -z
exec.append("-z")
// use ssh protocol with -e
exec.append("-e")
exec.append(sshParameter)
exec.append(sourcePath)
exec.append(s"root@$host:$targetPath")
val command = Process(exec)
LOGGER.info(command.lineStream.mkString("\n"))
// remote exec command to change file owner. Only file owner equal with server's clickhouse user can
// make ATTACH command work.
session.executeRemoteCommand("ls -l " + targetPath.substring(0, targetPath.stripSuffix("/").lastIndexOf("/")) +
"/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath)
} catch {
case e: Exception =>
// always return error cause xargs return shell command result
}
}

override def init(): Unit = {
throw new UnsupportedOperationException("not support rsync file transfer yet")
client = SshClient.setUpDefaultClient()
client.start()
session = client.connect("root", this.host, 22).verify().getSession
if (password != null) {
session.addPasswordIdentity(this.password)
}
val isSuccess = session.auth.verify.isSuccess
if (!isSuccess) {
throw new IllegalArgumentException(s"ssh host '$host' verify failed, please check your config")
}
}

override def transferAndChown(sourcePath: List[String], targetPath: String): Unit = {
throw new UnsupportedOperationException("not support rsync file transfer yet")
sourcePath.foreach(s => {
transferAndChown(s, targetPath)
})
}

override def close(): Unit = {
throw new UnsupportedOperationException("not support rsync file transfer yet")
if (session != null && session.isOpen) {
session.close()
}
}
}

0 comments on commit 858c5a8

Please sign in to comment.