In [1]:
import pandas as pd
import numpy as np
import json
import time
import io
from confluent_kafka import Producer
import ipywidgets as widgets
from IPython.display import display, clear_output
import warnings
warnings.filterwarnings('ignore')

In [2]:
import pandas as pd
import numpy as np
import json
import time
import io
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
import ipywidgets as widgets
from IPython.display import display, clear_output
import warnings
warnings.filterwarnings('ignore')

def create_producer_config(servers='localhost:9092'):
    """Build the settings dictionary used to construct the Kafka producer.

    Args:
        servers: Value stored under the ``bootstrap.servers`` key.

    Returns:
        A new dict holding the ``bootstrap.servers`` and ``client.id`` entries.
    """
    settings = dict()
    settings['bootstrap.servers'] = servers
    settings['client.id'] = 'adaptive-producer'
    return settings

def init_kafka_producer(config):
    """Construct a ``Producer`` from a configuration mapping.

    Args:
        config: Settings passed unchanged to ``Producer``.

    Returns:
        The newly created ``Producer`` instance.
    """
    kafka_producer = Producer(config)
    return kafka_producer

def load_file_content(file_info):
    """Extract the raw content and filename of the first uploaded file.

    Two shapes of upload value are accepted:

    * a sequence of dicts, each with ``'name'`` and ``'content'`` keys
      (the shape the original implementation handled);
    * a dict keyed by filename whose values carry a ``'content'`` key
      (the ipywidgets 7.x ``FileUpload.value`` layout).

    Args:
        file_info: The upload widget's ``value``.

    Returns:
        A ``(content, name)`` tuple for the first uploaded file.

    Raises:
        ValueError: If ``file_info`` is empty or ``None``.
    """
    if not file_info:
        raise ValueError("No file selected")

    if isinstance(file_info, dict):
        # Dict layout: the filename is the key, the payload sits under 'content'.
        name, entry = next(iter(file_info.items()))
        return entry['content'], name

    first = file_info[0]
    return first['content'], first['name']

def parse_csv_data(content):
    """Read CSV bytes into a DataFrame.

    Args:
        content: Bytes-like CSV payload; it is wrapped in an in-memory
            binary stream before being handed to ``pd.read_csv``.

    Returns:
        The DataFrame produced by ``pd.read_csv`` with default options.
    """
    byte_stream = io.BytesIO(content)
    frame = pd.read_csv(byte_stream)
    return frame

def parse_arff_data(content):
    """Parse ARFF content into a DataFrame.

    Args:
        content: The ARFF document as ``str`` or as a bytes-like object
            (``bytes``, ``bytearray``, ``memoryview``) encoded as UTF-8.

    Returns:
        A DataFrame with one column per ARFF attribute. Columns whose values
        all convert cleanly are made numeric; the others are left unchanged.
    """
    import arff

    if isinstance(content, str):
        text = content
    else:
        # bytes() normalises memoryview/bytearray uploads before decoding.
        text = bytes(content).decode('utf-8')

    dataset = arff.loads(text)
    column_names = [attr[0] for attr in dataset['attributes']]
    df = pd.DataFrame(dataset['data'], columns=column_names)

    for col in df.columns:
        # errors='ignore' is deprecated in pandas.to_numeric (since 2.2);
        # catching the conversion error keeps the same "leave the column
        # alone" behaviour without relying on the deprecated option.
        try:
            df[col] = pd.to_numeric(df[col])
        except (ValueError, TypeError):
            continue
    return df

def load_dataset(file_content, filename):
    """Load a dataset, choosing the parser from the file extension.

    The extension is compared case-insensitively so that names such as
    ``DATA.CSV`` are accepted.

    Args:
        file_content: Raw file payload passed to the selected parser.
        filename: Name of the uploaded file; only its suffix is inspected.

    Returns:
        The DataFrame returned by ``parse_csv_data`` or ``parse_arff_data``.

    Raises:
        ValueError: If the name ends with neither ``.csv`` nor ``.arff``.
    """
    lowered = filename.lower()
    if lowered.endswith('.csv'):
        return parse_csv_data(file_content)
    if lowered.endswith('.arff'):
        return parse_arff_data(file_content)
    raise ValueError("Unsupported file format")

