# We have a repository with all the examples so you can play along at home:

### https://github.com/RedPillAnalytics/kafka-examples

# Starting our Docker Environment
### We even use Gradle to manage Docker, using the `com.avast.gradle.docker-compose` plugin:

In [None]:
!./gradlew --console=plain tasks --group docker -q
!./gradlew --console=plain -q clean composeUp

### We use Python to interact with KSQL. Let's see if `CLICKSTREAM`, `CLICKSTREAM_CODES` and `CLICKSTREAM_USERS` are there:

In [None]:
ksql = "/usr/local/bin/ksql"
!echo "LIST TOPICS;" | "$ksql"

# The KSQL Developer Experience
### Create a *registration* stream `CLICKSTREAM`. This is sort of like DDL in relational DBs:

In [None]:
sql = (
    "CREATE STREAM clickstream "
    "(_time bigint, time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) "
    "with (kafka_topic = 'clickstream', value_format = 'json');"
)
!echo "$sql" | "$ksql"

### Create the table `EVENTS_PER_MIN` with a *persistent query*, which reads data from `CLICKSTREAM` and starts the streaming process:

In [None]:
sql = (
    "CREATE TABLE events_per_min "
    "AS SELECT userid, count(*) AS events "
    "FROM clickstream window "
    "TUMBLING (size 60 second) GROUP BY userid;"
)
!echo "$sql" | "$ksql"
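### To make the tumbling-window semantics concrete, here is a minimal plain-Python sketch of what `EVENTS_PER_MIN` computes. It assumes each event is a `(timestamp_ms, userid)` pair carrying the record timestamp in milliseconds. Tumbling windows are aligned to the epoch, so every event lands in exactly one 60-second bucket. The real table is updated continuously as records arrive rather than computed in one batch like this.

```python
from collections import Counter

WINDOW_MS = 60 * 1000  # mirrors TUMBLING (SIZE 60 SECOND)


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the epoch-aligned tumbling window containing ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def events_per_min(events):
    """Count events per (userid, window_start), like COUNT(*) ... GROUP BY userid."""
    counts = Counter()
    for ts_ms, userid in events:
        counts[(userid, window_start(ts_ms))] += 1
    return dict(counts)
```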

### Let's list our topics:

In [None]:
!echo "LIST TOPICS;" | "$ksql"

### Suppose we want to make changes to `CLICKSTREAM`. Just drop it, right?

In [None]:
sql = (
    "DROP STREAM clickstream;"
)
!echo "$sql" | "$ksql"

### Bummer. First we need to terminate the persistent query... but the *query ID* is auto-incremented, so it isn't constant:

In [None]:
sql = (
    "TERMINATE QUERY CTAS_EVENTS_PER_MIN_0; "
)
!echo "$sql" | "$ksql"
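### Hardcoding `CTAS_EVENTS_PER_MIN_0` breaks as soon as the suffix changes. A sturdier approach looks the ID up by the object the query writes to. Below is a minimal sketch. The `queries` list and its `id`/`sinks` keys are a hypothetical shape, loosely modeled on what `SHOW QUERIES` reports, and are not an exact KSQL response format.

```python
def query_ids_for_sink(queries, sink):
    """Return the IDs of all persistent queries that write to `sink` (case-insensitive)."""
    target = sink.upper()
    return [q["id"] for q in queries if target in {s.upper() for s in q["sinks"]}]


def terminate_statements(queries, sink):
    """Build TERMINATE statements in the same form used in the cell above."""
    return [f"TERMINATE QUERY {qid};" for qid in query_ids_for_sink(queries, sink)]


# Hypothetical query metadata; in practice it would come from SHOW QUERIES.
queries = [
    {"id": "CTAS_EVENTS_PER_MIN_0", "sinks": ["EVENTS_PER_MIN"]},
    {"id": "CSAS_OTHER_1", "sinks": ["OTHER"]},
]
```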

### Now drop `CLICKSTREAM` again:

In [None]:
sql = (
    "DROP STREAM clickstream; "
)
!echo "$sql" | "$ksql"

### We can do something better to increase developer efficiency!

# The `gradle-confluent` Plugin

### [GitHub Repository](https://github.com/RedPillAnalytics/gradle-confluent)

### [Gradle Plugin Portal](https://plugins.gradle.org/plugin/com.redpillanalytics.gradle-confluent)

### Our `build.gradle` file:

### Let's take a look at the Gradle tasks available by applying the plugin:

In [None]:
!./gradlew ksql:tasks --group confluent

### Let's see the *help* associated with the `pipelineExecute` task:

In [None]:
!./gradlew help --task ksql:pipelineExecute

### Let's take a look at our [pipeline source code](https://github.com/RedPillAnalytics/kafka-examples/tree/master/ksql/src/main/pipeline).

### We can use `pipelineExecute` to execute a single pipeline. Notice the plugin auto-generates `TERMINATE` and `DROP` statements:

In [None]:
!./gradlew --console=plain ksql:pipelineExecute --pipeline-dir 01-clickstream

### Or execute all of our pipelines at once. We'll turn up the logging a bit with `-i`. Notice that the `DROP` statements occur in reverse order:

In [None]:
!./gradlew --console=plain ksql:pipelineExecute -i
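### Why reverse order? An object created later (such as `EVENTS_PER_MIN`) may read from one created earlier (`CLICKSTREAM`), so the dependent object has to go first. The following is a simplified sketch of that idea, assuming statements arrive in creation order. It is not the plugin's actual implementation, and it ignores `TERMINATE` handling and directives.

```python
import re

# Matches "CREATE STREAM name" / "CREATE TABLE name" at the start of a statement.
CREATE_RE = re.compile(r"^\s*CREATE\s+(STREAM|TABLE)\s+(\w+)", re.IGNORECASE)


def drop_statements(create_statements):
    """Generate DROP statements for every CREATE, newest object first."""
    created = []
    for stmt in create_statements:
        match = CREATE_RE.match(stmt)
        if match:
            created.append((match.group(1).upper(), match.group(2).upper()))
    # Later objects may depend on earlier ones, so drop them before their sources.
    return [f"DROP {kind} {name};" for kind, name in reversed(created)]
```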

### The `pipelineExecute` task is great for developers. But in a real delivery pipeline, we want to build and publish artifacts. We are using `mavenLocal()`, which defaults to `$HOME/.m2/repository`, as our Maven repository:

### Let's build and then peek inside the artifact:

In [None]:
!./gradlew --console=plain ksql:build ksql:publish
!zipinfo ~/.m2/repository/com/redpillanalytics/ksql-pipeline/1.0.0/ksql-pipeline-1.0.0.zip
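### The path in the `zipinfo` command follows the standard Maven repository layout: the group ID's dots become directories, followed by the artifact ID, the version, and `artifactId-version.extension`. A small helper makes that explicit. The function name is our own, for illustration only.

```python
from pathlib import Path


def maven_local_path(group_id, artifact_id, version, extension, repo_root="~/.m2/repository"):
    """Location of an artifact in a Maven repository using the standard layout."""
    root = Path(repo_root).expanduser()
    return root.joinpath(*group_id.split("."), artifact_id, version,
                         f"{artifact_id}-{version}.{extension}")
```

### For example, `maven_local_path("com.redpillanalytics", "ksql-pipeline", "1.0.0", "zip")` resolves to the file inspected above.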

### The build also generates the `ksql-script.sql` script, in case we want to use it as our queries file, with either the `--queries-file` option or the `ksql.queries.file` property. Notice that we *normalize* the script, removing comments, line breaks, etc.:

In [None]:
!cat ksql/build/pipeline/ksql-script.sql
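### The following is a naive sketch of that kind of normalization. It strips `--` comments (including `--@` directives), collapses whitespace, and writes one statement per line. It is an assumption about the general approach, not the plugin's code, and the plugin's exact output format may differ. It would also mangle a `--` or `;` that appears inside a string literal.

```python
import re


def normalize_ksql(script: str) -> str:
    """Remove line comments, collapse whitespace, and emit one statement per line."""
    # Drop everything from "--" to end of line (naive: ignores string literals).
    code_lines = [line.split("--", 1)[0] for line in script.splitlines()]
    text = re.sub(r"\s+", " ", " ".join(code_lines)).strip()
    # Split on ";" (also naive) and re-terminate each non-empty statement.
    statements = [s.strip() for s in text.split(";") if s.strip()]
    return "\n".join(s + ";" for s in statements)
```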

### We chose to deploy our artifacts using the KSQL REST API. The `deploy` task is similar to `pipelineExecute`, except that it extracts and executes the scripts inside the published artifact instead:

In [None]:
!./gradlew --console=plain ksql:deploy

### Again, with `-i` for more info:

In [None]:
!./gradlew --console=plain ksql:deploy -i
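### For reference, the KSQL REST API accepts statements as a JSON body POSTed to the server's `/ksql` endpoint. The sketch below only builds such a request with the standard library and does not send it. The server URL (`http://localhost:8088`, the usual KSQL default) is an assumption about this environment. We are not claiming this is exactly how the plugin issues its calls.

```python
import json
import urllib.request


def ksql_request(server_url: str, statement: str, streams_properties=None):
    """Build (but don't send) a POST to the KSQL REST API's /ksql endpoint."""
    payload = {"ksql": statement, "streamsProperties": streams_properties or {}}
    return urllib.request.Request(
        url=server_url.rstrip("/") + "/ksql",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )
```

### To actually send it, pass the request to `urllib.request.urlopen(...)` against a running server.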

### We quickly realized that we needed granular control over some of our auto-generated KSQL. For instance, we wanted to control whether `DELETE TOPIC` was added to `DROP` statements. So we introduced *directives*:

### Directives are *smart comments* beginning with `--@`. So far we have only introduced `--@DeleteTopic`, but others are planned:

In [None]:
!./gradlew --console=plain ksql:pipelineExecute --pipeline-dir 01-clickstream -i
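### Conceptually, a directive is just a comment line the generator inspects before it emits a statement. Here is a minimal sketch of how `--@DeleteTopic` could drive the `DROP` text. For simplicity it scopes directives to a whole script, which may not match how the plugin scopes them. The function names are hypothetical.

```python
def parse_directives(script: str) -> set:
    """Collect directive names from '--@Name' comment lines."""
    return {
        line.strip()[3:].strip()
        for line in script.splitlines()
        if line.strip().startswith("--@")
    }


def drop_statement(kind: str, name: str, directives) -> str:
    """Build a DROP statement, adding DELETE TOPIC only when --@DeleteTopic is present."""
    stmt = f"DROP {kind.upper()} {name.upper()}"
    if "DeleteTopic" in directives:
        stmt += " DELETE TOPIC"
    return stmt + ";"
```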

# KSQL User Defined Functions

### What we really needed was a `CASE` statement. KSQL doesn't have [CASE](https://github.com/confluentinc/ksql/issues/620) yet, but it's coming.

### In the meantime, we wrote the `decode()` function, which was inspired by the Oracle `decode()` function that existed before `CASE`.

### `Decode` function:

```groovy
import groovy.util.logging.Slf4j
import io.confluent.ksql.function.udf.Udf
import io.confluent.ksql.function.udf.UdfDescription

@Slf4j
@UdfDescription(
        name = "decode",
        description = """Given up to 3 pairs of 'search' and 'text', return the first 'text' value where 'search' matches 'expression'. If no match, return 'defaultvalue'. 'ignorecase' defaults to 'false'.""")
class Decode {

   @Udf(description = """Given 1 pair of 'search' and 'text', return the first 'text' value where 'search' matches 'expression'. If no match, return 'defaultvalue'. 'ignorecase' defaults to 'false'.""")
   String decode(String expression, String search1, String text1, String defaultvalue, Boolean ignorecase = false) {

      // If any of the expected values are null, then just return null
      if (expression == null || search1 == null || text1 == null) return null

      return Utils.textMatch(expression, search1, ignorecase) ? text1 : defaultvalue
   }
    
   @Udf(description = """Given 2 pairs of 'search' and 'text', return the first 'text' value where 'search' matches 'expression'. If no match, return 'defaultvalue'. 'ignorecase' defaults to 'false'.""")
   String decode(String expression, String search1, String text1, String search2, String text2, String defaultvalue, Boolean ignorecase = false) {

      // If any of the expected values are null, then just return null
      if (expression == null || search1 == null || text1 == null || search2 == null || text2 == null) return null

      if (Utils.textMatch(expression, search1, ignorecase)) return text1

      else if (Utils.textMatch(expression, search2, ignorecase)) return text2

      else return defaultvalue
   }
    
   @Udf(description = """Given 3 pairs of 'search' and 'text', return the first 'text' value where 'search' matches 'expression'. If no match, return 'defaultvalue'. 'ignorecase' defaults to 'false'.""")
   String decode(String expression, String search1, String text1, String search2, String text2, String search3, String text3, String defaultvalue, Boolean ignorecase = false) {

      // If any of the expected values are null, then just return null
      if (expression == null || search1 == null || text1 == null || search2 == null || text2 == null || search3 == null || text3 == null) return null

      if (Utils.textMatch(expression, search1, ignorecase)) return text1

      else if (Utils.textMatch(expression, search2, ignorecase)) return text2

      else if (Utils.textMatch(expression, search3, ignorecase)) return text3

      else return defaultvalue
   }
}
```
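### To reason about the expected values in the test table that follows, it can help to model the same semantics outside KSQL. Below is a variadic Python sketch. `Utils.textMatch` isn't shown here, so we assume it is plain whole-string equality, optionally case-insensitive. Like the Groovy overloads, it returns `null`/`None` when the expression or any search/text value is missing.

```python
def _text_match(expression, search, ignorecase):
    # Assumption: whole-string equality, optionally ignoring case.
    return expression.lower() == search.lower() if ignorecase else expression == search


def decode(expression, *args, ignorecase=False):
    """args: search1, text1, ..., searchN, textN, defaultvalue."""
    if len(args) < 3 or len(args) % 2 == 0:
        raise ValueError("expected (search, text) pairs followed by a default value")
    *pairs, default = args
    if expression is None or any(v is None for v in pairs):
        return None
    # Return the text paired with the first matching search value.
    for search, text in zip(pairs[0::2], pairs[1::2]):
        if _text_match(expression, search, ignorecase):
            return text
    return default
```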

### `DecodeTest` Spock test specification:

```groovy
import groovy.util.logging.Slf4j
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll

@Slf4j
class DecodeTest extends Specification {

   @Shared
   Decode decode = new Decode()

   @Unroll
   def "When: #expression, #search, #text, #defaultValue; Expect: #result"() {

      expect:
      decode.decode(expression, search, text, defaultValue) == result

      where:
      expression    | search        | text  | defaultValue || result
      'KSQL Rocks!' | 'ksql rocks!' | 'yes' | 'no'         || 'no'
      'KSQL Rocks!' | 'KSQL Rocks!' | 'yes' | 'no'         || 'yes'
   }

   @Unroll
   def "When: #expression, #search, #text, #defaultValue, #ignorecase; Expect: #result"() {

      expect:
      decode.decode(expression, search, text, defaultValue, ignorecase) == result

      where:
      expression    | search        | text  | defaultValue | ignorecase || result
      'KSQL Rocks!' | 'KSQL rocks!' | 'yes' | 'no'         | true       || 'yes'
      'KSQL Rocks!' | 'KSQL Rocks!' | 'yes' | 'no'         | true       || 'yes'
      'KSQL Rocks!' | 'KSQL rocks!' | 'yes' | 'no'         | false      || 'no'
   }

   @Unroll
   def "When: #expression, #search1, #text1, #search2, #text2, #defaultValue, #ignorecase; Expect: #result"() {

      expect:
      decode.decode(expression, search1, text1, search2, text2, defaultValue, ignorecase) == result

      where:
      expression    | search1       | text1 | search2       | text2 | defaultValue | ignorecase || result
      'KSQL Rocks!' | 'KSQL rocks!' | 'yes' | 'KSQL Sucks!' | 'no'  | 'no'         | true       || 'yes'
      'KSQL Rocks!' | 'KSQL rocks!' | 'no'  | 'KSQL Rocks!' | 'yes' | 'no'         | false      || 'yes'
      'KSQL Rocks!' | 'KSQL rocks!' | 'no'  | 'KSQL Sucks!' | 'no'  | 'yes'        | false      || 'yes'
   }

   @Unroll
   def "When: #expression, #search1, #text1, #search2, #text2, #search3, #text3, #defaultValue, #ignorecase; Expect: #result"() {

      expect:
      decode.decode(expression, search1, text1, search2, text2, search3, text3, defaultValue, ignorecase) == result

      where:
      expression    | search1       | text1 | search2       | text2 | search3       | text3 | defaultValue | ignorecase || result
      'KSQL Rocks!' | 'KSQL rocks!' | 'yes' | 'KSQL Sucks!' | 'no'  | 'KSQL, meh'   | 'no'  | 'no'         | true       || 'yes'
      'KSQL Rocks!' | 'KSQL rocks!' | 'no'  | 'KSQL Rocks!' | 'yes' | 'KSQL, meh'   | 'no'  | 'no'         | false      || 'yes'
      'KSQL, meh'   | 'KSQL rocks!' | 'no'  | 'KSQL Sucks!' | 'no'  | 'KSQL, meh'   | 'yes' | 'no'         | false      || 'yes'
      'KSQL, meh'   | 'KSQL rocks!' | 'no'  | 'KSQL Sucks!' | 'no'  | "What's KSQL" | 'no'  | 'yes'        | false      || 'yes'
      'KSQL, meh'   | 'ksql rocks!' | 'no'  | 'ksql, meh'   | 'yes' | "What's KSQL" | 'no'  | 'no'         | true       || 'yes'

   }
}
```

### The KSQL server does not resolve any of our library dependencies, not even the one providing the `io.confluent.ksql.function.udf` package that contains our KSQL annotations. So we need to compile a *fat* (or *uber*) JAR with all of our dependencies included. For this, we use the `shadow` plugin.

### Here is our `build.gradle` file for the `functions` subproject:

### Let's build and publish our UDF artifact, and see how many items are in it:

In [None]:
!./gradlew --console=plain functions:build functions:publish
!zipinfo -h ~/.m2/repository/com/redpillanalytics/functions/1.0.0/functions-1.0.0.jar
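### `zipinfo -h` only reports the entry count. To confirm that the shadow JAR really bundles the classes the KSQL server won't resolve for us, a quick standard-library check can look for package prefixes. The prefixes in the usage note are illustrative choices: the annotation package from above, plus Groovy's runtime, since our UDF is written in Groovy.

```python
import zipfile


def bundled_packages(jar, prefixes):
    """Map each package prefix to whether the JAR contains at least one .class under it.

    `jar` may be a path or a binary file-like object.
    """
    with zipfile.ZipFile(jar) as archive:
        names = archive.namelist()
    return {p: any(n.startswith(p) and n.endswith(".class") for n in names) for p in prefixes}
```

### For example: `bundled_packages(pathlib.Path("~/.m2/repository/com/redpillanalytics/functions/1.0.0/functions-1.0.0.jar").expanduser(), ["io/confluent/ksql/function/udf/", "groovy/lang/"])`.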

### We now copy our full JAR artifact, with all dependencies, to the `/etc/ksql-server/ext` directory, which is where our KSQL Server's `ksql.extension.dir` property points:

In [None]:
!/usr/local/bin/docker cp ~/.m2/repository/com/redpillanalytics/functions/1.0.0/functions-1.0.0.jar ksql-server:/etc/ksql-server/ext

### KSQL Server loads UDFs from this directory at startup, so normally a restart would be required. To speed up the demo, we copied the artifact to the server ahead of time, so no restart is needed now:

In [None]:
!echo "DESCRIBE FUNCTION DECODE;" | "$ksql"

In [None]:
sql = (
    "select definition, decode(definition, "
    "'Proxy authentication required','Bad', "
    "'Page not found','Bad', "
    "'Redirect','Good', "
    "'Unknown') label "
    "from enriched_error_codes limit 20;"
)
!echo "$sql" | "$ksql"

# Kafka Streams Microservices

### Each of our Kafka Streams processes was developed as a separate service, so each needs to be built and deployed that way. As an example, we've borrowed the `WordCountLambdaExample` class from the `kafka-streams-examples` repository provided by Confluent, modified to read from the `clickstream` topic:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;


public class WordCountLambdaExample {

  public static void main(final String[] args) throws Exception {
    final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
    final Properties streamsConfiguration = new Properties();
    // Give the Streams application a unique name.  The name must be unique in the Kafka cluster
    // against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
    // Where to find Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Specify default (de)serializers for record keys and for record values.
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // Records should be flushed every 10 seconds. This is less than the default
    // in order to keep this example interactive.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    // For illustrative purposes we disable record caches
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    // Set up serializers and deserializers, which we will use for overriding the default serdes
    // specified above.
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    // In the subsequent lines we define the processing topology of the Streams application.
    final StreamsBuilder builder = new StreamsBuilder();

    // Construct a `KStream` from the input topic "clickstream", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    //
    // Note: We rely on the default serdes specified in the Streams configuration above, because
    // these defaults match what's in the actual topic. To override them, pass
    // `Consumed.with(keySerde, valueSerde)` as the second argument to `stream()`.
    final KStream<String, String> textLines = builder.stream("clickstream");

    final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    final KTable<String, Long> wordCounts = textLines
      // Split each text line, by whitespace, into words.  The text lines are the record
      // values, i.e. we can ignore whatever data is in the record keys and thus invoke
      // `flatMapValues()` instead of the more generic `flatMap()`.
      .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
      // Count the occurrences of each word (record key).
      //
      // This will change the stream type from `KStream<String, String>` to `KTable<String, Long>`
      // (word -> count). Because we call `count()` without arguments, Kafka Streams generates the
      // names of the associated state store and changelog topic; pass `Materialized.as(...)` to name them.
      //
      // Note: no need to specify explicit serdes because the resulting key and value types match our default serde settings
      .groupBy((key, word) -> word)
      .count();

    // Write the `KTable<String, Long>` to the output topic.
    wordCounts.toStream().to("clickstream-wordcount", Produced.with(stringSerde, longSerde));

    // Now that we have finished the definition of the processing topology we can actually run
    // it via `start()`.  The Streams application as a whole can be launched just like any
    // normal Java application that has a `main()` method.
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to play around with the example
    // when resetting the application for doing a re-run (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state beforehand is that your app must rebuild its local state from scratch, which
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    streams.cleanUp();
    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

}
```
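### Because the Java topology is stateful and continuous, it can be handy to have a batch reference model when checking what lands in `clickstream-wordcount`. Here is a minimal Python sketch of the same splitting and counting. Python's `\W+` is Unicode-aware by default, which roughly matches `Pattern.UNICODE_CHARACTER_CLASS`. One deliberate difference: Java's `Pattern.split` keeps a leading empty string when a line starts with a delimiter (as JSON lines do), so the Java version can count `""` as a word, whereas this sketch drops empty tokens.

```python
import re
from collections import Counter

# Split on runs of non-word characters, as the Java example does.
WORD_SPLIT = re.compile(r"\W+")


def word_counts(lines):
    """Lowercase each line, split it into words, and count occurrences."""
    counts = Counter()
    for line in lines:
        counts.update(w for w in WORD_SPLIT.split(line.lower()) if w)
    return counts
```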


### We used the Gradle `application` plugin. Here is our `build.gradle` file for building this service:

### The `application` plugin provides us the `run` task, which allows developers to test their application easily:

In [None]:
!./gradlew --console=plain streams:run -q

### Running a Streams application with Gradle directly from the Git repository isn't ideal for a production microservice. Thankfully, the `application` plugin can create distribution archives, complete with start scripts, as part of the build process:

In [None]:
!./gradlew --console=plain streams:build streams:publish

### We can now get our distribution artifact from the Maven repository, unzip it, and start the application. In our production environment, Jenkins jobs managed this part of the process:

In [None]:
!rm -rf streams/build/deploy
!mkdir -p streams/build/deploy
!cp ~/.m2/repository/com/redpillanalytics/streams/1.0.0/streams-1.0.0.zip streams/build/deploy/streams-1.0.0.zip
!unzip streams/build/deploy/streams-1.0.0.zip -d streams/build/deploy
!./streams/build/deploy/wordcount-lambda-example-1.0.0/bin/wordcount-lambda-example

# One More Thing... Gradle Analytics

### We've also contributed the `gradle-analytics` plugin, which captures everything going on inside Gradle and makes it available to numerous cloud analytics destinations... and, of course, Apache Kafka.

### https://github.com/RedPillAnalytics/gradle-analytics

### By default, the `producer` task compresses and cleans up the files once they've been sent. Here we disable both:

In [None]:
!./gradlew --console=plain producer -Panalytics.compressFiles=false -Panalytics.cleanFiles=false

### Now let's see our new topics:

In [None]:
!echo "LIST TOPICS;" | "$ksql"

In [None]:
!/Users/stewartbryson/bin/kaf consume gradle_build

In [None]:
!/Users/stewartbryson/bin/kaf consume gradle_task

In [None]:
!/Users/stewartbryson/bin/kaf consume gradle_test

In [None]:
!/Users/stewartbryson/bin/kaf consume gradle_ksqlstatements

# Stopping our Docker environment

In [None]:
!./gradlew --console=plain -q composeDown
!/usr/local/bin/docker ps