# Basic information

## Partitions

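A partition is a logical chunk of a large dataset. An RDD is split into partitions, which are distributed across the nodes of the cluster and processed in parallel, one task per partition.

The achievable parallelism is limited by both the number of partitions and the number of executor cores. With a single executor core, tasks run one after another even if the parameter ```minPartitions > 1```. Conversely, if an RDD has only one partition, only one task can run no matter how many executors are available. For ```textFile```, ```minPartitions``` is a lower bound: Spark may create more partitions for a large file.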
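To make the link between partitions and parallelism concrete, here is a plain-Python sketch (not Spark code) that splits a list into n contiguous, roughly equal slices, similar in spirit to how ```sc.parallelize(data, n)``` distributes a local collection, and computes how many tasks can run at the same time. The helper names ```split_into_partitions``` and ```max_concurrent_tasks``` are hypothetical; in PySpark you can inspect the real partition contents with ```rdd.glom().collect()```.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous, roughly equal slices.

    Hypothetical helper: it models the idea of partitioning, not Spark's code.
    """
    n = len(data)
    return [
        data[(i * n) // num_partitions : ((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


def max_concurrent_tasks(num_partitions, total_cores):
    """One task per partition, and at most one running task per core at a time."""
    return min(num_partitions, total_cores)


partitions = split_into_partitions(list(range(10)), 4)
print(partitions)                  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(max_concurrent_tasks(4, 1))  # 1: one core runs the 4 tasks one after another
print(max_concurrent_tasks(1, 8))  # 1: a single partition gives a single task
print(max_concurrent_tasks(4, 8))  # 4: limited by the number of partitions
```

With ```SparkContext("local", ...)``` Spark uses a single worker thread, so all partitions are processed one at a time; a master of ```local[*]``` uses as many threads as there are logical cores.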

In [1]:
from pyspark import SparkContext

# create a Spark context; the "local" master runs Spark with a single worker thread
sc = SparkContext("local", "Basic information")

In [2]:
rdd = sc.textFile("sample_data/201-basic-informations.txt", 4)

```.getNumPartitions()``` returns the number of partitions in the RDD.

In [3]:
rdd.getNumPartitions()

4

## Count

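```.count()``` returns the number of elements in the RDD.

```.countByKey()``` counts the elements for each key of a key-value RDD and returns the result to the driver as a dictionary.

```.countByValue()``` counts how many times each distinct element occurs and returns the result to the driver as a dictionary.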

In [4]:
rdd.count()

54

In [5]:
rdd.countByValue()

defaultdict(int,
            {'From fairest creatures we desire increase,': 4,
             "That thereby beauty's rose might never die,": 2,
             'But as the riper should by time decease,': 2,
             'His tender heir might bear his memory:': 2,
             'But thou contracted to thine own bright eyes,': 2,
             "Feed'st thy light's flame with self-substantial fuel,": 1,
             'Making a famine where abundance lies,': 1,
             'Thy self thy foe, to thy sweet self too cruel:': 1,
             "Thou that art now the world's fresh ornament,": 1,
             'And only herald to the gaudy spring,': 1,
             'Within thine own bud buriest thy content,': 1,
             "And tender churl mak'st waste in niggarding:": 1,
             'Pity the world, or else this glutton be,': 1,
             "To eat the world's due, by the grave and thee.": 1,
             'When forty winters shall besiege thy brow,': 1,
             "And dig deep trenches in thy be

In [6]:
rdd1 = sc.parallelize([('a',2),('b',3),['c',2],('f',20),['d',4],('f',12),('d',11)])
rdd1.countByKey()

defaultdict(int, {'a': 1, 'b': 1, 'c': 1, 'f': 2, 'd': 2})
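A plain-Python model of what these three actions compute, using ```collections.Counter```. It mirrors the results only, not Spark's distributed implementation. Counting by key amounts to counting the first element of each pair, which is why the two-element lists in ```rdd1``` behave like tuples.

```python
from collections import Counter

pairs = [('a', 2), ('b', 3), ['c', 2], ('f', 20), ['d', 4], ('f', 12), ('d', 11)]

count = len(pairs)                         # like rdd1.count()
by_key = Counter(pair[0] for pair in pairs)  # like rdd1.countByKey()

# like rdd1.countByValue(); lists are unhashable and cannot be dictionary keys,
# so each pair is converted to a tuple before counting
by_value = Counter(tuple(pair) for pair in pairs)

print(count)         # 7
print(dict(by_key))  # {'a': 1, 'b': 1, 'c': 1, 'f': 2, 'd': 2}
```

The same hashability rule applies in PySpark: calling ```countByValue()``` on an RDD whose elements are lists fails, because each element is used as a dictionary key.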

## collectAsMap

```.collectAsMap()``` returns the key-value pairs of the RDD to the driver as a dictionary. If a key occurs more than once, only the last value for that key is kept.

In [7]:
# use tuples or two-element lists; a set such as {"e", "f"} has no guaranteed order
rdd2 = sc.parallelize([("a", "b"), ("g", 2), ["c", "d"], ("e", "f")])
rdd2.collectAsMap()

{'a': 'b', 'g': 2, 'c': 'd', 'e': 'f'}
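Because ```collectAsMap()``` behaves like building a Python ```dict``` from the collected pairs, duplicate keys silently lose data. A plain-Python sketch of that behavior, with made-up pairs, plus a quick way to spot which keys were duplicated:

```python
from collections import Counter

pairs = [("a", 1), ("b", 2), ("a", 3)]

# Equivalent in spirit to sc.parallelize(pairs).collectAsMap()
as_map = dict(pairs)

# Keys that occurred more than once; all but their last value were dropped.
duplicate_keys = [key for key, n in Counter(key for key, _ in pairs).items() if n > 1]

print(as_map)          # {'a': 3, 'b': 2}
print(duplicate_keys)  # ['a']
```

If you need every value per key, group or combine them first (for example with ```groupByKey()``` or ```reduceByKey()```) and collect afterwards.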

## Sum

```.sum()``` returns the sum of the elements of the RDD.

In [8]:
rdd3 = sc.parallelize(range(100, 200), 3)
rdd3.sum()

14950
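Aggregations such as ```sum()``` work in two stages: each partition is summed locally, then the partial sums are combined on the driver. The plain-Python sketch below reproduces this for ```range(100, 200)``` split into 3 partitions. The contiguous slicing is an assumption that approximates how ```parallelize``` splits a collection.

```python
import operator
from functools import reduce

data = list(range(100, 200))
num_partitions = 3

# Split into 3 contiguous slices (an approximation of parallelize's partitioning).
n = len(data)
partitions = [
    data[(i * n) // num_partitions : ((i + 1) * n) // num_partitions]
    for i in range(num_partitions)
]

# Stage 1: each partition computes its own partial sum (on the executors in Spark).
partial_sums = [sum(part) for part in partitions]

# Stage 2: the partial sums are combined into the final result (on the driver).
total = reduce(operator.add, partial_sums, 0)

print(partial_sums)  # [3828, 4917, 6205]
print(total)         # 14950
```

Only one number per partition travels to the driver, which is why ```sum()``` stays cheap even for large RDDs.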

## isEmpty

```.isEmpty()``` returns ```True``` if and only if the RDD contains no elements. An RDD holding a single empty list is not empty: it contains one element, the empty list.

In [9]:
rdd.isEmpty()

False

In [10]:
sc.parallelize([]).isEmpty()

True

In [11]:
sc.parallelize([[]]).isEmpty()

False
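An RDD is empty only if none of its partitions yields an element, and an element that happens to be an empty list still counts. A plain-Python model over a list of partitions (the function name ```is_empty``` is hypothetical):

```python
def is_empty(partitions):
    """Return True only if no partition yields any element.

    Stops at the first element found, much like checking whether take(1) is empty.
    """
    return not any(True for part in partitions for _ in part)


print(is_empty([[], [], []]))  # True: three partitions, no elements
print(is_empty([[], [[]]]))    # False: one element, which is an empty list
```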

# Statistical methods

## max

```.max()``` returns the largest element of the RDD.

In [12]:
rdd4 = sc.parallelize(range(50,150))
rdd4.max()

149

## min

```.min()``` returns the smallest element of the RDD.

In [13]:
rdd4.min()

50

## mean

```.mean()``` returns the arithmetic mean of the elements of the RDD.

In [14]:
rdd4.mean()

99.5

## stdev

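```.stdev()``` returns the population standard deviation of the elements of the RDD (dividing by N). For the sample standard deviation (dividing by N - 1), use ```.sampleStdev()```.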

In [15]:
rdd4.stdev()

28.86607004772212
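Python's standard ```statistics``` module makes the population/sample distinction easy to check: ```pstdev``` divides by N, matching the value returned by ```.stdev()``` above, while ```stdev``` divides by N - 1, which corresponds to ```.sampleStdev()```.

```python
import statistics

values = list(range(50, 150))

population_sd = statistics.pstdev(values)  # divides by N, like rdd4.stdev()
sample_sd = statistics.stdev(values)       # divides by N - 1, like rdd4.sampleStdev()

print(population_sd)  # approximately 28.866
print(sample_sd)      # approximately 29.011
```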

## histogram

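```.histogram(buckets)``` computes a histogram. When ```buckets``` is an integer n, the range from the minimum to the maximum is split into n evenly spaced buckets. The result is a tuple of bucket boundaries and counts; every bucket is open on the right except the last, which also includes the maximum.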

In [16]:
rdd4.histogram(4)

([50.0, 74.75, 99.5, 124.25, 149], [25, 25, 25, 25])
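A plain-Python sketch of the evenly spaced bucketing rule, which reproduces the output above (the function name ```even_histogram``` is hypothetical, and this is not Spark's implementation):

```python
from bisect import bisect_right


def even_histogram(values, num_buckets):
    """Histogram with num_buckets evenly spaced buckets between min and max.

    Buckets are [lo, hi) except the last one, which is [lo, hi].
    """
    values = list(values)
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_buckets
    boundaries = [lo + i * width for i in range(num_buckets)] + [hi]
    counts = [0] * num_buckets
    for x in values:
        # Bucket whose left boundary is the largest boundary <= x.
        idx = bisect_right(boundaries, x) - 1
        # The maximum sits on the last boundary; it belongs to the last bucket.
        counts[min(idx, num_buckets - 1)] += 1
    return boundaries, counts


print(even_histogram(range(50, 150), 4))
# ([50.0, 74.75, 99.5, 124.25, 149], [25, 25, 25, 25])
```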

## stats

```.stats()``` returns a ```StatCounter``` object that captures the count, mean, standard deviation, maximum and minimum of the RDD’s elements in a single pass over the data.

In [17]:
rdd4.stats()

(count: 100, mean: 99.5, stdev: 28.86607004772212, max: 149.0, min: 50.0)
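All five statistics can be accumulated in one pass over the data. One standard way to do this is Welford's online algorithm; the plain-Python sketch below uses it to illustrate the idea. It is not the source of Spark's ```StatCounter```, and the class name ```RunningStats``` is hypothetical.

```python
import math


class RunningStats:
    """Single-pass count, mean, population stdev, max and min (Welford's algorithm)."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the running mean
        self.max = -math.inf
        self.min = math.inf

    def add(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)  # uses the updated mean
        self.max = max(self.max, x)
        self.min = min(self.min, x)
        return self

    def stdev(self):
        # Population standard deviation (divide by n), as in rdd.stdev().
        return math.sqrt(self.m2 / self.n) if self.n else math.nan

    def __repr__(self):
        return (f"(count: {self.n}, mean: {self.mean}, stdev: {self.stdev()}, "
                f"max: {float(self.max)}, min: {float(self.min)})")


stats = RunningStats()
for value in range(50, 150):
    stats.add(value)
print(stats)
```

Because each update only needs the current element and a few running totals, this kind of accumulator can run independently per partition, with the partial results merged afterwards.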

# Functions

## map

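```.map()``` applies a function to each element of the RDD and returns a new RDD with one result per element. Here, each line is split into a list of words.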

In [18]:
rdd5 = sc.textFile("sample_data/201-greetings.txt")
rdd5.map(lambda line: line.split()).collect()

[['Good', 'Morning'],
 ['Good', 'Evening'],
 ['Good', 'Day'],
 ['Happy', 'Birthday'],
 ['Happy', 'New', 'Year'],
 ['Nice', 'to', 'meet', 'you'],
 ['Hi'],
 ['Hello'],
 ['How', 'are', 'you?']]

## flatMap

```.flatMap()``` applies a function to each element of the RDD and flattens the results, so each line produces one element per word instead of one list per line.

In [19]:
rdd5.flatMap(lambda line: line.split()).collect()

['Good',
 'Morning',
 'Good',
 'Evening',
 'Good',
 'Day',
 'Happy',
 'Birthday',
 'Happy',
 'New',
 'Year',
 'Nice',
 'to',
 'meet',
 'you',
 'Hi',
 'Hello',
 'How',
 'are',
 'you?']
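The difference between ```map``` and ```flatMap``` matches the difference between a list comprehension that builds one list per line and a nested comprehension that flattens them. A plain-Python equivalent using a few of the greetings shown above:

```python
lines = ["Good Morning", "Happy New Year", "Hi"]

mapped = [line.split() for line in lines]                 # like rdd5.map(...)
flat = [word for line in lines for word in line.split()]  # like rdd5.flatMap(...)

print(mapped)                  # [['Good', 'Morning'], ['Happy', 'New', 'Year'], ['Hi']]
print(flat)                    # ['Good', 'Morning', 'Happy', 'New', 'Year', 'Hi']
print(len(mapped), len(flat))  # 3 6
```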

## flatMapValues

```.flatMapValues()``` passes the value of each key-value pair through a flatMap function and pairs every resulting item with the original key; the keys are not changed.

In [20]:
rdd6 = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
rdd6.flatMapValues(lambda x: x).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
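In plain Python, ```flatMapValues(f)``` corresponds to a nested comprehension that keeps each key and pairs it with every item ```f``` produces from the value. The sketch also uses a function other than the identity to show that ```f``` only ever sees the value; ```flat_map_values``` and ```expand``` are hypothetical names.

```python
pairs = [("a", ["x", "y", "z"]), ("b", ["p", "r"])]


def flat_map_values(pairs, f):
    """Pair every item produced by f(value) with the unchanged key."""
    return [(key, item) for key, value in pairs for item in f(value)]


def expand(letters):
    # Hypothetical example function: emit each letter in lower and upper case.
    return [c for letter in letters for c in (letter, letter.upper())]


print(flat_map_values(pairs, lambda v: v))
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
print(flat_map_values(pairs, expand)[:4])
# [('a', 'x'), ('a', 'X'), ('a', 'y'), ('a', 'Y')]
```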

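## How to select data from an RDD?

Actions such as ```.collect()```, used in the examples above, return data from the RDD to the driver.

```.collect()``` returns all elements of the RDD as a list. Since the whole dataset is transferred to the driver, use it only when the RDD is small enough to fit in the driver's memory.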

In [21]:
rdd5.collect()

['Good Morning',
 'Good Evening',
 'Good Day',
 'Happy Birthday',
 'Happy New Year',
 'Nice to meet you',
 'Hi',
 'Hello',
 'How are you?']

```.take(3)``` returns the first 3 elements of an RDD.

In [22]:
rdd5.take(3)

['Good Morning', 'Good Evening', 'Good Day']

```.first()``` returns the first element of an RDD.

In [23]:
rdd5.first()

'Good Morning'

```.top(2)``` returns the 2 largest elements of an RDD, in descending order.

In [24]:
rdd5.top(2)

['Nice to meet you', 'How are you?']

To verify the result of ```.top(2)```, here is the full list sorted in ascending order; its last two entries are the ones returned above, in reverse order:

In [25]:
sorted(rdd5.collect())

['Good Day',
 'Good Evening',
 'Good Morning',
 'Happy Birthday',
 'Happy New Year',
 'Hello',
 'Hi',
 'How are you?',
 'Nice to meet you']
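These actions have direct plain-Python analogues: ```take(n)``` is a slice of the elements in partition order, ```first()``` is the first of them, and ```top(n)``` picks the n largest elements, which ```heapq.nlargest``` does without sorting the whole list. A sketch over the same greetings:

```python
import heapq

greetings = [
    "Good Morning", "Good Evening", "Good Day", "Happy Birthday",
    "Happy New Year", "Nice to meet you", "Hi", "Hello", "How are you?",
]

print(greetings[:3])                 # like rdd5.take(3)
print(greetings[0])                  # like rdd5.first()
print(heapq.nlargest(2, greetings))  # like rdd5.top(2)
# ['Nice to meet you', 'How are you?']
```

For the n smallest elements PySpark offers ```takeOrdered(n)```; both ```top``` and ```takeOrdered``` also accept a ```key``` function.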

## Sampling data

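```.sample(withReplacement, fraction, seed)``` returns a random subset of the RDD. Here the sample is drawn without replacement (```False```), ```0.44``` is the expected fraction of elements to keep (not an exact count), and the seed ```42``` makes the result reproducible.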

In [26]:
rdd7 = sc.parallelize(range(100,120))
print("RDD: ", rdd7.collect())
print("Subset: ", rdd7.sample(False, 0.44, 42).collect())

RDD:  [100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119]
Subset:  [100, 101, 103, 104, 107, 110, 113, 114, 116, 117, 118]
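Without replacement, ```fraction``` acts as the probability that each element is kept, so the sample size only varies around ```fraction * count``` (here 0.44 × 20 = 8.8, while 11 elements came back). A plain-Python sketch of this Bernoulli sampling with a seeded ```random.Random```; ```bernoulli_sample``` is a hypothetical name, and the sketch is not guaranteed to reproduce the exact subset above.

```python
import random


def bernoulli_sample(data, fraction, seed):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


data = list(range(100, 120))
subset = bernoulli_sample(data, 0.44, 42)

print(len(subset), subset)
# Same seed, same result: the sample is reproducible.
print(subset == bernoulli_sample(data, 0.44, 42))  # True
```

If you need an exact number of elements, ```takeSample(withReplacement, num, seed)``` returns exactly ```num``` elements (or all of them when the RDD is smaller) as a list on the driver.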


```.filter()``` returns a new RDD containing only the elements that satisfy a predicate, similar to a ```WHERE``` clause in SQL.

In [27]:
# collect the even numbers in rdd7, i.e. range(100, 120)
rdd7.filter(lambda x: x % 2 == 0).collect()

[100, 102, 104, 106, 108, 110, 112, 114, 116, 118]

```.distinct()``` returns a new RDD with the distinct elements of the current RDD. It requires a shuffle, so the order of the result is not guaranteed.

In [28]:
rdd8 = rdd5.flatMap(lambda line: line.split())
rdd8.distinct().collect()

['Good',
 'Morning',
 'Evening',
 'Day',
 'Happy',
 'Birthday',
 'New',
 'Year',
 'Nice',
 'to',
 'meet',
 'you',
 'Hi',
 'Hello',
 'How',
 'are',
 'you?']

```.keys()``` returns an RDD containing the key of each key-value pair.

In [29]:
# rdd9 = [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
rdd9 = rdd6.flatMapValues(lambda x: x)
rdd9.keys().collect()

['a', 'a', 'a', 'b', 'b']

Combining ```.keys()``` and ```.distinct()``` returns the unique keys in the RDD.

In [30]:
rdd9.keys().distinct().collect()

['a', 'b']

## Iterating over RDD

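```.foreach()``` applies a function to every element of the RDD for its side effects and returns nothing. It is useful for pushing data to an external system, e.g. a database not supported by PySpark or a web service. The function runs on the executors, not on the driver, so on a real cluster a local file like the one below is written on each executor's machine.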

In [31]:
def save_local_file(x):
    with open('sample_data/201-foreach.txt', 'a') as f:
        f.write(str(x)+'\n')

In [32]:
def f(x):
    # write each element, incremented by one, to the local file
    save_local_file(x + 1)
rdd10 = sc.parallelize(range(100), 4)

In [33]:
rdd10.foreach(f)
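Because the function passed to ```foreach()``` runs once per element, opening a database or HTTP connection inside it means one connection per element. PySpark also provides ```foreachPartition()```, which calls your function once per partition with an iterator, so setup work happens once per partition. The plain-Python sketch below models the difference; ```FakeConnection```, ```write_each``` and ```write_partition``` are hypothetical names, and partitions are represented as plain lists.

```python
class FakeConnection:
    """Hypothetical stand-in for a database or web-service client."""

    opened = 0  # how many connections have been created

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)

    def close(self):
        pass


def write_each(partitions):
    # foreach-style: one connection per element.
    for part in partitions:
        for x in part:
            conn = FakeConnection()
            conn.send(x + 1)
            conn.close()


def write_partition(iterator):
    # foreachPartition-style: one connection per partition.
    conn = FakeConnection()
    try:
        for x in iterator:
            conn.send(x + 1)
    finally:
        conn.close()


# 100 elements in 4 partitions, like rdd10
partitions = [list(range(i * 25, (i + 1) * 25)) for i in range(4)]

FakeConnection.opened = 0
write_each(partitions)
per_element = FakeConnection.opened

FakeConnection.opened = 0
for part in partitions:
    write_partition(iter(part))
per_partition = FakeConnection.opened

print(per_element, per_partition)  # 100 4
```

In PySpark the second pattern would be ```rdd10.foreachPartition(write_partition)``` with a real client in place of ```FakeConnection```.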