# First PySpark data program

## Prerequisite

Make sure that PySpark is installed.

In [1]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 20
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.


## REPL

### PySpark REPL

The `pyspark` program provides quick and easy access to a Python REPL with PySpark preconfigured. 

The `Spark context` is then available as `sc` and the `Spark session` is available as `spark`. Spark context is your entry point to Spark, a liaison between your Python REPL and the Spark cluster. Spark session wraps the Spark context and provides you functionalities to interact with the Spark SQL API, which includes the data frame structure.

```bash
$ pyspark
Python 3.9.12 (main, Apr  5 2022, 01:52:34)
[Clang 12.0.0 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/15 12:03:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.9.12 (main, Apr  5 2022 01:52:34)
Spark context Web UI available at http://xiaolis-air:4040
Spark context available as 'sc' (master = local[*], app id = local-1681527792485).
SparkSession available as 'spark'.
>>> spark
<pyspark.sql.session.SparkSession object at 0x11607f0d0>
>>> sc
<SparkContext master=local[*] appName=PySparkShell>
>>> spark.sparkContext
<SparkContext master=local[*] appName=PySparkShell>
```

### Normal REPL with Your Configured Spark

You can configure the spark session using the builder pattern. When using REPL, do not start the REPL with `pyspark`, but open a normal `python3` REPL, import `SparkSession` , build and configure a spark context as you want.

```bash
$ python
>>> from pyspark.sql import SparkSession
>>> spark = (SparkSession.builder.appName("Analyzing the vocabulary of Pride and Prejudice").getOrCreate()
```

Check out the configured spark context you just built:

```REPL
>>> spark
<pyspark.sql.session.SparkSession object at 0x11772d4f0>
>>> spark.sparkContext
<SparkContext master=local[*] appName=Analyzing the vocabulary of Pride and Prejudice>
```

## Log levels

Set log level as follows:

```
spark.sparkContext.setLogLevel("<KEYWORD>")
```
Here are possible `KEYWORD`s listed in ascending order of chattiness, each includes the logs of its above levels.

- `OFF`: no logging
- `FATAL`: fatal errors that will crash your Spark cluster
- `ERROR`: fatal and recoverable errors
- `WARN`: warnings and errors. This is the default of `pyspark` shell.
- `INFO`: runtime information such as repartitioning and data recovery, and everything above. This is the default of non-interactive PySpark program.
- `DEBUG`: debug information of your jobs and everything above
- `TRACE`: trace your jobs (more verbose debug logs) and everything above
- `ALL`: everything PySpark can spit.

The `pyspark` shell defaults to `WARN` and non-interactive PySpark programs default to `INFO`.

## Our First Data Preparation Program

### Overview

We want to find the most used words in Pride and Prejudice. Here are the steps we want to take:
1. `Read` input data (assuming a plain text file)
2. `Token`ize each word
3. `Clean` up: 
   1. Remove puncuations and non-word tokens
   2. Lowercase each word
4. `Count` the frequency of each word
5. `Answer` return the top 10 (or 20, 50, 100)

### Ingest

#### Read Data into a Data Frame

Data structures:
- RDD (Resilient distributed dataset): a distributed collection of objects (or rows). Use regular Python functions to manipulate them.
- Dataframe (DF): a stricter version of the RDD, can be seen conceptually as a table. This is the dominant data structure. Operate on columns instead of records.

We use `DataFrameReader` object to read data into a data frame. You can access the `DataFrameReader` through `spark.read`, let's print its content to see what's there

```
>>> spark.read
<pyspark.sql.readwriter.DataFrameReader object at 0x102eb0f70>
>>> dir(spark.read)
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_df', '_jreader', '_set_opts', '_spark', 'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text']
```

It accomodates different data formats, csv, text, JSON, ORC(optimized row columnar), Parquet, etc. By default PySpark uses Parquet in reading and writing files. Under the hood, `spark.read.csv()` will map to `spark.read.format('csv').load()`.

(Since it's hard to note down everything when using REPL, I'm changing to use Jupyter notebook in this note from here on.)

#### Import PySpark

In [59]:
import pyspark
from pyspark.sql import SparkSession

<module 'pyspark.sql' from '/Users/xiaolishen/opt/miniconda3/lib/python3.9/site-packages/pyspark/sql/__init__.py'>

#### Build the SparkSession object

In [5]:
spark = SparkSession.builder.appName("Analyzing the vocabulary of Pride and Prejudice").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/15 14:49:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/15 14:49:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Read the text file

We first load the text into a DataFrame object using the `spark.read.text()`. Printing the object gives us the schema (column name and its data type). PySpark data frames also provides a `printSchema()` method to print schema in a tree form to give you more information on the schema.

In [12]:
book = spark.read.text("../data/gutenberg_books/1342-0.txt")
print(book)

