Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Atbrox committed Jun 13, 2010
0 parents commit b5e64ae
Show file tree
Hide file tree
Showing 9 changed files with 884 additions and 0 deletions.
22 changes: 22 additions & 0 deletions README
@@ -0,0 +1,22 @@
Snabler - Parallel Algorithms in Python for Hadoop/Mapreduce

Contact opensource@atbrox.com for more info.

FAQ
1. Which Algorithms are currently implemented in Snabler?
(So far) A Parallel Machine Learning Classifier for Hadoop Streaming in Python.

2. Which Algorithms will be implemented?
There is a backlog of potential hadoop/mapreduce algorithms that are candidates for implementation.

3. Why the name Snabler?
The word Snabler is the Danish and Norwegian plural for an elephant's trunk (cf. the Hadoop elephant); the trunk's shape alludes to Python, and the plural alludes to parallelism.

4. Is Snabler open source?
Yes, Snabler is an open source project licensed under the Apache License 2.0.

5. Who is behind and develops Snabler?
Atbrox - a startup company that develops cloud & search software - is behind Snabler.

6. How do I contribute to Snabler?
Implement an algorithm in Python for Hadoop Streaming with methods def map(key, value) and def reduce(key, values); see the psvm classifier in this repository for an example implementation.
Empty file added psvm/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions psvm/create_mapper_input_data.py
@@ -0,0 +1,35 @@
import cPickle as pickle

def numerifyFeature(feature):
    """Convert one raw feature string to a float; the missing-value
    marker "?" becomes 0.0."""
    return 0.0 if feature == "?" else float(feature)


def extractFeatures(example):
    """Return the numeric feature vector for one CSV example line.

    The first field (running id) and the last field (class label) are
    dropped; the remaining fields are numerified.
    """
    return [numerifyFeature(raw) for raw in example.split(",")[1:-1]]

def extractCategory(example, negCategory, posCategory):
    """Map the class label (last comma-separated field) to -1.0 or +1.0.

    Any label that is not *negCategory* maps to +1.0 -- NOTE(review):
    *posCategory* is never consulted, so unknown labels also become
    +1.0; confirm this is intended.
    """
    label = example.split(",")[-1].strip()
    return -1.0 if label == negCategory else 1.0

def maprunner(incsize=1):
    """Print one "<category>\t<f1,f2,...>" mapper-input line per training
    example in the breast-cancer dataset.

    Class label "4" maps to -1.0 (negative) and everything else, in
    practice "2", to +1.0 (positive).

    incsize -- currently unused; kept for backward compatibility.
    """
    # "with" guarantees the data file is closed even on error
    # (the original leaked the handle and used the removed file() builtin).
    with open('testdata/breastcancerwisconsin.data') as breastcancerfile:
        for example in breastcancerfile:
            # extractFeatures skips the first and last fields (id + class).
            features = extractFeatures(example)
            category = extractCategory(example, negCategory="4", posCategory="2")
            print("%s\t%s" % (str(category), ",".join(str(f) for f in features)))

# Script entry point: write the mapper's training input to stdout.
if __name__ == "__main__":
    maprunner()
46 changes: 46 additions & 0 deletions psvm/mapper.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# encoding: utf-8

import base64
import cPickle as pickle
import logging
import numpy

#import mrtools

from itertools import groupby
from operator import itemgetter
import sys

def read_input(file, separator="\t"):
    """Lazily yield each line of *file* split into fields on *separator*.

    NOTE: bare rstrip() trims ALL trailing whitespace, so a trailing
    empty field is dropped along with the newline.
    """
    return (line.rstrip().split(separator) for line in file)


def run_mapper(map, separator="\t"):
    """Feed every (key, value) row read from stdin to the *map* callback.

    Each input line must contain exactly two fields; other shapes raise
    ValueError on unpacking, as in the original.
    """
    for key, value in read_input(sys.stdin, separator):
        map(key, value)

def map(key, value):
    """Mapper for the proximal SVM: turn one training record into a
    pickled (E'E, E'De) contribution printed on stdout.

    key   -- comma-separated class labels, one per training example
    value -- the examples' feature vectors, concatenated and
             comma-separated

    NOTE: shadows the builtin map(); the name is kept because
    run_mapper(map) below depends on it.
    """
    trainingclasses = [float(i) for i in key.split(",")]
    trainingfeatures = [float(i) for i in value.split(",")]

    # If key holds several classes, value holds that many consecutive
    # feature vectors; reshape to one example per row.  "//" (not "/")
    # so the division stays integral on Python 3 as well.
    numtrainingclasses = len(trainingclasses)
    numfeaturesperexample = len(trainingfeatures) // numtrainingclasses

    A = numpy.matrix(
        numpy.reshape(numpy.array(trainingfeatures),
                      (numtrainingclasses, numfeaturesperexample)))
    D = numpy.diag(trainingclasses)
    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A), 1))
    # E = [A  -e]: the augmented data matrix of the proximal SVM.
    E = numpy.matrix(numpy.append(A, -e, axis=1))

    # base64 keeps the pickled matrices safe inside Hadoop Streaming's
    # line-oriented transport; decode("ascii") turns the Python 3 bytes
    # result back into a printable str.
    payload = base64.b64encode(pickle.dumps((E.T * E, E.T * D * e))).decode("ascii")
    print("outputkey\t%s" % payload)

