# Spark (with PySpark)


In this tutorial, you will learn how to use Apache Spark, a framework for large-scale data processing, within a notebook.
Upon completing this lab, you will be able to:
 + Program in Spark with the Python language
 + Demonstrate how to read and process data using Spark
 + Compare and contrast RDDs and DataFrames
 + Build a simple machine learning application with Spark



# Requirements: (Recommended) run the notebook in Google Colab


# Otherwise: install Spark and PySpark, and configure them to run in Jupyter notebooks

Depending on your OS, follow the corresponding tutorial:

+ [Windows](https://changhsinlee.com/install-pyspark-windows-jupyter/)
+ [macOS](https://jmedium.com/pyspark-in-mac/)

Once the installation is complete, you should be able to run the following code, a kind of "Hello World" in PySpark.

In [10]:
!pip install findspark
!pip install pyspark
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [11]:
spark.stop()

## Part 1: Spark Context

When writing a Spark program, the first thing to do is to define a `SparkContext`.

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
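The driver/executor split can be illustrated without a cluster. The sketch below is a plain-Python analogy, not Spark code: a thread pool stands in for the executors, each slice of the data plays the role of a partition processed by one task, and the helper names `split_into_partitions` and `run_job` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal slices, one per task (similar in spirit to sc.parallelize)."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_job(data, task, num_executors=4):
    """Driver side: split the job into tasks, hand them to the 'executors', gather the results."""
    partitions = split_into_partitions(data, num_executors)
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        # executors.map keeps the results in partition order
        partial_results = list(executors.map(task, partitions))
    return partial_results


partials = run_job(list(range(1, 101)), sum)
print(partials)       # one partial sum per task: [325, 950, 1575, 2200]
print(sum(partials))  # the driver combines them: 5050
```

In real Spark the executors are separate JVM processes, possibly on other machines, and the results travel back to the driver over the network; only the shape of the computation is the same here.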

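Here, we use the `findspark` package, which can be installed with the following command:

`pip3 install findspark`

Then the `findspark.init()` function locates the Spark installation (using the `SPARK_HOME` environment variable when it is set) and adds `pyspark` to the Python path so that it can be imported.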





In [12]:
import findspark

findspark.init()

import pyspark

# "local[*]" runs Spark locally with as many worker threads as there are logical cores
sc = pyspark.SparkContext("local[*]", "First program in Spark")

Print `sc` to see its type, along with a few of its attributes (version, master URL, application name).

In [13]:
print(sc)
print("Spark Version:", sc.version)
print("Master:", sc.master)
print("Application Name:", sc.appName)


<SparkContext master=local[*] appName=First program in Spark>
Spark Version: 3.5.4
Master: local[*]
Application Name: First program in Spark


You can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.
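Most names returned by `dir()` are private or "dunder" attributes. A small helper, hypothetical and not part of PySpark, can filter them out so that only the public methods remain; in the notebook it would be called as `public_attributes(sc)`.

```python
def public_attributes(obj):
    """Return the names from dir(obj) that do not start with an underscore."""
    return [name for name in dir(obj) if not name.startswith("_")]


# Stand-in object so the sketch runs without Spark; with the notebook's context use public_attributes(sc)
class Demo:
    def run(self): ...

    def _helper(self): ...


print(public_attributes(Demo()))  # ['run']
```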

In [14]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addArchive',
 'addFile',
 'addJobTag',


## Part 2: Understanding Spark RDDs

### WordCount in Spark
In this part, we will write a word count in Spark and apply it to Bram Stoker's novel *Dracula* (from Project Gutenberg).

First, read the [pg345.txt](./SparkData/pg345.txt) file. In Colab, the code below reads it from `/content/pg345.txt`, so the file has to be uploaded there.

In [15]:
data = sc.textFile('/content/pg345.txt')
type(data)

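`textFile()` is lazy: it only records where the data is, so nothing has been read yet. In Colab, upload the file first (next cell). Then, to see the content of the file, run the action `collect` on the RDD `data`. `collect()` returns the whole RDD to the driver, which is fine for a single novel but should be avoided on large datasets.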

In [17]:
from google.colab import files
uploaded = files.upload()

Saving pg345.txt to pg345.txt


In [18]:
data.collect()

['The Project Gutenberg EBook of Dracula, by Bram Stoker',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org/license',
 '',
 '',
 'Title: Dracula',
 '',
 'Author: Bram Stoker',
 '',
 'Release Date: August 16, 2013 [EBook #345]',
 '',
 'Language: English',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK DRACULA ***',
 '',
 '',
 '',
 '',
 'Produced by Chuck Greif and the Online Distributed',
 'Proofreading Team at http://www.pgdp.net (This file was',
 'produced from images generously made available by The',
 'Internet Archive)',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '',
 '',
 '',
 '',
 '                                DRACULA',
 '',
 '                                  _by_',
 '',
 '                              Bram Stoke

With the RDD `data` from the previous cell, apply `flatMap()` to each line of the input: convert it to lowercase, remove every character that is not a letter or whitespace (such as commas, periods, apostrophes and digits), split it into words on whitespace, and store the result in the RDD `words`.

In [19]:
import re

words = data.flatMap(lambda line: re.sub(r'[^a-zA-Z\s]', '', line.lower()).split())




Use the action `take()` to retrieve the first 10 words produced by the `flatMap()` transformation.

In [20]:
# Show the first 10 words after preprocessing
words.take(10)


['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 'by',
 'bram',
 'stoker',
 'this']
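The difference between `map()` and `flatMap()` is easiest to see on a few lines. This plain-Python sketch (no Spark involved, sample lines chosen for illustration) applies the same cleaning function both ways: `map` produces one list per line, while `flatMap` concatenates those lists into a single sequence of words.

```python
import re
from itertools import chain

lines = ["The Project Gutenberg EBook of Dracula,", "", "by Bram Stoker"]


def tokenize(line):
    # Same cleaning as the flatMap cell: lowercase, drop non-letters, split on whitespace
    return re.sub(r"[^a-zA-Z\s]", "", line.lower()).split()


# map: exactly one output element per input line (a list of lists, empty lines included)
mapped = [tokenize(line) for line in lines]

# flatMap: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(tokenize(line) for line in lines))

print(mapped)       # [['the', 'project', 'gutenberg', 'ebook', 'of', 'dracula'], [], ['by', 'bram', 'stoker']]
print(flat_mapped)  # ['the', 'project', 'gutenberg', 'ebook', 'of', 'dracula', 'by', 'bram', 'stoker']
```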

Perform a classic `map()` to create a tuple where each word has a count of 1

In [21]:
# Map each word to a tuple (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# Show the first 10 (word, 1) pairs
word_pairs.take(10)




[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('dracula', 1),
 ('by', 1),
 ('bram', 1),
 ('stoker', 1),
 ('this', 1)]

Write the reducing function: `reduceByKey()` sums the counts of each word.

In [22]:
# Reduce by key (sum the counts for each word)
word_counts = word_pairs.reduceByKey(lambda count1, count2: count1 + count2)

# Retrieve and display the first 10 word-count pairs
word_counts.take(10)


[('of', 3738),
 ('dracula', 38),
 ('by', 531),
 ('bram', 5),
 ('stoker', 5),
 ('this', 669),
 ('for', 1534),
 ('use', 52),
 ('anyone', 6),
 ('at', 1095)]
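What `reduceByKey()` computes can be written as a plain-Python function on a list of pairs. The sketch below (the name `reduce_by_key` is hypothetical) groups the values by key and folds each group with the given function.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Plain-Python analogue of RDD.reduceByKey: merge the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


words = ["the", "count", "and", "the", "castle", "and", "the"]
pairs = [(word, 1) for word in words]  # the map() step
counts = reduce_by_key(pairs, lambda count1, count2: count1 + count2)
print(counts)  # {'the': 3, 'count': 1, 'and': 2, 'castle': 1}
```

Unlike this dictionary version, Spark first merges the values inside each partition and only then shuffles the partial results between executors, which is why the function passed to `reduceByKey()` must be associative and commutative.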

Get all words that occur more than once and sort them alphabetically:

In [23]:
# Filter to keep only words that occur more than once
filtered_words = word_counts.filter(lambda x: x[1] > 1)

# Sort the filtered words alphabetically by the word (key)
sorted_words = filtered_words.map(lambda x: x[0]).sortBy(lambda word: word)

# Display the sorted words that occur more than once
sorted_words.collect()



['a',
 'abaft',
 'abandoned',
 'abating',
 'abbey',
 'abhorred',
 'abide',
 'able',
 'abnormally',
 'aboard',
 'aboon',
 'aboot',
 'about',
 'above',
 'abraham',
 'abreast',
 'abroad',
 'absence',
 'absent',
 'absolute',
 'absolutely',
 'absorbed',
 'absorbing',
 'accept',
 'accepted',
 'accepting',
 'accepts',
 'access',
 'accident',
 'accomplished',
 'accord',
 'accordance',
 'according',
 'accordingly',
 'account',
 'accumulation',
 'accuracy',
 'accurate',
 'accurately',
 'accustomed',
 'achieve',
 'achieved',
 'acknowledge',
 'acomin',
 'acquiesced',
 'acquiescence',
 'acrid',
 'across',
 'act',
 'action',
 'actions',
 'active',
 'acts',
 'actual',
 'actually',
 'ad',
 'add',
 'added',
 'addition',
 'additional',
 'address',
 'addressed',
 'addresses',
 'addressing',
 'adjusted',
 'admit',
 'advance',
 'advanced',
 'advantage',
 'adventure',
 'adventures',
 'advise',
 'afar',
 'affair',
 'affairs',
 'affect',
 'affected',
 'affects',
 'afford',
 'afield',
 'afore',
 'afraid',
 'af

Now, get all words that occur more than once and sort them by frequency. Select the first 20. Hint: look at the `takeOrdered` action.

In [24]:


# Step 1: Filter words with count greater than 1
words_more_than_once = word_counts.filter(lambda pair: pair[1] > 1)

# Step 2: Sort words by frequency in descending order and take the top 20
top_20_words = words_more_than_once.takeOrdered(20, key=lambda x: -x[1])

# Display the top 20 words and their counts
top_20_words



[('the', 8037),
 ('and', 5896),
 ('i', 4712),
 ('to', 4540),
 ('of', 3738),
 ('a', 2962),
 ('in', 2558),
 ('he', 2543),
 ('that', 2455),
 ('it', 2141),
 ('was', 1877),
 ('as', 1581),
 ('we', 1535),
 ('for', 1534),
 ('is', 1526),
 ('his', 1467),
 ('you', 1450),
 ('me', 1446),
 ('not', 1404),
 ('with', 1320)]
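`takeOrdered(n, key)` returns the `n` smallest elements according to `key`; negating the count turns "smallest" into "most frequent". The same selection in plain Python, on made-up counts, is `heapq.nsmallest`:

```python
import heapq

counts = {"the": 8, "and": 6, "i": 5, "castle": 2, "dracula": 1}

# Keep words seen more than once, then pick the 3 with the largest counts
more_than_once = [(word, count) for word, count in counts.items() if count > 1]
top_3 = heapq.nsmallest(3, more_than_once, key=lambda pair: -pair[1])
print(top_3)  # [('the', 8), ('and', 6), ('i', 5)]
```

PySpark's `takeOrdered` works along these lines: each partition keeps only its own top `n`, and the driver merges those short lists, so the whole RDD is never fully sorted. A `sortBy()` followed by `take(n)` gives the same words but sorts everything. Words with equal counts may come back in any order.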

### A simple exercise

+ Create a Python collection of 10,000 integers
+ Create a Spark base RDD from that collection
+ Subtract one from each value using map
+ Perform the action collect to view the results
+ Perform the action count to view the number of elements
+ Apply the transformation filter and view the results with collect

In [25]:
# Step 1: Create a Python collection of 10,000 integers (1 to 10,000)
numbers = list(range(1, 10001))

# Step 2: Create a base RDD from the collection
numbers_rdd = sc.parallelize(numbers)

# Step 3: Subtract one from each value using map (a lazy transformation)
numbers_subtracted = numbers_rdd.map(lambda x: x - 1)

# Step 4: View the results. collect() would return all 10,000 values,
# so take(10) is used here to show only the first 10
print(numbers_subtracted.take(10))

# Step 5: Perform the action count. In a notebook only the last expression
# of a cell is displayed, so intermediate results need print()
print(numbers_subtracted.count())

# Step 6: Apply the filter transformation (keep even numbers) and view the first 10 results
filtered_numbers = numbers_subtracted.filter(lambda x: x % 2 == 0)
filtered_numbers.take(10)


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
10000

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

### An improved WordCount

Print the top 10 most frequent words with their probability of appearance


In [26]:
# Step 1: Calculate the total number of words in the dataset
total_words = word_counts.map(lambda x: x[1]).sum()

# Step 2: Calculate the probability for each word
word_probabilities = word_counts.mapValues(lambda count: count / total_words)

# Step 3: Sort words by frequency (probability) in descending order and take the top 10
top_10_words = word_probabilities.takeOrdered(10, key=lambda x: -x[1])

# Step 4: Display the top 10 words and their probabilities
top_10_words


[('the', 0.04918333751506955),
 ('and', 0.036081243995128784),
 ('i', 0.028835621049024228),
 ('to', 0.027783047445367148),
 ('of', 0.022875117037617267),
 ('a', 0.0181262965932109),
 ('in', 0.01565397254741171),
 ('he', 0.015562178337790453),
 ('that', 0.015023652308012411),
 ('it', 0.013102093519940763)]
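The probability printed above is the relative frequency P(word) = count(word) / total number of words, where the total counts every occurrence, not the number of distinct words. A plain-Python version on a toy sentence, using `collections.Counter` in place of the RDD, makes the formula explicit:

```python
from collections import Counter

words = "the count and the castle and the".split()
counts = Counter(words)
total_words = sum(counts.values())  # same role as word_counts.map(lambda x: x[1]).sum()

# P(word) = count(word) / total number of words
probabilities = {word: count / total_words for word, count in counts.items()}

top_2 = sorted(probabilities.items(), key=lambda pair: -pair[1])[:2]
print([(word, round(p, 3)) for word, p in top_2])  # [('the', 0.429), ('and', 0.286)]
```

Because every occurrence is counted once in the denominator, the probabilities over all words sum to 1.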

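Get rid of special characters (.,:!?'). The regular expression in the `flatMap()` cell already removed them, so on `words` this step changes nothing; it only matters when the tokens have not been cleaned beforehand.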


In [27]:
import re

# Function to remove special characters
def clean_text(text):
    # Remove every character that is neither a word character (letter, digit, underscore) nor whitespace
    return re.sub(r"[^\w\s]", "", text)

# Clean the words, then drop tokens made only of punctuation (now empty strings)
cleaned_words = words.map(clean_text).filter(lambda word: word != "")

cleaned_words.take(10)


['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'dracula',
 'by',
 'bram',
 'stoker',
 'this']
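The two character classes used in this notebook do not remove the same characters. A quick plain-Python comparison on an illustrative string:

```python
import re

sample = "don't stop_now, 2 o'clock!"

# [^\w\s] keeps word characters (letters, digits, underscore) and whitespace
keep_word_chars = re.sub(r"[^\w\s]", "", sample).split()
# [^a-zA-Z\s], used in the flatMap cell, keeps only ASCII letters and whitespace
letters_only = re.sub(r"[^a-zA-Z\s]", "", sample).split()

print(keep_word_chars)  # ['dont', 'stop_now', '2', 'oclock']
print(letters_only)     # ['dont', 'stopnow', 'oclock']
```

Note also that `\w` is Unicode-aware for Python `str` patterns, so accented letters survive `[^\w\s]` but are stripped by `[^a-zA-Z\s]`.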

Identify the transformations and the actions in your script


In [28]:
# List of transformations and actions used in the script:

# 1. Transformation: flatMap - This splits each line into words and removes special characters
words = data.flatMap(lambda line: re.sub(r'[^a-zA-Z\s]', '', line.lower()).split())

# 2. Transformation: map - This converts each word to a tuple (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# 3. Action: take - This returns the first 10 (word, 1) pairs to check the result
word_pairs.take(10)

# 4. Transformation: reduceByKey - This sums the counts for each word
word_counts = word_pairs.reduceByKey(lambda count1, count2: count1 + count2)

# 5. Action: take - This collects the first 10 word-count pairs
word_counts.take(10)

# 6. Transformation: filter - This filters out words that occur only once
filtered_words = word_counts.filter(lambda x: x[1] > 1)

# 7. Action: collect - This collects the filtered words
filtered_words.collect()

# 8. Transformation: sortBy - This sorts the words alphabetically
sorted_words = filtered_words.map(lambda x: x[0]).sortBy(lambda word: word)

# 9. Action: collect - This collects the sorted words
sorted_words.collect()

# 10. Action: sum - This calculates the total number of words in the dataset
total_words = word_counts.map(lambda x: x[1]).sum()

# 11. Transformation: mapValues - This calculates the probability for each word
word_probabilities = word_counts.mapValues(lambda count: count / total_words)

# 12. Action: takeOrdered - This gets the top 10 most frequent words based on probability
top_10_words = word_probabilities.takeOrdered(10, key=lambda x: -x[1])

# 13. Not a Spark operation: takeOrdered already returned a Python list to the driver, which the notebook displays
top_10_words


[('the', 0.04918333751506955),
 ('and', 0.036081243995128784),
 ('i', 0.028835621049024228),
 ('to', 0.027783047445367148),
 ('of', 0.022875117037617267),
 ('a', 0.0181262965932109),
 ('in', 0.01565397254741171),
 ('he', 0.015562178337790453),
 ('that', 0.015023652308012411),
 ('it', 0.013102093519940763)]

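How many times are the transformations evaluated? (Hint: it depends.) Without caching, Spark recomputes an RDD's whole lineage, starting from reading the file, every time an action is run on it or on an RDD derived from it.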


In [29]:
word_pairs.cache()

PythonRDD[40] at RDD at PythonRDD.scala:53

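Can you reduce this number? (Hint: check out `persist`.) `cache()` is shorthand for `persist()` with the default storage level (memory only); `persist()` lets you choose another level, for example keeping the data on disk only.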

In [33]:
print(word_pairs.getStorageLevel())


Memory Serialized 1x Replicated


In [35]:
from pyspark import StorageLevel

word_pairs.unpersist()
word_pairs.persist(StorageLevel.DISK_ONLY)


PythonRDD[40] at RDD at PythonRDD.scala:53
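Why caching reduces the number of evaluations can be modelled with a toy class. `LazyDataset` below is hypothetical and much simpler than a real RDD: transformations are only recorded, every action replays the recorded lineage, and `cache()` merely marks the dataset so that the next action stores its result for later actions.

```python
class LazyDataset:
    """Hypothetical toy model of an RDD (not a Spark API)."""

    def __init__(self, source, funcs=()):
        self.source = source        # the input data, like the text file
        self.funcs = list(funcs)    # recorded transformations (the lineage)
        self.cache_enabled = False
        self._cached = None
        self.evaluations = 0        # how many times the lineage was computed

    def map(self, func):
        # Transformation: nothing runs, a dataset with a longer lineage is returned
        return LazyDataset(self.source, self.funcs + [func])

    def cache(self):
        # Like RDD.cache(): only marks the dataset; the next action stores the data
        self.cache_enabled = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        self.evaluations += 1
        result = list(self.source)
        for func in self.funcs:
            result = [func(item) for item in result]
        if self.cache_enabled:
            self._cached = result
        return result

    # Actions: each one triggers a computation (unless a cached result exists)
    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())


pairs = LazyDataset(["a", "b", "a"]).map(lambda word: (word, 1))
pairs.count()
pairs.collect()
print(pairs.evaluations)   # 2: each action replayed the lineage

cached = LazyDataset(["a", "b", "a"]).map(lambda word: (word, 1)).cache()
cached.count()
cached.collect()
print(cached.evaluations)  # 1: the second action reused the stored result
```

Spark's real cache differs in that it works partition by partition and cached blocks can be evicted, in which case the lost partitions are recomputed from the lineage.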

In [36]:
sc.stop()

## Part 3: Spark SQL and DataFrames

In this part, you will explore Spark DataFrames and Spark SQL. We will work on a dataset containing a sample of the world population, taken from [pplapi](http://pplapi.com/). The file [agents.json](./SparkData/agents.json) was retrieved with the following command:


`wget https://s3-eu-west-1.amazonaws.com/course.oc-static.com/courses/4297166/agents.json`

In [37]:
from google.colab import files
uploaded = files.upload()

Saving agents.json to agents.json


In [38]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.json("/content/agents.json")

Print the first 5 rows of the dataframe `df`.

In [39]:
df.show(5)


+------------+----------+------------------+------------------+------+
|country_name|        id|          latitude|         longitude|   sex|
+------------+----------+------------------+------------------+------+
|       China| 227417393| 33.15219798270325|100.85840672174572|  Male|
|       Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
|       India|2078667700|23.645271492037235| 80.85636526088884|Female|
|       China| 477556555| 33.45864668881662| 93.33604038078953|Female|
|       India|1379059984|28.816938290678692|  80.7728698035823|Female|
+------------+----------+------------------+------------------+------+
only showing top 5 rows



Write the Spark instructions that display the number of French agents.

In [41]:
df.createOrReplaceTempView("agents")
spark.sql("SELECT COUNT(*) FROM agents WHERE country_name = 'France'").show()



+--------+
|count(1)|
+--------+
|      94|
+--------+



Write the Spark instructions that display the number of Indian female agents.

In [42]:
df.createOrReplaceTempView("agents")
spark.sql("SELECT COUNT(*) FROM agents WHERE country_name = 'India' AND sex = 'Female'").show()


+--------+
|count(1)|
+--------+
|     828|
+--------+



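Using the notion of a temporary view (function `createTempView`), create a temporary view associated with the dataframe `df`. Unlike `createOrReplaceTempView`, `createTempView` raises an error if a view with the same name already exists, so this cell can only be run once per session.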

In [43]:
df.createTempView("temp_table")


Write some SQL queries against the resulting view, as in the examples below.

In [47]:
spark.sql("SELECT country_name, id FROM temp_table ORDER BY id DESC LIMIT 10").show()


+-----------------+----------+
|     country_name|        id|
+-----------------+----------+
| French Polynesia|7170821229|
|       Cabo Verde|7167692449|
|         Suriname|7166451460|
|         Suriname|7166235088|
|            Macau|7166034642|
|       Montenegro|7164357515|
|Equatorial Guinea|7163867872|
|           Bhutan|7163256789|
|           Bhutan|7163004645|
|           Bhutan|7162877973|
+-----------------+----------+



In [48]:
spark.sql("SELECT * FROM temp_table LIMIT 10").show()


+------------+----------+------------------+------------------+------+
|country_name|        id|          latitude|         longitude|   sex|
+------------+----------+------------------+------------------+------+
|       China| 227417393| 33.15219798270325|100.85840672174572|  Male|
|       Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
|       India|2078667700|23.645271492037235| 80.85636526088884|Female|
|       China| 477556555| 33.45864668881662| 93.33604038078953|Female|
|       India|1379059984|28.816938290678692|  80.7728698035823|Female|
|       India|2278934249|24.223974351280358| 80.14372690674512|  Male|
| Philippines|4380736204|12.409991630883784|122.75874146810197|Female|
|       India|1375733494|22.385712662257426| 77.90320433636231|Female|
|     Nigeria|3693807307| 9.967458870426357| 7.562942449523648|Female|
|        Mali|6552202234|16.882575147323337|-3.949079041016242|Female|
+------------+----------+------------------+------------------+------+



In [49]:
spark.stop()
sc.stop()

You can also create a DataFrame from an existing RDD, as shown in the example below.

In [50]:
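from pyspark.sql import SparkSession

# SQLContext is deprecated since Spark 3.0: SparkSession is the entry point for DataFrames.
# Creating a session also enables RDD.toDF(), used below.
spark = SparkSession.builder.master("local[*]").appName("Dataframe examples in Spark").getOrCreate()
sc = spark.sparkContext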





In [51]:
documents_rdd = sc.parallelize([
        [1, 'cats are cute', 0],
        [2, 'dogs are playfull', 0],
        [3, 'lions are big', 1],
        [4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
        [0, 'Alice', 20],
        [1, 'Bob', 23],
        [2, 'Charles', 32]])

In [52]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])

# printing the inferred schema for documents
documents_df.printSchema()


root
 |-- doc_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)



Aggregate functions can be applied to one column or to several columns. Here, we compute the average age in the dataframe `users_df`.

In [53]:
from pyspark.sql import functions as fn

# show() prints the result and returns None, so keep the DataFrame in its own variable
user_age_df = users_df.select(fn.avg('age'))
user_age_df.show()

+--------+
|avg(age)|
+--------+
|    25.0|
+--------+




Compute the max of age in the dataframe `users_df`.

In [54]:
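# show() prints the result and returns None, so keep the DataFrame in its own variable
user_max_age_df = users_df.select(fn.max('age'))
user_max_age_df.show()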


+--------+
|max(age)|
+--------+
|      32|
+--------+



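Join (as in SQL) the two dataframes `users_df` and `documents_df`. Joining on a column expression, as below, keeps both `user_id` columns; passing the column name instead (`on="user_id"`) keeps only one.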

In [55]:
joined_df = users_df.join(documents_df, users_df.user_id == documents_df.user_id, "inner")
joined_df.show()



+-------+-----+---+------+-----------------+-------+
|user_id| name|age|doc_id|             text|user_id|
+-------+-----+---+------+-----------------+-------+
|      0|Alice| 20|     1|    cats are cute|      0|
|      0|Alice| 20|     2|dogs are playfull|      0|
|      1|  Bob| 23|     3|    lions are big|      1|
|      1|  Bob| 23|     4|    cars are fast|      1|
+-------+-----+---+------+-----------------+-------+



Left outer join (as in SQL) of the two dataframes `users_df` and `documents_df`. Users without any document, such as Charles, are kept, with `NULL` in the document columns.

In [56]:
left_joined_df = users_df.join(documents_df, users_df.user_id == documents_df.user_id, "left")
left_joined_df.show()


+-------+-------+---+------+-----------------+-------+
|user_id|   name|age|doc_id|             text|user_id|
+-------+-------+---+------+-----------------+-------+
|      0|  Alice| 20|     2|dogs are playfull|      0|
|      0|  Alice| 20|     1|    cats are cute|      0|
|      1|    Bob| 23|     4|    cars are fast|      1|
|      1|    Bob| 23|     3|    lions are big|      1|
|      2|Charles| 32|  NULL|             NULL|   NULL|
+-------+-------+---+------+-----------------+-------+
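The difference between the inner and the left join can be reproduced on the same two tables in plain Python. The `join` function below is a hypothetical nested-loop sketch that only shows which rows come out; Spark itself picks a distributed strategy such as a broadcast or sort-merge join.

```python
users = [(0, "Alice", 20), (1, "Bob", 23), (2, "Charles", 32)]
documents = [(1, "cats are cute", 0), (2, "dogs are playfull", 0),
             (3, "lions are big", 1), (4, "cars are fast", 1)]


def join(left, right, left_key, right_key, how="inner"):
    """Hypothetical nested-loop join on tuples; how is 'inner' or 'left'."""
    right_width = len(right[0]) if right else 0
    result = []
    for left_row in left:
        matches = [right_row for right_row in right if right_row[right_key] == left_row[left_key]]
        if matches:
            result.extend(left_row + right_row for right_row in matches)
        elif how == "left":
            # No match: keep the left row and pad the right side with None (SQL NULL)
            result.append(left_row + (None,) * right_width)
    return result


# Join users.user_id (index 0) with documents.user_id (index 2)
inner_rows = join(users, documents, 0, 2)
left_rows = join(users, documents, 0, 2, how="left")
print(len(inner_rows), len(left_rows))  # 4 5
print(left_rows[-1])                    # (2, 'Charles', 32, None, None, None)
```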

