Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus committed Aug 19, 2014
2 parents 8ff6402 + 217b5e9 commit 2b46c4b
Showing 42 changed files with 818 additions and 79 deletions.
32 changes: 0 additions & 32 deletions .travis.yml

This file was deleted.

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -54,7 +54,8 @@ private[spark] class EventLoggingListener(
     private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
     private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
     private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-    private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+    private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
+      .toLowerCase + "-" + System.currentTimeMillis
     val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
 
     protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
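A note on the change above: the new second replaceAll scrubs characters that are unsafe in shell commands and URIs ($, curly braces, quotes) out of the application name before it becomes part of the event-log directory. A minimal Python sketch of the same sanitization chain (the helper name is ours, not Spark's):

import re
import time

def sanitize_app_name(app_name):
    # First replaceAll: spaces, colons and slashes become '-'.
    name = re.sub(r"[ :/]", "-", app_name)
    # New second replaceAll: shell/URI-unsafe characters become '_'.
    name = re.sub("[${}'\"]", "_", name)
    # Lowercase and append a timestamp, as the Scala code does.
    return name.lower() + "-" + str(int(time.time() * 1000))

print sanitize_app_name("My App: $test")  # e.g. my-app--_test-1408500000000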
2 changes: 2 additions & 0 deletions examples/src/main/python/als.py
@@ -97,3 +97,5 @@ def update(i, vec, mat, ratings):
error = rmse(R, ms, us)
print "Iteration %d:" % i
print "\nRMSE: %5.4f\n" % error

sc.stop()
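The remaining example files below get the same two-line change: each Python example now stops its SparkContext explicitly rather than relying on interpreter teardown, which releases executors promptly and lets listeners such as the event logger flush cleanly. A hedged sketch of the pattern, using a try/finally that goes one step beyond the commit (the commit's examples simply call sc.stop() at the end):

from pyspark import SparkContext

sc = SparkContext(appName="ExampleApp")
try:
    # Job body: build RDDs, run actions, print results.
    print sc.parallelize(range(100)).sum()
finally:
    # Runs even if the job body raises, so the context is always released.
    sc.stop()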
2 changes: 2 additions & 0 deletions examples/src/main/python/cassandra_inputformat.py
@@ -77,3 +77,5 @@
output = cass_rdd.collect()
for (k, v) in output:
    print (k, v)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/cassandra_outputformat.py
@@ -81,3 +81,5 @@
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/hbase_inputformat.py
@@ -71,3 +71,5 @@
output = hbase_rdd.collect()
for (k, v) in output:
    print (k, v)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/hbase_outputformat.py
@@ -63,3 +63,5 @@
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/kmeans.py
@@ -77,3 +77,5 @@ def closestPoint(p, centers):
kPoints[x] = y

print "Final centers: " + str(kPoints)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/logistic_regression.py
@@ -80,3 +80,5 @@ def add(x, y):
w -= points.map(lambda m: gradient(m, w)).reduce(add)

print "Final w: " + str(w)

sc.stop()
60 changes: 60 additions & 0 deletions examples/src/main/python/mllib/correlations.py
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Correlations using MLlib.
"""

import sys

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.util import MLUtils


if __name__ == "__main__":
    if len(sys.argv) not in [1,2]:
        print >> sys.stderr, "Usage: correlations (<file>)"
        exit(-1)
    sc = SparkContext(appName="PythonCorrelations")
    if len(sys.argv) == 2:
        filepath = sys.argv[1]
    else:
        filepath = 'data/mllib/sample_linear_regression_data.txt'
    corrType = 'pearson'

    points = MLUtils.loadLibSVMFile(sc, filepath)\
        .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))

    print
    print 'Summary of data file: ' + filepath
    print '%d data points' % points.count()

    # Statistics (correlations)
    print
    print 'Correlation (%s) between label and each feature' % corrType
    print 'Feature\tCorrelation'
    numFeatures = points.take(1)[0].features.size
    labelRDD = points.map(lambda lp: lp.label)
    for i in range(numFeatures):
        featureRDD = points.map(lambda lp: lp.features[i])
        corr = Statistics.corr(labelRDD, featureRDD, corrType)
        print '%d\t%g' % (i, corr)
    print

    sc.stop()
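For reference, the loop above runs one Spark job per feature. Statistics.corr also accepts a single RDD of vectors and returns the full pairwise correlation matrix in one pass; a minimal sketch building on the example's points RDD (feature-feature correlations only, label excluded):

# Hypothetical single-pass variant: correlate all feature columns at once.
vectors = points.map(lambda lp: lp.features)
corrMatrix = Statistics.corr(vectors, method=corrType)  # numpy matrix
print corrMatrix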
9 changes: 8 additions & 1 deletion examples/src/main/python/mllib/decision_tree_runner.py
@@ -17,6 +17,8 @@

"""
Decision tree classification and regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""

import numpy, os, sys
@@ -117,17 +119,22 @@ def usage():
     if len(sys.argv) == 2:
         dataPath = sys.argv[1]
         if not os.path.isfile(dataPath):
+            sc.stop()
             usage()
     points = MLUtils.loadLibSVMFile(sc, dataPath)
 
     # Re-index class labels if needed.
     (reindexedData, origToNewLabels) = reindexClassLabels(points)
 
     # Train a classifier.
-    model = DecisionTree.trainClassifier(reindexedData, numClasses=2)
+    categoricalFeaturesInfo={} # no categorical features
+    model = DecisionTree.trainClassifier(reindexedData, numClasses=2,
+                                         categoricalFeaturesInfo=categoricalFeaturesInfo)
     # Print learned tree and stats.
     print "Trained DecisionTree for classification:"
     print " Model numNodes: %d\n" % model.numNodes()
     print " Model depth: %d\n" % model.depth()
     print " Training accuracy: %g\n" % getAccuracy(model, reindexedData)
     print model
 
+    sc.stop()
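The categoricalFeaturesInfo dict introduced above maps a feature's column index to its number of categories; the empty dict tells MLlib to treat every feature as continuous. A sketch of how a dataset with categorical columns would declare them (the indices and arities here are made up for illustration):

# Hypothetical schema: feature 0 takes 3 values {0, 1, 2} and
# feature 4 takes 2 values {0, 1}; all other features are continuous.
categoricalFeaturesInfo = {0: 3, 4: 2}
model = DecisionTree.trainClassifier(reindexedData, numClasses=2,
                                     categoricalFeaturesInfo=categoricalFeaturesInfo)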
1 change: 1 addition & 0 deletions examples/src/main/python/mllib/kmeans.py
@@ -42,3 +42,4 @@ def parseVector(line):
k = int(sys.argv[2])
model = KMeans.train(data, k)
print "Final centers: " + str(model.clusterCenters)
sc.stop()
1 change: 1 addition & 0 deletions examples/src/main/python/mllib/logistic_regression.py
@@ -50,3 +50,4 @@ def parsePoint(line):
model = LogisticRegressionWithSGD.train(points, iterations)
print "Final weights: " + str(model.weights)
print "Final intercept: " + str(model.intercept)
sc.stop()
55 changes: 55 additions & 0 deletions examples/src/main/python/mllib/random_rdd_generation.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Randomly generated RDDs.
"""

import sys

from pyspark import SparkContext
from pyspark.mllib.random import RandomRDDs


if __name__ == "__main__":
    if len(sys.argv) not in [1, 2]:
        print >> sys.stderr, "Usage: random_rdd_generation"
        exit(-1)

    sc = SparkContext(appName="PythonRandomRDDGeneration")

    numExamples = 10000 # number of examples to generate
    fraction = 0.1 # fraction of data to sample

    # Example: RandomRDDs.normalRDD
    normalRDD = RandomRDDs.normalRDD(sc, numExamples)
    print 'Generated RDD of %d examples sampled from the standard normal distribution'\
        % normalRDD.count()
    print ' First 5 samples:'
    for sample in normalRDD.take(5):
        print ' ' + str(sample)
    print

    # Example: RandomRDDs.normalVectorRDD
    normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
    print ' First 5 samples:'
    for sample in normalVectorRDD.take(5):
        print ' ' + str(sample)
    print

    sc.stop()
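RandomRDDs has sibling generators for other distributions alongside the normal ones used above; a brief hedged sketch with uniformRDD and poissonRDD, which take the same sc and size arguments:

# Uniform samples on [0.0, 1.0] and Poisson samples with the given mean.
uniformRDD = RandomRDDs.uniformRDD(sc, numExamples, seed=42)
poissonRDD = RandomRDDs.poissonRDD(sc, mean=2.0, size=numExamples)
print 'Uniform sample mean: %g' % uniformRDD.mean()
print 'Poisson sample mean: %g' % poissonRDD.mean()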
86 changes: 86 additions & 0 deletions examples/src/main/python/mllib/sampled_rdds.py
@@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Randomly sampled RDDs.
"""

import sys

from pyspark import SparkContext
from pyspark.mllib.util import MLUtils


if __name__ == "__main__":
    if len(sys.argv) not in [1, 2]:
        print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
        exit(-1)
    if len(sys.argv) == 2:
        datapath = sys.argv[1]
    else:
        datapath = 'data/mllib/sample_binary_classification_data.txt'

    sc = SparkContext(appName="PythonSampledRDDs")

    fraction = 0.1 # fraction of data to sample

    examples = MLUtils.loadLibSVMFile(sc, datapath)
    numExamples = examples.count()
    if numExamples == 0:
        print >> sys.stderr, "Error: Data file had no samples to load."
        exit(1)
    print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)

    # Example: RDD.sample() and RDD.takeSample()
    expectedSampleSize = int(numExamples * fraction)
    print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
        % (fraction, expectedSampleSize)
    sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
    print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
    sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
    print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)

    print

    # Example: RDD.sampleByKey()
    keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
    print ' Keyed data using label (Int) as key ==> Orig'
    # Count examples per label in original data.
    keyCountsA = keyedRDD.countByKey()

    # Subsample, and count examples per label in sampled data.
    fractions = {}
    for k in keyCountsA.keys():
        fractions[k] = fraction
    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
    keyCountsB = sampledByKeyRDD.countByKey()
    sizeB = sum(keyCountsB.values())
    print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
        % sizeB

    # Compare samples
    print ' \tFractions of examples with key'
    print 'Key\tOrig\tSample'
    for k in sorted(keyCountsA.keys()):
        fracA = keyCountsA[k] / float(numExamples)
        if sizeB != 0:
            fracB = keyCountsB.get(k, 0) / float(sizeB)
        else:
            fracB = 0
        print '%d\t%g\t%g' % (k, fracA, fracB)

    sc.stop()
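One caveat worth noting: sample, takeSample, and sampleByKey all accept an optional seed parameter that this example omits, so the printed counts vary from run to run. A minimal sketch of the reproducible variant:

# Fixing the seed makes each run draw the same sample.
sampledRDD = examples.sample(withReplacement=True, fraction=fraction, seed=42)
sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions, seed=42)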
2 changes: 2 additions & 0 deletions examples/src/main/python/pagerank.py
@@ -68,3 +68,5 @@ def parseNeighbors(urls):
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
    print "%s has rank: %s." % (link, rank)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/pi.py
@@ -37,3 +37,5 @@ def f(_):

count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/sort.py
@@ -34,3 +34,5 @@
output = sortedCount.collect()
for (num, unitcount) in output:
    print num

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/transitive_closure.py
@@ -64,3 +64,5 @@ def generateGraph():
break

print "TC has %i edges" % tc.count()

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/wordcount.py
@@ -33,3 +33,5 @@
output = counts.collect()
for (word, count) in output:
    print "%s: %i" % (word, count)

sc.stop()