Skip to content

Commit

Permalink
Merge pull request #708 from mabi/dev-cassandra
Browse files Browse the repository at this point in the history
address issue #706
  • Loading branch information
TrystanLea committed Feb 9, 2018
2 parents 39cfca2 + cfb6937 commit b39d714
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 21 deletions.
50 changes: 36 additions & 14 deletions Modules/feed/engine/CassandraEngine.php
Expand Up @@ -8,6 +8,7 @@
*/
class CassandraEngine
{
const ONE_TABLE_PER_FEED = false;
protected $cluster;
protected $session;
protected $log;
Expand Down Expand Up @@ -42,8 +43,8 @@ public function __construct($settings)
*/
public function create($feedid,$options)
{
$feedid = (int) $feedid;
$feedname = "feed_".trim($feedid)."";
$feedid = (int) $feedid;
$feedname = $this->feedtable($feedid);
$this->execCQL("CREATE TABLE IF NOT EXISTS $feedname (feed_id int, day int, time bigint, data float, PRIMARY KEY ((feed_id,day), time)) WITH CLUSTERING ORDER BY (time ASC)");
return true;
}
Expand All @@ -55,8 +56,11 @@ public function create($feedid,$options)
*/
public function delete($feedid)
{
$feedid = (int) $feedid;
$this->execCQL("DROP TABLE feed_".$feedid);
$feedid = (int) $feedid;
$feedname = $this->feedtableToDrop($feedid);
if($feedname){
$this->execCQL("DROP TABLE $feedname");
}
}

/**
Expand All @@ -66,7 +70,7 @@ public function delete($feedid)
*/
public function get_meta($feedid)
{
$feedid = (int) $feedid;
$feedid = (int) $feedid;
$meta = new stdClass();
$meta->id = $feedid;
$meta->start_time = 0;
Expand All @@ -84,7 +88,7 @@ public function get_meta($feedid)
public function get_feed_size($feedid)
{
$feedid = (int) $feedid;
$tablesize = 0; // FIXME
$tablesize = 0;
return $tablesize;
}

Expand All @@ -105,7 +109,7 @@ public function post($feedid,$time,$value,$arg=null)
$time = (int) $time;
$value = (float) $value;

$feedname = "feed_".trim($feedid)."";
$feedname = $this->feedtable($feedid);
$day = $this->unixtoday($time);

$this->execCQL("INSERT INTO $feedname(feed_id,day,time,data) VALUES($feedid,$day,$time,$value)");
Expand All @@ -124,7 +128,7 @@ public function update($feedid,$time,$value)
$time = (int) $time;
$value = (float) $value;

$feedname = "feed_".trim($feedid)."";
$feedname = $this->feedtable($feedid);
$day = $this->unixtoday($time);
$this->execCQL("UPDATE $feedname SET data = $value WHERE feed_id = $feedid AND day = $day AND time = $time");
return $value;
Expand All @@ -138,7 +142,7 @@ public function update($feedid,$time,$value)
public function lastvalue($feedid)
{
$feedid = (int) $feedid;
$feedname = "feed_".trim($feedid)."";
$feedname = $this->feedtable($feedid);

$result = $this->execCQL("SELECT max(day) AS max_day FROM $feedname WHERE feed_id=$feedid");
if ($result && count($result)>0){
Expand Down Expand Up @@ -173,7 +177,7 @@ public function get_data($feedid,$start,$end,$interval,$skipmissing,$limitinterv
$start = round($start/1000);
$end = round($end/1000);
$interval = intval($interval);
$feedname = "feed_$feedid";
$feedname = $this->feedtable($feedid);
// Minimum interval
if ($interval<1) $interval = 1;
// Maximum request size
Expand Down Expand Up @@ -203,8 +207,8 @@ public function get_data($feedid,$start,$end,$interval,$skipmissing,$limitinterv

public function export($feedid,$start)
{
$feedid = (int) $feedid;
$start = (int) $start;
$feedid = (int) $feedid;
$start = (int) $start;
$this->log->info("export($feedid,$start)");
// TODO implement
}
Expand All @@ -230,7 +234,7 @@ public function delete_data_point($feedid,$time)
$time = (int) $time;
$day = $this->unixtoday($time);

$feedname = "feed_".trim($feedid)."";
$feedname = $this->feedtable($feedid);
$this->execCQL("DELETE FROM $feedname WHERE feed_id = $feedid AND day = $day AND time = $time");
}

Expand All @@ -241,7 +245,7 @@ public function deletedatarange($feedid,$start,$end)
$end = intval($end/1000.0);
$day_range = range($this->unixtoday($start), $this->unixtoday($end));

$feedname = "feed_".trim($feedid)."";
$feedname = $this->feedtable($feedid);
$this->execCQL("DELETE FROM $feedname WHERE feed_id=$feedid AND day IN (".implode($day_range,',').") AND time >= $start AND time <= $end");
return true;
}
Expand All @@ -261,4 +265,22 @@ private function unixtoday($unixtime)
return floor($unixtime/86400);
}

private function feedtable($feedid)
{
$feedid = (int) $feedid;
if($this::ONE_TABLE_PER_FEED){
return "feed_".trim($feedid)."";
}
return "feed";
}

private function feedtableToDrop($feedid)
{
$feedid = (int) $feedid;
if($this::ONE_TABLE_PER_FEED){
return "feed_".trim($feedid)."";
}
return false;
}

}
5 changes: 4 additions & 1 deletion default.emonpi.settings.php
Expand Up @@ -41,7 +41,7 @@
//Engine::PHPTIMESERIES, // 2
//Engine::PHPFINA, // 5
Engine::PHPFIWA, // 6 PHPFIWA disabled for compatibility with Low-write mode
Engine::CASSANDRA // 10 Disabled by default, enable if you wish to use
Engine::CASSANDRA // 10 Apache Cassandra disabled by default for emonpi, enable if you wish to use
),

// Redis Low-write mode
Expand All @@ -63,6 +63,9 @@
),
'phptimeseries'=>array(
'datadir' => '/home/pi/data/phptimeseries/'
),
'cassandra'=>array(
'keyspace' => 'emoncms'
)
);

