Skip to content
(eventually) spark, akka, cassandra, kafka PoC repository
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
api/src/main/scala/com/ferhtaydn/http
core/src
csv/src/main
examples/src/main/scala/com/ferhtaydn
project
.gitignore
LICENSE
README.md
build.sbt
scalastyle-config.xml

README.md

SACK - SMACK without Mesos

This project is a kind of PoC to develop simple flows with (eventually) Spark, Akka, Cassandra, and Kafka.

Here is the default components of the repo;

  • Confluent Platform stack
  • scala-kafka-client for kafka interaction
  • akka-http for api endpoint
  • circe for decoding
  • shapeless for csv to model conversion
  • avro4s and sbt-avro4s for avro schema

A step-by-step multi node installation manual can be found at this guide.

There are multiple projects under the root.

  • In api project, there are rest endpoints to take products inside.
    • An http layer to post products to http topic that is also consumed by cassandra sink connector to the products table.
    • For the large json products content, you can encode the content with Content-Transfer-Encoding header set to gzip.
java -jar api/target/scala-2.11/api.jar

or

$ sbt project api
$ sbt runMain com.ferhtaydn.http.WebServer
  • In csv project, a simple flow of messages processing is simulated. Consuming a topic, some validation, and producing to the another topic.
    • simple csv file is consumed by Kafka FileStreamSource connector to a raw topic,
    • each line is tried to be converted to a Product class,
    • successful records are consumed by cassandra sink connector to the products table,
    • invalid lines are stored in a failure topic to be able to investigate later
java -jar csv/target/scala-2.11/csv.jar
  • In examples project, there are binary and avro formatted messages used with scala-kafka-client.

  • Common code for all other projects is placed under the core project.

Step-by-step guide in local

$ cd confluent-3.0.1

$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

$ ./bin/kafka-server-start ./etc/kafka/server.properties

$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

$ cat /tmp/connect-file-source.properties
    
    name=product-csv-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/products.csv
    topic=product-csv-raw

$ ./bin/kafka-topics --list --zookeeper localhost:2181

$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic product-csv-avro --from-beginning

$ ~/workspace/datamountaineer/stream-reactor/kafka-connect-cassandra/build/libs(branch:master) » export CLASSPATH=kafka-connect-cassandra-0.2-3.0.1-all.jar
$ ~/workspace/datamountaineer/stream-reactor/kafka-connect-cassandra/build/libs(branch:master) » ~/workspace/confluent/confluent-3.0.1/bin/connect-distributed /tmp/connect-distributed.properties
$ ~/workspace/datamountaineer/kafka-connect-tools/build/libs(branch:master) » java -jar kafka-connect-cli-0.6-all.jar create product-csv-source < /tmp/connect-file-source.properties

$ ~/workspace/confluent/confluent-3.0.1 » ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic product-csv-raw --from-beginning

$ apache-cassandra-3.9 » ./bin/cqlsh
$ CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
$ use demo;
$ cqlsh:demo>
        create table products (
         brand varchar, 
         supplierId varchar, 
         productType varchar, 
         gender varchar, 
         ageGroup varchar, 
         category varchar, 
         productFeature varchar, 
         productCode varchar,
         webProductDesc varchar, 
         productDesc varchar, 
         supplierColor varchar, 
         colorFeature varchar, 
         barcode varchar, 
         supplierSize varchar, 
         dsmSize varchar, 
         stockUnit varchar, 
         ftStockQuantity int, 
         ftPurchasePriceVatInc double, 
         psfVatInc double, 
         tsfVatInc double, 
         vatRate double, 
         material varchar, 
         composition varchar,
         productionPlace varchar, 
         productWeightKg double,
         productionContentWriting varchar,
         productDetail varchar,
         sampleSize varchar,
         modelSize varchar,
         supplierProductCode varchar,
         project varchar,
         theme varchar,
         trendLevel varchar,
         designer varchar,
         imageUrl varchar,
         PRIMARY KEY (barcode));

$ cat cassandra-sink-distributed-products.properties 

    name=cassandra-sink-products
    connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
    tasks.max=1
    topics=product-csv-avro,product-http-avro
    connect.cassandra.export.route.query=INSERT INTO products SELECT * FROM product-csv-avro;INSERT INTO products SELECT * FROM product-http-avro
    connect.cassandra.contact.points=localhost
    connect.cassandra.port=9042
    connect.cassandra.key.space=demo
    connect.cassandra.username=cassandra
    connect.cassandra.password=cassandra


$ ~/workspace/datamountaineer/kafka-connect-tools/build/libs(branch:master) » java -jar kafka-connect-cli-0.6-all.jar create cassandra-sink-products < /tmp/cassandra-sink-distributed-products.properties

$ sbt project csv
$ sbt runMain com.ferhtaydn.csv.RawToAvroGenericProcessorBoot

$ ~/workspace/confluent/confluent-3.0.1 » ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic product-csv-avro --from-beginning

$ cqlsh:demo> select * from products;

You can’t perform that action at this time.