<img align=left src="files/images/pyspark-pictures-dataframes-page1.svg" width=500 height=250 />

<img align=left src="files/images/pyspark-pictures-dataframes-page2.svg" width=500 height=500 />

# Click on an image to view the corresponding pyspark docs

In [113]:
import IPython
print("pyspark version:" + str(sc.version))
print("Ipython version:" + str(IPython.__version__))

pyspark version:1.6.1
Ipython version:3.2.0


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.agg">
<img align=left src="files/images/pyspark-pictures-dataframes-page3.svg" width=500 height=500 />
</a>

In [114]:
# agg
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.agg({"amt":"avg"})
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------------------+
|           avg(amt)|
+-------------------+
|0.20000000000000004|
+-------------------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.alias">
<img align=left src="files/images/pyspark-pictures-dataframes-page4.svg" width=750 height=750 />
</a>

In [115]:
# alias
from pyspark.sql.functions import col
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.alias('transactions')
x.show()
y.show()
y.select(col("transactions.to")).show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+
|   to|
+-----+
|  Bob|
|Carol|
| Dave|
+-----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cache">
<img align=left src="files/images/pyspark-pictures-dataframes-page5.svg" width=750 height=750 />
</a>

In [116]:
# cache
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
print(x.count()) # first action materializes x in memory
print(x.count()) # later actions avoid IO overhead

3
3


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page6.svg" width=750 height=750 />
</a>

In [117]:
# coalesce
x_rdd = sc.parallelize([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)],2)
x = sqlContext.createDataFrame(x_rdd, ['from','to','amt'])
y = x.coalesce(numPartitions=1)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())

2
1


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.getNumPartitions">
<img align=left src="files/images/pyspark-pictures-dataframes-page7.svg" width=750 height=750 />
</a>

In [118]:
# collect
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.collect()
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.filter">
<img align=left src="files/images/pyspark-pictures-dataframes-page8.svg" width=500 height=500 />
</a>

In [119]:
# columns
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.columns
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

['from', 'to', 'amt']


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.distinct">
<img align=left src="files/images/pyspark-pictures-dataframes-page9.svg" width=500 height=500 />
</a>

In [120]:
# corr
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.corr(col1="amt",col2="fee")
x.show()
print(y)

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.866025403784


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sample">
<img align=left src="files/images/pyspark-pictures-dataframes-page10.svg" width=500 height=500 />
</a>

In [121]:
# count
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
print(x.count())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

3


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.takeSample">
<img align=left src="files/images/pyspark-pictures-dataframes-page11.svg" width=500 height=500 />
</a>

In [122]:
# cov
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.cov(col1="amt",col2="fee")
x.show()
print(y)

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.00095


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.union">
<img align=left src="files/images/pyspark-pictures-dataframes-page12.svg" width=500 height=500 />
</a>

In [123]:
# crosstab
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.crosstab(col1='from',col2='to')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+----+-----+---+
|from_to|Dave|Carol|Bob|
+-------+----+-----+---+
|    Bob|   0|    1|  0|
|  Alice|   0|    0|  1|
|  Carol|   1|    0|  0|
+-------+----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.intersection">
<img align=left src="files/images/pyspark-pictures-dataframes-page13.svg" width=500 height=500 />
</a>

In [124]:
# cube
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Alice","Carol",0.2)], ['from','to','amt'])
y = x.cube('from','to')
x.show()
print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
y.sum().show() 
y.max().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7f7a6341ed50>
+-----+-----+-------------------+
| from|   to|           sum(amt)|
+-----+-----+-------------------+
|Alice|Carol|                0.2|
|Alice|  Bob|                0.1|
|Alice| null|0.30000000000000004|
| null|Carol|                0.2|
| null|  Bob|                0.1|
| null| null|0.30000000000000004|
+-----+-----+-------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|Carol|     0.2|
|Alice|  Bob|     0.1|
|Alice| null|     0.2|
| null|Carol|     0.2|
| null|  Bob|     0.1|
| null| null|     0.2|
+-----+-----+--------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sortByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page14.svg" width=500 height=500 />
</a>

In [125]:
# describe
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.describe().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+-------------------+
|summary|                amt|
+-------+-------------------+
|  count|                  3|
|   mean|0.20000000000000004|
| stddev|0.09999999999999998|
|    min|                0.1|
|    max|                0.3|
+-------+-------------------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sortBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page15.svg" width=500 height=500 />
</a>

In [126]:
# distinct
import numpy as np
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.distinct()
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.glom">
<img align=left src="files/images/pyspark-pictures-dataframes-page16.svg" width=500 height=500 />
</a>

