# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark
 
 
 
# Requirements: installing Spark and PySpark, and configuring them to run in Jupyter notebooks

Follow the tutorial that matches your operating system:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [Mac OS X](https://jmedium.com/pyspark-in-mac/)

At the end of the installation, you should be able to run the following code, which is a kind of "Hello world" in PySpark.

In [1]:
import findspark
findspark.init('/Users/louisdevitry/Downloads/spark-2.4.0-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession, Row, functions
from pyspark.ml.feature import CountVectorizer, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import numpy as np

In [2]:
# Hello world
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
spark.stop()

## Part 1 : Spark Context

When writing a Spark program with the RDD API, the first thing to do is to create a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
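
As a rough mental model only, the driver/executor split resembles handing chunks of work to a pool of workers and combining what they return. The sketch below is plain Python, not Spark, and the names `run_task` and `partitions` are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(partition):
    """One 'task': the work a worker does on a single partition."""
    return sum(x * x for x in partition)

# The 'driver' splits the job (sum of squares of 0..99) into 4 tasks.
data = list(range(100))
partitions = [data[i::4] for i in range(4)]

# The 'executors' run the tasks; the partial results come back to the driver.
with ThreadPoolExecutor(max_workers=4) as executors:
    partial_results = list(executors.map(run_task, partitions))

# The driver combines the partial results into the final answer.
total = sum(partial_results)
print(partial_results, total)
```

In Spark the workers are usually separate processes, often on other machines, but the flow of work is the same: split, compute in parallel, send results back.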

Here, we use the `findspark` package, which can be installed with the following command:

`pip3 install findspark`

The `findspark.init()` function then locates the Spark installation and makes the `pyspark` module importable; this is what the first cell of this notebook does.
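
Roughly speaking, `findspark.init()` records where Spark lives and puts Spark's Python directories on the import path. The sketch below illustrates that idea under that assumption; `init_spark_path` is a hypothetical helper, not part of `findspark`, and `/path/to/spark` is a placeholder.

```python
import glob
import os
import sys

def init_spark_path(spark_home):
    """Hypothetical helper sketching what findspark.init(spark_home) sets up."""
    os.environ["SPARK_HOME"] = spark_home
    spark_python = os.path.join(spark_home, "python")
    # Spark bundles py4j as a zip under python/lib; add it when it exists.
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    new_paths = [spark_python] + py4j_zips[:1]
    # With a real Spark directory, `import pyspark` can be resolved after this.
    sys.path[:0] = new_paths
    return new_paths

# Placeholder path: replace it with your own Spark directory.
added = init_spark_path("/path/to/spark")
print(added)
```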




In [4]:
sc=pyspark.SparkContext()

Check the type of `sc`.

In [5]:
type(sc)

pyspark.context.SparkContext

You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [6]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__getnewargs__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_prof

In [7]:
sc.stop()

## Part 2 : Understanding Spark RDDs

### WordCount in Spark
In this part, we write a word count in Spark and apply it to the novel *Dracula* by Bram Stoker (from Project Gutenberg).
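
Before running it on Spark, it can help to see the three steps of a word count (`flatMap`, `map`, `reduceByKey`) on a tiny input. The sketch below is a plain-Python analogue with made-up sample lines; it mimics the semantics of those RDD operations and does not use Spark.

```python
from collections import defaultdict
from itertools import chain

lines = ["The cat, the dog", "the bird"]

# flatMap: each line yields several words, flattened into one sequence.
words = list(chain.from_iterable(line.lower().split() for line in lines))

# map: pair every word with a count of 1.
pairs = [(word, 1) for word in words]

# reduceByKey: combine the values of identical keys with a + b.
counts = defaultdict(int)
for word, one in pairs:
    counts[word] = counts[word] + one

print(dict(counts))
```

Note that `cat,` keeps its comma because splitting on whitespace does not strip punctuation; the same effect is visible in the Spark output (`dracula,`).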

In [8]:
def processing(text):
    """Split a line on whitespace, drop standalone commas, and lower-case the words."""
    words = text.strip().split()
    words_list = []
    for word in words:
        if word != ',':
            words_list.append(word.lower())
    return words_list

In [9]:
import pprint as pp

In [10]:
# Initiate spark context
sc=pyspark.SparkContext()

# First, read the pg345.txt file (./SparkData/pg345.txt).
print('Data type')
data = sc.textFile('./SparkData/pg345.txt')
print(type(data))
print('\n')

# To see the content of the file, we need to run an action on the RDD `data`.
# `collect()` would return every line; `take(10)` returns only the first 10, which keeps the output short.
pp.pprint('Display data')
pp.pprint(data.take(10))
print('\n')

# Apply `flatMap()` to the RDD `data`: each line is split on whitespace, lower-cased, and stripped of
# standalone commas (see `processing`); the resulting words form the RDD `data_cleaned`.
data_cleaned = data.flatMap(processing)

pp.pprint('Execute the command with the action `take()` and retrieve the first 10 words from the `flatMap()`transformation')
pp.pprint(data_cleaned.take(10))
print('\n')


# Perform a classic `map()` to create a tuple where each word has a count of 1
counts = data_cleaned.map(lambda word: (word, 1))

# Write the reducing function
reduced_counts = counts.reduceByKey(lambda a, b: a + b)

# Get all words that occur more than once
filtered_counts = reduced_counts.filter(lambda x: x[1] > 1)

# Sort them alphabetically
sorted_elem = filtered_counts.sortBy(lambda x: x[0], ascending=True)
pp.pprint('Alphabetically sorted count:')
pp.pprint(sorted_elem.take(10))
print('\n')

# Sort by appearance frequency
pp.pprint('Frequency sorted count:')
freq_counts = filtered_counts.sortBy(lambda x: x[1], ascending = False)
pp.pprint(freq_counts.take(10))

Data type
<class 'pyspark.rdd.RDD'>


'Display data'
[u'The Project Gutenberg EBook of Dracula, by Bram Stoker',
 u'',
 u'This eBook is for the use of anyone anywhere at no cost and with',
 u'almost no restrictions whatsoever.  You may copy it, give it away or',
 u're-use it under the terms of the Project Gutenberg License included',
 u'with this eBook or online at www.gutenberg.org/license',
 u'',
 u'',
 u'Title: Dracula',
 u'']


'Execute the command with the action `take()` and retrieve the first 10 words from the `flatMap()`transformation'
[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula,',
 u'by',
 u'bram',
 u'stoker',
 u'this']


'Alphabetically sorted count:'
[(u'"\'my', 2),
 (u'"_17', 3),
 (u'"_2', 2),
 (u'"_24', 2),
 (u'"_25', 4),
 (u'"_6', 2),
 (u'"_czarina', 2),
 (u'"a', 12),
 (u'"ah', 2),
 (u'"ah,', 16)]


'Frequency sorted count:'
[(u'the', 7984),
 (u'and', 5754),
 (u'to', 4504),
 (u'i', 4499),
 (u'of', 3710),
 (u'a', 2933),
 (u'he', 2509),
 (u'in', 2475)

In [11]:
sc.stop()

### A simple exercise

In [12]:
pp.pprint("Create a Python collection of 10,000 integers")
integers = list(np.random.randint(low = 0, high = 100, size = 10000))
print('\n')

pp.pprint('Create a Spark base RDD from that collection')
sc=pyspark.SparkContext()
data = sc.parallelize(integers)
print('\n')

pp.pprint("Subtract one from each value using map")
data_minus = data.map(lambda x: x-1)
pp.pprint(data_minus.take(10))
print('\n')

pp.pprint("Wordcount: Map and reduce by key")
data_minus_map = data_minus.map(lambda word: (word, 1))
data_minus_reduced = data_minus_map.reduceByKey(lambda a, b: a + b)
pp.pprint(data_minus_reduced.take(10))
print('\n')

pp.pprint("Perform action count")
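# Sort the (value, count) pairs by decreasing count; take(10) below is the action that triggers the work
sorted_elem = data_minus_reduced.sortBy(lambda x: x[1], ascending=False)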
pp.pprint(sorted_elem.take(10))

'Create a Python collection of 10,000 integers'


'Create a Spark base RDD from that collection'


'Subtract one from each value using map'
[80, 94, 74, 57, 98, 77, 58, 56, 56, 31]


'Wordcount: Map and reduce by key'
[(0, 93),
 (4, 93),
 (8, 115),
 (12, 89),
 (16, 96),
 (20, 114),
 (24, 96),
 (28, 87),
 (32, 106),
 (36, 121)]


'Perform action count'
[(85, 130),
 (87, 122),
 (36, 121),
 (34, 120),
 (54, 120),
 (89, 117),
 (55, 116),
 (8, 115),
 (72, 115),
 (97, 115)]


In [13]:
sc.stop()

### An improved WordCount

Print the top 10 most frequent words with their probability of appearance
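
The probability of appearance of a word is its count divided by the total number of words. The sketch below shows that computation in plain Python on a made-up sample; `top_word_probabilities` is a hypothetical helper, and in the Spark version the total would come from one extra action over the counts before dividing.

```python
from collections import Counter

def top_word_probabilities(words, n=10):
    """Return the n most frequent words with count / total as probability."""
    counts = Counter(words)
    total = sum(counts.values())
    return [(word, count / total) for word, count in counts.most_common(n)]

sample = "the cat and the dog and the bird".split()
top = top_word_probabilities(sample, n=2)
print(top)
```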


In [14]:
import re
def regex_(string):
    return(re.sub('[^A-Za-z0-9]+', '', string))

In [15]:
# Instantiate the Spark context
sc=pyspark.SparkContext()

data = sc.textFile('./SparkData/pg345.txt')

pp.pprint('Basic preprocessing')
data_cleaned = data.flatMap(processing) # Transformation
data_cleaned = data_cleaned.map(regex_) # Transformation
pp.pprint(data_cleaned.take(10)) # Action
print('\n')

# Format words
counts = data_cleaned.map(lambda word: (word, 1)) # Transformation

# Reduce by key
reduced_counts = counts.reduceByKey(lambda a, b: a + b) # Transformation

# Remove low-occurrence words
filtered_counts = reduced_counts.filter(lambda x: x[1] > 1) # Transformation

pp.pprint('Sort them alphabetically')
sorted_elem = filtered_counts.sortBy(lambda x: x[0], ascending=True) # Transformation
pp.pprint(sorted_elem.take(10)) # Action
print('\n')

pp.pprint('Sort by appearance frequency')
freq_counts = filtered_counts.sortBy(lambda x: x[1], ascending = False) # Transformation
pp.pprint(freq_counts.take(10))# Action
print('\n')

'Basic preprocessing'
[u'the',
 u'project',
 u'gutenberg',
 u'ebook',
 u'of',
 u'dracula',
 u'by',
 u'bram',
 u'stoker',
 u'this']


'Sort them alphabetically'
[(u'', 703),
 (u'1', 19),
 (u'10', 6),
 (u'1030', 2),
 (u'11', 7),
 (u'12', 6),
 (u'13', 3),
 (u'14', 3),
 (u'15', 3),
 (u'16', 5)]


'Sort by appearance frequency'
[(u'the', 8037),
 (u'and', 5896),
 (u'i', 4712),
 (u'to', 4540),
 (u'of', 3738),
 (u'a', 2961),
 (u'in', 2558),
 (u'he', 2543),
 (u'that', 2455),
 (u'it', 2141)]




How many times are the transformations evaluated? (Hint: it depends.)
One feature of Spark RDDs is lazy evaluation: until an action is called, nothing is evaluated. Each action then re-runs the transformations it depends on, so the number of evaluations depends on the number of actions we trigger. We can reduce it with the `persist` method (or `cache`), which keeps the computed RDD after its first evaluation so that subsequent actions reuse it instead of recomputing it.
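
The effect can be imitated in plain Python with generators, which are also lazy. This is only an analogy, not Spark code: `expensive` stands in for a transformation, consuming the generator stands in for an action, and materialising the result in a list plays the role of `persist`.

```python
stats = {"evaluations": 0}

def expensive(x):
    """Stand-in for a transformation; counts how often it runs."""
    stats["evaluations"] += 1
    return x * 2

def transformed(data):
    # Like an RDD lineage: describes the work, runs nothing until consumed.
    return (expensive(x) for x in data)

data = [1, 2, 3]

pipeline = transformed(data)
after_definition = stats["evaluations"]   # nothing has been evaluated yet

# Two "actions" without caching: the transformation runs twice per element.
total = sum(transformed(data))
biggest = max(transformed(data))
without_cache = stats["evaluations"]

# With a cache: compute once, then reuse the stored result for both actions.
stats["evaluations"] = 0
cached = list(transformed(data))
total_cached, biggest_cached = sum(cached), max(cached)
with_cache = stats["evaluations"]

print(after_definition, without_cache, with_cache)
```

Here the two uncached actions cost six evaluations for three elements, while the cached version costs three.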

In [16]:
sc.stop()

## Part 3 : Spark SQL and DataFrames

In this part, you will explore Spark DataFrames and Spark SQL. In particular, we will work on a dataset that contains a sample of the world population and comes from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was extracted from this API and retrieved with the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [17]:
# Read data
spark = SparkSession.builder.getOrCreate()
df = spark.read.json("./SparkData/agents.json")

In [18]:
df.show()

+--------------------+----------+------------------+-------------------+------+
|        country_name|        id|          latitude|          longitude|   sex|
+--------------------+----------+------------------+-------------------+------+
|               China| 227417393| 33.15219798270325| 100.85840672174572|  Male|
|               Haiti|6821129477|19.325567983697297| -72.43795260265814|Female|
|               India|2078667700|23.645271492037235|  80.85636526088884|Female|
|               China| 477556555| 33.45864668881662|  93.33604038078953|Female|
|               India|1379059984|28.816938290678692|   80.7728698035823|Female|
|               India|2278934249|24.223974351280358|  80.14372690674512|  Male|
|         Philippines|4380736204|12.409991630883784| 122.75874146810197|Female|
|               India|1375733494|22.385712662257426|  77.90320433636231|Female|
|             Nigeria|3693807307| 9.967458870426357|  7.562942449523648|Female|
|                Mali|6552202234|16.8825

In [19]:
pp.pprint('Spark instructions that enable to display the number of French agents.')
df[df.country_name.isin(["France"])].show(5)

'Spark instructions that enable to display the number of French agents.'
+------------+----------+--------------------+--------------------+------+
|country_name|        id|            latitude|           longitude|   sex|
+------------+----------+--------------------+--------------------+------+
|      France|5130782577|-0.21142875508479517|-0.00395021443374...|Female|
|      France|5125653041|  1.5099359591520582| -1.7155442515387973|Female|
|      France|5092935162| 0.06978158062530335|  -1.529365900793559|Female|
|      France|5108968681|-0.15326107452236482|  2.1243709186708934|Female|
|      France|5078973934|-0.06137848013048675| -1.4476884573473048|Female|
+------------+----------+--------------------+--------------------+------+
only showing top 5 rows



In [20]:
pp.pprint('Spark instructions that enable to display the number of Indian female agents.')
df[df.country_name.isin(["India"]) & df.sex.isin(["Female"])].show(5)

'Spark instructions that enable to display the number of Indian female agents.'
+------------+----------+------------------+-----------------+------+
|country_name|        id|          latitude|        longitude|   sex|
+------------+----------+------------------+-----------------+------+
|       India|2078667700|23.645271492037235|80.85636526088884|Female|
|       India|1379059984|28.816938290678692| 80.7728698035823|Female|
|       India|1375733494|22.385712662257426|77.90320433636231|Female|
|       India|1584815794|19.102700698004416|79.67843973466778|Female|
|       India|1514105680|17.825246219527756|73.22156078852429|Female|
+------------+----------+------------------+-----------------+------+
only showing top 5 rows



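Using the method `createTempView`, create a temporary view associated with the DataFrame `df`. Note that `createTempView` raises an error if a view with the same name already exists (for example when the cell is run twice); `createOrReplaceTempView` avoids this.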

In [21]:
df.createTempView("temp_table")

Write some SQL queries on the resulting table, as shown in the examples below.

In [22]:
spark.sql("SELECT country_name,id FROM temp_table ORDER BY id DESC LIMIT 10").show()

+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [23]:
spark.sql("SELECT * FROM temp_table WHERE country_name == 'France'").show()

+------------+----------+--------------------+--------------------+------+
|country_name|        id|            latitude|           longitude|   sex|
+------------+----------+--------------------+--------------------+------+
|      France|5130782577|-0.21142875508479517|-0.00395021443374...|Female|
|      France|5125653041|  1.5099359591520582| -1.7155442515387973|Female|
|      France|5092935162| 0.06978158062530335|  -1.529365900793559|Female|
|      France|5108968681|-0.15326107452236482|  2.1243709186708934|Female|
|      France|5078973934|-0.06137848013048675| -1.4476884573473048|Female|
|      France|5132969816|  1.1828646583062592| -0.6655754887318799|Female|
|      France|5079112248|  -0.217260169262005| 0.25087488920444284|Female|
|      France|5108994896| 0.49047553633530405| 0.12140982654262467|Female|
|      France|5130956195| -0.4702673586856353|  0.9266846158973026|  Male|
|      France|5135909518|  1.0264852068813861| -0.2140207779096717|  Male|
|      France|5126986401|

In [24]:
spark.sql("SELECT * FROM temp_table WHERE country_name == 'India' AND sex == 'Female'").show()

+------------+----------+------------------+-----------------+------+
|country_name|        id|          latitude|        longitude|   sex|
+------------+----------+------------------+-----------------+------+
|       India|2078667700|23.645271492037235|80.85636526088884|Female|
|       India|1379059984|28.816938290678692| 80.7728698035823|Female|
|       India|1375733494|22.385712662257426|77.90320433636231|Female|
|       India|1584815794|19.102700698004416|79.67843973466778|Female|
|       India|1514105680|17.825246219527756|73.22156078852429|Female|
|       India|2274764948| 19.82309181729447|71.97339986366181|Female|
|       India|2019200843| 19.85322450012048|81.48236565311277|Female|
|       India|1900697806| 25.00490234918041|73.99415281960854|Female|
|       India|1504105198|19.179362533369744|75.62931818391733|Female|
|       India|2188686500|17.218351214433053|80.27969374143316|Female|
|       India|1686639083|24.359614315713024|79.39650850071789|Female|
|       India|155034

In [25]:
spark.stop()
sc.stop()

You can also create a DataFrame from an existing RDD, as shown in the example below.

In [26]:
sc=pyspark.SparkContext()
sqlContext = pyspark.SQLContext(sc)



In [27]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [28]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()


root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Some functions can be applied to one or several columns. Here, we compute the average age in the DataFrame `users_df`.

In [29]:
from pyspark.sql import functions as fn

# show() returns None, so keep the DataFrame and display it in a separate step
user_age_df = users_df.select(fn.avg('age'))
user_age_df.show()

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the maximum age in the DataFrame `users_df`.

In [30]:
# TO DO
# show() returns None, so keep the DataFrame and display it in a separate step
user_age_df = users_df.select(fn.max('age'))
user_age_df.show()

+--------+
|max(age)|
+--------+
|      32|
+--------+



Join (as in SQL) the two DataFrames `users_df` and `documents_df`.

In [31]:
# TO DO 
inner_join = users_df.join(documents_df, users_df.user_id == documents_df.user_id)

In [32]:
inner_join.show()

+-------+-----+---+------+-----------------+-------+
|user_id| name|age|doc_id|             text|user_id|
+-------+-----+---+------+-----------------+-------+
|      0|Alice| 20|     1|    cats are cute|      0|
|      0|Alice| 20|     2|dogs are playfull|      0|
|      1|  Bob| 23|     3|    lions are big|      1|
|      1|  Bob| 23|     4|    cars are fast|      1|
+-------+-----+---+------+-----------------+-------+



Left outer join (as in SQL) the two DataFrames `users_df` and `documents_df`.

In [33]:
# TO DO
left_join = users_df.join(documents_df, users_df.user_id == documents_df.user_id, 'left_outer')

In [34]:
left_join.show()

+-------+-------+---+------+-----------------+-------+
|user_id|   name|age|doc_id|             text|user_id|
+-------+-------+---+------+-----------------+-------+
|      0|  Alice| 20|     1|    cats are cute|      0|
|      0|  Alice| 20|     2|dogs are playfull|      0|
|      1|    Bob| 23|     3|    lions are big|      1|
|      1|    Bob| 23|     4|    cars are fast|      1|
|      2|Charles| 32|  null|             null|   null|
+-------+-------+---+------+-----------------+-------+
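
The difference between the two joins can be sketched in plain Python. This is an illustration of the semantics only, with a hypothetical `join` helper and a reduced sample; it is not how Spark executes joins.

```python
users = [(0, "Alice"), (1, "Bob"), (2, "Charles")]
documents = [(1, "cats are cute", 0), (3, "lions are big", 1)]

def join(users, documents, how="inner"):
    """Join on user_id; 'left_outer' also keeps users without documents."""
    rows = []
    for user_id, name in users:
        matches = [doc for doc in documents if doc[2] == user_id]
        if matches:
            rows.extend((user_id, name, doc[0], doc[1]) for doc in matches)
        elif how == "left_outer":
            # No matching document: keep the user and fill the gaps with None.
            rows.append((user_id, name, None, None))
    return rows

inner = join(users, documents)
left = join(users, documents, how="left_outer")
print(len(inner), len(left))
```

Charles has no document, so he disappears from the inner join and appears with empty document columns in the left outer join, as in the Spark output.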



In [35]:
sc.stop()

## Part 4 : MLlib - Spark for data analysis and machine learning

In this part, we will see how to create a text classification application with Spark. We will use data from [Usenet](https://en.wikipedia.org/wiki/Usenet_newsgroup) newsgroups, and the objective is to predict the topic of each message.

The training data is [here](./SparkData/20ng-train-all-terms.txt) and the test data [here](./SparkData/20ng-test-all-terms.txt).
They were obtained using the following commands:
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-train-all-terms.txt`
and
`wget http://ana.cachopo.org/datasets-for-single-label-text-categorization/20ng-test-all-terms.txt`

Load these data and convert them to DataFrames.

In [36]:
sc=pyspark.SparkContext()
sqlContext = pyspark.SQLContext(sc)

def load_dataframe(path):
    """Read a tab-separated file (label, text) without a header row into a DataFrame."""
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(path, sep="\t", inferSchema=True, header=False)
    return df

train_data = load_dataframe("./SparkData/train-all-terms.txt")
test_data = load_dataframe("./SparkData/test-all-terms.txt")

train_data.show()
test_data.show()

+-----------+--------------------+
|        _c0|                 _c1|
+-----------+--------------------+
|alt.atheism|alt atheism faq a...|
|alt.atheism|alt atheism faq i...|
|alt.atheism|re gospel dating ...|
|alt.atheism|re university vio...|
|alt.atheism|re soc motss et a...|
|alt.atheism|re a visit from t...|
|alt.atheism|re political athe...|
|alt.atheism|re an anecdote ab...|
|alt.atheism|re political athe...|
|alt.atheism|re pompous ass km...|
|alt.atheism|re pompous ass li...|
|alt.atheism|re keith schneide...|
|alt.atheism|re keith schneide...|
|alt.atheism|re political athe...|
|alt.atheism|re political athe...|
|alt.atheism|re political athe...|
|alt.atheism|re don t more inn...|
|alt.atheism|re ancient islami...|
|alt.atheism|re political athe...|
|alt.atheism|re there must be ...|
+-----------+--------------------+
only showing top 20 rows

+-----------+--------------------+
|        _c0|                 _c1|
+-----------+--------------------+
|alt.atheism|re about the bib

In [37]:
# Split the _c1 column into lists of words
train_data = train_data.withColumn('_c1', functions.split(train_data._c1, ' '))
test_data = test_data.withColumn('_c1', functions.split(test_data._c1, ' '))

A first step is to represent our data, i.e. the messages, as bags of words using Spark's `CountVectorizer`, documented [here](https://spark.apache.org/docs/2.1.0/ml-features.html#countvectorizer).
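
To make the representation concrete, here is a minimal plain-Python sketch of a bag of words with a bounded vocabulary. It only imitates the idea behind `CountVectorizer` (keep the most frequent terms, then count them per document); the helper names and the toy corpus are made up, and Spark itself returns sparse vectors rather than lists.

```python
from collections import Counter

def fit_vocabulary(documents, vocab_size):
    """Keep the vocab_size most frequent terms across the corpus."""
    term_counts = Counter(term for doc in documents for term in doc)
    return [term for term, _ in term_counts.most_common(vocab_size)]

def transform(doc, vocabulary):
    """Count vector of doc over the vocabulary; unknown terms are ignored."""
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]

corpus = [["a", "b", "a"], ["a", "c"], ["b", "a"]]
vocabulary = fit_vocabulary(corpus, vocab_size=2)
vectors = [transform(doc, vocabulary) for doc in corpus]
print(vocabulary, vectors)
```

The term `c` falls outside the two-term vocabulary, so it contributes nothing to the second vector. This is why the vocabulary must be fitted on the training data and then reused unchanged on the test data.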

In [38]:
# TO DO - bag-of-words representation of the train and test data
# Fit a CountVectorizerModel on the training corpus (vocabulary limited to 1000 terms).
cv = CountVectorizer(inputCol="_c1", outputCol="features", vocabSize=1000)

model = cv.fit(train_data)

train_trans = model.transform(train_data)
test_trans = model.transform(test_data)

Display the distinct labels in the datasets.

In [39]:
train_trans.select("_c0").distinct().sort("_c0").show(truncate=False)

+------------------------+
|_c0                     |
+------------------------+
|alt.atheism             |
|comp.graphics           |
|comp.os.ms-windows.misc |
|comp.sys.ibm.pc.hardware|
|comp.sys.mac.hardware   |
|comp.windows.x          |
|misc.forsale            |
|rec.autos               |
|rec.motorcycles         |
|rec.sport.baseball      |
|rec.sport.hockey        |
|sci.crypt               |
|sci.electronics         |
|sci.med                 |
|sci.space               |
|soc.religion.christian  |
|talk.politics.guns      |
|talk.politics.mideast   |
|talk.politics.misc      |
|talk.religion.misc      |
+------------------------+



We will now apply a [NaiveBayes](https://spark.apache.org/docs/latest/ml-classification-regression.html#naive-bayes) classifier to our problem.
Take the time to read the documentation and apply it to our problem. You will first have to associate a number with each label. You can use PySpark's [`StringIndexer`](https://spark.apache.org/docs/2.1.0/ml-features.html#stringindexer) for that.
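
As an illustration of what such an indexer produces, the plain-Python sketch below maps labels to numeric indices, with the most frequent label receiving index 0 (the default ordering of `StringIndexer`). The helper `fit_label_index` is hypothetical, and the alphabetical tie-break is an assumption made here for determinism.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically to break ties (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

labels = ["sci.med", "rec.autos", "sci.med", "alt.atheism", "sci.med", "rec.autos"]
label_index = fit_label_index(labels)
indexed = [label_index[label] for label in labels]
print(label_index)
```

As with the vocabulary, the mapping is fitted once on the training labels and then applied to both the training and the test sets, so that the same label always receives the same number.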


In [40]:
# Transformation of the label into a number
# Instantiate
indexer = StringIndexer(inputCol="_c0", outputCol="label")
# Fit on the training labels, then transform both sets
model = indexer.fit(train_trans)
indexed_train = model.transform(train_trans)
indexed_test = model.transform(test_trans)

In [41]:
# Training
nb = NaiveBayes(smoothing=1)
model = nb.fit(indexed_train)
predictions = model.transform(indexed_test)
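
To see what the `smoothing` parameter does, here is a minimal sketch of multinomial naive Bayes with additive smoothing on count vectors, written in plain Python. It is a simplified illustration on a made-up toy set, not Spark's implementation; `train_naive_bayes` and `predict` are hypothetical names.

```python
import math

def train_naive_bayes(vectors, labels, smoothing=1.0):
    """Multinomial naive Bayes with additive (Laplace) smoothing."""
    classes = sorted(set(labels))
    n_features = len(vectors[0])
    model = {}
    for c in classes:
        rows = [v for v, y in zip(vectors, labels) if y == c]
        log_prior = math.log(len(rows) / len(vectors))
        term_totals = [sum(col) for col in zip(*rows)]
        # Smoothing adds a pseudo-count to every term so no probability is zero.
        denom = sum(term_totals) + smoothing * n_features
        log_likelihood = [math.log((t + smoothing) / denom) for t in term_totals]
        model[c] = (log_prior, log_likelihood)
    return model

def predict(model, vector):
    """Pick the class with the highest log posterior (up to a constant)."""
    def score(c):
        log_prior, log_likelihood = model[c]
        return log_prior + sum(x * w for x, w in zip(vector, log_likelihood))
    return max(model, key=score)

train_vectors = [[2, 0], [3, 1], [0, 2], [1, 3]]
train_labels = [0, 0, 1, 1]
toy_model = train_naive_bayes(train_vectors, train_labels)
print(predict(toy_model, [4, 0]), predict(toy_model, [0, 4]))
```

Without smoothing, a term never seen in a class would have probability zero and its logarithm would be undefined; the pseudo-count avoids this.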

In [43]:
# Evaluation
# Note: the evaluator's default metric is F1 (metricName="f1"), not accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('F1 score of the model : ' + str(evaluator.evaluate(predictions)))

F1 score of the model : 0.625527911891


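The evaluation of the learned model can be done with the [evaluation](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html) module of MLlib. Print the accuracy of the obtained model. Keep in mind that `MulticlassClassificationEvaluator` reports the F1 score unless you pass `metricName="accuracy"`.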
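
Accuracy and F1 are different numbers in general. The plain-Python sketch below, on made-up labels, computes both: accuracy as the share of correct predictions, and a weighted F1 as the per-class F1 averaged by class frequency. It illustrates the definitions and is not Spark's code.

```python
def accuracy(y_true, y_pred):
    """Share of predictions that match the true label."""
    correct = sum(t == p for t, p in zip(y_true, y_pred))
    return correct / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's share of y_true."""
    total = 0.0
    for c in set(y_true):
        tp = sum(t == c and p == c for t, p in zip(y_true, y_pred))
        fp = sum(t != c and p == c for t, p in zip(y_true, y_pred))
        fneg = sum(t == c and p != c for t, p in zip(y_true, y_pred))
        f1 = 2 * tp / (2 * tp + fp + fneg) if tp else 0.0
        total += f1 * (tp + fneg) / len(y_true)
    return total

y_true = [0, 0, 0, 1]
y_pred = [0, 0, 1, 1]
print(accuracy(y_true, y_pred), weighted_f1(y_true, y_pred))
```

On this toy example the accuracy is 0.75 while the weighted F1 is about 0.767, so the two metrics should not be reported under the same name.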