# PySpark on Mac

To install PySpark, I followed this [tutorial](https://medium.com/@GalarnykMichael/install-spark-on-mac-pyspark-453f395f240b), which explains how to configure and update the PySpark driver environment variables by adding lines to your `~/.bash_profile` file.

In [1]:
# import SparkContext
from pyspark import SparkContext

## Resilient Distributed Datasets (RDDs)

RDDs are the backbone of Apache Spark. They speed up calculations because the dataset is parallelized, that is, distributed or split into chunks based on keys and spread across executor nodes.

Transformations on the dataset are lazy: they only run when an action is called, which lets Spark optimize the execution.
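The idea of deferring work until an action is requested can be sketched without Spark. The snippet below is only a plain-Python analogy (the names `double` and `log` are made up for illustration): Python's built-in `map` is lazy, much like an RDD transformation, and `list(...)` plays the role of the action.

```python
# Plain-Python analogy of lazy evaluation (no Spark involved).
log = []

def double(x):
    log.append(x)      # record every time the function really runs
    return 2 * x

numbers = [1, 2, 3, 4, 5]

# "Transformation": map() only builds a lazy iterator, nothing runs yet.
doubled = map(double, numbers)
calls_before_action = len(log)

# "Action": consuming the iterator triggers the actual computation.
result = list(doubled)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
# prints: 0 5 [2, 4, 6, 8, 10]
```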

Let's try an example of RDDs:

In [2]:
sc = SparkContext.getOrCreate()

One million 2D dots are randomly generated. A basic multiplication and subtraction is applied to every coordinate, and then we calculate the mean and standard deviation of each coordinate across all the dots.

In [3]:
import numpy as np

TOTAL = 1000000
dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(TOTAL)]).cache()

In [4]:
# access the first 10 elements of dots:
dots.take(10)

[array([0.43353363, 0.81139593]),
 array([0.75313286, 0.40509211]),
 array([-0.83095786, -0.62381484]),
 array([ 0.7889119 , -0.76342479]),
 array([ 0.52880978, -0.57605368]),
 array([0.33215834, 0.82674053]),
 array([-0.3594745 ,  0.41015793]),
 array([0.87670549, 0.21995188]),
 array([-0.58730223,  0.94808889]),
 array([-0.30174567, -0.5058061 ])]

In [5]:
# count the elements in dots:
dots.count()

1000000

In [6]:
# inspect the first element
dots.first()

array([0.43353363, 0.81139593])

In [7]:
stats = dots.stats()
print('Mean:', stats.mean())
print('stdev:', stats.stdev())

Mean: [ 0.0002259  -0.00015072]
stdev: [0.57732011 0.57712783]
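
A quick sanity check on these numbers, assuming `np.random.random` draws uniformly from $[0, 1)$ as NumPy documents: each coordinate is $X = 2U - 1$ with $U \sim \mathrm{Uniform}(0, 1)$, so $X$ is uniform on $[-1, 1)$ and

$$
\mathbb{E}[X] = 2\,\mathbb{E}[U] - 1 = 0, \qquad
\mathrm{Var}(X) = 4\,\mathrm{Var}(U) = \frac{4}{12} = \frac{1}{3}, \qquad
\sigma_X = \frac{1}{\sqrt{3}} \approx 0.5774
$$

This agrees with the measured mean near 0 and the stdev near 0.577 for both coordinates.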


### RDD Data Transformations


What kind of transformations can we do? Mapping, filtering, joining, and transcoding are operations that transform the values in the dataset. Actions such as `count`, `collect`, and `first` then return results to the driver.

In [8]:
sc.parallelize(['a', 'b', 'c', 1, 1.1]).count()

5

In [9]:
rdd = sc.parallelize([('flat white', 1), ('latte', 4), ('pour over', 1), ('flat white', 3)]) 
sorted(rdd.countByKey().items())

[('flat white', 2), ('latte', 1), ('pour over', 1)]

In [10]:
sorted(sc.parallelize(['a', 'b', 'c', 'd', 'e', 'a', 'b']).distinct().collect())

['a', 'b', 'c', 'd', 'e']

In [11]:
rdd = sc.parallelize(['flat white', 'capuccino', 'latte', 'tea', 'matcha'])
rdd.map(lambda x: 'cup of ' + x).collect()

['cup of flat white',
 'cup of capuccino',
 'cup of latte',
 'cup of tea',
 'cup of matcha']

In [12]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

In [13]:
rdd = sc.parallelize(['flat white', 'capuccino', 'latte', 'tea', 'matcha'])
rdd.first()

'flat white'

From [Using PySpark to perform Transformations and Actions on RDD](https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/?utm_source=blog&utm_medium=DataFramePySparkarticle)

In [14]:
sp = SparkContext.getOrCreate()

In [15]:
rdd = sp.textFile("../pyspark/data/blogtexts")

In [16]:
rdd.take(1)

['Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.']

### Map and flatMap

#### Q1: Convert all words in an RDD to lowercase and split the lines of a document on whitespace

In [17]:
def func(text):
    return text.lower().split()

In [18]:
rdd1 = rdd.map(func)

In [19]:
rdd1.take(2)

[['think',
  'of',
  'it',
  'for',
  'a',
  'moment',
  '–',
  '1',
  'qunitillion',
  '=',
  '1',
  'million',
  'billion!',
  'can',
  'you',
  'imagine',
  'how',
  'many',
  'drives',
  '/',
  'cds',
  '/',
  'blue-ray',
  'dvds',
  'would',
  'be',
  'required',
  'to',
  'store',
  'them?',
  'it',
  'is',
  'difficult',
  'to',
  'imagine',
  'this',
  'scale',
  'of',
  'data',
  'generation',
  'even',
  'as',
  'a',
  'data',
  'science',
  'professional.',
  'while',
  'this',
  'pace',
  'of',
  'data',
  'generation',
  'is',
  'very',
  'exciting,',
  'it',
  'has',
  'created',
  'entirely',
  'new',
  'set',
  'of',
  'challenges',
  'and',
  'has',
  'forced',
  'us',
  'to',
  'find',
  'new',
  'ways',
  'to',
  'handle',
  'big',
  'huge',
  'data',
  'effectively.'],
 []]

In [20]:
rdd2 = rdd.flatMap(func)

In [21]:
rdd2.take(10)

['think', 'of', 'it', 'for', 'a', 'moment', '–', '1', 'qunitillion', '=']
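
The two outputs above differ because `map` keeps one output element per input line (a list of words per line, including an empty list for an empty line), while `flatMap` flattens those lists into a single sequence of words. A minimal plain-Python analogy, with made-up sample lines and a hypothetical helper `split_words`:

```python
from itertools import chain

# Made-up sample lines; the empty string stands for a blank line in the file.
lines = ["Think of it", "", "Big Huge data"]

def split_words(text):
    # Same logic as func above: lowercase, then split on whitespace.
    return text.lower().split()

# map-like: exactly one output element (a list) per input line.
mapped = [split_words(line) for line in lines]

# flatMap-like: the per-line lists are flattened into one sequence.
flat_mapped = list(chain.from_iterable(split_words(line) for line in lines))

print(mapped)       # [['think', 'of', 'it'], [], ['big', 'huge', 'data']]
print(flat_mapped)  # ['think', 'of', 'it', 'big', 'huge', 'data']
```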

### Filter
#### Q2: Remove stopwords and special characters

In [22]:
from spacy.lang.en import English
from spacy.lang.en.stop_words import STOP_WORDS
import re

In [23]:
STOP_WORDS.add('')

In [24]:
rdd3 = rdd2.map(lambda x: re.sub('[^A-Za-z0-9]+', '', x))

In [25]:
rdd3.take(10)

['think', 'of', 'it', 'for', 'a', 'moment', '', '1', 'qunitillion', '']

In [26]:
rdd4 = rdd3.filter(lambda x: x not in STOP_WORDS)

In [27]:
rdd4.take(10)

['think',
 'moment',
 '1',
 'qunitillion',
 '1',
 'million',
 'billion',
 'imagine',
 'drives',
 'cds']

### groupBy

#### Q3: Group the words in rdd4 based on which letters they start with.

In [28]:
rdd5 = rdd4.groupBy(lambda x: x[0])

In [29]:
[(k, len(list(value))) for k, value in rdd5.take(20)]

[('t', 227),
 ('m', 127),
 ('1', 23),
 ('q', 7),
 ('b', 63),
 ('i', 109),
 ('d', 233),
 ('c', 275),
 ('r', 187),
 ('s', 371),
 ('g', 20),
 ('p', 199),
 ('e', 88),
 ('n', 94),
 ('f', 118),
 ('w', 49),
 ('h', 63),
 ('u', 34),
 ('o', 67),
 ('a', 171)]

In [30]:
[(k, list(value)) for k, value in rdd5.take(1)]

[('t',
  ['think',
   'typically',
   'turns',
   'terms',
   'time',
   'traditional',
   'topic',
   'table',
   'terms',
   'traditional',
   'tough',
   'task',
   'temperature',
   'time',
   'typically',
   'task',
   'task',
   'time',
   'time',
   'task',
   'task',
   'terms',
   'time',
   'task',
   'task',
   'time',
   'tasks',
   'times',
   'times',
   'traditional',
   'tasks',
   'terms',
   'types',
   'terms',
   'tasks',
   'traditional',
   'talk',
   'times',
   'tasks',
   'turn',
   'terms',
   'time',
   'time',
   'tasks',
   'technologies',
   'time',
   'task',
   'time',
   'transactions',
   'time',
   'time',
   'think',
   'transformations',
   'transformations',
   'topic',
   'talking',
   'terminal',
   'terms',
   'type',
   'transfers',
   'tar',
   'tar',
   'tool',
   'time',
   'typing',
   'typing',
   'typing',
   'typing',
   'thread',
   'time',
   'time',
   'transformations',
   'talk',
   'tolerant',
   'transformations',
   'things',
   

### groupByKey / reduceByKey 

#### Q4: Frequency of words

In [31]:
rdd6 = rdd4.map(lambda x: (x, 1))

In [32]:
rdd7 = rdd6.groupByKey()

In [33]:
[(key, len(list(value))) for key, value in rdd7.take(10)]

[('think', 2),
 ('moment', 1),
 ('1', 12),
 ('qunitillion', 1),
 ('million', 1),
 ('billion', 1),
 ('imagine', 4),
 ('drives', 1),
 ('cds', 1),
 ('blueray', 1)]

In [34]:
rdd7_freq_of_words = rdd7.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

In [35]:
rdd7_freq_of_words.take(10)

[(91, 'spark'),
 (74, 'data'),
 (52, 'apache'),
 (48, 'rdd'),
 (27, 'need'),
 (22, 'dataframe'),
 (22, 'cluster'),
 (22, 'train'),
 (21, 'lets'),
 (21, 'method')]

In [36]:
rdd6.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)

[(91, 'spark'),
 (74, 'data'),
 (52, 'apache'),
 (48, 'rdd'),
 (27, 'need'),
 (22, 'dataframe'),
 (22, 'cluster'),
 (22, 'train'),
 (21, 'lets'),
 (21, 'method')]
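
Both approaches give the same counts, but they get there differently: `groupByKey` moves every `(word, 1)` pair across the cluster before summing, whereas `reduceByKey` first combines values inside each partition. The sketch below is a plain-Python analogy of that difference, not Spark's implementation; the two partitions and the `moved` counter are hypothetical and only illustrate how many records would need to be shuffled.

```python
from collections import defaultdict

# Two hypothetical partitions of (word, 1) pairs.
partitions = [
    [("spark", 1), ("data", 1), ("spark", 1)],
    [("data", 1), ("spark", 1), ("rdd", 1)],
]

def group_then_sum(parts):
    """groupByKey-like: every pair is moved, then values are summed."""
    grouped = defaultdict(list)
    moved = 0
    for part in parts:
        for key, value in part:
            grouped[key].append(value)
            moved += 1
    return {k: sum(v) for k, v in grouped.items()}, moved

def combine_then_sum(parts):
    """reduceByKey-like: sum inside each partition, then merge partial sums."""
    totals = defaultdict(int)
    moved = 0
    for part in parts:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value
        for key, value in local.items():
            totals[key] += value
            moved += 1   # only one partial sum per key and partition moves
    return dict(totals), moved

grouped_counts, grouped_moved = group_then_sum(partitions)
reduced_counts, reduced_moved = combine_then_sum(partitions)

print(grouped_counts == reduced_counts)  # True
print(grouped_moved, reduced_moved)      # 6 5
```

On a real dataset with many repeated words, the gap between the two record counts would be much larger than in this toy example.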

### mapPartitions

mapPartitions is a map transformation that runs once per partition of the RDD instead of once per element. For instance, if we are curious about the frequency of a few specific words, we can count them separately in each partition.

#### Q5: Perform a task in different partitions of the RDD: investigate the frequency of the following words:
- spark
- apache

We need to define a function that performs the required task and then pass it to the **mapPartitions** transformation.

In [37]:
from collections import Counter

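def func(iterator):
    # count every word seen in this partition
    counter = Counter(iterator)
    # mapPartitions expects an iterable back: each item of this tuple
    # becomes one output element for the partition
    return (counter['spark'], counter['apache'])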

Let's display the number of partitions:

In [38]:
rdd4.getNumPartitions()

1

Now we increase the number of partitions by applying **repartition** to the RDD.

In [39]:
repartrdd = rdd4.repartition(6)

Checking the number of partitions again:

In [40]:
repartrdd.getNumPartitions()

6

Passing the function to mapPartitions on the original RDD performs the task in a single partition. The **glom** function lets us inspect the data in each partition. We use it to compare the original output with the outputs produced after changing the number of partitions. In the first case, as expected, the output is a single list with the frequency of each word.

In [41]:
rdd4.mapPartitions(func).glom().collect()

[[91, 52]]

Then, for the new RDD with 6 partitions, we expect 6 outputs of two elements each, corresponding to the frequency of the two words in each partition:

In [42]:
repartrdd.mapPartitions(func).glom().collect()

[[12, 8], [15, 8], [15, 8], [20, 10], [16, 11], [13, 7]]
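
Note that the per-partition counts add up to the single-partition result: 12 + 15 + 15 + 20 + 16 + 13 = 91 for "spark" and 8 + 8 + 8 + 10 + 11 + 7 = 52 for "apache". The following plain-Python sketch mimics this behaviour on a made-up word list; `count_targets` and `split_into_partitions` are hypothetical helpers, and the round-robin split is an assumption for illustration, not how Spark assigns records to partitions.

```python
from collections import Counter

def count_targets(iterator):
    # Same idea as func above: count the two words within one partition.
    counter = Counter(iterator)
    return (counter["spark"], counter["apache"])

def split_into_partitions(items, n):
    """Round-robin split into n chunks (illustrative only)."""
    return [items[i::n] for i in range(n)]

# Made-up word list standing in for rdd4.
words = ["spark", "apache", "data", "spark", "rdd", "apache", "spark", "data"]

partitions = split_into_partitions(words, 3)

# mapPartitions-like: the function is called once per partition.
per_partition = [count_targets(iter(p)) for p in partitions]

# Summing the per-partition results recovers the global counts.
totals = tuple(sum(col) for col in zip(*per_partition))

print(per_partition)  # [(3, 0), (0, 1), (0, 1)]
print(totals)         # (3, 2)
```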

Which function do we use to reduce the number of partitions? We can use **repartition** or **coalesce**. The first does a full shuffle of the data and creates roughly equal-sized partitions. coalesce instead combines existing partitions to avoid a full shuffle, which is an expensive operation for large datasets.

In [43]:
coalescerdd = repartrdd.coalesce(4)

Here we use *coalesce* to reduce the number of partitions. For this new RDD with 4 partitions, we expect 4 outputs:

In [44]:
coalescerdd.mapPartitions(func).glom().collect()

[[12, 8], [33, 17], [30, 16], [16, 11]]
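
The numbers are consistent with coalesce merging whole partitions rather than redistributing individual records. The check below uses only the two outputs printed above; the specific pairing of partitions is one grouping that matches the arithmetic, not something the notebook output confirms directly.

```python
# Outputs copied from the two glom() results above.
six = [[12, 8], [15, 8], [15, 8], [20, 10], [16, 11], [13, 7]]
four = [[12, 8], [33, 17], [30, 16], [16, 11]]

def column_sums(rows):
    """Total count per word across all partitions."""
    return [sum(col) for col in zip(*rows)]

def merge(a, b):
    """Element-wise sum of two per-partition results."""
    return [x + y for x, y in zip(a, b)]

# The totals are unchanged by coalesce.
print(column_sums(six), column_sums(four))  # [91, 52] [91, 52]

# One pairing consistent with the output: two partitions are kept as they
# are, and the other four are merged two by two.
print(merge(six[3], six[5]) == four[1])  # True: [20, 10] + [13, 7] = [33, 17]
print(merge(six[1], six[2]) == four[2])  # True: [15, 8] + [15, 8] = [30, 16]
```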

### Sampling

#### Q6: How do we get a sample of a population?

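The *sample* transformation takes the following parameters:
- withReplacement (bool): whether an element can be picked more than once
- fraction (float): expected size of the sample as a fraction of the RDD's size
- seed (int): random state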

In [45]:
rdd_sample = rdd4.sample(False, 0.2, 42)

In [46]:
len(rdd_sample.collect())

525

In [47]:
len(rdd4.collect())

2762
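
The sample holds 525 of 2762 elements, about 19%, not exactly 20%. This is expected: without replacement, `fraction` is the probability of keeping each element, so the sample size varies from run to run and seed to seed. A minimal plain-Python sketch of that idea follows; `bernoulli_sample` is a hypothetical helper, and since it uses Python's own random generator it will not reproduce Spark's count of 525.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

# Stand-in population with the same size as rdd4.
population = list(range(2762))

sample = bernoulli_sample(population, 0.2, 42)

# The size is close to 0.2 * 2762 (about 552) but usually not equal to it.
print(len(sample), len(sample) / len(population))
```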

### Union/join

#### Q7: Union of two RDDs: one with all the words starting with 'e' and another with the words ending with 'a'.
#### Note: union doesn't remove duplicates.

In [48]:
e_words = rdd4.filter(lambda x: x[0] == 'e').distinct()

In [49]:
e_words.collect()

['exciting',
 'entirely',
 'effectively',
 'experience',
 'efficient',
 'emanating',
 'efficiently',
 'example',
 'end',
 'explain',
 'exact',
 'exactly',
 'examples',
 'enables',
 'executors',
 'ends',
 'evaluation',
 'execute',
 'entire',
 'easiest',
 'extract',
 'extracted',
 'editing',
 'environment',
 'editor',
 'export',
 'extent',
 'exploring',
 'elements',
 'element',
 'existing',
 'external',
 'earlier',
 'executing',
 'elegant',
 'encode',
 'extra',
 'evaluate',
 'error',
 'evaluator',
 'evaluatorevaluatepredictionsevaluatormetricnamemse']

In [50]:
words_a = rdd4.filter(lambda x: x[-1] == 'a').distinct()

In [51]:
words_a.collect()

['data',
 'phenomena',
 'scala',
 'california',
 'java',
 'ppawebupd8teamjava',
 'metadata',
 'scparallelizedata',
 'gupta',
 'lambda',
 'rddmaplambda',
 'trueinferschema',
 'inferschema',
 'printschema',
 'schema',
 'trainprintschema',
 'fillna',
 'comma',
 'formula',
 'rformula',
 'extra']

In [52]:
union = e_words.union(words_a)

In [53]:
print('Length words starting with "e": {}'.format(len(e_words.collect())))
print('Length words finishing with "a": {}'.format(len(words_a.collect())))
print('Length union: {}'.format(len(union.collect())))

Length words starting with "e": 41
Length words finishing with "a": 21
Length union: 62
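
The union has 41 + 21 = 62 elements even though 'extra' appears in both lists, which confirms that duplicates are kept; applying `distinct` afterwards would leave 61. A small plain-Python analogy, using shortened versions of the two word lists:

```python
# Shortened, illustrative versions of the two word lists above.
e_words = ["exciting", "example", "extra"]
a_words = ["data", "schema", "extra"]

# union-like: simple concatenation, duplicates are kept.
union_like = e_words + a_words

# union followed by distinct: duplicates are removed.
distinct_like = sorted(set(union_like))

print(len(union_like))     # 6
print(len(distinct_like))  # 5, because 'extra' is counted once
```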


#### Q8: Joining two pair RDDs based on their key.

In [54]:
sample1 = rdd6.sample(False, 0.2, 42).distinct()
sample2 = rdd6.sample(False, 0.2, 21).distinct()

In [55]:
sample1.take(10)

[('think', 1),
 ('qunitillion', 1),
 ('1', 1),
 ('blueray', 1),
 ('difficult', 1),
 ('data', 1),
 ('generation', 1),
 ('find', 1),
 ('handle', 1),
 ('huge', 1)]

In [56]:
joined_samples = sample1.join(sample2)

In [57]:
joined_samples.take(10)

[('think', (1, 1)),
 ('qunitillion', (1, 1)),
 ('1', (1, 1)),
 ('huge', (1, 1)),
 ('offering', (1, 1)),
 ('based', (1, 1)),
 ('large', (1, 1)),
 ('read', (1, 1)),
 ('learning', (1, 1)),
 ('challenges', (1, 1))]

In [58]:
print('Length sample 1: {}'.format(len(sample1.collect())))
print('Length sample 2: {}'.format(len(sample2.collect())))
print('Length joined samples: {}'.format(len(joined_samples.collect())))

Length sample 1: 335
Length sample 2: 356
Length joined samples: 166
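
The joined RDD is smaller than either sample because `join` is an inner join: only keys present in both RDDs survive, and each result pairs the value from the left with the value from the right. The sketch below reproduces that behaviour in plain Python on made-up pairs; `inner_join` is a hypothetical helper, not part of PySpark.

```python
# Made-up (word, 1) pairs standing in for sample1 and sample2.
left = [("think", 1), ("data", 1), ("huge", 1)]
right = [("think", 1), ("huge", 1), ("spark", 1)]

def inner_join(a, b):
    """Pair up values of a and b that share the same key."""
    right_by_key = {}
    for key, value in b:
        right_by_key.setdefault(key, []).append(value)
    # Keys missing from b produce no output at all.
    return [
        (key, (left_value, right_value))
        for key, left_value in a
        for right_value in right_by_key.get(key, [])
    ]

joined = inner_join(left, right)
print(joined)  # [('think', (1, 1)), ('huge', (1, 1))]
```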
