### Import Required Libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

### Create SparkSession and SparkContext

In [2]:
# Obtain a SparkSession with default builder settings; getOrCreate() returns the
# already-active session when there is one instead of starting a second one.
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/20 13:01:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/20 13:01:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Setup

In [3]:
# DataFrame with two array columns: fruit names (one entry is None) and counts.
array_data = [
    (
        1,
        ['apple', 'banana', 'cherry', None, 'melon'],
        [10, 12, 15, 17, 20],
    ),
]
array_df = spark.createDataFrame(data=array_data, schema=['id', 'fruit', 'num_fruit'])

# DataFrame with a single map column (fruit name -> count).
map_data = [
    (1, {'apple': 10, 'banana': 12, 'cherry': 15, 'melon': 20}),
]
map_df = spark.createDataFrame(data=map_data, schema=("id", "map_data"))

In [4]:
# Print a label followed by the inferred schema for each example DataFrame.
for label, frame in (
    ("Schema for array_df", array_df),
    ("Schema for map_df", map_df),
):
    print(label)
    frame.printSchema()

Schema for array_df
root
 |-- id: long (nullable = true)
 |-- fruit: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- num_fruit: array (nullable = true)
 |    |-- element: long (containsNull = true)

Schema for map_df
root
 |-- id: long (nullable = true)
 |-- map_data: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)



#### Transform

In [5]:
# Map every element of `fruit` to its string length; the null entry stays null.
array_df.select(
    "fruit",
    F.transform("fruit", lambda fruit_name: F.length(fruit_name)).alias("str_len"),
).show(truncate=False)

                                                                                

+------------------------------------+------------------+
|fruit                               |str_len           |
+------------------------------------+------------------+
|[apple, banana, cherry, null, melon]|[5, 6, 6, null, 5]|
+------------------------------------+------------------+



In [6]:
# Using SQL API: compute the length of every element of `fruit` with a SQL lambda.
array_df.selectExpr(
    "fruit",
    "TRANSFORM(fruit, x -> length(x)) as str_len",
).show(truncate=False)

+------------------------------------+------------------+
|fruit                               |str_len           |
+------------------------------------+------------------+
|[apple, banana, cherry, null, melon]|[5, 6, 6, null, 5]|
+------------------------------------+------------------+



In [7]:
# Add 1 to every element of the numeric array.
array_df.select(
    "num_fruit",
    F.transform("num_fruit", lambda count: count + 1).alias("num_fruit_1"),
).show(truncate=False)

+--------------------+--------------------+
|num_fruit           |num_fruit_1         |
+--------------------+--------------------+
|[10, 12, 15, 17, 20]|[11, 13, 16, 18, 21]|
+--------------------+--------------------+



In [8]:
# Using SQL API: add 1 to every element of `num_fruit` with a SQL lambda.
array_df.selectExpr(
    "num_fruit",
    "TRANSFORM(num_fruit, x -> x + 1) as num_fruit_1",
).show(truncate=False)

+--------------------+--------------------+
|num_fruit           |num_fruit_1         |
+--------------------+--------------------+
|[10, 12, 15, 17, 20]|[11, 13, 16, 18, 21]|
+--------------------+--------------------+



In [9]:
def add_1(x):
    """Return ``x`` incremented by one (``x + 1``)."""
    incremented = x + 1
    return incremented

# A named Python function can be handed to F.transform directly as the
# per-element function; wrapping it in `lambda x: add_1(x)` adds nothing.
array_df.select(
    "num_fruit",
    F.transform("num_fruit", add_1).alias("num_fruit_1"),
).show(truncate=False)

+--------------------+--------------------+
|num_fruit           |num_fruit_1         |
+--------------------+--------------------+
|[10, 12, 15, 17, 20]|[11, 13, 16, 18, 21]|
+--------------------+--------------------+



#### Filter

In [10]:
# Keep only the elements of the array that are even.
array_df.select(
    "num_fruit",
    F.filter("num_fruit", lambda n: n % 2 == 0).alias("even_num"),
).show(truncate=False)

+--------------------+------------+
|num_fruit           |even_num    |
+--------------------+------------+
|[10, 12, 15, 17, 20]|[10, 12, 20]|
+--------------------+------------+



