kafka data ingestion

In [15]:
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import json
import time

# Function to initialize Kafka producer
def create_kafka_producer(bootstrap_servers):
    return Producer({'bootstrap.servers': bootstrap_servers})

# Function to send data to Kafka
def send_to_kafka(producer, topic, key, value):
    producer.produce(topic, key=key, value=value)
    producer.flush()  # Make sure to flush after sending

# Function to process and send indicators in batches
def process_and_send_to_kafka(indicators, producer, topic, batch_size=100):
    for i in range(0, len(indicators), batch_size):
        batch = indicators[i:i + batch_size]
        serialized_batch = json.dumps(batch)  # Serialize batch to JSON
        print(f"Sending batch {i // batch_size + 1} to Kafka...")
        send_to_kafka(producer, topic, key=None, value=serialized_batch)

# Function to initialize Kafka consumer
def create_kafka_consumer(bootstrap_servers, group_id):
    return Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })

# Function to consume a limited number of messages from Kafka
def consume_from_kafka(consumer, topic, message_count=10):
    consumer.subscribe([topic])
    messages = []
    try:
        for _ in range(message_count):
            msg = consumer.poll(timeout=1.0)  # 1 second timeout
            if msg is None:
                continue  # No message, just continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"End of partition reached {msg.topic()} {msg.partition()} offset {msg.offset()}")
                else:
                    raise KafkaException(msg.error())
            else:
                messages.append(msg.value().decode('utf-8'))  # Store the message content
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
    return messages

# Main execution function for running in a notebook
def main():
    # Kafka Configuration
    KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'  # Confluent Kafka broker address
    KAFKA_TOPIC = 'indicators-stream'  # Correct topic name where messages should be sent
    KAFKA_GROUP_ID = 'test-consumer-group'  # Consumer group ID

    # Create Kafka producer
    producer = create_kafka_producer(KAFKA_BOOTSTRAP_SERVERS)

    # Example indicator data (for testing)
    indicators = [{'id': i, 'value': f'Indicator {i}'} for i in range(500)]  # Simulate 500 indicators

    # Process and send indicators to Kafka (only sending a chunk of 100 for testing in notebook)
    CHUNK_SIZE = 100  # Number of indicators per batch
    limited_indicators = indicators[:CHUNK_SIZE]
    print(f"Processing and sending the first {len(limited_indicators)} indicators to Kafka...")

    process_and_send_to_kafka(limited_indicators, producer, KAFKA_TOPIC)

    # Create Kafka consumer to consume messages
    consumer = create_kafka_consumer(KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID)

    # Consume a limited number of messages from Kafka
    print("Consuming messages from Kafka...")
    messages = consume_from_kafka(consumer, KAFKA_TOPIC, message_count=10)

    # Print the consumed messages
    for msg in messages:
        print(f"Consumed message: {msg}")


In [16]:
# Run the main function in the notebook
main()

