6 changes: 3 additions & 3 deletions python/pyspark/mllib/linalg/distributed.py
@@ -256,7 +256,7 @@ def rows(self):
# on the Scala/Java side. Then we map each Row in the
# DataFrame back to an IndexedRow on this side.
rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
- rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
+ rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
return rows

def numRows(self):
@@ -475,7 +475,7 @@ def entries(self):
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a MatrixEntry on this side.
entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
- entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
+ entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
return entries

def numRows(self):
@@ -700,7 +700,7 @@ def blocks(self):
# DataFrame on the Scala/Java side. Then we map each Row in
# the DataFrame back to a sub-matrix block on this side.
blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
- blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+ blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
return blocks

@property
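These three hunks are the same one-line fix: the wrapped DataFrame is now converted back to an RDD via df.rdd.map(...) instead of the removed df.map(...). The public behaviour of rows/entries/blocks is unchanged. A minimal sketch of that behaviour, assuming a local Spark 2.x session and made-up data (not part of this patch):

    from pyspark.sql import SparkSession
    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

    spark = SparkSession.builder.master("local[2]").appName("rows-sketch").getOrCreate()
    sc = spark.sparkContext

    # .rows still yields an RDD[IndexedRow]; only the internal DataFrame-to-RDD
    # conversion now goes through .rdd before .map.
    mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1.0, 2.0]),
                                           IndexedRow(1, [3.0, 4.0])]))
    print(mat.rows.map(lambda r: r.index).collect())  # expected: [0, 1]
    spark.stop()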
4 changes: 2 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -697,7 +697,7 @@ def test_infer_schema(self):
schema = df.schema
field = [f for f in schema.fields if f.name == "features"][0]
self.assertEqual(field.dataType, self.udt)
- vectors = df.map(lambda p: p.features).collect()
+ vectors = df.rdd.map(lambda p: p.features).collect()
self.assertEqual(len(vectors), 2)
for v in vectors:
if isinstance(v, SparseVector):
@@ -729,7 +729,7 @@ def test_infer_schema(self):
df = rdd.toDF()
schema = df.schema
self.assertTrue(schema.fields[1].dataType, self.udt)
- matrices = df.map(lambda x: x._2).collect()
+ matrices = df.rdd.map(lambda x: x._2).collect()
self.assertEqual(len(matrices), 2)
for m in matrices:
if isinstance(m, DenseMatrix):
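Both test fixes follow the same pattern: pull a UDT-backed column back out as plain Python objects through df.rdd.map, since df.map no longer exists. A rough standalone sketch of that pattern (local session and toy vectors, not the actual test fixture):

    from pyspark.sql import SparkSession
    from pyspark.mllib.linalg import SparseVector, Vectors

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([(1.0, Vectors.dense([1.0, 2.0])),
                                (0.0, Vectors.sparse(2, [1], [3.0]))],
                               ["label", "features"])
    # Drop to the underlying RDD of Rows before mapping.
    vectors = df.rdd.map(lambda p: p.features).collect()
    assert len(vectors) == 2
    assert any(isinstance(v, SparseVector) for v in vectors)
    spark.stop()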
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
@@ -94,7 +94,7 @@ def __init__(self, sparkContext, sqlContext=None):
... 'from allTypes where b and i > 0').collect()
[Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
- >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
+ >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
[(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]
"""
self._sc = sparkContext
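Only the doctest changes: row-wise access now spells out the underlying RDD. A cut-down sketch of the same pattern, assuming a local session instead of the doctest's allTypes fixture:

    import datetime
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([Row(i=1, s="string", d=1.0,
                                    time=datetime.datetime(2014, 8, 1, 14, 1, 5))])
    # Was df.map(...); the RDD of Rows is now reached explicitly.
    print(df.rdd.map(lambda x: (x.i, x.s, x.d, x.time)).collect())
    # [(1, 'string', 1.0, datetime.datetime(2014, 8, 1, 14, 1, 5))] on Python 3
    spark.stop()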
42 changes: 2 additions & 40 deletions python/pyspark/sql/dataframe.py
@@ -267,44 +267,6 @@ def take(self, num):
self._jdf, num)
return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))

- @ignore_unicode_prefix
- @since(1.3)
- def map(self, f):
- """ Returns a new :class:`RDD` by applying a the ``f`` function to each :class:`Row`.
-
- This is a shorthand for ``df.rdd.map()``.
-
- >>> df.map(lambda p: p.name).collect()
- [u'Alice', u'Bob']
- """
- return self.rdd.map(f)
-
- @ignore_unicode_prefix
- @since(1.3)
- def flatMap(self, f):
- """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
- and then flattening the results.
-
- This is a shorthand for ``df.rdd.flatMap()``.
-
- >>> df.flatMap(lambda p: p.name).collect()
- [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
- """
- return self.rdd.flatMap(f)
-
- @since(1.3)
- def mapPartitions(self, f, preservesPartitioning=False):
- """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
-
- This is a shorthand for ``df.rdd.mapPartitions()``.
-
- >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
- >>> def f(iterator): yield 1
- >>> rdd.mapPartitions(f).sum()
- 4
- """
- return self.rdd.mapPartitions(f, preservesPartitioning)
-
@since(1.3)
def foreach(self, f):
"""Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
@@ -315,7 +277,7 @@ def foreach(self, f):
... print(person.name)
>>> df.foreach(f)
"""
- return self.rdd.foreach(f)
+ self.rdd.foreach(f)

@since(1.3)
def foreachPartition(self, f):
@@ -328,7 +290,7 @@ def foreachPartition(self, f):
... print(person.name)
>>> df.foreachPartition(f)
"""
- return self.rdd.foreachPartition(f)
+ self.rdd.foreachPartition(f)

@since(1.3)
def cache(self):
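This is the substantive API change: DataFrame.map, DataFrame.flatMap and DataFrame.mapPartitions are removed outright, and foreach/foreachPartition stop returning the (always None) result of the underlying RDD call. A migration sketch for callers, assuming a local session and toy rows:

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([Row(name="Alice", age=2), Row(name="Bob", age=5)])

    # Replacements for the removed shorthands: go through df.rdd explicitly.
    names = df.rdd.map(lambda p: p.name).collect()                    # ['Alice', 'Bob']
    chars = df.rdd.flatMap(lambda p: p.name).collect()                # ['A', 'l', 'i', ...]
    sizes = df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

    # foreach/foreachPartition stay on DataFrame but now return None.
    assert df.foreach(lambda row: None) is None

    spark.stop()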
4 changes: 2 additions & 2 deletions python/pyspark/sql/functions.py
@@ -616,10 +616,10 @@ def log(arg1, arg2=None):

If there is only one argument, then this takes the natural logarithm of the argument.

- >>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
+ >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']

- >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
+ >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']
"""
sc = SparkContext._active_spark_context
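log() itself is untouched; only its doctests move to the .rdd form. A sketch of the two-argument (explicit base) and one-argument (natural log) calls, assuming a toy DataFrame with an age column:

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import log

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([Row(age=2), Row(age=5)])

    # log(base, col): base-10 logarithm of the age column.
    print(df.select(log(10.0, df.age).alias('ten'))
            .rdd.map(lambda l: str(l.ten)[:7]).collect())  # ['0.30102', '0.69897']

    # log(col): natural logarithm.
    print(df.select(log(df.age).alias('e'))
            .rdd.map(lambda l: str(l.e)[:7]).collect())     # ['0.69314', '1.60943']

    spark.stop()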
24 changes: 12 additions & 12 deletions python/pyspark/sql/tests.py
@@ -346,7 +346,7 @@ def test_basic_functions(self):

def test_apply_schema_to_row(self):
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
- df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
+ df2 = self.sqlCtx.createDataFrame(df.rdd.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())

rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
@@ -382,15 +382,15 @@ def test_serialize_nested_array_and_map(self):
self.assertEqual(1, row.l[0].a)
self.assertEqual("2", row.d["key"].d)

- l = df.map(lambda x: x.l).first()
+ l = df.rdd.map(lambda x: x.l).first()
self.assertEqual(1, len(l))
self.assertEqual('s', l[0].b)

- d = df.map(lambda x: x.d).first()
+ d = df.rdd.map(lambda x: x.d).first()
self.assertEqual(1, len(d))
self.assertEqual(1.0, d["key"].c)

- row = df.map(lambda x: x.d["key"]).first()
+ row = df.rdd.map(lambda x: x.d["key"]).first()
self.assertEqual(1.0, row.c)
self.assertEqual("2", row.d)

@@ -399,16 +399,16 @@ def test_infer_schema(self):
Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
rdd = self.sc.parallelize(d)
df = self.sqlCtx.createDataFrame(rdd)
- self.assertEqual([], df.map(lambda r: r.l).first())
- self.assertEqual([None, ""], df.map(lambda r: r.s).collect())
+ self.assertEqual([], df.rdd.map(lambda r: r.l).first())
+ self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
df.registerTempTable("test")
result = self.sqlCtx.sql("SELECT l[0].a from test where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])

df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
self.assertEqual(df.schema, df2.schema)
- self.assertEqual({}, df2.map(lambda r: r.d).first())
- self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
+ self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
+ self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
df2.registerTempTable("test2")
result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
self.assertEqual(1, result.head()[0])
@@ -462,8 +462,8 @@ def test_apply_schema(self):
StructField("list1", ArrayType(ByteType(), False), False),
StructField("null1", DoubleType(), True)])
df = self.sqlCtx.createDataFrame(rdd, schema)
- results = df.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
- x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
+ results = df.rdd.map(lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1,
+ x.date1, x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
r = (127, -128, -32768, 32767, 2147483647, 1.0, date(2010, 1, 1),
datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
self.assertEqual(r, results.first())
@@ -570,15 +570,15 @@ def test_udf_with_udt(self):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
self.assertEqual(ExamplePoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])

row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
- self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
+ self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
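Every test edit above is the same mechanical rewrite: df.map(...) becomes df.rdd.map(...). One representative pattern, the schema round-trip from test_apply_schema_to_row, sketched against a local session and inline JSON (assumptions, not the actual fixture):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext

    df = spark.read.json(sc.parallelize(['{"a": 2}']))
    # Identity-map the rows through the RDD and rebuild with the same schema.
    df2 = spark.createDataFrame(df.rdd.map(lambda x: x), df.schema)
    assert df.collect() == df2.collect()
    spark.stop()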