Uncomment this cell and run once if you need to install pyspark

In [2]:

#!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853649 sha256=eb9f49820d8b0d3ac1280353ddca6f9283259ada4e000e02f90f766bc79cebc0
  Stored in directory: c:\users\owner\appdata\local\pip\cache\wheels\9f\f5\07\7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1




Sometimes we need to tell PySpark which Python interpreter to use; this must be set before the SparkContext is created

In [None]:
import os
os.environ['PYSPARK_PYTHON'] = 'python'
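
If the bare name `python` resolves to a different interpreter than the one running the notebook, one option is to point `PYSPARK_PYTHON` at the current interpreter instead. This is a minimal sketch that assumes the workers should use the same Python as the notebook, which is the usual case in local mode:

```python
import os
import sys

# Assumption: workers should run under the same interpreter as this notebook.
# sys.executable is the absolute path of the Python currently running.
os.environ["PYSPARK_PYTHON"] = sys.executable

print("PYSPARK_PYTHON =", os.environ["PYSPARK_PYTHON"])
```

As with the cell above, this only has an effect if it runs before the SparkContext is created.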

In [3]:
import pyspark
from pyspark import SparkContext

We configure the SparkContext at the start: the master to connect to, the application name, and any other options that should apply to all worker/executor nodes

In [None]:
# NOTE: we're running in 'local' mode ('local[*]' uses as many worker threads as there are logical cores),
# however this could be changed later to use a resource/cluster manager, e.g. YARN:
# conf = pyspark.SparkConf().setMaster('yarn').setAppName('YarnSparkTest')

conf = pyspark.SparkConf().setMaster('local[*]')\
                          .setAppName('LocalSparkTest')

sc = SparkContext(conf=conf)
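
The string passed to `setMaster` for local mode comes in a few forms: `local` (a single worker thread), `local[N]` (N worker threads) and `local[*]` (one thread per logical core). The sketch below only builds those strings so the options are easy to compare; `local_master` is a hypothetical helper for illustration, not part of PySpark:

```python
import os


def local_master(threads=None):
    """Build a Spark master string for local mode (hypothetical helper)."""
    if threads is None:
        return "local[*]"            # one worker thread per logical core
    if threads < 1:
        raise ValueError("threads must be >= 1")
    return f"local[{threads}]"       # fixed number of worker threads


print(local_master())                # local[*]
print(local_master(2))               # local[2]
print("logical cores on this machine:", os.cpu_count())
```

Limiting the thread count can be handy on a shared machine, for example `setMaster(local_master(2))`.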

In [None]:
sc

Read the Macbeth file from the local filesystem and count the words on each line

In [None]:
# Update if the data file is somewhere else relative to the notebook
FILEPATH = ""

In [None]:
# Note: the file is not actually read here. textFile() is a "lazy" operation, so the file is only read when an action needs it
# Note: for the same reason, a wrong filename would not raise an error at this point; it would only surface later
localFileRdd = sc.textFile(FILEPATH + "Macbeth.txt")
localFileRdd
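
The deferred read can be hard to picture, so here is a rough analogy in plain Python rather than Spark: a generator pipeline also does nothing until it is consumed, so a bad path goes unnoticed until then. The function name and the filename are made up for the demonstration, and Spark reports the failure differently (as an error raised when the first action runs).

```python
def lazy_lines(path):
    """Generator: the file is not opened until the first item is requested."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")


# Hypothetical filename that does not exist; building the pipeline still succeeds.
lines = lazy_lines("no_such_file_for_this_demo.txt")
counts = (len(line.split()) for line in lines)    # also lazy, nothing runs yet

try:
    total = sum(counts)       # consuming the pipeline finally opens the file
    error_seen = None
except FileNotFoundError as exc:
    error_seen = exc

print("error surfaced on consumption:", type(error_seen).__name__)
```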

### Perform a transform on the RDD

In this case we pass each row (line of text) through a function that splits the line on whitespace and returns the number of resulting words

In [None]:
# Still nothing is computed: map() is another "lazy" operation
wordsPerLineRdd = localFileRdd.map(lambda line: len(line.split()))

# cache() marks the RDD to be kept in memory once it has been computed,
# so later operations reuse it instead of re-running the entire read/transform
# Note: this is also "lazy", nothing is cached until the first action runs
wordsPerLineRdd.cache()
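
The function passed to `map` is ordinary Python, so it can be tried on a few strings without Spark. The sample lines below are made up for illustration; the sketch compares `split()` with no argument (what the lambda uses) against `split(" ")`, which behaves differently on blank lines and repeated spaces:

```python
sample_lines = [
    "When shall we three meet again",
    "",                                           # blank line
    "In  thunder,   lightning, or in rain?",      # repeated spaces
]

results = []
for line in sample_lines:
    by_whitespace = len(line.split())         # what the notebook's lambda computes
    by_single_space = len(line.split(" "))    # counts empty strings between spaces
    results.append((by_whitespace, by_single_space))
    print(repr(line), by_whitespace, by_single_space)
```

With no argument, `split()` treats runs of whitespace as one separator and returns an empty list for a blank line, so blank lines contribute a count of 0.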

In [None]:
# We now ask for an aggregated number, which is an action, so all of the operations above actually run
# Note that the filename could have been wrong all along; the file is only read at this point
print("Total word count: ", wordsPerLineRdd.sum())
print("Average words per line: ", wordsPerLineRdd.mean())
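
Because blank lines count as 0 words, they lower the average without changing the total. The plain-Python sketch below uses a made-up list of per-line counts to show the difference; it is not output from the Macbeth file:

```python
counts = [6, 0, 6, 0, 4]      # hypothetical per-line word counts; 0 = blank line

total = sum(counts)
mean_all = total / len(counts)

# Drop blank lines before averaging
non_blank = [c for c in counts if c > 0]
mean_non_blank = sum(non_blank) / len(non_blank)

print("Total word count:", total)
print("Average over all lines:", round(mean_all, 2))
print("Average over non-blank lines:", round(mean_non_blank, 2))
```

On the RDD, the same idea could be written as `wordsPerLineRdd.filter(lambda n: n > 0).mean()`, if the average over non-blank lines is the figure of interest.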

In [None]:
# Gather the first 5 rows from the cluster
wordsPerLineRdd.take(5)

In [None]:
# Collect the entire transformed RDD from the cluster back to the driver (only sensible when the result is small)
wordsPerLineRdd.collect()

If running on a Hadoop cluster, we can write wordsPerLineRdd to a directory in HDFS

In [None]:
# If running in Hadoop, it may make sense to save the output RDD to a new text file
#wordsPerLineRdd.saveAsTextFile("hdfs://hadoop-master:9000/user/ec2-user/macbethWordsPerLine")