In [127]:
# drop
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.drop('amt')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+
| from|   to|
+-----+-----+
|Alice|  Bob|
|  Bob|Carol|
|Carol| Dave|
+-----+-----+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.cartesian">
<img align=left src="files/images/pyspark-pictures-dataframes-page17.svg" width=500 height=500 />
</a>

In [128]:
# dropDuplicates / drop_duplicates
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Bob","Carol",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropDuplicates(subset=['from','to'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.groupBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page18.svg" width=500 height=500 />
<

In [129]:
# dropna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropna(how='any',subset=['from','to'])
x.show()
y.show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

+----+-----+----+
|from|   to| amt|
+----+-----+----+
| Bob|Carol|null|
| Bob|Carol| 0.2|
+----+-----+----+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.pipe">
<img align=left src="files/images/pyspark-pictures-dataframes-page19.svg" width=500 height=500 />
</a>

In [130]:
# dtypes
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.dtypes
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[('from', 'string'), ('to', 'string'), ('amt', 'double')]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.foreach">
<img align=left src="files/images/pyspark-pictures-dataframes-page20.svg" width=500 height=500 />
</a>

In [131]:
# explain
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.explain(extended = True)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

== Parsed Logical Plan ==
LogicalRDD [from#873,to#874,amt#875], MapPartitionsRDD[1295] at applySchemaToPythonRDD at null:-1

== Analyzed Logical Plan ==
from: string, to: string, amt: double
LogicalRDD [from#873,to#874,amt#875], MapPartitionsRDD[1295] at applySchemaToPythonRDD at null:-1

== Optimized Logical Plan ==
LogicalRDD [from#873,to#874,amt#875], MapPartitionsRDD[1295] at applySchemaToPythonRDD at null:-1

== Physical Plan ==
Scan ExistingRDD[from#873,to#874,amt#875]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.foreachPartition">
<img align=left src="files/images/pyspark-pictures-dataframes-page21.svg" width=500 height=500 />
</a>

In [132]:
# fillna
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3)], ['from','to','amt'])
y = x.fillna(value='unknown',subset=['from','to'])
x.show()
y.show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
+-----+-----+----+

+-------+-------+----+
|   from|     to| amt|
+-------+-------+----+
|unknown|    Bob| 0.1|
|    Bob|  Carol|null|
|  Carol|unknown| 0.3|
+-------+-------+----+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.collect">
<img align=left src="files/images/pyspark-pictures-dataframes-page22.svg" width=500 height=500 />
</a>

In [133]:
# filter
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.filter("amt > 0.1")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.reduce">
<img align=left src="files/images/pyspark-pictures-dataframes-page23.svg" width=500 height=500 />
</a>

In [134]:
# first
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.first()
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

Row(from=u'Alice', to=u'Bob', amt=0.1)


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.fold">
<img align=left src="files/images/pyspark-pictures-dataframes-page24.svg" width=500 height=500 />
</a>

In [135]:
# flatMap
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.flatMap(lambda x: (x[0],x[2]))
x.show()
print(y)
y.collect()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

PythonRDD[1329] at RDD at PythonRDD.scala:43


[u'Alice', 0.1, u'Bob', 0.2, u'Carol', 0.3]

<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.aggregate">
<img align=left src="files/images/pyspark-pictures-dataframes-page25.svg" width=500 height=500 />
</a>

In [136]:
# foreach
from __future__ import print_function

def fappend(el,f):
    '''appends el to file f'''
    print(el,file=open(f, 'a+') )

x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
fn = './foreachExampleDataFrames.txt'
open(fn, 'w').close()  # clear the file
y = x.foreach(lambda x: fappend(x,fn)) # writes into foreachExampleDataFrames.txt

print(x.collect()) # original dataframe
print(y) # foreach returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print (foreachExample.read())

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]
None
Row(from=u'Alice', to=u'Bob', amt=0.1)
Row(from=u'Bob', to=u'Carol', amt=0.2)
Row(from=u'Carol', to=u'Dave', amt=0.3)



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.max">
<img align=left src="files/images/pyspark-pictures-dataframes-page26.svg" width=500 height=500 />
</a>

In [137]:
# foreachPartition
from __future__ import print_function

def fappend(partition,f):
    '''append all elements in partition to file f'''
    print([el for el in partition],file=open(f, 'a+'))

x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
fn = './foreachExampleDataFrames.txt'
open(fn, 'w').close()  # clear the file
y = x.foreachPartition(lambda x: fappend(x,fn)) # writes into foreachExampleDataFrames.txt

