# Flume & Kafka Streaming Pipeline

## Verify the Cluster
```zookeeper = ip-20-0-21-196.ec2.internal```

```brokers = ip-20-0-31-210.ec2.internal, ip-20-0-31-221.ec2.internal, ip-20-0-31-4.ec2.internal```
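Because the same ZooKeeper address and broker list recur in most of the commands in this document, it can help to build them once as shell variables. The following is a minimal sketch, assuming the ports 2181 (ZooKeeper) and 9092 (Kafka) that the other commands use; the variable names `ZOOKEEPER`, `BROKER_HOSTS`, and `BROKERS` are illustrative and not part of any Kafka tooling.

```shell
# Hostnames come from the cluster listing above; the ports are assumptions
# based on the values used elsewhere in this document.
ZOOKEEPER="ip-20-0-21-196.ec2.internal:2181"
BROKER_HOSTS="ip-20-0-31-210.ec2.internal ip-20-0-31-221.ec2.internal ip-20-0-31-4.ec2.internal"

# Join the hosts into a comma-separated host:port list with no spaces.
BROKERS=""
for host in $BROKER_HOSTS; do
  BROKERS="${BROKERS:+$BROKERS,}${host}:9092"
done

echo "$BROKERS"
```

The resulting value can be passed as `--broker-list "$BROKERS"`. The list must not contain spaces, otherwise the shell splits it into separate arguments.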

## Create a topic in Kafka
Create a topic in Kafka so that producers can enqueue data to it and consumers can dequeue data from it.

```kafka-topics --create --zookeeper ip-20-0-21-196.ec2.internal:2181 --replication-factor 1 --partitions 1 --topic edureka_854312_assignment_9_1```

```kafka-topics --describe --zookeeper ip-20-0-21-196.ec2.internal --topic edureka_854312_assignment_9_1```

## Test Kafka Consumer
Start a console producer and a console consumer in separate terminals, then verify that messages typed into the producer appear in the consumer.

**Set up Producer**

```kafka-console-producer --topic edureka_854312_assignment_9_1 --broker-list ip-20-0-31-210.ec2.internal:9092,ip-20-0-31-221.ec2.internal:9092,ip-20-0-31-4.ec2.internal:9092```

**Set up Consumer**

```kafka-console-consumer --topic edureka_854312_assignment_9_1 --from-beginning --zookeeper ip-20-0-21-196.ec2.internal```

## Flume Configuration
Configure a Flume agent that uses Kafka as the source, an in-memory channel, and HDFS as the sink.

**In FTP:** upload kafka-flume-hdfs.conf

**In web console:**

```hdfs dfs -mkdir Flume_kafka```

```hdfs dfs -put -f kafka-flume-hdfs.conf Flume_kafka/```

**Contents of kafka-flume-hdfs.conf**
```
edureka_854312_9_1.sources = kafka-source
edureka_854312_9_1.channels = memory-channel
edureka_854312_9_1.sinks = hdfs-sink

edureka_854312_9_1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
edureka_854312_9_1.sources.kafka-source.kafka.bootstrap.servers = ip-20-0-31-210.ec2.internal:9092,ip-20-0-31-221.ec2.internal:9092,ip-20-0-31-4.ec2.internal:9092
edureka_854312_9_1.sources.kafka-source.kafka.topics = edureka_854312_assignment_9_1
edureka_854312_9_1.sources.kafka-source.kafka.consumer.group.id = flume
edureka_854312_9_1.sources.kafka-source.interceptors = i1
edureka_854312_9_1.sources.kafka-source.interceptors.i1.type = timestamp
edureka_854312_9_1.sources.kafka-source.kafka.consumer.timeout.ms = 100
 
edureka_854312_9_1.channels.memory-channel.type = memory
edureka_854312_9_1.channels.memory-channel.capacity = 10000
edureka_854312_9_1.channels.memory-channel.transactionCapacity = 1000
 
edureka_854312_9_1.sinks.hdfs-sink.type = hdfs
edureka_854312_9_1.sinks.hdfs-sink.hdfs.path = hdfs://nameservice1/user/edureka_854312/Flume_kafka
edureka_854312_9_1.sinks.hdfs-sink.hdfs.rollInterval = 5
edureka_854312_9_1.sinks.hdfs-sink.hdfs.rollSize = 0
edureka_854312_9_1.sinks.hdfs-sink.hdfs.rollCount = 0
edureka_854312_9_1.sinks.hdfs-sink.hdfs.fileType = DataStream
 
edureka_854312_9_1.sources.kafka-source.channels = memory-channel
edureka_854312_9_1.sinks.hdfs-sink.channel = memory-channel
```
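Every property in the file is prefixed with the agent name, and a Flume agent only picks up properties whose prefix matches the name it is started with. As a quick sanity check, the sketch below lists the distinct prefixes found in a properties file. The helper name `flume_agent_names` is hypothetical, and the sketch assumes one `key = value` property per line.

```shell
# List the distinct agent names (the part before the first dot) in a Flume
# properties file, ignoring blank lines and comment lines.
flume_agent_names() {
  grep -Ev '^[[:space:]]*(#|$)' "$1" | cut -d. -f1 | sort -u
}

# Run the check only if the configuration file is in the current directory.
if [ -f kafka-flume-hdfs.conf ]; then
  flume_agent_names kafka-flume-hdfs.conf
fi
```

Run against the configuration above, this should print the single name edureka_854312_9_1. More than one line of output would suggest a typo in one of the prefixes.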

## Start Flume Agent
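Start the Flume agent and check its output in HDFS. The value passed to `--name` must match the agent name used as the property prefix in the configuration file.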

```flume-ng agent --conf conf --conf-file kafka-flume-hdfs.conf --name edureka_854312_9_1 -Dflume.root.logger=INFO,console```

## Test the complete pipeline

```kafka-console-producer --broker-list ip-20-0-31-210.ec2.internal:9092,ip-20-0-31-221.ec2.internal:9092,ip-20-0-31-4.ec2.internal:9092 --topic edureka_854312_assignment_9_1```
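To exercise the pipeline without typing messages by hand, numbered test lines can be piped into the console producer, which reads messages from standard input. This is a minimal sketch; the helper name `gen_messages` and the `test-message-N` format are made up for illustration.

```shell
# Print N numbered test messages, one per line.
gen_messages() {
  count=$1
  i=1
  while [ "$i" -le "$count" ]; do
    echo "test-message-$i"
    i=$((i + 1))
  done
}

# Example invocation (requires the cluster described in this document):
#   gen_messages 10 | kafka-console-producer \
#     --broker-list ip-20-0-31-210.ec2.internal:9092,ip-20-0-31-221.ec2.internal:9092,ip-20-0-31-4.ec2.internal:9092 \
#     --topic edureka_854312_assignment_9_1
```

With the Flume agent running and rollInterval set to 5 seconds, files should appear in HDFS shortly afterwards. `hdfs dfs -ls Flume_kafka` lists them, and a file that is still being written carries a `.tmp` suffix until it is rolled.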