Skip to content

Commit

Permalink
Connect Skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
rollulus committed Jul 8, 2016
0 parents commit d36a218
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .gitignore
@@ -0,0 +1,9 @@
/.idea
# sbt specific
target/
# Scala-IDE specific
/.settings
/.project
/.classpath
/.cache-main
/.cache-tests
5 changes: 5 additions & 0 deletions README.md
@@ -0,0 +1,5 @@
Kafka Connect FTP
=================

Monitors files on an FTP server and feeds changes into Kafka.

230 changes: 230 additions & 0 deletions pom.xml
@@ -0,0 +1,230 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.eneco.trading</groupId>
<artifactId>kafka-connect-ftp</artifactId>
<packaging>jar</packaging>
<version>0.1</version>
<name>kafka-connect-ftp</name>
<organization>
<name>Eneco</name>
<url>http://energytradeuk.eneco.nl</url>
</organization>
<url>http://energytradeuk.eneco.nl/</url>
<description>
A Kafka Connect ftp source connector for copying data between ftp and Kafka.
</description>

<scm>
<connection>scm:git:git@github.com/Eneco/kafka-connect-ftp.git</connection>
<url>https://github.com/Eneco/kafka-connect-ftp</url>
<developerConnection>scm:git:git@github.com/Eneco/kafka-connect-ftp.git
</developerConnection>
</scm>

<developers>
<developer>
<id>rollulus</id>
<name>Roel Reijerse</name>
<organization>Eneco</organization>
</developer>
<developer>
<id>andrewstevenson</id>
<name>Andrew Stevenson</name>
<organization>Datamountaineer</organization>
</developer>
<developer>
<id>GodlyLudu</id>
<name>Henry Cheung</name>
<organization>Eneco</organization>
</developer>
<developer>
<id>dudebowski</id>
<name>Adriaan Mutter</name>
<organization>Eneco</organization>
</developer>
<developer>
<id>Chrizje</id>
<name>Christian De Jong</name>
<organization>Eneco</organization>
</developer>
</developers>

<properties>
<java.version>1.8</java.version>
<scala.version>2.11.7</scala.version>
<confluent.version>3.0.0</confluent.version>
<kafka.version>0.10.0.0</kafka.version>
<avro.version>1.7.7</avro.version>
<slf4j.version>1.7.13</slf4j.version>
<jsr305.version>1.3.9</jsr305.version>
<mockito.version>1.10.19</mockito.version>
<scalatest.version>3.0.0-M1</scalatest.version>
<guava.version>19.0</guava.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
<cloudera.maven.repo>https://repository.cloudera.com/artifactory/cloudera-repos</cloudera.maven.repo>
</properties>

<repositories>
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>${confluent.maven.repo}</url>
</repository>
<repository>
<id>cdh.repo</id>
<name>Cloudera Repositories</name>
<url>${cloudera.maven.repo}</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
<artifactId>scalamock-scalatest-support_2.11</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<!-- make sure wrong scala version is not pulled in -->
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.7</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>copy-resources</goal>
<goal>testResources</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

</build>
</project>
7 changes: 7 additions & 0 deletions src/main/resources/log4j.properties
@@ -0,0 +1,7 @@
# suppress inspection "UnusedProperty" for whole file
log4j.rootLogger=TRACE,stdout

#stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.conversionPattern=%d{ISO8601} %-5p [%t] [%c] [%M:%L] %m%n
@@ -0,0 +1,53 @@
package com.eneco.trading.kafka.connect.ftp.source

import java.util

import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.source.SourceConnector

import scala.collection.JavaConverters._
import scala.util.{Failure, Try}

object FtpSourceConfig {
val Address = "ftp.address"
val User = "ftp.user"
val Password = "ftp.password"

val definition: ConfigDef = new ConfigDef()
.define(Address, Type.STRING,Importance.HIGH,"ftp address")
.define(User, Type.STRING,Importance.HIGH,"ftp user name to login")
.define(Password, Type.PASSWORD,Importance.HIGH,"ftp password to login")
}

class FtpSourceConfig(props: util.Map[String, String])
extends AbstractConfig(FtpSourceConfig.definition, props)


class FtpSourceConnector extends SourceConnector with Logging {
private var configProps : util.Map[String, String] = null

override def taskClass(): Class[_ <: Task] = classOf[FtpSourceTask]

override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
log.info(s"Setting task configurations for $maxTasks workers.")
(1 to maxTasks).map(_ => configProps).toList.asJava
}

override def stop(): Unit = {
}

override def start(props: util.Map[String, String]): Unit = {
configProps = props
Try(new FtpSourceConfig(props)) match {
case Failure(f) => throw new ConnectException("Couldn't start due to configuration error: " + f.getMessage, f)
case _ =>
}
}

override def version(): String = getClass.getPackage.getImplementationVersion

override def config() = FtpSourceConfig.definition
}
@@ -0,0 +1,24 @@
package com.eneco.trading.kafka.connect.ftp.source

import java.util

import org.apache.kafka.connect.source.{SourceRecord, SourceTask}

import scala.collection.JavaConverters._

class FtpSourceTask extends SourceTask with Logging {

override def stop(): Unit = {
}

override def start(map: util.Map[String, String]): Unit = {
}

override def version(): String = getClass.getPackage.getImplementationVersion

override def poll(): util.List[SourceRecord] = {
log.info("poll")
Seq[SourceRecord]().asJava
}
}

@@ -0,0 +1,8 @@
package com.eneco.trading.kafka.connect.ftp.source

import org.slf4j.LoggerFactory

trait Logging {
val loggerName = this.getClass.getName
@transient lazy val log = LoggerFactory.getLogger(loggerName)
}

0 comments on commit d36a218

Please sign in to comment.