diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..76760f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +/.idea +# sbt specific +target/ +# Scala-IDE specific +/.settings +/.project +/.classpath +/.cache-main +/.cache-tests diff --git a/README.md b/README.md new file mode 100644 index 0000000..16d2395 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +Kafka Connect FTP +================= + +Monitors files on an FTP server and feeds changes into Kafka. + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8a42acd --- /dev/null +++ b/pom.xml @@ -0,0 +1,230 @@ + + + 4.0.0 + + com.eneco.trading + kafka-connect-ftp + jar + 0.1 + kafka-connect-ftp + + Eneco + http://energytradeuk.eneco.nl + + http://energytradeuk.eneco.nl/ + + A Kafka Connect ftp source connector for copying data between ftp and Kafka. + + + + scm:git:git@github.com/Eneco/kafka-connect-ftp.git + https://github.com/Eneco/kafka-connect-ftp + scm:git:git@github.com/Eneco/kafka-connect-ftp.git + + + + + + rollulus + Roel Reijerse + Eneco + + + andrewstevenson + Andrew Stevenson + Datamountaineer + + + GodlyLudu + Henry Cheung + Eneco + + + dudebowski + Adriaan Mutter + Eneco + + + Chrizje + Christian De Jong + Eneco + + + + + 1.8 + 2.11.7 + 3.0.0 + 0.10.0.0 + 1.7.7 + 1.7.13 + 1.3.9 + 1.10.19 + 3.0.0-M1 + 19.0 + UTF-8 + http://packages.confluent.io/maven/ + https://repository.cloudera.com/artifactory/cloudera-repos + + + + + confluent + Confluent + ${confluent.maven.repo} + + + cdh.repo + Cloudera Repositories + ${cloudera.maven.repo} + + false + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + com.typesafe.scala-logging + scala-logging_2.11 + 3.4.0 + + + org.apache.kafka + connect-api + ${kafka.version} + + + org.scalamock + scalamock-scalatest-support_2.11 + 3.2.2 + + + org.mockito + mockito-all + ${mockito.version} + test + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + + + org.scalatest + scalatest_2.11 + ${scalatest.version} + test + + + + org.scala-lang + scala-library + + + + + org.apache.avro + avro + ${avro.version} + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + + + com.google.guava + guava + ${guava.version} + + + + + + + src/main/resources + true + + + src/test/resources + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.7 + + ${project.build.sourceEncoding} + + + + + copy-resources + testResources + + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + + test + + test + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + incremental + + + + + compile + add-source + testCompile + + + + + + maven-assembly-plugin + 2.4 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..8692b1e --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,7 @@ +# suppress inspection "UnusedProperty" for whole file +log4j.rootLogger=TRACE,stdout + +#stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.conversionPattern=%d{ISO8601} %-5p [%t] [%c] [%M:%L] %m%n \ No newline at end of file diff --git a/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceConnector.scala b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceConnector.scala new file mode 100644 index 0000000..b25931a --- /dev/null +++ b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceConnector.scala @@ -0,0 +1,53 @@ +package com.eneco.trading.kafka.connect.ftp.source + +import java.util + +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.ConfigDef.{Importance, Type} +import org.apache.kafka.connect.connector.Task +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.source.SourceConnector + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Try} + +object FtpSourceConfig { + val Address = "ftp.address" + val User = "ftp.user" + val Password = "ftp.password" + + val definition: ConfigDef = new ConfigDef() + .define(Address, Type.STRING,Importance.HIGH,"ftp address") + .define(User, Type.STRING,Importance.HIGH,"ftp user name to login") + .define(Password, Type.PASSWORD,Importance.HIGH,"ftp password to login") +} + +class FtpSourceConfig(props: util.Map[String, String]) + extends AbstractConfig(FtpSourceConfig.definition, props) + + +class FtpSourceConnector extends SourceConnector with Logging { + private var configProps : util.Map[String, String] = null + + override def taskClass(): Class[_ <: Task] = classOf[FtpSourceTask] + + override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = { + log.info(s"Setting task configurations for $maxTasks workers.") + (1 to maxTasks).map(_ => configProps).toList.asJava + } + + override def stop(): Unit = { + } + + override def start(props: util.Map[String, String]): Unit = { + configProps = props + Try(new FtpSourceConfig(props)) match { + case Failure(f) => throw new ConnectException("Couldn't start due to configuration error: " + f.getMessage, f) + case _ => + } + } + + override def version(): String = getClass.getPackage.getImplementationVersion + + override def config() = FtpSourceConfig.definition +} diff --git a/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceTask.scala b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceTask.scala new file mode 100644 index 0000000..0fc09f2 --- /dev/null +++ b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/FtpSourceTask.scala @@ -0,0 +1,24 @@ +package com.eneco.trading.kafka.connect.ftp.source + +import java.util + +import org.apache.kafka.connect.source.{SourceRecord, SourceTask} + +import scala.collection.JavaConverters._ + +class FtpSourceTask extends SourceTask with Logging { + + override def stop(): Unit = { + } + + override def start(map: util.Map[String, String]): Unit = { + } + + override def version(): String = getClass.getPackage.getImplementationVersion + + override def poll(): util.List[SourceRecord] = { + log.info("poll") + Seq[SourceRecord]().asJava + } +} + diff --git a/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/Logging.scala b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/Logging.scala new file mode 100644 index 0000000..3a31ca2 --- /dev/null +++ b/src/main/scala/com/eneco/trading/kafka/connect/ftp/source/Logging.scala @@ -0,0 +1,8 @@ +package com.eneco.trading.kafka.connect.ftp.source + +import org.slf4j.LoggerFactory + +trait Logging { + val loggerName = this.getClass.getName + @transient lazy val log = LoggerFactory.getLogger(loggerName) +} \ No newline at end of file