
Commit 0d6511c

Get rid of ValidatedOrders topic. After validation, write directly to Orders. Otherwise the service in Lab2 won't be able to see results

gwenshap committed Nov 2, 2018
1 parent 7f5ff6d commit 0d6511c
Showing 9 changed files with 114 additions and 109 deletions.
24 changes: 24 additions & 0 deletions .gitignore
@@ -0,0 +1,24 @@
.DS_Store
venv
__pycache__
*.pyc
*ipynb*
*cache*
.idea
*.csr
*.srl
*.req
*.log
*.jar
Untitled*
docs/_build
docs/README.md
target
.idea
*.iml
local.make
java/.classpath
java/.project
java/.settings/
.vscode/*

70 changes: 56 additions & 14 deletions README.md
@@ -9,29 +9,71 @@ This workshop will include:
- Implement a webservice that submits orders and uses the "read-own-writes" pattern to show order status.
- Use a streams-to-table join to send status updates by email.


## Preparation!

Make sure you have the following:
- OSX or Linux
- Java 8
- Java development environment (Intellij recommended)
- Maven

### Please create cloud clusters for the workshop!

1. Log in to https://confluent.cloud with the username and password you received by email from QCon.
2. Create a new cluster and call it “lastname-firstname”. Make sure you
use the smallest options, create it on Google Cloud Platform, and use
the us-central1 region. You'll be shown the cost for the cluster. Don't
worry about it - Confluent has been kind enough to sponsor the
clusters and you will not be charged.
3. Follow the on-screen instructions closely to install and initialize
the Confluent Cloud CLI.
4. If you run into issues, refer to the documentation here:
https://docs.confluent.io/current/quickstart/cloud-quickstart.html.
5. Once you've created the cluster, follow Step 3 of the Quickstart
(https://docs.confluent.io/current/quickstart/cloud-quickstart.html)
to create a topic and produce/consume some events.

### In addition:
1. Please clone this repository.
2. Run `mvn dependency:resolve` to fetch dependencies in advance. This will reduce load on the classroom network.


## Outline

1. *Lesson 0:* Introduction to Kafka, topics and events:
- What is Stream Processing?
- Key concepts of Kafka Brokers, Connectors and Streams
2. *Lab 0:*
- Create a cluster in Confluent Cloud
- Install the Confluent Cloud CLI
- Create topic
- Produce events to topic
- Consume events from topic
3. *Lesson 1:* Simple event validation:
- Overview of our Architecture
- "Hipster Stream Processing"
- Producer and Consumer APIs
- Configuring Kafka Clients to Connect to Cloud (see the configuration sketch after this outline)
4. *Lab 1:*
- `git checkout lab1`
- Validate events and produce results to Kafka: Implement the TODO part of the OrderValidationService. Note that the `validate()` method implements the specific validation rules.
- Test the simple validation service
- How would you add multiple validators?
5. *Lesson 2:* CQRS-based web-service with Kafka Streams
- Introduction to Kafka Streams
- Introducing local materialized views
- Interactive queries in Kafka Streams
- Code review of web-service
6. *Lab 2:*
- `git checkout lab2`
- Run and experiment with web service
- Add "check for validation" endpoint: Implement the TODO part of the OrderService. Note that we already have a simple GET endpoint that gets any order, we need a new endpoint that only returns validated orders.
- How would you scale to multiple instances?
7. *Lesson 3* (if there is time): Simple email service
- Stream-table join enrichment pattern
- Global KTables
8. *Lab 3* (if there is time):
- `git checkout lab3`
- Implement Stream-table join: Implement the two functions required for the join (marked as TODO); see the join sketch after this outline
- Test email service using the CustomerProducer and OrderProducer classes
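
For Lesson 1, here is a minimal sketch of what pointing a Java producer at Confluent Cloud typically looks like. The bootstrap address and API key/secret below are placeholders, and this is only an illustration; the workshop code actually loads its settings from a config file via `LoadConfigs`.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CloudProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholders: use the values for the cluster you created in Lab 0.
        props.put("bootstrap.servers", "<bootstrap-servers>");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"<api-key>\" password=\"<api-secret>\";");

        // Send one test event to the orders topic, then close the producer.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("orders", "order-1", "{\"state\":\"CREATED\"}"));
        }
    }
}
```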
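For Lesson 2 and Lab 2, here is a minimal sketch of answering "was this order validated?" from a local materialized view using Kafka Streams interactive queries. The store name `orders-store` and the surrounding class are assumptions for illustration, not the workshop's actual code.

```java
import io.confluent.qcon.orders.domain.Order;
import io.confluent.qcon.orders.domain.OrderState;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class OrderLookupSketch {
    // Hypothetical store name; the real one comes from the topology definition.
    private static final String STORE_NAME = "orders-store";

    // Returns the order if it has already been validated, null otherwise.
    public static Order lookupValidatedOrder(KafkaStreams streams, String orderId) {
        ReadOnlyKeyValueStore<String, Order> store =
                streams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
        Order order = store.get(orderId);
        if (order != null && order.getState() == OrderState.VALIDATED) {
            return order; // read-own-writes: the write is materialized locally
        }
        return null; // caller can retry/long-poll until validation lands
    }
}
```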
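For Lesson 3 and Lab 3, a sketch of the stream-table join enrichment pattern: each order is joined against a `GlobalKTable` built from a customers topic. The `customers` topic and its plain-string payload (an email address) are hypothetical, chosen to keep the sketch self-contained.

```java
import io.confluent.qcon.orders.domain.Order;
import io.confluent.qcon.orders.domain.Schemas;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class EmailJoinSketch {
    static void buildTopology(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
                Schemas.Topics.ORDERS.name(),
                Consumed.with(Schemas.Topics.ORDERS.keySerde(),
                              Schemas.Topics.ORDERS.valueSerde()));

        // Hypothetical "customers" topic, keyed by customer id.
        GlobalKTable<String, String> customerEmails =
                builder.globalTable("customers",
                        Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(customerEmails,
                    (orderId, order) -> order.getCustomerId(),  // map order to table key
                    (order, email) -> email + ": order " + order.getId()
                            + " is " + order.getState())        // enriched value
              .foreach((orderId, message) -> System.out.println(message)); // stand-in for sending email
    }
}
```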


## Requirements
- OSX or Linux
- Java 8
- Java development environment (Intellij recommended)
- Maven
- Maven dependencies (fetch with `mvn dependency:resolve`)
- Lab 0 of the outline (create a cluster and test connectivity)

13 changes: 13 additions & 0 deletions pom.xml
@@ -49,4 +49,17 @@ place log4j.jar on your class path. -->
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
33 changes: 17 additions & 16 deletions src/main/java/io/confluent/qcon/orders/OrderDetailsService.java
@@ -2,30 +2,23 @@

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.confluent.qcon.orders.domain.Order;
import io.confluent.qcon.orders.domain.OrderState;
import io.confluent.qcon.orders.domain.OrderValidation;
import io.confluent.qcon.orders.domain.OrderValidationResult;
import io.confluent.qcon.orders.domain.OrderValidationType;
import io.confluent.qcon.orders.domain.Schemas;
import io.confluent.qcon.orders.utils.LoadConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,7 +41,7 @@ public class OrderDetailsService implements Service {
private final String CONSUMER_GROUP_ID = getClass().getSimpleName();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
private KafkaConsumer<String, Order> consumer;
private KafkaProducer<String, OrderValidation> producer;
private KafkaProducer<String, Order> producer;
private boolean running;

public void start(String configFile, String stateDir) {
@@ -77,7 +70,9 @@ private void startService(String configFile) throws IOException {
for (ConsumerRecord<String, Order> record : records) {
Order order = record.value();
if (order.getState() == OrderState.CREATED) {
//TODO: Validate the order then send the result
// TODO: Validate the order (using validate())
// TODO: create a ProducerRecord from the order and result (see record())
// TODO: then produce the result to Kafka using the existing producer
}
}
}
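
A minimal sketch of what the completed loop body might look like, using the `validate()` and `record()` helpers shown below (one possible solution, not code from this commit):

                    // Sketch: validate, then write the updated order back to Kafka
                    OrderValidationResult result = validate(order);
                    producer.send(record(order, result));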
@@ -107,12 +102,18 @@ private OrderValidationResult validate(Order order) {
return OrderValidationResult.PASS;
}

private ProducerRecord<String, OrderValidation> record(String orderId,
private ProducerRecord<String, Order> record(Order order,
OrderValidationResult result) {
return new ProducerRecord<String, OrderValidation>(
Schemas.Topics.ORDER_VALIDATIONS.name(),
orderId,
new OrderValidation(orderId, OrderValidationType.DETAILS, result));

if (result.equals(OrderValidationResult.PASS))
order.setState(OrderState.VALIDATED);
else
order.setState(OrderState.FAILED);

return new ProducerRecord<String, Order>(
Schemas.Topics.ORDERS.name(),
order.getId(),
order);
}

private void startProducer(String configFile) throws IOException {
@@ -122,8 +123,8 @@ private void startProducer(String configFile) throws IOException {
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "order-details-service-producer");

producer = new KafkaProducer<>(producerConfig,
Schemas.Topics.ORDER_VALIDATIONS.keySerde().serializer(),
Schemas.Topics.ORDER_VALIDATIONS.valueSerde().serializer());
Schemas.Topics.ORDERS.keySerde().serializer(),
Schemas.Topics.ORDERS.valueSerde().serializer());
}

private void startConsumer(String configFile) throws IOException {
4 changes: 4 additions & 0 deletions src/main/java/io/confluent/qcon/orders/domain/Order.java
@@ -23,6 +23,10 @@ public OrderState getState() {
return state;
}

public void setState(OrderState state) {
this.state = state;
}

public String getCustomerId() {
return customerId;
}
24 changes: 0 additions & 24 deletions src/main/java/io/confluent/qcon/orders/domain/OrderValidation.java

This file was deleted.

This file was deleted.

8 changes: 0 additions & 8 deletions src/main/java/io/confluent/qcon/orders/domain/Schemas.java
@@ -54,16 +54,8 @@ public OrderSerde() {
}
}

static public final class OrderValidationSerde extends Serdes.WrapperSerde<OrderValidation> {
public OrderValidationSerde() {
super(new JsonSerializer<OrderValidation>(), new JsonDeserializer<OrderValidation>(OrderValidation.class));
}
}

public static Topic<String, Order> ORDERS =
new Topic<String, Order>("orders", Serdes.String(), new OrderSerde());

public static Topic<String, OrderValidation> ORDER_VALIDATIONS =
new Topic<String, OrderValidation>("order-validations", Serdes.String(), new OrderValidationSerde());
}
}

This file was deleted.