In [11]:
def even_num(x):
    """Return the result of ``x % 2 == 0``, i.e. whether ``x`` is even."""
    remainder = x % 2
    return remainder == 0
  
# Pass the named predicate straight to F.filter; the extra
# `lambda x: even_num(x)` wrapper is redundant.
array_df.select(
    "num_fruit",
    F.filter("num_fruit", even_num).alias("even_num"),
).show(truncate=False)

+--------------------+------------+
|num_fruit           |even_num    |
+--------------------+------------+
|[10, 12, 15, 17, 20]|[10, 12, 20]|
+--------------------+------------+



In [12]:
# Using SQL API: keep only the even elements of `num_fruit`.
array_df.selectExpr(
    "num_fruit",
    "filter(num_fruit, x -> x % 2 == 0) as even_num",
).show(truncate=False)

+--------------------+------------+
|num_fruit           |even_num    |
+--------------------+------------+
|[10, 12, 15, 17, 20]|[10, 12, 20]|
+--------------------+------------+



#### EXISTS

In [13]:
# Check whether at least one element of the array satisfies a condition
# (here: the fruit name starts with 'a').
array_df.select(
    'fruit',
    F.exists('fruit', lambda name: name.startswith('a')).alias("fruit_w_a"),
).show(truncate=False)

+------------------------------------+---------+
|fruit                               |fruit_w_a|
+------------------------------------+---------+
|[apple, banana, cherry, null, melon]|true     |
+------------------------------------+---------+



In [14]:
# Using SQL API: true when at least one element of `fruit` starts with 'a'.
array_df.selectExpr(
    "fruit",
    "exists(fruit, x -> startswith(x, 'a')) as fruit_w_a",
).show(truncate=False)

+------------------------------------+---------+
|fruit                               |fruit_w_a|
+------------------------------------+---------+
|[apple, banana, cherry, null, melon]|true     |
+------------------------------------+---------+



#### FORALL

In [15]:
# Check whether every element of the array satisfies a condition,
# once for the bound 15 and once for the bound 25.
array_df.select(
    "num_fruit",
    F.forall("num_fruit", lambda n: n < 15).alias("lt_15"),
    F.forall("num_fruit", lambda n: n < 25).alias("lt_25"),
).show(truncate=False)
 

+--------------------+-----+-----+
|num_fruit           |lt_15|lt_25|
+--------------------+-----+-----+
|[10, 12, 15, 17, 20]|false|true |
+--------------------+-----+-----+



In [16]:
# Using SQL API: check whether all elements of `num_fruit` are below 15 / below 25.
array_df.selectExpr(
    "num_fruit",
    "forall(num_fruit, x -> x < 15) as lt_15",
    "forall(num_fruit, x -> x < 25) as lt_25",
).show(truncate=False)

+--------------------+-----+-----+
|num_fruit           |lt_15|lt_25|
+--------------------+-----+-----+
|[10, 12, 15, 17, 20]|false|true |
+--------------------+-----+-----+



#### AGGREGATE

In [17]:
# Fold the array into a single value. The accumulator starts from the double
# literal 0.0, which is why the sum is displayed as 74.0 rather than 74.
array_df.select(
    "num_fruit",
    F.aggregate("num_fruit", F.lit(0.0), lambda acc, n: acc + n).alias("sum"),
).show(truncate=False)

+--------------------+----+
|num_fruit           |sum |
+--------------------+----+
|[10, 12, 15, 17, 20]|74.0|
+--------------------+----+



In [18]:
def add(x, y):
    """Return ``x + y``."""
    total = x + y
    return total

# Sum the array, passing the named function `add` as the merge step.
array_df.select(
    "num_fruit",
    F.aggregate("num_fruit", F.lit(0.0), add).alias("sum"),
).show(truncate=False)

+--------------------+----+
|num_fruit           |sum |
+--------------------+----+
|[10, 12, 15, 17, 20]|74.0|
+--------------------+----+



In [19]:
# Sum the array with `add`, then apply a finish step that adds 10 to the
# final accumulator value.
array_df.select(
    "num_fruit",
    F.aggregate(
        "num_fruit",
        F.lit(0.0),
        add,
        finish=lambda total: total + 10,
    ).alias("sum"),
).show(truncate=False)

+--------------------+----+
|num_fruit           |sum |
+--------------------+----+
|[10, 12, 15, 17, 20]|84.0|
+--------------------+----+



