HerveRiviere/demo-kafka-connect-streams

#Demo Kafka Connect & Kafka Streams

##Prerequisites :

  • Kafka 0.10+. A tech preview of the Confluent Platform can be downloaded here
  • A running Kafka cluster (a single node on localhost is fine)
  • A running postgres database

##Create the postgres table

CREATE TABLE message(date TIMESTAMP NOT NULL,id SERIAL PRIMARY KEY, username VARCHAR(100), message TEXT);

##Kafka Connect

###Start Kafka connect cluster

  • In kafka-connect/start-distributed-connect.sh, replace the Kafka path with the directory where you installed Kafka 0.10
  • Review the configuration specified in kafka-connect/conf/connect-distributed.properties.1 and kafka-connect/conf/connect-distributed.properties.2
  • Run kafka-connect/start-distributed-connect.sh
  • Check the log output in kafka-connect/log
  • Check that both workers answer HTTP requests on localhost:8085 and localhost:8086
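
The last check in the list above can be scripted. The following is a minimal sketch, assuming `curl` is installed and that each worker exposes the standard Kafka Connect REST interface on the ports named above; the `check_worker` helper is a hypothetical name introduced only for this example.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reports whether a Connect worker answers on a port.
# The REST root path "/" of a Connect worker returns its version info.
check_worker() {
  local port="$1"
  if curl --silent --fail --max-time 5 "http://localhost:${port}/" >/dev/null; then
    echo "worker on port ${port}: up"
  else
    echo "worker on port ${port}: not reachable"
  fi
}

# Ports taken from the steps above.
for port in 8085 8086; do
  check_worker "$port"
done
```

Once both workers are up, `curl http://localhost:8085/connectors` should list the connectors registered on the cluster (an empty JSON array before any registration).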

###JDBC connector registration
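
Connectors are registered on a distributed Connect cluster by POSTing a JSON definition to the `/connectors` REST endpoint of any worker. The sketch below is an illustration only and is not taken from the repository: the connector name, the connection URL with its credentials, and the choice of `incrementing` mode on the `id` column are all assumptions. The `topic.prefix` value `table-` is chosen so that the `message` table maps to the `table-message` topic used below.

```shell
#!/usr/bin/env bash
# Hypothetical connector definition for the Confluent JDBC source connector.
# Name, URL, and credentials are placeholders; adjust them to your setup.
connector_config() {
  cat <<'EOF'
{
  "name": "demo-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=postgres",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "message",
    "topic.prefix": "table-"
  }
}
EOF
}

# Register the connector through the first worker (port 8085 from the steps above).
connector_config | curl --silent --max-time 5 -X POST \
  -H "Content-Type: application/json" \
  --data @- http://localhost:8085/connectors \
  || echo "Connect worker not reachable on localhost:8085"
```

Because the workers form one cluster, posting to either port should be enough; `curl http://localhost:8086/connectors` can then confirm that the second worker sees the same connector.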

###Insert into postgres and check topic table-message

  • Open a console-consumer
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 --topic table-message
  • Insert a record into the postgres message table
insert into message(date,username,message) values (now(),'Herve','Hello kafka User Group');
  • The record should appear in the console-consumer

##Kafka Streams

  • Import the Maven project into your IDE (IntelliJ, Eclipse...)

###Insert into postgres and check topic table-message-kafka

  • Execute the main class DemoKafkaStreams.java
  • Open a console-consumer
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 --topic table-message-kafka
  • Insert records into the postgres message table. A record whose message contains "kafka user group" should appear in the table-message-kafka topic.
  • To check the KTable, insert the same message twice. The username should then appear in the table-message-kafka topic.
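
The two insert steps above can be driven from a script. This is a sketch under stated assumptions: `psql` is installed, the `PG_URL` connection string is a placeholder for your own database, and `insert_sql` is a hypothetical helper that only uses columns defined in the CREATE TABLE statement.

```shell
#!/usr/bin/env bash
# Hypothetical connection string; adjust to your own postgres instance.
PG_URL="${PG_URL:-postgresql://localhost:5432/postgres}"

# Hypothetical helper: prints one INSERT statement for the message table.
# Single quotes in the values are doubled so they stay valid SQL literals.
insert_sql() {
  local username="$1" message="$2" q="'"
  username="${username//$q/$q$q}"
  message="${message//$q/$q$q}"
  printf "insert into message(date,username,message) values (now(),'%s','%s');\n" \
    "$username" "$message"
}

# Insert the same message twice to exercise the KTable.
{
  insert_sql "Herve" "Hello kafka user group"
  insert_sql "Herve" "Hello kafka user group"
} | psql "$PG_URL" || echo "psql failed: check PG_URL and that postgres is running"
```

Keep the console-consumer from the previous step open while running it, so the resulting records in table-message-kafka are visible as they arrive.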

###Check kafka-user-group-example-kafka_user_count-changelog topic

  • To inspect the changelog topic that backs the KTable, run:
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 --topic kafka-user-group-example-kafka_user_count-changelog \
  --formatter kafka.tools.DefaultMessageFormatter --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
  --from-beginning

About

Demo of Kafka Connect and Kafka Streams used during my talk at the Paris Kafka User Group on 2015 May 17
