# SPCS Networking Connectivity Test: Apache Kafka

**Note: Run this notebook in an SPCS container; otherwise the test results are not valid.**

## Purpose

This notebook tests SPCS networking connectivity to Kafka clusters in preparation for configuring Snowflake Openflow Kafka connectors:

- **[Openflow Connector for Kafka](https://docs.snowflake.com/en/user-guide/data-integration/openflow/connectors/kafka/about)** - Ingests real-time events from Kafka topics into Snowflake tables using Snowpipe Streaming
- **[Openflow Connector for Snowflake to Kafka](https://docs.snowflake.com/en/user-guide/data-integration/openflow/connectors/snowflake-to-kafka/about)** - Replicates Snowflake tables to Kafka using CDC for real-time insights distribution

Both connectors require External Access Integration (EAI) configuration to enable network connectivity from SPCS to your Kafka brokers. This notebook validates that connectivity before deploying Openflow connectors.

## Supported Platforms

Works with any Kafka distribution including:
- **Apache Kafka** (self-hosted)
- **AWS MSK** (Amazon's managed Apache Kafka service)
- **Confluent Cloud** and **Confluent Platform**
- **Redpanda** and **Redpanda Cloud**
- Any other Kafka-compatible platform

## Steps

1. Configure your Kafka bootstrap server address and authentication details (Step 1)
2. **(Optional)** Set up PyPI access if the confluent-kafka library needs to be installed (Step 2a)
3. Install the Confluent Kafka Python client library (Step 2b)
4. Run the connectivity tests to verify network access (Step 3)
5. If tests fail, create and attach the Kafka External Access Integration (EAI) (Step 4)
6. Restart the notebook session and retest (Step 5)
7. **(Optional)** Test consuming messages from a topic (Step 6)
8. Once successful, proceed with Openflow connector configuration


## Step 1: Configure Kafka Connection Settings

Update the configuration below with your actual Kafka cluster details.

In [None]:
# Kafka Connectivity Test Configuration
# Update these values with your actual Kafka cluster details

# ============================================================================
# KAFKA BOOTSTRAP SERVER CONFIGURATION
# ============================================================================
KAFKA_BOOTSTRAP_SERVERS = "your-kafka-broker.example.com:9092"

# ============================================================================
# AUTHENTICATION CONFIGURATION
# ============================================================================
KAFKA_SASL_USERNAME = "your-username-or-api-key"
KAFKA_SASL_PASSWORD = "your-password-or-api-secret"

# SASL Mechanism
# - Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (Kerberos)
KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"

# Security Protocol
# - Most production clusters: "SASL_SSL" (SASL over TLS/SSL)
# - Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_SECURITY_PROTOCOL = "SASL_SSL"

# ============================================================================
# SNOWFLAKE ROLE CONFIGURATION
# ============================================================================
# This role will be used to create the EAI and other objects if necessary
IMPLEMENTATION_ROLE = "ACCOUNTADMIN"
OPENFLOW_RUNTIME_ROLE = "OPENFLOWRUNTIMEROLE"

# ============================================================================
# AUTO-EXTRACT CONFIGURATION FOR NETWORK RULES
# ============================================================================
import re

# Extract hostname and port from bootstrap servers
bootstrap_parts = KAFKA_BOOTSTRAP_SERVERS.split(',')[0].strip()
match = re.match(r'([^:]+):(\d+)', bootstrap_parts)
if match:
    KAFKA_HOST = match.group(1)
    KAFKA_PORT = match.group(2)
else:
    KAFKA_HOST = bootstrap_parts
    KAFKA_PORT = "9092"

# Extract domain for wildcard rule
host_parts = KAFKA_HOST.split('.')
if len(host_parts) >= 2:
    KAFKA_DOMAIN = '.'.join(host_parts[-2:])
else:
    KAFKA_DOMAIN = KAFKA_HOST

print("=" * 70)
print("KAFKA CONFIGURATION SUMMARY")
print("=" * 70)
print(f"Bootstrap Server(s): {KAFKA_BOOTSTRAP_SERVERS}")
print(f"SASL Mechanism: {KAFKA_SASL_MECHANISM}")
print(f"Security Protocol: {KAFKA_SECURITY_PROTOCOL}")
print(f"\nNetwork Rule Configuration:")
print(f"  Primary Host: {KAFKA_HOST}")
print(f"  Primary Port: {KAFKA_PORT}")
print(f"  Domain: {KAFKA_DOMAIN}")
print("=" * 70)
print("\n✓ Configuration loaded. Ready to test connectivity...")

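The configuration cell derives `KAFKA_HOST` and `KAFKA_PORT` from the first bootstrap server only. When `KAFKA_BOOTSTRAP_SERVERS` lists several brokers, each one may need its own network rule entry. The sketch below parses the full list under simple assumptions: `parse_bootstrap_servers` and the example hostnames are illustrative and not part of this notebook, and bracketed IPv6 addresses are not handled.

```python
from typing import List, Tuple

DEFAULT_KAFKA_PORT = 9092  # same fallback the configuration cell uses


def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a comma-separated bootstrap string into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate empty entries such as a trailing comma
        host, sep, port = entry.rpartition(":")
        if sep and port.isdigit():
            pairs.append((host, int(port)))
        else:
            # No usable port suffix: treat the whole entry as the host
            pairs.append((entry, DEFAULT_KAFKA_PORT))
    return pairs


# Hypothetical broker names, for illustration only
example = "b-1.kafka.example.com:9096, b-2.kafka.example.com:9096,b-3.kafka.example.com"
for host, port in parse_bootstrap_servers(example):
    print(f"{host}:{port}")
```

Each printed `host:port` pair is in the form that a `HOST_PORT` network rule entry expects.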

## Step 2a: PyPI Setup (Optional)

Run these cells if you need to install the confluent-kafka library from PyPI. This creates the necessary network rules and External Access Integration for PyPI access.

**Skip this section if you already have confluent-kafka installed or have PyPI access configured.**


In [None]:
-- Create Network Rule and External Access Integration for PyPI
-- Run this cell to enable installing Python packages from PyPI

USE ROLE {{IMPLEMENTATION_ROLE}};

CREATE OR REPLACE NETWORK RULE pypi_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pypi.org', 'pypi.python.org', 'pythonhosted.org', 'files.pythonhosted.org');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pypi_access_integration
  ALLOWED_NETWORK_RULES = (pypi_network_rule)
  ENABLED = true
  COMMENT = 'External Access Integration for PyPI package installation';

-- Grant usage on the integration
GRANT USAGE ON INTEGRATION pypi_access_integration TO ROLE {{IMPLEMENTATION_ROLE}};

SHOW EXTERNAL ACCESS INTEGRATIONS LIKE 'pypi_access_integration';

In [None]:
-- Apply PyPI integration to this notebook
-- Run this after creating the PyPI integration above

ALTER NOTEBOOK EAI_KAFKA
  SET EXTERNAL_ACCESS_INTEGRATIONS = ('pypi_access_integration');

-- Restart your Notebook session after applying an EAI

## Step 2b: Install Confluent Kafka Client Library

If the install fails with connection errors, configure PyPI access (Step 2a) first.
Run this cell twice if the library is missing: the first run installs it, and the second confirms that it imports.


In [None]:
# Install the Confluent Kafka Python client library
# If the install fails with connection errors, configure PyPI access (Step 2a) first
# Run this cell twice if needed: the first run installs the library, the second confirms the import

try:
    from confluent_kafka import Producer
    print("✅ confluent-kafka already available")
except ImportError:
    print("📦 Installing confluent-kafka...")
    %pip install confluent-kafka
    print("✅ confluent-kafka installed")

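To confirm the install without importing the client, the installed distribution can be looked up through the standard library. This is a minimal sketch; `installed_version` is a hypothetical helper name, not something this notebook defines.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("confluent-kafka")
print(f"confluent-kafka version: {version or 'not installed'}")
```

A `None` result after running the install cell suggests the package went into a different environment, or that the install did not complete.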

## Step 3: Connectivity Tests

Run these test cells to verify network connectivity and authentication to your Kafka cluster.
If any tests fail, use the EAI setup cells in Step 4 to configure network access, then restart and retest.


In [None]:
### Test 3a: Socket Connectivity

# Test basic network connectivity to the Kafka broker
import socket

print("=" * 60)
print("TEST 3a: SOCKET CONNECTIVITY")
print("=" * 60)
print(f"\nTesting connection to {KAFKA_HOST}:{KAFKA_PORT}...")

try:
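    # The with block closes the socket even if the connection attempt raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(10)
        # connect_ex returns an error number instead of raising on connection failure
        result = sock.connect_ex((KAFKA_HOST, int(KAFKA_PORT)))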
    
    if result == 0:
        print(f"✅ SUCCESS: Socket connection established")
        print(f"   Network access to Kafka broker is working")
    else:
        print(f"❌ FAILED: Socket connection failed (error code: {result})")
        print(f"   Action: Configure EAI in Step 4 below")
        
except socket.gaierror as e:
    print(f"❌ FAILED: DNS resolution failed")
    print(f"   Error: {e}")
    print(f"   This typically means the network rule is not configured or EAI is not attached")
    print(f"   Action: Configure EAI in Step 4 below")
    
except socket.timeout:
    print(f"❌ FAILED: Connection timeout")
    print(f"   Action: Verify firewall rules and EAI configuration")
    
except Exception as e:
    print(f"❌ FAILED: Socket error")
    print(f"   Error: {e}")

print("=" * 60)

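Test 3a checks only the first bootstrap server. Kafka clients eventually connect to every broker, so it can help to repeat the TCP check for each one. The sketch below assumes a list of `(host, port)` pairs; `check_tcp` and `check_brokers` are hypothetical helper names, and the block defines the helpers without making any network call on its own.

```python
import socket
from typing import List, Tuple


def check_tcp(host: str, port: int, timeout: float = 10.0) -> Tuple[bool, str]:
    """Try one TCP connection and return (ok, detail)."""
    try:
        # create_connection resolves the name and tries each returned address
        with socket.create_connection((host, port), timeout=timeout):
            return True, "connected"
    except socket.gaierror as exc:
        return False, f"DNS resolution failed: {exc}"
    except socket.timeout:
        return False, "connection timed out"
    except OSError as exc:
        return False, f"connection failed: {exc}"


def check_brokers(brokers: List[Tuple[str, int]], timeout: float = 10.0) -> bool:
    """Print one result line per broker and return True only if all connect."""
    all_ok = True
    for host, port in brokers:
        ok, detail = check_tcp(host, port, timeout)
        print(f"{'OK' if ok else 'FAIL'}  {host}:{port}  {detail}")
        all_ok = all_ok and ok
    return all_ok
```

In this notebook, a call such as `check_brokers([(KAFKA_HOST, int(KAFKA_PORT))])` would reproduce Test 3a, and adding more pairs extends the check to the remaining brokers.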
In [None]:
### Test 3b: Kafka Producer & Metadata

# Test Kafka client connection and fetch cluster metadata
from confluent_kafka import Producer, KafkaException

print("=" * 60)
print("TEST 3b: KAFKA PRODUCER & METADATA")
print("=" * 60)
print(f"\nConnecting to Kafka cluster...")

try:
    # Create producer configuration
    producer_conf = {
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
        'security.protocol': KAFKA_SECURITY_PROTOCOL,
        'sasl.mechanism': KAFKA_SASL_MECHANISM,
        'sasl.username': KAFKA_SASL_USERNAME,
        'sasl.password': KAFKA_SASL_PASSWORD,
    }
    
    # Create producer instance
    producer = Producer(producer_conf)
    
    # Fetch cluster metadata to verify connection
    print(f"  Fetching cluster metadata (timeout: 10s)...")
    metadata = producer.list_topics(timeout=10)
    
    if metadata and metadata.brokers:
        print(f"\n✅ SUCCESS: Connected to Kafka cluster")
        print(f"   Cluster ID: {metadata.cluster_id if hasattr(metadata, 'cluster_id') else 'N/A'}")
        print(f"   Number of brokers: {len(metadata.brokers)}")
        print(f"   Number of topics: {len(metadata.topics)}")
    else:
        print(f"\n❌ FAILED: No broker information received")
        print(f"   Action: Verify network connectivity and broker configuration")
        
except KafkaException as e:
    print(f"\n❌ FAILED: Kafka error")
    print(f"   Error: {e}")
    print(f"   Action: Verify network access (EAI), credentials, and SASL configuration")
    
except Exception as e:
    print(f"\n❌ FAILED: Unexpected error")
    print(f"   Error: {e}")
    print(f"   Action: Check configuration and network access")

print("=" * 60)

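A `KafkaException` raised by Test 3b can come from the network, from TLS, or from authentication. Its first argument is a `KafkaError`, whose `name()` method gives a short identifier. The sketch below maps a few such names to suggested actions. The names are assumed to follow librdkafka's error naming, the hints are suggestions only, and `ERROR_HINTS` and `hint_for` are hypothetical names.

```python
from typing import Dict

# Error names as reported by KafkaError.name(); extend as needed
ERROR_HINTS: Dict[str, str] = {
    "_RESOLVE": "DNS lookup failed: check the network rule and that the EAI is attached",
    "_TRANSPORT": "Broker unreachable: check the network rule, port, and firewall",
    "_TIMED_OUT": "Request timed out: check network access to every advertised broker",
    "_SSL": "TLS handshake failed: check the security protocol and certificates",
    "_AUTHENTICATION": "Authentication failed: check the SASL mechanism and credentials",
    "SASL_AUTHENTICATION_FAILED": "Broker rejected the credentials: check username and password",
}


def hint_for(error_name: str) -> str:
    """Return a suggested action for a Kafka error name."""
    return ERROR_HINTS.get(error_name, "Unrecognized error: inspect the full error message")


# Inside an `except KafkaException as e:` handler, the name would come from
# e.args[0].name(); a literal is used here so the sketch runs on its own.
print(hint_for("_TRANSPORT"))
```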

## Step 4: EAI Setup (If connectivity tests failed)

If connectivity testing fails, you can use the cells below to prepare and implement an EAI suitable for Kafka access.


In [None]:
-- Create Network Rule for Kafka connectivity
-- Run this cell if connectivity tests failed

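-- The rule below allows only the bootstrap server parsed in Step 1.
-- Kafka clients also connect to every broker the cluster advertises, so add those
-- hosts to VALUE_LIST if they differ, or use a wildcard for all brokers in the same domain.
-- Wildcard examples: *.confluent.cloud, *.amazonaws.com, *.redpanda.com, *.example.com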

USE ROLE {{IMPLEMENTATION_ROLE}};

CREATE OR REPLACE NETWORK RULE kafka_access_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = (
    -- Specific Kafka bootstrap server
    '{{ KAFKA_HOST }}:{{ KAFKA_PORT }}'
  )
  COMMENT = 'Network rule for Kafka broker access';

SHOW NETWORK RULES LIKE 'kafka_%';

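When the cluster advertises several brokers, the rule needs one `VALUE_LIST` entry per endpoint. The sketch below renders such a statement as text for review before it is run in a SQL cell. `render_network_rule` and the broker names are hypothetical, and the function does not validate the rule name or the endpoints.

```python
from typing import Iterable


def render_network_rule(rule_name: str, endpoints: Iterable[str]) -> str:
    """Build a CREATE NETWORK RULE statement with one entry per endpoint."""
    # Double any single quotes so each entry stays a valid SQL string literal
    quoted = ["'" + e.replace("'", "''") + "'" for e in endpoints]
    value_list = ",\n    ".join(quoted)
    return (
        f"CREATE OR REPLACE NETWORK RULE {rule_name}\n"
        "  MODE = EGRESS\n"
        "  TYPE = HOST_PORT\n"
        f"  VALUE_LIST = (\n    {value_list}\n  );"
    )


# Hypothetical broker endpoints, for illustration only
print(render_network_rule(
    "kafka_access_rule",
    ["b-1.kafka.example.com:9096", "b-2.kafka.example.com:9096"],
))
```
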
In [None]:
-- Create External Access Integration for Kafka
-- This uses the network rule created above

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION kafka_eai
  ALLOWED_NETWORK_RULES = (kafka_access_rule)
  ENABLED = TRUE
  COMMENT = 'External Access Integration for Kafka connectivity';

-- Grant usage on the integration to your roles
GRANT USAGE ON INTEGRATION kafka_eai TO ROLE {{IMPLEMENTATION_ROLE}};
GRANT USAGE ON INTEGRATION kafka_eai TO ROLE {{OPENFLOW_RUNTIME_ROLE}};

SHOW EXTERNAL ACCESS INTEGRATIONS LIKE 'kafka_eai';

In [None]:
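-- Attach the EAI to this notebook
-- SET replaces the notebook's current integration list. If you attached
-- pypi_access_integration in Step 2a and still need it, list both integrations.
ALTER NOTEBOOK EAI_KAFKA
  SET EXTERNAL_ACCESS_INTEGRATIONS = ('kafka_eai');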

## Step 5: Restart and Retest

After creating and setting the EAI on the Notebook:
1. **Restart your Notebook session** (this is required for the EAI to take effect)
2. Re-run the configuration cell (Step 1)
3. Re-run the connectivity test (Step 3)

The tests should now pass if the EAI was configured correctly.


## Step 6: Test Kafka Consumer (Optional)

This optional step allows you to test consuming messages from a Kafka topic. This is useful for:
- Verifying that you can read messages from existing topics
- Testing end-to-end connectivity with consumer operations
- Validating that your credentials have read permissions

The consumer uses the same configuration defined in Step 1 and will consume a configurable number of messages from a specified topic.

In [None]:
# Test Kafka Consumer - Read messages from a topic
import json
from typing import Dict, Any
from confluent_kafka import Consumer, KafkaException, KafkaError


def deserialize_json_record(json_bytes: bytes) -> Dict[str, Any]:
    """Deserialize JSON bytes back to a record."""
    return json.loads(json_bytes.decode('utf-8'))


# ============================================================================
# CONSUMER CONFIGURATION
# ============================================================================
KAFKA_TOPIC = "your-topic-name" # Update with your actual topic name
NUM_MESSAGES = 10 # Number of messages to consume
CONSUMER_GROUP_ID = "eai-kafka-notebook" # Consumer group ID

print("=" * 70)
print("KAFKA CONSUMER TEST")
print("=" * 70)
print(f"Topic: {KAFKA_TOPIC}")
print(f"Messages to consume: {NUM_MESSAGES}")
print(f"Consumer Group: {CONSUMER_GROUP_ID}")
print("=" * 70)

try:
    # Create consumer configuration
    consumer_conf = {
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
        'security.protocol': KAFKA_SECURITY_PROTOCOL,
        'sasl.mechanism': KAFKA_SASL_MECHANISM,
        'sasl.username': KAFKA_SASL_USERNAME,
        'sasl.password': KAFKA_SASL_PASSWORD,
        'group.id': CONSUMER_GROUP_ID,
        'client.id': CONSUMER_GROUP_ID,
        'auto.offset.reset': 'earliest',  # Start from earliest message if no offset exists
        'enable.auto.commit': True,
    }
    
    # Create consumer instance
    consumer = Consumer(consumer_conf)

    # Subscribe to topic
    print(f"\n📡 Subscribing to topic '{KAFKA_TOPIC}'...")
    consumer.subscribe([KAFKA_TOPIC])
    
    # Consume messages in batch
    print(f"📥 Consuming up to {NUM_MESSAGES} messages...\n")
    
    # consume() returns a list of messages (or empty list if none available)
    messages = consumer.consume(num_messages=NUM_MESSAGES, timeout=60.0)
    
    consumed = 0  # count only messages delivered without an error
    if not messages:
        print("⏱️  No messages available (timeout reached)")
    else:
        # Process each message in the batch
        for idx, msg in enumerate(messages, start=1):
            if msg.error():
                print(f"❌ Consumer error: {msg.error()}")
                continue
            consumed += 1

            # Display message details
            print(f"Message {idx}:")
            print(f"  Topic: {msg.topic()}")
            print(f"  Partition: {msg.partition()}")
            print(f"  Offset: {msg.offset()}")
            print(f"  Key: {msg.key().decode('utf-8') if msg.key() else None}")
            print(f"  Value: {msg.value().decode('utf-8') if msg.value() else None}")
            # timestamp() returns (type, value); type 0 is TIMESTAMP_NOT_AVAILABLE
            ts_type, ts_value = msg.timestamp()
            print(f"  Timestamp: {ts_value if ts_type != 0 else 'N/A'}")
            print("-" * 70)

    # Close consumer
    consumer.close()

    # Report success only when at least one message was read without error
    if consumed:
        print(f"\n✅ SUCCESS: Consumed {consumed} message(s) from topic '{KAFKA_TOPIC}'")
    
except KafkaException as e:
    print(f"\n❌ FAILED: Kafka error")
    print(f"   Error: {e}")
    print(f"   Action: Verify topic exists, credentials have read permissions, and network connectivity")
    
except Exception as e:
    print(f"\n❌ FAILED: Unexpected error")
    print(f"   Error: {e}")
    print(f"   Action: Check configuration and network access")

print("=" * 70)
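
The consumer cell prints each value as UTF-8 text, which raises an error for binary payloads, and it never calls `deserialize_json_record`. A more tolerant decoder might look like the sketch below. `decode_value` is a hypothetical helper, and the fallback order (JSON, then text, then raw bytes) is an assumption about what is useful for a connectivity check.

```python
import json
from typing import Any, Optional


def decode_value(raw: Optional[bytes]) -> Any:
    """Decode a Kafka message value as JSON, falling back to text or raw bytes."""
    if raw is None:
        return None  # message carried no value
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw  # binary payload (for example Avro or Protobuf); leave as-is
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # valid UTF-8 but not JSON


print(decode_value(b'{"id": 1, "status": "ok"}'))
print(decode_value(b"plain text"))
```

In the consumer loop, `decode_value(msg.value())` could replace the inline `.decode('utf-8')` expression.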