Processing and sending the first 100 indicators to Kafka...
Sending batch 1 to Kafka...
Consuming messages from Kafka...
Consumed message: [{"id": 0, "value": "Indicator 0"}, {"id": 1, "value": "Indicator 1"}, {"id": 2, "value": "Indicator 2"}, {"id": 3, "value": "Indicator 3"}, {"id": 4, "value": "Indicator 4"}, {"id": 5, "value": "Indicator 5"}, {"id": 6, "value": "Indicator 6"}, {"id": 7, "value": "Indicator 7"}, {"id": 8, "value": "Indicator 8"}, {"id": 9, "value": "Indicator 9"}, {"id": 10, "value": "Indicator 10"}, {"id": 11, "value": "Indicator 11"}, {"id": 12, "value": "Indicator 12"}, {"id": 13, "value": "Indicator 13"}, {"id": 14, "value": "Indicator 14"}, {"id": 15, "value": "Indicator 15"}, {"id": 16, "value": "Indicator 16"}, {"id": 17, "value": "Indicator 17"}, {"id": 18, "value": "Indicator 18"}, {"id": 19, "value": "Indicator 19"}, {"id": 20, "value": "Indicator 20"}, {"id": 21, "value": "Indicator 21"}, {"id": 22, "value": "Indicator 22"}, {"id": 23, "value": "Indicato

In [27]:
import subprocess

# Define the Logstash pipeline configuration for Pipeline 1
logstash_config_pipeline_1 = """
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["indicators-stream"]
    group_id => "top-10-target-country-consumer"
    codec => "json"
  }
}

filter {
  if [details][indicators][ip][country_name] {
    mutate {
      add_field => { "target_country" => "%{[details][indicators][ip][country_name]}" }
    }
    mutate {
      add_field => { "count" => 1 }
    }
  } else {
    drop { }
  }
}

aggregate {
  task_id => "%{target_country}"
  code => "
    map['count'] ||= 0
    map['count'] += event.get('count').to_i
  "
  push_previous_map_as_event => true
  timeout => 60
}

ruby {
  code => "
    event.set('sorted_countries', event.get('aggregate_maps').values.sort_by { |entry| -entry['count'] }.first(10))
  "
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "top-target-countries"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
"""

# Define the Logstash pipeline configuration for Pipeline 2
logstash_config_pipeline_2 = """
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["indicators-stream"]
    group_id => "top-10-threat-source-consumer"
    codec => "json"
  }
}

filter {
  if [details][indicators][source][country_name] {
    mutate {
      add_field => { "source_country" => "%{[details][indicators][source][country_name]}" }
    }
    mutate {
      add_field => { "count" => 1 }
    }
  } else {
    drop { }
  }
}

aggregate {
  task_id => "%{source_country}"
  code => "
    map['count'] ||= 0
    map['count'] += event.get('count').to_i
  "
  push_previous_map_as_event => true
  timeout => 60
}

ruby {
  code => "
    event.set('sorted_source_countries', event.get('aggregate_maps').values.sort_by { |entry| -entry['count'] }.first(10))
  "
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "top-threat-source-countries"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
"""

# Define the Logstash pipeline configuration for Pipeline 3
logstash_config_pipeline_3 = """
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["indicators-stream"]
    group_id => "detect-target-changes-consumer"
    codec => "json"
  }
}

filter {
  if [details][indicators][ip][country_name] {
    mutate {
      add_field => { "target_country" => "%{[details][indicators][ip][country_name]}" }
    }
  }

  # Check for changes in country/region and flag them
  if ([target_country] != [previous_target_country]) {
    mutate {
      add_field => { "country_change" => "true" }
    }
  }

  mutate {
    add_field => { "previous_target_country" => "%{target_country}" }
  }
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "target-country-change-report"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
"""

# Define the Logstash pipeline configuration for Pipeline 4
logstash_config_pipeline_4 = """
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["indicators-stream"]
    group_id => "detect-threat-source-changes-consumer"
    codec => "json"
  }
}

filter {
  if [details][indicators][source][country_name] {
    mutate {
      add_field => { "source_country" => "%{[details][indicators][source][country_name]}" }
    }
  }

  # Check for changes in source country/region and flag them
  if ([source_country] != [previous_source_country]) {
    mutate {
      add_field => { "source_change" => "true" }
    }
  }

  mutate {
    add_field => { "previous_source_country" => "%{source_country}" }
  }
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "threat-source-country-change-report"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
"""

# File paths where we will save the configuration files
logstash_config_path_1 = r"C:/Users/I745988/logstash-8.16.1/bin/pipeline_1.conf"

logstash_config_path_2 = r"C:/Users/I745988/logstash-8.16.1/bin/pipeline_2.conf"
logstash_config_path_3 = r"C:/Users/I745988/logstash-8.16.1/bin/pipeline_3.conf"
logstash_config_path_4 = r"C:/Users/I745988/logstash-8.16.1/bin/pipeline_4.conf"

# Write the configuration contents to the respective files
with open(logstash_config_path_1, 'w', encoding='utf-8') as f:
    f.write(logstash_config_pipeline_1)
print("Pipeline 1 configuration written successfully.")

with open(logstash_config_path_2, 'w', encoding='utf-8') as f:
    f.write(logstash_config_pipeline_2)
print("Pipeline 2 configuration written successfully.")

with open(logstash_config_path_3, 'w', encoding='utf-8') as f:
    f.write(logstash_config_pipeline_3)
print("Pipeline 3 configuration written successfully.")

with open(logstash_config_path_4, 'w', encoding='utf-8') as f:
    f.write(logstash_config_pipeline_4)
print("Pipeline 4 configuration written successfully.")

Pipeline 1 configuration written successfully.
Pipeline 2 configuration written successfully.
Pipeline 3 configuration written successfully.
Pipeline 4 configuration written successfully.


In [None]:
import subprocess

# Define the Logstash executable path and the paths to the config files
logstash_bin_path = "C:/Users/I745988/logstash-8.16.1/bin/logstash.bat"  # Ensure this points to the correct Logstash executable
logstash_config_paths = [
    "C:\\Users\\I745988\\logstash-8.16.1\\bin\\pipeline_1.conf",
]

# Run Logstash for each configuration file
for config_path in logstash_config_paths:
    # Ensure the config path is not empty and valid
    if not config_path or not config_path.strip():
        print(f"Invalid configuration file path: {config_path}")
        continue

    command = [logstash_bin_path, "-f", config_path, "--config.test_and_exit"]
    print(f"Running Logstash with config: {config_path}")
    print(f"Command to be executed: {' '.join(command)}")  # Log the full command

    try:
        # Run the Logstash command and capture its output
        result = subprocess.run(command, capture_output=True, text=True)

        # Print standard output and error
        print(f"Standard Output:\n{result.stdout}")
        print(f"Standard Error:\n{result.stderr}")

        # Check if there was an error (non-zero exit code)
        if result.returncode != 0:
            print(f"Logstash command failed with exit code {result.returncode}")
            raise RuntimeError(f"Logstash command failed with exit code {result.returncode}")
        else:
            print("Logstash command executed successfully.")

    except subprocess.CalledProcessError as e:
        print(f"Error running Logstash: {e}")
        raise RuntimeError(f"command '{e.cmd}' returned with error (code {e.returncode}): {e.output}")


Running Logstash with config: C:\Users\I745988\logstash-8.16.1\bin\pipeline_1.conf
Command to be executed: C:/Users/I745988/logstash-8.16.1/bin/logstash.bat -f C:\Users\I745988\logstash-8.16.1\bin\pipeline_1.conf


In [None]:
"""import os
import subprocess

# Ensure JAVA_HOME is set
os.environ['JAVA_HOME'] = "C:\\Program Files\\SapMachine\\JDK\\17"  # Update with your JDK path
java_path = os.path.join(os.environ['JAVA_HOME'], 'bin', 'java.exe')

# Check if Java is accessible
if not os.path.exists(java_path):
    raise FileNotFoundError(f"Java not found at {java_path}. Please check your JAVA_HOME setting.")"""

In [None]:


# Define the Logstash executable path and config files
logstash_bin_path = "C:/Program Files/logstash-8.16.1/bin/benchmark.bat"
logstash_config_paths = [
    "C:/Users/I745988/Downloads/logstash-8.16.1-windows-x86_64/logstash-8.16.1/pipeline_1.conf",  # Update with actual paths
]

# Run Logstash for each configuration file
for config_path in logstash_config_paths:
    command = [logstash_bin_path, "-f", config_path]
    print(f"Running Logstash with config: {config_path}")
    
    try:
        # Run the Logstash command
        result = subprocess.run(command, capture_output=True, text=True)
        
        # Print output
        print(f"Standard Output:\n{result.stdout}")
        print(f"Standard Error:\n{result.stderr}")
        
        # Handle non-zero exit codes
        if result.returncode != 0:
            print(f"Logstash command failed with exit code {result.returncode}")
            raise RuntimeError(f"Logstash command failed with exit code {result.returncode}")
        else:
            print("Logstash command executed successfully.")
    
    except subprocess.CalledProcessError as e:
        print(f"Error running Logstash: {e}")
        raise RuntimeError(f"Command '{e.cmd}' returned with error (code {e.returncode}): {e.output}")
    except Exception as e:
        print(f"Unexpected error: {e}")


Running Logstash with config: C:/Users/I745988/Downloads/logstash-8.16.1-windows-x86_64/logstash-8.16.1/pipeline_1.conf
Standard Output:
"Using system java: "C:\Program Files\SapMachine\JDK\17\bin\java.exe""

Standard Error:
The system cannot find the path specified.
could not find java; set JAVA_HOME or ensure java is in PATH 

Logstash command failed with exit code 1
Unexpected error: Logstash command failed with exit code 1
