I'm using the [docs on the site](https://spark.apache.org/docs/latest/), to start at least. These docs seem to be focused on Spark SQL (even without the SQL syntax), perhaps because the recommended approach since Spark 2.0 is to use a Dataset rather than an RDD. In pyspark every Dataset is a Dataset[Row], which is a DataFrame in Spark terminology.

In [1]:
import findspark
findspark.init()

import pyspark
import pyspark.sql # provides SparkSession, at least
import pyspark.sql.functions as sf

In [2]:
# We don't use the SparkContext directly at this point, but here's how
# we'd create one if we needed it (once the session below exists,
# spark.sparkContext also exposes it)
# sc = pyspark.SparkContext(appName="Intro")

In [3]:
spark = pyspark.sql.SparkSession.builder \
    .master("local") \
    .appName("Intro") \
    .getOrCreate()

In [4]:
textFile = spark.read.text("/opt/spark/README.md")

The textFile variable is a Dataset[Row], so it's not strongly typed. The docs call it a DataFrame to be consistent with the data frame concept in pandas and R.

The following few lines get values from the DataFrame directly, by calling actions.

In [5]:
textFile.count()

103

In [6]:
textFile.first()

Row(value='# Apache Spark')

And this applies a transformation (filter) to the DataFrame to get a new DataFrame.

In [7]:
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
linesWithSpark.count()

20

In [8]:
linesWithSpark.first()

Row(value='# Apache Spark')

Transformations and actions can also be chained together.

In [9]:
textFile.filter(textFile.value.contains("Spark")).count()

20
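
To make the transformation/action split concrete, here is a plain-Python analogy rather than Spark code. It assumes a small hypothetical list of lines, and uses a generator to stand in for a lazy transformation: nothing is evaluated until something consumes it, which is roughly the role an action like count plays. Spark itself builds a query plan instead of a generator, so treat this only as a sketch of the idea.

```python
# Plain-Python analogy (not Spark): a generator is lazy like a transformation,
# and consuming it plays the role of an action.
lines = ["# Apache Spark", "", "Spark is a unified analytics engine"]  # hypothetical sample data

evaluated = []  # records which lines the filter has actually looked at


def lines_with_spark(source):
    """Lazily yield the lines that contain 'Spark' (analogous to filter)."""
    for line in source:
        evaluated.append(line)
        if "Spark" in line:
            yield line


pending = lines_with_spark(lines)  # "transformation": no line has been inspected yet
inspected_before_action = len(evaluated)

count = sum(1 for _ in pending)    # "action": forces the filter to run
print(inspected_before_action, count)
```

The filter only touches the data once the count is requested, which is why chaining several transformations costs nothing until an action runs.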

More complex stuff, like finding the line with the most words.

This part transforms the previous DataFrame into a new one with one row per line and a single column, called numWords, that holds an integer count of the words in that line. The select function takes Column objects. sf is an alias for pyspark.sql.functions, which provides a ton of convenience functions for building a new Column from an existing Column.

In [10]:
# raw string so the regex \s+ (runs of whitespace) isn't read as a string escape
textFile.select(sf.size(sf.split(textFile.value, r"\s+")) \
                .name("numWords")).collect()[:3]

[Row(numWords=3), Row(numWords=1), Row(numWords=14)]

The agg function also takes a Column, which here is the one built by sf.max.

In [11]:
textFile.select(sf.size(sf.split(textFile.value, r"\s+")).name("numWords")) \
        .agg(sf.max(sf.col("numWords"))).collect()

[Row(max(numWords)=22)]
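
As a sanity check on what the two cells above compute, here is a plain-Python sketch of the same logic: split each line on whitespace, count the pieces, then take the maximum. The sample lines are hypothetical, and Python's re module stands in for the Java regex engine Spark uses, so edge cases may differ. It does show why an empty line counts as one "word": splitting an empty string yields a list holding one empty string.

```python
import re

# Hypothetical sample lines; the second is empty, like line 2 of the README.
sample_lines = ["# Apache Spark", "", "Spark is a unified analytics engine"]

# Analogous to sf.size(sf.split(value, r"\s+")): pieces per line.
num_words = [len(re.split(r"\s+", line)) for line in sample_lines]
print(num_words)  # -> [3, 1, 6]

# Analogous to agg(sf.max(...)): the largest count across all lines.
most_words = max(num_words)
print(most_words)  # -> 6
```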

Here's an example of MapReduce. The split call turns each line into an array of words, and the explode function returns a new row for each element of that array, so the result has one row per word. Then groupBy and count group those rows by word, giving one row per distinct word along with the number of times it occurs.

I'm not sure where, exactly, the MapReduce comes in. My reading is that split and explode play the map role and groupBy and count play the reduce role, but one perspective is that I don't need to know because Spark SQL plans the execution for me. Calling explain() on the DataFrame prints the physical plan, and the Spark web UI (on port 4040 by default while the application is running) shows the jobs and stages, so either should show what the following code actually does.
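
One way to see where the map and reduce steps might sit is to write the same word count by hand in plain Python. This is only a sketch with hypothetical sample lines, not what Spark executes internally: the "map" phase turns lines into (word, 1) pairs, and the "reduce" phase sums the pairs for each word.

```python
import re
from collections import Counter
from itertools import chain

sample_lines = ["Spark is fast", "Spark is general"]  # hypothetical sample data

# "Map" phase: split each line into words and flatten the result,
# which is roughly what split followed by explode does.
words = chain.from_iterable(re.split(r"\s+", line) for line in sample_lines)
pairs = [(word, 1) for word in words]

# "Reduce" phase: group the pairs by word and sum the ones,
# which is roughly what groupBy("word").count() does.
word_counts = Counter()
for word, one in pairs:
    word_counts[word] += one

print(dict(word_counts))  # -> {'Spark': 2, 'is': 2, 'fast': 1, 'general': 1}
```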

In [12]:
wordCounts = textFile.select(sf.explode(sf.split(textFile.value, r"\s+"))\
                             .name("word"))\
                     .groupBy("word").count()

In [13]:
wordCounts.collect()

[Row(word='online', count=1),
 Row(word='graphs', count=1),
 Row(word='["Parallel', count=1),
 Row(word='["Building', count=1),
 Row(word='thread', count=1),
 Row(word='documentation', count=3),
 Row(word='command,', count=2),
 Row(word='abbreviated', count=1),
 Row(word='overview', count=1),
 Row(word='rich', count=1),
 Row(word='set', count=2),
 Row(word='-DskipTests', count=1),
 Row(word='name', count=1),
 Row(word='page](http://spark.apache.org/documentation.html).', count=1),
 Row(word='["Specifying', count=1),
 Row(word='stream', count=1),
 Row(word='run:', count=1),
 Row(word='not', count=1),
 Row(word='programs', count=2),
 Row(word='tests', count=2),
 Row(word='./dev/run-tests', count=1),
 Row(word='will', count=1),
 Row(word='[run', count=1),
 Row(word='particular', count=2),
 Row(word='option', count=1),
 Row(word='Alternatively,', count=1),
 Row(word='by', count=1),
 Row(word='must', count=1),
 Row(word='using', count=5),
 Row(word='you', count=4),
 Row(word='MLlib', count=

Spark can also cache a DataFrame in memory, which helps when it's queried repeatedly. The cache call itself is lazy, so the first count below populates the cache and the later ones read from it.

In [14]:
linesWithSpark.cache()

DataFrame[value: string]

In [15]:
%time linesWithSpark.count()

CPU times: user 953 µs, sys: 1.5 ms, total: 2.45 ms
Wall time: 162 ms


20

In [16]:
%time linesWithSpark.count()

CPU times: user 731 µs, sys: 1.36 ms, total: 2.1 ms
Wall time: 50.7 ms


20

In [17]:
%time linesWithSpark.count()

CPU times: user 918 µs, sys: 1.47 ms, total: 2.39 ms
Wall time: 67.4 ms


20