def get_numeric_columns(df):
    """List the names of the DataFrame's numeric columns.

    Args:
        df: DataFrame to inspect.

    Returns:
        Column labels, in their original order, whose dtype is selected by
        ``np.number``.
    """
    numeric_frame = df.select_dtypes(include=[np.number])
    numeric_labels = numeric_frame.columns
    return numeric_labels.tolist()

def get_kafka_topics(servers='localhost:9092'):
    """Return the topic names reported by the cluster.

    Names starting with a double underscore are left out. Any exception
    raised while querying is printed and an empty list is returned instead.

    Args:
        servers: Value used for ``bootstrap.servers`` of the admin client.

    Returns:
        A list of topic names, or ``[]`` when the lookup fails.
    """
    try:
        client = AdminClient({'bootstrap.servers': servers})
        cluster_info = client.list_topics(timeout=5)
        visible = []
        for name in cluster_info.topics.keys():
            if name.startswith('__'):
                continue
            visible.append(name)
    except Exception as e:
        print(f"Error fetching topics: {e}")
        return []
    return visible

def create_feature_checkboxes(columns, callback, exclude_cols=None):
    """Build one unchecked checkbox per selectable column.

    Args:
        columns: Column names to offer, in display order.
        callback: Observer registered on each checkbox's ``value`` trait.
        exclude_cols: Column names to skip; ``None`` means skip nothing.

    Returns:
        The list of created checkbox widgets.
    """
    skipped = exclude_cols or []
    boxes = []
    for name in columns:
        if name in skipped:
            continue
        box = widgets.Checkbox(value=False, description=name)
        box.observe(callback, names='value')
        boxes.append(box)
    return boxes

def get_selected_features(checkboxes):
    """Collect the descriptions of every ticked checkbox.

    Args:
        checkboxes: Iterable of objects exposing ``value`` and ``description``.

    Returns:
        Descriptions of the checkboxes whose ``value`` is truthy, in order.
    """
    chosen = []
    for box in checkboxes:
        if box.value:
            chosen.append(box.description)
    return chosen

def prepare_stream_data(df, features, class_col, mapping):
    """Build the frame that is streamed: renamed features plus a mapped class.

    Feature columns are renamed ``at1``, ``at2``, ... in the order given and
    the class column is renamed ``cl``. Class values are converted to strings,
    looked up in ``mapping`` and stored as a categorical column; values that
    have no entry in ``mapping`` become NaN (behaviour of ``Series.map``).

    If ``class_col`` also appears in ``features`` it is treated as the class
    only. Selecting it twice would otherwise yield duplicate columns and make
    the mapping step fail.

    Args:
        df: Source DataFrame; it is not modified.
        features: Names of the feature columns; the list is not modified.
        class_col: Name of the class-label column.
        mapping: Dict from the string form of each class value to its
            replacement.

    Returns:
        A new DataFrame with columns ``at1..atN`` followed by ``cl``.
    """
    # Keep the class column out of the feature list (see docstring).
    feature_cols = [feat for feat in features if feat != class_col]

    data = df[feature_cols + [class_col]].copy()
    data[class_col] = data[class_col].astype(str).map(mapping)

    # Dynamic column renaming
    col_map = {feat: f'at{pos}' for pos, feat in enumerate(feature_cols, start=1)}
    col_map[class_col] = 'cl'

    data = data.rename(columns=col_map)
    data['cl'] = data['cl'].astype('category')
    return data

def send_message(producer, topic, message):
    """Publish one text message to a topic.

    Args:
        producer: Object exposing ``produce`` and ``poll``.
        topic: Destination topic name.
        message: Text payload; it is UTF-8 encoded before being produced.
    """
    payload = message.encode('utf-8')
    producer.produce(topic, payload)
    # Poll after every produce call, with a timeout argument of 1.
    producer.poll(1)

