Showing that the **SparkContext** is already defined.

In [0]:
sc

Showing that the **SparkSession** is already defined.

In [0]:
spark

Creating an RDD to show later how parallelization works.

In [0]:
rdd = sc.parallelize([("Kang", 1963), ("Galactus", 1966), ("Iron Man", 1963), ("Thor", 1951), ("Hulk", 1962)])
df = rdd.toDF(["name", "year"])
display(df)

name,year
Kang,1963
Galactus,1966
Iron Man,1963
Thor,1951
Hulk,1962


Showing the number of partitions of the RDD that backs this DataFrame.

In [0]:
print(df.rdd.getNumPartitions())

8


We then define two functions that count the elements in each partition (the second one also returns the partition index) and print the counts, to get an idea of how the data is distributed across the partitions.

In [0]:
num = df.rdd.getNumPartitions()

def f(partition):
    # Count the rows of one partition; mapPartitions flattens the returned list.
    count = 0
    for _ in partition:
        count += 1
    return [count]

# One count per partition, in partition order.
partitions = df.rdd.mapPartitions(f).collect()

def f_index(index, partition):
    # Same count, preceded by the index of the partition.
    count = 0
    for _ in partition:
        count += 1
    return [index, count]

# Flat list of alternating values: [index0, count0, index1, count1, ...]
partitions_index = df.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index[2*i], partitions_index[2*i+1]))

print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions if p != 0]), num))

For partition 0, there is/are 0 element(s).
For partition 1, there is/are 1 element(s).
For partition 2, there is/are 0 element(s).
For partition 3, there is/are 1 element(s).
For partition 4, there is/are 1 element(s).
For partition 5, there is/are 0 element(s).
For partition 6, there is/are 1 element(s).
For partition 7, there is/are 1 element(s).

Therefore, the DataFrame uses 5 partition(s) and could use 8.
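
The pattern of empty and non-empty partitions above is consistent with cutting the 5 elements into 8 contiguous index ranges of near-equal size. The pure-Python sketch below reproduces that arithmetic without Spark. The helper names `slice_bounds` and `slice_sizes` are hypothetical, and the formula is an assumption about how a local collection is sliced by `sc.parallelize`, not something stated in this notebook.

```python
def slice_bounds(length, num_slices):
    """Return (start, end) index pairs that cut `length` items into `num_slices` ranges."""
    return [
        ((i * length) // num_slices, ((i + 1) * length) // num_slices)
        for i in range(num_slices)
    ]


def slice_sizes(length, num_slices):
    """Number of items that fall into each slice."""
    return [end - start for start, end in slice_bounds(length, num_slices)]


sizes = slice_sizes(5, 8)
print(sizes)  # [0, 1, 0, 1, 1, 0, 1, 1]
print(sum(1 for s in sizes if s != 0), "non-empty partition(s) out of", len(sizes))
```

Under this assumption, 5 elements spread over 8 slices leave 3 slices empty, which matches the counts printed by the cell.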


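Repartitioning the DataFrame into 4 partitions with **repartition**, which performs a full shuffle, and counting the elements per partition again.

In [0]: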
df_r = df.repartition(4)

num_r = df_r.rdd.getNumPartitions()

partitions_r = df_r.rdd.mapPartitions(f).collect()

partitions_index_r = df_r.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_r):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_r[2*i], partitions_index_r[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_r if p != 0]), num_r))

For partition 0, there is/are 1 element(s).
For partition 1, there is/are 1 element(s).
For partition 2, there is/are 1 element(s).
For partition 3, there is/are 2 element(s).

Therefore, the DataFrame uses 4 partition(s) and could use 4.
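
The counting code is repeated for every DataFrame in this notebook, so it could be wrapped in one reusable helper. The following is a minimal sketch and not the notebook's own code: `count_rows` and `partition_report` are hypothetical names, and `partition_report` assumes it receives an RDD such as `df.rdd`. Returning `(index, count)` pairs avoids the `2*i` / `2*i+1` indexing of the flat list.

```python
def count_rows(index, rows):
    """Yield one (partition index, row count) pair for a single partition."""
    yield index, sum(1 for _ in rows)


def partition_report(rdd):
    """Collect per-partition counts from an RDD and format them as text lines."""
    counts = rdd.mapPartitionsWithIndex(count_rows).collect()
    lines = [
        "For partition {0}, there is/are {1} element(s).".format(i, n)
        for i, n in counts
    ]
    used = sum(1 for _, n in counts if n != 0)
    lines.append(
        "Therefore, the DataFrame uses {0} partition(s) and could use {1}.".format(
            used, len(counts)
        )
    )
    return lines


# The counting function can be tried on a plain iterator, without a cluster.
print(list(count_rows(0, iter(["Kang", "Thor"]))))  # [(0, 2)]
```

In the notebook, a call such as `print("\n".join(partition_report(df_r.rdd)))` would then replace the repeated block.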


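Reducing the original DataFrame to 4 partitions with **coalesce**, which merges existing partitions instead of performing a full shuffle.

In [0]: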
df_c = df.coalesce(4)

num_c = df_c.rdd.getNumPartitions()

partitions_c = df_c.rdd.mapPartitions(f).collect()

partitions_index_c = df_c.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_c):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_c[2*i], partitions_index_c[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_c if p != 0]), num_c))

For partition 0, there is/are 1 element(s).
For partition 1, there is/are 1 element(s).
For partition 2, there is/are 1 element(s).
For partition 3, there is/are 2 element(s).

Therefore, the DataFrame uses 4 partition(s) and could use 4.


Reading a previously imported *.csv* file.

In [0]:
path = "/FileStore/marvel_characters_info.csv"
df_marvel = spark.read.option("delimiter", ",")\
                 .option("header", True)\
                 .csv(path)
display(df_marvel)

ID,Name,Alignment,Gender,EyeColor,Race,HairColor,Publisher,SkinColor,Height,Weight
0,A-Bomb,good,Male,yellow,Human,No Hair,Marvel Comics,-,203.0,441.0
1,Abe Sapien,good,Male,blue,Icthyo Sapien,No Hair,Dark Horse Comics,blue,191.0,65.0
2,Abin Sur,good,Male,blue,Ungaran,No Hair,DC Comics,red,185.0,90.0
3,Abomination,bad,Male,green,Human / Radiation,No Hair,Marvel Comics,-,203.0,441.0
4,Abraxas,bad,Male,blue,Cosmic Entity,Black,Marvel Comics,-,-99.0,-99.0
5,Absorbing Man,bad,Male,blue,Human,No Hair,Marvel Comics,-,193.0,122.0
6,Adam Monroe,good,Male,blue,-,Blond,NBC - Heroes,-,-99.0,-99.0
7,Adam Strange,good,Male,blue,Human,Blond,DC Comics,-,185.0,88.0
8,Agent 13,good,Female,blue,-,Blond,Marvel Comics,-,173.0,61.0
9,Agent Bob,good,Male,brown,Human,Brown,Marvel Comics,-,178.0,81.0


Counting the elements per partition of the DataFrame read from the file.

In [0]:
num_marvel = df_marvel.rdd.getNumPartitions()

partitions_marvel = df_marvel.rdd.mapPartitions(f).collect()

partitions_index_marvel = df_marvel.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_marvel):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_marvel[2*i], partitions_index_marvel[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_marvel if p != 0]), num_marvel))

For partition 0, there is/are 734 element(s).

Therefore, the DataFrame uses 1 partition(s) and could use 1.


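Trying to increase the number of partitions with **coalesce**. This has no effect, because **coalesce** can only reduce the number of partitions.

In [0]:
df_test = df_marvel.coalesce(3)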

num_test = df_test.rdd.getNumPartitions()

print(num_test)

1


Increasing the number of partitions to 8 with **repartition**.

In [0]:
df_r_marvel = df_marvel.repartition(8)

num_r_marvel = df_r_marvel.rdd.getNumPartitions()

partitions_r_marvel = df_r_marvel.rdd.mapPartitions(f).collect()

partitions_index_r_marvel = df_r_marvel.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_r_marvel):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_r_marvel[2*i], partitions_index_r_marvel[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_r_marvel if p != 0]), num_r_marvel))

For partition 0, there is/are 91 element(s).
For partition 1, there is/are 91 element(s).
For partition 2, there is/are 92 element(s).
For partition 3, there is/are 92 element(s).
For partition 4, there is/are 92 element(s).
For partition 5, there is/are 92 element(s).
For partition 6, there is/are 92 element(s).
For partition 7, there is/are 92 element(s).

Therefore, the DataFrame uses 8 partition(s) and could use 8.
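
The near-equal counts above (two partitions with 91 rows, six with 92) are what a round-robin distribution of 734 rows over 8 partitions would produce. The sketch below models that in plain Python. `round_robin_sizes` is a hypothetical helper, and the starting partition `start=2` is an assumption picked only to match the output above, since Spark chooses the starting position itself.

```python
def round_robin_sizes(num_rows, num_partitions, start=0):
    """Deal rows one at a time to consecutive partitions, beginning at `start`."""
    sizes = [0] * num_partitions
    for row in range(num_rows):
        sizes[(start + row) % num_partitions] += 1
    return sizes


print(round_robin_sizes(734, 8, start=2))  # [91, 91, 92, 92, 92, 92, 92, 92]
```

Whatever the starting position, the sizes of the partitions differ by at most one row in this model, because 734 = 8 × 91 + 6.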


Reducing the 8 partitions to 3 with **repartition**. The rows are redistributed almost evenly.

In [0]:
df_rep_marvel = df_r_marvel.repartition(3)

num_r_marvel = df_rep_marvel.rdd.getNumPartitions()

partitions_r_marvel = df_rep_marvel.rdd.mapPartitions(f).collect()

partitions_index_r_marvel = df_rep_marvel.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_r_marvel):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_r_marvel[2*i], partitions_index_r_marvel[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_r_marvel if p != 0]), num_r_marvel))

For partition 0, there is/are 245 element(s).
For partition 1, there is/are 245 element(s).
For partition 2, there is/are 244 element(s).

Therefore, the DataFrame uses 3 partition(s) and could use 3.


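Reducing the same 8 partitions to 3 with **coalesce**. Existing partitions are merged, so the result is less balanced.

In [0]: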
df_coa_marvel = df_r_marvel.coalesce(3)

num_c_marvel = df_coa_marvel.rdd.getNumPartitions()

partitions_c_marvel = df_coa_marvel.rdd.mapPartitions(f).collect()

partitions_index_c_marvel = df_coa_marvel.rdd.mapPartitionsWithIndex(f_index).collect()

for i in range(num_c_marvel):
    print("For partition {0}, there is/are {1} element(s).".format(partitions_index_c_marvel[2*i], partitions_index_c_marvel[2*i+1]))
    
print("\nTherefore, the DataFrame uses {0} partition(s) and could use {1}.".format(len([p for p in partitions_c_marvel if p != 0]), num_c_marvel))

For partition 0, there is/are 182 element(s).
For partition 1, there is/are 276 element(s).
For partition 2, there is/are 276 element(s).

Therefore, the DataFrame uses 3 partition(s) and could use 3.
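
The uneven result of `coalesce` can be explained by merging adjacent partitions instead of redistributing rows: 182 = 91 + 91 and 276 = 92 + 92 + 92. The sketch below is a simplified pure-Python model of that behaviour. `coalesce_sizes` is a hypothetical helper, and the grouping formula is an assumption that ignores data locality, not Spark's actual implementation.

```python
def coalesce_sizes(sizes, target):
    """Merge adjacent partitions into `target` groups without moving rows between groups."""
    current = len(sizes)
    if target >= current:
        # Without a shuffle, the number of partitions cannot grow.
        return list(sizes)
    merged = []
    for i in range(target):
        start = (i * current) // target
        end = ((i + 1) * current) // target
        merged.append(sum(sizes[start:end]))
    return merged


# 8 partitions of the repartitioned file data, merged into 3.
print(coalesce_sizes([91, 91, 92, 92, 92, 92, 92, 92], 3))  # [182, 276, 276]
# 8 partitions of the small 5-row DataFrame, merged into 4.
print(coalesce_sizes([0, 1, 0, 1, 1, 0, 1, 1], 4))  # [1, 1, 1, 2]
# A single partition stays a single partition, whatever the target.
print(coalesce_sizes([734], 3))  # [734]
```

The same model reproduces the three `coalesce` outputs in this notebook, including the case where asking for 3 partitions on a single-partition DataFrame still returns 1.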
