# TD 2: Introduction to Spark

**Why Spark**

Apache Spark is a fast, flexible, and developer-friendly platform for large-scale SQL, machine learning, batch processing, and stream processing. It is a data processing framework that can quickly process very large data sets and distribute the work across a cluster of many computers, either on its own or in conjunction with other distributed computing tools.

Spark is written in Scala and provides APIs in Python, Java, and Scala. We will use the Python API, called PySpark.

Spark can run on several cluster managers, such as Apache Mesos, Hadoop YARN, and its own Standalone Scheduler.

The Standalone Scheduler is Spark's built-in cluster manager; it lets you install Spark on a set of machines that have no other cluster software.


**This TD presents the basic operations of PySpark.**

**When you find a TO DO, you need to write code. Otherwise, simply read and understand the examples.**

# **Set up Apache Spark 3.5 on Google Colab for a quick start.**  




# Installation of PySpark on Colab


In [None]:
!pip install pyspark py4j #all you need to install



# Basics of PySpark

## Basic Dataset

In [None]:
# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.4


In [None]:
from pyspark.sql import SparkSession
# First, create a SparkSession.
# SparkSession provides a unified entry point for reading data, executing transformations, and performing various operations on distributed datasets.
spark = SparkSession.builder.master("local[*]").appName("example_app").getOrCreate()
# appName sets the name of your Spark application.
# master specifies the cluster manager (master URL) that the application connects to.
# local[*] runs Spark locally with as many worker threads as there are logical cores on your machine.
# local[N] runs Spark locally with N threads.

# SparkSession internally creates a SparkContext, which you can access through the .sparkContext attribute.
Context = spark.sparkContext

In [None]:
#Let's create a sequence of 100 elements (the integers 0 to 99)
big_list = range(100)
print('big_list',big_list)

#The parallelize method creates an RDD named rdd from this sequence; the second argument (2) is the number of partitions.
#RDD stands for Resilient Distributed Dataset, and it is a fundamental data structure in Apache Spark.
#An RDD is an immutable, fault-tolerant collection of elements that can be processed in parallel across a distributed computing cluster.
#Immutable means that once an RDD is created, it cannot be modified directly. You cannot add, remove, or update elements within the same RDD.
#Instead, any transformation applied to an RDD (such as map, filter, or flatMap) creates a new RDD, leaving the original RDD unchanged.
#Tuples are another immutable structure that you have probably already studied.
#RDDs provide a high-level, abstracted interface for distributed data processing, enabling fault tolerance and efficient parallel processing.
rdd = Context.parallelize(big_list, 2)

#Spark collect() is used to retrieve all the elements
#of the RDD/DataFrame/Dataset (from all nodes) to the driver program.
rddCollect = rdd.collect()
print('rdd Collect',rddCollect)

#You can also use take(N) to retrieve only the first N values
# Take the first 5 elements of the RDD and print them
rddtake = rdd.take(5)
print('rdd Take',rddtake)

#We can use collect() after applying filter(), map(), or groupBy(); all of these are transformations.
#Transformations in Spark are operations that specify how data should be processed to create a new RDD.
#However, transformations do not perform the computation immediately. Instead, they build a logical plan that Spark executes only when an action,
#such as collect or count, is called. This lets Spark optimize the plan and avoid unnecessary work before execution.
#filter() is a transformation that keeps only the elements for which the given function returns True.
#In this example we want to keep only the odd numbers.
#The lambda function takes a single argument x (each element of the RDD) and returns True if x is odd (x % 2 != 0), and False otherwise.
odds = rdd.filter(lambda x: x % 2 != 0)
print('take 10 element',odds.take(10))

oddsCollect = odds.collect()
print('odds Collect',oddsCollect)

big_list range(0, 100)
rdd Collect [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
rdd Take [0, 1, 2, 3, 4]
take 10 element [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
odds Collect [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
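
The lazy behaviour described in the comments above (transformations only describe the work, actions trigger it) can be illustrated without Spark. The following is a plain-Python analogy, not PySpark code: a generator expression stands in for `filter()`, and `list()` stands in for `collect()`. The names `is_odd` and `call_log` are hypothetical and exist only for this sketch.

```python
call_log = []  # records every element the predicate is actually applied to

def is_odd(x):
    """Same test as the lambda passed to filter() above."""
    call_log.append(x)
    return x % 2 != 0

big_list = range(100)

# "Transformation": building the generator does not run the predicate yet.
lazy_odds = (x for x in big_list if is_odd(x))
calls_before_action = len(call_log)

# "Action": materialising the result forces the predicate to run on every element.
odds = list(lazy_odds)
calls_after_action = len(call_log)

print(calls_before_action, calls_after_action, odds[:5])
```

The predicate is not called at all until the result is requested, which is the same reason a Spark job does no work until an action such as `collect()` or `count()` is called.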


In [None]:
# GroupBy() groups elements by whether they are even or odd
grouped_rdd = rdd.groupBy(lambda x: "Even" if x % 2 == 0 else "Odd")
#The groupBy transformation groups elements by a key computed from each element. It returns an RDD of key-value pairs where the key is the grouping key
# and the value is an iterable of the elements that share the same key.
#To compute something for each group (for example its size),
#you typically apply map or another transformation to the grouped data.
#(Printing grouped_rdd directly does not show the groups: it only prints a description of the RDD object, because nothing has been computed yet.)

#map transformation in PySpark is used to transform each element of an RDD (Resilient Distributed Dataset) using a specified function.
#It applies the provided function to each element independently and creates a new RDD with the transformed values.
counted_rdd = grouped_rdd.map(lambda x: (x[0], len(x[1])))
# Collect and print the result
result = counted_rdd.collect()
print(result)

[('Odd', 50), ('Even', 50)]
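
To make the shape of the grouped data concrete, here is a plain-Python sketch of the same two steps (group, then count). It is an analogy and does not use Spark; the helper `group_by` is a hypothetical name introduced for illustration.

```python
from collections import defaultdict

def group_by(elements, key_func):
    """Return {key: [elements sharing that key]}, mimicking the pairs groupBy() produces."""
    groups = defaultdict(list)
    for element in elements:
        groups[key_func(element)].append(element)
    return dict(groups)

grouped = group_by(range(100), lambda x: "Even" if x % 2 == 0 else "Odd")

# Same step as the map() above: keep the key and the size of each group.
counted = {key: len(values) for key, values in grouped.items()}
print(counted)
```

In Spark the order of the groups in the collected result is not guaranteed, which is why the output above lists `'Odd'` before `'Even'`.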


In [None]:
#Now let's close the Context!
Context.stop()

### TO DO 1

In [None]:
small_list=[1,2,3,4]
#TO DO:  Let's create another Context
spark = SparkSession.builder.master("local[*]").appName("example_app").getOrCreate()
Context =spark.sparkContext
#TO DO: Let's parallelize the list
nums = Context.parallelize(small_list)
#TO DO: Use map() transformation to apply a square transformation on every element of RDD, so from [1,2,3,4] you will have [1,4,9,16]. The syntax is the same as the one for filter()
squared = nums.map(lambda x: x*x)
#TO DO: now print the result
squaredsmall_list=squared.collect()
print(squaredsmall_list)
Context.stop()

[1, 4, 9, 16]


In [None]:
new_list = ['Python', 'programming', 'is', 'awesome!']
spark = SparkSession.builder.master("local[*]").appName("example_app").getOrCreate()
Context =spark.sparkContext

rdd = Context.parallelize(new_list)

#TO DO: print, on a single line, a new list containing only the words of new_list that have fewer than 8 letters

filtered_rdd = rdd.filter(lambda word: len(word) < 8)
# Collect and print the result
result = filtered_rdd.collect()
print(result)
# Stop the SparkContext
Context.stop()


['Python', 'is']



## More complex datasets

In [None]:
spark = SparkSession.builder.master("local[*]").appName("example_app").getOrCreate()
#The dataset is now a bit more complicated: it contains the Name, Age and Height of people

columns = ["Name","Age","Height"]
data = [("Mark", 28,183), ("Jacob", 23,190), ("Christine",31,157),("Meridith",18,167)]

# Create DataFrame
#A DataFrame is a fundamental data structure in Apache Spark that represents a distributed collection of data organized into named columns.
#It is conceptually similar to a table in a relational database or a data frame in R or Python's pandas library.
#Spark's DataFrame provides a higher-level, more structured API for distributed data processing compared to Resilient Distributed Datasets (RDDs).
dfnocolumns = spark.createDataFrame(data)
dfnocolumns.show()


+---------+---+---+
|       _1| _2| _3|
+---------+---+---+
|     Mark| 28|183|
|    Jacob| 23|190|
|Christine| 31|157|
| Meridith| 18|167|
+---------+---+---+



In [None]:
#As you can see in the previous table, the column names are missing (_1, _2, _3); to set them, use toDF()
df=dfnocolumns.toDF(*columns)
df.show()

+---------+---+------+
|     Name|Age|Height|
+---------+---+------+
|     Mark| 28|   183|
|    Jacob| 23|   190|
|Christine| 31|   157|
| Meridith| 18|   167|
+---------+---+------+



In [None]:
#If you want to list only the column names you can use
df.columns

['Name', 'Age', 'Height']

In [None]:
#If you want to print the overall schema use
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Height: long (nullable = true)



In [None]:
#If you want to print basic statistics for numeric and string columns in a DataFrame use
df.describe().show()

+-------+---------+-----------------+------------------+
|summary|     Name|              Age|            Height|
+-------+---------+-----------------+------------------+
|  count|        4|                4|                 4|
|   mean|     NULL|             25.0|            174.25|
| stddev|     NULL|5.715476066494082|14.997221964972935|
|    min|Christine|               18|               157|
|    max| Meridith|               31|               190|
+-------+---------+-----------------+------------------+
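
The `stddev` row above can be checked by hand. The sketch below uses Python's standard `statistics` module (not Spark) on the same four rows; it suggests that the value reported is the sample standard deviation, which divides by n - 1 rather than n.

```python
import statistics

ages = [28, 23, 31, 18]
heights = [183, 190, 157, 167]

# statistics.stdev divides by n - 1 (sample standard deviation).
age_mean, age_std = statistics.mean(ages), statistics.stdev(ages)
height_mean, height_std = statistics.mean(heights), statistics.stdev(heights)
print(age_mean, age_std)
print(height_mean, height_std)

# statistics.pstdev divides by n (population standard deviation) and gives a smaller value.
age_pstd = statistics.pstdev(ages)
print(age_pstd)
```

For Age, the squared deviations from the mean 25 are 9, 4, 36 and 49, so the sample variance is 98 / 3 and its square root matches the 5.715... shown in the table.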



In [None]:
#For string columns, min and max follow lexicographical order: "Christine" is the minimum (first in that order) and
#"Meridith" is the maximum (last in that order) of the "Name" column.
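
Python compares strings character by character as well, so the built-in `min()` and `max()` can serve as a stand-in to check this ordering. This is a plain-Python sketch under the assumption that the names are plain ASCII; the second list is a made-up example showing that uppercase letters sort before lowercase ones.

```python
names = ["Mark", "Jacob", "Christine", "Meridith"]

smallest, largest = min(names), max(names)
print(smallest, largest)

# "Mark" and "Meridith" share the first letter, so the second letter decides: 'a' < 'e'.
print("Mark" < "Meridith")

# Comparison is by character code, so 'Z' (uppercase) comes before 'm' (lowercase).
mixed_case_min = min(["mark", "Zoe"])
print(mixed_case_min)
```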

In [None]:
#If you just need to count how many samples are in the dataset use
print(df.count())

4


In [None]:
#If you want to compute a cross-tabulation of two columns (a contingency table counting how often each pair of values occurs), use
df.crosstab('Age','Height').sort("Age_Height").show()

+----------+---+---+---+---+
|Age_Height|157|167|183|190|
+----------+---+---+---+---+
|        18|  0|  1|  0|  0|
|        23|  0|  0|  0|  1|
|        28|  0|  0|  1|  0|
|        31|  1|  0|  0|  0|
+----------+---+---+---+---+
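
A contingency table simply counts how many rows contain each (Age, Height) pair. The plain-Python sketch below rebuilds the same table with `collections.Counter`; it is an illustration of the idea, not of how Spark computes it.

```python
from collections import Counter

rows = [("Mark", 28, 183), ("Jacob", 23, 190), ("Christine", 31, 157), ("Meridith", 18, 167)]

# Count the occurrences of every (age, height) pair.
pair_counts = Counter((age, height) for _, age, height in rows)

ages = sorted({age for _, age, _ in rows})
heights = sorted({height for _, _, height in rows})

# One row per distinct age, one column per distinct height; missing pairs count as 0.
table = {age: [pair_counts[(age, h)] for h in heights] for age in ages}

print("Age_Height", heights)
for age, counts in table.items():
    print(age, counts)
```

Because every person here has a unique age and height, each row and each column contains exactly one 1.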



In [None]:
from pyspark.ml.feature import VectorAssembler

#VectorAssembler is a feature transformer that merges multiple columns into a single vector column. outputCol is the name of the new column
assembler = VectorAssembler(inputCols=['Age', 'Height'], outputCol = 'Attributes')
output = assembler.transform(df)


finalized_data = output.select("Name","Attributes")
finalized_data.show()

+---------+------------+
|     Name|  Attributes|
+---------+------------+
|     Mark|[28.0,183.0]|
|    Jacob|[23.0,190.0]|
|Christine|[31.0,157.0]|
| Meridith|[18.0,167.0]|
+---------+------------+




## Upload a dataset

In [None]:
#Either download the file directly with the wget command
!wget -q https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv

#Or download BostonHousing.csv from Moodle and upload it with these two lines
# from google.colab import files
# files.upload()

#Read the file (header=True uses the first row as column names, inferSchema=True detects the column types)
dataset = spark.read.csv('BostonHousing.csv',inferSchema=True, header =True)
#dataset = spark.read.json(".json") # same with JSON files

#With text files, use the SparkContext instead
#sc = pyspark.SparkContext.getOrCreate()
#dataset = sc.textFile(".txt")

### TO DO 2

In [None]:
#TO DO: show the table
dataset.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

In [None]:
#TO DO: show only the column names
dataset.columns

['crim',
 'zn',
 'indus',
 'chas',
 'nox',
 'rm',
 'age',
 'dis',
 'rad',
 'tax',
 'ptratio',
 'b',
 'lstat',
 'medv']

In [None]:
#TO DO: show the overall schema
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [None]:
#TO DO: show the basic statistics for numeric and string columns
dataset.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|              crim|                zn|             indus|              chas|                nox|                rm|               age|              dis|              rad|               tax|           ptratio|                 b|             lstat|              medv|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|                506|               506|               506|              506|              

In [None]:
#TO DO: show how many samples are present
print(dataset.count())

506


In [None]:
#TO DO: compute a cross-tabulation (contingency table) of two columns ('crim' and 'zn', for example)
dataset.crosstab('crim','zn').sort("crim_zn").show()

+-------+---+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|crim_zn|0.0|100.0|12.5|17.5|18.0|20.0|21.0|22.0|25.0|28.0|30.0|33.0|34.0|35.0|40.0|45.0|52.5|55.0|60.0|70.0|75.0|80.0|82.5|85.0|90.0|95.0|
+-------+---+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|0.00632|  0|    0|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|0.00906|  0|    0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|
|0.01096|  0|    0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|
|0.01301|  0|    0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|0.01311|  0|    0| 

In [None]:
#TO DO: merge 'crim' and 'zn' into a single vector column, then show it next to "rm" and "medv" for comparison


#VectorAssembler is a feature transformer that merges multiple columns into a single vector column. outputCol is the name of the new column
assembler_ex = VectorAssembler(inputCols=['crim', 'zn'], outputCol = 'newAtt')
output_ex = assembler_ex.transform(dataset)
finalized_data_ex = output_ex.select("rm","medv","newAtt")
finalized_data_ex.show()


+-----+----+--------------+
|   rm|medv|        newAtt|
+-----+----+--------------+
|6.575|24.0|[0.00632,18.0]|
|6.421|21.6| [0.02731,0.0]|
|7.185|34.7| [0.02729,0.0]|
|6.998|33.4| [0.03237,0.0]|
|7.147|36.2| [0.06905,0.0]|
| 6.43|28.7| [0.02985,0.0]|
|6.012|22.9|[0.08829,12.5]|
|6.172|27.1|[0.14455,12.5]|
|5.631|16.5|[0.21124,12.5]|
|6.004|18.9|[0.17004,12.5]|
|6.377|15.0|[0.22489,12.5]|
|6.009|18.9|[0.11747,12.5]|
|5.889|21.7|[0.09378,12.5]|
|5.949|20.4| [0.62976,0.0]|
|6.096|18.2| [0.63796,0.0]|
|5.834|19.9| [0.62739,0.0]|
|5.935|23.1| [1.05393,0.0]|
| 5.99|17.5|  [0.7842,0.0]|
|5.456|20.2| [0.80271,0.0]|
|5.727|18.2|  [0.7258,0.0]|
+-----+----+--------------+
only showing top 20 rows



## Dataset Manipulation

In [None]:
address = [(1,"14851 Jeffrey Rd","DE"), (2,"43421 Margarita St","NY"),(3,"13111 Siemon Ave","CA"),(4,"97643 LetsGo Rd","TX")]
addressSP =spark.createDataFrame(address,["id","address","state"])
addressSP.show()

+---+------------------+-----+
| id|           address|state|
+---+------------------+-----+
|  1|  14851 Jeffrey Rd|   DE|
|  2|43421 Margarita St|   NY|
|  3|  13111 Siemon Ave|   CA|
|  4|   97643 LetsGo Rd|   TX|
+---+------------------+-----+



In [None]:
#With regexp_replace() you can replace every part of a column value that matches a pattern (a string or regular expression) with another string.
from pyspark.sql.functions import regexp_replace
addressSP.withColumn('address', regexp_replace('address', 'Rd', 'Road')).show(truncate=False)

+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
|4  |97643 LetsGo Road |TX   |
+---+------------------+-----+
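
Because the second argument of `regexp_replace()` is a regular expression, a bare `Rd` matches anywhere in the value, not only at the end of the address. The sketch below shows the difference with Python's `re` module as a stand-in (Spark uses Java regular expressions, but `\b` and `$` behave the same way for this case). The third address is a made-up edge case that is not in the dataset above.

```python
import re

addresses = ["14851 Jeffrey Rd", "43421 Margarita St", "77 Rdwood Rd"]

# A bare "Rd" also matches inside "Rdwood".
naive = [re.sub("Rd", "Road", a) for a in addresses]
print(naive)

# Anchoring with \b (word boundary) and $ (end of string) only rewrites a trailing, stand-alone "Rd".
anchored = [re.sub(r"\bRd$", "Road", a) for a in addresses]
print(anchored)
```

On the four addresses used in this section both patterns give the same result; the anchored version only matters once the data contains "Rd" in other positions.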



In [None]:
#Replace a string column value conditionally
#We want to replace Rd with Road in "Jeffrey Rd" but not in "LetsGo Rd"
from pyspark.sql.functions import when
# when(condition, value).otherwise(other_value) works like if ... else ...

address = [(1,"14851 Jeffrey Rd","DE"), (2,"43421 Margarita St","NY"),(3,"13111 Siemon Ave","CA"),(4,"97643 LetsGo Rd","TX")]
addressSP = spark.createDataFrame(address, ["id", "address", "state"])
# Define the condition for replacement
replacement_condition = addressSP.address.contains("Jeffrey Rd")
# Use when() and regexp_replace() to conditionally replace "Rd" with "Road"
addressSP = addressSP.withColumn("address", when(replacement_condition, regexp_replace("address", "Rd", "Road")).otherwise(addressSP.address))
# Show the updated DataFrame
addressSP.show(truncate=False)


+---+------------------+-----+
|id |address           |state|
+---+------------------+-----+
|1  |14851 Jeffrey Road|DE   |
|2  |43421 Margarita St|NY   |
|3  |13111 Siemon Ave  |CA   |
|4  |97643 LetsGo Rd   |TX   |
+---+------------------+-----+
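
The `when(...).otherwise(...)` expression is evaluated row by row, much like a conditional expression inside a list comprehension. The following plain-Python sketch mirrors the logic of the cell above on the same four addresses; it is an analogy and does not use Spark.

```python
import re

addresses = ["14851 Jeffrey Rd", "43421 Margarita St", "13111 Siemon Ave", "97643 LetsGo Rd"]

# when(condition, replaced_value).otherwise(original_value), applied to each row
updated = [
    re.sub("Rd", "Road", address) if "Jeffrey Rd" in address else address
    for address in addresses
]
print(updated)
```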



In [None]:
#Replace column values using a Python dictionary (map)
#We replace each state abbreviation in the state column with the full state name, looked up in a dictionary of key-value pairs
stateDic={'CA':'California','NY':'New York','DE':'Delaware','TX':'Texas'}
addressSP2=addressSP.rdd.map(lambda x:
    (x.id,x.address,stateDic[x.state])
    ).toDF(["id","address","state"])
addressSP2.show()

+---+------------------+----------+
| id|           address|     state|
+---+------------------+----------+
|  1|14851 Jeffrey Road|  Delaware|
|  2|43421 Margarita St|  New York|
|  3|  13111 Siemon Ave|California|
|  4|   97643 LetsGo Rd|     Texas|
+---+------------------+----------+
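
One thing to keep in mind with the lookup `stateDic[x.state]` is that it raises a `KeyError` if a row contains an abbreviation that is missing from the dictionary. A minimal plain-Python sketch of a more forgiving lookup is shown below; `'FL'` is a made-up value that is not part of the dataset above.

```python
stateDic = {'CA': 'California', 'NY': 'New York', 'DE': 'Delaware', 'TX': 'Texas'}
states = ['DE', 'NY', 'FL']  # 'FL' is deliberately absent from stateDic

# get(key, default) returns the default instead of raising KeyError;
# here the original abbreviation is kept when no full name is known.
full_names = [stateDic.get(state, state) for state in states]
print(full_names)
```

The same `stateDic.get(x.state, x.state)` expression could be used inside the lambda of the cell above if unknown abbreviations are expected.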



In [None]:
from pyspark.sql.functions import translate
#With the translate() string function you can replace characters one by one in a DataFrame column value: here 1 becomes A, 2 becomes B, and 3 becomes C.
addressSP2.withColumn('address', translate('address', '123', 'ABC')) \
  .show(truncate=False)

+---+------------------+----------+
|id |address           |state     |
+---+------------------+----------+
|1  |A485A Jeffrey Road|Delaware  |
|2  |4C4BA Margarita St|New York  |
|3  |ACAAA Siemon Ave  |California|
|4  |9764C LetsGo Rd   |Texas     |
+---+------------------+----------+
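
Python strings offer the same character-by-character replacement through `str.maketrans()` and `str.translate()`. The sketch below applies the mapping 1→A, 2→B, 3→C to the four addresses in plain Python, as a way to check the output above by hand.

```python
# Build a table mapping each character of '123' to the character at the same position in 'ABC'.
table = str.maketrans('123', 'ABC')

addresses = ["14851 Jeffrey Road", "43421 Margarita St", "13111 Siemon Ave", "97643 LetsGo Rd"]
translated = [address.translate(table) for address in addresses]
print(translated)
```

Characters that are not listed in the first string (4, 5, 6, letters, spaces) are left unchanged.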



In [None]:
#Replace part of a column value with a value from another DataFrame column
#We look for the value of col2 inside col1 and replace it with the value of col3, creating new_column
from pyspark.sql.functions import expr
col = spark.createDataFrame(
   [("ABCDE_XYZ", "XYZ","FGH")],
    ("col1", "col2","col3")
  )
col.show()
col.withColumn("new_column",
              expr("regexp_replace(col1, col2, col3)")
              ).show()

+---------+----+----+
|     col1|col2|col3|
+---------+----+----+
|ABCDE_XYZ| XYZ| FGH|
+---------+----+----+

+---------+----+----+----------+
|     col1|col2|col3|new_column|
+---------+----+----+----------+
|ABCDE_XYZ| XYZ| FGH| ABCDE_FGH|
+---------+----+----+----------+



In [None]:
#Now let's close the Session!
spark.stop()