def stream_data(producer, topic, data, delay=0.1, check_streaming=None):
    """Send each row of ``data`` to a Kafka topic, yielding progress.

    This is a generator: each row is serialised with ``row.to_json()``, sent
    through ``send_message`` and reported to the caller, then the generator
    sleeps for ``delay`` seconds before handling the next row.

    Args:
        producer: Kafka producer used for sending and for the final flush.
        topic: Destination topic name.
        data: DataFrame whose rows are streamed in order.
        delay: Seconds to sleep after each yielded row.
        check_streaming: Optional zero-argument callable; streaming stops as
            soon as it returns a falsy value.

    Yields:
        ``(count, msg)`` where ``count`` is the 1-based number of rows sent so
        far and ``msg`` is the JSON string just sent.
    """
    try:
        # Count by position rather than by index label, so the reported
        # number is right for any index (filtered, shuffled, non-integer).
        for sent, (_, row) in enumerate(data.iterrows(), start=1):
            # Check if streaming should continue
            if check_streaming and not check_streaming():
                break
            msg = row.to_json()
            send_message(producer, topic, msg)
            yield sent, msg
            time.sleep(delay)
    finally:
        # Also runs when the consumer stops iterating early (generator
        # close), so queued messages are flushed in that case too.
        producer.flush()

