In [48]:
import org.apache.spark.sql.SparkSession
// Create (or reuse) the notebook's SparkSession: runs locally on all cores and
// uses 4 shuffle partitions so the small demo joins stay cheap.
val spark = {
  val sessionBuilder = SparkSession.builder()
    .master("local[*]")
    .appName("Salting Spark App")
    .config("spark.sql.shuffle.partitions", 4)
  sessionBuilder.getOrCreate()
}

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20dac0c0


#### Source Data

In [49]:
// Both CSV inputs are read with a header row and with schema inference enabled.
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")

// Fact side: stock transactions, keyed by tick_id.
val tranDF = spark.read.options(csvOptions).csv("../data/stock_transactions.csv")
// Dimension side: organisation details, keyed by tick.
val dimDF = spark.read.options(csvOptions).csv("../data/dim_org.csv")

tranDF: org.apache.spark.sql.DataFrame = [tick_id: string, acct_id: int ... 4 more fields]
dimDF: org.apache.spark.sql.DataFrame = [tick: string, tick_name: string ... 2 more fields]


#### Simple join

In [50]:
// Baseline: plain equi-join of transactions to the organisation dimension.
// The dimension's `tick` column duplicates `tick_id`, so it is dropped afterwards.
val joinedDF = {
  val tickMatches = tranDF("tick_id") === dimDF("tick")
  tranDF
    .join(dimDF, tickMatches)
    .drop("tick")
}

// Print up to 500 rows ordered by ticker, then report the joined row count.
joinedDF.orderBy("tick_id").show(500)
joinedDF.count()

+-------+-------+------+-----+----+----------+------------+-----------+-------+
|tick_id|acct_id|volume| rate|type|        ts|   tick_name|        CEO|founded|
+-------+-------+------+-----+----+----------+------------+-----------+-------+
|    ABC|      8|    89| 8345|   B|1620490952|     ABC LTD| Ajay Batra| 2020.0|
|    AMZ|      5|    67| 5345|   S|1620490948|      Amazon|         JB| 2013.0|
|    AMZ|      2|     3| 2345|   S|1620490945|      Amazon|         JB| 2013.0|
|    AMZ|      5|    89| 5345|   S|1620490949|      Amazon|         JB| 2013.0|
|    AMZ|      6|    89| 6345|   S|1620490950|      Amazon|         JB| 2013.0|
|    AMZ|      7|    89| 7345|   S|1620490951|      Amazon|         JB| 2013.0|
|    AMZ|      4|    55| 4345|   S|1620490947|      Amazon|         JB| 2013.0|
|    AMZ|      1|  1122| 1345|   B|1620490944|      Amazon|         JB| 2013.0|
|    AMZ|      3|     4| 3345|   S|1620490946|      Amazon|         JB| 2013.0|
|    APL|      5|    89| 5345|   B|16204

joinedDF: org.apache.spark.sql.DataFrame = [tick_id: string, acct_id: int ... 7 more fields]
res34: Long = 46


#### Problem
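* Although this join works fine on small datasets, it will run into problems at scale because the join key is skewed: transactions are unevenly distributed across ticks (some ticks already have far more rows than others in this sample). To solve this, we can use salting as follows.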

#### Salting

In [51]:
// Fact side of the salted join: append "_<salt>" to every tick_id, where the salt
// is floor(rand * 2), i.e. 0 or 1. The fixed seed keeps the assignment repeatable.
val saltedTranDF = {
  val saltBucket = floor(rand(123456) * 2)
  val saltedKey = concat(tranDF("tick_id"), lit("_"), saltBucket)
  tranDF.withColumn("salted_tick", saltedKey)
}
saltedTranDF.show()

+-------+-------+------+----+----+----------+-----------+
|tick_id|acct_id|volume|rate|type|        ts|salted_tick|
+-------+-------+------+----+----+----------+-----------+
|    AMZ|      1|  1122|1345|   B|1620490944|      AMZ_1|
|    AMZ|      2|     3|2345|   S|1620490945|      AMZ_0|
|    AMZ|      3|     4|3345|   S|1620490946|      AMZ_0|
|    AMZ|      4|    55|4345|   S|1620490947|      AMZ_0|
|    AMZ|      5|    67|5345|   S|1620490948|      AMZ_0|
|    AMZ|      5|    89|5345|   S|1620490949|      AMZ_0|
|    AMZ|      6|    89|6345|   S|1620490950|      AMZ_0|
|    AMZ|      7|    89|7345|   S|1620490951|      AMZ_1|
|    ABC|      8|    89|8345|   B|1620490952|      ABC_1|
|    APL|      9|    89|9345|   B|1620490953|      APL_0|
|    APL|      3|    89|3345|   B|1620490954|      APL_0|
|    APL|      4|    89|4345|   B|1620490955|      APL_0|
|    APL|      5|    89|5345|   B|1620490956|      APL_0|
|   MSFT|      6|    89|6345|   B|1620490957|     MSFT_0|
|   MSFT|     

saltedTranDF: org.apache.spark.sql.DataFrame = [tick_id: string, acct_id: int ... 5 more fields]


In [52]:
// Dimension side of the salted join: replicate every dimension row once per salt
// value ("0" and "1") and build the matching "<tick>_<salt>" key, so each salted
// transaction key has exactly one counterpart here.
val saltedDimDF = {
  // Attach the comma-separated list of salt values to every row.
  val withSaltList = dimDF.withColumn("salt_str", lit("0,1"))
  // Split the list and explode it into one row per salt value; the helper
  // column is no longer needed once exploded.
  val onePerSalt = withSaltList
    .withColumn("salt_val", explode(split(withSaltList("salt_str"), ",")))
    .drop("salt_str")
  onePerSalt.withColumn(
    "salted_tick_id",
    concat(onePerSalt("tick"), lit("_"), onePerSalt("salt_val"))
  )
}
saltedDimDF.show(false)

+----+------------+-----------+-------+--------+--------------+
|tick|tick_name   |CEO        |founded|salt_val|salted_tick_id|
+----+------------+-----------+-------+--------+--------------+
|AMZ |Amazon      |JB         |2013.0 |0       |AMZ_0         |
|AMZ |Amazon      |JB         |2013.0 |1       |AMZ_1         |
|ABC | ABC LTD    | Ajay Batra|2020.0 |0       |ABC_0         |
|ABC | ABC LTD    | Ajay Batra|2020.0 |1       |ABC_1         |
|APL | APl PVT LTD| Josh Vrum |1998.0 |0       |APL_0         |
|APL | APl PVT LTD| Josh Vrum |1998.0 |1       |APL_1         |
|MSFT| Microsoft  | SN        |1975.0 |0       |MSFT_0        |
|MSFT| Microsoft  | SN        |1975.0 |1       |MSFT_1        |
|GOOG| Google     | SP        |1998.0 |0       |GOOG_0        |
|GOOG| Google     | SP        |1998.0 |1       |GOOG_1        |
+----+------------+-----------+-------+--------+--------------+



saltedDimDF: org.apache.spark.sql.DataFrame = [tick: string, tick_name: string ... 4 more fields]


In [53]:
// Join on the salted keys instead of the raw ticker, then remove the salting
// helper columns (and the duplicate `tick`) so the result has the same shape as
// the plain join.
val saltedJoinedDF = {
  val saltedKeysMatch = saltedTranDF("salted_tick") === saltedDimDF("salted_tick_id")
  saltedTranDF
    .join(saltedDimDF, saltedKeysMatch)
    .drop("salt_val", "salted_tick", "salted_tick_id", "tick")
}

// Print up to 500 rows ordered by ticker, then report the joined row count
// (expected to equal the count from the plain join).
saltedJoinedDF.orderBy("tick_id").show(500)
saltedJoinedDF.count()

+-------+-------+------+-----+----+----------+------------+-----------+-------+
|tick_id|acct_id|volume| rate|type|        ts|   tick_name|        CEO|founded|
+-------+-------+------+-----+----+----------+------------+-----------+-------+
|    ABC|      8|    89| 8345|   B|1620490952|     ABC LTD| Ajay Batra| 2020.0|
|    AMZ|      5|    67| 5345|   S|1620490948|      Amazon|         JB| 2013.0|
|    AMZ|      2|     3| 2345|   S|1620490945|      Amazon|         JB| 2013.0|
|    AMZ|      5|    89| 5345|   S|1620490949|      Amazon|         JB| 2013.0|
|    AMZ|      6|    89| 6345|   S|1620490950|      Amazon|         JB| 2013.0|
|    AMZ|      7|    89| 7345|   S|1620490951|      Amazon|         JB| 2013.0|
|    AMZ|      4|    55| 4345|   S|1620490947|      Amazon|         JB| 2013.0|
|    AMZ|      1|  1122| 1345|   B|1620490944|      Amazon|         JB| 2013.0|
|    AMZ|      3|     4| 3345|   S|1620490946|      Amazon|         JB| 2013.0|
|    APL|      5|    89| 5345|   B|16204

saltedJoinedDF: org.apache.spark.sql.DataFrame = [tick_id: string, acct_id: int ... 7 more fields]
res37: Long = 46