In [20]:
# Using SQL API
# The array elements are long (BIGINT), so the accumulator is started as BIGINT
# too. That removes the need to cast the running sum back to INT on every step,
# which would silently narrow large sums.
array_df.selectExpr(
    "num_fruit",
    "aggregate(num_fruit, cast(0 as bigint), (acc, x) -> acc + x) as sum",
).show(truncate=False)

+--------------------+---+
|num_fruit           |sum|
+--------------------+---+
|[10, 12, 15, 17, 20]|74 |
+--------------------+---+



In [21]:
# aggregate also works on a string array: start from "START", append each
# element separated by a comma, then add ",END" in the finish step.
# As the output shows, the null element is skipped by concat_ws.
array_df.select(
    "fruit",
    F.aggregate(
        "fruit",
        F.lit("START"),
        lambda joined, name: F.concat_ws(",", joined, name),
        lambda joined: F.concat(joined, F.lit(","), F.lit("END")),
    ).alias("concat"),
).show(truncate=False)

+------------------------------------+-----------------------------------+
|fruit                               |concat                             |
+------------------------------------+-----------------------------------+
|[apple, banana, cherry, null, melon]|START,apple,banana,cherry,melon,END|
+------------------------------------+-----------------------------------+



#### zip_with

In [22]:
# Combine the two arrays element by element into "<fruit>#<count>" strings.
# Where the fruit is null, only the count appears in the output.
array_df.select(
    "fruit",
    "num_fruit",
    F.zip_with(
        "fruit",
        "num_fruit",
        lambda name, count: F.concat_ws("#", name, count),
    ).alias("zip_col"),
).show(truncate=False)

