In [10]:
import sys
print("python version:\n{}\n".format(sys.version))

print("spark context version:\n{}".format(sc.version))

python version:
2.7.12 |Anaconda 4.1.1 (64-bit)| (default, Jul  2 2016, 17:42:40) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]

spark context version:
1.6.0


### Spark dataframes from python collections (lists) ###

In [43]:

data=[("Alice",1),("Bob",2)]
print("data:\n{}\n".format(data))

df=sqlContext.createDataFrame(data,["name","age"])
print("df:\n{}\n{}\n".format(df,df.take(10)))

df_age10=df.select(df.name,(df.age+10).alias("age10"))
print("df_age10:\n{}\n{}\n".format(df_age10,df_age10.take(10)))

df_drop_age=df.drop(df.age)
print("df:\n{}\n{}\n".format(df,df.take(10)))
print("df_drop_age:\n{}\n{}\n".format(df_drop_age,df_drop_age.take(10)))

## User Defined Function (UDF) Transformation 

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen=udf(lambda s: len(s),IntegerType())
df_slen=df_drop_age.select(slen(df_drop_age.name).alias("slen"))
print("df_slen:\n{}\n{}\n".format(df_slen,df_slen.take(10)))

## Other useful transformations
    ## filter(func)
    ## where(func)
    ## distinct()
    ## orderBy(*cols,**kw)
    ## sort(*cols,**kw)
    ## explode(col) # returns a new row for each element in the given array or map

doubled=udf(lambda d: d*2,IntegerType())
df_doubled=df.select(df.name,df.age,doubled(df.age).alias("age_doubled"))
print("df_doubled:\n{}\n{}\n".format(df_doubled,df_doubled.take(10)))

df_filtered=df_doubled.filter(df_doubled.age_doubled>3)
print("df_filtered:\n{}\n{}\n".format(df_filtered,df_filtered.take(10)))

data2=[("Alice",1),("Bob",2),("Bob",2)]
df2=sqlContext.createDataFrame(data2,["name","age"])
df2_distinct=df2.distinct()
print("df2:\n{}\n{}\n".format(df2,df2.take(10)))
print("df2_distinct:\n{}\n{}\n".format(df2_distinct,df2_distinct.take(10)))

df2_sorted=df2_distinct.sort(df2_distinct.age,ascending=False)
print("df2_sorted:\n{}\n{}\n".format(df2_sorted,df2_sorted.take(10)))

from pyspark.sql.functions import explode

data3=[pyspark.sql.Row(a=1,intlist=[1,2,3])]
df3=sqlContext.createDataFrame(data3)
df_explode=df3.select(explode(df3.intlist).alias("anInt"))
print("data3:\n{}\n".format(data3))
print("df3:\n{}\n{}\n".format(df3,df3.take(10)))
print("df_explode:\n{}\n{}\n".format(df_explode,df_explode.take(10)))


data:
[('Alice', 1), ('Bob', 2)]

df:
DataFrame[name: string, age: bigint]
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]

df_age10:
DataFrame[name: string, age10: bigint]
[Row(name=u'Alice', age10=11), Row(name=u'Bob', age10=12)]

df:
DataFrame[name: string, age: bigint]
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]

df_drop_age:
DataFrame[name: string]
[Row(name=u'Alice'), Row(name=u'Bob')]

df_slen:
DataFrame[slen: int]
[Row(slen=5), Row(slen=3)]

df_doubled:
DataFrame[name: string, age: bigint, age_doubled: int]
[Row(name=u'Alice', age=1, age_doubled=2), Row(name=u'Bob', age=2, age_doubled=4)]

df_filtered:
DataFrame[name: string, age: bigint, age_doubled: int]
[Row(name=u'Bob', age=2, age_doubled=4)]

df2:
DataFrame[name: string, age: bigint]
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2), Row(name=u'Bob', age=2)]

df2_distinct:
DataFrame[name: string, age: bigint]
[Row(name=u'Bob', age=2), Row(name=u'Alice', age=1)]

