Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
LearningJournal committed Jan 7, 2019
1 parent 103552a commit 3e12b70
Show file tree
Hide file tree
Showing 21 changed files with 4,440 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pos-fanout-avro/README.md
@@ -0,0 +1,5 @@
This directory contains a Kafka Streams example.

The project uses avro-maven-plugin to generate classes.

Do not manually modify the generated classes in the types directory.
131 changes: 131 additions & 0 deletions pos-fanout-avro/pom.xml
@@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2018. Prashant Kumar Pandey
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ You may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>guru.learningjournal.kafka.examples</groupId>
<artifactId>pos-fanout-avro</artifactId>
<version>2.0.0</version>

<properties>
<avro.version>1.8.1</avro.version>
<kafka.version>2.0.0</kafka.version>
<log4j.version>2.11.1</log4j.version>
<junit.version>5.3.1</junit.version>
<java.version>1.8</java.version>
<confluent.version>5.0.1</confluent.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<!-- Avro dependency-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<!-- Confluent Avro SerDes-->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- Apache Kafka Clients-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
</dependency>
<!-- Apache Kafka Streams-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
</dependency>
<!-- Apache Log4J2 binding for SLF4J -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
</dependency>
<!-- JUnit5 Jupiter -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- JUnit 5 Jupiter Engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- JUnit 5 Jupiter Parameterized Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
            <version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Maven Compiler Plugin-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<!-- Maven Avro plugin for generating pojo-->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-hadoop-sink-consumer.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hadoop-sink --from-beginning
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-kafka-server-0.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\kafka-server-start.bat C:\Users\prashant\Downloads\confluent-5.0.1\etc\kafka\server-0.properties
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-kafka-server-1.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\kafka-server-start.bat C:\Users\prashant\Downloads\confluent-5.0.1\etc\kafka\server-1.properties
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-kafka-server-2.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\kafka-server-start.bat C:\Users\prashant\Downloads\confluent-5.0.1\etc\kafka\server-2.properties
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-schema-registry.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\schema-registry-start.bat C:\Users\prashant\Downloads\confluent-5.0.1\etc\schema-registry\schema-registry.properties
1 change: 1 addition & 0 deletions pos-fanout-avro/scripts/start-zookeeper.cmd
@@ -0,0 +1 @@
C:\Users\prashant\Downloads\confluent-5.0.1\bin\windows\zookeeper-server-start.bat C:\Users\prashant\Downloads\confluent-5.0.1\etc\kafka\zookeeper.properties
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2019. Prashant Kumar Pandey
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package guru.learningjournal.kafka.examples;

/**
* Configurations for the PosFanOutApp
*
* @author prashant
* @author www.learningjournal.guru
*/

final class FanOutConfigs {
    // Constants-only holder: final + private constructor prevent
    // accidental subclassing or instantiation.
    private FanOutConfigs() {
    }

    // Kafka Streams application id (also used as the consumer group id).
    static final String applicationID = "PosFanOutApp";
    // Bootstrap broker list for the Streams client.
    static final String bootstrapServers = "localhost:9092";
    // Source topic carrying PosInvoice events.
    static final String posTopicName = "pos";
    // Sink topic for home-delivery invoices (shipment service).
    static final String shipmentTopicName = "shipment";
    // Sink topic for loyalty notifications of PRIME customers.
    static final String notificationTopic = "loyalty";
    // Sink topic for flattened, masked records destined for Hadoop.
    static final String hadoopTopic = "hadoop-sink";
    // Value of PosInvoice.deliveryType that selects the shipment route.
    static final String DELIVERY_TYPE_HOME_DELIVERY = "HOME-DELIVERY";
    // Value of PosInvoice.customerType that earns loyalty points.
    static final String CUSTOMER_TYPE_PRIME = "PRIME";
    // Fraction of the invoice value awarded as loyalty points.
    static final Double LOYALTY_FACTOR = 0.02;
}
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2019. Prashant Kumar Pandey
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package guru.learningjournal.kafka.examples;