class FlexibleKafkaStreamer:
    """Notebook dashboard for streaming a dataset's rows to a Kafka topic.

    The user picks a topic, uploads a CSV/ARFF file, ticks feature columns,
    chooses a class column with an integer mapping, and starts the stream.
    """

    def __init__(self, servers='localhost:9092'):
        """Store the broker address and reset all selection state.

        Args:
            servers: Kafka bootstrap servers used for topic discovery and
                for the producer.
        """
        self.servers = servers
        self.topic = None
        self.producer = None
        self.df = None
        self.features = []
        self.class_col = None
        self.streaming = False
        self.count = 0
        self.available_topics = []
        # Created up front so handlers can rely on them before a dataset or
        # class column has been chosen.
        self.checkboxes = []
        self.mapping_inputs = {}

    def build_interface(self):
        """Build the streaming dashboard interface.

        Creates every widget, wires the event handlers, loads the topic list
        once, and returns the assembled top-level container.
        """
        # Header
        self.header = widgets.HTML("""
        <div style='background: linear-gradient(135deg, #667eea, #764ba2);
                    padding: 20px; border-radius: 10px; color: white; margin-bottom: 20px;'>
            <h2 style='margin: 0;'>🚀 Adaptive Streaming Dashboard</h2>
            <p style='margin: 5px 0;'>Flexible Attribute Selection</p>
        </div>
        """)

        # Topic selection
        self.refresh_topics_btn = widgets.Button(
            description='🔄 Refresh Topics',
            button_style='info',
            layout=widgets.Layout(width='150px')
        )
        self.topic_selector = widgets.Dropdown(
            options=['Select a topic'],
            description='Kafka Topic:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='400px')
        )

        # File section
        self.uploader = widgets.FileUpload(accept='.csv,.arff', multiple=False)
        self.load_btn = widgets.Button(description='Load Dataset', button_style='primary')
        self.status = widgets.Label('Select a dataset file')

        # Data preview
        self.info_display = widgets.HTML()
        self.preview_area = widgets.Output()

        # Feature selection
        self.feature_box = widgets.VBox()
        self.selected_count = widgets.HTML('<b>Selected features: 0</b>')

        # Class configuration
        self.class_selector = widgets.Dropdown(
            options=['Choose class label'],
            description='Class Label:',
            style={'description_width': 'initial'}
        )
        self.mapping_box = widgets.VBox()

        # Controls
        self.start_btn = widgets.Button(
            description='▶️ Start Stream',
            button_style='success',
            disabled=True
        )
        self.stop_btn = widgets.Button(
            description='⏹️ Stop',
            button_style='danger',
            disabled=True
        )

        # Statistics
        self.progress = widgets.IntProgress(
            description='Progress:',
            bar_style='info'
        )
        self.log_display = widgets.Textarea(
            placeholder='Streaming log...',
            disabled=True,
            layout=widgets.Layout(width='100%', height='120px')
        )

        # Connect handlers
        self.refresh_topics_btn.on_click(self._refresh_topics)
        self.topic_selector.observe(self._handle_topic_change, 'value')
        self.load_btn.on_click(self._handle_load)
        self.class_selector.observe(self._handle_class_change, 'value')
        self.start_btn.on_click(self._handle_start)
        self.stop_btn.on_click(self._handle_stop)

        # Load topics on init
        self._refresh_topics(None)

        # Layout assembly
        return widgets.VBox([
            self.header,
            widgets.VBox([
                widgets.HTML('<h3>🎯 Kafka Topic Selection</h3>'),
                widgets.HBox([self.topic_selector, self.refresh_topics_btn])
            ]),
            widgets.VBox([
                widgets.HTML('<h3>📁 Dataset Input</h3>'),
                widgets.HBox([self.uploader, self.load_btn]),
                self.status
            ]),
            widgets.VBox([
                widgets.HTML('<h3>📊 Data Preview</h3>'),
                self.info_display,
                self.preview_area
            ]),
            widgets.VBox([
                widgets.HTML('<h3>⚙️ Stream Configuration</h3>'),
                widgets.HTML('<b>Select Features (any number):</b>'),
                self.selected_count,
                self.feature_box,
                widgets.HTML('<br>'),
                self.class_selector,
                widgets.HTML('<b>Class Value Mapping:</b>'),
                self.mapping_box
            ]),
            widgets.HBox([self.start_btn, self.stop_btn]),
            widgets.VBox([
                widgets.HTML('<h3>📈 Stream Status</h3>'),
                self.progress,
                self.log_display
            ])
        ], layout=widgets.Layout(padding='10px'))

    def _handle_load(self, _):
        """Load the uploaded file, then refresh the preview and feature list.

        Any exception is reported in the status label instead of being raised.
        """
        self.status.value = 'Loading...'
        try:
            content, name = load_file_content(self.uploader.value)
            self.df = load_dataset(content, name)
            self.status.value = f'Loaded: {name} ({len(self.df)} rows)'
            self._show_preview()
            self._setup_features()
        except Exception as e:
            self.status.value = f'Error: {str(e)}'

    def _show_preview(self):
        """Show the dataset shape, its first column names and its head rows."""
        self.info_display.value = f"""
        <p><b>Shape:</b> {self.df.shape[0]} × {self.df.shape[1]}</p>
        <p><b>Columns:</b> {', '.join(list(self.df.columns)[:5])}{'...' if len(self.df.columns) > 5 else ''}</p>
        """
        with self.preview_area:
            clear_output()
            display(self.df.head())

    def _refresh_topics(self, _):
        """Re-query the broker and repopulate the topic dropdown."""
        self.available_topics = get_kafka_topics(self.servers)
        if self.available_topics:
            self.topic_selector.options = ['Select a topic'] + self.available_topics
            self.status.value = f'Found {len(self.available_topics)} topics'
        else:
            self.topic_selector.options = ['No topics found']
            self.status.value = 'No topics found. Create a topic first.'

    def _handle_topic_change(self, change):
        """Record the chosen topic, or clear it when a placeholder is shown.

        Clearing matters: without it a previously chosen topic would stay
        active (and Start stay enabled) while the dropdown shows a
        placeholder entry.
        """
        if change['new'] not in ['Select a topic', 'No topics found']:
            self.topic = change['new']
        else:
            self.topic = None
        self._check_ready()

    def _setup_features(self):
        """Rebuild the feature checkboxes and class choices for ``self.df``."""
        # Drop selections left over from a previously loaded dataset; their
        # column names may not exist in the new one.
        self.features = []
        self.checkboxes = []
        self.feature_box.children = []
        self.selected_count.value = '<b>Selected features: 0</b>'
        self._check_ready()

        numeric_cols = get_numeric_columns(self.df)
        if not numeric_cols:
            self.status.value = 'No numerical features found!'
            return

        # Exclude potential class columns (cl, class, target, etc.).
        # 'cl' must match the whole name: as a substring it would also hide
        # ordinary features such as 'cycle' or 'cluster_size'.
        exclude_patterns = ['class', 'target', 'label']
        exclude_cols = [
            col for col in numeric_cols
            if col.lower() == 'cl'
            or any(pattern in col.lower() for pattern in exclude_patterns)
        ]

        self.checkboxes = create_feature_checkboxes(numeric_cols, self._update_features, exclude_cols)
        self.feature_box.children = self.checkboxes
        self.class_selector.options = ['Choose class label'] + list(self.df.columns)

    def _update_features(self, _):
        """Refresh the selected-feature list and counter after a checkbox change."""
        self.features = get_selected_features(self.checkboxes)
        self.selected_count.value = f'<b>Selected features: {len(self.features)}</b>'
        self._check_ready()

    def _handle_class_change(self, change):
        """Record the chosen class column and rebuild its value mapping."""
        if change['new'] != 'Choose class label':
            self.class_col = change['new']
            self._setup_mapping()
        else:
            self.class_col = None
            self.mapping_box.children = []
        self._check_ready()

    def _setup_mapping(self):
        """Create one integer input per distinct value of the class column.

        Inputs default to 0, 1, 2, ... in order of first appearance and are
        stored in ``self.mapping_inputs`` keyed by ``str(value)``.
        """
        unique_vals = self.df[self.class_col].unique()
        self.mapping_inputs = {}

        widgets_list = []
        for i, val in enumerate(unique_vals):
            text_input = widgets.IntText(value=i, layout=widgets.Layout(width='80px'))
            self.mapping_inputs[str(val)] = text_input
            widgets_list.append(widgets.HBox([
                widgets.Label(f'{val} →'),
                text_input
            ]))

        self.mapping_box.children = widgets_list

    def _check_ready(self):
        """Enable Start only when features, class column and topic are all set."""
        ready = bool(
            self.features
            and self.class_col
            and self.class_col != 'Choose class label'
            and self.topic
            and self.topic != 'Select a topic'
        )
        self.start_btn.disabled = not ready

    def _handle_start(self, _):
        """Stream the prepared dataset to the selected topic.

        Creates the producer on first use, resets counters and UI state, and
        iterates ``stream_data`` while updating the progress bar. The first
        five messages are echoed to the log area. Any exception is reported
        in the status label.
        """
        try:
            # Get mappings
            mapping = {k: v.value for k, v in self.mapping_inputs.items()}

            # Initialize producer
            if not self.producer:
                config = create_producer_config(self.servers)
                self.producer = init_kafka_producer(config)
                print(f"Producer initialized for topic: {self.topic}")

            self.streaming = True
            self.count = 0

            # Update UI
            self.start_btn.disabled = True
            self.stop_btn.disabled = False
            self.status.value = 'Streaming...'
            self.log_display.value = ''

            # Prepare data
            stream_df = prepare_stream_data(self.df, self.features, self.class_col, mapping)
            self.progress.max = len(stream_df)
            self.progress.value = 0

            # Stream messages with streaming check.
            # NOTE(review): this loop runs synchronously inside the click
            # callback; presumably a Stop click cannot be processed until it
            # returns - confirm, and consider a background thread if so.
            for count, msg in stream_data(self.producer, self.topic, stream_df,
                                         check_streaming=lambda: self.streaming):
                if not self.streaming:
                    break

                self.count = count
                self.progress.value = count

                if count <= 5:
                    self.log_display.value += f"[{count}] {msg}\n"

            # Only call stop if streaming completed naturally
            if self.streaming:
                self._handle_stop(None)

        except Exception as e:
            # Reset the controls first: _handle_stop writes its own status
            # text, so the error has to be set afterwards to stay visible.
            self._handle_stop(None)
            self.status.value = f'Error: {str(e)}'

    def _handle_stop(self, _):
        """Stop streaming, restore the buttons and report how many rows went out."""
        self.streaming = False
        self.start_btn.disabled = False
        self.stop_btn.disabled = True
        if self.count > 0:
            self.status.value = f'Streamed {self.count} messages to {self.topic}'
        else:
            self.status.value = 'Streaming stopped'

    def launch(self):
        """Build and display the dashboard, then print the broker and topics."""
        dashboard = self.build_interface()
        display(dashboard)
        print(f"Dashboard ready! Kafka broker: {self.servers}")
        if self.available_topics:
            print(f"Available topics: {', '.join(self.available_topics)}")
        else:
            print("No topics found. Create a topic using:")
            print("bin/kafka-topics.sh --create --topic your_topic --bootstrap-server localhost:9092")

# Create the dashboard with the default broker address and render it in the
# notebook; `streamer` stays bound at module level after this cell runs.
streamer = FlexibleKafkaStreamer()
streamer.launch()

VBox(children=(HTML(value="\n        <div style='background: linear-gradient(135deg, #667eea, #764ba2);\n     …

Dashboard ready! Kafka broker: localhost:9092
Available topics: drift_detection_topic
