In [1]:
# Python 3.5.2
# PySpark 2.4.5.4.1.1.2
# Azure's HDInsight 4.0 (2 master nodes, 8 worker nodes - 16 cores and 112 GB ram each)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1603734684199_0010,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


** pyspark imports ** 
1. udf is for the withColumn join 
2. broadcast is for broadcasting mapping_df before joining in sql_df 
3. the default is without broadcast - check get_sql_df function

** random, itertools imports ** 

For the generation of mapping_df's columns.
random used in the generation of product_name column 
itertools used in the generation of (department_id, product_id) columns 

In [2]:
from pyspark.sql.functions import udf, col, broadcast
import random
import itertools

In [3]:
num_departments = 15
num_product_id = 20000

The 2 following functions are used in our experiments.

We check how much time running the action takes using **time_function**.
We calculate an average of several runs using **avg_time_function**.

Arguments: **func** is the function to run, and **args** are its arguments.

In [4]:
def time_function(func, *args):
    import time 
    t1 = time.time()
    result = func(*args)
    t2 = time.time()
    total_time = t2-t1
    return total_time, result

In [5]:
def avg_time_function(iters, func, *args):
    total_time = 0
    for _ in range(iters):
        time, _ = time_function(func, *args)
        total_time += time
    return (total_time/iters)

## Create Dictionary Data

Here we create the dataframe that holds the names for different product_id's of the different departments.

In [6]:
base_data = itertools.product(range(num_departments), range(num_product_id))
mapped_data = map(lambda tup: (tup[0], tup[1], str(hash(random.random())%6789)), base_data)
mapping_data = list(mapped_data)

In [7]:
mapping_df = spark.createDataFrame(mapping_data, ['department_id', 'product_id', 'product_name'])

In [8]:
mapping_df.count()

300000

## Create Coded Data

The following function creates a dataframe, with **num_rows** rows, and **num_cols** columns of **product_id**.

Later on we will join each such column to get the name of each such **product_id**.

In [11]:
def get_data_df(num_rows, num_cols):
    did = "id % {} as department_id".format(num_departments)
    data_df = spark.range(num_rows).selectExpr(did)
    for i in range(num_cols):
        pid = "floor(rand()*{})".format(num_product_id)
        data_df = data_df.selectExpr("*", "floor(rand()*{}) as product_id{}".format(num_product_id,i))
    return data_df

In [12]:
num_rows = 10000
data_df = get_data_df(num_rows, 5)

In [13]:
data_df.show(35)

+-------------+-----------+-----------+-----------+-----------+-----------+
|department_id|product_id0|product_id1|product_id2|product_id3|product_id4|
+-------------+-----------+-----------+-----------+-----------+-----------+
|            0|       7000|       2544|       7239|      17716|      16790|
|            1|      17938|       4821|      14477|        950|      16758|
|            2|       9842|      14424|      12156|      14403|      16541|
|            3|      17608|       8616|       5722|       8825|      15340|
|            4|       7495|       7624|      17885|      17330|       2680|
|            5|       2608|       9340|      15481|      16591|      18968|
|            6|      11770|      12593|      10014|      12227|       9925|
|            7|      11765|      16057|       1201|        901|       2220|
|            8|      13440|      18077|        400|      12440|      15988|
|            9|      11389|       5505|      17030|      17371|      15744|
|           

## Let the joining begin

### 1st try - SQL Syntax

This function does the following:

1. For each **product_id** column, we join to mapping_df to get its **product_name**.
2. After finished joining all **product_id** columns, we group by all **product_name** columns and *count()* it.

In [14]:
def get_sql_df(data_df, mapping_df, num_cols):
#     mapping_df = broadcast(mapping_df)
    data_df.createOrReplaceTempView("data")
    mapping_df.createOrReplaceTempView("mapping")
    def query_generator(num_cols):
        select_clause = " , ".join(["b{}.product_name as product_name{}".format(i,i) for i in range(num_cols)])
        join_clause = " \n ".join(["join mapping b{} on a.department_id = b{}.department_id and a.product_id{} = b{}.product_id".format(i,i,i,i) for i in range(num_cols)])
        sql = """
                select a.*, {} 
                from data a
                {}
                """.format(select_clause, join_clause)
        return sql
        
    sql_df = spark.sql(query_generator(num_cols))
    product_name_columns_names = ["product_name{}".format(i) for i in range(num_cols)]
    sql_df = sql_df.groupBy(product_name_columns_names).count()
    return sql_df

