Skip to content

Commit

Permalink
Livestatus\Connection: switch to new fetch methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-Gelf committed Nov 16, 2014
1 parent efd395e commit fd55ffe
Showing 1 changed file with 57 additions and 54 deletions.
111 changes: 57 additions & 54 deletions library/Icinga/Protocol/Livestatus/Connection.php
Expand Up @@ -32,6 +32,13 @@ class Connection
const TYPE_UNIX = 1;
const TYPE_TCP = 2;

protected $bytesRead = 0;
protected $responseSize;
protected $status;
protected $headers;

// List of available Livestatus tables. Kept here as we otherwise get no
// useful error message
protected $available_tables = array(
'hosts', // hosts
'services', // services, joined with all data from hosts
Expand Down Expand Up @@ -110,9 +117,10 @@ public function count(Query $query)
{
return 100;
$count = clone($query);
$count->count();
// WTF? $count->count();
Benchmark::measure('Sending Livestatus Count Query');
$data = $this->doFetch((string) $count);
$this->execute($query);
$data = $this->fetchRowFromSocket();
Benchmark::measure('Got Livestatus count result');
return $data[0][0];
}
Expand Down Expand Up @@ -163,21 +171,26 @@ public function fetchPairs(Query $query)
public function fetchAll(Query $query)
{
Benchmark::measure('Sending Livestatus Query');
$data = $this->doFetch((string) $query);
$this->execute($query);
Benchmark::measure('Got Livestatus Data');

if ($query->hasColumns()) {
$headers = $query->getColumnAliases();
} else {
// TODO: left this here, find out how to handle it better
die('F*** no data');
$headers = array_shift($data);
}
$result = array();
foreach ($data as $row) {
$result_row = & $result[];
$result_row = (object) array();
foreach ($row as $key => $val) {
$result_row->{$headers[$key]} = $val;
}
$filter = $query->filterIsSupported() ? null : $query->getFilter();

while ($row = $this->fetchRowFromSocket()) {
$r = new ResponseRow($row, $query);
$res = $query->resultRow($row);
if ($filter !== null && ! $filter->matches($res)) continue;
$result[] = $res;
}

if ($query->hasOrder()) {
usort($result, array($query, 'compare'));
}
Expand All @@ -193,59 +206,49 @@ public function fetchAll(Query $query)
return $result;
}

protected function doFetch($raw_query)
protected function hasBeenExecuted()
{
$conn = $this->getConnection();
$this->writeToSocket($raw_query);
$header = $this->readFromSocket(16);
$status = (int) substr($header, 0, 3);
$length = (int) trim(substr($header, 4));
$body = $this->readFromSocket($length);
if ($status !== 200) {
throw new IcingaException(
'Problem while reading %d bytes from livestatus: %s',
$length,
$body
);
}
$result = json_decode($body);
if ($result === null) {
throw new IcingaException('Got invalid response body from livestatus');
}

return $result;
return $this->status !== null;
}

protected function readFromSocket($length)
protected function execute($query)
{
$offset = 0;
$buffer = '';

while ($offset < $length) {
$data = socket_read($this->connection, $length - $offset);
if ($data === false) {
throw new IcingaException(
'Failed to read from livestatus socket: %s',
socket_strerror(socket_last_error($this->connection))
);
}
$size = strlen($data);
$offset += $size;
$buffer .= $data;
// Reset state
$this->status = null;
$this->responseSize = null;
$this->bytesRead = 0;

if ($size === 0) {
break;
}
$raw = $query->toString();

Benchmark::measure($raw);

// "debug"
// echo $raw . "\n<br>";
$this->writeToSocket($raw);
$header = $this->readLineFromSocket();

if (! preg_match('~^(\d{3})\s\s*(\d+)$~', $header, $m)) {
$this->disconnect();
throw new Exception(
sprintf('Got invalid header. First 16 Bytes: %s', $header)
);
}
if ($offset !== $length) {
throw new IcingaException(
'Got only %d instead of %d bytes from livestatus socket',
$offset,
$length
$this->status = (int) $m[1];
$this->bytesRead = 0;
$this->responseSize = (int) $m[2];
if ($this->status !== 200) {
// "debug"
//die(var_export($raw, 1));
throw new Exception(
sprintf(
'Error %d while querying livestatus: %s %s',
$this->status,
$raw,
$this->readLineFromSocket()
)
);
}

return $buffer;
$this->discoverColumnHeaders($query);
}

protected function discoverColumnHeaders($query)
Expand Down

0 comments on commit fd55ffe

Please sign in to comment.