# Hadoop Streaming entry point: run the mapper over stdin.
if __name__ == "__main__":
    run_mapper(map)
22 changes: 22 additions & 0 deletions psvm/mrtools.py
@@ -0,0 +1,22 @@
# #!/usr/bin/env python
# encoding: utf-8

from itertools import groupby
from operator import itemgetter
import sys

def read_input(file, separator="\t"):
    """Lazily yield each line of *file* as a field list split on
    *separator* (bare rstrip() also drops trailing empty fields)."""
    return (line.rstrip().split(separator) for line in file)


def run_mapper(map, separator="\t"):
    """Stream (key, value) pairs from stdin into the *map* callback;
    each line must hold exactly two fields."""
    for key, value in read_input(sys.stdin, separator):
        map(key, value)


def run_reducer(reduce, separator="\t"):
    """Group stdin rows by their first field and call reduce(key, rows).

    groupby only merges adjacent equal keys, so stdin must arrive sorted
    by key -- which is what Hadoop Streaming delivers to a reducer.
    """
    for key, rows in groupby(read_input(sys.stdin, separator), itemgetter(0)):
        reduce(key, rows)

45 changes: 45 additions & 0 deletions psvm/reducer.py
@@ -0,0 +1,45 @@
#!/usr/bin/env python
# encoding: utf-8

import base64
import cPickle as pickle
import logging
import numpy

#import mrtools

from itertools import groupby
from operator import itemgetter
import sys

def read_input(file, separator="\t"):
    """Lazily yield each line of *file* split into fields on *separator*.

    NOTE: bare rstrip() trims ALL trailing whitespace, so a trailing
    empty field is dropped along with the newline.
    """
    return (line.rstrip().split(separator) for line in file)


def run_reducer(reduce, separator="\t"):
    """Group stdin rows by their first field and call reduce(key, rows).

    groupby only merges adjacent equal keys, so this assumes stdin is
    sorted by key, as Hadoop Streaming guarantees for reducer input.
    """
    rows = read_input(sys.stdin, separator)
    for key, group in groupby(rows, key=itemgetter(0)):
        reduce(key, group)


def reduce(key, values, mu=0.1):
    """Reducer for the proximal SVM: sum the mappers' contributions and
    solve ((I / mu) + sum E'E) x = sum E'De, printing "key\t<solution>".

    key    -- the (single) intermediate key emitted by the mappers
    values -- iterable of (key, base64-encoded pickled (E'E, E'De)) rows
    mu     -- regularization parameter of the proximal SVM

    NOTE: shadows the builtin reduce(); the name is kept because
    run_reducer(reduce) depends on it.
    WARNING: pickle.loads is unsafe on untrusted input; tolerable here
    only because the payload comes from our own mapper.
    """
    sumETE = None
    sumETDe = None

    for _, value in values:
        ETE, ETDe = pickle.loads(base64.b64decode(value))
        # "== None" on a numpy matrix compares elementwise (and raises on
        # truth-testing in modern numpy); identity test is the fix.
        if sumETE is None:
            # Seed the sum with the I/mu regularization term.
            sumETE = numpy.matrix(numpy.eye(ETE.shape[1]) / mu)
        sumETE += ETE

        if sumETDe is None:
            sumETDe = ETDe
        else:
            sumETDe += ETDe

    # Empty group: nothing to solve, emit nothing (the original crashed).
    if sumETE is None:
        return

    result = sumETE.I * sumETDe
    print("%s\t%s" % (key, str(result.tolist())))

# Hadoop Streaming entry point: run the reducer over key-sorted stdin.
if __name__ == "__main__":
    run_reducer(reduce)
3 changes: 3 additions & 0 deletions psvm/run_simple_test.sh
@@ -0,0 +1,3 @@
#!/bin/bash
# Smoke test: generate mapper input, run the mapper, then the reducer,
# all locally through a plain pipe (no Hadoop needed).
# pipefail makes a failure in ANY stage fail the script; by default the
# pipeline's status is only that of the last command, so mapper errors
# were silently swallowed.
set -o pipefail

python create_mapper_input_data.py | python mapper.py | python reducer.py

0 comments on commit b5e64ae

Please sign in to comment.