In [15]:
get_sql_df(data_df, mapping_df, 5)

DataFrame[product_name0: string, product_name1: string, product_name2: string, product_name3: string, product_name4: string, count: bigint]

### 2nd try - the plain old dict

Here we do the following:

1. Collect **mapping_df**.
2. Create a dictionary with 2 hierarchies, **department_id** as its first hierarchy. The second hierarchy is **product_id** with **product_name** as its value.
3. Broadcast the dictionary to all executors.
4. Create a udf that maps a couple (**deparment_id**, **product_id**) into **product_name** using the broadcasted dictionary.

In [16]:
indexed_data = dict()
for department_id, product_id, product_name in mapping_df.collect():
    if department_id not in indexed_data:
        indexed_data[department_id] = dict()
    indexed_data[department_id][product_id] = product_name

In [17]:
broadcast_indexed_data = spark.sparkContext.broadcast(indexed_data)

In [18]:
join_udf = udf(lambda department_id, product_id: broadcast_indexed_data.value[department_id][product_id])

This function is similar to get_sql_df. The different is that instead of joining as in get_sql_df's first phase, we map each **product_id** column along with the **depratment_id** on its row into **product_name**.

The functionality is the same, the technicality is different.

In [19]:
def get_udf_df(data_df, num_cols):
    udf_df = data_df
    for i in range(num_cols):
        udf_df = udf_df.withColumn('product_name{}'.format(i), join_udf(col("department_id"), col("product_id{}".format(i))))
    product_name_columns_names = ["product_name{}".format(i) for i in range(num_cols)]
    udf_df = udf_df.groupBy(*product_name_columns_names).count()
    return udf_df

### Who is faster?

The function below is used in **avg_time_function** to check the performance of the different join approaches.

**process_df** is **avg_time_function**'s *func* parameter.

In [20]:
def process_df(df):
    df_count = df.count()
    return df, df_count

In [21]:
# 100 thousand in a variable for simplicity
k100=100000

**print_avg** is our experimenting function.

It creates *data_df* with **num_rows** rows and **num_cols** columns.
Then it joins *data_df* in the 2 different approaches using *get_sql_df* and *get_sql_df*.

After creating the dataframes we will process using *process_df* function, we use **avg_time_function** to get the relevant running time averages for each of 2 dataframes.

In [22]:
def print_avg(num_iterations, num_rows, num_cols=10):
    data_df = get_data_df(num_rows, num_cols)
    
    sql_df = get_sql_df(data_df, mapping_df, num_cols)
    udf_df = get_udf_df(data_df, num_cols)
    
    for df, name in [(sql_df, 'sql_df'), (udf_df, 'udf_df')]:
        average_run_time = avg_time_function(num_iterations, process_df, df)
        print("average run_time for {name} with {num_rows} rows is {average_run_time}".format(name=name, average_run_time=average_run_time, num_rows=num_rows))

In [23]:
print_avg(num_iterations=5, num_rows=k100, num_cols=5)

average run_time for sql_df with 100000 rows is 9.40359115600586
average run_time for udf_df with 100000 rows is 1.8074284076690674

In [24]:
print_avg(num_iterations=5, num_rows=k100, num_cols=10)

average run_time for sql_df with 100000 rows is 16.00410599708557
average run_time for udf_df with 100000 rows is 2.181958818435669

In [25]:
print_avg(num_iterations=5, num_rows=k100, num_cols=15)

average run_time for sql_df with 100000 rows is 27.68385329246521
average run_time for udf_df with 100000 rows is 3.11617431640625

In [31]:
print_avg(num_iterations=5, num_rows=10*k100, num_cols=10)