Expand Down
1 change: 1 addition & 0 deletions default.settings.php
Expand Up @@ -41,6 +41,7 @@
//Engine::PHPTIMESERIES // 2
//,Engine::PHPFINA // 5
//,Engine::PHPFIWA // 6
//,Engine::CASSANDRA // 10 Apache Cassandra
),

// Redis Low-write mode
Expand Down
39 changes: 34 additions & 5 deletions docs/Cassandra.md
@@ -1,9 +1,38 @@
# Cassandra
# Apache Cassandra

experimental Cassandra support
Experimental Apache Cassandra support.

This document describes how to enable the use of Apache Cassandra as a storage engine for feeds.

Cassandra is an open-source NoSQL distributed database, designed to handle a large amount of data across many nodes in a highly available cluster with no single point of failure.

Feed data is stored in a single table in a dedicated keyspace, the row key is the (feed_id,day) pair to limit row size i.e. PRIMARY KEY((feed_id,day), time))

The keyspace must be created by the user, the table is created by the engine.

## Install Cassandra

Install Cassandra following the instructions at http://cassandra.apache.org/download/

## Install Cassandra PHP Driver

Install Cassandra PHP Driver following the instructions at http://datastax.github.io/php-driver/ (depends on the DataStax C/C++ Driver for Apache Cassandra)

## Create a new keyspace

Create a new keyspace that will hold emoncms feed data using cqlsh

cqlsh> CREATE KEYSPACE emoncms WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Change replication strategy and replication factor to suit your needs. The sample command is only for testing, do not use these settings in a multi datacenter distributed installation.

## Configure emoncms

Copy default.settings.php to settings.php if not done already. Configure the newly created keyspace in settings.php in the feedsettings section

'cassandra'=>array(
'keyspace' => 'emoncms'
)

create a new keyspace if needed and set it in settings.php
eg. CREATE KEYSPACE emoncms WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};

install Cassandra PHP Driver e.g. http://datastax.github.io/php-driver/ (depends on the DataStax C/C++ Driver for Apache Cassandra)

2 changes: 1 addition & 1 deletion process_settings.php
Expand Up @@ -63,7 +63,7 @@

$error_out = "";

if (!isset($config_file_version) || $config_file_version < 9) $error_out .= '<p>settings.php config file has new settings for this version. Copy default.settings.php to settings.php and modify the later.</p>';
if (!isset($config_file_version) || $config_file_version < 10) $error_out .= '<p>settings.php config file has new settings for this version. Copy default.settings.php to settings.php and modify the later.</p>';
if (!isset($username) || $username=="") $error_out .= '<p>missing setting: $username</p>';
if (!isset($password)) $error_out .= '<p>missing setting: $password</p>';
if (!isset($server) || $server=="") $error_out .= '<p>missing setting: $server</p>';
Expand Down

0 comments on commit b39d714

Please sign in to comment.