Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebuilt Splunk using custom library #40123

Merged
merged 9 commits into from Sep 20, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Expand Up @@ -806,6 +806,7 @@ omit =
homeassistant/components/spc/*
homeassistant/components/speedtestdotnet/*
homeassistant/components/spider/*
homeassistant/components/splunk/*
homeassistant/components/spotcrime/sensor.py
homeassistant/components/spotify/__init__.py
homeassistant/components/spotify/media_player.py
Expand Down
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -403,6 +403,7 @@ homeassistant/components/songpal/* @rytilahti @shenxn
homeassistant/components/spaceapi/* @fabaff
homeassistant/components/speedtestdotnet/* @rohankapoorcom @engrbm87
homeassistant/components/spider/* @peternijssen
homeassistant/components/splunk/* @Bre77
homeassistant/components/spotify/* @frenck
homeassistant/components/sql/* @dgomes
homeassistant/components/squeezebox/* @rajlaud
Expand Down
147 changes: 109 additions & 38 deletions homeassistant/components/splunk/__init__.py
@@ -1,9 +1,12 @@
"""Support to send data to an Splunk instance."""
"""Support to send data to a Splunk instance."""
import asyncio
from collections import deque
import json
import logging
import time

from aiohttp.hdrs import AUTHORIZATION
import requests
from requests import exceptions as request_exceptions
from splunk_data_sender import SplunkSender
import voluptuous as vol

from homeassistant.const import (
Expand All @@ -22,8 +25,10 @@

_LOGGER = logging.getLogger(__name__)

CONF_FILTER = "filter"
DOMAIN = "splunk"
CONF_FILTER = "filter"
SPLUNK_ENDPOINT = "collector/event"
SPLUNK_SIZE_LIMIT = 102400 # 100KB, Actual limit is 512KB

DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8088
Expand All @@ -48,23 +53,7 @@
)


def post_request(event_collector, body, headers, verify_ssl):
"""Post request to Splunk."""
try:
payload = {"host": event_collector, "event": body}
requests.post(
event_collector,
data=json.dumps(payload, cls=JSONEncoder),
headers=headers,
timeout=10,
verify=verify_ssl,
)

except requests.exceptions.RequestException as error:
_LOGGER.exception("Error saving event to Splunk: %s", error)


def setup(hass, config):
async def async_setup(hass, config):
"""Set up the Splunk component."""
conf = config[DOMAIN]
host = conf.get(CONF_HOST)
Expand All @@ -75,16 +64,50 @@ def setup(hass, config):
name = conf.get(CONF_NAME)
entity_filter = conf[CONF_FILTER]

if use_ssl:
uri_scheme = "https://"
else:
uri_scheme = "http://"

event_collector = f"{uri_scheme}{host}:{port}/services/collector/event"
headers = {AUTHORIZATION: f"Splunk {token}"}
event_collector = SplunkSender(
host=host,
port=port,
token=token,
hostname=name,
protocol=["http", "https"][use_ssl],
verify=(use_ssl and verify_ssl),
api_url=SPLUNK_ENDPOINT,
retry_count=1,
)

payload = {
"time": time.time(),
"host": name,
"event": {
"domain": DOMAIN,
"meta": "Splunk integration has started",
},
}

def splunk_event_listener(event):
try:
event_collector._send_to_splunk( # pylint: disable=protected-access
Bre77 marked this conversation as resolved.
Show resolved Hide resolved
"send-event", json.dumps(payload)
)
except request_exceptions.HTTPError as err:
if err.response.status_code in (401, 403):
_LOGGER.error("Invalid or disabled token")
return False
_LOGGER.warning(err)
except (
request_exceptions.Timeout,
request_exceptions.ConnectionError,
request_exceptions.TooManyRedirects,
json.decoder.JSONDecodeError,
) as err:
_LOGGER.warning(err)

batch = deque()
lock = asyncio.Lock()

async def splunk_event_listener(event):
"""Listen for new messages on the bus and sends them to Splunk."""
nonlocal batch

state = event.data.get("new_state")

if state is None or not entity_filter(state.entity_id):
Expand All @@ -95,19 +118,67 @@ def splunk_event_listener(event):
except ValueError:
_state = state.state

json_body = [
{
payload = {
"time": event.time_fired.timestamp(),
"host": name,
"event": {
"domain": state.domain,
"entity_id": state.object_id,
"attributes": dict(state.attributes),
"time": str(event.time_fired),
"value": _state,
"host": name,
}
]

post_request(event_collector, json_body, headers, verify_ssl)
},
}
batch.append(json.dumps(payload, cls=JSONEncoder))

hass.bus.listen(EVENT_STATE_CHANGED, splunk_event_listener)
# Enforce only one loop is running
if lock.locked():
return
async with lock:
# Run until there are no new events to sent
while batch:
size = len(batch[0])
events = deque()
# Do Until loop to get events until maximum payload size or no more events
# Ensures at least 1 event is always sent even if it exceeds the size limit
while True:
# Add first event
events.append(batch.popleft())
# Stop if no more events
if not batch:
break
# Add size of next event
size += len(batch[0])
# Stop if next event exceeds limit
if size > SPLUNK_SIZE_LIMIT:
break
_LOGGER.info(
Bre77 marked this conversation as resolved.
Show resolved Hide resolved
"Sending %s of %s events to Splunk",
len(events),
len(events) + len(batch),
)
# Send the selected events
try:
await hass.async_add_executor_job(
event_collector._send_to_splunk, # pylint: disable=protected-access
"send-event",
"\n".join(events),
)
except (
request_exceptions.HTTPError,
request_exceptions.Timeout,
request_exceptions.ConnectionError,
request_exceptions.TooManyRedirects,
) as err:
_LOGGER.warning(err)
# Requeue failed events
batch = events + batch
break
except json.decoder.JSONDecodeError:
_LOGGER.warning("Unexpected response")
# Requeue failed events
batch = events + batch
break

hass.bus.async_listen(EVENT_STATE_CHANGED, splunk_event_listener)

return True
7 changes: 6 additions & 1 deletion homeassistant/components/splunk/manifest.json
Expand Up @@ -2,5 +2,10 @@
"domain": "splunk",
"name": "Splunk",
"documentation": "https://www.home-assistant.io/integrations/splunk",
"codeowners": []
"requirements": [
"splunk-data-sender==0.0.7"
],
"codeowners": [
"@Bre77"
]
}
3 changes: 3 additions & 0 deletions requirements_all.txt
Expand Up @@ -2052,6 +2052,9 @@ speedtest-cli==2.1.2
# homeassistant.components.spider
spiderpy==1.3.1

# homeassistant.components.splunk
splunk-data-sender==0.0.7

# homeassistant.components.spotcrime
spotcrime==1.0.4

Expand Down
1 change: 0 additions & 1 deletion tests/components/splunk/__init__.py

This file was deleted.