In [1]:
# Setup Spark SQL
# Note if running locally you need the JVM https://www.oracle.com/java/technologies/downloads/ 
# Consider running in https://colab.research.google.com/
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
     ---------------------------------------- 0.0/317.2 MB ? eta -:--:--
     --------------------------------------- 1.6/317.2 MB 11.9 MB/s eta 0:00:27
      -------------------------------------- 5.0/317.2 MB 15.9 MB/s eta 0:00:20
     - ------------------------------------- 8.7/317.2 MB 16.3 MB/s eta 0:00:19
     - ------------------------------------ 12.3/317.2 MB 16.8 MB/s eta 0:00:19
     -- ----------------------------------- 17.0/317.2 MB 18.2 MB/s eta 0:00:17
     -- ----------------------------------- 21.2/317.2 MB 18.4 MB/s eta 0:00:17
     --- ---------------------------------- 25.7/317.2 MB 18.7 MB/s eta 0:00:16
     --- ---------------------------------- 31.5/317.2 MB 20.2 MB/s eta 0:00:15
     ---- --------------------------------- 37.0/317.2 MB 21.0 MB/s eta 0:00:14
     ----- -------------------------------- 42.7/317.2 MB 21.6 MB/s eta 0:00:13
     ----- -------------------------------- 48.0/317.2 MB 22.0

  DEPRECATION: Building 'pyspark' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'pyspark'. Discussion can be found at https://github.com/pypa/pip/issues/6334


In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HelloWorld") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')

In [3]:
nums = sc.parallelize([1,2,3,4])
print(nums.map(lambda x: x*x).collect())

[1, 4, 9, 16]


In [4]:
# Download the Book of Mormon as a text file from Gutenburg
!curl -L https://ia601205.us.archive.org/18/items/thebookofmormon00017gut/mormon13.txt > bookOfMormon.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 21 1551k   21  333k    0     0   302k      0  0:00:05  0:00:01  0:00:04  302k
100 1551k  100 1551k    0     0   953k      0  0:00:01  0:00:01 --:--:--  953k


In [5]:
# Read all the lines of the text file (textFile auto does parallelization)
lines = sc.textFile("bookOfMormon.txt")

lines.top(10)

['zealously striving to repair all the injuries which they had done',
 'zealous for keeping the commandments of God.',
 'youth, and ye stand in need to be nourished by your brothers. ',
 'yourselves?',
 'yourselves, that ye may have hope as well as your brethren from',
 'yourselves, that ye have been sufficiently humble?  That your',
 'yourselves, and your thoughts, and your words, and your deeds,',
 'yourselves wrath against the day of judgment.',
 'yourselves unto him that he may have power over you, to blind',
 'yourselves treasures in heaven, where nothing doth corrupt, and']

In [6]:
# Count how many lines there are
lines.count()

40116

In [7]:
# Count how many duplicate lines?
allCount = lines.count()
distinctCount = lines.distinct().count()

allCount - distinctCount

7651

In [8]:
# What are some of those duplicate lines?

# start each line count at one, key = line (key, count)
lineCounts = lines.map(lambda line: (line, 1)) 

lineCounts.take(10)

[('', 1),
 ('****This is the Project Gutenberg edition of Book of Mormon****', 1),
 ('This 13th edition should be labeled mormon13.txt or mormon13.zip', 1),
 ('***This edition is being officially released on March 8, 1992***', 1),
 ('', 1),
 ('[Date last updated: May 22, 2005]', 1),
 (' ', 1),
 ('Corrected EDITIONS of our etexts get a new NUMBER, xxxxx11.txt.', 1),
 ('VERSIONS based on separate sources get new LETTER, xxxxx10a.txt.', 1),
 ('', 1)]

In [9]:
# reduce by key in a parallel way, matching up duplicate lines and summing their counts
lineCounts = lineCounts.reduceByKey(lambda lineCount1, lineCount2: lineCount1 + lineCount2) 

lineCounts.take(10)

[('', 6937),
 ('****This is the Project Gutenberg edition of Book of Mormon****', 1),
 ('This 13th edition should be labeled mormon13.txt or mormon13.zip', 1),
 ('[Date last updated: May 22, 2005]', 1),
 (' ', 2),
 ('Corrected EDITIONS of our etexts get a new NUMBER, xxxxx11.txt.', 1),
 ('to get any etext selected, entered, proofread, edited, copyright', 1),
 ('projected audience is one hundred million readers.  If our value', 1),
 ('The Goal of Project Gutenberg is to Give Away One Trillion Etext', 1),
 ('Files by the December 31, 2001.  [10,000 x 100,000,000=Trillion]', 1)]

In [10]:
sortedLineCounts = lineCounts.sortBy(lambda t: t[1], False)
sortedLineCounts.take(20)

[('', 6937),
 ('them.', 33),
 ('people.', 18),
 ('God.', 15),
 ('land.', 15),
 ('wilderness.', 14),
 ('saying:', 11),
 ('Chapter 2', 10),
 ('Chapter 3', 10),
 ('Chapter 4', 10),
 ('him.', 10),
 ('Israel.', 10),
 ('Chapter 1', 10),
 ('Chapter 5', 10),
 ('Chapter 6', 10),
 ('Chapter 7', 10),
 ('Chapter 8', 9),
 ('judges over the people of Nephi.', 9),
 ('Chapter 9', 9),
 ('Nephi.', 8)]

In [39]:
# What would you you need to count all the words?
bom_words = lines.flatMap(lambda line: [i for i in re.split('[^a-zA-Z]', line.lower()) if i])
bom_words.take(50)
bom_words.count()
bom_word_counts = bom_words.map(lambda word: (word, 1))
bom_word_counts = bom_word_counts.reduceByKey(lambda count1, count2: count1 + count2)
sorted_bom_counts = bom_word_counts.sortBy(lambda t: t[1], False)
sorted_bom_counts.take(30)

[('the', 19304),
 ('and', 16406),
 ('of', 11863),
 ('that', 6901),
 ('to', 6510),
 ('they', 4495),
 ('in', 3715),
 ('unto', 3643),
 ('i', 3321),
 ('he', 3178),
 ('it', 3108),
 ('nephi', 2812),
 ('their', 2808),
 ('them', 2653),
 ('for', 2548),
 ('be', 2527),
 ('shall', 2490),
 ('alma', 2295),
 ('his', 2255),
 ('which', 2219),
 ('a', 2171),
 ('not', 2088),
 ('were', 2019),
 ('ye', 2012),
 ('did', 1990),
 ('have', 1795),
 ('all', 1793),
 ('had', 1767),
 ('people', 1764),
 ('my', 1702)]

In [16]:
# Python tip - 
import re
line = "What is this? 1. A Helicopter. 2. A plane. 3. Super hero"
re.split('[^a-zA-Z]', line)

# my own use in combination with lower()
words = [i.lower() for i in re.split('[^a-zA-Z]', line) if i != '']
print(words)

['what', 'is', 'this', 'a', 'helicopter', 'a', 'plane', 'super', 'hero']


In [12]:
# Python tip
"This Had somE uPPerCase LeTtErs".lower()

'this had some uppercase letters'

In [13]:
# SparkSQL tip!
# Flat map  -- take rows of arrays and make each element in array a new row a

sentences = sc.parallelize(["Hello World!", "Take your vitamins!", "Get enough sleep!"])
words = sentences.flatMap(lambda sentence: sentence.split())
words.take(20)

['Hello', 'World!', 'Take', 'your', 'vitamins!', 'Get', 'enough', 'sleep!']