#### PySpark transformation

This notebook collects examples of common PySpark functions and utilities for transformations. Each basic function is introduced with a short explanation and an example.

We first import `SparkContext`, the main entry point for Spark functionality. <br>
`SparkContext` represents the connection to a Spark cluster and can be used to create RDDs. <br>
Note: Only one `SparkContext` can be active per JVM.

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
# define configuration
conf = SparkConf().setAppName("Spark_Demo").setMaster("local")
# and use the predefined configuration to create the SparkContext
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/29 22:27:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '62728'),
 ('spark.driver.host', 'yangs-mac-mini.home'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'Spark_Demo'),
 ('spark.app.startTime', '1684687150600'),
 ('spark.app.submitTime', '1684687150416'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNA

In [None]:
sc.stop()

We can also create a `SparkContext` without specifying a configuration.

In [2]:
sc = SparkContext()
sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/02 22:05:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc.getConf().getAll()

[('spark.driver.host', 'yangs-mac-mini.home'),
 ('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1691006759971'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.app.submitTime', '1691006759374'),
 ('spark.app.startTime', '1691006759471'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=

To monitor Spark jobs, open the Spark web UI at localhost:4040.

In [4]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

#### Create RDD
A Resilient Distributed Dataset (RDD) is an immutable, distributed collection of objects. <br>
Each RDD is split into multiple partitions, which can be computed on different nodes of the cluster.


There are two types of operations on RDDs:
- Action
- Transformation

Here are some actions.

In [5]:
# create an RDD manually
data = sc.parallelize([1,2,3,4,5,6,7,8,9,1])
# we can use `collect` action to check the data
data.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 1]

However, `collect` returns all the data to the driver, which means everything is loaded into the driver's memory. This is computationally expensive for large datasets. <br>
To be more efficient, use `take` to inspect only a given number of elements.

In [6]:
data.take(5)

[1, 2, 3, 4, 5]

In [7]:
data.first()

1

In [8]:
data.count()

10

In [9]:
data.countByValue()

defaultdict(int, {1: 2, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})

In [10]:
type(data)

pyspark.rdd.RDD

An RDD does not directly show which elements live in which partition. To inspect the data by partition, use `glom`, which groups the elements of each partition into a list.

In [6]:
# create an RDD manually
data = sc.parallelize([1,2,3,4,5,6,7,8,9,1], 3)  # create 3 partitions
data.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 1]

In [7]:
# with glom we can print data in partitions
data.glom().collect()

[[1, 2, 3], [4, 5, 6], [7, 8, 9, 1]]

In [7]:
# we can also refer to slices
data.glom().collect()[1:]

[[4, 5, 6], [7, 8, 9, 1]]

In [15]:
type(data.glom())

pyspark.rdd.PipelinedRDD

In [16]:
data.max()

9

In [17]:
data.min()

1

In [18]:
data.mean()

4.6

We can apply a function to the elements of an RDD and aggregate them into a single value with `reduce`.

In [24]:
# the function takes two arguments and combines them into one value
data.reduce(lambda x,y: x+y)

46
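
Because `reduce` is applied within each partition first and the partial results are then combined, the function should be commutative and associative. The plain-Python sketch below models that two-step evaluation without Spark. The helper `reduce_by_partition` and the partition layouts are hypothetical and chosen only for illustration; this is not Spark's implementation.

```python
from functools import reduce
from operator import add, sub

def reduce_by_partition(partitions, op):
    """Model of a partitioned reduce: reduce each partition, then merge the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)

values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 1]
one_partition = [values]
three_partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 1]]

# Addition is commutative and associative: the layout does not matter.
sum_one = reduce_by_partition(one_partition, add)
sum_three = reduce_by_partition(three_partitions, add)

# Subtraction is neither: the result changes with the layout.
diff_one = reduce_by_partition(one_partition, sub)
diff_three = reduce_by_partition(three_partitions, sub)

print(sum_one, sum_three)    # 46 46
print(diff_one, diff_three)  # -44 14
```

With addition, both layouts give 46, matching the notebook's result. With subtraction, the answer depends on how the data happens to be partitioned, which is why such functions are unsuitable for `reduce`.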

There is another action called `fold`, which aggregates the elements of each partition and then the results of all partitions, starting from a given zero value (see the documentation: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.fold.html).

In [8]:
from operator import add
sc.parallelize(range(5)).fold(1, add)

12
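
The result is 12 rather than 10 because the zero value is applied once for each partition and once more when the partial results are merged. The plain-Python sketch below models this behaviour without Spark. The helper `fold_by_partition` and the partition layouts are hypothetical, and the sketch assumes the data above sat in a single partition.

```python
from functools import reduce
from operator import add

def fold_by_partition(partitions, zero, op):
    """Model of fold: fold each partition from `zero`, then fold the partial results from `zero` again."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)

values = list(range(5))  # 0 + 1 + 2 + 3 + 4 = 10

single = fold_by_partition([values], 1, add)              # 10 + 1 (partition) + 1 (merge)
double = fold_by_partition([[0, 1], [2, 3, 4]], 1, add)   # 10 + 2 (partitions) + 1 (merge)
neutral = fold_by_partition([[0, 1], [2, 3, 4]], 0, add)  # a neutral zero value leaves the sum unchanged

print(single, double, neutral)  # 12 13 10
```

Unless the zero value is neutral for the operation (0 for addition, 1 for multiplication), the result of `fold` depends on the number of partitions.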

Now let's see how transformations work. In a nutshell, a transformation is an operation that produces a new RDD from an existing RDD.

There are two types of transformations:<br>
- narrow transformations (no shuffling, less expensive)
- wide transformations (data shuffling, computationally expensive)
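
The difference between the two types can be sketched in plain Python, with one inner list standing in for each partition. The layout and the helper `shuffle_by_key` are hypothetical and only model the idea; Spark's actual shuffle is considerably more involved.

```python
from collections import defaultdict

# Hypothetical layout: one inner list per partition.
partitions = [[5, 5, 4], [3, 2], [9, 2]]

# Narrow: each output partition depends on exactly one input partition,
# so every partition can be processed where it already lives.
squared = [[x * x for x in part] for part in partitions]

def shuffle_by_key(parts, key_func, num_partitions):
    """Wide: route every element to the partition that owns its key."""
    out = [defaultdict(list) for _ in range(num_partitions)]
    for part in parts:
        for x in part:
            k = key_func(x)
            out[k % num_partitions][k].append(x)  # assumes integer keys
    return [dict(d) for d in out]

# Elements from all three input partitions end up mixed in the output partitions.
grouped = shuffle_by_key(partitions, lambda x: x % 2, 2)

print(squared)  # [[25, 25, 16], [9, 4], [81, 4]]
print(grouped)  # [{0: [4, 2, 2]}, {1: [5, 5, 3, 9]}]
```

In the narrow case the partition boundaries are preserved. In the wide case every output partition may need elements from every input partition, which is what makes shuffling expensive.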

Here are some narrow transformations.

In [8]:
num = sc.parallelize([5,5,4,3,2,9,2])
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [10]:
# map function
num.map(lambda x: pow(x, 2)).collect()

[25, 25, 16, 9, 4, 81, 4]

There is another method similar to `map`, called `flatMap`, which additionally flattens the results. For brevity, it is not run here.
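
As a rough illustration of how the two methods differ, the plain-Python sketch below mimics `map` and `flatMap` on a small list. The input `lines` is hypothetical, and no Spark call is made.

```python
from itertools import chain

lines = ["hello spark", "hello rdd"]  # hypothetical input

# map: exactly one output element per input element (here, a list per line)
mapped = [line.split(" ") for line in lines]

# flatMap: the per-element results are flattened into a single sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['hello', 'spark'], ['hello', 'rdd']]
print(flat_mapped)  # ['hello', 'spark', 'hello', 'rdd']
```

On an RDD, the corresponding calls would be `rdd.map(f)` and `rdd.flatMap(f)` with the same function `f`.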

`filter` keeps only the elements that satisfy a given condition.

In [12]:
num.filter(lambda x: x%2==0).collect()

[4, 2, 2]

`union` combines the elements of two RDDs; duplicates are kept.

In [13]:
num.collect()

[5, 5, 4, 3, 2, 9, 2]

In [9]:
num2 = sc.parallelize([1,7,9,4,10])
num2.collect()

[1, 7, 9, 4, 10]

In [15]:
num2.union(num).collect()

[1, 7, 9, 4, 10, 5, 5, 4, 3, 2, 9, 2]

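`sample` returns a random subset of the input RDD. The first argument enables sampling with replacement, and the second is the expected fraction of elements to sample.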

In [17]:
num2.sample(True, 0.3).collect()

[7, 9]
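
Note that the fraction is an expected proportion, not an exact size: here 0.3 of 5 elements would be 1.5, yet 2 elements were returned. The plain-Python sketch below illustrates why. It models sampling without replacement as an independent keep-or-drop decision per element, and sampling with replacement as drawing a Poisson-distributed count per element. Both are assumptions about the general approach rather than a description of Spark's internals, and all function names are hypothetical.

```python
import math
import random

def sample_without_replacement(values, fraction, rng):
    """Keep each element independently with probability `fraction`."""
    return [v for v in values if rng.random() < fraction]

def poisson(mean, rng):
    """Draw a Poisson-distributed count (Knuth's method, fine for small means)."""
    limit, count, product = math.exp(-mean), 0, rng.random()
    while product > limit:
        count += 1
        product *= rng.random()
    return count

def sample_with_replacement(values, fraction, rng):
    """Emit each element a Poisson(fraction)-distributed number of times."""
    return [v for v in values for _ in range(poisson(fraction, rng))]

rng = random.Random(42)  # fixed seed only to make the sketch repeatable
values = [1, 7, 9, 4, 10]

print(sample_without_replacement(values, 0.3, rng))
print(sample_with_replacement(values, 0.3, rng))
```

Under either model the sample size varies from run to run, and with replacement the same element can appear more than once.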

Here are some wide transformations.

`groupBy` groups the elements into sub-groups according to the key returned by the given function.

In [4]:
num_gp = num.groupBy(lambda x: x % 2)

In [6]:
for (k, v) in num_gp.collect():
    print(k, list(v))

1 [5, 5, 3, 9]
0 [4, 2, 2]

`intersection` returns the subset that exists in both RDDs.

In [8]:
num.intersection(num2).collect()

[4, 9]

`subtract` returns the elements of this RDD that do not appear in the given RDD.

In [9]:
num.subtract(num2).collect()

[2, 2, 5, 5, 3]

`distinct` returns only the unique elements of the RDD.

In [10]:
num.distinct().collect()

[5, 4, 3, 2, 9]
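
The three set-like operations above treat duplicates differently. The plain-Python sketch below reproduces their results on the same values, without Spark. The results are sorted here only to make the output deterministic, since the order of elements after a shuffle is not guaranteed.

```python
num = [5, 5, 4, 3, 2, 9, 2]
num2 = [1, 7, 9, 4, 10]

# intersection: elements present in both inputs, each reported once
intersection = sorted(set(num) & set(num2))

# subtract: elements of `num` absent from `num2`; duplicates in `num` are kept
other = set(num2)
subtracted = sorted(x for x in num if x not in other)

# distinct: every value once
distinct = sorted(set(num))

print(intersection)  # [4, 9]
print(subtracted)    # [2, 2, 3, 5, 5]
print(distinct)      # [2, 3, 4, 5, 9]
```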

Transformations and actions can also be applied to RDDs of key-value pairs.

In [18]:
pair = sc.parallelize([(1,2),(3,4),(5,6),(7,8),(3,4)])
pair.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (3, 4)]

In [19]:
pair.count()

5

In [20]:
pair.countByValue()

defaultdict(int, {(1, 2): 1, (3, 4): 2, (5, 6): 1, (7, 8): 1})

In [22]:
pair.sortByKey().collect()

[(1, 2), (3, 4), (3, 4), (5, 6), (7, 8)]

In [25]:
pair.lookup(5)

[6]

In [26]:
pair.keys().collect()

[1, 3, 5, 7, 3]

Analogous to `reduce` and `groupBy` on plain RDDs, pair RDDs have key-based counterparts: `reduceByKey` and `groupByKey`.
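
To give a sense of what these two methods compute, the plain-Python sketch below applies the same logic to the pairs used above. The helpers `reduce_by_key` and `group_by_key` are hypothetical stand-ins, not Spark functions, and the results are sorted by key only to make the output deterministic.

```python
from collections import defaultdict
from operator import add

pair = [(1, 2), (3, 4), (5, 6), (7, 8), (3, 4)]

def reduce_by_key(pairs, op):
    """Merge the values of each key with `op`."""
    merged = {}
    for key, value in pairs:
        merged[key] = op(merged[key], value) if key in merged else value
    return sorted(merged.items())

def group_by_key(pairs):
    """Collect the values of each key into a list."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())

print(reduce_by_key(pair, add))  # [(1, 2), (3, 8), (5, 6), (7, 8)]
print(group_by_key(pair))        # [(1, [2]), (3, [4, 4]), (5, [6]), (7, [8])]
```

On the RDD, the corresponding calls would be `pair.reduceByKey(add)` and `pair.groupByKey()`; the latter returns an iterable of values per key rather than a list.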