working on the k-means
crodas committed May 19, 2009
1 parent d60592d commit 8a98cd7
Showing 3 changed files with 108 additions and 15 deletions.
101 changes: 90 additions & 11 deletions sample.php
@@ -4,10 +4,11 @@

final class PrepareCluster extends Job
{
function __construct()
function __hadoop_init()
{
define("MIN_WORD_LENGTH", 3);
define("MIN_WORD_FREQ", 3);
define("WORD_MATRIX_X", 20000);
}

function map($value)
@@ -59,7 +60,7 @@ function reduce($key, $values)
/* per-document aggregates for the Pearson formula: sum of counts, sum of squares, and the denominator term */
$tmp['sum'] = array_sum($val);
$tmp['seq'] = array_sum(array_map(array(&$this, "_pearsonpow"), $val));
$tmp['den'] = $tmp['seq'] - pow($tmp['sum'], 2) / 10000;
$tmp['den'] = $tmp['seq'] - pow($tmp['sum'], 2) / WORD_MATRIX_X;

$values = '';
foreach ($val as $word => $count) {
@@ -72,9 +73,9 @@ function reduce($key, $values)

final class InitCluster extends Job
{
function __construct()
function __hadoop_init()
{
define("KMEANS", 10000);
define("KMEANS", 5000);

}

@@ -90,49 +91,127 @@ function reduce($key, $value)
{
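// Keep the first KMEANS reduced values as the initial centroids, re-keyed by a running counter.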
static $i=0;
if ($i++ < KMEANS) {
$this->Emit($key, $value[0]);
$this->Emit($i, $value[0]);
}
}
}

final class ClusterIterator extends Job
{
function __construct()
private $_centroids;
private $_centroids_id;
function __hadoop_init()
{
hadoop::setHome("/home/crodas/hadoop/hadoop-0.18.3");
define("WORD_MATRIX_X", 20000);
$centroids = array();

$fp = Hadoop::getFile("noticias/step1/part-00000");
while ($r = fgets($fp)) {
list($id, $content) = explode("\t", $r, 2);
$centroids[$id] = $this->_getObject($content);
}
$this->_centroids = & $centroids;
$this->_centroids_id = array_keys($centroids);
fclose($fp);
}

private function _getObject($line)
{
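// Parse a serialized vector line of the form "sum|seq|den|word1,count1:word2,count2:..." into an object.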
$obj = new stdClass;

list($s, $seq, $den, $extra) = explode("|", $line, 4);
$obj->sum = $s;
$obj->seq = $seq;
$obj->den = $den;
$words = array();
$obj->words = & $words;
foreach (explode(":", $extra) as $word) {
if (trim($word) == "") {
continue;
}
list($w, $c) = explode(",", $word, 2);
$words[$w] = $c;
}

return $obj;
}

private function _pearson($obj, &$centroid)
{
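// Pearson-based distance: 1 minus the correlation of the two word vectors, so 0 is identical and 2 is fully anti-correlated.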
$pSum = 0;

$w = & $centroid->words;
foreach ($obj->words as $word=>$cnt) {
if (isset($w[$word])) {
$pSum += $cnt * $w[$word];
}
}

$num = $pSum - ($obj->sum * $centroid->sum / WORD_MATRIX_X);
$den = sqrt($obj->den * $centroid->den);
if ($den == 0) {
return 0;
}
return 1-($num/$den);
}

function map($line)
{
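// Find the nearest centroid for this document; 2 is the metric's upper bound, so any real match beats it.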
$centroids = & $this->_centroids;
$centroids_id = & $this->_centroids_id;

list($key, $content) = explode("\t", $line, 2);
$word = $this->_getObject($content);
$bmatch = 2;
$best = -1;

foreach ($centroids_id as $id) {
$dist = $this->_pearson($word, $centroids[$id]);
if ($dist < $bmatch) {
$bmatch = $dist;
$best = $id;
}
}

fwrite(STDERR, "$key $best $bmatch\n");
if ($bmatch < 0.6) {
fwrite(STDERR, "$key $best\n");
$this->EmitIntermediate($key, $best);
}


}

function reduce($key, $value)
{
$this->Emit($key, $value[0]);
}
}

hadoop::setHome("/home/crodas/hadoop/hadoop-0.18.3");

$hadoop = new Hadoop;
/* create an inverted index for fast computation */
$hadoop->setHome("/home/crodas/hadoop/hadoop-0.18.3");
$hadoop->setInput("noticias");
$hadoop->setOutput("noticias/init");
$hadoop->setJob(new PrepareCluster);
$hadoop->setNumberOfReduces(4);
$hadoop->Run();
//$hadoop->Run();


$hadoop->setInput("noticias/init");
$hadoop->setOutput("noticias/step1");
$hadoop->setJob(new InitCluster);
$hadoop->setNumberOfReduces(1);
$hadoop->Run();
die();
//$hadoop->Run();

for($i=1; ;$i++) {
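// Only a single assignment pass for now (note the break below); later iterations would re-compute centroids until convergence.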
$hadoop->setInput("noticias/init");
$hadoop->setOutput("noticias/ite-$i");
$hadoop->setNumberOfReduces(5);
$hadoop->setNumberOfReduces(1);
$hadoop->setJob(new ClusterIterator);
$hadoop->Run();
break;
}
?>
20 changes: 16 additions & 4 deletions src/hadoop.php
@@ -11,7 +11,7 @@ final class Hadoop
{
private $_ipath;
private $_opath;
private $_path;
private static $_path;
private $_jar = "contrib/streaming/hadoop-0.18.3-streaming.jar";
private $_tmp;
private $_id;
@@ -29,6 +29,18 @@ static function import($file)
include(dirname(__FILE__)."/$file");
}

static function getFile($file)
{
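// Stream a file out of HDFS with "hadoop fs -cat" into a rewound local temp handle.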
$home = self::$_path;
$tmp = tmpfile();
$p = popen("${home}/bin/hadoop fs -cat ${file}", "r");
while ($r = fread($p,1024)) {
fwrite($tmp, $r);
}
pclose($p);
rewind($tmp);
return $tmp;
}

function setNumberOfReduces($number)
{
@@ -51,7 +63,7 @@ function Run()
@unlink($this->_getFileName("reduce"));
}

function setHome($file)
public static function setHome($file)
{
if (!is_dir($file)) {
return False;
@@ -63,7 +75,7 @@ function setHome($file)
if ($file[strlen($file)-1] != "/") {
$file .= "/";
}
$this->_path = $file;
self::$_path = $file;
return True;
}

@@ -115,7 +127,7 @@ private function _getCmd()
$jarpath = $this->_jar;
$ipath = $this->_ipath;
$opath = $this->_opath;
$path = $this->_path;
$path = self::$_path;

$cmd = sprintf("%sbin/hadoop jar %s -input %s -output %s -mapper %s -reducer %s -jobconf mapred.reduce.tasks=%d -file %s -file %s -file %s",
$path, $path.$jarpath, $ipath, $opath,
2 changes: 2 additions & 0 deletions src/job.php
@@ -14,6 +14,7 @@ final function Emit($key, $value)

final function RunMap()
{
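// Let the job set up its constants and state once per task before the input stream is consumed.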
$this->__hadoop_init();
while (($line = fgets(STDIN)) !== false) {
$line = substr($line, 0, strlen($line)-1);
if (strlen($line) == 0) {
@@ -26,6 +27,7 @@ final function RunMap()

final function RunReduce()
{
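// Same per-task initialization hook as RunMap.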
$this->__hadoop_init();
$values = new parray();

while (($line = fgets(STDIN)) !== false) {