+------------------------------------+--------------------+----------------------------------------------+
|fruit                               |num_fruit           |zip_col                                       |
+------------------------------------+--------------------+----------------------------------------------+
|[apple, banana, cherry, null, melon]|[10, 12, 15, 17, 20]|[apple#10, banana#12, cherry#15, 17, melon#20]|
+------------------------------------+--------------------+----------------------------------------------+



In [23]:
# Using SQL API: pair up `fruit` and `num_fruit` element-wise and join each pair with '#'.
array_df.selectExpr(
    "fruit",
    "num_fruit",
    "zip_with(fruit, num_fruit, (x,y) -> concat_ws('#', x, y)) as zip_col",
).show(truncate=False)

+------------------------------------+--------------------+----------------------------------------------+
|fruit                               |num_fruit           |zip_col                                       |
+------------------------------------+--------------------+----------------------------------------------+
|[apple, banana, cherry, null, melon]|[10, 12, 15, 17, 20]|[apple#10, banana#12, cherry#15, 17, melon#20]|
+------------------------------------+--------------------+----------------------------------------------+



In [24]:
# Duplicate the numeric array, then add the two arrays element by element.
array_df.withColumn("dup_num_fruit", F.col("num_fruit")).select(
    "num_fruit",
    "dup_num_fruit",
    F.zip_with(
        "num_fruit",
        "dup_num_fruit",
        lambda left, right: left + right,
    ).alias("zip_nums"),
).show(truncate=False)

+--------------------+--------------------+--------------------+
|num_fruit           |dup_num_fruit       |zip_nums            |
+--------------------+--------------------+--------------------+
|[10, 12, 15, 17, 20]|[10, 12, 15, 17, 20]|[20, 24, 30, 34, 40]|
+--------------------+--------------------+--------------------+



In [25]:
# Using SQL API: duplicate `num_fruit`, then add both arrays element-wise.
array_df.withColumn("dup_num_fruit", F.col("num_fruit")).selectExpr(
    "num_fruit",
    "dup_num_fruit",
    "zip_with(num_fruit, dup_num_fruit, (x,y) -> x + y) as zip_nums",
).show(truncate=False)

+--------------------+--------------------+--------------------+
|num_fruit           |dup_num_fruit       |zip_nums            |
+--------------------+--------------------+--------------------+
|[10, 12, 15, 17, 20]|[10, 12, 15, 17, 20]|[20, 24, 30, 34, 40]|
+--------------------+--------------------+--------------------+



#### map_filter

In [26]:
# Display the map DataFrame without truncating the cell contents, as a
# reference point for the map examples.
map_df.show(truncate=False)

+---+------------------------------------------------------+
|id |map_data                                              |
+---+------------------------------------------------------+
|1  |{banana -> 12, cherry -> 15, apple -> 10, melon -> 20}|
+---+------------------------------------------------------+



In [27]:
# Keep only the map entries whose key-value pair satisfies the predicate
# (here: the value is greater than 12).
map_df.select(
    F.map_filter("map_data", lambda key, value: value > 12).alias("filtered_data"),
).show(truncate=False)

+---------------------------+
|filtered_data              |
+---------------------------+
|{cherry -> 15, melon -> 20}|
+---------------------------+



In [28]:
# Keep only the map entries whose key satisfies `contains("a")`.
map_df.select(
    F.map_filter("map_data", lambda key, value: key.contains("a")).alias("filtered_data"),
).show(truncate=False)

+---------------------------+
|filtered_data              |
+---------------------------+
|{banana -> 12, apple -> 10}|
+---------------------------+



In [29]:
# Using SQL API: keep only the entries of `map_data` whose value is greater than 12.
map_df.selectExpr(
    "map_data",
    "map_filter(map_data, (k, v) -> v > 12) as filtered_data",
).show(truncate=False)

+------------------------------------------------------+---------------------------+
|map_data                                              |filtered_data              |
+------------------------------------------------------+---------------------------+
|{banana -> 12, cherry -> 15, apple -> 10, melon -> 20}|{cherry -> 15, melon -> 20}|
+------------------------------------------------------+---------------------------+



#### map_zip_with

In [30]:
# Duplicate the map column, then merge both maps key by key by adding their values.
map_df.withColumn("dup_map_data", F.col("map_data")).select(
    F.map_zip_with(
        "map_data",
        "dup_map_data",
        lambda key, left, right: left + right,
    ).alias("map_zipped_data"),
).show(truncate=False)

+------------------------------------------------------+
|map_zipped_data                                       |
+------------------------------------------------------+
|{banana -> 24, cherry -> 30, apple -> 20, melon -> 40}|
+------------------------------------------------------+



In [31]:
# Using SQL API: duplicate `map_data`, then add the values of both maps per key.
map_df.withColumn("dup_map_data", F.col("map_data")).selectExpr(
    "map_zip_with(map_data, dup_map_data, (k, v1, v2) -> v1 + v2) as map_zipped_data",
).show(truncate=False)

+------------------------------------------------------+
|map_zipped_data                                       |
+------------------------------------------------------+
|{banana -> 24, cherry -> 30, apple -> 20, melon -> 40}|
+------------------------------------------------------+



#### transform_keys

In [32]:
# Replace every key of the map with its upper-case form; values are untouched.
map_df.select(
    F.transform_keys("map_data", lambda key, value: F.upper(key)).alias("upper_key"),
).show(truncate=False)

+------------------------------------------------------+
|upper_key                                             |
+------------------------------------------------------+
|{BANANA -> 12, CHERRY -> 15, APPLE -> 10, MELON -> 20}|
+------------------------------------------------------+



In [33]:
# Using SQL API: upper-case every key of `map_data`.
map_df.selectExpr(
    "transform_keys(map_data, (k, v) -> upper(k)) as upper_key",
).show(truncate=False)

+------------------------------------------------------+
|upper_key                                             |
+------------------------------------------------------+
|{BANANA -> 12, CHERRY -> 15, APPLE -> 10, MELON -> 20}|
+------------------------------------------------------+



#### transform_values

In [34]:
# Apply a function to every key-value pair of the map and use its result as
# the new value for that key (here: each value is doubled).
map_df.select(
    F.transform_values("map_data", lambda key, value: value * 2).alias("double_value"),
).show(truncate=False)

+------------------------------------------------------+
|double_value                                          |
+------------------------------------------------------+
|{banana -> 24, cherry -> 30, apple -> 20, melon -> 40}|
+------------------------------------------------------+



In [35]:
# Using SQL API: double every value of `map_data`, keeping the keys.
map_df.selectExpr(
    "transform_values(map_data, (k, v) -> v * 2) as double_value",
).show(truncate=False)

+------------------------------------------------------+
|double_value                                          |
+------------------------------------------------------+
|{banana -> 24, cherry -> 30, apple -> 20, melon -> 40}|
+------------------------------------------------------+



### End of the Notebook