# Axonius API
- https://docs.axonius.com/docs/axonius-rest-api


# Function to Query Axonius Devices and Services

Let's create a function to:
1. Query all devices from Axonius
2. Get associated applications and services for each device
3. Aggregate the data into a structured format

In [None]:
# Secure secrets in encrypted vault
from ansible_vault import Vault
import os

# Read the vault password and the encrypted secrets file through context managers
# so both file handles are closed as soon as their contents have been read.
with open(os.path.expanduser('~/.vault_pass')) as password_file:
    vault = Vault(password_file.read().strip())
with open(os.path.expanduser('/Users/eric.louhi/Github/reach-data-experiments/vault.yml')) as secrets_file:
    data = vault.load(secrets_file.read().strip())

# Credentials and endpoint settings used by every request later in this file.
API_KEY = data['AXONIOUS_API_KEY']
API_SECRET = data['AXONIOUS_API_SECRET']
AXONIOUS_URL = data['AXONIOUS_URL']
AXONIOUS_URL_KEY = data['AXONIOUS_URL_KEY']

# Print the configured base URL as a quick confirmation that the vault loaded.
print(AXONIOUS_URL)

In [None]:
import pandas as pd
import requests

def get_devices_with_services(limit=1000, offset=0, timeout=30):
    """Fetch one page of devices from the Axonius ``/api/devices`` endpoint.

    Requests the adapters, hostname, installed software, services and OS
    type/name fields for each device.

    Args:
        limit (int): Page size sent as ``page.limit``.
        offset (int): Index of the first device, sent as ``page.offset``.
        timeout (float): Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the API answers with any status other than 200.
        requests.exceptions.RequestException: On network failures or timeout.
    """
    # API Documentation and Schema References:
    # - Axonius REST API Entity Request Schema: https://docs.axonius.com/reference/get_devices
    # - Device Query Fields Reference: https://docs.axonius.com/docs/device-fields-reference
    # - Pagination Parameters: https://docs.axonius.com/docs/api-pagination-limits
    # - Specific Data Fields Guide: https://docs.axonius.com/docs/understanding-specific-data
    payload = {
        "meta": None,
        "data": {
            "type": "entity_request_schema",
            "attributes": {
                "page": {
                    "offset": offset,
                    "limit": limit
                },
                "fields": {
                    "devices": [
                        "adapters",
                        "specific_data.data.hostname",
                        "specific_data.data.installed_software",
                        "specific_data.data.services",
                        "specific_data.data.os.type",
                        "specific_data.data.os.name"
                    ]
                },
                "include_details": True
            }
        }
    }

    devices_url = f"{AXONIOUS_URL}/api/devices"
    headers = {
        'Api-Key': API_KEY,
        'Api-Secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }
    response = requests.post(
        devices_url,
        headers=headers,
        json=payload,
        timeout=timeout
    )

    if response.status_code != 200:
        raise Exception(f"Failed to get devices: {response.status_code} - {response.text}")
    return response.json()


def parse_device_data(devices_data):
    """Flatten a devices response into one summary dict per device.

    Each returned dict has the keys ``hostname``, ``os_type``, ``os_name``,
    ``installed_software`` and ``services``. The two list fields hold the
    ``name`` of every dict entry (``None`` when an entry has no name).

    Missing keys and explicit nulls are treated alike: the device still gets
    a record, filled with ``None`` / empty lists. Entries of ``data`` that are
    not dicts are skipped.

    Args:
        devices_data: Decoded response body; expected to be a dict whose
            ``data`` key holds a list of device dicts.

    Returns:
        list: One summary dict per device dict found under ``data``.
    """
    def _as_dict(value):
        # A missing key and a JSON null both mean "no data here".
        return value if isinstance(value, dict) else {}

    def _names(entries):
        # Anything other than a list (null, scalar, dict) yields no names.
        if not isinstance(entries, list):
            return []
        return [entry.get('name') for entry in entries if isinstance(entry, dict)]

    parsed_data = []
    for device in _as_dict(devices_data).get('data') or []:
        if not isinstance(device, dict):
            continue

        specific_data = _as_dict(_as_dict(device.get('specific_data')).get('data'))
        os_info = _as_dict(specific_data.get('os'))

        parsed_data.append({
            'hostname': specific_data.get('hostname'),
            'os_type': os_info.get('type'),
            'os_name': os_info.get('name'),
            'installed_software': _names(specific_data.get('installed_software')),
            'services': _names(specific_data.get('services')),
        })

    return parsed_data

In [None]:
# Get and parse the data
# Fetch the devices response from the API, then flatten it into one dict per device.
devices_data = get_devices_with_services()
parsed_devices = parse_device_data(devices_data)

# Convert to DataFrame
df_devices = pd.DataFrame(parsed_devices)
# Bare expression as the last statement of the cell.
df_devices


## Adapters
Sources:
- https://docs.axonius.com/docs/adapters-list

## Queries
Sources:
- https://docs.axonius.com/docs/importexport-dashboards-and-queries-via-api

In [None]:
# Get all saved Queries using Axonius Import/Export API
def get_saved_queries(query_type="devices", max_retries=3):
    """
    Export saved queries (views) from Axonius using the import/export endpoint.

    POSTs an "export all" request to ``{AXONIOUS_URL}/api/{query_type}/views/export``
    and returns the decoded JSON body. Only failures raised as
    ``requests.exceptions.RequestException`` are retried, with exponential
    backoff (1s, 2s, 4s, ...); HTTP error statuses fail immediately.

    Args:
        query_type (str): "devices" or "users"
        max_retries (int): Maximum number of attempts

    Returns:
        dict: Response containing saved queries

    Raises:
        ValueError: If query_type is not supported, or the API answers HTTP 400.
        Exception: For any other non-200 status, for a request error that
            persists through the last attempt, or when max_retries is below 1.
    """
    # Validate inputs first so bad arguments fail before any other work is done.
    if query_type not in ("devices", "users"):
        raise ValueError("query_type must be 'devices' or 'users'")

    import requests
    import time

    # Setup request with proper headers as per Axonius documentation
    headers = {
        'api-key': API_KEY,
        'api-secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }

    # Use the correct endpoint for exporting queries (views)
    # According to Axonius documentation: /api/{entity}/views/export
    url = f"{AXONIOUS_URL}/api/{query_type}/views/export"

    # Request payload for exporting all queries
    payload = {
        "meta": None,
        "data": {
            "type": "export_views_schema",
            "attributes": {
                "export_all": True,
                "queries": []  # Empty array means export all
            }
        }
    }

    for attempt in range(max_retries):
        try:
            print(f"Fetching saved {query_type} queries from: {url}")
            response = requests.post(url, headers=headers, json=payload, timeout=30)

            print(f"Status Code: {response.status_code}")

            if response.status_code == 200:
                data = response.json()
                print(f"✓ Success! Retrieved saved {query_type} queries")
                return data
            elif response.status_code == 400:
                try:
                    error_data = response.json() if response.content else {}
                    error_message = error_data.get('message', 'Bad request')
                except (ValueError, AttributeError):
                    # ValueError: body is not JSON. AttributeError: JSON body is
                    # not an object. Either way, report the raw text instead.
                    error_message = response.text
                raise ValueError(f"Invalid request: {error_message}")
            elif response.status_code == 401:
                raise Exception("Authentication failed - check your API key and secret")
            elif response.status_code == 403:
                raise Exception("Access forbidden - insufficient permissions for exporting queries")
            elif response.status_code == 404:
                raise Exception(f"Export endpoint not found - {url}")
            else:
                raise Exception(f"API error: {response.status_code} - {response.text}")

        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                print(f"Attempt {attempt + 1} failed, retrying in {2 ** attempt} seconds...")
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            # Chain the original error so the traceback shows the root cause.
            raise Exception(f"Network error after {max_retries} attempts: {str(e)}") from e

    # Only reachable when the loop body never ran (max_retries < 1).
    raise Exception("Max retries exceeded")

def get_saved_queries_list_method(query_type="devices", max_retries=3):
    """
    Alternative method: Get list of saved queries using the list endpoint

    GETs ``{AXONIOUS_URL}/api/{query_type}/views`` and returns the decoded JSON
    body. Only failures raised as ``requests.exceptions.RequestException`` are
    retried, with exponential backoff (1s, 2s, 4s, ...); HTTP error statuses
    fail immediately.

    Args:
        query_type (str): "devices" or "users"
        max_retries (int): Maximum number of attempts

    Returns:
        dict: Response containing saved queries list

    Raises:
        ValueError: If query_type is not supported.
        Exception: For any non-200 status, for a request error that persists
            through the last attempt, or when max_retries is below 1.
    """
    # Validate inputs first so bad arguments fail before any other work is done.
    if query_type not in ("devices", "users"):
        raise ValueError("query_type must be 'devices' or 'users'")

    import requests
    import time

    # Setup request with proper headers
    headers = {
        'api-key': API_KEY,
        'api-secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }

    url = f"{AXONIOUS_URL}/api/{query_type}/views"

    # Statuses that get a dedicated explanation; everything else is generic.
    status_errors = {
        401: "Authentication failed - check your API key and secret",
        403: "Access forbidden - insufficient permissions for viewing queries",
        404: f"Queries list endpoint not found - {url}",
    }

    for attempt in range(max_retries):
        try:
            print(f"Fetching saved {query_type} queries list from: {url}")
            response = requests.get(url, headers=headers, timeout=30)

            print(f"Status Code: {response.status_code}")

            if response.status_code == 200:
                data = response.json()
                print(f"✓ Success! Retrieved {query_type} queries list")
                return data

            if response.status_code in status_errors:
                raise Exception(status_errors[response.status_code])
            raise Exception(f"API error: {response.status_code} - {response.text}")

        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                print(f"Attempt {attempt + 1} failed, retrying in {2 ** attempt} seconds...")
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            # Chain the original error so the traceback shows the root cause.
            raise Exception(f"Network error after {max_retries} attempts: {str(e)}") from e

    # Only reachable when the loop body never ran (max_retries < 1).
    raise Exception("Max retries exceeded")

def parse_saved_queries(queries_data):
    """
    Extract the list of saved queries from an API response.

    A dict response is searched for the first of the keys ``data``,
    ``queries`` or ``views``; a dict with none of them is treated as a single
    query. The result is always a list, so callers can safely use ``len()``
    and slicing on it: a null value becomes ``[]`` and a single non-list
    value is wrapped in a one-element list.

    Args:
        queries_data (dict | list): Response from Axonius API

    Returns:
        list: List of parsed queries (empty for falsy or unsupported input)
    """
    if not queries_data:
        return []

    if isinstance(queries_data, list):
        return queries_data

    if not isinstance(queries_data, dict):
        return []

    for container_key in ('data', 'queries', 'views'):
        if container_key in queries_data:
            queries = queries_data[container_key]
            break
    else:
        # No known container key: the whole dict is one query.
        return [queries_data]

    if queries is None:
        return []
    if isinstance(queries, list):
        return queries
    return [queries]

def display_queries_info(queries, query_type):
    """
    Print a summary of the retrieved queries and details of the first five.

    For dict entries the name, ID and description are looked up under several
    possible keys; a description longer than 100 characters is truncated and
    marked with "...". Non-dict entries are printed as-is.

    Args:
        queries (list): List of queries
        query_type (str): Type of queries (devices/users)
    """
    print(f"\n{query_type.upper()} QUERIES SUMMARY:")
    print("=" * 50)
    print(f"Total {query_type} queries: {len(queries)}")

    if not queries:
        return

    print(f"\nFirst 5 {query_type} queries:")
    for i, query in enumerate(queries[:5], 1):
        if not isinstance(query, dict):
            print(f"  {i}. {query}")
            print()
            continue

        # 'attributes' may be absent or null; treat both as empty.
        attributes = query.get('attributes')
        if not isinstance(attributes, dict):
            attributes = {}

        # Try different possible name fields
        name = (query.get('name') or
                attributes.get('name') or
                query.get('title') or
                'Unknown Query')

        # Try different possible ID fields
        query_id = (query.get('id') or
                    query.get('uuid') or
                    query.get('_id') or
                    'N/A')

        # Coerce to str so a non-string description cannot break slicing.
        description = str(query.get('description') or
                          attributes.get('description') or
                          'No description')
        if len(description) > 100:
            description = description[:100] + "..."

        print(f"  {i}. {name}")
        print(f"     ID: {query_id}")
        print(f"     Description: {description}")
        print()

# Usage with comprehensive error handling and multiple methods
print("AXONIUS SAVED QUERIES RETRIEVAL")
print("=" * 60)

# Start from empty lists so the summary below still works if every call fails.
device_queries = []
user_queries = []

# Try export method first (recommended by documentation)
print("\n1. Trying Export Method...")
try:
    # Get device saved queries using export method
    print("\n--- Device Queries (Export Method) ---")
    device_queries_data = get_saved_queries("devices")
    device_queries = parse_saved_queries(device_queries_data)
    display_queries_info(device_queries, "devices")

    # Get user saved queries using export method
    print("\n--- User Queries (Export Method) ---")
    user_queries_data = get_saved_queries("users")
    user_queries = parse_saved_queries(user_queries_data)
    display_queries_info(user_queries, "users")

except Exception as e:
    # Any failure above (device or user export) sends both query types
    # through the list-endpoint fallback below.
    print(f"Export method failed: {e}")
    print("\n2. Trying List Method...")

    # Fallback to list method
    try:
        # Get device saved queries using list method
        print("\n--- Device Queries (List Method) ---")
        device_queries_data = get_saved_queries_list_method("devices")
        device_queries = parse_saved_queries(device_queries_data)
        display_queries_info(device_queries, "devices")

        # Get user saved queries using list method
        print("\n--- User Queries (List Method) ---")
        user_queries_data = get_saved_queries_list_method("users")
        user_queries = parse_saved_queries(user_queries_data)
        display_queries_info(user_queries, "users")

    except Exception as e2:
        # Both strategies failed: print troubleshooting hints and carry on
        # with whatever was collected before the failure.
        print(f"List method also failed: {e2}")
        print("\nBoth methods failed. Please check:")
        print("1. API credentials are correct")
        print("2. Network connectivity")
        print("3. API permissions for viewing/exporting queries")
        print("4. Axonius instance version compatibility")

# Final summary
print(f"\nFINAL SUMMARY:")
print(f"Device queries retrieved: {len(device_queries)}")
print(f"User queries retrieved: {len(user_queries)}")
print(f"Total queries: {len(device_queries) + len(user_queries)}")

# Store results for further use
all_queries = {
    'devices': device_queries,
    'users': user_queries
}

In [None]:
import json
# Write the collected device and user queries to disk as indented JSON.
# open() does not create missing directories, so ../data/axonious must already exist.
with open("../data/axonious/queries.json", "w") as f:
    json.dump(all_queries, f, indent=4)

In [None]:

import requests
import json
import pandas as pd
import logging
import time
from typing import Optional, Dict, Any, List
from datetime import datetime
import sys


# Configure comprehensive logging
def setup_logging(log_level=logging.INFO, log_file=None):
    """Configure and return the ``axonius_api`` logger.

    Safe to call repeatedly (for example when a notebook cell is re-run):
    handlers installed by a previous call are closed and removed first, so
    log lines are not duplicated and earlier log files are not left open.

    Args:
        log_level: Level applied to the logger and to every handler.
        log_file: Optional path; when given, records are also written there.

    Returns:
        logging.Logger: The configured ``axonius_api`` logger.
    """
    logger = logging.getLogger('axonius_api')
    logger.setLevel(log_level)

    # Close before discarding: clearing the list alone would leave the file
    # opened by an earlier FileHandler open.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Console output always; file output only when a path was supplied.
    handlers = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handlers.append(logging.FileHandler(log_file))

    for handler in handlers:
        handler.setLevel(log_level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger


# Initialize logger
# DEBUG level with a file handler: every record is written to the console
# and to axonius_api.log.
logger = setup_logging(log_level=logging.DEBUG, log_file='axonius_api.log')


class AxoniusAPIError(Exception):
    """Error raised for failed Axonius API calls.

    Carries the human-readable message plus, when available, the HTTP status
    code and the raw response body of the failing request.
    """

    def __init__(self, message: str, status_code: Optional[int] = None, response_text: Optional[str] = None):
        # The message is the only positional arg, so str(error) == message.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.response_text = response_text


class AxoniusAPIClient:
    """Enhanced Axonius API client with robust error handling and logging.

    Sends requests through a shared ``requests.Session``, retries transient
    failures with exponential backoff, and raises ``AxoniusAPIError`` when a
    request cannot be completed. Credential headers are masked before
    anything is written to the log.
    """

    # Header names (lower-cased) whose values must never appear in logs.
    _SENSITIVE_HEADERS = frozenset({'api-key', 'api-secret'})
    # Caller-side problems: repeating the same request cannot succeed.
    _NON_RETRYABLE_STATUSES = (400, 401, 403, 404)
    # Rate limiting and server-side failures: retried.
    _RETRYABLE_STATUSES = (429, 500, 502, 503, 504)

    def __init__(self, base_url: str, api_key: str, api_secret: str, max_retries: int = 3, timeout: int = 30):
        """Store the connection settings and open a session.

        Args:
            base_url: Root URL of the Axonius instance; trailing slashes are removed.
            api_key: Value sent in the ``Api-Key`` header.
            api_secret: Value sent in the ``Api-Secret`` header.
            max_retries: Maximum number of attempts per request.
            timeout: Default per-request timeout in seconds.

        Raises:
            ValueError: If the URL, key or secret is empty.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.api_secret = api_secret
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()

        logger.info(f"Initializing Axonius API client for {self.base_url}")
        logger.debug(f"Configuration: max_retries={max_retries}, timeout={timeout}")

        # Validate configuration
        self._validate_config()

    def _validate_config(self):
        """Validate API configuration; raise ValueError on an empty setting."""
        if not self.base_url:
            raise ValueError("Base URL cannot be empty")
        if not self.api_key:
            raise ValueError("API key cannot be empty")
        if not self.api_secret:
            raise ValueError("API secret cannot be empty")

        logger.debug("API configuration validated successfully")

    @classmethod
    def _redact_headers(cls, headers) -> Dict[str, Any]:
        """Return a copy of *headers* with credential values masked for logging."""
        return {
            name: ('***REDACTED***' if str(name).lower() in cls._SENSITIVE_HEADERS else value)
            for name, value in dict(headers).items()
        }

    def _get_headers(self, content_type: str = 'application/vnd.api+json') -> Dict[str, str]:
        """Get standard headers for API requests"""
        headers = {
            'Api-Key': self.api_key,
            'Api-Secret': self.api_secret,
            'Content-Type': content_type,
            'Accept': 'application/vnd.api+json',
            'User-Agent': 'AxoniusAPIClient/1.0'
        }
        # Log a redacted copy only: the key and secret must not reach the log.
        logger.debug(f"Generated headers: {self._redact_headers(headers)}")
        return headers

    def _make_request(self, method: str, url: str, **kwargs) -> "requests.Response":
        """Make HTTP request with retry logic and comprehensive error handling.

        Args:
            method: HTTP method name passed to ``Session.request``.
            url: Full request URL.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to the client's configured timeout.

        Returns:
            The response, returned only for HTTP 200.

        Raises:
            AxoniusAPIError: Immediately for 400/401/403/404; after the last
                attempt for any other failure.
        """
        logger.info(f"Making {method.upper()} request to {url}")

        # Request headers carry the credentials, so mask them before logging.
        loggable_kwargs = dict(kwargs)
        if loggable_kwargs.get('headers'):
            loggable_kwargs['headers'] = self._redact_headers(loggable_kwargs['headers'])
        logger.debug(f"Request kwargs: {loggable_kwargs}")

        # Set timeout if not provided
        kwargs.setdefault('timeout', self.timeout)

        last_exception = None

        for attempt in range(1, self.max_retries + 1):
            try:
                logger.debug(f"Attempt {attempt}/{self.max_retries}")

                # Make the request
                start_time = time.time()
                response = self.session.request(method, url, **kwargs)
                elapsed_time = time.time() - start_time

                logger.info(f"Request completed in {elapsed_time:.2f}s - Status: {response.status_code}")
                logger.debug(f"Response headers: {dict(response.headers)}")

                status = response.status_code

                if status == 200:
                    logger.info("Request successful")
                    try:
                        response_data = response.json()
                        if isinstance(response_data, dict) and 'data' in response_data:
                            data_count = len(response_data['data']) if isinstance(response_data['data'], list) else 1
                            logger.info(f"Received {data_count} data items")
                    except ValueError:
                        # The item count is informational only; a non-JSON
                        # body must not turn a successful request into a failure.
                        logger.debug("Response is not JSON or doesn't contain 'data' field")
                    return response

                error_msg = self._extract_error_message(response)

                if status in self._NON_RETRYABLE_STATUSES:
                    # Client errors - don't retry
                    logger.error(f"Client error {status}: {error_msg}")
                    raise AxoniusAPIError(
                        f"Client error: {error_msg}",
                        status_code=status,
                        response_text=response.text
                    )

                if status in self._RETRYABLE_STATUSES:
                    # Server errors or rate limiting - retry
                    logger.warning(f"Server error {status} (attempt {attempt}): {error_msg}")
                    final_message = f"Server error after {self.max_retries} attempts: {error_msg}"
                else:
                    # Other errors
                    logger.error(f"Unexpected status code {status}: {error_msg}")
                    final_message = f"Unexpected error: {error_msg}"

                if attempt == self.max_retries:
                    raise AxoniusAPIError(
                        final_message,
                        status_code=status,
                        response_text=response.text
                    )

            except requests.exceptions.Timeout as e:
                logger.warning(f"Request timeout (attempt {attempt}): {str(e)}")
                last_exception = e

            except requests.exceptions.ConnectionError as e:
                logger.warning(f"Connection error (attempt {attempt}): {str(e)}")
                last_exception = e

            except requests.exceptions.RequestException as e:
                logger.error(f"Request exception (attempt {attempt}): {str(e)}")
                last_exception = e

            except AxoniusAPIError:
                # Re-raise API errors immediately
                raise

            except Exception as e:
                logger.error(f"Unexpected error (attempt {attempt}): {str(e)}")
                last_exception = e

            # Wait before retry (exponential backoff)
            if attempt < self.max_retries:
                wait_time = 2 ** (attempt - 1)
                logger.info(f"Waiting {wait_time}s before retry...")
                time.sleep(wait_time)

        # All retries exhausted
        error_msg = f"All {self.max_retries} attempts failed"
        if last_exception:
            error_msg += f". Last error: {str(last_exception)}"

        logger.error(error_msg)
        # Chain the last low-level error so its traceback is preserved.
        raise AxoniusAPIError(error_msg) from last_exception

    def _extract_error_message(self, response: "requests.Response") -> str:
        """Extract error message from response.

        Prefers the first of ``message``/``error``/``detail``/``errors`` in a
        JSON object body, then the whole decoded JSON value, then the raw
        text, and finally ``"HTTP <status>"`` when the body is empty.
        """
        try:
            if response.content:
                error_data = response.json()
                if isinstance(error_data, dict):
                    # Try common error message fields
                    for field in ('message', 'error', 'detail', 'errors'):
                        if field in error_data:
                            return str(error_data[field])
                return str(error_data)
        except ValueError:
            # Body is not valid JSON; fall through to the raw text.
            pass
        return response.text or f"HTTP {response.status_code}"

    def get_devices_with_services(self, limit: int = 1000, offset: int = 0,
                                  fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get devices with enhanced error handling and logging.

        Args:
            limit: Page size sent as ``page.limit``.
            offset: Index of the first device, sent as ``page.offset``.
            fields: Device fields to request; defaults to adapters, hostname,
                installed software, services and OS type/name.

        Returns:
            The decoded JSON body of the response.

        Raises:
            AxoniusAPIError: If the request fails.
        """
        logger.info(f"Fetching devices with limit={limit}, offset={offset}")

        # Default fields if none provided
        if fields is None:
            fields = [
                "adapters",
                "specific_data.data.hostname",
                "specific_data.data.installed_software",
                "specific_data.data.services",
                "specific_data.data.os.type",
                "specific_data.data.os.name"
            ]

        logger.debug(f"Requested fields: {fields}")

        payload = {
            "meta": None,
            "data": {
                "type": "entity_request_schema",
                "attributes": {
                    "page": {
                        "offset": offset,
                        "limit": limit
                    },
                    "use_cache_entry": False,
                    "always_cached_query": False,
                    "fields": {
                        "devices": fields
                    },
                    # Restrict the result to devices with a non-empty hostname.
                    "filter": "((\"specific_data.data.hostname\" == ({\"$exists\":true,\"$ne\":\"\"})))",
                    "get_metadata": True,
                    "include_details": True,
                    # NOTE(review): both limits are 1 - presumably this truncates
                    # list fields such as installed_software to a single item;
                    # confirm against the API documentation.
                    "complex_fields_preview_limit": 1,
                    "max_field_items": 1
                }
            }
        }

        url = f"{self.base_url}/api/devices"
        headers = self._get_headers()

        try:
            response = self._make_request('POST', url, headers=headers, json=payload)
            data = response.json()

            logger.info("Successfully retrieved devices data")
            return data

        except Exception as e:
            logger.error(f"Failed to get devices: {str(e)}")
            raise

    def get_adapters(self) -> Dict[str, Any]:
        """Get adapters with error handling.

        Returns:
            The decoded JSON body of ``GET /api/v2/adapters``.

        Raises:
            AxoniusAPIError: If the request fails.
        """
        logger.info("Fetching adapters")

        url = f"{self.base_url}/api/v2/adapters"
        headers = self._get_headers()

        try:
            response = self._make_request('GET', url, headers=headers)
            data = response.json()

            logger.info("Successfully retrieved adapters data")
            return data

        except Exception as e:
            logger.error(f"Failed to get adapters: {str(e)}")
            raise

    def get_saved_queries(self, query_id: Optional[str] = None) -> Dict[str, Any]:
        """Get saved device queries, or a single one when *query_id* is given.

        Args:
            query_id: Optional identifier appended to the endpoint path.

        Returns:
            The decoded JSON body of the response.

        Raises:
            AxoniusAPIError: If the request fails.
        """
        logger.info(f"Fetching saved queries" + (f" for ID: {query_id}" if query_id else ""))

        url = f"{self.base_url}/api/v2/devices/saved_query"
        if query_id:
            url += f"/{query_id}"

        headers = self._get_headers()

        try:
            response = self._make_request('GET', url, headers=headers)
            data = response.json()

            logger.info("Successfully retrieved saved queries")
            return data

        except Exception as e:
            logger.error(f"Failed to get saved queries: {str(e)}")
            raise


def parse_device_data(devices_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Parse device data with enhanced error handling and logging.

    Produces one dict per device with the keys ``hostname``, ``os_type``,
    ``os_name``, ``installed_software`` and ``services``. Devices that are
    not dicts, or whose ``specific_data`` is malformed, are skipped and
    counted as errors rather than aborting the run.

    Args:
        devices_data: Decoded response body; a dict with a ``data`` key.

    Returns:
        The parsed device records; empty when there is nothing to parse.

    Raises:
        ValueError: If *devices_data* is non-empty but is not a dict with a
            ``data`` key.
    """
    def _entry_names(entries: List[Any]) -> List[str]:
        # Keep only dict entries that carry a real name; a null name would
        # otherwise be recorded as the string "None".
        return [
            str(entry['name'])
            for entry in entries
            if isinstance(entry, dict) and entry.get('name') is not None
        ]

    logger.info("Starting device data parsing")

    if not devices_data:
        logger.warning("No devices data provided")
        return []

    if not isinstance(devices_data, dict) or 'data' not in devices_data:
        logger.error("Invalid devices data structure")
        raise ValueError("Invalid devices data structure - expected dict with 'data' key")

    devices = devices_data['data']
    if not isinstance(devices, list):
        # Covers a JSON null (which would otherwise break len()) and any
        # other non-list payload: there are no device records to parse.
        logger.warning(f"'data' is not a list (got {type(devices).__name__}); nothing to parse")
        return []

    logger.info(f"Processing {len(devices)} devices")

    parsed_data = []
    errors_count = 0

    for i, device in enumerate(devices):
        try:
            logger.debug(f"Processing device {i + 1}/{len(devices)}")

            device_info = {
                'hostname': None,
                'os_type': None,
                'os_name': None,
                'installed_software': [],
                'services': []
            }

            if not isinstance(device, dict):
                logger.warning(f"Device {i + 1} is not a dict, skipping")
                errors_count += 1
                continue

            specific_data = device.get('specific_data', {})
            if isinstance(specific_data, dict):
                specific_data = specific_data.get('data', {})

            if not isinstance(specific_data, dict):
                logger.warning(f"Device {i + 1} has invalid specific_data structure")
                errors_count += 1
                continue

            # Extract hostname
            hostname = specific_data.get('hostname')
            if hostname:
                device_info['hostname'] = str(hostname)
                logger.debug(f"Device {i + 1} hostname: {hostname}")

            # Extract OS information
            os_info = specific_data.get('os', {})
            if isinstance(os_info, dict):
                device_info['os_type'] = os_info.get('type')
                device_info['os_name'] = os_info.get('name')

            # Extract installed software
            installed_software = specific_data.get('installed_software', [])
            if isinstance(installed_software, list):
                software_names = _entry_names(installed_software)
                device_info['installed_software'] = software_names
                logger.debug(f"Device {i + 1} has {len(software_names)} software items")

            # Extract services
            services = specific_data.get('services', [])
            if isinstance(services, list):
                service_names = _entry_names(services)
                device_info['services'] = service_names
                logger.debug(f"Device {i + 1} has {len(service_names)} services")

            parsed_data.append(device_info)

        except Exception as e:
            # Best effort: one malformed device must not abort the whole run.
            logger.error(f"Error processing device {i + 1}: {str(e)}")
            errors_count += 1
            continue

    logger.info(f"Successfully parsed {len(parsed_data)} devices")
    if errors_count > 0:
        logger.warning(f"Encountered {errors_count} errors during parsing")

    return parsed_data


# Initialize the enhanced API client
# Builds the module-level `client` used by main_workflow(), from the URL and
# credentials (AXONIOUS_URL, API_KEY, API_SECRET) defined earlier in this file.
try:


    logger.info("Initializing Axonius API client")

    client = AxoniusAPIClient(
        base_url=AXONIOUS_URL,
        api_key=API_KEY,
        api_secret=API_SECRET,
        max_retries=3,
        timeout=30
    )

    logger.info("API client initialized successfully")

except Exception as e:
    # Log the failure, then re-raise so execution stops here.
    logger.error(f"Failed to initialize API client: {str(e)}")
    raise


# Enhanced usage example with comprehensive error handling
def main_workflow():
    """Fetch devices, parse them and summarise the result.

    Returns:
        A DataFrame with one row per parsed device, or ``None`` when no
        device could be parsed or any error occurred (errors are logged,
        not raised).
    """
    try:
        logger.info("Starting main workflow")

        # Step 1: pull a page of devices (small limit while testing).
        logger.info("Fetching devices data...")
        raw_response = client.get_devices_with_services(limit=100)  # Start with smaller limit for testing

        # Step 2: flatten the response into per-device records.
        logger.info("Parsing devices data...")
        device_rows = parse_device_data(raw_response)
        if not device_rows:
            logger.warning("No devices were parsed successfully")
            return None

        # Step 3: tabulate.
        logger.info("Converting to DataFrame...")
        frame = pd.DataFrame(device_rows)

        logger.info(f"DataFrame created with {len(frame)} rows and {len(frame.columns)} columns")
        logger.info(f"DataFrame columns: {list(frame.columns)}")

        # Step 4: log summary statistics, computing each one just before it is logged.
        logger.info("Data summary:")
        with_hostname = frame['hostname'].notna().sum()
        logger.info(f"  - Devices with hostnames: {with_hostname}")
        with_os = frame['os_type'].notna().sum()
        logger.info(f"  - Devices with OS info: {with_os}")
        mean_software = frame['installed_software'].apply(len).mean()
        logger.info(f"  - Average software per device: {mean_software:.2f}")
        mean_services = frame['services'].apply(len).mean()
        logger.info(f"  - Average services per device: {mean_services:.2f}")

        return frame

    except AxoniusAPIError as api_error:
        # API failures carry optional status/body details worth logging.
        logger.error(f"Axonius API error: {api_error.message}")
        if api_error.status_code:
            logger.error(f"Status code: {api_error.status_code}")
        if api_error.response_text:
            logger.debug(f"Response text: {api_error.response_text}")
        return None

    except Exception as unexpected:
        logger.error(f"Unexpected error in main workflow: {str(unexpected)}")
        logger.exception("Full traceback:")
        return None


# Run the workflow
# main_workflow() returns a DataFrame on success and None on any handled failure.
df_result = main_workflow()

if df_result is not None:
    logger.info("Workflow completed successfully")
    print("\nFirst 5 rows of results:")
    print(df_result.head())
else:
    logger.error("Workflow failed")


In [None]:
# Get Saved Queries from Axonius API

def get_first_10_saved_queries():
    """
    Connect to Axonius REST API and return the first 10 saved queries.

    Sends a GET request to ``{AXONIUS_URL}/api/v2/devices/saved_query`` using
    the module-level ``API_KEY`` / ``API_SECRET`` and keeps at most the first
    ten entries of the decoded response.

    Returns:
        list: First 10 saved queries from Axonius (fewer if fewer were returned)

    Raises:
        Exception: If the request fails, the API answers with a non-200 status,
            or the body is not valid JSON. The message describes the failure;
            any underlying error is chained as ``__cause__``.
    """
    import requests

    # Use existing configuration
    headers = {
        'Api-Key': API_KEY,
        'Api-Secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }

    # Axonius saved queries endpoint
    url = f"{AXONIUS_URL}/api/v2/devices/saved_query"

    logger.info(f"Fetching saved queries from: {url}")

    # Only the network call is guarded here. The HTTP-status errors raised
    # further down must reach the caller with their own message instead of
    # being caught and re-labelled as "Unexpected error".
    try:
        response = requests.get(url, headers=headers, timeout=30)
    except requests.exceptions.Timeout as exc:
        raise Exception("Request timeout - API server took too long to respond") from exc
    except requests.exceptions.ConnectionError as exc:
        raise Exception("Connection error - unable to connect to Axonius API") from exc
    except requests.exceptions.RequestException as exc:
        raise Exception(f"Request error: {str(exc)}") from exc

    if response.status_code != 200:
        known_failures = {
            401: "Authentication failed - check your API key and secret",
            403: "Access forbidden - insufficient permissions for saved queries",
            404: "Saved queries endpoint not found - check API version",
        }
        if response.status_code in known_failures:
            raise Exception(known_failures[response.status_code])
        raise Exception(f"API request failed: {response.status_code} - {response.text}")

    # Every JSON decode error that response.json() can raise (stdlib json,
    # simplejson, or requests' own JSONDecodeError) is a ValueError, so this
    # branch is reached regardless of the installed requests version.
    try:
        data = response.json()
    except ValueError as exc:
        raise Exception("Invalid JSON response from API") from exc

    # Extract queries from response
    if isinstance(data, dict) and 'data' in data:
        queries = data['data']
    elif isinstance(data, list):
        queries = data
    else:
        queries = [data] if data else []

    # The "data" member may hold a single object or null rather than a list;
    # normalise so the slice below is always applied to a list.
    if not isinstance(queries, list):
        queries = [queries] if queries else []

    # Return first 10 queries
    first_10_queries = queries[:10]

    logger.info(f"Successfully retrieved {len(first_10_queries)} saved queries")

    return first_10_queries


# Fetch the saved queries and print a short numbered listing of them.
try:
    saved_queries = get_first_10_saved_queries()

    print(f"Retrieved {len(saved_queries)} saved queries:")
    print("-" * 50)

    for i, query in enumerate(saved_queries, 1):
        # Entries that are not dicts are printed verbatim.
        if not isinstance(query, dict):
            print(f"{i}. {query}")
            print()
            continue

        # Name and description are read from the nested 'attributes' mapping.
        query_id = query.get('id', 'N/A')
        query_name = query.get('attributes', {}).get('name', 'Unnamed Query')
        query_description = query.get('attributes', {}).get('description', 'No description')

        # One print call emitting the three detail lines plus a blank separator.
        print(
            f"{i}. Query ID: {query_id}",
            f"   Name: {query_name}",
            f"   Description: {query_description}",
            "",
            sep="\n",
        )

except Exception as e:
    print(f"Error retrieving saved queries: {e}")
    print("Please check your API credentials and network connection.")


In [None]:
# Get Saved Queries from Axonius API

def get_first_10_saved_queries(query_type="devices"):
    """
    Connect to Axonius REST API and return the first 10 saved queries.

    Sends a GET request to ``{AXONIUS_URL}/api/{query_type}/views/export``
    using the module-level ``API_KEY`` / ``API_SECRET`` and keeps at most the
    first ten entries of the decoded response.

    Args:
        query_type (str): Type of queries to retrieve - "devices" or "users"

    Returns:
        list: First 10 saved queries from Axonius (fewer if fewer were returned)

    Raises:
        ValueError: If ``query_type`` is neither "devices" nor "users"
        Exception: If the request fails, the API answers with a non-200 status,
            or the body is not valid JSON. The message describes the failure;
            any underlying error is chained as ``__cause__``.
    """
    import requests

    # Validate query type
    if query_type not in ["devices", "users"]:
        raise ValueError("query_type must be 'devices' or 'users'")

    # Use existing configuration with JSON headers
    headers = {
        'Api-Key': API_KEY,
        'Api-Secret': API_SECRET,
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }

    # Endpoint for exporting saved queries
    url = f"{AXONIUS_URL}/api/{query_type}/views/export"

    logger.info(f"Fetching saved {query_type} queries from: {url}")

    # Only the network call is guarded here. The HTTP-status errors raised
    # further down must reach the caller with their own message instead of
    # being caught and re-labelled as "Unexpected error".
    try:
        response = requests.get(url, headers=headers, timeout=30)
    except requests.exceptions.Timeout as exc:
        raise Exception("Request timeout - API server took too long to respond") from exc
    except requests.exceptions.ConnectionError as exc:
        raise Exception("Connection error - unable to connect to Axonius API") from exc
    except requests.exceptions.RequestException as exc:
        raise Exception(f"Request error: {str(exc)}") from exc

    if response.status_code != 200:
        known_failures = {
            401: "Authentication failed - check your API key and secret",
            403: "Access forbidden - insufficient permissions for viewing saved queries",
            404: f"Saved queries endpoint not found - {url}",
        }
        if response.status_code in known_failures:
            raise Exception(known_failures[response.status_code])
        raise Exception(f"API request failed: {response.status_code} - {response.text}")

    # Every JSON decode error that response.json() can raise (stdlib json,
    # simplejson, or requests' own JSONDecodeError) is a ValueError, so this
    # branch is reached regardless of the installed requests version.
    try:
        data = response.json()
    except ValueError as exc:
        raise Exception("Invalid JSON response from API") from exc

    # Handle different response formats from the export endpoint
    queries = []
    if isinstance(data, dict):
        # The first of these container keys that is present wins; a dict with
        # none of them is treated as a single query.
        for container_key in ('views', 'queries', 'data'):
            if container_key in data:
                queries = data[container_key]
                break
        else:
            queries = [data]
    elif isinstance(data, list):
        queries = data

    # A container key may hold a single object or null rather than a list;
    # normalise so the slice below is always applied to a list.
    if not isinstance(queries, list):
        queries = [queries] if queries else []

    # Return first 10 queries
    first_10_queries = queries[:10]

    logger.info(f"Successfully retrieved {len(first_10_queries)} saved {query_type} queries")

    return first_10_queries


def display_saved_queries(queries, query_type="devices"):
    """
    Print a numbered, human-readable listing of saved queries.

    Dict entries are shown with name, ID, description, author and creation
    date (each with an alternate key and a placeholder default), followed by
    the filter and the number of fields when those keys are present. Any
    other entry is printed as-is.

    Args:
        queries (list): List of saved queries
        query_type (str): Label used in the header line ("devices" or "users")
    """
    print(f"Retrieved {len(queries)} saved {query_type} queries:")
    print("=" * 60)

    divider = '-' * 40

    for position, entry in enumerate(queries, 1):
        print(f"{position}. {divider}")

        # Non-dict entries have no structure to unpack: print and move on.
        if not isinstance(entry, dict):
            print(f"   {entry}")
            print()
            continue

        def pick(primary, alternate, placeholder):
            # Primary key first, then the alternate key, then the placeholder.
            return entry.get(primary, entry.get(alternate, placeholder))

        details = (
            ("Name", pick('name', 'title', 'Unnamed Query')),
            ("ID", pick('id', 'uuid', 'N/A')),
            ("Description", entry.get('description', 'No description available')),
            ("Created by", pick('created_by', 'author', 'Unknown')),
            ("Created", pick('created_date', 'timestamp', 'Unknown')),
        )
        for label, value in details:
            print(f"   {label}: {value}")

        # Optional query details
        if 'filter' in entry:
            print(f"   Filter: {entry['filter']}")
        if 'fields' in entry:
            selected = entry['fields']
            if isinstance(selected, list):
                fields_count = len(selected)
            else:
                fields_count = 'N/A'
            print(f"   Fields: {fields_count}")

        print()


# Fetch the saved *device* queries and print them. Both the fetch and the
# display run inside the try, so a failure in either one is reported by the
# handler below instead of stopping the cell.
try:
    print("Fetching Device Saved Queries...")
    device_queries = get_first_10_saved_queries("devices")
    display_saved_queries(device_queries, "devices")

except Exception as e:
    # Best-effort reporting: print the error and carry on with the user queries.
    print(f"Error retrieving device queries: {e}")
    print("Please check your API credentials and network connection.")

# Same steps for the saved *user* queries; this runs whether or not the
# device block above succeeded.
try:
    print("\nFetching User Saved Queries...")
    user_queries = get_first_10_saved_queries("users")
    display_saved_queries(user_queries, "users")

except Exception as e:
    # Best-effort reporting, as for the device queries.
    print(f"Error retrieving user queries: {e}")
    print("Please check your API credentials and network connection.")


# Alternative approach: Get both types of queries
def get_all_saved_queries():
    """
    Collect the saved queries for both devices and users.

    Each type is fetched independently through get_first_10_saved_queries();
    a failure for one type is logged and leaves that type's list empty
    without affecting the other.

    Returns:
        dict: {'devices': [...], 'users': [...]}
    """
    kinds = ('devices', 'users')
    collected = {kind: [] for kind in kinds}

    for kind in kinds:
        try:
            fetched = get_first_10_saved_queries(kind)
            collected[kind] = fetched
            logger.info(f"Retrieved {len(fetched)} {kind} queries")
        except Exception as err:
            logger.error(f"Failed to retrieve {kind} queries: {err}")
            collected[kind] = []

    return collected


# Fetch both query types and print how many of each were retrieved.
all_queries = get_all_saved_queries()

device_query_count = len(all_queries['devices'])
user_query_count = len(all_queries['users'])

print("\nSummary:")
print(f"Device queries: {device_query_count}")
print(f"User queries: {user_query_count}")
print(f"Total queries: {device_query_count + user_query_count}")


In [None]:
# WORKING SOLUTION: Get saved queries from Axonius API
import requests
import json
import pandas as pd

# Use the credentials that the first cell of this notebook loads from the
# encrypted Ansible vault (API_KEY, API_SECRET, AXONIOUS_URL).
#
# SECURITY: this cell used to hard-code the tenant URL, API key and API secret
# in plain text. Secrets must never live in the notebook source. Treat the
# key/secret pair that was committed here as leaked and rotate it in Axonius.
for _required_name in ("AXONIOUS_URL", "API_KEY", "API_SECRET"):
    # Fail loudly with an actionable message instead of a bare NameError later.
    if _required_name not in globals():
        raise RuntimeError(
            f"{_required_name} is not defined - run the vault cell at the top "
            "of this notebook before running this cell"
        )

# The vault cell spells the variable "AXONIOUS_URL"; the code in this cell uses
# "AXONIUS_URL", so expose the same value under that name.
# NOTE(review): confirm the vault's AXONIOUS_URL is the tenant base URL
# (scheme + host, no trailing slash) that the endpoints below expect.
AXONIUS_URL = AXONIOUS_URL

def get_axonius_saved_queries(query_type="devices"):
    """
    Fetch saved queries from the ``/api/{query_type}/views`` endpoint.

    Progress and failures are reported with print() rather than raised.

    Args:
        query_type: "devices" or "users"

    Returns:
        list | None: The saved queries on HTTP 200 (an empty list when the
        JSON body is neither a list nor a dict with a 'data' key); None on
        any other status code or on any exception.
    """
    # Same headers as the device queries
    request_headers = {
        'Api-Key': API_KEY,
        'Api-Secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }

    # Saved queries are called "views" in Axonius
    endpoint = f"{AXONIUS_URL}/api/{query_type}/views"

    print(f"Fetching saved {query_type} queries from: {endpoint}")

    try:
        reply = requests.get(endpoint, headers=request_headers, timeout=30)

        print(f"Status Code: {reply.status_code}")

        # Anything but 200 is reported (body truncated to 500 chars) and dropped.
        if reply.status_code != 200:
            print(f"✗ Failed: {reply.status_code}")
            print(f"Response: {reply.text[:500]}")
            return None

        payload = reply.json()

        # Accept either a bare list or a {'data': ...} envelope.
        if isinstance(payload, list):
            found = payload
        elif isinstance(payload, dict) and 'data' in payload:
            found = payload['data']
        else:
            found = []

        print(f"✓ Success! Found {len(found)} saved {query_type} queries")
        return found

    except Exception as err:
        print(f"✗ Error: {str(err)}")
        return None

# Fetch the saved device queries and preview up to five of them.
banner = "=" * 80

print(banner)
print("FETCHING DEVICE SAVED QUERIES")
print(banner)
device_queries = get_axonius_saved_queries("devices")

if device_queries:
    print(f"\nFound {len(device_queries)} device queries:")
    for i, query in enumerate(device_queries[:5], 1):  # Show first 5
        # Only dict entries carry a name/ID; everything else is skipped.
        if not isinstance(query, dict):
            continue
        nested_name = query.get('attributes', {}).get('name', 'Unknown')
        name = query.get('name', nested_name)
        query_id = query.get('id', query.get('uuid', 'N/A'))
        print(f"{i}. {name} (ID: {query_id})")

# Same preview for the saved user queries.
print("\n" + banner)
print("FETCHING USER SAVED QUERIES")
print(banner)
user_queries = get_axonius_saved_queries("users")

if user_queries:
    print(f"\nFound {len(user_queries)} user queries:")
    for i, query in enumerate(user_queries[:5], 1):  # Show first 5
        if not isinstance(query, dict):
            continue
        nested_name = query.get('attributes', {}).get('name', 'Unknown')
        name = query.get('name', nested_name)
        query_id = query.get('id', query.get('uuid', 'N/A'))
        print(f"{i}. {name} (ID: {query_id})")

# If the views endpoint returned nothing for both devices and users, probe a
# few alternative endpoint paths and stop at the first one that answers 200.
if not device_queries and not user_queries:
    print("\n" + "=" * 80)
    print("TRYING ALTERNATIVE ENDPOINTS")
    print("=" * 80)

    alternative_endpoints = [
        "/api/devices/saved_queries",
        "/api/v1/devices/views",
        "/api/saved_queries/devices",
        "/api/queries/devices"
    ]

    # `headers` in this cell is only a local inside get_axonius_saved_queries(),
    # so referencing it here raised a NameError that the except below swallowed
    # for every endpoint. Build the probe's own headers instead.
    probe_headers = {
        'Api-Key': API_KEY,
        'Api-Secret': API_SECRET,
        'Content-Type': 'application/vnd.api+json',
        'Accept': 'application/vnd.api+json'
    }

    for endpoint in alternative_endpoints:
        # Use the same base-URL variable as the rest of this cell (AXONIUS_URL).
        url = f"{AXONIUS_URL}{endpoint}"
        print(f"\nTrying: {url}")

        try:
            response = requests.get(url, headers=probe_headers, timeout=10)
            print(f"Status: {response.status_code}")

            if response.status_code == 200:
                print("✓ This endpoint works!")
                data = response.json()
                print(f"Response preview: {str(data)[:200]}...")
                break
        except Exception as e:
            # Best-effort probe: report the (truncated) error and try the next path.
            print(f"✗ Error: {str(e)[:100]}")