# Spark High-Level API

![img](https://static.packt-cdn.com/products/9781785888359/graphics/8bffbd94-04f7-46e3-a1e9-0d6046d2dcab.png)

Source: https://static.packt-cdn.com/products/9781785888359/graphics/8bffbd94-04f7-46e3-a1e9-0d6046d2dcab.png

## Overview of Spark SQL

Creating DataFrames
- RDD
    - createDataFrame()
- Text file
    - read.text()
- JSON file
    - read.json()
    - read.json(RDD)
- Parquet file
    - read.parquet()
- Table in a relational database
- Temporary table in Spark

DataFrame to RDD
- rdd (a property, not a method)

Schemas
- Inferring schemas
    - Why it is not optimal practice
- Specifying schemas
    - Using StructType and StructField
    - Using DDL string (schema = "author STRING, title STRING, pages INT")
- Metadata
    - printSchema()
    - columns (property)
    - dtypes (property)
- Actions
    - show()
- Transforms
    - select() and alias()
    - drop()
    - filter() / where()
    - distinct()
    - dropDuplicates()
    - sample()
    - sampleBy()
    - limit()
    - orderBy() / sort()
    - groupBy()

Operations that return an RDD
- rdd.map()
- rdd.flatMap()

pyspark.sql.functions module
- String functions
- Math functions
- Statistics functions
- Date functions
- Hashing functions
- Algorithms (soundex, levenshtein)
- Windowing functions

User defined functions
- udf()
- pandas_udf()

Multiple DataFrames
- join(other, on, how)
- union(), unionAll()
- intersect()
- subtract()

Persistence
- cache()
- persist()
- unpersist()
- cacheTable()
- clearCache()
- repartition()
- coalesce()

Output
- write.csv()
- write.parquet()
- write.json()

Spark SQL
- df.createOrReplaceTempView()
- sql()
- table()

## Implementation notes

Spark 2.4 is designed to run on Java 8 (also known as version 1.8). If you have a version of Java newer than Java 8, also install Java 8 and include this export (the `java_home` helper used here is the macOS one):

```bash
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
```

If you use `pyarrow` 0.15 or later with Spark 2.4 (for example after `pip install -U pyarrow`), you will need to set this variable

```bash
ARROW_PRE_0_15_IPC_FORMAT=1
```

in the file

```bash
$SPARK_HOME/conf/spark-env.sh
```


In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [3]:
spark = (
    SparkSession.builder 
    .master("local") 
    .appName("BIOS-823") 
    .config("spark.executor.cores", 4) 
    .getOrCreate()    
)

In [4]:
spark.version

'3.0.1'

In [5]:
spark.conf.get('spark.executor.cores')

'4'

## Create a Spark DataFrame

In [6]:
df = spark.range(3)

In [7]:
df.show(3)

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



In [8]:
%%file data/test.csv
number,letter
0,a
1,c
2,b
3,a
4,b
5,c
6,a
7,a
8,a
9,b
10,b
11,c
12,c
13,b
14,b

Overwriting data/test.csv


#### Implicit schema

In [9]:
df = (
    spark.read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load('data/test.csv')
)

In [10]:
df.show(3)

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     1|     c|
|     2|     b|
+------+------+
only showing top 3 rows



In [11]:
df.printSchema()

root
 |-- number: integer (nullable = true)
 |-- letter: string (nullable = true)



#### Explicit schema

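For production use, you should provide an explicit schema to reduce the risk of error. Schema inference needs an extra pass over the data, and the inferred types may not be the ones you intend.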

In [12]:
schema = T.StructType([
    T.StructField("number", T.DoubleType()),
    T.StructField("letter", T.StringType()),
])

In [13]:
df = (
    spark.read.
    format('csv').
    option('header', 'true').
    schema(schema).
    load('data/test.csv')
)

In [14]:
df.show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
+------+------+
only showing top 3 rows



In [15]:
df.printSchema()

root
 |-- number: double (nullable = true)
 |-- letter: string (nullable = true)



#### Alternative way to specify schema

You can use SQL DDL syntax to specify a schema as well.

In [16]:
schema = 'number DOUBLE, letter STRING'

In [17]:
df_altschema = (
    spark.read.
    format('csv').
    option('header', 'true').
    schema(schema=schema).
    load('data/test.csv')
)

In [18]:
df_altschema.take(3)

[Row(number=0.0, letter='a'),
 Row(number=1.0, letter='c'),
 Row(number=2.0, letter='b')]

In [19]:
df_altschema.printSchema()

root
 |-- number: double (nullable = true)
 |-- letter: string (nullable = true)



### Persist

In [20]:
df.cache()

DataFrame[number: double, letter: string]

## Data manipulation

### Select

In [21]:
df.select('number').show(3)

+------+
|number|
+------+
|   0.0|
|   1.0|
|   2.0|
+------+
only showing top 3 rows



In [22]:
from pyspark.sql.functions import col, expr

In [23]:
df.select(col('number').alias('index')).show(3)

+-----+
|index|
+-----+
|  0.0|
|  1.0|
|  2.0|
+-----+
only showing top 3 rows



In [24]:
df.select(expr('number as x')).show(3)

+---+
|  x|
+---+
|0.0|
|1.0|
|2.0|
+---+
only showing top 3 rows



In [25]:
df.withColumnRenamed('number', 'x').show(3)

+---+------+
|  x|letter|
+---+------+
|0.0|     a|
|1.0|     c|
|2.0|     b|
+---+------+
only showing top 3 rows



## Filter

In [26]:
df.filter('number % 2 == 0').show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   2.0|     b|
|   4.0|     b|
+------+------+
only showing top 3 rows



## Sort

In [27]:
df.sort(df.number.desc()).show(3)

+------+------+
|number|letter|
+------+------+
|  14.0|     b|
|  13.0|     b|
|  12.0|     c|
+------+------+
only showing top 3 rows



In [28]:
df.orderBy(df.letter.desc()).show(3)

+------+------+
|number|letter|
+------+------+
|   1.0|     c|
|   5.0|     c|
|  11.0|     c|
+------+------+
only showing top 3 rows



## Transform

In [29]:
df.selectExpr('number*2 as x').show(3)

+---+
|  x|
+---+
|0.0|
|2.0|
|4.0|
+---+
only showing top 3 rows



In [30]:
df.withColumn('x', expr('number*2')).show(3)

+------+------+---+
|number|letter|  x|
+------+------+---+
|   0.0|     a|0.0|
|   1.0|     c|2.0|
|   2.0|     b|4.0|
+------+------+---+
only showing top 3 rows



## Summarize

In [31]:
import pyspark.sql.functions as F

In [32]:
df.agg(F.min('number'),
       F.max('number'), 
       F.min('letter'), 
       F.max('letter')).show(3)

+-----------+-----------+-----------+-----------+
|min(number)|max(number)|min(letter)|max(letter)|
+-----------+-----------+-----------+-----------+
|        0.0|       14.0|          a|          c|
+-----------+-----------+-----------+-----------+



## Group by

In [33]:
(
    df.groupby('letter').
    agg(F.mean('number'), F.stddev_samp('number')).show()
)

+------+-----------------+-------------------+
|letter|      avg(number)|stddev_samp(number)|
+------+-----------------+-------------------+
|     c|             7.25|  5.188127472091127|
|     b|8.666666666666666|  4.802776974487434|
|     a|              4.8|  3.271085446759225|
+------+-----------------+-------------------+



## Window functions

In [34]:
from pyspark.sql.window import Window

In [35]:
ws = (
    Window.partitionBy('letter').
    orderBy(F.desc('number')).
    rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [36]:
df.groupby('letter').agg(F.sum('number')).show()

+------+-----------+
|letter|sum(number)|
+------+-----------+
|     c|       29.0|
|     b|       52.0|
|     a|       24.0|
+------+-----------+



In [37]:
df.show()

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
|   3.0|     a|
|   4.0|     b|
|   5.0|     c|
|   6.0|     a|
|   7.0|     a|
|   8.0|     a|
|   9.0|     b|
|  10.0|     b|
|  11.0|     c|
|  12.0|     c|
|  13.0|     b|
|  14.0|     b|
+------+------+



In [38]:
(
    df.select('letter', F.sum('number').
              over(ws).
              alias('rank')).show()
)

+------+----+
|letter|rank|
+------+----+
|     c|12.0|
|     c|23.0|
|     c|28.0|
|     c|29.0|
|     b|14.0|
|     b|27.0|
|     b|37.0|
|     b|46.0|
|     b|50.0|
|     b|52.0|
|     a| 8.0|
|     a|15.0|
|     a|21.0|
|     a|24.0|
|     a|24.0|
+------+----+
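
The window specification above partitions the rows by `letter`, orders each partition by `number` in descending order, and sums from the start of the partition up to the current row. As a rough illustration only, the same running sums can be reproduced in plain Python. The helper name `running_sums` is made up for this sketch, and nothing here is Spark code.

```python
from itertools import accumulate, groupby

# Same (number, letter) rows as data/test.csv.
rows = list(zip(range(15), "acbabcaaabbccbb"))


def running_sums(rows):
    """Hypothetical helper: cumulative sums per letter, numbers in descending order."""
    # Sort by partition key first, then by number descending within each partition.
    ordered = sorted(rows, key=lambda r: (r[1], -r[0]))
    result = {}
    for letter, group in groupby(ordered, key=lambda r: r[1]):
        numbers = [float(number) for number, _ in group]
        # unboundedPreceding .. currentRow corresponds to a running total.
        result[letter] = list(accumulate(numbers))
    return result


sums = running_sums(rows)
print(sums["c"])  # [12.0, 23.0, 28.0, 29.0]
```

The last value in each partition equals the grouped sum for that letter, which is why the final entries match the `groupby('letter').agg(F.sum('number'))` result.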



## SQL

In [39]:
df.createOrReplaceTempView('df_table')

In [40]:
spark.sql('''SELECT * FROM df_table''').show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
+------+------+
only showing top 3 rows



In [41]:
spark.sql('''
SELECT letter, mean(number) AS mean, 
stddev_samp(number) AS sd from df_table
WHERE number % 2 = 0
GROUP BY letter
ORDER BY letter DESC
''').show()

+------+-----------------+-----------------+
|letter|             mean|               sd|
+------+-----------------+-----------------+
|     c|             12.0|              NaN|
|     b|              7.5|5.507570547286102|
|     a|4.666666666666667|4.163331998932265|
+------+-----------------+-----------------+
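
The `NaN` for letter `c` appears because only one even number (12) carries that letter, and a sample standard deviation divides by `n - 1`, which is zero for a single value. The following plain-Python sketch recomputes the three values under that definition. Returning `NaN` for fewer than two values is an assumption chosen to mirror the output above; other Spark versions or settings may report this case differently.

```python
import math


def stddev_samp(values):
    """Sample standard deviation (n - 1 denominator); NaN when fewer than 2 values."""
    n = len(values)
    if n < 2:
        return math.nan  # assumption: mirror the NaN shown in the output above
    mean = sum(values) / n
    return math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))


# Same (number, letter) rows as data/test.csv, keeping even numbers only.
rows = list(zip(range(15), "acbabcaaabbccbb"))
evens = {}
for number, letter in rows:
    if number % 2 == 0:
        evens.setdefault(letter, []).append(float(number))

for letter in sorted(evens, reverse=True):
    print(letter, evens[letter], stddev_samp(evens[letter]))
```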



## String operations

In [42]:
from pyspark.sql.functions import split, lower, explode

In [43]:
import pandas as pd

In [44]:
s = spark.createDataFrame(
    pd.DataFrame(
        dict(sents=('Thing 1 and Thing 2',
                    'The Quick Brown Fox'))))

In [45]:
s.show()

+-------------------+
|              sents|
+-------------------+
|Thing 1 and Thing 2|
|The Quick Brown Fox|
+-------------------+



In [46]:
from pyspark.sql.functions import regexp_replace

In [47]:
s1 = (
    s.select(explode(split(lower(expr('sents')), ' '))).
    sort('col')
)

In [48]:
s1.show()

+-----+
|  col|
+-----+
|    1|
|    2|
|  and|
|brown|
|  fox|
|quick|
|  the|
|thing|
|thing|
+-----+



In [49]:
s1.groupBy('col').count().show()

+-----+-----+
|  col|count|
+-----+-----+
|thing|    2|
|  fox|    1|
|  the|    1|
|  and|    1|
|    1|    1|
|brown|    1|
|quick|    1|
|    2|    1|
+-----+-----+
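
For comparison, the `lower` / `split` / `explode` / `groupBy().count()` chain corresponds roughly to the following plain-Python word count. This is a sketch for intuition only and does not run on Spark.

```python
from collections import Counter

sents = ["Thing 1 and Thing 2", "The Quick Brown Fox"]

# lower() + split(' ') yields one list of words per sentence;
# flattening those lists plays the role of explode().
words = [word for sent in sents for word in sent.lower().split(" ")]

# groupBy('col').count() corresponds to counting each distinct word.
counts = Counter(words)
print(counts["thing"])  # 2
```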



In [50]:
s.createOrReplaceTempView('s_table')

In [51]:
spark.sql('''
SELECT regexp_replace(sents, 'T.*?g', 'FOO')
FROM s_table
''').show()

+---------------------------------+
|regexp_replace(sents, T.*?g, FOO)|
+---------------------------------+
|                  FOO 1 and FOO 2|
|              The Quick Brown Fox|
+---------------------------------+
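
The pattern `T.*?g` uses a lazy quantifier, so each match stops at the first `g` after a `T`. The second sentence contains no `g`, which is why it is unchanged. The sketch below uses Python's `re` module as a stand-in; Spark evaluates the pattern with Java regular expressions, and the two are assumed to agree for this simple pattern.

```python
import re

sents = ["Thing 1 and Thing 2", "The Quick Brown Fox"]

# Lazy match: shortest run from a 'T' to the next 'g'.
lazy = [re.sub(r"T.*?g", "FOO", s) for s in sents]

# Greedy match, for contrast: longest run from the first 'T' to the last 'g'.
greedy = [re.sub(r"T.*g", "FOO", s) for s in sents]

print(lazy)    # ['FOO 1 and FOO 2', 'The Quick Brown Fox']
print(greedy)  # ['FOO 2', 'The Quick Brown Fox']
```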



### Numeric operations

In [52]:
from pyspark.sql.functions import log1p, randn

In [53]:
df.selectExpr('number', 'log1p(number)', 'letter').show(3)

+------+------------------+------+
|number|     LOG1P(number)|letter|
+------+------------------+------+
|   0.0|               0.0|     a|
|   1.0|0.6931471805599453|     c|
|   2.0|1.0986122886681096|     b|
+------+------------------+------+
only showing top 3 rows



In [54]:
(
    df.selectExpr('number', 'randn() as random').
    stat.corr('number', 'random')
)

0.06913354354513815

### Date and time

In [55]:
dt = (
    spark.range(3).
    withColumn('today', F.current_date()).
    withColumn('tomorrow', F.date_add('today', 1)).
    withColumn('time', F.current_timestamp())
)

In [56]:
dt.show()

+---+----------+----------+--------------------+
| id|     today|  tomorrow|                time|
+---+----------+----------+--------------------+
|  0|2020-10-26|2020-10-27|2020-10-26 10:21:...|
|  1|2020-10-26|2020-10-27|2020-10-26 10:21:...|
|  2|2020-10-26|2020-10-27|2020-10-26 10:21:...|
+---+----------+----------+--------------------+



In [57]:
dt.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- tomorrow: date (nullable = false)
 |-- time: timestamp (nullable = false)



### Nulls

In [58]:
%%file data/test_null.csv
number,letter
0,a
1,
2,b
3,a
4,b
5,
6,a
7,a
8,
9,b
10,b
11,c
12,
13,b
14,b

Overwriting data/test_null.csv


In [59]:
dn = (
    spark.read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load('data/test_null.csv')
)

In [60]:
dn.printSchema()

root
 |-- number: integer (nullable = true)
 |-- letter: string (nullable = true)



In [61]:
dn.show()

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     1|  null|
|     2|     b|
|     3|     a|
|     4|     b|
|     5|  null|
|     6|     a|
|     7|     a|
|     8|  null|
|     9|     b|
|    10|     b|
|    11|     c|
|    12|  null|
|    13|     b|
|    14|     b|
+------+------+



In [62]:
dn.na.drop().show()

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     2|     b|
|     3|     a|
|     4|     b|
|     6|     a|
|     7|     a|
|     9|     b|
|    10|     b|
|    11|     c|
|    13|     b|
|    14|     b|
+------+------+



In [63]:
dn.na.fill('Missing').show()

+------+-------+
|number| letter|
+------+-------+
|     0|      a|
|     1|Missing|
|     2|      b|
|     3|      a|
|     4|      b|
|     5|Missing|
|     6|      a|
|     7|      a|
|     8|Missing|
|     9|      b|
|    10|      b|
|    11|      c|
|    12|Missing|
|    13|      b|
|    14|      b|
+------+-------+



## UDF

To avoid degrading performance, avoid UDFs when the functions in `pyspark.sql.functions` can do the job. If you must use a UDF, prefer `pandas_udf` to `udf` where possible.

In [64]:
from pyspark.sql.functions import udf, pandas_udf

### Standard Python UDF

In [65]:
@udf('double')
def square(x):
    return x**2

In [66]:
df.select('number', square('number')).show(3)

+------+--------------+
|number|square(number)|
+------+--------------+
|   0.0|           0.0|
|   1.0|           1.0|
|   2.0|           4.0|
+------+--------------+
only showing top 3 rows



### Pandas UDF

This can be tricky to set up. I use Oracle Java SDK v11 and set the following environment variables.

```bash
export JAVA_HOME=$(/usr/libexec/java_home -v 11)
export JAVA_TOOL_OPTIONS="-Dio.netty.tryReflectionSetAccessible=true"
```

In [67]:
@pandas_udf('double')
def scale(x):
    return (x - x.mean())/x.std()

In [68]:
df.select(scale('number')).show(3)

+-------------------+
|      scale(number)|
+-------------------+
|-1.5652475842498528|
|-1.3416407864998738|
| -1.118033988749895|
+-------------------+
only showing top 3 rows
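
One caveat: a Series-to-Series pandas UDF is called once per batch of rows, so the mean and standard deviation inside `scale` are computed per batch, not over the whole column. With 15 rows everything fits in one batch, so the result above matches a global scaling. The plain-Python sketch below shows how the values would differ if the same column arrived in two batches. The split point is arbitrary and chosen only for illustration.

```python
from statistics import mean, stdev


def scale(batch):
    """Standardize one batch using that batch's own mean and sample std."""
    m, s = mean(batch), stdev(batch)
    return [(x - m) / s for x in batch]


numbers = [float(i) for i in range(15)]

# One batch: equivalent to scaling the whole column.
whole = scale(numbers)

# Two batches (hypothetical split): each batch is scaled independently.
split = scale(numbers[:8]) + scale(numbers[8:])

print(whole[0], split[0])
```

If a global mean and standard deviation are needed on larger data, computing them with built-in aggregate functions first is the safer route.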



#### Grouped agg

In [69]:
@pandas_udf('double', F.PandasUDFType.GROUPED_AGG)
def gmean(x):
    return x.mean()



In [70]:
df.groupby('letter').agg(gmean('number')).show()

+------+-----------------+
|letter|    gmean(number)|
+------+-----------------+
|     c|             7.25|
|     b|8.666666666666666|
|     a|              4.8|
+------+-----------------+



### Spark 3

Use type hints rather than specifying the pandas UDF type.

See [blog](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html)

In [71]:
@pandas_udf('double')
def gmean1(x: pd.Series) -> float:
    return x.mean()

In [72]:
df.groupby('letter').agg(gmean1('number')).show()

+------+-----------------+
|letter|   gmean1(number)|
+------+-----------------+
|     c|             7.25|
|     b|8.666666666666666|
|     a|              4.8|
+------+-----------------+



#### Grouped map

In [73]:
@pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def gscale(pdf):
    return pdf.assign(
        number = (pdf.number - pdf.number.mean()) /
        pdf.number.std())

In [74]:
df.groupby('letter').apply(gscale).show()



+--------------------+------+
|              number|letter|
+--------------------+------+
| -1.2046735616310666|     c|
|-0.43368248218718397|     c|
|    0.72280413697864|     c|
|  0.9155519068396106|     c|
| -1.3880858307767148|     b|
| -0.9716600815437003|     b|
| 0.06940429153883587|     b|
|  0.2776171661553431|     b|
|  0.9022557900048648|     b|
|  1.1104686646213722|     b|
|  -1.467402817237783|     a|
| -0.5502760564641687|     a|
| 0.36685070430944583|     a|
|  0.6725596245673173|     a|
|  0.9782685448251889|     a|
+--------------------+------+



### Spark 3

Use the new `pandas` function API. Currently, type annotations are not used in the function API. 

See [blog](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html)

#### `applyInPandas`

This implements the `split-apply-combine` pattern. It is a method of a grouped DataFrame.

- Variant 1: Function takes a single DataFrame input
- Variant 2: Function takes a tuple of keys, and a DataFrame input

In [75]:
def gscale1(pdf: pd.DataFrame) -> pd.DataFrame:
    num = pdf.number
    return pdf.assign(
        number = (num - num.mean()) / num.std())

In [76]:
df.groupby('letter').applyInPandas(gscale1, schema=df.schema).show()

+--------------------+------+
|              number|letter|
+--------------------+------+
| -1.2046735616310666|     c|
|-0.43368248218718397|     c|
|    0.72280413697864|     c|
|  0.9155519068396106|     c|
| -1.3880858307767148|     b|
| -0.9716600815437003|     b|
| 0.06940429153883587|     b|
|  0.2776171661553431|     b|
|  0.9022557900048648|     b|
|  1.1104686646213722|     b|
|  -1.467402817237783|     a|
| -0.5502760564641687|     a|
| 0.36685070430944583|     a|
|  0.6725596245673173|     a|
|  0.9782685448251889|     a|
+--------------------+------+



In [77]:
def gsum(key, pdf):
    return pd.DataFrame([key + (pdf.number.sum(),)])

In [78]:
df.groupby('letter').applyInPandas(gsum, 'letter string, number long').show()

+------+------+
|letter|number|
+------+------+
|     c|    29|
|     b|    52|
|     a|    24|
+------+------+



Of course, you do not need a UDF in this example!

In [79]:
df.groupBy('letter').sum().show()

+------+-----------+
|letter|sum(number)|
+------+-----------+
|     c|       29.0|
|     b|       52.0|
|     a|       24.0|
+------+-----------+



So `applyInPandas` should only be used for truly custom functions.

In [80]:
def func(pdf: pd.DataFrame) -> int:
    return (pdf.number.astype('str').str.len()).sum()

def gcustom(key, pdf):
    return pd.DataFrame([key + (func(pdf),)])

In [81]:
df.groupby('letter').applyInPandas(gcustom, 'letter string, number long').show()

+------+------+
|letter|number|
+------+------+
|     c|    14|
|     b|    21|
|     a|    15|
+------+------+
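
The totals above count the characters of each number's string form. Because `number` is a double, each string includes the trailing `.0` (for example `'11.0'` has length 4). The plain-Python sketch below reproduces the totals, assuming that `str(float)` formats these particular values the same way as the pandas `astype('str')` call.

```python
# Same (number, letter) rows as data/test.csv.
rows = list(zip(range(15), "acbabcaaabbccbb"))

totals = {}
for number, letter in rows:
    # The column is a double, so 11 is rendered as '11.0' (4 characters).
    totals[letter] = totals.get(letter, 0) + len(str(float(number)))

print(totals)  # {'a': 15, 'c': 14, 'b': 21}
```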



#### `mapInPandas`

This works on an iterator of pandas DataFrames (one per batch) and is a method of the Spark DataFrame. It can be used, for example, to implement a filter.

In [82]:
def even(it):
    for pdf in it:
        yield pdf[pdf.number % 2 == 0]

In [83]:
df.mapInPandas(even, 'letter string, number long').show()

+------+------+
|letter|number|
+------+------+
|     a|     0|
|     b|     2|
|     b|     4|
|     a|     6|
|     a|     8|
|     b|    10|
|     c|    12|
|     b|    14|
+------+------+



## Joins

In [84]:
names = 'ann ann bob bob chcuk'.split()
courses = '821 823 821 824 823'.split()
pdf1 = pd.DataFrame(dict(name=names, course=courses))

In [85]:
pdf1

|   | name | course |
|---|------|--------|
| 0 | ann | 821 |
| 1 | ann | 823 |
| 2 | bob | 821 |
| 3 | bob | 824 |
| 4 | chcuk | 823 |


In [86]:
course_id = '821 822 823 824 825'.split()
course_names = 'Unix Python R Spark GLM'.split()
pdf2 = pd.DataFrame(dict(course_id=course_id, name=course_names))

In [87]:
pdf2

|   | course_id | name |
|---|-----------|------|
| 0 | 821 | Unix |
| 1 | 822 | Python |
| 2 | 823 | R |
| 3 | 824 | Spark |
| 4 | 825 | GLM |


In [88]:
df1 = spark.createDataFrame(pdf1)
df2 = spark.createDataFrame(pdf2)

In [89]:
df1.join(df2, df1.course == df2.course_id, how='inner').show()

+-----+------+---------+-----+
| name|course|course_id| name|
+-----+------+---------+-----+
|  ann|   823|      823|    R|
|chcuk|   823|      823|    R|
|  bob|   824|      824|Spark|
|  ann|   821|      821| Unix|
|  bob|   821|      821| Unix|
+-----+------+---------+-----+



In [90]:
df1.join(df2, df1.course == df2.course_id, how='right').show()

+-----+------+---------+------+
| name|course|course_id|  name|
+-----+------+---------+------+
|  ann|   823|      823|     R|
|chcuk|   823|      823|     R|
|  bob|   824|      824| Spark|
| null|  null|      825|   GLM|
| null|  null|      822|Python|
|  ann|   821|      821|  Unix|
|  bob|   821|      821|  Unix|
+-----+------+---------+------+
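
The difference between the two joins is that the inner join keeps only enrollments with a matching course, while the right join keeps every course and fills the left-hand columns with nulls when nobody is enrolled. The plain-Python sketch below mimics that behavior on the same data. The function names are made up for illustration, and the row order will not necessarily match Spark's.

```python
enrolled = [("ann", "821"), ("ann", "823"), ("bob", "821"),
            ("bob", "824"), ("chcuk", "823")]
courses = {"821": "Unix", "822": "Python", "823": "R",
           "824": "Spark", "825": "GLM"}


def inner_join(left, right):
    """Keep only left rows whose course appears in the right table."""
    return [(name, c, c, right[c]) for name, c in left if c in right]


def right_join(left, right):
    """Keep every right row; pad with None when no left row matches."""
    out = []
    for course_id, course_name in right.items():
        matches = [(name, c) for name, c in left if c == course_id]
        if matches:
            out.extend((name, c, course_id, course_name) for name, c in matches)
        else:
            out.append((None, None, course_id, course_name))
    return out


inner = inner_join(enrolled, courses)
right = right_join(enrolled, courses)
print(len(inner), len(right))  # 5 7
```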



## DataFrame conversions

In [91]:
sc = spark.sparkContext

In [92]:
rdd = sc.parallelize([('ann', 23), ('bob', 34)])

In [93]:
df = spark.createDataFrame(rdd, schema='name STRING, age INT')

In [94]:
df.show()

+----+---+
|name|age|
+----+---+
| ann| 23|
| bob| 34|
+----+---+



In [95]:
df.rdd.map(lambda x: (x[0], x[1]**2)).collect()

[('ann', 529), ('bob', 1156)]

In [96]:
df.rdd.mapValues(lambda x: x**2).collect()

[('ann', 529), ('bob', 1156)]

In [97]:
df.toPandas()

|   | name | age |
|---|------|-----|
| 0 | ann | 23 |
| 1 | bob | 34 |