# to get more information on the schema
book.printSchema()

# or access the schema, or the dtypes directly
print(book.schema)
print(book.dtypes)

DataFrame[value: string]
root
 |-- value: string (nullable = true)

StructType(List(StructField(value,StringType,true)))
[('value', 'string')]


The data will be distributed across multiple nodes, each one a segment of the records. However, when working with data frames, we only need to worry about the `logical schema`, which is the same as if the data is all on a single node.

### Explore

Use the DataFrame object's `show()` method to explore the data content.

In [15]:
book.show(10, truncate=50)

+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|The Project Gutenberg EBook of Pride and Prejud...|
|                                                  |
|This eBook is for the use of anyone anywhere at...|
|almost no restrictions whatsoever.  You may cop...|
|re-use it under the terms of the Project Gutenb...|
|    with this eBook or online at www.gutenberg.org|
|                                                  |
|                                                  |
|                        Title: Pride and Prejudice|
|                                                  |
+--------------------------------------------------+
only showing top 10 rows



For more complex data we'd normally have a lot of columns. We can use `select()` method to select a column in a data frame. PySpark provides multiple ways to access a column:
- dot notation `book.value` - as long as the column name doesn't contain funny characters such as `$!@#`
- double quote `book["value"]`
- `col` function, where you need to first import it as `from pyspark.sql.functions import col`
- give the column name as a string and PySpark will infer it to be a column `book.select("value")`

In [28]:
book.select(book.value).show(5)

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
+--------------------+
only showing top 5 rows



### Transform

#### Tokenization

##### Transform sentences to lists of words

To transform the data in each row from a long sentence to a list of words, we use the
- `split()` function from `pyspark.sql.functions` to split sentences into words
- `select()` method to select data
- `alias()` method to rename transformed columns. By default the `split()` function gives us a very unintuitive name (`split(value, ,-1)`) and we want something better.

The `split()` function refers to the JVM equivalent, which makes it so fast. As a trade off, you'd have to use JVM-base regular expressions (where we passed the `" "` as the delimiter) instead of Python regular expressions.

Note that the `select` and the transformation happening inside doesn't change the orginal data frame.

In [26]:
from pyspark.sql.functions import split

lines = book.select(
    split(book.value, " ").
    alias("line")
)
lines.show(5)
book.show(5)

