Most of the content in the current notebook is based on the explanations found on [sparkbyexamples](https://sparkbyexamples.com/pyspark/pyspark-rdd-transformations/#transformation-types).

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import trim, length, col

In [2]:
spark = (
    SparkSession.
    builder.
    appName("transformations_actions").
    master("local[4]").
    getOrCreate()
)

spark

23/09/03 23:01:42 WARN Utils: Your hostname, IdeaPad-L340 resolves to a loopback address: 127.0.1.1; using 192.168.100.16 instead (on interface wlp1s0)
23/09/03 23:01:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/03 23:01:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
file = "data/earthquakes/Eartquakes-1990-2023.csv"
earthquakes_ps = spark.read.csv(file, sep=",",
                               inferSchema=True, header=True)

earthquakes_ps.show(2, vertical=True)


-RECORD 0----------------------------
 time         | 631153353990         
 place        | 12 km NNW of Mead... 
 status       | reviewed             
 tsunami      | 0                    
 significance | 96                   
 data_type    | earthquake           
 magnitudo    | 2.5                  
 state        |  Alaska              
 longitude    | -149.6692            
 latitude     | 61.7302              
 depth        | 30.1                 
 date         | 1989-12-31 18:22:... 
-RECORD 1----------------------------
 time         | 631153491210         
 place        | 14 km S of Volcan... 
 status       | reviewed             
 tsunami      | 0                    
 significance | 31                   
 data_type    | earthquake           
 magnitudo    | 1.41                 
 state        |  Hawaii              
 longitude    | -155.2123333         
 latitude     | 19.3176667           
 depth        | 6.585                
 date         | 1989-12-31 18:24:... 
only showing top 2 rows

#### Transformations

Transformations are lazy operations: none of them is executed until an action is called. Until then, Spark only records them in the lineage (DAG) of the dataset.
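As a loose plain-Python analogy (not Spark code), the built-in `map()` and `filter()` also return lazy iterators. Building the pipeline does no work, and the functions only run when something consumes the result, much like an action does in Spark. The `calls` list is a hypothetical helper that records every time the "transformation" actually runs.

```python
# Plain-Python analogy of lazy evaluation; this is NOT Spark code.
calls = []

def double(x):
    calls.append(x)  # record every time the "transformation" really runs
    return x * 2

data = [1, 2, 3, 4]

# "Transformations": map() and filter() return lazy iterators, nothing runs yet
pipeline = filter(lambda y: y > 4, map(double, data))
print(len(calls))  # 0: no element has been processed

# "Action": consuming the iterator forces the whole pipeline to execute
result = list(pipeline)
print(result)      # [6, 8]
print(len(calls))  # 4
```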

##### Narrow

_Narrow transformations are the result of map() and filter() functions and these compute data that live on a single partition meaning there will not be any data movement between partitions to execute narrow transformations_ (no shuffling)

##### Wide

_Wider transformations are the result of groupByKey() and reduceByKey() functions and these compute data that live on many partitions meaning there will be data movements between partitions to execute wider transformations_ (shuffling)
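The difference between narrow and wide transformations can be sketched with a toy model where a dataset is a list of partitions. This is only an illustration under stated assumptions: `narrow_map` and `wide_group_by_key` are hypothetical helpers, and `zlib.crc32` stands in for the hash partitioner Spark really uses.

```python
import zlib
from collections import defaultdict

# Toy model (not Spark code): a dataset is a list of partitions,
# and each partition is a list of (key, value) records.
partitions = [
    [("Alaska", 2.5), ("Hawaii", 1.41)],
    [("Alaska", 1.2), ("California", 0.98)],
]

def narrow_map(parts, func):
    # Narrow: each output partition is built only from the matching input
    # partition, so no record moves to another partition.
    return [[func(record) for record in part] for part in parts]

def wide_group_by_key(parts, num_partitions):
    # Wide: records with the same key must end up together, so every record is
    # sent to the partition chosen by hashing its key (the "shuffle").
    # zlib.crc32 is only a deterministic stand-in for Spark's own partitioner.
    shuffled = [defaultdict(list) for _ in range(num_partitions)]
    for part in parts:
        for key, value in part:
            target = zlib.crc32(key.encode()) % num_partitions
            shuffled[target][key].append(value)
    return [dict(groups) for groups in shuffled]

upper_keys = narrow_map(partitions, lambda kv: (kv[0].upper(), kv[1]))
grouped = wide_group_by_key(partitions, 2)
print(upper_keys)  # same two partitions, same records per partition
print(grouped)     # both "Alaska" values now sit in the same partition
```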

Some common narrow and wide transformations are:

* __cache()__: Marks the RDD (or DataFrame) to be kept in memory once it is computed. Like a transformation, it is lazy: the data is only cached when an action runs.

* __filter()__: Returns a new RDD containing only the elements that satisfy the given predicate.

* __flatMap()__: Applies a function that returns zero or more items for each element and flattens the results into a single dataset. For example, if each element is an array, every item of the array becomes its own row (one-to-many mapping).

* __map()__: Applies a function to every element and returns a dataset with the same number of elements (one-to-one mapping).

* __mapPartitions()__: Similar to map(), but the function is called once per partition and receives an iterator over all the elements of that partition. This can be faster than map() when the function has an expensive setup step (for example, opening a connection), because that cost is paid once per partition instead of once per element.

* __mapPartitionsWithIndex()__: Similar to mapPartitions(), but the function also receives the integer index of the partition.

* __randomSplit()__: Randomly splits the RDD into several RDDs according to the weights given in the argument, for example `rdd.randomSplit([0.7, 0.3])`.

* __union()__: Combines the elements of the source dataset and the argument into a single dataset. It is similar to the union of two sets, except that duplicates are kept (like UNION ALL in SQL).

* __sample()__: Returns a random sample of the dataset.

* __intersection()__: Returns a dataset with the elements present in both the source dataset and the argument (similar to INTERSECT in SQL).

* __distinct()__: Returns a dataset with all duplicate elements removed.

* __repartition()__: Returns a dataset with the number of partitions given in the argument, which can be larger or smaller than the current one. It performs a full shuffle of the data, which makes __repartition__ an expensive operation.

* __coalesce()__: Similar to repartition(), but designed for decreasing the number of partitions. Instead of a full shuffle, it merges existing partitions, so it moves much less data and is cheaper than repartition(). By default it cannot increase the number of partitions.

* __join()__: Merges two datasets on a common key (for DataFrames, one or more join columns).

* __groupBy()__: Groups the data by one or more keys, similar to `groupby` in a pandas DataFrame or GROUP BY in SQL.

* __agg()__: Computes aggregate functions (for example count, avg or max) over a whole DataFrame or over the groups created by groupBy().
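The one-to-one versus one-to-many difference between map() and flatMap() can be seen in a small plain-Python model of two partitions (not Spark code; the place names are only sample strings):

```python
from itertools import chain

# Toy model (not Spark code): two partitions holding place names.
partitions = [["Mid Indian Ridge", "Alaska"], ["Mozambique Channel"]]

# map: one output element per input element (here, the number of words)
mapped = [[len(name.split()) for name in part] for part in partitions]

# flatMap: zero or more output elements per input element, flattened into the
# partition (here, every word becomes its own element)
flat_mapped = [list(chain.from_iterable(name.split() for name in part))
               for part in partitions]

print(mapped)       # [[3, 1], [2]]
print(flat_mapped)  # [['Mid', 'Indian', 'Ridge', 'Alaska'], ['Mozambique', 'Channel']]
```

Both operations keep the partition layout (they are narrow); only the number of elements per partition changes with flatMap.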

In [4]:
# Remove the leading whitespace of the state values (e.g. " Alaska" -> "Alaska")
earthquakes_ps = earthquakes_ps.withColumn("state", trim("state"))

In [5]:
# Filter function with trimmed column
earthquakes_ps.filter(earthquakes_ps["state"] == "Mexico").show(2, vertical=True)

-RECORD 0----------------------------
 time         | 631564020120         
 place        | 189 km SW of La C... 
 status       | reviewed             
 tsunami      | 0                    
 significance | 554                  
 data_type    | earthquake           
 magnitudo    | 6.0                  
 state        | Mexico               
 longitude    | -106.795             
 latitude     | 18.863               
 depth        | 33.0                 
 date         | 1990-01-05 12:27:... 
-RECORD 1----------------------------
 time         | 632196446840         
 place        | 5 km N of El Cort... 
 status       | reviewed             
 tsunami      | 0                    
 significance | 432                  
 data_type    | earthquake           
 magnitudo    | 5.3                  
 state        | Mexico               
 longitude    | -99.509              
 latitude     | 16.826               
 depth        | 28.4                 
 date         | 1990-01-12 20:07:... 
only showing top 2 rows

In [6]:
# Filter the states that start with "M" (regex), then sort them by name length and name
(
    earthquakes_ps.
    filter(col("state").rlike(r"^\s*Mm?.+")).
    select("state").
    distinct().
    withColumn("length", length(col("state"))).
    orderBy(col("length").desc(), col("state").asc()).
    show()
)

+--------------------+------+
|               state|length|
+--------------------+------+
|Montenegro-Albani...|    32|
|Missouri-Arkansas...|    31|
|Missouri-Illinois...|    31|
|Macedonia-Serbia ...|    30|
|Mexico-Guatemala ...|    30|
|Myanmar-Thailand ...|    30|
|Mongolia-China bo...|    28|
|Myanmar-China bor...|    27|
|Myanmar-India bor...|    27|
|Mauritius - Reuni...|    26|
|Macquarie Island ...|    23|
|Marshall Islands ...|    23|
|Maldive Islands r...|    22|
|Mariana Islands r...|    22|
|Midway Islands re...|    21|
|  Mozambique Channel|    18|
|    Macedonia region|    16|
|    Marshall Islands|    16|
|    Mid-Indian Ridge|    16|
|      Mayotte region|    14|
+--------------------+------+
only showing top 20 rows
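To check which values the `rlike()` pattern keeps, the same regular expression can be tried in plain Python. This assumes Python's `re.search` behaves like Spark's Java-regex `rlike()` for this simple pattern, which uses no engine-specific syntax. Note that `m?` only allows an optional lowercase "m" after the capital "M"; it does not make the match case-insensitive.

```python
import re

# Same pattern as the rlike() filter above, tried on a few hand-picked strings.
pattern = re.compile(r"^\s*Mm?.+")

states = ["Mexico", "Mayotte region", "  Montana", "mexico", "Alaska", "M"]
matches = [s for s in states if pattern.search(s)]
print(matches)  # ['Mexico', 'Mayotte region', '  Montana']
```

If lowercase names should also match, an inline flag such as `(?i)`, supported by both Java and Python regexes, would be needed.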




In [7]:
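# FlatMap function
# A global counter cannot number the rows: every Spark task works on its own
# copy of the function's globals, so the ids would not be unique across
# partitions. zipWithIndex() attaches a global 0-based index to each row instead.
def add_string_state_column(row_with_index):
    row, index = row_with_index
    state = row.state + " " + "my_new_string"

    # flatMap expects an iterable: here it yields exactly one tuple per row
    return [(index + 1, state, 1)]

my_new_string_results = (
    earthquakes_ps.
    select("data_type", "state").
    rdd.
    zipWithIndex().
    flatMap(add_string_state_column)
)

my_new_string_results_df = spark.createDataFrame(my_new_string_results,
                                                 schema=["id", "state", "one"])
my_new_string_results_df.show(10)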


+---+--------------------+---+
| id|               state|one|
+---+--------------------+---+
|  1|Alaska my_new_string|  1|
|  2|Hawaii my_new_string|  1|
|  3|California my_new...|  1|
|  4|California my_new...|  1|
|  5|California my_new...|  1|
|  6|California my_new...|  1|
|  7|California my_new...|  1|
|  8|California my_new...|  1|
|  9|California my_new...|  1|
| 10|Washington my_new...|  1|
+---+--------------------+---+
only showing top 10 rows



In [8]:
# Map function
my_new_string_results = (
    earthquakes_ps.
    select("data_type", "state", "magnitudo").
    rdd.
    map(lambda x: (str(x[0]) + "_type", str(x[1]) + "_state", float(x[2])))
)

my_new_string_results.take(10)

[('earthquake_type', 'Alaska_state', 2.5),
 ('earthquake_type', 'Hawaii_state', 1.41),
 ('earthquake_type', 'California_state', 1.11),
 ('earthquake_type', 'California_state', 0.98),
 ('earthquake_type', 'California_state', 2.95),
 ('earthquake_type', 'California_state', 2.77),
 ('earthquake_type', 'California_state', 1.13),
 ('earthquake_type', 'California_state', 0.83),
 ('earthquake_type', 'California_state', 1.59),
 ('earthquake_type', 'Washington_state', 2.2)]

In [9]:
# Taking a random sample of roughly 0.0001% of the rows (fraction=0.000001); the sample size is not exact
sample_examples = earthquakes_ps.sample(fraction=0.000001)
sample_size = sample_examples.count()

print(f"Sample size: {sample_size}\n")
sample_examples.show(vertical=True, truncate=False)


Sample size: 7




-RECORD 0---------------------------------------------
 time         | 833770364950                          
 place        | 21 km ENE of San Lucas, California    
 status       | reviewed                              
 tsunami      | 0                                     
 significance | 31                                    
 data_type    | earthquake                            
 magnitudo    | 1.41                                  
 state        | California                            
 longitude    | -120.8031667                          
 latitude     | 36.2193333                            
 depth        | 6.246                                 
 date         | 1996-06-02 21:52:44.95                
-RECORD 1---------------------------------------------
 time         | 871388234520                          
 place        | 7 km NNE of West Yellowstone, Montana 
 status       | reviewed                              
 tsunami      | 0                                     
 significa


In [10]:
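# MapPartitions: the function receives an iterator over all the rows of one partition
def avg_magnitude_earthquakes_by_partition(partition_iterator):

    total = 0.0  # avoid shadowing the built-in sum()
    size_partition = 0

    # Iterating over the current partition
    for row in partition_iterator:
        total += row.magnitudo
        size_partition += 1

    # An empty partition has no mean; returning an empty list avoids a ZeroDivisionError
    if size_partition == 0:
        return []

    mean = total / size_partition
    return [mean]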

results_by_partition = (
    earthquakes_ps.
    select("state", "magnitudo").
    rdd.
    mapPartitions(avg_magnitude_earthquakes_by_partition)
)

results_by_partition.collect()


[1.9941378714241778, 1.8647143033827378, 1.618168308839519, 1.6156452461095947]
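The four values above are per-partition means, and their plain average is not the overall mean magnitude unless all partitions hold the same number of rows. A plain-Python sketch with made-up numbers (not the earthquake data) shows why returning a (sum, count) pair per partition, and combining those pairs, gives the exact global mean:

```python
# Toy model (not Spark code): magnitudes split across partitions of different sizes.
partitions = [[2.0, 4.0], [1.0], [3.0, 3.0, 3.0]]

# Per-partition means, like the output of avg_magnitude_earthquakes_by_partition
partition_means = [sum(p) / len(p) for p in partitions if p]

# Average of the partition means: biased when partition sizes differ
mean_of_means = sum(partition_means) / len(partition_means)

# Returning (sum, count) per partition and combining them gives the exact mean
partial = [(sum(p), len(p)) for p in partitions]
total, count = map(sum, zip(*partial))
global_mean = total / count

print(mean_of_means, global_mean)  # about 2.333 vs about 2.667
```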

In [14]:
# repartition
print(f"Current number of partitions: {earthquakes_ps.rdd.getNumPartitions()}")

print(f"\nCreating new partitions (repartition). This operation is expensive!")
earthquakes_ps = earthquakes_ps.repartition(8)
print(f"\tNew number of partitions: {earthquakes_ps.rdd.getNumPartitions()}")

Current number of partitions: 4

Creating new partitions (repartition). This operation is expensive!




	New number of partitions: 8


In [15]:
# coalesce
print(f"Current number of partitions: {earthquakes_ps.rdd.getNumPartitions()}")

print(f"\nReducing the number of partitions (coalesce).")
earthquakes_ps = earthquakes_ps.coalesce(4)
print(f"\tNew number of partitions: {earthquakes_ps.rdd.getNumPartitions()}")

Current number of partitions: 8

Reducing the number of partitions (coalesce).




	New number of partitions: 4
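Why coalesce() is cheaper than repartition() can be sketched with a toy model on a list of partitions. This is not how Spark places data (it also considers data locality); the helper names are hypothetical and repartition uses round-robin only in this sketch.

```python
# Toy model (not Spark code): a dataset stored as four partitions.
partitions = [[1, 2], [3], [4, 5], [6]]

def coalesce(parts, n):
    # Existing partitions are kept whole and merged with their neighbours,
    # so no partition's contents are split up. Like Spark's default, it
    # never increases the number of partitions.
    n = min(n, len(parts))
    merged = [[] for _ in range(n)]
    for i, part in enumerate(parts):
        merged[i * n // len(parts)].extend(part)
    return merged

def repartition(parts, n):
    # Full shuffle: every record is reassigned to a new partition
    # (round-robin in this sketch), whatever partition it came from.
    records = [record for part in parts for record in part]
    return [records[i::n] for i in range(n)]

print(coalesce(partitions, 2))        # [[1, 2, 3], [4, 5, 6]]
print(repartition(partitions, 3))     # [[1, 4], [2, 5], [3, 6]]
print(len(coalesce(partitions, 10)))  # 4
```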




#### Actions

_RDD actions are PySpark operations that return the values to the driver program. Any function on RDD that returns other than RDD is considered as an action in PySpark programming_

* __aggregate()__: Aggregates the elements of each partition, and then the results of all the partitions, starting from a neutral "zero value". It expects two functions: seqOp, which adds each element of a partition to an accumulator, and combOp, which merges the accumulators of different partitions. The result can have a different type than the RDD elements, e.g. a (sum, count) tuple.

* Aggregation functions over an RDD (__count()__, __countByValue()__, __min()__, __max()__, etc.)

* Display functions (__show()__, __take()__, __first()__, __head()__, __top()__, etc.)

* __collect()__: Runs the tasks of the DAG and returns all the resulting elements to the driver. Use it only for small results, since all the data must fit in the driver's memory.

* __fold()__: Aggregates the elements of each partition, and then the results of all the partitions, using a single function and a zero value. It is similar to __aggregate__, but takes one function instead of two, so the result has the same type as the elements.

* __reduce()__: Aggregates the elements using a commutative and associative binary function, for example `rdd.reduce(lambda a, b: a + b)`, without a zero value.
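The role of the zero value in __fold()__ is easiest to see in a small model. The sketch below assumes fold works the way PySpark's `RDD.fold` is documented to: the zero value is applied once per partition and once more when the partition results are merged on the driver, so it should be the identity of the operation (0 for addition). The `fold` function here is a hypothetical plain-Python stand-in, not Spark code.

```python
import operator
from functools import reduce

# Plain-Python model of RDD.fold: zero starts every partition and the final merge.
def fold(partitions, zero, op):
    partition_results = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partition_results, zero)

partitions = [[1, 2], [3], [4]]           # 3 partitions
print(fold(partitions, 0, operator.add))  # 10: 0 is neutral for addition
print(fold(partitions, 1, operator.add))  # 14: 1 is added 3 + 1 = 4 times
```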

In [39]:
# seqOp folds one element of a partition into the (sum, count) accumulator, so it returns a tuple of two values
seqOp = (lambda initial_values, individual_rdd_values: (initial_values[0] + individual_rdd_values,
                                                        initial_values[1] + 1))

# combOp merges two (sum, count) accumulators from different partitions, so it accesses indices 0 and 1 of both and returns a tuple of length 2
combOp = (lambda initial_values, results_by_partition: (initial_values[0] + results_by_partition[0],
                                                        initial_values[1] + results_by_partition[1]))

spark.sparkContext.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)

(10, 4)
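Dividing the two values of the result gives the mean of the RDD, 10 / 4 = 2.5. A plain-Python model of aggregate (not Spark code; the split into two partitions is hypothetical) shows where each function is applied:

```python
from functools import reduce

# Plain-Python model of RDD.aggregate (not Spark code).
def aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds every element of a partition into an accumulator that starts at zero
    partition_results = [reduce(seq_op, part, zero) for part in partitions]
    # comb_op merges the per-partition accumulators, again starting from zero
    return reduce(comb_op, partition_results, zero)

seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# The same numbers as above, split into two hypothetical partitions
total, count = aggregate([[1, 2], [3, 4]], (0, 0), seq_op, comb_op)
mean = total / count
print(total, count, mean)  # 10 4 2.5
```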

In [48]:
# seqOp receives the running sum (a number) and a Row, so it reads the depth field of the Row
seqOp  = lambda x, y: x + y.depth

# combOP receives two partial sums (floats) computed by seqOp on different partitions
combOP = lambda x, y: x + y

earthquakes_ps.select("depth").rdd.aggregate(0, seqOp, combOP)


78748758.62224072

In [52]:
# After flatMap extracts the depth values, both seqOp and combOP receive plain floats (rdd.map(lambda row: row.depth) would work as well)
seqOp  = lambda x, y: x + y
combOP = lambda x, y: x + y

flatten_func = lambda y: [y.depth]
earthquakes_ps.select("depth").rdd.flatMap(flatten_func).aggregate(0, seqOp, combOP)


78748758.62224072

In [13]:
#spark.stop()