average run_time for sql_df with 1000000 rows is 17.9197660446167
average run_time for udf_df with 1000000 rows is 2.8207101821899414

In [33]:
print_avg(num_iterations=5, num_rows=30*k100, num_cols=10)

average run_time for sql_df with 3000000 rows is 19.310581111907958
average run_time for udf_df with 3000000 rows is 3.5375089168548586

In [34]:
print_avg(num_iterations=5, num_rows=100*k100, num_cols=10)

average run_time for sql_df with 10000000 rows is 23.55312309265137
average run_time for udf_df with 10000000 rows is 6.80124945640564

In [35]:
print_avg(num_iterations=5, num_rows=500*k100, num_cols=10)

average run_time for sql_df with 50000000 rows is 45.740611982345584
average run_time for udf_df with 50000000 rows is 24.278624248504638

### What is the plan?

Running *.explain()* below helps understanding Spark's under the hood regarding the 2 different approaches.

In [37]:
num_cols = 5
data_df = get_data_df(k100, num_cols)
sql_df = get_sql_df(data_df, mapping_df, num_cols)
udf_df = get_udf_df(data_df, num_cols)

In [38]:
sql_df.explain()

== Physical Plan ==
*(19) HashAggregate(keys=[product_name0#5632, product_name1#5633, product_name2#5634, product_name3#5635, product_name4#5636], functions=[count(1)])
+- Exchange hashpartitioning(product_name0#5632, product_name1#5633, product_name2#5634, product_name3#5635, product_name4#5636, 200)
   +- *(18) HashAggregate(keys=[product_name0#5632, product_name1#5633, product_name2#5634, product_name3#5635, product_name4#5636], functions=[partial_count(1)])
      +- *(18) Project [product_name#2 AS product_name0#5632, product_name#5639 AS product_name1#5633, product_name#5642 AS product_name2#5634, product_name#5645 AS product_name3#5635, product_name#5648 AS product_name4#5636]
         +- *(18) SortMergeJoin [department_id#5605L, product_id4#5625L], [department_id#5646L, product_id#5647L], Inner
            :- *(15) Sort [department_id#5605L ASC NULLS FIRST, product_id4#5625L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(department_id#5605L, product_id4#5

In [39]:
udf_df.explain()

== Physical Plan ==
*(3) HashAggregate(keys=[product_name0#5680, product_name1#5689, product_name2#5699, product_name3#5710, product_name4#5722], functions=[count(1)])
+- Exchange hashpartitioning(product_name0#5680, product_name1#5689, product_name2#5699, product_name3#5710, product_name4#5722, 200)
   +- *(2) HashAggregate(keys=[product_name0#5680, product_name1#5689, product_name2#5699, product_name3#5710, product_name4#5722], functions=[partial_count(1)])
      +- *(2) Project [pythonUDF0#5755 AS product_name0#5680, pythonUDF1#5756 AS product_name1#5689, pythonUDF2#5757 AS product_name2#5699, pythonUDF3#5758 AS product_name3#5710, pythonUDF4#5759 AS product_name4#5722]
         +- BatchEvalPython [<lambda>(department_id#5605L, product_id0#5607L), <lambda>(department_id#5605L, product_id1#5610L), <lambda>(department_id#5605L, product_id2#5614L), <lambda>(department_id#5605L, product_id3#5619L), <lambda>(department_id#5605L, product_id4#5625L)], [department_id#5605L, product_id0#5607

### Making sure it is the same data


For the skeptical among us, here is a code to check that both dataframes, *sql_df* and *udf_df*, contain the same data.

In [40]:
columns_order = udf_df.columns

In [41]:
udf_df.subtract(sql_df.select(columns_order)).show()

+-------------+-------------+-------------+-------------+-------------+-----+
|product_name0|product_name1|product_name2|product_name3|product_name4|count|
+-------------+-------------+-------------+-------------+-------------+-----+
+-------------+-------------+-------------+-------------+-------------+-----+

In [None]:
sql_df.select(columns_order).subtract(udf_df).show()