# PySpark

In [1]:
import findspark
import os
os.environ['SPARK_HOME'] = '/Users/anirban/Downloads/spark-3.4.1-bin-hadoop3'  # Replace with the actual Spark installation path
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
# sc.stop()
# sc = SparkContext()

## What is SparkContext?

1. It is the main entry point for Spark functionality.

2. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

3. Only one SparkContext may be active per JVM.

4. You must `stop()` the active SparkContext before creating a new one.

To work with RDDs, we create a SparkContext.

To work with DataFrames, we create a SparkSession (it wraps a SparkContext, which is available as `spark.sparkContext`).

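### How to create a SparkContext

1. Create a Spark configuration (`SparkConf`).

2. Use this configuration to create a SparkContext (the connection).

3. Alternatively, create a SparkContext without a `SparkConf` by passing settings such as the master and app name directly, e.g. `SparkContext("local", "Spark_Demo")`.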

In [3]:
conf = SparkConf().setAppName("Spark_Demo").setMaster("local")
sc = SparkContext(conf=conf)

23/07/23 16:18:21 WARN Utils: Your hostname, Anirbans-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.2 instead (on interface en0)
23/07/23 16:18:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/23 16:18:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '52516'),
 ('spark.driver.host', '192.168.1.2'),
 ('spark.app.submitTime', '1690109301317'),
 ('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1690109301950'),
 ('spark.app.name', 'Spark_Demo'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add

In [5]:
a = sc.parallelize([1,2,3,4])

In [6]:
a.collect()


[1, 2, 3, 4]

In [7]:
a.take(1)

[1]

In [8]:
type(a)

pyspark.rdd.RDD

In [9]:
from functools import reduce
reduce(lambda x,y : x +y, [1,2,3])

6

# RDD (Resilient Distributed Dataset)

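RDDs support two types of operations:
1. Actions: trigger the computation and return a result to the driver (e.g. `collect()`, `count()`, `reduce()`).
2. Transformations: lazily build a new RDD from an existing one (e.g. `map()`, `filter()`); nothing runs until an action is called.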
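To make the "lazy transformations, eager actions" distinction concrete, here is a toy single-machine model. `ToyRDD` is a hypothetical class written only for illustration; it is not PySpark and ignores partitions, but it shows that transformations merely record steps and that the work happens when an action runs.

```python
# Toy model of lazy transformations vs. eager actions (NOT PySpark; ToyRDD is hypothetical).
class ToyRDD:
    def __init__(self, source, ops=()):
        self._source = source    # the original data
        self._ops = list(ops)    # recorded, not-yet-executed transformations

    # Transformations: record the step and return a new ToyRDD; nothing is computed.
    def map(self, f):
        return ToyRDD(self._source, self._ops + [("map", f)])

    def filter(self, pred):
        return ToyRDD(self._source, self._ops + [("filter", pred)])

    # Action: only now is the recorded pipeline executed, element by element.
    def collect(self):
        out = []
        for x in self._source:
            keep = True
            for kind, f in self._ops:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []

def square(x):
    calls.append(x)  # record every time the function actually runs
    return x * x

toy = ToyRDD([1, 2, 3, 4]).map(square).filter(lambda x: x % 2 == 0)
print(calls)          # [] : building the pipeline did no work
print(toy.collect())  # [4, 16]
print(calls)          # [1, 2, 3, 4]
```

Like a real RDD, calling `collect()` again would recompute the whole pipeline; Spark avoids that only if you `cache()`/`persist()` the RDD.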

## Creating RDDs and basic actions

In [10]:
names = sc.parallelize(['Adam','Cray','Shaun','Brian','Mark','Christ','Shail','Satya','Cray'])

In [11]:
type(names)

pyspark.rdd.RDD

Create an RDD from a text file

In [12]:
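# sc.parallelize(["x", "y", "z"]).saveAsTextFile('random.txt')  # writes a directory named random.txt containing part files
rand = sc.textFile('random.txt')  # each line of the file(s) becomes one element of the RDD
rand.take(3)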

['x', 'y', 'z']

#### Collect


1. Avoid `collect()` on large RDDs: it returns every element to the driver, which defeats the purpose of distributing the data with PySpark.

2. Use it only when the result is expected to be small, because all the data is loaded into the driver's memory.

In [13]:
names.collect()

['Adam', 'Cray', 'Shaun', 'Brian', 'Mark', 'Christ', 'Shail', 'Satya', 'Cray']

In [14]:
names.take(3)

['Adam', 'Cray', 'Shaun']

In [15]:
names.countByValue()

defaultdict(int,
            {'Adam': 1,
             'Cray': 2,
             'Shaun': 1,
             'Brian': 1,
             'Mark': 1,
             'Christ': 1,
             'Shail': 1,
             'Satya': 1})

In [16]:
names.distinct().count()

8

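### foreach()
Applies a function to every element for its side effects and returns `None`. The function runs on the executors, so anything it prints goes to the executors' stdout, not necessarily to the driver.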

In [17]:
num = sc.parallelize([1,2,3,3,4,5,3,5,2,7,8,9],5)

In [18]:
a = names.foreach(lambda x : print(x))

Adam
Cray
Shaun
Brian
Mark
Christ
Shail
Satya
Cray


In [19]:
type(a)

NoneType

### GLOM

Return an RDD created by coalescing all elements within each partition
into a list.

In [20]:
type(num)

pyspark.rdd.RDD

In [21]:
num.collect()

[1, 2, 3, 3, 4, 5, 3, 5, 2, 7, 8, 9]

In [22]:
num.take(4)

[1, 2, 3, 3]

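Elements stored in each partition. `glom()` is itself a transformation, so it returns a new RDD (a `PipelinedRDD`), and `collect()` then brings back one list per partition.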

In [23]:
num.glom().collect()

[[1, 2], [3, 3], [4, 5], [3, 5], [2, 7, 8, 9]]

In [24]:
type(num.glom())

pyspark.rdd.PipelinedRDD

The data present in the first partition only

In [25]:
num.glom().collect()[0]

[1, 2]

### REDUCE

In [26]:
reduce(lambda x,y : x+y, num.collect()) # Python's reduce

52

In [27]:
num.reduce(lambda x,y:x+y) # pyspark.rdd.RDD.reduce

52
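Why `RDD.reduce` needs an associative and commutative function: it reduces each partition separately and then reduces the partial results. The pure-Python sketch below models that two-step behavior (it is not PySpark's code) using the partition layout printed by `num.glom().collect()` above.

```python
from functools import reduce

# Partition layout taken from the num.glom().collect() output above.
partitions = [[1, 2], [3, 3], [4, 5], [3, 5], [2, 7, 8, 9]]

def rdd_style_reduce(f, parts):
    """Model of RDD.reduce: reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(f, p) for p in parts if p]  # in Spark this step runs on the executors
    return reduce(f, partials)                     # in Spark this step runs on the driver

print(rdd_style_reduce(lambda x, y: x + y, partitions))  # 52, same as a sequential sum

# Subtraction is not associative, so the partitioned result differs from the sequential one.
flat = [x for p in partitions for x in p]
print(rdd_style_reduce(lambda x, y: x - y, partitions), reduce(lambda x, y: x - y, flat))  # 24 -50
```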

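### SORT RDD (takeOrdered)
`takeOrdered(n)` does not sort the RDD itself; it returns the `n` smallest elements to the driver in ascending order.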

In [28]:
num.takeOrdered(5)

[1, 2, 2, 3, 3]
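One way to get the `n` smallest elements without sorting everything is to keep the `n` smallest per partition and then merge those candidates. The sketch below shows that idea in plain Python with `heapq.nsmallest`, again using the partition layout of `num` shown earlier; treat it as an illustration of the approach rather than Spark's exact implementation.

```python
import heapq

partitions = [[1, 2], [3, 3], [4, 5], [3, 5], [2, 7, 8, 9]]

def take_ordered(n, parts):
    # Keep only the n smallest elements of each partition (cheap, runs per partition)...
    per_partition = [heapq.nsmallest(n, p) for p in parts]
    # ...then merge the candidates and keep the n smallest overall.
    candidates = [x for p in per_partition for x in p]
    return heapq.nsmallest(n, candidates)

print(take_ordered(5, partitions))  # [1, 2, 2, 3, 3]
```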

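### FOLD
`fold(zeroValue, op)` aggregates each partition starting from `zeroValue`, then merges the per-partition results, again starting from `zeroValue`. The zero value is therefore applied once per partition plus once more, so it should be the identity element of `op` (0 for addition, 1 for multiplication). Here the zero value 1 is the identity for multiplication, so the result is simply the product of all elements of `num`.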

In [29]:
num.fold(1, lambda x,y : x*y)

5443200
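A pure-Python model of the `fold` behavior described above (not PySpark's code), using the 5-partition layout of `num`. It shows why a non-identity zero value changes the result: with 5 partitions, `zeroValue` enters the computation 6 times.

```python
from functools import reduce
import operator

partitions = [[1, 2], [3, 3], [4, 5], [3, 5], [2, 7, 8, 9]]

def rdd_style_fold(zero, op, parts):
    # zero is the starting value inside every partition (empty partitions yield zero)...
    partials = [reduce(op, p, zero) for p in parts]
    # ...and once more when the driver merges the partial results.
    return reduce(op, partials, zero)

print(rdd_style_fold(1, operator.mul, partitions))  # 5443200
print(rdd_style_fold(0, operator.add, partitions))  # 52
print(rdd_style_fold(1, operator.add, partitions))  # 58 = 52 + 1 * (5 partitions + 1)
```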

In [30]:
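# Note: a bare `%time` line times only an empty statement (hence the tiny wall time below);
# put `%%time` on the first line of the cell to time the whole cell.
%time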
b = sc.parallelize(range(1,10))
b.collect()

CPU times: user 0 ns, sys: 1e+03 ns, total: 1e+03 ns
Wall time: 3.1 µs


[1, 2, 3, 4, 5, 6, 7, 8, 9]

## Transformations

### Narrow Transformations
Each output partition depends on only one input partition, so no shuffle is needed.

#### Map
Return a new RDD by applying a function to each element of this RDD.

In [67]:
num = sc.parallelize([1,2,3,4,5,6,7,8], 3)
num.glom().collect()

[[1, 2], [3, 4], [5, 6, 7, 8]]

In [59]:
num.map(lambda x : x*x).glom().collect()

[[1, 4], [9, 16], [25, 36, 49, 64]]

#### Flatmap
Like `map`, but the function returns an iterable for each element and the results are flattened into a single RDD.

In [62]:
num_2 = sc.parallelize([4,7,12],2)
num_2.glom().collect()

[[4], [7, 12]]

In [63]:
num_2.flatMap(lambda x : range(x)).glom().collect()

[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]]
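The difference between `map` and `flatMap` can be mirrored with plain Python lists (a local analogy only, no Spark involved): `map` keeps one output item per input, while `flatMap` concatenates the iterables it produces.

```python
from itertools import chain

values = [4, 7, 12]  # same elements as num_2

mapped = [list(range(x)) for x in values]                          # like map: one list per element
flat_mapped = list(chain.from_iterable(range(x) for x in values))  # like flatMap: flattened

print(len(mapped), len(flat_mapped))  # 3 23
```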

#### Filter
Return a new RDD containing only the elements that satisfy a predicate.


In [66]:
num_2.filter(lambda x : x % 2 != 0).glom().collect()

[[], [7]]

#### Union
Returns a new RDD containing the partitions of both RDDs. Duplicates are kept and no shuffle takes place.

In [70]:
num.union(num_2).glom().collect()

[[1, 2], [3, 4], [5, 6, 7, 8], [4], [7, 12]]

In [71]:
num.union(num_2).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 4, 7, 12]

#### Sample
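Return a sampled subset of this RDD. Without replacement, `fraction` is the probability that each element is chosen, so the sample size is only approximately `fraction` times the original size and varies between runs unless a `seed` is given.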

In [102]:
num.sample(False, 0.5).collect()

[1, 3, 5, 6, 7]

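### Wide Transformations
An output partition can depend on data from many input partitions, so Spark must shuffle (redistribute) data across partitions.
`groupBy` and `groupByKey` return each group's values as an iterable (`ResultIterable`) rather than a list, which is why the examples below wrap them in `list()`.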

#### GroupBy

Grouping an RDD that is not made of key-value pairs: the key is computed by the function passed to `groupBy`.

In [130]:
num_3  = sc.parallelize(range(13),3)
num_3.glom().collect()

[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11, 12]]

In [116]:
for key, val in num_3.groupBy(lambda x : x %3 == 0).collect():
    print(key, list(val))

False [1, 2, 4, 5, 7, 8, 10, 11]
True [0, 3, 6, 9, 12]
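A toy picture of the shuffle behind `groupBy`: every element is routed to an output partition chosen from the hash of its key, so all values for one key end up together and can then be grouped locally. This is a simplified pure-Python model, assuming 2 output partitions and Python's built-in `hash`; Spark's real partitioner and file-based shuffle are more involved.

```python
from collections import defaultdict

# Partition layout from num_3.glom().collect() above: range(13) split into 3 partitions.
partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11, 12]]

def toy_group_by(key_fn, parts, num_out):
    # Shuffle: route every element to the output partition hash(key) % num_out.
    shuffled = [[] for _ in range(num_out)]
    for part in parts:
        for x in part:
            k = key_fn(x)
            shuffled[hash(k) % num_out].append((k, x))
    # After the shuffle, each key lives in exactly one partition, so grouping is local.
    groups = {}
    for part in shuffled:
        local = defaultdict(list)
        for k, x in part:
            local[k].append(x)
        groups.update(local)
    return shuffled, groups

shuffled, groups = toy_group_by(lambda x: x % 3 == 0, partitions, 2)
print(groups[False])  # [1, 2, 4, 5, 7, 8, 10, 11]
print(groups[True])   # [0, 3, 6, 9, 12]
```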


In [118]:
people = sc.parallelize(['Adam','Cray','Shaun','Brian','Mark','Christ','Shail','Satya','Cray'])
people.collect()

['Adam', 'Cray', 'Shaun', 'Brian', 'Mark', 'Christ', 'Shail', 'Satya', 'Cray']

In [122]:
for key, val in people.groupBy(lambda x : x[0]).collect():
    print(key, list(val))

A ['Adam']
C ['Cray', 'Christ', 'Cray']
S ['Shaun', 'Shail', 'Satya']
B ['Brian']
M ['Mark']


#### Intersection

In [131]:
num_2.intersection(num_3).collect()

[7, 12, 4]

#### Subtract

In [136]:
num_3.subtract(num_2).collect()

[0, 5, 10, 1, 6, 11, 2, 3, 8, 9]

#### Distinct

In [140]:
num_3.distinct().collect()

[0, 3, 6, 9, 12, 1, 4, 7, 10, 2, 5, 8, 11]
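The semantics of these three wide transformations, modeled with plain Python collections (an analogy, not Spark code). `intersection` and `distinct` drop duplicates, while `subtract` keeps duplicates from the left-hand RDD. Spark's output order depends on partitioning, so the sketch sorts where order is otherwise arbitrary.

```python
xs = list(range(13))  # same elements as num_3
ys = [4, 7, 12]       # same elements as num_2

def toy_intersection(left, right):
    # Distinct elements present in both inputs.
    return sorted(set(left) & set(right))

def toy_subtract(left, right):
    # Elements of `left` that do not appear in `right`; duplicates in `left` are kept.
    exclude = set(right)
    return [x for x in left if x not in exclude]

def toy_distinct(items):
    return sorted(set(items))

print(toy_intersection(ys, xs))      # [4, 7, 12]
print(toy_subtract(xs, ys))          # [0, 1, 2, 3, 5, 6, 8, 9, 10, 11]
print(toy_subtract([1, 1, 2], [2]))  # [1, 1]
```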

### Operations on Key-Value Pairs

In [166]:
data = sc.parallelize([(1,2),(3,4),(5,6),(1,7)])
data.collect()

[(1, 2), (3, 4), (5, 6), (1, 7)]

In [167]:
data.count()

4

In [168]:
data.countByValue()

defaultdict(int, {(1, 2): 1, (3, 4): 1, (5, 6): 1, (1, 7): 1})

In [169]:
data.countByKey()

defaultdict(int, {1: 2, 3: 1, 5: 1})
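The difference between `countByValue` and `countByKey` on a pair RDD, mirrored locally with `collections.Counter` (an analogy only): the first counts whole `(key, value)` tuples, the second counts pairs per key.

```python
from collections import Counter

pairs = [(1, 2), (3, 4), (5, 6), (1, 7)]  # same elements as `data`

by_value = Counter(pairs)              # like countByValue: counts whole (key, value) tuples
by_key = Counter(k for k, _ in pairs)  # like countByKey: counts pairs per key

print(dict(by_key))  # {1: 2, 3: 1, 5: 1}
```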

In [170]:
data.top(2)

[(5, 6), (3, 4)]

In [171]:
data.sortByKey().collect()

[(1, 2), (1, 7), (3, 4), (5, 6)]

In [172]:
data.lookup(3)

[4]

In [173]:
data.keys().collect()

[1, 3, 5, 1]

In [174]:
data.values().collect()

[2, 4, 6, 7]

In [175]:
data.mapValues(lambda x : x % 2 ==  0).collect()

[(1, True), (3, True), (5, True), (1, False)]

#### ReduceByKey
Merge the values for each key using an associative and commutative reduce function.

This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.
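A pure-Python sketch of the "combiner" idea above. The 2-partition layout of `data` is a hypothetical assumption (the notebook does not show how `data` is partitioned), and the sketch is a model of the behavior, not Spark's implementation: values are merged per key inside each partition first, and only those partial results are shuffled and merged again.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical 2-partition layout of `data`.
partitions = [[(1, 2), (3, 4)], [(5, 6), (1, 7)]]

def combine_locally(f, part):
    # Map side: merge the values of each key within one partition (the "combiner").
    acc = {}
    for k, v in part:
        acc[k] = f(acc[k], v) if k in acc else v
    return list(acc.items())

def reduce_by_key(f, parts):
    combined = [combine_locally(f, p) for p in parts]  # at most one record per key per partition
    # Shuffle + reduce side: gather the partial results per key and merge them.
    grouped = defaultdict(list)
    for part in combined:
        for k, v in part:
            grouped[k].append(v)
    return {k: reduce(f, vs) for k, vs in grouped.items()}

print(reduce_by_key(lambda x, y: x + y, partitions))  # {1: 9, 3: 4, 5: 6}
```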


In [178]:
data.reduceByKey(lambda x,y : x+y).collect()

[(1, 9), (3, 4), (5, 6)]

#### GroupByKey
1. Groups the values for each key in the RDD into a single sequence.
2. Unlike `reduceByKey`, no aggregation happens within each partition; values are only combined after all partitions have been shuffled.
3. Hence every (key, value) pair is sent across the network, which makes it computationally expensive when the goal is an aggregation.
4. Each key's values are returned as an iterable (`ResultIterable`), not a list.
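A back-of-the-envelope count of the records each approach sends through the shuffle. The partition layout is hypothetical (chosen with repeated keys to make the gap visible), and the count ignores record sizes and serialization, so treat it as an illustration of the trend rather than a measurement.

```python
# Hypothetical 2-partition layout with many repeated keys.
partitions = [[("a", 1)] * 4 + [("b", 1)] * 2, [("a", 1)] * 3 + [("b", 1)]]

# groupByKey: every (key, value) pair crosses the shuffle.
group_by_key_records = sum(len(p) for p in partitions)

# reduceByKey: each partition sends at most one pre-combined record per key.
reduce_by_key_records = sum(len({k for k, _ in p}) for p in partitions)

print(group_by_key_records, reduce_by_key_records)  # 10 4
```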

In [188]:
for key, val in data.groupByKey().collect():
    print(key, list(val))

1 [2, 7]
3 [4]
5 [6]


In [207]:
data.groupByKey().mapValues(sum).collect()  # Same result as reduceByKey(), but more expensive: every value is shuffled before being summed.

[(1, 9), (3, 4), (5, 6)]

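------ Note

`reduceByKey(f)` returns the same result as `groupByKey().mapValues(lambda vals: reduce(f, vals))`, but `reduceByKey` combines values within each partition before the shuffle, so it is usually cheaper.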

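#### FlatMapValues
Like `flatMap`, but applied only to the value of each pair; the key is repeated for every generated value.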

In [208]:
data.flatMapValues(lambda x : range(x)).collect()

[(1, 0),
 (1, 1),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (5, 0),
 (5, 1),
 (5, 2),
 (5, 3),
 (5, 4),
 (5, 5),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (1, 5),
 (1, 6)]

#### SubtractByKey
Return each (key, value) pair in `self` that has no pair with matching
key in `other`.

In [213]:
data_2 = sc.parallelize([(1,5), (5,8)])

In [214]:
data.subtractByKey(data_2).collect()

[(3, 4)]

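#### Join
Pair-RDD joins match elements by key and return `(key, (left_value, right_value))`; outer joins fill the missing side with `None`.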

In [216]:
data_3 = sc.parallelize([(2,5),(4,8),(5,7),(7,9)])
data_3.collect()

[(2, 5), (4, 8), (5, 7), (7, 9)]

In [217]:
data_3.join(data_2).collect()

[(5, (7, 8))]

In [220]:
data_3.rightOuterJoin(data_2).collect()

[(5, (7, 8)), (1, (None, 5))]

In [221]:
data_3.leftOuterJoin(data_2).collect()

[(2, (5, None)), (4, (8, None)), (5, (7, 8)), (7, (9, None))]

In [222]:
data_3.fullOuterJoin(data_2).collect()

[(2, (5, None)), (4, (8, None)), (5, (7, 8)), (7, (9, None)), (1, (None, 5))]
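The four joins above can be modeled on plain lists of pairs. `toy_join` and its `how` parameter are hypothetical names used only for this sketch; it reproduces the key-matching and `None`-filling semantics, and sorts by key because Spark's output order depends on partitioning.

```python
from collections import defaultdict

def _index(pairs):
    idx = defaultdict(list)
    for k, v in pairs:
        idx[k].append(v)
    return idx

def toy_join(left, right, how="inner"):
    """Model of pair-RDD joins; returns (key, (left_value, right_value)) tuples."""
    li, ri = _index(left), _index(right)
    if how == "inner":
        keys = li.keys() & ri.keys()
    elif how == "left":
        keys = set(li)
    elif how == "right":
        keys = set(ri)
    else:  # "full"
        keys = li.keys() | ri.keys()
    out = []
    for k in sorted(keys):
        # .get does not trigger the defaultdict factory, so a missing side becomes [None].
        for lv in li.get(k, [None]):
            for rv in ri.get(k, [None]):
                out.append((k, (lv, rv)))
    return out

left_pairs = [(2, 5), (4, 8), (5, 7), (7, 9)]  # same elements as data_3
right_pairs = [(1, 5), (5, 8)]                 # same elements as data_2

print(toy_join(left_pairs, right_pairs))           # [(5, (7, 8))]
print(toy_join(left_pairs, right_pairs, "right"))  # [(1, (None, 5)), (5, (7, 8))]
```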