df2_sorted:
DataFrame[name: string, age: bigi

### GroupedData Transformations ###

+ groupBy(*cols)
+ GroupedData functions
    - agg(*exprs)
        + compute aggregates (avg,max,min,sum,or count) and returns a DataFrame
    - count()
    - avg(*args)

In [44]:
data=[("Alice",1,6),("Bob",2,8),("Alice",3,9),("Bob",4,7)]
df=sqlContext.createDataFrame(data,["name","age","grade"])
df_agg_count=df.groupBy(df.name).agg({"*":"count"})
df_groupBy_count=df.groupBy(df.name).count()
df_groupBy_avg=df.groupBy().avg()
df_groupBy_name_avg=df.groupBy(df.name).avg()
print("data:\n{}\n".format(data))
print("df:\n{}\n{}\n".format(df,df.take(10)))
print("df_agg_count:\n{}\n{}\n".format(df_agg_count,df_agg_count.take(10)))
print("df_groupBy_count:\n{}\n{}\n".format(df_groupBy_count,df_groupBy_count.take(10)))
print("df_groupBy_avg:\n{}\n{}\n".format(df_groupBy_avg,df_groupBy_avg.take(10)))
print("df_groupBy_name_avg:\n{}\n{}\n".format(df_groupBy_name_avg,df_groupBy_name_avg.take(10)))

data:
[('Alice', 1, 6), ('Bob', 2, 8), ('Alice', 3, 9), ('Bob', 4, 7)]

df:
DataFrame[name: string, age: bigint, grade: bigint]
[Row(name=u'Alice', age=1, grade=6), Row(name=u'Bob', age=2, grade=8), Row(name=u'Alice', age=3, grade=9), Row(name=u'Bob', age=4, grade=7)]

df_agg_count:
DataFrame[name: string, count(1): bigint]
[Row(name=u'Alice', count(1)=2), Row(name=u'Bob', count(1)=2)]

df_groupBy_count:
DataFrame[name: string, count: bigint]
[Row(name=u'Alice', count=2), Row(name=u'Bob', count=2)]

df_groupBy_avg:
DataFrame[avg(age): double, avg(grade): double]
[Row(avg(age)=2.5, avg(grade)=7.5)]

df_groupBy_name_avg:
DataFrame[name: string, avg(age): double, avg(grade): double]
[Row(name=u'Alice', avg(age)=2.0, avg(grade)=7.5), Row(name=u'Bob', avg(age)=3.0, avg(grade)=7.5)]



### Spark Actions ###

+ show(n,truncate)
    - prints the first n rows of the DataFrame
+ take(n)
    - returns the first n rows of a list of Row
+ collect()
    - returns all records of a list of Row
+ count()
    - count for DataFrame is an action, while for GroupedData it is a transformation
+ describe(*cols*)
    - Exploratory Data Analysis function that computes statistics (count,mean,stddev,min,max)

In [54]:
data=[("Alice",1),("Bob",2)]
df=sqlContext.createDataFrame(data,["name","age"])
print("collect():\n{}\n".format(df.collect()))
print("count():\n{}\n".format(df.count()))
print("take():\n{}\n".format(df.take(1)))
print("show():\n{}\n".format(df.show()))
df.describe().show()

collect():
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+

show():
None

count():
2

take():
[Row(name=u'Alice', age=1)]

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               1.5|
| stddev|0.7071067811865476|
|    min|                 1|
|    max|                 2|
+-------+------------------+



### Spark DataFrames using Python Pandas DataFrame ##

+ [Python Pandas](http://pandas.pydata.org/)

In [12]:
import pandas as pd
pdf=pd.DataFrame(data,columns=["name","age"])
print("pdf:\n{}\n".format(pdf))
df=sqlContext.createDataFrame(pdf)
print("df:\n{}\n{}".format(df,df.take(10)))

pdf:
    name  age
0  Alice    1
1    Bob    2

df:
DataFrame[name: string, age: bigint]
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]


### Spark dataframe using HDFS file ###

+ create README.md file in local dir.

+ put README.md into hdfs

    - $ hdfs dfs -ls /user/cloudera

    - $ hdfs dfs -mkdir /user/cloudera/edx

    - $ hdfs dfs -mkdir /user/cloudera/edx/intro-to-apache-spark

    - $ hdfs dfs -mkdir /user/cloudera/edx/intro-to-apache-spark/week1

    - $ hdfs dfs -put README.md /user/cloudera/edx/intro-to-apache-spark/week1

[Hadoop file system shell guide](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html)

In [14]:
df=sqlContext.read.text("/user/cloudera/edx/intro-to-apache-spark/week1/README.md")
print("df:\n{}\n{}".format(df,df.take(10)))

df:
DataFrame[value: string]
[Row(value=u'# Test BerkeleyX: CS105x Introduction to Apache Spark #'), Row(value=u''), Row(value=u'## Welcome to CS 105.1x and Course Objectives/Prequisites ##'), Row(value=u''), Row(value=u'### Requirements ###'), Row(value=u'+ Python 2.7'), Row(value=u'+ PySpark (the Python API for Apache Spark)'), Row(value=u'+ Spark SQL (the SQL API for Apache Spark)'), Row(value=u'+ [Python mini quiz](http://www.mypythonquiz.com/)'), Row(value=u'+ [Python mini course](http://ai.berkeley.edu/tutorial.html#PythonBasics)')]