print(x.collect()) # original dataframe
print(y) # foreach returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print (foreachExample.read())

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Carol', to=u'Dave', amt=0.3), Row(from=u'Bob', to=u'Carol', amt=0.2)]
None
[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Carol', to=u'Dave', amt=0.3)]
[Row(from=u'Bob', to=u'Carol', amt=0.2)]



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.min">
<img align=left src="files/images/pyspark-pictures-dataframes-page27.svg" width=500 height=500 />
</a>

In [138]:
# freqItems
x = sqlContext.createDataFrame([('Bob',"Carol",0.1), \
                                ("Alice","Dave",0.1), \
                                ("Alice","Bob",0.1), \
                                ("Alice","Bob",0.5), \
                                ("Carol","Bob",0.1)], \
                               ['from','to','amt'])
y = x.freqItems(cols=['from','amt'],support=0.8)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.1|
|Alice| Dave|0.1|
|Alice|  Bob|0.1|
|Alice|  Bob|0.5|
|Carol|  Bob|0.1|
+-----+-----+---+

+--------------+-------------+
|from_freqItems|amt_freqItems|
+--------------+-------------+
|       [Alice]|        [0.1]|
+--------------+-------------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sum">
<img align=left src="files/images/pyspark-pictures-dataframes-page28.svg" width=500 height=500 />
</a>

In [139]:
# groupBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from')
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7f7a6334c490>


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.count">
<img align=left src="files/images/pyspark-pictures-dataframes-page29.svg" width=500 height=500 />
</a>

In [140]:
# groupBy(col1).avg(col2)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from').avg('amt')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-------------------+
| from|           avg(amt)|
+-----+-------------------+
|Carol|                0.3|
|Alice|0.15000000000000002|
+-----+-------------------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.histogram">
<img align=left src="files/images/pyspark-pictures-dataframes-page30.svg" width=500 height=500 />
</a>

In [141]:
# head
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.head(2)
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.mean">
<img align=left src="files/images/pyspark-pictures-dataframes-page31.svg" width=500 height=500 />
</a>

In [142]:
# intersect
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Alice",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.intersect(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Alice|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+---+---+
| from| to|amt|
+-----+---+---+
|Alice|Bob|0.1|
+-----+---+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.variance">
<img align=left src="files/images/pyspark-pictures-dataframes-page32.svg" width=500 height=500 />
</a>

In [143]:
# isLocal
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.isLocal()
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

False


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.stdev">
<img align=left src="files/images/pyspark-pictures-dataframes-page33.svg" width=500 height=500 />
</a>

In [144]:
# join
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',20),("Bob",40),("Dave",80)], ['name','age'])
z = x.join(y,x.to == y.name,'inner')
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 40|
| Dave| 80|
+-----+---+

+-----+----+---+----+---+
| from|  to|amt|name|age|
+-----+----+---+----+---+
|Carol|Dave|0.3|Dave| 80|
|Alice| Bob|0.1| Bob| 40|
+-----+----+---+----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sampleStdev">
<img align=left src="files/images/pyspark-pictures-dataframes-page34.svg" width=500 height=500 />
</a>

In [145]:
# limit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.limit(2)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sampleVariance">
<img align=left src="files/images/pyspark-pictures-dataframes-page35.svg" width=500 height=500 />
</a>

In [146]:
# map
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.map(lambda x: x.amt+1)
x.show()
print(y.collect())  # output is a rdd

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[1.1, 1.2, 1.3]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.countByValue">
<img align=left src="files/images/pyspark-pictures-dataframes-page36.svg" width=500 height=500 />
</a>

In [147]:
# mapPartitions
def amtsum(partition):
    '''sum the value in field amt'''
    yield sum([el.amt for el in partition])
    
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2)
y = x.mapPartitions(lambda p: amtsum(p))
x.show()
print(x.rdd.glom().collect()) # flatten elements on the same partition
print(y.collect())
print(y.glom().collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

[[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Carol', to=u'Dave', amt=0.3)], [Row(from=u'Bob', to=u'Carol', amt=0.2)]]
[0.4, 0.2]
[[0.4], [0.2]]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.top">
<img align=left src="files/images/pyspark-pictures-dataframes-page37.svg" width=500 height=500 />
</a>

In [148]:
# na
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.na  # returns an object for handling missing values, supports drop, fill, and replace methods
x.show()
print(y)
y.drop().show()
y.fill({'from':'unknown','to':'unknown','amt':0}).show()
y.fill(0).show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

<pyspark.sql.dataframe.DataFrameNaFunctions object at 0x7f7a6334cfd0>
+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+

+-------+-------+---+
|   from|     to|amt|
+-------+-------+---+
|unknown|    Bob|0.1|
|    Bob|  Carol|0.0|
|  Carol|unknown|0.3|
|    Bob|  Carol|0.2|
+-------+-------+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
| null|  Bob|0.1|
|  Bob|Carol|0.0|
|Carol| null|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.takeOrdered">
<img align=left src="files/images/pyspark-pictures-dataframes-page38.svg" width=500 height=500 />
</a>

In [149]:
# orderBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.orderBy(['from'],ascending=[False])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.take">
<img align=left src="files/images/pyspark-pictures-dataframes-page39.svg" width=500 height=500 />
</a>

In [150]:
# persist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.persist(storageLevel=StorageLevel(True,True,False,True,1)) # StorageLevel(useDisk,useMemory,useOffHeap,deserialized,replication=1)
x.show()
x.is_cached

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



True

<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.first">
<img align=left src="files/images/pyspark-pictures-dataframes-page40.svg" width=500 height=500 />
</a>

In [151]:
# printSchema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.printSchema()

root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- amt: double (nullable = true)



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.collectAsMap">
<img align=left src="files/images/pyspark-pictures-dataframes-page41.svg" width=500 height=500 />
</a>

In [152]:
# randomSplit
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.randomSplit([0.5,0.5])
x.show()
y[0].show()
y[1].show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+----+---+
| from|  to|amt|
+-----+----+---+
|Alice| Bob|0.1|
|Carol|Dave|0.3|
+-----+----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.keys">
<img align=left src="files/images/pyspark-pictures-dataframes-page42.svg" width=500 height=500 />
</a>

In [153]:
# rdd
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rdd
x.show()
print(y.collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.values">
<img align=left src="files/images/pyspark-pictures-dataframes-page43.svg" width=500 height=500 />
</a>

In [154]:
# registerTempTable
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.registerTempTable(name="TRANSACTIONS")
y = sqlContext.sql('SELECT * FROM TRANSACTIONS WHERE amt > 0.1')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.reduceByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page44.svg" width=500 height=500 />
</a>

In [155]:
# repartition
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.repartition(3)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())

1
3


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.reduceByKeyLocally">
<img align=left src="files/images/pyspark-pictures-dataframes-page45.svg" width=500 height=500 />
</a>

In [156]:
# replace
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.replace('Dave','David',['from','to'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|David|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.countByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page46.svg" width=500 height=500 />
</a>

In [157]:
# rollup
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rollup(['from','to'])
x.show()
y.sum().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+------------------+
| from|   to|          sum(amt)|
+-----+-----+------------------+
|Alice|  Bob|               0.1|
|  Bob|Carol|               0.2|
|Alice| null|               0.1|
|Carol| Dave|               0.3|
|  Bob| null|               0.2|
|Carol| null|               0.3|
| null| null|0.6000000000000001|
+-----+-----+------------------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.join">
<img align=left src="files/images/pyspark-pictures-dataframes-page47.svg" width=500 height=500 />
</a>

In [158]:
# sample
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.sample(False,0.5)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.leftOuterJoin">
<img align=left src="files/images/pyspark-pictures-dataframes-page48.svg" width=500 height=500 />
</a>

In [159]:
# sampleBy
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Alice","Alice",0.3), \
                               ('Alice',"Dave",0.4),("Bob","Bob",0.5),("Bob","Carol",0.6)], \
                                ['from','to','amt'])
y = x.sampleBy(col='from',fractions={'Alice':0.1,'Bob':1.0})
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Alice|Alice|0.3|
|Alice| Dave|0.4|
|  Bob|  Bob|0.5|
|  Bob|Carol|0.6|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|  Bob|0.5|
|  Bob|Carol|0.6|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.rightOuterJoin">
<img align=left src="files/images/pyspark-pictures-dataframes-page49.svg" width=500 height=500 />
</a>

In [160]:
# schema
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.schema
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.partitionBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page50.svg" width=500 height=500 />
</a>

In [161]:
# select
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.select(['from','amt'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| from|amt|
+-----+---+
|Alice|0.1|
|  Bob|0.2|
|Carol|0.3|
+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.combineByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page51.svg" width=500 height=500 />
</a>

In [162]:
# selectExpr
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.selectExpr(['substr(from,1,1)','amt+10'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----------------+----------+
|substr(from,1,1)|(amt + 10)|
+----------------+----------+
|               A|      10.1|
|               B|      10.2|
|               C|      10.3|
+----------------+----------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.aggregateByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page52.svg" width=500 height=500 />
</a>

In [163]:
# show
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.foldByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page53.svg" width=500 height=500 />
</a>

In [164]:
# sort
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.sort(['to'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol|Alice|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.groupByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page54.svg" width=500 height=500 />
</a>

In [166]:
# sortWithinPartitions
x = sqlContext.createDataFrame([('Alice',"Bob",0.1,1),("Bob","Carol",0.2,2),("Carol","Alice",0.3,2)], \
                               ['from','to','amt','p_id']).repartition(2,'p_id')
y = x.sortWithinPartitions(['to'])
x.show()
y.show()
print(x.rdd.glom().collect()) # glom() flattens elements on the same partition
print(y.rdd.glom().collect())

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|  Bob|Carol|0.2|   2|
|Carol|Alice|0.3|   2|
+-----+-----+---+----+

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|Carol|Alice|0.3|   2|
|  Bob|Carol|0.2|   2|
+-----+-----+---+----+

[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2), Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2)]]
[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2), Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2)]]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.mapValues">
<img align=left src="files/images/pyspark-pictures-dataframes-page56.svg" width=500 height=500 />
</a>

In [167]:
# stat
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.stat.corr(col1="amt",col2="fee")
x.show()
print(y)

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.866025403784


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.groupWith">
<img align=left src="files/images/pyspark-pictures-dataframes-page57.svg" width=500 height=500 />
</a>

In [168]:
# subtract
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.subtract(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+----+---+
| from|  to|amt|
+-----+----+---+
|Carol|Dave|0.3|
+-----+----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.cogroup">
<img align=left src="files/images/pyspark-pictures-dataframes-page58.svg" width=500 height=500 />
</a>

In [169]:
# take
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.take(num=2)
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sampleByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page59.svg" width=500 height=500 />
</a>

In [170]:
# toDF
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toDF("seller","buyer","amt")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+------+-----+---+
|seller|buyer|amt|
+------+-----+---+
| Alice|  Bob|0.1|
|   Bob|Carol|0.2|
| Carol| Dave|0.3|
+------+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.subtractByKey">
<img align=left src="files/images/pyspark-pictures-dataframes-page60.svg" width=500 height=500 />
</a>

In [171]:
# toJSON
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toJSON()
x.show()
print(y.collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[u'{"from":"Alice","to":"Bob","amt":0.1}', u'{"from":"Bob","to":"Carol","amt":0.2}', u'{"from":"Carol","to":"Dave","amt":0.3}']


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.subtract">
<img align=left src="files/images/pyspark-pictures-dataframes-page61.svg" width=500 height=500 />
</a>

In [172]:
# toPandas
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toPandas()
x.show()
y

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



Unnamed: 0,from,to,amt
0,Alice,Bob,0.1
1,Bob,Carol,0.2
2,Carol,Dave,0.3


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.keyBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page62.svg" width=500 height=500 />
</a>

In [173]:
# unionAll
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.unionAll(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.repartition">
<img align=left src="files/images/pyspark-pictures-dataframes-page63.svg" width=500 height=500 />
</a>

In [174]:
# unpersist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
x.count()
x.unpersist()
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page64.svg" width=500 height=500 />
</a>

In [175]:
# where (filter)
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.filter("amt > 0.1")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page65.svg" width=500 height=500 />
</a>

In [176]:
# withColumn
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumn('conf',x.amt.isNotNull())
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+----+
| from|   to|amt|conf|
+-----+-----+---+----+
|Alice|  Bob|0.1|true|
|  Bob|Carol|0.2|true|
|Carol| Dave|0.3|true|
+-----+-----+---+----+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page66.svg" width=500 height=500 />
</a>

In [177]:
# withColumnRenamed
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumnRenamed('amt','amount')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+------+
| from|   to|amount|
+-----+-----+------+
|Alice|  Bob|   0.1|
|  Bob|Carol|   0.2|
|Carol| Dave|   0.3|
+-----+-----+------+



<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page67.svg" width=500 height=500 />
</a>

In [178]:
# write
import json
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.write.json('./dataframeWriteExample.json',mode='overwrite')
x.show()
# read the dataframe back in from file
sqlContext.read.json('./dataframeWriteExample.json').show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+---+-----+-----+
|amt| from|   to|
+---+-----+-----+
|0.1|Alice|  Bob|
|0.2|  Bob|Carol|
|0.3|Carol| Dave|
+---+-----+-----+