import guru.learningjournal.kafka.examples.types.HadoopRecord;
import guru.learningjournal.kafka.examples.types.Notification;
import guru.learningjournal.kafka.examples.types.PosInvoice;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;

/**
* Transform and FanOut Invoices to different topics for other services
*
* @author prashant
* @author www.learningjournal.guru
*/

@SuppressWarnings("unchecked")
public class PosFanOutApp {
private static final Logger logger = LogManager.getLogger();

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, FanOutConfigs.applicationID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, FanOutConfigs.bootstrapServers);

StreamsBuilder builder = new StreamsBuilder();
KStream KS0 = builder.stream(FanOutConfigs.posTopicName,
Consumed.with(PosSerdes.String(), PosSerdes.PosInvoice()));

//Requirement 1 - Produce to shipment
KStream KS1 = KS0.filter((Predicate<String, PosInvoice>) (key, value) ->
value.getDeliveryType().toString()
.equalsIgnoreCase(FanOutConfigs.DELIVERY_TYPE_HOME_DELIVERY));

KS1.to(FanOutConfigs.shipmentTopicName,
Produced.with(PosSerdes.String(), PosSerdes.PosInvoice()));

//Requirement 2 - Produce to loyaltyHadoopRecord
KStream KS3 = KS0.filter((Predicate<String, PosInvoice>) (key, value) ->
value.getCustomerType().toString()
.equalsIgnoreCase(FanOutConfigs.CUSTOMER_TYPE_PRIME));

KStream KS4 = KS3.mapValues((ValueMapper<PosInvoice, Notification>)
RecordBuilder::getNotification);

KS4.to(FanOutConfigs.notificationTopic,
Produced.with(PosSerdes.String(), PosSerdes.Notification()));

//Requirement 3 - Produce to Hadoop
KStream KS6 = KS0.mapValues((ValueMapper<PosInvoice, PosInvoice>)
RecordBuilder::getMaskedInvoice);

KStream KS7 = KS6.flatMapValues((ValueMapper<PosInvoice, Iterable<HadoopRecord>>)
RecordBuilder::getHadoopRecords);

KS7.to(FanOutConfigs.hadoopTopic,
Produced.with(PosSerdes.String(), PosSerdes.HadoopRecord()));

Topology posFanOutTopology = builder.build();

logger.info("Starting the following topology");
logger.info(posFanOutTopology.describe().toString());

KafkaStreams myStream = new KafkaStreams(posFanOutTopology, props);
myStream.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Stopping Stream");
myStream.close();
}));
}
}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2019. Prashant Kumar Pandey
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package guru.learningjournal.kafka.examples;

import guru.learningjournal.kafka.examples.types.HadoopRecord;
import guru.learningjournal.kafka.examples.types.Notification;
import guru.learningjournal.kafka.examples.types.PosInvoice;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Map;

/**
* Extend Kafka Streams Serdes to add custom specific Avro Serdes
*
* @author prashant
* @author www.learningjournal.guru
*/
class PosSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Collections.singletonMap(
"schema.registry.url",
"http://localhost:8081");

static Serde<PosInvoice> PosInvoice() {
final Serde<PosInvoice> specificAvroSerde = new SpecificAvroSerde<>();
specificAvroSerde.configure(serdeConfig, false);
return specificAvroSerde;
}

static Serde<Notification> Notification() {
final Serde<Notification> specificAvroSerde = new SpecificAvroSerde<>();
specificAvroSerde.configure(serdeConfig, false);
return specificAvroSerde;
}

static Serde<HadoopRecord> HadoopRecord() {
final Serde<HadoopRecord> specificAvroSerde = new SpecificAvroSerde<>();
specificAvroSerde.configure(serdeConfig, false);
return specificAvroSerde;
}
}

0 comments on commit 3e12b70

Please sign in to comment.