# Splunk Vendor Product Analysis

This notebook queries Splunk Enterprise indices to identify vendor products and analyze gaps in vendor coverage.

## Features
- Support for multiple Splunk environments (dev, staging, prod)
- HEC token or username/password authentication
- Custom SPL query generation
- Interactive visualizations
- Data export (CSV, JSON, HTML)
- Vendor coverage gap analysis

## 1. Configuration and Setup

In [1]:
# Import required libraries
import sys
import os
import json
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import warnings
warnings.filterwarnings('ignore')

# Add src directory to path
sys.path.append(os.path.join(os.getcwd(), 'src'))

# Import our custom modules
from splunk_client import SplunkHECClient
from vendor_parser import VendorQueryBuilder
from visualization import VendorVisualizer

print("✅ All libraries imported successfully")

✅ All libraries imported successfully


In [2]:
# Load configuration
config_path = "config/environments.json"

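try:
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    print("✅ Configuration loaded successfully")
    print(f"Available environments: {list(config['environments'].keys())}")
except FileNotFoundError:
    print("❌ Configuration file not found. Please ensure config/environments.json exists.")
    config = None
except json.JSONDecodeError as e:
    # A malformed file should not leave `config` undefined for later cells
    print(f"❌ Configuration file is not valid JSON: {e}")
    config = None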

✅ Configuration loaded successfully
Available environments: ['docker_local', 'docker_localhost', 'enterprise_dev', 'enterprise_prod', 'cloud_splunk']
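
The cells below read several keys from each environment entry (`name`, `hec_url`, `indices`, plus either `token` or `username`/`password`, and optionally `verify_ssl`), along with a top-level `expected_vendors` list. A minimal sketch of a pre-flight check for that shape might look like the following. The helper name `find_config_problems`, the URL, and the vendor name are illustrative assumptions, not part of the project.

```python
# Keys that the later cells index directly, so a missing one would raise KeyError.
REQUIRED_ENV_KEYS = ("name", "hec_url", "indices")


def find_config_problems(config):
    """Return a list of readable problems; an empty list means the shape looks usable."""
    environments = config.get("environments")
    if not isinstance(environments, dict) or not environments:
        return ["'environments' must be a non-empty object"]

    problems = []
    for env_key, env in environments.items():
        for key in REQUIRED_ENV_KEYS:
            if key not in env:
                problems.append(f"{env_key}: missing '{key}'")
        # The notebook authenticates with a token or with a username/password pair.
        if not env.get("token") and not env.get("username"):
            problems.append(f"{env_key}: needs 'token' or 'username'/'password'")
    return problems


# Hypothetical config mirroring the keys the notebook reads (values are placeholders).
example_config = {
    "environments": {
        "docker_local": {
            "name": "Docker Local Splunk",
            "hec_url": "https://localhost:8088",
            "token": "YOUR-DEV-TOKEN-HERE",
            "verify_ssl": False,
            "indices": ["main"],
        }
    },
    "expected_vendors": ["Example Vendor"],
}

print(find_config_problems(example_config))
```

Running a check like this right after loading the file surfaces missing keys before any widget callback fails on them.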


## 2. Environment Selection

In [3]:
# Create environment selection widget
if config:
    env_options = [(env_data['name'], env_key) for env_key, env_data in config['environments'].items()]
    
    environment_selector = widgets.Dropdown(
        options=env_options,
        description='Environment:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='300px')
    )
    
    display(environment_selector)
    
    # Global variables to store client and selected environment
    splunk_client = None
    selected_env_config = None
else:
    print("Cannot create environment selector without configuration.")

Dropdown(description='Environment:', layout=Layout(width='300px'), options=(('Docker Local Splunk', 'docker_lo…

In [6]:
# Initialize Splunk client with selected environment
def initialize_splunk_client():
    global splunk_client, selected_env_config
    
    if not config:
        print("❌ No configuration available")
        return
    
    env_key = environment_selector.value
    selected_env_config = config['environments'][env_key]
    
    # Validate required configuration
    if selected_env_config.get('token') in ['YOUR-DEV-TOKEN-HERE', 'YOUR-STAGING-TOKEN-HERE', 'YOUR-PROD-TOKEN-HERE']:
        print(f"❌ Please update the token for {selected_env_config['name']} environment in config/environments.json")
        return
    
    print(f"Initializing connection to {selected_env_config['name']} environment...")
    
    try:
        splunk_client = SplunkHECClient(
            base_url=selected_env_config['hec_url'],
            token=selected_env_config.get('token'),
            username=selected_env_config.get('username'),
            password=selected_env_config.get('password'),
            verify_ssl=selected_env_config.get('verify_ssl', True)
        )
        
        print("✅ Splunk client initialized")
        print(f"Environment: {selected_env_config['name']}")
        print(f"URL: {selected_env_config['hec_url']}")
        print(f"Indices: {', '.join(selected_env_config['indices'])}")
        
        # Show authentication method
        if selected_env_config.get('username'):
            print(f"Authentication: Username/Password ({selected_env_config['username']})")
        elif selected_env_config.get('token'):
            print("Authentication: HEC Token")
        
    except Exception as e:
        print(f"❌ Failed to initialize Splunk client: {str(e)}")
        splunk_client = None

# Button to initialize connection
init_button = widgets.Button(
    description='Initialize Connection',
    button_style='primary',
    layout=widgets.Layout(width='200px')
)
init_button.on_click(lambda x: initialize_splunk_client())

display(init_button)

Button(button_style='primary', description='Initialize Connection', layout=Layout(width='200px'), style=Button…
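
The token check above compares against three hard-coded placeholder strings, so a placeholder for any other environment would slip through. One possible alternative, sketched here under the assumption that all placeholders follow the `YOUR-<ENV>-TOKEN-HERE` pattern, is to match the pattern instead. `is_placeholder_token` is a hypothetical helper, not part of `splunk_client`.

```python
import re

# Assumed placeholder convention: YOUR-<ENV>-TOKEN-HERE, e.g. YOUR-DEV-TOKEN-HERE.
PLACEHOLDER_TOKEN = re.compile(r"^YOUR-[A-Z0-9]+-TOKEN-HERE$")


def is_placeholder_token(token):
    """Return True when the token is still an unedited placeholder."""
    return bool(token) and PLACEHOLDER_TOKEN.match(token) is not None


for candidate in ("YOUR-DEV-TOKEN-HERE", "YOUR-QA-TOKEN-HERE", "abcd-1234", None):
    print(candidate, is_placeholder_token(candidate))
```

A missing token (`None`) is deliberately not treated as a placeholder, because an environment may authenticate with a username and password instead.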

## 3. Connection Validation

In [8]:
# Test Splunk connection
def test_connection():
    if not splunk_client:
        print("❌ Please initialize Splunk client first")
        return
    
    print("Testing connection...")
    
    try:
        result = splunk_client.validate_connection()
        
        if result['valid']:
            print("✅ Connection successful!")
            print(f"Status: {result['message']}")
        else:
            print("❌ Connection failed")
            print(f"Error: {result['message']}")
            print(f"Status Code: {result['status_code']}")
            
    except Exception as e:
        print(f"❌ Connection test failed: {str(e)}")

# Button to test connection
test_button = widgets.Button(
    description='Test Connection',
    button_style='info',
    layout=widgets.Layout(width='200px')
)
test_button.on_click(lambda x: test_connection())

display(test_button)

Button(button_style='info', description='Test Connection', layout=Layout(width='200px'), style=ButtonStyle())
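
`test_connection` expects `validate_connection()` to return a dictionary with `valid`, `message`, and `status_code` keys. How `SplunkHECClient` builds that dictionary is not shown in this notebook. As a rough illustration only, a result of that shape could be derived from an HTTP status code as follows; `describe_status` and the hint texts are assumptions.

```python
# Hints for common HTTP failures; the wording is illustrative.
STATUS_HINTS = {
    401: "Unauthorized: check the token or username/password",
    403: "Forbidden: the credentials lack permission for this resource",
    404: "Not found: check the URL and port",
}


def describe_status(status_code):
    """Build a result dict with the keys that test_connection() reads."""
    valid = 200 <= status_code < 300
    if valid:
        message = "Connection OK"
    else:
        message = STATUS_HINTS.get(status_code, f"Unexpected HTTP status {status_code}")
    return {"valid": valid, "message": message, "status_code": status_code}


print(describe_status(200))
print(describe_status(401))
```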

## 4. Query Configuration

In [9]:
# Initialize query builder and visualizer
query_builder = VendorQueryBuilder()
visualizer = VendorVisualizer()

print("✅ Query builder and visualizer initialized")

✅ Query builder and visualizer initialized
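
The notebook does not show the SPL that `build_basic_vendor_query` emits. As a rough sketch only, a query of this kind could be assembled as below. The function name `build_basic_query`, the `vendor_product` field, and the overall query shape are assumptions for illustration, not the contents of `vendor_parser`.

```python
def build_basic_query(indices, earliest, vendor_field="vendor_product"):
    """Assemble an SPL string that counts events per vendor across the given indices."""
    if not indices:
        raise ValueError("at least one index is required")
    # Quote each index name and OR them together inside one group.
    index_clause = " OR ".join(f'index="{name}"' for name in indices)
    return (
        f"search ({index_clause}) earliest={earliest} "
        f"| stats count by {vendor_field} "
        f"| sort - count"
    )


print(build_basic_query(["main", "security"], "-24h@h"))
```

Building the string from a list keeps the index filter and the time range in one place, which is also where a custom vendor field could be swapped in.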


In [10]:
# Query configuration widgets
time_range_widget = widgets.Dropdown(
    options=['-1h@h', '-24h@h', '-7d@d', '-30d@d', '-90d@d'],
    value='-24h@h',
    description='Time Range:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='300px')
)

query_type_widget = widgets.Dropdown(
    options=[('Basic Vendor Query', 'basic'), ('Gap Analysis', 'gap'), ('Custom Query', 'custom')],
    value='basic',
    description='Query Type:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='300px')
)

max_results_widget = widgets.IntSlider(
    value=5000,
    min=100,
    max=50000,
    step=500,
    description='Max Results:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='400px')
)

# Custom query text area
custom_query_widget = widgets.Textarea(
    placeholder='Enter custom SPL query here...',
    description='Custom Query:',
    layout=widgets.Layout(width='100%', height='150px'),
    style={'description_width': 'initial'}
)

display(widgets.VBox([
    widgets.HTML("<h3>Query Configuration</h3>"),
    time_range_widget,
    query_type_widget,
    max_results_widget
]))

VBox(children=(HTML(value='<h3>Query Configuration</h3>'), Dropdown(description='Time Range:', index=1, layout…
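
The time range options are Splunk relative time modifiers: `-24h@h` means "go back 24 hours, then snap to the start of that hour", and `-7d@d` means "go back 7 days, then snap to midnight". Splunk resolves these on the server. The sketch below only illustrates the arithmetic for the hour and day forms used in this dropdown, and `resolve_earliest` is a hypothetical helper.

```python
import re
from datetime import datetime, timedelta

# Only the forms used in the dropdown above: -<N>h@h, -<N>d@d (and mixes of h/d).
_MODIFIER = re.compile(r"^-(\d+)([hd])@([hd])$")


def resolve_earliest(modifier, now):
    """Resolve a relative time modifier against `now` and return the earliest datetime."""
    match = _MODIFIER.match(modifier)
    if match is None:
        raise ValueError(f"unsupported time modifier: {modifier!r}")
    amount, unit, snap = int(match.group(1)), match.group(2), match.group(3)

    # Step 1: apply the offset.
    delta = timedelta(hours=amount) if unit == "h" else timedelta(days=amount)
    moment = now - delta

    # Step 2: snap down to the start of the hour or the day.
    if snap == "h":
        return moment.replace(minute=0, second=0, microsecond=0)
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)


now = datetime(2024, 1, 15, 10, 37, 12)
print(resolve_earliest("-24h@h", now))
print(resolve_earliest("-7d@d", now))
```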

In [11]:
# Show/hide custom query widget based on selection
def update_query_options(change):
    clear_output(wait=True)
    
    if change['new'] == 'custom':
        display(widgets.VBox([
            widgets.HTML("<h4>Custom Query</h4>"),
            widgets.HTML("<p>Enter your custom SPL query below:</p>"),
            custom_query_widget
        ]))
    else:
        display(widgets.HTML("<p>Using built-in query templates</p>"))

query_type_widget.observe(update_query_options, names='value')

# Initial display
update_query_options({'new': query_type_widget.value})

HTML(value='<p>Using built-in query templates</p>')

## 5. Query Execution

In [12]:
# Global variables to store results
current_results = None
gap_analysis_results = None
query_stats = {}

def execute_vendor_query():
    global current_results, gap_analysis_results, query_stats
    
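    if not splunk_client or not selected_env_config:
        print("❌ Please initialize Splunk client first")
        return
    
    # Reset results from any previous run so stale data is never charted or exported
    current_results = None
    gap_analysis_results = None
    query_stats = {}
    
    print("🔍 Executing vendor query...")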
    
    try:
        query_type = query_type_widget.value
        time_range = time_range_widget.value
        indices = selected_env_config['indices']
        
        # Build query based on type
        if query_type == 'basic':
            query = query_builder.build_basic_vendor_query(indices, time_range)
        elif query_type == 'gap':
            expected_vendors = config.get('expected_vendors', [])
            query = query_builder.build_gap_analysis_query(indices, expected_vendors, time_range)
        elif query_type == 'custom':
            query = custom_query_widget.value.strip()
            if not query:
                print("❌ Please enter a custom query")
                return
        else:
            print("❌ Invalid query type")
            return
        
        print(f"Query Type: {query_type.title()}")
        print(f"Time Range: {time_range}")
        print(f"Indices: {', '.join(indices)}")
        print("\n📋 Generated SPL Query:")
        print("-" * 50)
        print(query)
        print("-" * 50)
        
        # Execute query with pagination
        print("\n⏳ Executing query (this may take a few minutes)...")
        
        results_df = splunk_client.paginate_results(
            query=query,
            page_size=1000,
            max_results=max_results_widget.value
        )
        
        if results_df.empty:
            print("⚠️ No results found")
            current_results = pd.DataFrame()
            return
        
        # Process results
        current_results = query_builder.extract_vendor_products(results_df)
        
        # Generate gap analysis if needed
        if query_type in ['basic', 'gap']:
            expected_vendors = config.get('expected_vendors', [])
            gap_analysis_results = query_builder.identify_coverage_gaps(current_results, expected_vendors)
        
        # Generate summary statistics
        query_stats = query_builder.generate_summary_stats(current_results)
        
        print("\n✅ Query completed successfully!")
        print(f"📊 Results: {len(current_results)} vendors found")
        print(f"📈 Total events: {query_stats['total_events']:,}")
        
        if query_stats['top_vendor'] != 'None':
            print(f"🔝 Top vendor: {query_stats['top_vendor']} ({query_stats['top_vendor_count']:,} events)")
        
        # Display first few results
        print("\n📋 First 10 results:")
        display(current_results.head(10))
        
    except Exception as e:
        print(f"❌ Query execution failed: {str(e)}")
        current_results = None

# Button to execute query
execute_button = widgets.Button(
    description='Execute Query',
    button_style='success',
    layout=widgets.Layout(width='200px')
)
execute_button.on_click(lambda x: execute_vendor_query())

display(execute_button)

Button(button_style='success', description='Execute Query', layout=Layout(width='200px'), style=ButtonStyle())
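
`execute_vendor_query` relies on `paginate_results(query, page_size, max_results)` to fetch results in pages up to a cap. Its implementation is not part of this notebook. A minimal sketch of that kind of loop, assuming a caller-supplied `fetch_page(offset, limit)` function (a hypothetical stand-in for the real Splunk call), could look like this:

```python
def paginate(fetch_page, page_size=1000, max_results=5000):
    """Collect rows page by page until the source is exhausted or max_results is reached."""
    rows = []
    offset = 0
    while len(rows) < max_results:
        # Never request more than what is still allowed by max_results.
        limit = min(page_size, max_results - len(rows))
        page = fetch_page(offset, limit)
        rows.extend(page)
        if len(page) < limit:
            break  # a short page means there is nothing left to fetch
        offset += len(page)
    return rows


# Stand-in data source: 2,500 fake rows served by slicing a list.
fake_rows = [{"row": i} for i in range(2500)]


def fetch_from_list(offset, limit):
    return fake_rows[offset:offset + limit]


print(len(paginate(fetch_from_list, page_size=1000, max_results=5000)))
print(len(paginate(fetch_from_list, page_size=1000, max_results=1500)))
```

Capping each request at the remaining allowance means the slider value is honored exactly, even when it is not a multiple of the page size.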

## 6. Data Visualization

In [13]:
# Vendor Distribution Visualization
def show_vendor_distribution():
    if current_results is None or current_results.empty:
        print("❌ No results available. Please execute a query first.")
        return
    
    fig = visualizer.create_vendor_distribution_pie(current_results, "Vendor Product Distribution")
    fig.show()

# Gap Analysis Visualization
def show_gap_analysis():
    if gap_analysis_results is None or gap_analysis_results.empty:
        print("❌ No gap analysis results available. Please execute a basic or gap query first.")
        return
    
    fig = visualizer.create_gap_analysis_chart(gap_analysis_results, "Vendor Coverage Gap Analysis")
    fig.show()

# Category Breakdown Visualization
def show_category_breakdown():
    if current_results is None or current_results.empty:
        print("❌ No results available. Please execute a query first.")
        return
    
    fig = visualizer.create_category_breakdown(current_results, "Vendor Categories")
    fig.show()

# Summary Dashboard
def show_summary_dashboard():
    if current_results is None or current_results.empty:
        print("❌ No results available. Please execute a query first.")
        return
    
    fig = visualizer.create_summary_dashboard(
        current_results, 
        gap_analysis_results if gap_analysis_results is not None else pd.DataFrame(), 
        query_stats
    )
    fig.show()

# Visualization buttons
viz_buttons = widgets.HBox([
    widgets.Button(description='Vendor Distribution', button_style='info'),
    widgets.Button(description='Gap Analysis', button_style='warning'),
    widgets.Button(description='Categories', button_style='info'),
    widgets.Button(description='Dashboard', button_style='success')
])

# Connect button events
viz_buttons.children[0].on_click(lambda x: show_vendor_distribution())
viz_buttons.children[1].on_click(lambda x: show_gap_analysis())
viz_buttons.children[2].on_click(lambda x: show_category_breakdown())
viz_buttons.children[3].on_click(lambda x: show_summary_dashboard())

display(widgets.VBox([
    widgets.HTML("<h3>Visualizations</h3>"),
    viz_buttons
]))

VBox(children=(HTML(value='<h3>Visualizations</h3>'), HBox(children=(Button(button_style='info', description='…

## 7. Data Export

In [None]:
# Export functionality
def export_data(format_type):
    if current_results is None or current_results.empty:
        print("❌ No results available. Please execute a query first.")
        return
    
    try:
        timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
        env_name = selected_env_config['name'].lower().replace(' ', '_')
        filename = f"vendor_analysis_{env_name}_{timestamp}"
        
        if format_type == 'csv':
            file_path = visualizer.export_to_csv(current_results, filename)
            print(f"✅ Data exported to: {file_path}")
        
        elif format_type == 'json':
            file_path = visualizer.export_to_json(current_results, filename)
            print(f"✅ Data exported to: {file_path}")
        
        elif format_type == 'html_charts':
            # Export multiple charts
            charts_exported = []
            
            # Vendor distribution
            fig1 = visualizer.create_vendor_distribution_pie(current_results)
            chart1_path = visualizer.export_chart_html(fig1, f"{filename}_distribution")
            charts_exported.append(chart1_path)
            
            # Category breakdown
            fig2 = visualizer.create_category_breakdown(current_results)
            chart2_path = visualizer.export_chart_html(fig2, f"{filename}_categories")
            charts_exported.append(chart2_path)
            
            # Gap analysis if available
            if gap_analysis_results is not None and not gap_analysis_results.empty:
                fig3 = visualizer.create_gap_analysis_chart(gap_analysis_results)
                chart3_path = visualizer.export_chart_html(fig3, f"{filename}_gaps")
                charts_exported.append(chart3_path)
            
            print("✅ Charts exported:")
            for path in charts_exported:
                print(f"   📊 {path}")
        
        # Also export gap analysis if available
        if gap_analysis_results is not None and not gap_analysis_results.empty and format_type in ['csv', 'json']:
            gap_filename = f"gap_analysis_{env_name}_{timestamp}"
            if format_type == 'csv':
                gap_path = visualizer.export_to_csv(gap_analysis_results, gap_filename)
            else:
                gap_path = visualizer.export_to_json(gap_analysis_results, gap_filename)
            print(f"✅ Gap analysis exported to: {gap_path}")
        
    except Exception as e:
        print(f"❌ Export failed: {str(e)}")

# Export buttons
export_buttons = widgets.HBox([
    widgets.Button(description='Export CSV', button_style='primary'),
    widgets.Button(description='Export JSON', button_style='primary'),
    widgets.Button(description='Export Charts (HTML)', button_style='primary')
])

# Connect export button events
export_buttons.children[0].on_click(lambda x: export_data('csv'))
export_buttons.children[1].on_click(lambda x: export_data('json'))
export_buttons.children[2].on_click(lambda x: export_data('html_charts'))

display(widgets.VBox([
    widgets.HTML("<h3>Data Export</h3>"),
    widgets.HTML("<p>Export your analysis results in various formats:</p>"),
    export_buttons
]))
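
`export_data` builds the file name by lowercasing the environment name and replacing spaces with underscores, which leaves characters such as `/` in place if an environment name contains them. One possible hardening, sketched here with a hypothetical `export_basename` helper, is to collapse every run of non-alphanumeric characters into a single underscore:

```python
import re
from datetime import datetime


def export_basename(env_name, now, prefix="vendor_analysis"):
    """Build a filesystem-friendly base name such as vendor_analysis_<env>_<timestamp>."""
    # Lowercase, then replace any run of characters outside a-z and 0-9 with "_".
    slug = re.sub(r"[^a-z0-9]+", "_", env_name.lower()).strip("_")
    return f"{prefix}_{slug}_{now.strftime('%Y%m%d_%H%M%S')}"


print(export_basename("Docker Local Splunk", datetime(2024, 1, 15, 10, 37, 12)))
print(export_basename("Prod / EU-West", datetime(2024, 1, 15, 10, 37, 12)))
```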

## 8. Summary and Statistics

In [None]:
# Display comprehensive summary
def show_analysis_summary():
    if not query_stats:
        print("❌ No analysis statistics available. Please execute a query first.")
        return
    
    print("📊 ANALYSIS SUMMARY")
    print("=" * 50)
    
    # Environment info
    if selected_env_config:
        print(f"🌐 Environment: {selected_env_config['name']}")
        print(f"📍 Indices: {', '.join(selected_env_config['indices'])}")
        print(f"⏰ Time Range: {time_range_widget.value}")
        print()
    
    # Summary statistics
    print(f"🔍 Total Vendors Found: {query_stats['total_vendors']}")
    print(f"📈 Total Events Analyzed: {query_stats['total_events']:,}")
    
    if query_stats['top_vendor'] != 'None':
        print(f"🔝 Top Vendor: {query_stats['top_vendor']}")
        print(f"   📊 Events: {query_stats['top_vendor_count']:,}")
    
    unknown_pct = query_stats.get('unknown_percentage', 0)
    if unknown_pct > 0:
        print(f"❓ Unknown Events: {unknown_pct:.1f}%")
    
    print()
    
    # Category breakdown
    categories = query_stats.get('categories', {})
    if categories:
        print("📂 CATEGORY BREAKDOWN:")
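        total_events = query_stats['total_events']
        for category, count in sorted(categories.items(), key=lambda x: x[1], reverse=True):
            # Guard against division by zero when no events were counted
            percentage = (count / total_events) * 100 if total_events else 0.0
            print(f"   {category}: {count:,} events ({percentage:.1f}%)")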
        print()
    
    # Gap analysis summary
    if gap_analysis_results is not None and not gap_analysis_results.empty:
        print("🔍 GAP ANALYSIS SUMMARY:")
        missing_vendors = gap_analysis_results[gap_analysis_results['status'] == 'Missing']
        present_vendors = gap_analysis_results[gap_analysis_results['status'] == 'Present']
        
        print(f"   ✅ Present: {len(present_vendors)} vendors")
        print(f"   ❌ Missing: {len(missing_vendors)} vendors")
        
        if len(missing_vendors) > 0:
            print(f"   📋 Missing Vendors: {', '.join(missing_vendors['vendor_product'])}")
        
        high_severity = gap_analysis_results[gap_analysis_results['gap_severity'] == 'High']
        if len(high_severity) > 0:
            print(f"   🚨 High Severity Gaps: {len(high_severity)}")
        
        print()
    
    # Recommendations
    print("💡 RECOMMENDATIONS:")
    if query_stats['total_vendors'] == 0:
        print("   - No vendors detected. Consider adjusting time range or indices.")
    elif unknown_pct > 50:
        print("   - High percentage of unknown events. Consider adding custom vendor patterns.")
    
    if gap_analysis_results is not None and not gap_analysis_results.empty:
        missing_count = len(gap_analysis_results[gap_analysis_results['status'] == 'Missing'])
        if missing_count > 0:
            print(f"   - {missing_count} expected vendors are missing from logs.")
            print("   - Investigate data ingestion for missing vendor products.")
    
    print("   - Export results for further analysis and reporting.")
    print("   - Consider scheduling regular gap analysis for continuous monitoring.")

# Summary button
summary_button = widgets.Button(
    description='Show Analysis Summary',
    button_style='info',
    layout=widgets.Layout(width='250px')
)
summary_button.on_click(lambda x: show_analysis_summary())

display(summary_button)
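
The summary reads a `status` column with the values `Present` and `Missing` for each expected `vendor_product`. The logic inside `identify_coverage_gaps` is not shown here. Conceptually, it amounts to comparing the expected vendor list with the vendors observed in the results. A simplified sketch under that assumption follows; `find_coverage_gaps` is hypothetical, matches names case-insensitively, and omits the `gap_severity` column.

```python
def find_coverage_gaps(observed_vendors, expected_vendors):
    """Label each expected vendor as Present or Missing based on the observed names."""
    # Normalize names so that "Cisco ASA" and "cisco asa " compare equal.
    observed = {name.strip().lower() for name in observed_vendors}
    return [
        {
            "vendor_product": vendor,
            "status": "Present" if vendor.strip().lower() in observed else "Missing",
        }
        for vendor in expected_vendors
    ]


# Illustrative vendor names only.
gaps = find_coverage_gaps(
    observed_vendors=["Vendor A", "vendor b"],
    expected_vendors=["Vendor A", "Vendor B", "Vendor C"],
)
missing = [row["vendor_product"] for row in gaps if row["status"] == "Missing"]
print(missing)
```

Iterating over the expected list (rather than the observed one) keeps the output in the same order as the configuration, which makes successive reports easy to compare.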

## 9. Troubleshooting and Help

In [None]:
# Help and troubleshooting information
help_html = """
<div style="background-color: #f0f0f0; padding: 20px; border-radius: 5px;">
<h3>🆘 Troubleshooting Guide</h3>

<h4>Common Issues:</h4>
<ul>
    <li><strong>Connection Failed:</strong>
        <ul>
            <li>Check if HEC tokens are properly configured in config/environments.json</li>
            <li>Verify Splunk URL and port accessibility</li>
            <li>Check SSL verification settings</li>
        </ul>
    </li>
    
    <li><strong>No Results Found:</strong>
        <ul>
            <li>Extend time range (try -7d@d or -30d@d)</li>
            <li>Verify selected indices contain relevant data</li>
            <li>Check vendor patterns in query builder</li>
        </ul>
    </li>
    
    <li><strong>Query Timeout:</strong>
        <ul>
            <li>Reduce time range</li>
            <li>Lower max results limit</li>
            <li>Add more specific filters to custom queries</li>
        </ul>
    </li>
</ul>

<h4>Configuration Tips:</h4>
<ul>
    <li>Update HEC tokens in config/environments.json before use</li>
    <li>Add custom vendor patterns for better detection</li>
    <li>Modify expected_vendors list for gap analysis</li>
    <li>Adjust query_settings for performance optimization</li>
</ul>

<h4>Best Practices:</h4>
<ul>
    <li>Start with shorter time ranges for initial testing</li>
    <li>Test connection before running large queries</li>
    <li>Export results regularly for analysis</li>
    <li>Monitor query performance and adjust as needed</li>
</ul>
</div>
"""

display(HTML(help_html))

---

## 🎉 Analysis Complete!

You have successfully completed the Splunk vendor product analysis. Here's what you can do next:

1. **📊 Review Results**: Check the visualizations and summary statistics above
2. **💾 Export Data**: Use the export buttons to save results in CSV, JSON, or HTML formats
3. **🔍 Dive Deeper**: Run additional queries with different time ranges or custom patterns
4. **📋 Share Findings**: Export charts and data for reporting and documentation
5. **🔄 Monitor Regularly**: Schedule regular runs to track vendor coverage over time

For questions or issues, refer to the troubleshooting guide above or check the project documentation.
