Sources:
- DZone's [Introduction to Spark Session](https://dzone.com/articles/introduction-to-spark-session)

# Using Spark

In [16]:
import pyspark

In [5]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

## Context

A `SparkContext` represents the connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster. Before Spark 2.0, `SparkContext` was the entry point to Spark Core, with separate contexts for other functionality: `StreamingContext` for streaming, `SQLContext` for SQL (deprecated), and `HiveContext` for Hive (deprecated).

In [19]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Deprecated (replaced by SparkSession):
# from pyspark.sql import SQLContext, HiveContext

As of Spark 2.0, `SparkSession` provides a single entry point built for the Dataset and DataFrame APIs, replacing `SQLContext` and `HiveContext`.

## SparkSession

[Usage examples](https://www.programcreek.com/python/example/100654/pyspark.sql.SparkSession) at programcreek.com

In [24]:
from pyspark.sql import SparkSession

See the list of available [configuration properties](https://spark.apache.org/docs/latest/configuration.html#available-properties).

In [52]:
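spark = (
    SparkSession.builder
    .master("local")                    # run locally with a single worker thread
    .appName("BIOS-821")                # application name shown in the Spark UI
    .config("spark.executor.cores", 4)  # any property from the configuration page
    .getOrCreate()                      # reuse the running session if there is one
)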

In [54]:
spark.version

'2.4.3'

In [65]:
spark.conf?

Type:        property
String form: <property object at 0x7f3cc611e188>
Docstring:
Runtime configuration interface for Spark.

This is the interface through which the user can get and set all Spark and Hadoop
configurations that are relevant to Spark SQL. When getting the value of a config,
this defaults to the value set in the underlying :class:`SparkContext`, if any.

.. versionadded:: 2.0


In [56]:
spark.conf.get('spark.executor.cores')

'4'

`SparkSession` also exposes a `catalog` attribute whose methods work with the metastore (i.e. the data catalog), for example to list databases, tables, and functions.

In [59]:
spark.catalog.listTables()

[]

## Create an RDD

In [1]:
import pyspark

In [2]:
pyspark.SparkContext?

[0;31mInit signature:[0m
[0mpyspark[0m[0;34m.[0m[0mSparkContext[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mmaster[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mappName[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msparkHome[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpyFiles[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0menvironment[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mbatchSize[0m[0;34m=[0m[0;36m0[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mserializer[0m[0;34m=[0m[0mPickleSerializer[0m[0;34m([0m[0;34m)[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mconf[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mgateway[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mjsc[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mprofiler_cls[0m[0;34m=[0m[0;34m<[0m

Access the `SparkContext` underlying an existing `SparkSession`:

In [60]:
sc = spark.sparkContext

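or, in a fresh Python session, create one directly (only one `SparkContext` can be active at a time; `SparkContext.getOrCreate()` reuses an existing one):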

In [1]:
from pyspark import SparkContext
sc = SparkContext()
# master = 'local[*]'

In [2]:
rdd = sc.parallelize(range(10))

In [3]:
rdd.take(5)

[0, 1, 2, 3, 4]

## Create a Spark DataFrame

In [7]:
df = spark.range(10)

In [8]:
df.show(3)

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+
only showing top 3 rows



In [19]:
%%file data/test.csv
number,letter
0,a
1,c
2,b
3,a
4,b
5,c
6,a
7,a
8,a
9,b
10,b
11,c
12,c
13,b
14,b

Overwriting data/test.csv


#### Implicit schema

In [20]:
df = (
    spark.read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load('data/test.csv')
)

In [21]:
df.show(3)

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     1|     c|
|     2|     b|
+------+------+
only showing top 3 rows



In [12]:
df.printSchema()

root
 |-- number: integer (nullable = true)
 |-- letter: string (nullable = true)



#### Explicit schema

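For production use, provide an explicit schema: `inferSchema` needs an extra pass over the data, and the inferred types may not be the ones you want (above, `number` was read as `integer` rather than `double`).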

In [22]:
schema = T.StructType([
    T.StructField("number", T.DoubleType()),
    T.StructField("letter", T.StringType()),
])

In [23]:
df = (
    spark.read.
    format('csv').
    option('header', 'true').
    schema(schema).
    load('data/test.csv')
)

In [24]:
df.show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
+------+------+
only showing top 3 rows



In [25]:
df.printSchema()

root
 |-- number: double (nullable = true)
 |-- letter: string (nullable = true)



### Persist

In [17]:
df.cache()

DataFrame[number: double, letter: string]

## Data manipulation

### Select

In [26]:
df.select('number').show(3)

+------+
|number|
+------+
|   0.0|
|   1.0|
|   2.0|
+------+
only showing top 3 rows



# Koalas

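Koalas provides a pandas-like API on top of Spark DataFrames; since Spark 3.2 it is included in PySpark as `pyspark.pandas`. See the [Databricks video](https://medium.com/@kyleake/bye-pandas-meet-koalas-pandas-apis-on-apache-spark-ep-4-aedcd363cf4e).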

In [4]:
from databricks import koalas as kl

In [32]:
dfk = kl.read_csv("data/test.csv")

In [37]:
dfk.groupby('letter').count()

        number
letter
a            5
b            6
c            4


## Rename columns

In [19]:
from pyspark.sql.functions import col, expr

In [20]:
df.select(col('number').alias('index')).show(3)

+-----+
|index|
+-----+
|  0.0|
|  1.0|
|  2.0|
+-----+
only showing top 3 rows



In [21]:
df.select(expr('number as x')).show(3)

+---+
|  x|
+---+
|0.0|
|1.0|
|2.0|
+---+
only showing top 3 rows



In [22]:
df.withColumnRenamed('number', 'x').show(3)

+---+------+
|  x|letter|
+---+------+
|0.0|     a|
|1.0|     c|
|2.0|     b|
+---+------+
only showing top 3 rows



## Filter

In [23]:
df.filter('number % 2 == 0').show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   2.0|     b|
|   4.0|     b|
+------+------+
only showing top 3 rows



## Sort

In [24]:
df.sort(df.number.desc()).show(3)

+------+------+
|number|letter|
+------+------+
|  14.0|     b|
|  13.0|     b|
|  12.0|     c|
+------+------+
only showing top 3 rows



In [25]:
df.orderBy(df.letter.desc()).show(3)

+------+------+
|number|letter|
+------+------+
|   1.0|     c|
|   5.0|     c|
|  11.0|     c|
+------+------+
only showing top 3 rows



## Transform

In [26]:
df.selectExpr('number*2 as x').show(3)

+---+
|  x|
+---+
|0.0|
|2.0|
|4.0|
+---+
only showing top 3 rows



In [27]:
df.withColumn('x', expr('number*2')).show(3)

+------+------+---+
|number|letter|  x|
+------+------+---+
|   0.0|     a|0.0|
|   1.0|     c|2.0|
|   2.0|     b|4.0|
+------+------+---+
only showing top 3 rows



## Summarize

In [28]:
import pyspark.sql.functions as F

In [29]:
df.agg(F.min('number'), F.max('number'), F.min('letter'), F.max('letter')).show(3)

+-----------+-----------+-----------+-----------+
|min(number)|max(number)|min(letter)|max(letter)|
+-----------+-----------+-----------+-----------+
|        0.0|       14.0|          a|          c|
+-----------+-----------+-----------+-----------+



## Group by

In [30]:
df.groupby('letter').agg(F.mean('number'), F.stddev_samp('number')).show()

+------+-----------------+-------------------+
|letter|      avg(number)|stddev_samp(number)|
+------+-----------------+-------------------+
|     c|             7.25|  5.188127472091127|
|     b|8.666666666666666|  4.802776974487434|
|     a|              4.8|  3.271085446759225|
+------+-----------------+-------------------+
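To see what `F.mean` and `F.stddev_samp` compute, here is a plain-Python sketch (standard library only, no Spark) that regroups the same 15 rows of `data/test.csv` by letter. It assumes `statistics.stdev` as the stand-in for `stddev_samp`; both divide by n - 1.

```python
from collections import defaultdict
from statistics import mean, stdev

# The 15 (number, letter) rows written to data/test.csv above
rows = [(0, 'a'), (1, 'c'), (2, 'b'), (3, 'a'), (4, 'b'), (5, 'c'), (6, 'a'),
        (7, 'a'), (8, 'a'), (9, 'b'), (10, 'b'), (11, 'c'), (12, 'c'),
        (13, 'b'), (14, 'b')]

# Group the numbers by letter, like df.groupby('letter')
groups = defaultdict(list)
for number, letter in rows:
    groups[letter].append(float(number))

# mean() matches avg(number); stdev() divides by n - 1, like stddev_samp(number)
summary = {letter: (mean(xs), stdev(xs)) for letter, xs in groups.items()}

for letter in sorted(summary):
    print(letter, *summary[letter])
```

The values agree with the Spark table above up to floating-point rounding.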



## Window functions

In [31]:
from pyspark.sql.window import Window

In [32]:
ws = (
    Window.partitionBy('letter').
    orderBy(F.desc('number')).
    rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [33]:
df.groupby('letter').agg(F.sum('number')).show()

+------+-----------+
|letter|sum(number)|
+------+-----------+
|     c|       29.0|
|     b|       52.0|
|     a|       24.0|
+------+-----------+



In [34]:
df.show()

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
|   3.0|     a|
|   4.0|     b|
|   5.0|     c|
|   6.0|     a|
|   7.0|     a|
|   8.0|     a|
|   9.0|     b|
|  10.0|     b|
|  11.0|     c|
|  12.0|     c|
|  13.0|     b|
|  14.0|     b|
+------+------+



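The window `ws` sums `number` from the largest value down within each `letter` partition, so the column aliased `rank` below is a running (cumulative) sum rather than a rank; the last value in each partition equals the group total from `sum(number)` above.

In [35]: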
df.select('letter', F.sum('number').over(ws).alias('rank')).show()

+------+----+
|letter|rank|
+------+----+
|     c|12.0|
|     c|23.0|
|     c|28.0|
|     c|29.0|
|     b|14.0|
|     b|27.0|
|     b|37.0|
|     b|46.0|
|     b|50.0|
|     b|52.0|
|     a| 8.0|
|     a|15.0|
|     a|21.0|
|     a|24.0|
|     a|24.0|
+------+----+
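The frame `rowsBetween(Window.unboundedPreceding, Window.currentRow)` makes `F.sum(...).over(ws)` a running total. A plain-Python sketch of the same computation (standard library only, not Spark) partitions the rows by letter, sorts each partition in descending order, and accumulates:

```python
from collections import defaultdict
from itertools import accumulate

# Same 15 (number, letter) rows as data/test.csv
rows = [(0, 'a'), (1, 'c'), (2, 'b'), (3, 'a'), (4, 'b'), (5, 'c'), (6, 'a'),
        (7, 'a'), (8, 'a'), (9, 'b'), (10, 'b'), (11, 'c'), (12, 'c'),
        (13, 'b'), (14, 'b')]

# partitionBy('letter'): collect the numbers belonging to each partition
partitions = defaultdict(list)
for number, letter in rows:
    partitions[letter].append(float(number))

# orderBy(desc('number')) + rowsBetween(unboundedPreceding, currentRow):
# each value is the sum from the start of the partition up to the current row
running = {
    letter: list(accumulate(sorted(xs, reverse=True)))
    for letter, xs in partitions.items()
}

for letter, sums in running.items():
    print(letter, sums)
```

The last element of each list matches the per-letter `sum(number)` computed with `groupby`.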



## SQL

In [36]:
df.createOrReplaceTempView('df_table')

In [37]:
spark.sql('''SELECT * FROM df_table''').show(3)

+------+------+
|number|letter|
+------+------+
|   0.0|     a|
|   1.0|     c|
|   2.0|     b|
+------+------+
only showing top 3 rows



In [38]:
spark.sql('''
SELECT letter, mean(number) AS mean, stddev_samp(number) AS sd FROM df_table
WHERE number % 2 = 0
GROUP BY letter
ORDER BY letter DESC
''').show()

+------+-----------------+-----------------+
|letter|             mean|               sd|
+------+-----------------+-----------------+
|     c|             12.0|              NaN|
|     b|              7.5|5.507570547286102|
|     a|4.666666666666667|4.163331998932265|
+------+-----------------+-----------------+
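The `NaN` for `c` arises because `WHERE` runs before `GROUP BY`, leaving a single even number (12) with that letter, and the sample standard deviation divides by n - 1 = 0. (Recent Spark releases return `null` instead of `NaN` by default.) A plain-Python sketch of the query, using a hypothetical helper `stddev_samp_like` that returns `NaN` for fewer than two values to mirror the output above:

```python
from collections import defaultdict
from math import isnan, nan
from statistics import mean, stdev

rows = [(0, 'a'), (1, 'c'), (2, 'b'), (3, 'a'), (4, 'b'), (5, 'c'), (6, 'a'),
        (7, 'a'), (8, 'a'), (9, 'b'), (10, 'b'), (11, 'c'), (12, 'c'),
        (13, 'b'), (14, 'b')]

def stddev_samp_like(xs):
    """Hypothetical helper: sample std dev, NaN when there are fewer than 2 values."""
    return stdev(xs) if len(xs) >= 2 else nan

# WHERE number % 2 = 0 filters rows before GROUP BY letter
groups = defaultdict(list)
for number, letter in rows:
    if number % 2 == 0:
        groups[letter].append(float(number))

# ORDER BY letter DESC
result = [(letter, mean(xs), stddev_samp_like(xs))
          for letter, xs in sorted(groups.items(), reverse=True)]

for row in result:
    print(*row)
```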



## String operations

In [39]:
from pyspark.sql.functions import split, lower, explode

In [40]:
import pandas as pd

In [41]:
s = spark.createDataFrame(pd.DataFrame(dict(sents=('Thing 1 and Thing 2', 'The Quick Brown Fox'))))

In [42]:
s.show()

+-------------------+
|              sents|
+-------------------+
|Thing 1 and Thing 2|
|The Quick Brown Fox|
+-------------------+



In [43]:
from pyspark.sql.functions import regexp_replace

In [44]:
s1 = (
    s.select(explode(split(lower(expr('sents')), ' '))).
    sort('col')
)

In [45]:
s1.show()

+-----+
|  col|
+-----+
|    1|
|    2|
|  and|
|brown|
|  fox|
|quick|
|  the|
|thing|
|thing|
+-----+



In [46]:
s1.groupby('col').count().show()

+-----+-----+
|  col|count|
+-----+-----+
|    1|    1|
|    2|    1|
|  and|    1|
|brown|    1|
|  fox|    1|
|quick|    1|
|  the|    1|
|thing|    2|
+-----+-----+
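The `lower`, `split`, `explode`, then `groupby('col').count()` pipeline is the classic word count. A plain-Python equivalent (standard library only, not Spark) makes each step explicit; note that Spark's `split` treats its pattern as a Java regular expression, which for a single space behaves like the literal split used here:

```python
from collections import Counter

sents = ['Thing 1 and Thing 2', 'The Quick Brown Fox']

# lower() then split(' '): one list of tokens per input row
tokens_per_row = [s.lower().split(' ') for s in sents]

# explode(): flatten to one token per output row
tokens = [tok for toks in tokens_per_row for tok in toks]

# groupby('col').count(): number of occurrences of each token
counts = Counter(tokens)
print(sorted(counts.items()))
```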



In [47]:
s.createOrReplaceTempView('s_table')

In [48]:
spark.sql('''
SELECT regexp_replace(sents, 'T.*?g', 'FOO')
FROM s_table
''').show()

+---------------------------------+
|regexp_replace(sents, T.*?g, FOO)|
+---------------------------------+
|                  FOO 1 and FOO 2|
|              The Quick Brown Fox|
+---------------------------------+
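The `?` in `T.*?g` makes the quantifier lazy, so each match stops at the first `g` after a `T`. Spark uses Java regular expressions, which for this pattern behave like Python's `re`; a small sketch contrasting the lazy and greedy forms on the same sentences:

```python
import re

sents = ['Thing 1 and Thing 2', 'The Quick Brown Fox']

# Lazy: '.*?' stops at the first 'g' after each 'T'
lazy = [re.sub('T.*?g', 'FOO', s) for s in sents]

# Greedy: '.*' extends to the last 'g' it can reach
greedy = [re.sub('T.*g', 'FOO', s) for s in sents]

print(lazy)
print(greedy)
```

The second sentence contains no `g`, so neither pattern changes it.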



### Numeric operations

In [49]:
from pyspark.sql.functions import log1p, randn

In [50]:
df.selectExpr('number', 'log1p(number)', 'letter').show(3)

+------+------------------+------+
|number|     LOG1P(number)|letter|
+------+------------------+------+
|   0.0|               0.0|     a|
|   1.0|0.6931471805599453|     c|
|   2.0|1.0986122886681096|     b|
+------+------------------+------+
only showing top 3 rows
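`log1p(x)` computes log(1 + x) without first forming `1 + x`, which keeps precision for very small `x`. A quick plain-Python illustration with `math.log1p` (not Spark) of why the dedicated function exists:

```python
import math

x = 1e-15
naive = math.log(1 + x)   # 1 + x is rounded to the nearest double before the log
accurate = math.log1p(x)  # avoids that rounding step

print(naive, accurate)

# Values shown in the LOG1P(number) column above
print(math.log1p(1.0), math.log1p(2.0))
```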



In [51]:
df.selectExpr('number', 'randn() as random').stat.corr('number', 'random')

-0.08685576584741893

### Date and time

In [52]:
dt = (
    spark.range(3).
    withColumn('today', F.current_date()).
    withColumn('tomorrow', F.date_add('today', 1)).
    withColumn('time', F.current_timestamp())
)

In [53]:
dt.show()

+---+----------+----------+--------------------+
| id|     today|  tomorrow|                time|
+---+----------+----------+--------------------+
|  0|2018-11-19|2018-11-20|2018-11-19 16:19:...|
|  1|2018-11-19|2018-11-20|2018-11-19 16:19:...|
|  2|2018-11-19|2018-11-20|2018-11-19 16:19:...|
+---+----------+----------+--------------------+



In [54]:
dt.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- tomorrow: date (nullable = false)
 |-- time: timestamp (nullable = false)
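`F.date_add('today', 1)` shifts a date by whole days, and `current_date()` and `current_timestamp()` are evaluated once per query, which is why every row above shows the same values. A plain-Python analogue of the day arithmetic using `datetime` (an illustration, not Spark):

```python
from datetime import date, timedelta

today = date.today()
tomorrow = today + timedelta(days=1)   # analogous to F.date_add('today', 1)

# Adding days rolls over month and year boundaries
end_of_month = date(2018, 11, 30) + timedelta(days=1)
new_year = date(2018, 12, 31) + timedelta(days=1)

print(today, tomorrow, end_of_month, new_year)
```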



### Nulls

In [55]:
%%file data/test_null.csv
number,letter
0,a
1,
2,b
3,a
4,b
5,
6,a
7,a
8,
9,b
10,b
11,c
12,
13,b
14,b

Overwriting data/test_null.csv


In [56]:
dn = (
    spark.read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load('data/test_null.csv')
)

In [57]:
dn.printSchema()

root
 |-- number: integer (nullable = true)
 |-- letter: string (nullable = true)



In [58]:
dn.show()

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     1|  null|
|     2|     b|
|     3|     a|
|     4|     b|
|     5|  null|
|     6|     a|
|     7|     a|
|     8|  null|
|     9|     b|
|    10|     b|
|    11|     c|
|    12|  null|
|    13|     b|
|    14|     b|
+------+------+



In [59]:
dn.na.drop().show()

+------+------+
|number|letter|
+------+------+
|     0|     a|
|     2|     b|
|     3|     a|
|     4|     b|
|     6|     a|
|     7|     a|
|     9|     b|
|    10|     b|
|    11|     c|
|    13|     b|
|    14|     b|
+------+------+



In [60]:
dn.na.fill('Missing').show()

+------+-------+
|number| letter|
+------+-------+
|     0|      a|
|     1|Missing|
|     2|      b|
|     3|      a|
|     4|      b|
|     5|Missing|
|     6|      a|
|     7|      a|
|     8|Missing|
|     9|      b|
|    10|      b|
|    11|      c|
|    12|Missing|
|    13|      b|
|    14|      b|
+------+-------+
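`na.drop()` uses `how='any'` by default, dropping a row if any of its fields is null, and `na.fill('Missing')` only replaces nulls in columns whose type matches the fill value, so a string fills string columns only. A simplified plain-Python sketch with hypothetical helpers `drop_any` and `fill` (matching on Python type only, which is coarser than Spark's rules):

```python
# Rows of data/test_null.csv; None stands in for Spark's null
rows = [(0, 'a'), (1, None), (2, 'b'), (3, 'a'), (4, 'b'), (5, None),
        (6, 'a'), (7, 'a'), (8, None), (9, 'b'), (10, 'b'), (11, 'c'),
        (12, None), (13, 'b'), (14, 'b')]

# Column types as inferred above: number is integer, letter is string
column_types = (int, str)

def drop_any(rows):
    """Hypothetical helper, like na.drop(how='any'): drop rows containing a null."""
    return [row for row in rows if all(v is not None for v in row)]

def fill(rows, value, column_types):
    """Hypothetical helper, like na.fill(value) (simplified): replace nulls only in
    columns whose Python type matches the type of the fill value."""
    return [tuple(value if v is None and isinstance(value, t) else v
                  for v, t in zip(row, column_types))
            for row in rows]

dropped = drop_any(rows)
filled = fill(rows, 'Missing', column_types)
print(len(dropped), filled[:2])
```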



## UDF

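Python UDFs are usually much slower than built-in functions: rows must be serialized between the JVM and a Python worker process, and the optimizer cannot see inside the UDF. Use the functions in `pyspark.sql.functions` whenever they can express the logic. If you must use a UDF, prefer `pandas_udf`, which exchanges data in Apache Arrow batches and operates on whole pandas Series, to the row-at-a-time `udf`.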

In [61]:
from pyspark.sql.functions import udf, pandas_udf

### Standard Python UDF

In [62]:
@udf('double')
def square(x):
    return x**2

In [63]:
df.select('number', square('number')).show(3)

+------+--------------+
|number|square(number)|
+------+--------------+
|   0.0|           0.0|
|   1.0|           1.0|
|   2.0|           4.0|
+------+--------------+
only showing top 3 rows



### Pandas UDF

In [64]:
@pandas_udf('double')
def scale(x):
    return (x - x.mean())/x.std()

In [65]:
df.select('number', scale('number')).show(3)

+------+-------------------+
|number|      scale(number)|
+------+-------------------+
|   0.0|-1.5652475842498528|
|   1.0|-1.3416407864998738|
|   2.0| -1.118033988749895|
+------+-------------------+
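only showing top 3 rows

A scalar `pandas_udf` such as `scale` is called on batches of rows (batches never span partitions and hold at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows), so `x.mean()` and `x.std()` are per-batch statistics. They match whole-column z-scores here only because all 15 rows arrive in a single batch; to standardize a large column, compute its mean and standard deviation with `F.mean` and `F.stddev` first.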



#### Grouped agg

In [66]:
@pandas_udf('double', F.PandasUDFType.GROUPED_AGG)
def gmean(x):
    return x.mean()

In [67]:
df.groupby('letter').agg(gmean('number')).show()

+------+-----------------+
|letter|    gmean(number)|
+------+-----------------+
|     c|             7.25|
|     b|8.666666666666666|
|     a|              4.8|
+------+-----------------+



#### Grouped map

In [68]:
@pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def gscale(pdf):
    return pdf.assign(number = (pdf.number - pdf.number.mean())/pdf.number.std())

In [69]:
from pyspark.sql.functions import mean

In [70]:
df.groupby('letter').apply(gscale).show()

+--------------------+------+
|              number|letter|
+--------------------+------+
| -1.2046735616310666|     c|
|-0.43368248218718397|     c|
|    0.72280413697864|     c|
|  0.9155519068396106|     c|
| -1.3880858307767148|     b|
| -0.9716600815437003|     b|
| 0.06940429153883587|     b|
|  0.2776171661553431|     b|
|  0.9022557900048648|     b|
|  1.1104686646213722|     b|
|  -1.467402817237783|     a|
| -0.5502760564641687|     a|
| 0.36685070430944583|     a|
|  0.6725596245673173|     a|
|  0.9782685448251889|     a|
+--------------------+------+
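`scale` standardized the whole column, while `gscale` standardizes within each letter. A plain-Python sketch of both computations (standard library only, not Spark), checked against the outputs above; pandas' `Series.std()` defaults to the sample standard deviation (ddof=1), which `statistics.stdev` reproduces:

```python
from collections import defaultdict
from statistics import mean, stdev

rows = [(0, 'a'), (1, 'c'), (2, 'b'), (3, 'a'), (4, 'b'), (5, 'c'), (6, 'a'),
        (7, 'a'), (8, 'a'), (9, 'b'), (10, 'b'), (11, 'c'), (12, 'c'),
        (13, 'b'), (14, 'b')]

def zscore(xs):
    """(x - mean) / sample standard deviation, as in scale and gscale."""
    m, s = mean(xs), stdev(xs)
    return [(x - m) / s for x in xs]

# Whole-column z-scores (what scale() returned, since all rows formed one batch)
global_z = zscore([float(n) for n, _ in rows])

# Per-group z-scores, like df.groupby('letter').apply(gscale)
groups = defaultdict(list)
for number, letter in rows:
    groups[letter].append(float(number))
group_z = {letter: zscore(xs) for letter, xs in groups.items()}

print(global_z[:3])
print(group_z['c'])
```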



## Joins

In [79]:
names = 'ann ann bob bob chcuk'.split()
courses = '821 823 821 824 823'.split()
pdf1 = pd.DataFrame(dict(name=names, course=courses))

In [80]:
pdf1

    name course
0    ann    821
1    ann    823
2    bob    821
3    bob    824
4  chcuk    823


In [81]:
course_id = '821 822 823 824 825'.split()
course_names = 'Unix Python R Spark GLM'.split()
pdf2 = pd.DataFrame(dict(course_id=course_id, name=course_names))

In [82]:
pdf2

  course_id    name
0       821    Unix
1       822  Python
2       823       R
3       824   Spark
4       825     GLM


In [83]:
df1 = spark.createDataFrame(pdf1)
df2 = spark.createDataFrame(pdf2)

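Both DataFrames have a `name` column, so the joined result carries two columns called `name`; renaming one side beforehand (e.g. `df2.withColumnRenamed('name', 'course_name')`) avoids ambiguous column references later.

In [85]: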
df1.join(df2, df1.course == df2.course_id, how='inner').show()

+-----+------+---------+-----+
| name|course|course_id| name|
+-----+------+---------+-----+
|  ann|   823|      823|    R|
|chcuk|   823|      823|    R|
|  bob|   824|      824|Spark|
|  ann|   821|      821| Unix|
|  bob|   821|      821| Unix|
+-----+------+---------+-----+



In [86]:
df1.join(df2, df1.course == df2.course_id, how='right').show()

+-----+------+---------+------+
| name|course|course_id|  name|
+-----+------+---------+------+
|  ann|   823|      823|     R|
|chcuk|   823|      823|     R|
|  bob|   824|      824| Spark|
| null|  null|      825|   GLM|
| null|  null|      822|Python|
|  ann|   821|      821|  Unix|
|  bob|   821|      821|  Unix|
+-----+------+---------+------+
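The difference between the two joins can be sketched in plain Python (standard library only, not Spark). The nested loops are for clarity only; Spark chooses a physical strategy such as a broadcast hash join or a sort-merge join, and the row order of its output is not guaranteed:

```python
# (name, course) rows of df1 and (course_id, course name) rows of df2
enrollments = [('ann', '821'), ('ann', '823'), ('bob', '821'),
               ('bob', '824'), ('chcuk', '823')]
courses = [('821', 'Unix'), ('822', 'Python'), ('823', 'R'),
           ('824', 'Spark'), ('825', 'GLM')]

# Inner join: keep only pairs where course == course_id
inner = [(name, course, cid, cname)
         for name, course in enrollments
         for cid, cname in courses
         if course == cid]

# Right join: every course appears at least once; courses without a matching
# enrollment get None (Spark's null) in the left-hand columns
right = []
for cid, cname in courses:
    matches = [(name, course) for name, course in enrollments if course == cid]
    for name, course in matches or [(None, None)]:
        right.append((name, course, cid, cname))

print(len(inner), len(right))
```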