+--------------------+
|                line|
+--------------------+
|[The, Project, Gu...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re-use, it, unde...|
+--------------------+
only showing top 5 rows

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
+--------------------+
only showing top 5 rows



##### Explode rows of lists to rows of words

In [25]:
from pyspark.sql.functions import explode, col

words = lines.select(
    explode(col("line")).
    alias("word")
)
words.show()

+----------+
|      word|
+----------+
|       The|
|   Project|
| Gutenberg|
|     EBook|
|        of|
|     Pride|
|       and|
|Prejudice,|
|        by|
|      Jane|
|    Austen|
|          |
|      This|
|     eBook|
|        is|
|       for|
|       the|
|       use|
|        of|
|    anyone|
+----------+
only showing top 20 rows



#### Lowercase all words

In [24]:
from pyspark.sql.functions import lower
words_lower = words.select(
    lower(col("word")).
    alias("word_lower")
)
words_lower.show()

+----------+
|word_lower|
+----------+
|       the|
|   project|
| gutenberg|
|     ebook|
|        of|
|     pride|
|       and|
|prejudice,|
|        by|
|      jane|
|    austen|
|          |
|      this|
|     ebook|
|        is|
|       for|
|       the|
|       use|
|        of|
|    anyone|
+----------+
only showing top 20 rows



#### Remove non-words with Regex

In [29]:
from pyspark.sql.functions import regexp_extract

words_clean = words_lower.select(
    regexp_extract(
        col("word_lower"),
        "[a-z]+",
        0
    ).alias("word")
)
words_clean.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|         |
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
+---------+
only showing top 20 rows




### Filtering

To filter out the empty rows, use either `.filter()` of its alias `where()`.

In [30]:
words_nonull = words_clean.filter(col("word") != "")
words_nonull.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
| anywhere|
+---------+
only showing top 20 rows



## Exercices

### Exercise 2.2

Given the following dataframe, count the number of columns that aren't strings. `createDataFrame()` lets you create a data frame from multiple sources.

`SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)`

In [43]:
# createDataFrame(rows[cols[]], column_names[])
exo2_2_df = spark.createDataFrame(
    [
        ["test", "more test", 10_000_000_000],
        ["test2", "more test 2", 1234]
    ], 
    ["one", "two", "three"]
)
exo2_2_df.printSchema()
exo2_2_df.show()

root
 |-- one: string (nullable = true)
 |-- two: string (nullable = true)
 |-- three: long (nullable = true)

+-----+-----------+-----------+
|  one|        two|      three|
+-----+-----------+-----------+
| test|  more test|10000000000|
|test2|more test 2|       1234|
+-----+-----------+-----------+



Now count the number of columns that aren't strings.

In [44]:
print(exo2_2_df.dtypes)
non_string_cols = [
    column 
        for column in exo2_2_df.dtypes 
        if not column[1].startswith('string')
]
print(non_string_cols)
len(non_string_cols)

[('one', 'string'), ('two', 'string'), ('three', 'bigint')]
[('three', 'bigint')]


1

### Exercise 2.3

Rewrite the code to remove `withColumnRenamed` method.

In [48]:
from pyspark.sql.functions import col, length
# The `length` function returns the number of characters in a string column.
exo2_3_df = (
    spark.read.text("../data/gutenberg_books/1342-0.txt") 
    .select(length(col("value"))) 
    .withColumnRenamed("length(value)", 
                       "number_of_char")
)
exo2_3_df.show(5)

+--------------+
|number_of_char|
+--------------+
|            66|
|             0|
|            64|
|            68|
|            67|
+--------------+
only showing top 5 rows



In [52]:
exo2_3_df = spark.read.text(
    "../data/gutenberg_books/1342-0.txt"
).select(
    length(
        "value"
    ).alias("number_of_char")
)

exo2_3_df.show(5)

+--------------+
|number_of_char|
+--------------+
|            66|
|             0|
|            64|
|            68|
|            67|
+--------------+
only showing top 5 rows



### Exercise 2.4
What is the problem, and how can you solve it?

In [56]:
from pyspark.sql.functions import col, greatest
exo2_4_df = spark.createDataFrame(
    [["value", 10_000, 20_000]], ["key", "value1", "value2"]
)
exo2_4_df.printSchema()
# root
#  |-- key: string (containsNull = true)
#  |-- value1: long (containsNull = true)
#  |-- value2: long (containsNull = true)
# `greatest` will return the greatest value of the list of column names,
# skipping null value
# The following statement will return an error
from pyspark.sql.utils import AnalysisException
try:
    exo2_4_mod = exo2_4_df.select(
greatest(col("value1"), col("value2")).alias("maximum_value") ).select("key", "max_value")
except AnalysisException as err:
    print(err)

root
 |-- key: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value2: long (nullable = true)

Column 'key' does not exist. Did you mean one of the following? [maximum_value];
'Project ['key, 'max_value]
+- Project [greatest(value1#367L, value2#368L) AS maximum_value#372L]
   +- LogicalRDD [key#366, value1#367L, value2#368L], false



The code tries to select "key" column from the result of `greatest` , which doesn't exist there.

In [57]:
exo2_4_mod = exo2_4_df.select(
    "key",
    greatest(
        col("value1"), col("value2")
    ).alias("maximum_value")
)

exo2_4_mod.show()

+-----+-------------+
|  key|maximum_value|
+-----+-------------+
|value|        20000|
+-----+-------------+



### Exercise 2.5

Take our words_nonull data frame, 
1. Remove all of the occurrences of the word is.
2. (Challenge) Using the length function, keep only the words with more than three
characters.

In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, lower, regexp_extract, length

spark = SparkSession.builder.getOrCreate()

book = spark.read.text("../data/gutenberg_books/1342-0.txt")

words_nonull = book.select(
        split(book.value, " ").alias("line")
    ).select(
        explode("line").alias("word")
    ).select(
        lower("word").alias("word_lower")
    ).select(
        regexp_extract("word_lower", "[a-z]+", 0).
            alias("word")
    )

no_is_or_null = words_nonull.where(
        ~col("word").isin(["", "is"])
    ).show(5)

longer_than_three_chars = words_nonull.where(
    length(col("word")) >= 3
).show(5)

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
+---------+
only showing top 5 rows

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|    pride|
+---------+
only showing top 5 rows



### Exercise 2.6

Remove the words "is, not, the, if".

In [82]:
cleaned = words_nonull.where(
    (~col("word").isin(["", "is", "not", "the", "if"])) & (length(col("word")) >= 3)
).show(5)

+---------+
|     word|
+---------+
|  project|
|gutenberg|
|    ebook|
|    pride|
|      and|
+---------+
only showing top 5 rows



### Exercise 2.7

Diagnose the problem in the try block

In [85]:
from pyspark.sql.functions import col, split
try:
    book = spark.read.text("../data/gutenberg_books/1342-0.txt")
    book = book.printSchema()
    lines = book.select(split(book.value, " ").alias("line"))
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:
    print(err)

root
 |-- value: string (nullable = true)



AttributeError: 'NoneType' object has no attribute 'select'

`printSchema()` returns `None`. In line 4 you assiged `book.printSchema()` to `book`, making the variable `book` now `None`.