# Kafka Consumer, Processing, and Producer

#### This notebook consumes (reads) a Kafka topic with KafkaConsumer, flags IP addresses that appear more than X times (X = 4 in the code below), and publishes the suspected IP addresses with KafkaProducer to a new Kafka topic for further processing.

In [1]:
from json import loads
from json import dumps
from kafka import KafkaConsumer
from kafka import KafkaProducer
import os
import re

## Create Producer and Consumer

### IMPORTANT:
- Verify line 4 of the cell below and update the consumer topic name if needed

In [2]:
# Update line 4 with the Kafka Topic name

consumer = KafkaConsumer(
    'ipTest9',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='my-group',
    consumer_timeout_ms=60000,
    value_deserializer=lambda x: loads(x.decode('utf-8')))

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))
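
The serializer and deserializer configured above are inverses of each other: the producer turns a Python value into UTF-8 encoded JSON bytes, and the consumer turns those bytes back into a Python value. The sketch below checks that round trip without a broker. The helper names `serialize` and `deserialize` and the sample record are hypothetical; the record layout (IP address as the first element) is only an assumption inferred from the later use of `message.value[0]`.

```python
from json import dumps, loads


def serialize(value):
    # Same logic as the producer's value_serializer lambda
    return dumps(value).encode('utf-8')


def deserialize(raw):
    # Same logic as the consumer's value_deserializer lambda
    return loads(raw.decode('utf-8'))


# Hypothetical record: the real topic layout is not shown in this notebook
sample_record = ['136.224.219.25', 'GET /index.html']

raw_bytes = serialize(sample_record)
round_tripped = deserialize(raw_bytes)

print(type(raw_bytes).__name__)
print(round_tripped[0])
```

Because the value travels as JSON, only JSON-compatible types (strings, numbers, lists, dictionaries) survive the round trip unchanged.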

## Create temp lists & dictionary

In [4]:
ConsumedMessages = []
Message_Values = []
IP_Addresses = []
TempDict1 = dict()
AttackIP_1 = []

## Process Kafka Stream
 - Verify the producer topic on line 18; suspected attack IP addresses are posted to this Kafka topic for further processing
 - Verify the attack filter on line 15; an IP address that appears more than X times (X = 4 here) is classified as an AttackIP
 - Each IP address is printed once, at the moment it is classified as an AttackIP

In [5]:

for message in consumer:
    ConsumedMessages.append(message)
    Message_Values.append(message.value)

    IP_Address = message.value[0]
    IP_Addresses.append(IP_Address)

   
    # Add to dictionary or add value to dictionary if entry already exists
    if IP_Address not in TempDict1:
        TempDict1[IP_Address] = 1
    else:
        TempDict1[IP_Address] = TempDict1[IP_Address] + 1
        if TempDict1[IP_Address] > 4 and IP_Address not in AttackIP_1:
            AttackIP_1.append(IP_Address)
            print(f'Identified as Attack IP: {IP_Address}')
            producer.send('attack9', value=IP_Address)

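            recentAttack = IP_Address

# send() is asynchronous; wait until all buffered attack-IP messages are delivered
producer.flush()
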
Identified as Attack IP: 136.224.219.25
Identified as Attack IP: 244.174.48.40
Identified as Attack IP: 253.99.92.111
Identified as Attack IP: 214.221.36.5
Identified as Attack IP: 134.147.163.170
Identified as Attack IP: 155.240.72.201
Identified as Attack IP: 182.195.167.96
Identified as Attack IP: 71.88.52.74
Identified as Attack IP: 208.204.43.199
Identified as Attack IP: 101.225.118.41
Identified as Attack IP: 23.144.220.60
Identified as Attack IP: 222.109.217.68
Identified as Attack IP: 234.188.132.109
Identified as Attack IP: 39.7.230.102
Identified as Attack IP: 90.81.190.109
Identified as Attack IP: 246.157.250.39
Identified as Attack IP: 216.213.134.229
Identified as Attack IP: 153.255.46.223
Identified as Attack IP: 162.216.154.125
Identified as Attack IP: 197.59.89.173
Identified as Attack IP: 205.182.177.239
Identified as Attack IP: 39.163.133.234
Identified as Attack IP: 189.246.11.232
Identified as Attack IP: 164.192.24.244
Identified as Attack IP: 199.148.145.6
...

Identified as Attack IP: 31.13.105.14
Identified as Attack IP: 67.200.64.139
Identified as Attack IP: 70.189.157.98
Identified as Attack IP: 126.227.5.115
Identified as Attack IP: 221.136.21.215
Identified as Attack IP: 172.64.134.155
Identified as Attack IP: 7.232.104.191
Identified as Attack IP: 170.227.46.244
Identified as Attack IP: 152.88.221.233
Identified as Attack IP: 113.109.155.183
Identified as Attack IP: 13.106.27.141
Identified as Attack IP: 39.200.111.163
Identified as Attack IP: 82.190.72.124
Identified as Attack IP: 6.103.214.67
Identified as Attack IP: 163.138.97.176
Identified as Attack IP: 198.40.46.233
Identified as Attack IP: 57.17.81.109
Identified as Attack IP: 81.24.204.186
Identified as Attack IP: 74.230.13.204
Identified as Attack IP: 153.84.62.37
Identified as Attack IP: 67.147.76.179
Identified as Attack IP: 30.32.93.173
Identified as Attack IP: 68.18.220.194
Identified as Attack IP: 184.57.81.20
Identified as Attack IP: 32.55.116.33
...
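
The counting and threshold logic in the loop above can be exercised offline, without a Kafka broker. The following is a minimal sketch under stated assumptions, not the notebook's own code: the function name `flag_attack_ips`, its `threshold` parameter, and the sample stream are all hypothetical. It swaps the plain dictionary for `collections.Counter` and tracks already flagged addresses in a set, so the membership test stays fast as the list of flagged addresses grows.

```python
from collections import Counter


def flag_attack_ips(ip_stream, threshold=4):
    """Return (flagged, counts) for an iterable of IP address strings.

    An address is flagged once, as soon as its count exceeds `threshold`.
    """
    counts = Counter()
    flagged = []          # keeps the order in which addresses were flagged
    already_flagged = set()
    for ip in ip_stream:
        counts[ip] += 1
        if counts[ip] > threshold and ip not in already_flagged:
            already_flagged.add(ip)
            flagged.append(ip)
    return flagged, counts


# Hypothetical stream: one address seen 7 times, another seen exactly 4 times
sample_stream = ['10.0.0.1'] * 5 + ['10.0.0.2'] * 4 + ['10.0.0.1'] * 2

flagged, counts = flag_attack_ips(sample_stream)
print(flagged)
print(counts['10.0.0.1'], counts['10.0.0.2'])
```

With a threshold of 4, an address is flagged on its fifth appearance, so the address seen exactly four times is counted but not flagged.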

## Sum of total attacks

In [7]:
# Total number of messages received from all IP addresses classified as AttackIP
Total_attacks = sum(TempDict1[item] for item in AttackIP_1)

## Summary Stats

In [9]:
print('Summary')
print(f'Total messages processed:        {len(ConsumedMessages)}')
print(f'Total Message Values Collected:  {len(Message_Values)}')
print(f'Total IP Addresses Collected:    {len(IP_Addresses)}')
print(f'Total entries in Dictionary:     {len(TempDict1)}')
print(f'Total attack IP_Addresses:       {len(AttackIP_1)}')
print(f'Total number of attacks:         {Total_attacks}')

Summary
Total messages processed:        163416
Total Message Values Collected:  163416
Total IP Addresses Collected:    163416
Total entries in Dictionary:     50983
Total attack IP_Addresses:       991
Total number of attacks:         87131
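
The summary figures are easier to interpret as ratios. The sketch below derives them from the numbers printed above; the variable names are hypothetical. Note that "Total number of attacks" counts every message from a flagged IP address, including the first four messages seen before the address crossed the threshold.

```python
# Figures copied from the summary output above
total_messages = 163416
distinct_ips = 50983
attack_ips = 991
attack_messages = 87131

# Fraction of distinct IP addresses that were flagged
share_of_ips = attack_ips / distinct_ips
# Fraction of all messages that came from flagged IP addresses
share_of_messages = attack_messages / total_messages
# Average number of messages per flagged IP address
mean_per_attack_ip = attack_messages / attack_ips

print(f'Flagged share of IP addresses: {share_of_ips:.1%}')
print(f'Flagged share of messages:     {share_of_messages:.1%}')
print(f'Mean messages per flagged IP:  {mean_per_attack_ip:.1f}')
```

In this run, roughly 2% of the distinct IP addresses account for a little over half of all messages.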
