<H3>Loading libraries and creating a Spark context</H3>

In [1]:
import os
import sys
import operator

from operator import add

# Point SPARK_HOME at the local Spark install. Raw strings keep the Windows
# backslashes literal, and the value must not start with a space.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = r"C:\Spark\spark-1.6.1-bin-hadoop2.6"
SPARK_HOME = os.environ['SPARK_HOME']

# Make the PySpark and Py4J sources bundled with this Spark version importable
sys.path.append(os.path.join(SPARK_HOME, "python"))
sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "py4j-0.9-src.zip"))
sys.path.append(os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))

import pyspark

# Run Spark locally with a single worker thread
sc = pyspark.SparkContext('local')

<H3>Creating SQL Context</H3>

In [2]:
from pyspark.sql import Row, SQLContext
sqlContext = SQLContext(sc)

<H3>Verify that Spark is working. If it isn't, the assertion below raises an AssertionError.</H3>

In [3]:
data = [('Alice', 1), ('Bob', 2), ('Bill', 4)]
df = sqlContext.createDataFrame(data, ['name', 'age'])
fil = df.filter(df.age > 3).collect()
print fil
assert fil == [Row(u'Bill', 4)]

[Row(name=u'Bill', age=4)]


<H3>Let's load a text file</H3>
<H4>The text file contains the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) retrieved from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page).</H4>

In [4]:
import os.path
baseDir = 'Datasets'
inputPath = 'pg100.txt'
fileName = os.path.join(baseDir, inputPath)

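<H3>Count the number of lines in the text file</H3>
<H4>read.text loads the file into a DataFrame with one row per line, stored in a single string column named value, so count() returns the number of lines rather than the number of words.</H4>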

In [5]:
dataDF = sqlContext.read.text(fileName)
shakespeareCount = dataDF.count()
print shakespeareCount

124787


<H3>Let's check the attributes of sqlContext</H3>

In [6]:
dir(sqlContext)

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_createFromLocal',
 '_createFromRDD',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedContext',
 '_jsc',
 '_jvm',
 '_sc',
 '_scala_SQLContext',
 '_ssql_ctx',
 'applySchema',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'dropTempTable',
 'getConf',
 'getOrCreate',
 'inferSchema',
 'jsonFile',
 'jsonRDD',
 'load',
 'newSession',
 'parquetFile',
 'range',
 'read',
 'registerDataFrameAsTable',
 'registerFunction',
 'setConf',
 'sql',
 'table',
 'tableNames',
 'tables',
 'udf',
 'uncacheTable']

<H3>Use sc.version to see what version of Spark we are running</H3>

In [7]:
sc.version

u'1.6.1'

<H2>Working with your first DataFrames</H2>
<H4>In Spark, we first create a base DataFrame. We can then apply one or more transformations to that base DataFrame. A DataFrame is immutable, so once it is created, it cannot be changed. As a result, each transformation creates a new DataFrame. Finally, we can apply one or more actions to the DataFrames. Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.</H4>
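<H4>A loose analogy in plain Python (this is not Spark) is the generator expression. Like a transformation, it describes work without running it, and consuming it with list() plays the role of an action. The names below are hypothetical. Unlike this sketch, Spark also builds and optimizes a query plan before it executes anything.</H4>

```python
calls = []  # records every time the per-row work actually runs


def times_ten(x):
    """Stand-in for per-row work; logs each call so we can see when it happens."""
    calls.append(x)
    return x * 10


rows = [1, 2, 3, 4]

# "Transformations": build a lazy pipeline; nothing is computed yet
mapped = (times_ten(x) for x in rows)
filtered = (y for y in mapped if y > 15)
print(len(calls))  # 0

# "Action": consuming the pipeline forces every step to run
result = list(filtered)
print(result)      # [20, 30, 40]
print(len(calls))  # 4
```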

<H3>We will use the fake-factory Python library to create a collection of fake person records.</H3>

In [8]:
from faker import Factory
fake = Factory.create()
fake.seed(4321)

<H3>We're going to use this factory to create a collection of randomly generated person records.</H3>
<H3>In the next section, we'll turn that collection into a DataFrame. We'll use the Spark Row class, because that will help us define the Spark DataFrame schema.</H3>

In [9]:
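from pyspark.sql import Row
def fake_entry():
    # Assumes fake.name() returns "First Last"; names with a title such as
    # "Mrs." or a suffix such as "Jr." are split incorrectly (see the rows
    # whose first_name is "Mrs." below).
    name = fake.name().split()
    # Columns: last_name, first_name, ssn, occupation, age (derived from a random date)
    return Row(name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)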

In [10]:
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)

In [11]:
data = list(repeat(10000, fake_entry))
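<H4>fake_entry() assumes every generated name has exactly two parts. When a name starts with a title, the title ends up as the first name, which is where the "Mrs." rows in the outputs below come from. One possible fix is sketched here as a hypothetical helper, split_name. The title and suffix sets are assumptions about what Faker can emit, not a complete list.</H4>

```python
# Hypothetical helper, not part of Faker. The sets below are assumed, not exhaustive.
PREFIXES = {'Mr.', 'Mrs.', 'Ms.', 'Miss', 'Dr.'}
SUFFIXES = {'Jr.', 'Sr.', 'II', 'III', 'IV', 'MD', 'PhD', 'DDS', 'DVM'}


def split_name(full_name):
    """Return (last_name, first_name), ignoring leading titles and trailing suffixes."""
    parts = full_name.split()
    # Strip titles from the front, but always keep at least two parts
    while len(parts) > 2 and parts[0] in PREFIXES:
        parts = parts[1:]
    # Strip suffixes from the end, but always keep at least two parts
    while len(parts) > 2 and parts[-1] in SUFFIXES:
        parts = parts[:-1]
    return parts[-1], parts[0]


print(split_name('Tracey Harvey'))   # ('Harvey', 'Tracey')
print(split_name('Mrs. Jane Doe'))   # ('Doe', 'Jane')
```

<H4>Inside fake_entry(), last, first = split_name(fake.name()) would replace the two name[...] lookups. The generated data, and therefore every output in this notebook, would then differ.</H4>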

<H3>The object 'data' is just a normal Python list, containing Spark SQL Row objects. Let's look at the first item in the list:</H3>

In [12]:
data[0][0], data[0][1], data[0][2], data[0][3], data[0][4]

(u'Harvey', u'Tracey', u'160-37-9051', 'Agricultural engineer', 39)

In [13]:
len(data)

10000

<H3>Let's create a DataFrame</H3>

In [14]:
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))

In [15]:
print 'type of dataDF: {0}'.format(type(dataDF))

type of dataDF: <class 'pyspark.sql.dataframe.DataFrame'>


<H3>Let's take a look at the DataFrame's schema</H3>

In [16]:
dataDF.printSchema()

root
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- ssn: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: long (nullable = true)



<H3>Let's register the newly created DataFrame as a named table, using the registerDataFrameAsTable() method.</H3>

In [17]:
sqlContext.registerDataFrameAsTable(dataDF, 'dataframe')

In [18]:
help(dataDF)

Help on DataFrame in module pyspark.sql.dataframe object:

class DataFrame(__builtin__.object)
 |  A distributed collection of data grouped into named columns.
 |  
 |  A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
 |  and can be created using various functions in :class:`SQLContext`::
 |  
 |      people = sqlContext.read.parquet("...")
 |  
 |  Once created, it can be manipulated using the various domain-specific-language
 |  (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
 |  
 |  To select a column from the data frame, use the apply method::
 |  
 |      ageCol = people.age
 |  
 |  A more concrete example::
 |  
 |      # To create DataFrame using SQLContext
 |      people = sqlContext.read.parquet("...")
 |      department = sqlContext.read.parquet("...")
 |  
 |      people.filter(people.age > 30).join(department, people.deptId == department.id))           .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
 |  
 |  

<H3>How many partitions will the DataFrame be split into?</H3>

In [19]:
dataDF.rdd.getNumPartitions()

1

<H3>Let's add a couple of transformations to our DataFrame and look at the query plan of the resulting DataFrame.</H3>

In [20]:
newDF = dataDF.distinct().select('*')
newDF.explain(True)

== Parsed Logical Plan ==
'Project [*]
+- Aggregate [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], [last_name#7,first_name#8,ssn#9,occupation#10,age#11L]
   +- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapPartitionsRDD[19] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Analyzed Logical Plan ==
last_name: string, first_name: string, ssn: string, occupation: string, age: bigint
Project [last_name#7,first_name#8,ssn#9,occupation#10,age#11L]
+- Aggregate [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], [last_name#7,first_name#8,ssn#9,occupation#10,age#11L]
   +- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapPartitionsRDD[19] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Optimized Logical Plan ==
Aggregate [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], [last_name#7,first_name#8,ssn#9,occupation#10,age#11L]
+- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapP

<H3>Subtract one from each age using select</H3>

In [21]:
subDF = dataDF.select('last_name', 'first_name', 'ssn', 'occupation', (dataDF.age - 1).alias('age'))

<H3>Let's take a look at the query plan.</H3>

In [22]:
subDF.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('last_name),unresolvedalias('first_name),unresolvedalias('ssn),unresolvedalias('occupation),(age#11L - 1) AS age#12]
+- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapPartitionsRDD[19] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Analyzed Logical Plan ==
last_name: string, first_name: string, ssn: string, occupation: string, age: bigint
Project [last_name#7,first_name#8,ssn#9,occupation#10,(age#11L - cast(1 as bigint)) AS age#12L]
+- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapPartitionsRDD[19] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Optimized Logical Plan ==
Project [last_name#7,first_name#8,ssn#9,occupation#10,(age#11L - 1) AS age#12L]
+- LogicalRDD [last_name#7,first_name#8,ssn#9,occupation#10,age#11L], MapPartitionsRDD[19] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2

== Physical Plan ==
Project [last_name#7,first_name#

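<H3>Use collect to view the results</H3>
<H4>collect() is an action that returns every row to the driver as a Python list of Row objects. Use it only when the result is small enough to fit in the driver's memory.</H4>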

In [23]:
results = subDF.collect()
print results

[Row(last_name=u'Harvey', first_name=u'Tracey', ssn=u'160-37-9051', occupation=u'Agricultural engineer', age=38), Row(last_name=u'Green', first_name=u'Isabel', ssn=u'361-94-4342', occupation=u'Teacher, primary school', age=26), Row(last_name=u'Lewis', first_name=u'Tammy', ssn=u'769-27-5887', occupation=u'Scientific laboratory technician', age=20), Row(last_name=u'Cunningham', first_name=u'Kathleen', ssn=u'175-24-7915', occupation=u'Geophysicist/field seismologist', age=41), Row(last_name=u'Marquez', first_name=u'Jorge', ssn=u'310-69-7326', occupation=u'Forensic psychologist', age=25), Row(last_name=u'Summers', first_name=u'Beth', ssn=u'099-90-9730', occupation=u'Best boy', age=42), Row(last_name=u'Jessica', first_name=u'Mrs.', ssn=u'476-06-5497', occupation=u'English as a foreign language teacher', age=42), Row(last_name=u'Turner', first_name=u'Diana', ssn=u'722-09-8354', occupation=u'Psychologist, prison and probation services', age=6), Row(last_name=u'Johnson', first_name=u'David', s

<H3>A better way to visualize the data is to use the 'show' method.</H3>

In [24]:
subDF.show()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|    Harvey|    Tracey|160-37-9051|Agricultural engi...| 38|
|     Green|    Isabel|361-94-4342|Teacher, primary ...| 26|
|     Lewis|     Tammy|769-27-5887|Scientific labora...| 20|
|Cunningham|  Kathleen|175-24-7915|Geophysicist/fiel...| 41|
|   Marquez|     Jorge|310-69-7326|Forensic psycholo...| 25|
|   Summers|      Beth|099-90-9730|            Best boy| 42|
|   Jessica|      Mrs.|476-06-5497|English as a fore...| 42|
|    Turner|     Diana|722-09-8354|Psychologist, pri...|  6|
|   Johnson|     David|715-56-1708|     Sales executive|  5|
|     Lewis|   Melissa|123-48-8354|Engineer, broadca...| 16|
| Hernandez|  Benjamin|293-22-0265|Scientist, produc...| 28|
|     Dixon| Stephanie|041-23-3263|Building control ...| 23|
|       Kim|      Leah|725-61-1132|              Artist| 42|
|    Snyder|    Leslie|2

<H3>By default, 'show' displays 20 rows and truncates strings longer than 20 characters. Use the n argument to change the number of rows and truncate=False to print full values.</H3>

In [25]:
subDF.show(n=30, truncate=False)

+----------+----------+-----------+-------------------------------------------+---+
|last_name |first_name|ssn        |occupation                                 |age|
+----------+----------+-----------+-------------------------------------------+---+
|Harvey    |Tracey    |160-37-9051|Agricultural engineer                      |38 |
|Green     |Isabel    |361-94-4342|Teacher, primary school                    |26 |
|Lewis     |Tammy     |769-27-5887|Scientific laboratory technician           |20 |
|Cunningham|Kathleen  |175-24-7915|Geophysicist/field seismologist            |41 |
|Marquez   |Jorge     |310-69-7326|Forensic psychologist                      |25 |
|Summers   |Beth      |099-90-9730|Best boy                                   |42 |
|Jessica   |Mrs.      |476-06-5497|English as a foreign language teacher      |42 |
|Turner    |Diana     |722-09-8354|Psychologist, prison and probation services|6  |
|Johnson   |David     |715-56-1708|Sales executive                          
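<H4>The truncated cells in the default 'show' output (for example "Agricultural engi...") keep their first 17 characters and append "...", which yields 20 characters in total. The plain-Python sketch below reproduces that rule as inferred from the outputs above. show_cell is a hypothetical helper, not a PySpark function.</H4>

```python
def show_cell(value, truncate=True, width=20):
    """Render one cell the way DataFrame.show() appears to: cut long strings to width chars."""
    text = str(value)
    if truncate and len(text) > width:
        # keep the first width - 3 characters and mark the cut with '...'
        return text[:width - 3] + '...'
    return text


print(show_cell('Agricultural engineer'))                  # Agricultural engi...
print(show_cell('Sales executive'))                        # Sales executive
print(show_cell('Agricultural engineer', truncate=False))  # Agricultural engineer
```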

<H3>Use count to get the total number of rows.</H3>

In [26]:
print dataDF.count()
print subDF.count()

10000
10000


<H3>Apply the filter transformation and view the results with show.</H3>

In [27]:
filteredDF = subDF.filter(subDF.age < 10)
filteredDF.show(truncate=False)
filteredDF.count()

+---------+----------+-----------+-------------------------------------------+---+
|last_name|first_name|ssn        |occupation                                 |age|
+---------+----------+-----------+-------------------------------------------+---+
|Turner   |Diana     |722-09-8354|Psychologist, prison and probation services|6  |
|Johnson  |David     |715-56-1708|Sales executive                            |5  |
|Andrade  |Briana    |386-07-6013|Social research officer, government        |6  |
|Arnold   |Heather   |737-44-0894|Economist                                  |7  |
|Troy     |Mr.       |363-83-5358|Hotel manager                              |8  |
|Kelly    |Tracy     |082-13-6448|Architectural technologist                 |8  |
|Jones    |Michelle  |412-91-9340|Engineer, drilling                         |1  |
|Church   |David     |370-59-5122|Museum education officer                   |6  |
|Olson    |Melissa   |209-27-9609|Surveyor, hydrographic                     |1  |
|Jon

2055

<H3>Python lambda functions and user-defined functions (UDFs)</H3>

In [28]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
less_ten = udf(lambda s: s < 10, BooleanType())
lambdaDF = subDF.filter(less_ten(subDF.age))
lambdaDF.show()
lambdaDF.count()

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|   Turner|     Diana|722-09-8354|Psychologist, pri...|  6|
|  Johnson|     David|715-56-1708|     Sales executive|  5|
|  Andrade|    Briana|386-07-6013|Social research o...|  6|
|   Arnold|   Heather|737-44-0894|           Economist|  7|
|     Troy|       Mr.|363-83-5358|       Hotel manager|  8|
|    Kelly|     Tracy|082-13-6448|Architectural tec...|  8|
|    Jones|  Michelle|412-91-9340|  Engineer, drilling|  1|
|   Church|     David|370-59-5122|Museum education ...|  6|
|    Olson|   Melissa|209-27-9609|Surveyor, hydrogr...|  1|
|    Jones|      Anne|824-64-4586|Operational resea...|  3|
|   Harris|     Carla|097-46-3461|Higher education ...|  5|
|    Wyatt|    Alicia|188-35-0575| Structural engineer|  3|
|    Clark|     Linda|841-82-4522|      Phytotherapist|  3|
|  Jackson|     David|571-22-1560|      

2055

<H3>Let's keep only the even ages among the rows with age less than 10.</H3>

In [29]:
even = udf(lambda s: s % 2 == 0, BooleanType())
evenDF = lambdaDF.filter(even(lambdaDF.age))
evenDF.show()
evenDF.count()

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|   Turner|     Diana|722-09-8354|Psychologist, pri...|  6|
|  Andrade|    Briana|386-07-6013|Social research o...|  6|
|     Troy|       Mr.|363-83-5358|       Hotel manager|  8|
|    Kelly|     Tracy|082-13-6448|Architectural tec...|  8|
|   Church|     David|370-59-5122|Museum education ...|  6|
|  Jackson|     David|571-22-1560|        Sports coach|  4|
|Rodriguez|   Valerie|662-31-2283|       Oceanographer|  6|
|    Woods| Elizabeth|864-11-4296|           Herbalist|  2|
|  Clayton|     Paula|027-90-7296|Public house manager|  2|
|  Johnson|    Thomas|216-45-5965|      Teacher, music|  6|
|    Hayes|   Matthew|028-15-3538|Administrator, ed...|  8|
|  Rosales|    Pamela|391-64-8825|  Arts administrator|  2|
|   Mendez|     Jason|804-20-9778|Volunteer coordin...|  8|
|     Hill|      Eric|899-61-6918|Mainte

1003

<H3>Let's look at the first few entries to obtain a rough idea of what information is available.</H3>

In [30]:
print "first: {0}\n".format(filteredDF.first())
print "Four of them: {0}\n".format(filteredDF.take(4))

first: Row(last_name=u'Turner', first_name=u'Diana', ssn=u'722-09-8354', occupation=u'Psychologist, prison and probation services', age=6)

Four of them: [Row(last_name=u'Turner', first_name=u'Diana', ssn=u'722-09-8354', occupation=u'Psychologist, prison and probation services', age=6), Row(last_name=u'Johnson', first_name=u'David', ssn=u'715-56-1708', occupation=u'Sales executive', age=5), Row(last_name=u'Andrade', first_name=u'Briana', ssn=u'386-07-6013', occupation=u'Social research officer, government', age=6), Row(last_name=u'Arnold', first_name=u'Heather', ssn=u'737-44-0894', occupation=u'Economist', age=7)]



<H3>'show' presents the same rows in a more readable table.</H3>

In [31]:
filteredDF.show(4)

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|   Turner|     Diana|722-09-8354|Psychologist, pri...|  6|
|  Johnson|     David|715-56-1708|     Sales executive|  5|
|  Andrade|    Briana|386-07-6013|Social research o...|  6|
|   Arnold|   Heather|737-44-0894|           Economist|  7|
+---------+----------+-----------+--------------------+---+
only showing top 4 rows



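<H3>Get the five oldest people in the list. To do that, sort by age in descending order.</H3>
<H4>Several people share the maximum age of 47, and Spark does not guarantee any order among tied rows. That is why take(5) and show(5) below return different people.</H4>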

In [32]:
dataDF.orderBy(dataDF.age.desc()).take(5)

[Row(last_name=u'Smith', first_name=u'Jessica', ssn=u'371-59-8543', occupation=u'Medical physicist', age=47),
 Row(last_name=u'Blankenship', first_name=u'Crystal', ssn=u'341-29-9523', occupation=u'Commercial/residential surveyor', age=47),
 Row(last_name=u'Meyer', first_name=u'Christine', ssn=u'803-59-5869', occupation=u'Early years teacher', age=47),
 Row(last_name=u'George', first_name=u'Wesley', ssn=u'622-72-1540', occupation=u'Therapist, art', age=47),
 Row(last_name=u'Davila', first_name=u'Steven', ssn=u'479-63-8770', occupation=u'Purchasing manager', age=47)]

In [33]:
dataDF.orderBy(dataDF.age.desc()).show(5)

+-----------+----------+-----------+--------------------+---+
|  last_name|first_name|        ssn|          occupation|age|
+-----------+----------+-----------+--------------------+---+
|      Smith|   Jessica|371-59-8543|   Medical physicist| 47|
|      Kelly|    Rachel|241-07-8262|Loss adjuster, ch...| 47|
|Blankenship|   Crystal|341-29-9523|Commercial/reside...| 47|
|      Meyer| Christine|803-59-5869| Early years teacher| 47|
|     George|    Wesley|622-72-1540|      Therapist, art| 47|
+-----------+----------+-----------+--------------------+---+
only showing top 5 rows
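<H4>Adding tie-breaking columns to the sort returns the same five people every time. The plain-Python sketch below uses rows copied from the outputs above. Breaking ties on last name, first name, and then ssn is an assumption; any set of columns that makes the ordering unique would work.</H4>

```python
# (last_name, first_name, ssn, age) tuples copied from the outputs above
people = [
    ('Smith', 'Jessica', '371-59-8543', 47), ('Kelly', 'Rachel', '241-07-8262', 47),
    ('Blankenship', 'Crystal', '341-29-9523', 47), ('Meyer', 'Christine', '803-59-5869', 47),
    ('George', 'Wesley', '622-72-1540', 47), ('Davila', 'Steven', '479-63-8770', 47),
    ('Harmon', 'Charlene', '633-88-8219', 1),
]

# Age descending, then last name, first name, and ssn ascending to break ties.
# A Spark counterpart would be along the lines of:
#   dataDF.orderBy(dataDF.age.desc(), 'last_name', 'first_name', 'ssn')
oldest = sorted(people, key=lambda p: (-p[3], p[0], p[1], p[2]))[:5]
for p in oldest:
    print(p)
```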



<H3>Let's reverse the sort order (orderBy sorts in ascending order by default)</H3>

In [34]:
dataDF.orderBy('age').show(5)

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|   Harmon|  Charlene|633-88-8219|Geneticist, molec...|  1|
|Velasquez|  Margaret|197-26-4433|        Cartographer|  1|
|      Cox|   Melanie|844-43-5712|Nurse, learning d...|  1|
|   Hardin|  Benjamin|051-85-9778|  Engineer, drilling|  1|
|   Wright|   Gregory|482-67-9269|           Mudlogger|  1|
+---------+----------+-----------+--------------------+---+
only showing top 5 rows



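<H3>distinct and dropDuplicates</H3>
<H4>distinct removes rows that are identical in every column, while dropDuplicates can compare only a chosen subset of columns.</H4>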

In [35]:
print dataDF.count()
print dataDF.distinct().count()

10000
10000


<H3>To demonstrate 'distinct', let's create a quick throwaway dataset.</H3>

In [36]:
tempDF = sqlContext.createDataFrame([("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)], ('name', 'score'))

In [37]:
tempDF.show()

+----+-----+
|name|score|
+----+-----+
| Joe|    1|
| Joe|    1|
|Anna|   15|
|Anna|   12|
|Ravi|    5|
+----+-----+



In [38]:
tempDF.distinct().show()

+----+-----+
|name|score|
+----+-----+
|Ravi|    5|
|Anna|   12|
|Anna|   15|
| Joe|    1|
+----+-----+



In [39]:
print dataDF.count()
print dataDF.dropDuplicates(['first_name', 'last_name']).count()

10000
9360
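<H4>The difference between distinct and dropDuplicates can be sketched in plain Python on the tempDF rows. The helper names are hypothetical. When several rows share a key, Spark's dropDuplicates does not promise to keep the first one, whereas this sketch does.</H4>

```python
rows = [("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)]


def distinct(rows):
    """Keep one copy of each row that is identical in every column."""
    seen, out = set(), []
    for r in rows:
        if r not in seen:
            seen.add(r)
            out.append(r)
    return out


def drop_duplicates(rows, key):
    """Keep the first row seen for each key value (a subset of the columns)."""
    seen, out = set(), []
    for r in rows:
        k = key(r)
        if k not in seen:
            seen.add(k)
            out.append(r)
    return out


print(len(distinct(rows)))                             # 4, like tempDF.distinct()
print(len(drop_duplicates(rows, key=lambda r: r[0])))  # 3, like tempDF.dropDuplicates(['name'])
```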


<H3>To remove specific columns instead of selecting the ones to keep, use 'drop'.</H3>

In [40]:
dataDF.drop('occupation').drop('age').show()

+----------+----------+-----------+
| last_name|first_name|        ssn|
+----------+----------+-----------+
|    Harvey|    Tracey|160-37-9051|
|     Green|    Isabel|361-94-4342|
|     Lewis|     Tammy|769-27-5887|
|Cunningham|  Kathleen|175-24-7915|
|   Marquez|     Jorge|310-69-7326|
|   Summers|      Beth|099-90-9730|
|   Jessica|      Mrs.|476-06-5497|
|    Turner|     Diana|722-09-8354|
|   Johnson|     David|715-56-1708|
|     Lewis|   Melissa|123-48-8354|
| Hernandez|  Benjamin|293-22-0265|
|     Dixon| Stephanie|041-23-3263|
|       Kim|      Leah|725-61-1132|
|    Snyder|    Leslie|268-79-4330|
|    Ortega|    Joseph|077-96-8349|
|    Barnes|     Brian|061-88-1648|
|     Adams|      Eric|582-28-0099|
|   Andrade|    Briana|386-07-6013|
|     Weeks| Catherine|363-94-7993|
|     Tapia|   Michael|386-39-5490|
+----------+----------+-----------+
only showing top 20 rows



<H3>The transformation 'groupBy' allows you to perform aggregations on a DataFrame.</H3>
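<H4>Unlike other DataFrame transformations, 'groupBy' does not return a DataFrame. Instead, it returns a GroupedData object, which provides aggregation methods such as count(), avg(), max(), and min() that each return a DataFrame. Calling groupBy() with no columns treats the whole DataFrame as a single group.</H4>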

In [41]:
dataDF.groupBy('occupation').count().show(truncate=False)

+------------------------------------+-----+
|occupation                          |count|
+------------------------------------+-----+
|Agricultural engineer               |9    |
|Operational researcher              |17   |
|Textile designer                    |11   |
|Public relations officer            |20   |
|Politician's assistant              |11   |
|Personal assistant                  |11   |
|Hotel manager                       |15   |
|Engineer, materials                 |23   |
|Waste management officer            |18   |
|Counselling psychologist            |16   |
|Geoscientist                        |18   |
|Therapist, music                    |19   |
|Outdoor activities/education manager|15   |
|Physiological scientist             |7    |
|Housing manager/officer             |8    |
|Theatre stage manager               |16   |
|Programmer, systems                 |14   |
|Fitness centre manager              |16   |
|Fish farm manager                   |12   |
|Musician 

In [42]:
dataDF.groupBy().avg('age').show(truncate=False)

+--------+
|avg(age)|
+--------+
|24.4669 |
+--------+



In [43]:
print "Maximum age: {0}".format(dataDF.groupBy().max('age').first()[0])
print "Minimum age: {0}".format(dataDF.groupBy().min('age').first()[0])

Maximum age: 47
Minimum age: 1
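<H4>The plain-Python sketch below shows what groupBy('occupation').count() and the whole-table groupBy().avg/max/min calls compute. The rows are a small hypothetical sample of (occupation, age) pairs and are not taken from dataDF.</H4>

```python
from collections import defaultdict

# Hypothetical sample rows: (occupation, age)
people = [('Economist', 7), ('Hotel manager', 8), ('Economist', 33), ('Artist', 42)]

# Like groupBy('occupation').count(): one counter per group key
counts = defaultdict(int)
for occupation, _ in people:
    counts[occupation] += 1

# Like groupBy() with no columns: aggregate over every row at once
ages = [age for _, age in people]
avg_age = float(sum(ages)) / len(ages)
print('avg=%.1f max=%d min=%d' % (avg_age, max(ages), min(ages)))  # avg=22.5 max=42 min=7
```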


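<H3>Sample returns a new DataFrame with a random sample of rows from the dataset.</H3>
<H4>fraction is the expected proportion of rows, not an exact count: sampling 10% of the 10,000 rows returned 946 rows here. Because no seed is given, each run draws a different sample.</H4>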

In [44]:
sampledDF = dataDF.sample(withReplacement=False, fraction=0.10)
print sampledDF.count()
sampledDF.show()

946
+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|Cunningham|  Kathleen|175-24-7915|Geophysicist/fiel...| 42|
|   Summers|      Beth|099-90-9730|            Best boy| 43|
|    Barnes|     Brian|061-88-1648|Production assist...| 33|
|    Arnold|   Heather|737-44-0894|           Economist|  8|
|     Rojas|    Thomas|702-94-4924|           Press sub| 40|
|      Todd|      Cody|292-28-5631|Biomedical scientist| 46|
|   Chapman|   Chelsea|667-49-0913|Television produc...| 28|
|      Beck|     Bryan|665-91-5669| Production engineer| 11|
|     Smith|   Patrick|025-39-2755|    Financial trader| 39|
|     Salas| Elizabeth|376-51-7310|Designer, ceramic...| 21|
|      Buck|     Craig|227-70-6222| Engineer, materials| 23|
|      Pena|    Morgan|361-09-0070|Customer service ...| 33|
|    Carter|      Mark|873-85-4598| Scientist, forensic| 42|
|      Mann|     Tas

In [45]:
print dataDF.sample(withReplacement=False, fraction=0.05).count()

515
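<H4>Roughly speaking, sampling without replacement keeps each row independently with probability fraction. That is why the counts above land near 1,000 and 500 but not exactly on them. Below is a plain-Python sketch of the idea; bernoulli_sample is a hypothetical helper, not a PySpark function. In PySpark itself, passing a seed, as in dataDF.sample(False, 0.10, seed=42), makes the sample repeatable.</H4>

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [r for r in rows if rng.random() < fraction]


rows = list(range(10000))
first = bernoulli_sample(rows, 0.10, seed=42)
again = bernoulli_sample(rows, 0.10, seed=42)
print(len(first))      # close to 1000, rarely exactly 1000
print(first == again)  # True: same seed, same sample
```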


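<H3>Caching DataFrames</H3>
<H4>cache() only marks the DataFrame for caching. The rows are stored in memory the first time an action (here, count()) computes them.</H4>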

In [46]:
filteredDF.cache()
print filteredDF.count()
print filteredDF.is_cached

2055
True


<H3>If we are done with the DataFrame we can unpersist it so that its memory can be reclaimed.</H3>

In [47]:
filteredDF.unpersist()
print filteredDF.is_cached

False


<H3>Use lambda functions instead of separately defined functions when their use improves readability and conciseness.</H3>

In [48]:
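from pyspark.sql.types import BooleanType
# Without an explicit return type, udf() defaults to StringType, so declare BooleanType
myUDF = udf(lambda v: v < 10, BooleanType())
subDF.filter(myUDF(subDF.age))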

DataFrame[last_name: string, first_name: string, ssn: string, occupation: string, age: bigint]

<H3>To keep long chains of method calls readable, wrap the whole expression in parentheses and put each method, transformation, or action on its own line.</H3>

In [49]:
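# Import only what is needed; "import *" would shadow Python built-ins such as max, min, and sum
from pyspark.sql.functions import concat, lit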
(dataDF
 .filter(dataDF.age > 20)
 .select(concat(dataDF.first_name, lit(' '), dataDF.last_name), dataDF.occupation)
 .show(truncate=False)
 )

+------------------------------+--------------------------------------+
|concat(first_name, ,last_name)|occupation                            |
+------------------------------+--------------------------------------+
|Tracey Harvey                 |Agricultural engineer                 |
|Isabel Green                  |Teacher, primary school               |
|Tammy Lewis                   |Scientific laboratory technician      |
|Kathleen Cunningham           |Geophysicist/field seismologist       |
|Jorge Marquez                 |Forensic psychologist                 |
|Beth Summers                  |Best boy                              |
|Mrs. Jessica                  |English as a foreign language teacher |
|Benjamin Hernandez            |Scientist, product/process development|
|Stephanie Dixon               |Building control surveyor             |
|Leah Kim                      |Artist                                |
|Brian Barnes                  |Production assistant, television

<H2>Lab 1</H2>

<H3>We'll start by generating a base DataFrame by using a Python list of tuples and the sqlContext.createDataFrame method.</H3>

In [50]:
wordsDF = sqlContext.createDataFrame([('cat',), ('elephant',), ('rat',), ('rat',), ('cat', )], ['word'])
wordsDF.show()
print type(wordsDF)
wordsDF.printSchema()

+--------+
|    word|
+--------+
|     cat|
|elephant|
|     rat|
|     rat|
|     cat|
+--------+

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- word: string (nullable = true)



<H3>Use DataFrame functions to append an 's' to each word.</H3>

In [51]:
from pyspark.sql.functions import lit, concat, col

pluralDF = wordsDF.select(concat("word",lit("s")).alias("word"))
pluralDF.show()

+---------+
|     word|
+---------+
|     cats|
|elephants|
|     rats|
|     rats|
|     cats|
+---------+



<H3>Find the number of characters in each word.</H3>

In [52]:
from pyspark.sql.functions import length

pluralLengthsDF = pluralDF.withColumn('word', length(pluralDF.word))
pluralLengthsDF.show()

+----+
|word|
+----+
|   4|
|   9|
|   4|
|   4|
|   4|
+----+



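<H3>Print all the DataFrames and their column names</H3>
<H4>printDataFrames comes from the spark_notebook_helpers module, which appears to be a course helper; it is not part of PySpark.</H4>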

In [53]:
from spark_notebook_helpers import printDataFrames

printDataFrames(True)

df: ['name', 'age']
sampledDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
filteredDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
tempDF: ['name', 'score']
wordsDF: ['word']
newDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
evenDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
subDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
dataDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
lambdaDF: ['last_name', 'first_name', 'ssn', 'occupation', 'age']
pluralLengthsDF: ['word']
pluralDF: ['word']


<H3>Find the counts of words.</H3>

In [54]:
wordCountsDF = (wordsDF.groupBy('word').count())
wordCountsDF.show()

+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+



<H3>Find the mean number of occurrences of words in wordCountsDF.</H3>

In [55]:
averageCount = wordCountsDF.groupBy().avg('count').collect()[0][0]
print averageCount

1.66666666667
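<H4>For comparison, the same count and mean computed in plain Python with collections.Counter, using the five words of wordsDF. This sketch is only a way to check the result; it is not how Spark computes it.</H4>

```python
from collections import Counter

words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Like wordsDF.groupBy('word').count()
word_counts = Counter(words)

# Like wordCountsDF.groupBy().avg('count'): total occurrences / distinct words
average_count = float(sum(word_counts.values())) / len(word_counts)
print(average_count)  # 5 / 3
```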


<H3>Define a function for word counting.</H3>
<H4>This function takes a DataFrame with a single word column, such as wordsDF, and returns a DataFrame containing each distinct word and its count.</H4>

In [56]:
def wordCount(wordListDF):
    return (wordListDF.groupBy('word').count())
wordCount(wordsDF).show()

+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+



<H3>Define a function that converts all text to lower case, removes punctuation, and removes leading and trailing spaces.</H3>

In [57]:
sentenceDF = sqlContext.createDataFrame([('Hi, you!',),
                                         (' No under_score!',),
                                         (' *      Remove punctuation then spaces  * ',)], ['sentence'])
sentenceDF.show(truncate=False)

+------------------------------------------+
|sentence                                  |
+------------------------------------------+
|Hi, you!                                  |
| No under_score!                          |
| *      Remove punctuation then spaces  * |
+------------------------------------------+



In [58]:
from pyspark.sql.functions import regexp_replace, trim, col, lower

def removePunctuation(column):
    return (trim(regexp_replace(lower(column), '[^0-9a-zA-Z ]', '')))

In [59]:
sentenceDF.show(truncate=False)
(sentenceDF
 .select(removePunctuation(col('sentence')))
 .show(truncate=False))

+------------------------------------------+
|sentence                                  |
+------------------------------------------+
|Hi, you!                                  |
| No under_score!                          |
| *      Remove punctuation then spaces  * |
+------------------------------------------+

+----------------------------------------------------+
|trim(regexp_replace(lower(sentence),[^0-9a-zA-Z ],))|
+----------------------------------------------------+
|hi you                                              |
|no underscore                                       |
|remove punctuation then spaces                      |
+----------------------------------------------------+



In [60]:
testPunctDF = sqlContext.createDataFrame([(" The Elephant's 4 cats. ",)])
testPunctDF.select(removePunctuation(col('_1'))).show()

+----------------------------------------------+
|trim(regexp_replace(lower(_1),[^0-9a-zA-Z ],))|
+----------------------------------------------+
|                          the elephants 4 cats|
+----------------------------------------------+
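<H4>The same cleaning rule can be written with Python's re module, which is handy for checking expected outputs without Spark. remove_punctuation is a hypothetical plain-Python twin of removePunctuation. Like the Spark version, it keeps runs of spaces inside the text, and those runs are why empty words appear after splitting later on.</H4>

```python
import re


def remove_punctuation(text):
    """Lower-case, keep only letters, digits and spaces, then trim outer spaces."""
    return re.sub(r'[^0-9a-zA-Z ]', '', text.lower()).strip(' ')


print(remove_punctuation('Hi, you!'))                  # hi you
print(remove_punctuation(" The Elephant's 4 cats. "))  # the elephants 4 cats
```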



<H3>Apply the function to the text file containing Shakespeare's works</H3>

In [61]:
fileName = 'Datasets/pg100.txt'

shakespeareDF = sqlContext.read.text(fileName).select(removePunctuation(col('value')).alias('value'))
shakespeareDF.show(15, truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|the project gutenberg ebook of the complete works of william shakespeare by|
|william shakespeare                                                        |
|                                                                           |
|this ebook is for the use of anyone anywhere at no cost and with           |
|almost no restrictions whatsoever  you may copy it give it away or         |
|reuse it under the terms of the project gutenberg license included         |
|with this ebook or online at wwwgutenbergorg                               |
|                                                                           |
|this is a copyrighted project gutenberg ebook details below                |
|please follow the copyright guidelines in this file            

<H3>Words from lines</H3>
<H4>Apply a transformation that splits each line of the DataFrame on spaces and returns a DataFrame with each word in its own row. Then filter out the empty strings that blank lines and consecutive spaces produce.</H4>

In [74]:
from pyspark.sql.functions import split, explode

# Split each line on single spaces and emit one row per token
shakeWordsDF = shakespeareDF.select(explode(split(shakespeareDF.value, ' ')).alias('word'))
# Blank lines and consecutive spaces yield empty tokens; drop them with a
# column expression, which avoids the overhead of a Python UDF
shakeWordsDF = shakeWordsDF.filter(shakeWordsDF.word != '')

In [69]:
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print shakeWordsDFCount

+-----------+
|       word|
+-----------+
|        the|
|    project|
|  gutenberg|
|      ebook|
|         of|
|        the|
|   complete|
|      works|
|         of|
|    william|
|shakespeare|
|         by|
|    william|
|shakespeare|
|       this|
|      ebook|
|         is|
|        for|
|        the|
|        use|
+-----------+
only showing top 20 rows

903705
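<H4>The plain-Python sketch below shows what split, explode, and the empty-string filter do, using fragments of the cleaned lines shown above. The list comprehensions only illustrate the behavior; they are not Spark's implementation.</H4>

```python
lines = ['the project gutenberg ebook',
         '',
         'almost no restrictions whatsoever  you may']

# split(value, ' ') + explode: one element per token, empty tokens included
tokens = [tok for line in lines for tok in line.split(' ')]

# filter(word != ''): drop tokens produced by blank lines and double spaces
words = [tok for tok in tokens if tok != '']

print(len(tokens))  # 12
print(len(words))   # 10
```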


<H3>Count the words and find the 15 most frequent words</H3>

In [70]:
from pyspark.sql.functions import desc

topWordsAndCountsDF = shakeWordsDF.groupby('word').count().withColumnRenamed('count', 'counts')
topWordsAndCountsDF = topWordsAndCountsDF.orderBy(topWordsAndCountsDF.counts.desc())

In [71]:
topWordsAndCountsDF.show(15)

+----+------+
|word|counts|
+----+------+
| the| 27825|
| and| 26791|
|   i| 20681|
|  to| 19261|
|  of| 18289|
|   a| 14667|
| you| 13716|
|  my| 12481|
|that| 11135|
|  in| 11027|
|  is|  9621|
| not|  8745|
| for|  8261|
|with|  8046|
|  me|  7769|
+----+------+
only showing top 15 rows
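<H4>For a small input, the whole pipeline from cleaning to ranking fits in a few lines of plain Python, which makes it easy to sanity-check. top_words is a hypothetical helper. It also breaks ties alphabetically, which the Spark query above does not do, so words with equal counts may appear in any order there.</H4>

```python
import re
from collections import Counter


def top_words(lines, n):
    """Clean each line, split on spaces, drop empty tokens, count, and rank."""
    counts = Counter()
    for line in lines:
        cleaned = re.sub(r'[^0-9a-zA-Z ]', '', line.lower()).strip(' ')
        counts.update(w for w in cleaned.split(' ') if w)
    # sort by count descending, then alphabetically so ties are deterministic
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


sample = ["To be, or not to be: that is the question:",
          "Whether 'tis nobler in the mind to suffer"]
print(top_words(sample, 3))  # [('to', 3), ('be', 2), ('the', 2)]
```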

