# 0. Abstract

The purpose of this notebook is to explore on arrayType filtering to replace wildcard filtering. We desire this because this might enhance the performance of our algorithms against a large set of cutomer table and offer table.

# 1. Random Table Generator

In [1]:
import scala.util.Random
import math.{ round, min, max }

trait Dummy_Data_Generator extends Serializable {
    
    /**
     * Simulate a double of range 0 (inclusive) to `value` (exclusive).
     */
    def random_double(
        rand: Random = new Random, 
        value: Double = 1): Double = {
        rand.nextDouble * value
    }
    
    /**
     * Randomly select some items (size less than or equal to `max_item` but greater than or
     * equal to `min_item`) from the given array and output a string with items seperated
     * by `sep`.
     */
    def random_array_to_string[T](
        rand: Random = new Random, 
        array: Array[T],
        min_item: Int = 1,
        max_item: Int = 3,
        sep: String = ",",
        duplicate: Boolean = false,
        prob_array: Array[Double] = Array()): String = {
        
      if(array.isEmpty)
        throw new Exception("Invalid configuration: simulate from empty array")
      else if (prob_array.length != array.length && !prob_array.isEmpty)
        throw new Exception("Invalid configuration: different length of prob_array and array.")
      else {     
        val len: Int = array.length
        
        // Declare output variable.
        var output = "" + sep  
          
        // If prob_array is not provided, then assume equal probability for each item.
        lazy val prob_each: Double = 1.0 / len 
        val probArray: Array[Double] = 
          if (prob_array.isEmpty) array.map(x => prob_each) else prob_array
        
        // Number of items that will be in the list
        var num = max(min(rand.nextInt(len), max_item), min_item)
          
        // Tail recursive method of simulating from `array`.
        def gen[T](array: Array[T], probArray: Array[Double], sim: Double): T = {
          if (array.length == 1 || sim <= probArray.head)
            array.head
          else
            gen(array.tail, probArray.tail, sim - probArray.head)        
        }
        
        // Simulate a random double from 0 (inclusive) to `probArray.sum` (exclusive).
        var sim = 0.5
          
        // Generate the output list  
        while (num > 0) {
            sim = random_double(rand, probArray.sum)
            output = output + 
                gen(array.tail, probArray.tail, sim - probArray.head).toString + sep
            num = num - 1
        }
        
        // Remove the first and last element of the output, which are `sep`'s.
        output.drop(1).dropRight(1)
      }
    }
    
}

defined trait Dummy_Data_Generator


In [2]:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.{ SparkSession, SQLContext, Row }
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import scala.util.Random 

class Dummy_Table_Generator(spark: SparkSession, rand: Random) extends Dummy_Data_Generator {
    
  // Generate sparkContext from sparkSession.
  val sc = spark.sparkContext
    
  /**
   * Simulate a random table with `num_row` rows according to `col_schema` and `col_map`.
   */
  def random_table(num_row: Int, col_schema: Array[StructField], col_map: Int => Row) = {
    import spark.implicits._
      
    val rdd = sc.makeRDD(Range(1, num_row + 1).map(col_map))
    spark.createDataFrame(rdd, StructType(col_schema))
    }

}

defined class Dummy_Table_Generator


In [3]:
// Set up a SparkSession object.
val spark = SparkSession.builder
  .master("local[*]")
  .appName("Dummy Table Generator Example 1")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

import spark.implicits._

// Set seed for random number generator.
val rand = new Random(588)

// Table generator
val table_gen = new Dummy_Table_Generator(spark, rand)

spark = org.apache.spark.sql.SparkSession@5903a2ed
rand = scala.util.Random@3a18734d
table_gen = Dummy_Table_Generator@1380916e


Dummy_Table_Generator@1380916e

# 2. Customer Table

In [4]:
// List of customer descriptions
val descList: Array[String] = Array("L1", "L2", "L3", "L4", "L5", "L6", "L7", "L8", "L9", "L10",
                     "L11", "L12", "L13", "L14", "L15", "L16", "L17", "L18", "L19", "L20",
                     "L21", "L22", "L23", "L24", "L25", "L26", "L27", "L28", "L29", "L30")

// Column schema for the customer table.
val custColSchema = Array(
    StructField("Cust_ID", IntegerType, true),
    StructField("Cust_Desc", StringType, true))

// Column mapping for the customer table.
val custColMap = (x: Int) => Row(
    100000 + x,
    table_gen.random_array_to_string(rand, descList, 0, 3))

// The customer table.
val customer = table_gen.random_table(10, custColSchema, custColMap)

customer.show(false)

+-------+-----------+
|Cust_ID|Cust_Desc  |
+-------+-----------+
|100001 |L2,L20,L27 |
|100002 |L9,L30,L17 |
|100003 |L27,L3,L4  |
|100004 |L21,L26,L6 |
|100005 |L21,L17,L13|
|100006 |L8,L15,L3  |
|100007 |L16,L10,L5 |
|100008 |L3,L3,L9   |
|100009 |L28,L30,L22|
|100010 |L17,L19,L29|
+-------+-----------+



descList = Array(L1, L2, L3, L4, L5, L6, L7, L8, L9, L10, L11, L12, L13, L14, L15, L16, L17, L18, L19, L20, L21, L22, L23, L24, L25, L26, L27, L28, L29, L30)
custColSchema = Array(StructField(Cust_ID,IntegerType,true), StructField(Cust_Desc,StringType,true))
custColMap = > org.apache.spark.sql.Row = <function1>
customer = [Cust_ID: int, Cust_Desc: string]


[Cust_ID: int, Cust_Desc: string]

# 3. Offer Table

In [5]:
// Offer schema for the offer table.
val offerColSchema = Array(
    StructField("Offer_ID", IntegerType, true),
    StructField("Cust_Desc_from_Offer", StringType, true))

// Offer mapping for the offer table.
val offerColMap = (x: Int) => Row(
    1000 + x,
    table_gen.random_array_to_string(rand, descList, 1, 2))

// The offer table.
val offer = table_gen.random_table(10, offerColSchema, offerColMap)

offer.show(false)

+--------+--------------------+
|Offer_ID|Cust_Desc_from_Offer|
+--------+--------------------+
|1001    |L30,L25             |
|1002    |L4,L16              |
|1003    |L16,L26             |
|1004    |L15                 |
|1005    |L29,L29             |
|1006    |L9,L2               |
|1007    |L28,L4              |
|1008    |L8,L8               |
|1009    |L27,L2              |
|1010    |L21                 |
+--------+--------------------+



offerColSchema = Array(StructField(Offer_ID,IntegerType,true), StructField(Cust_Desc_from_Offer,StringType,true))
offerColMap = > org.apache.spark.sql.Row = <function1>
offer = [Offer_ID: int, Cust_Desc_from_Offer: string]


[Offer_ID: int, Cust_Desc_from_Offer: string]

# 4. Wildcard Filter

In [6]:
// User defined function to check intersection.
val custDescIntersectWildcard = udf {
    (str1: String, str2: String) => 
        var output: Boolean = str1.isEmpty
        val str1Updated = "," + str1 + ","
        for (s <- str2.split("\\,")) {
            output = output || (str1Updated contains ("," + s + ","))
        }
        output
}

custDescIntersectWildcard = UserDefinedFunction(<function2>,BooleanType,Some(List(StringType, StringType)))


UserDefinedFunction(<function2>,BooleanType,Some(List(StringType, StringType)))

In [7]:
val tgtWildcard = customer.crossJoin(offer)
    .where(custDescIntersectWildcard($"Cust_Desc", $"Cust_Desc_from_Offer"))
    
tgtWildcard.show(false)

+-------+-----------+--------+--------------------+
|Cust_ID|Cust_Desc  |Offer_ID|Cust_Desc_from_Offer|
+-------+-----------+--------+--------------------+
|100002 |L9,L30,L17 |1001    |L30,L25             |
|100001 |L2,L20,L27 |1006    |L9,L2               |
|100002 |L9,L30,L17 |1006    |L9,L2               |
|100001 |L2,L20,L27 |1009    |L27,L2              |
|100003 |L27,L3,L4  |1002    |L4,L16              |
|100004 |L21,L26,L6 |1003    |L16,L26             |
|100003 |L27,L3,L4  |1007    |L28,L4              |
|100003 |L27,L3,L4  |1009    |L27,L2              |
|100004 |L21,L26,L6 |1010    |L21                 |
|100005 |L21,L17,L13|1010    |L21                 |
|100007 |L16,L10,L5 |1002    |L4,L16              |
|100006 |L8,L15,L3  |1004    |L15                 |
|100007 |L16,L10,L5 |1003    |L16,L26             |
|100006 |L8,L15,L3  |1008    |L8,L8               |
|100009 |L28,L30,L22|1001    |L30,L25             |
|100010 |L17,L19,L29|1005    |L29,L29             |
|100008 |L3,

tgtWildcard = [Cust_ID: int, Cust_Desc: string ... 2 more fields]


[Cust_ID: int, Cust_Desc: string ... 2 more fields]

# 5. ArrayType Filter

In [8]:
// Demonstration of arrayType in Spark Dataframe
customer.withColumn("Cust_Desc_Array", split(col("Cust_Desc"), "\\,")).show(false)

+-------+-----------+---------------+
|Cust_ID|Cust_Desc  |Cust_Desc_Array|
+-------+-----------+---------------+
|100001 |L2,L20,L27 |[L2, L20, L27] |
|100002 |L9,L30,L17 |[L9, L30, L17] |
|100003 |L27,L3,L4  |[L27, L3, L4]  |
|100004 |L21,L26,L6 |[L21, L26, L6] |
|100005 |L21,L17,L13|[L21, L17, L13]|
|100006 |L8,L15,L3  |[L8, L15, L3]  |
|100007 |L16,L10,L5 |[L16, L10, L5] |
|100008 |L3,L3,L9   |[L3, L3, L9]   |
|100009 |L28,L30,L22|[L28, L30, L22]|
|100010 |L17,L19,L29|[L17, L19, L29]|
+-------+-----------+---------------+



In [9]:
import scala.collection.mutable.WrappedArray

// User defined function to check intersection.
val custDescIntersect = udf {
    (dfArray1: WrappedArray[String], dfArray2: WrappedArray[String]) => 
        (dfArray1.head == "" || (dfArray1.toSet.intersect(dfArray2.toSet)).size > 0)
}

/* Comments
 * All of the first array will be enumerated, so we should put the shorter array as array1.
 */

custDescIntersect = UserDefinedFunction(<function2>,BooleanType,Some(List(ArrayType(StringType,true), ArrayType(StringType,true))))


UserDefinedFunction(<function2>,BooleanType,Some(List(ArrayType(StringType,true), ArrayType(StringType,true))))

In [10]:
// Find intersection.
val tgtArrayType = 
  customer.withColumn("Cust_Desc_Array", split(col("Cust_Desc"), "\\,"))
    .crossJoin(offer.withColumn("Cust_Desc_Offer_Array",
                                split(col("Cust_Desc_from_Offer"), "\\,")))
    .withColumn("Intersect", custDescIntersect($"Cust_Desc_Array", $"Cust_Desc_Offer_Array"))

tgtArrayType.selectExpr(
    "Cust_ID",
    "Cust_Desc",
    "Offer_ID",
    "Cust_Desc_from_Offer",
    "Intersect").orderBy("Cust_ID", "Offer_ID").show(20, false)

+-------+----------+--------+--------------------+---------+
|Cust_ID|Cust_Desc |Offer_ID|Cust_Desc_from_Offer|Intersect|
+-------+----------+--------+--------------------+---------+
|100001 |L2,L20,L27|1001    |L30,L25             |false    |
|100001 |L2,L20,L27|1002    |L4,L16              |false    |
|100001 |L2,L20,L27|1003    |L16,L26             |false    |
|100001 |L2,L20,L27|1004    |L15                 |false    |
|100001 |L2,L20,L27|1005    |L29,L29             |false    |
|100001 |L2,L20,L27|1006    |L9,L2               |true     |
|100001 |L2,L20,L27|1007    |L28,L4              |false    |
|100001 |L2,L20,L27|1008    |L8,L8               |false    |
|100001 |L2,L20,L27|1009    |L27,L2              |true     |
|100001 |L2,L20,L27|1010    |L21                 |false    |
|100002 |L9,L30,L17|1001    |L30,L25             |true     |
|100002 |L9,L30,L17|1002    |L4,L16              |false    |
|100002 |L9,L30,L17|1003    |L16,L26             |false    |
|100002 |L9,L30,L17|1004

tgtArrayType = [Cust_ID: int, Cust_Desc: string ... 5 more fields]


[Cust_ID: int, Cust_Desc: string ... 5 more fields]

# 6. MapType Filter

In [11]:
// Offers in the Ln list.
val LnOffer = offer
    .withColumn("Cust_Desc_Offer_Array",
                split(col("Cust_Desc_from_Offer"), "\\,"))
    .where(array_contains(col("Cust_Desc_Offer_Array"), "L30"))
    .show(false)

// Need a way to automates this.

+--------+--------------------+---------------------+
|Offer_ID|Cust_Desc_from_Offer|Cust_Desc_Offer_Array|
+--------+--------------------+---------------------+
|1001    |L30,L25             |[L30, L25]           |
+--------+--------------------+---------------------+



LnOffer: Unit = ()


In [12]:
val allOffer: Array[String] = offer.select("Offer_ID").distinct.collect
    .map(x => x.toString.drop(1).dropRight(1))

val offerListMap: Map[String, Array[String]] = Map(
    "L1" -> Array[String](),
    "L2" -> Array[String]("1006", "1009"),
    "L3" -> Array[String](),
    "L4" -> Array[String]("1002", "1007"),
    "L5" -> Array[String](),
    "L6" -> Array[String](),
    "L7" -> Array[String](),
    "L8" -> Array[String]("1008"),
    "L9" -> Array[String]("1006"),
    "L10" -> Array[String](),
    "L12" -> Array[String](),
    "L13" -> Array[String](),
    "L14" -> Array[String](),
    "L15" -> Array[String]("1004"),
    "L16" -> Array[String]("1002", "1003"),
    "L17" -> Array[String](),
    "L18" -> Array[String](),
    "L19" -> Array[String](),
    "L20" -> Array[String](),
    "L21" -> Array[String]("1010"),
    "L22" -> Array[String](),
    "L23" -> Array[String](),
    "L24" -> Array[String](),
    "L25" -> Array[String]("1001"),
    "L26" -> Array[String]("1003"),
    "L27" -> Array[String]("1009"),
    "L28" -> Array[String]("1007"),
    "L29" -> Array[String]("1005"),
    "L30" -> Array[String]("1001"),
    "" -> descList) 

def listOfferMap(dfMap: Map[String, Array[String]]) = udf (
    (str: String) => {
            str.split("\\,").flatMap(x => dfMap.getOrElse(x, Array[String]("NULL"))).distinct
    }
)

allOffer = Array(1005, 1008, 1010, 1002, 1001, 1006, 1007, 1003, 1004, 1009)
offerListMap = Map(L8 -> Array(1008), "" -> Array(L1, L2, L3, L4, L5, L6, L7, L8, L9, L10, L11, L12, L13, L14, L15, L16, L17, L18, L19, L20, L21, L22, L23, L24, L25, L26, L27, L28, L29, L30), L19 -> Array(), L27 -> Array(1009), L1 -> Array(), L16 -> Array(1002, 1003), L15 -> Array(1004), L26 -> Array(1003), L22 -> Array(), L5 -> Array(), L25 -> Array(1001), L14 -> Array(), L4 -> Array(1002, 1007), L21 -> Array(1010), L9 -> Array(1006), L10 -> Array(), L17 -> Array(), L20 -> Array(), L6 -> Array(), L24 -> Array(), L29 -> Array(1005), L13 -> Array(), L3 -> Array(), L7 -> Array(), L12 -> Array(), L30 -> Array(1001), L23 -> Array(), L2 -> Array(1006, 1009), L18 -> Array(), ...


Map(L8 -> Array(1008), "" -> Array(L1, L2, L3, L4, L5, L6, L7, L8, L9, L10, L11, L12, L13, L14, L15, L16, L17, L18, L19, L20, L21, L22, L23, L24, L25, L26, L27, L28, L29, L30), L19 -> Array(), L27 -> Array(1009), L1 -> Array(), L16 -> Array(1002, 1003), L15 -> Array(1004), L26 -> Array(1003), L22 -> Array(), L5 -> Array(), L25 -> Array(1001), L14 -> Array(), L4 -> Array(1002, 1007), L21 -> Array(1010), L9 -> Array(1006), L10 -> Array(), L17 -> Array(), L20 -> Array(), L6 -> Array(), L24 -> Array(), L29 -> Array(1005), L13 -> Array(), L3 -> Array(), L7 -> Array(), L12 -> Array(), L30 -> Array(1001), L23 -> Array(), L2 -> Array(1006, 1009), L18 -> Array(), ...

In [13]:
val tgtMapType = customer.select(
    $"Cust_ID", 
    $"Cust_Desc", 
    listOfferMap(offerListMap)($"Cust_Desc") as "Offers_Linked")

tgtMapType.show(false)

+-------+-----------+------------------+
|Cust_ID|Cust_Desc  |Offers_Linked     |
+-------+-----------+------------------+
|100001 |L2,L20,L27 |[1006, 1009]      |
|100002 |L9,L30,L17 |[1006, 1001]      |
|100003 |L27,L3,L4  |[1009, 1002, 1007]|
|100004 |L21,L26,L6 |[1010, 1003]      |
|100005 |L21,L17,L13|[1010]            |
|100006 |L8,L15,L3  |[1008, 1004]      |
|100007 |L16,L10,L5 |[1002, 1003]      |
|100008 |L3,L3,L9   |[1006]            |
|100009 |L28,L30,L22|[1007, 1001]      |
|100010 |L17,L19,L29|[1005]            |
+-------+-----------+------------------+



tgtMapType = [Cust_ID: int, Cust_Desc: string ... 1 more field]


[Cust_ID: int, Cust_Desc: string ... 1 more field]

In [14]:
tgtMapType.withColumn("Offers_Linked", explode($"Offers_Linked"))
    .join(offer, col("Offer_ID") === col("Offers_Linked"), "inner")
    .orderBy("Cust_ID", "Offer_ID").show(false)

+-------+-----------+-------------+--------+--------------------+
|Cust_ID|Cust_Desc  |Offers_Linked|Offer_ID|Cust_Desc_from_Offer|
+-------+-----------+-------------+--------+--------------------+
|100001 |L2,L20,L27 |1006         |1006    |L9,L2               |
|100001 |L2,L20,L27 |1009         |1009    |L27,L2              |
|100002 |L9,L30,L17 |1001         |1001    |L30,L25             |
|100002 |L9,L30,L17 |1006         |1006    |L9,L2               |
|100003 |L27,L3,L4  |1002         |1002    |L4,L16              |
|100003 |L27,L3,L4  |1007         |1007    |L28,L4              |
|100003 |L27,L3,L4  |1009         |1009    |L27,L2              |
|100004 |L21,L26,L6 |1003         |1003    |L16,L26             |
|100004 |L21,L26,L6 |1010         |1010    |L21                 |
|100005 |L21,L17,L13|1010         |1010    |L21                 |
|100006 |L8,L15,L3  |1004         |1004    |L15                 |
|100006 |L8,L15,L3  |1008         |1008    |L8,L8               |
|100007 |L

# 7. Some Performance

In [15]:
  def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1e6 + "ms")
    result
  }

val a = (0 until 10000 by 1).toSet   // smaller data
val b = (0 until 1000000 by 1).toSet // bigger data

a = Set(2163, 8607, 645, 892, 69, 5385, 5810, 7375, 5659, 9929, 2199, 8062, 3021, 8536, 5437, 1322, 1665, 5509, 5686, 1036, 9982, 2822, 7304, 9131, 2630, 6085, 3873, 4188, 1586, 8618, 1501, 4201, 2452, 8960, 9405, 809, 7373, 8930, 4560, 7766, 4447, 3962, 1879, 5422, 1337, 1718, 2094, 6836, 5469, 9208, 3944, 1411, 7427, 5365, 6387, 629, 8186, 3883, 5116, 6405, 9458, 5561, 6979, 2612, 4094, 6167, 1024, 5918, 1469, 7272, 8398, 365, 5088, 9273, 2744, 1369, 4835, 138, 6669, 6355, 2889, 1823, 1190, 1168, 2295, 2306, 7890, 4571, 3053, 4101, 4450, 9786, 3345, 9653, 760, 4005, 5857, 4464, 9886, 2341, 101, 2336, 9077, 3008, 2109, 4824, 5593, 2131, 1454, 7854, 4909, 2031, 5136, 8154, 7973, 6530, 7569, 5896, 1633, 6231, 2778, 8288, 7691, ...


time: [R](block: => R)R


Set(2163, 8607, 645, 892, 69, 5385, 5810, 7375, 5659, 9929, 2199, 8062, 3021, 8536, 5437, 1322, 1665, 5509, 5686, 1036, 9982, 2822, 7304, 9131, 2630, 6085, 3873, 4188, 1586, 8618, 1501, 4201, 2452, 8960, 9405, 809, 7373, 8930, 4560, 7766, 4447, 3962, 1879, 5422, 1337, 1718, 2094, 6836, 5469, 9208, 3944, 1411, 7427, 5365, 6387, 629, 8186, 3883, 5116, 6405, 9458, 5561, 6979, 2612, 4094, 6167, 1024, 5918, 1469, 7272, 8398, 365, 5088, 9273, 2744, 1369, 4835, 138, 6669, 6355, 2889, 1823, 1190, 1168, 2295, 2306, 7890, 4571, 3053, 4101, 4450, 9786, 3345, 9653, 760, 4005, 5857, 4464, 9886, 2341, 101, 2336, 9077, 3008, 2109, 4824, 5593, 2131, 1454, 7854, 4909, 2031, 5136, 8154, 7973, 6530, 7569, 5896, 1633, 6231, 2778, 8288, 7691, ...

In [16]:
time { a.intersect(b).size > 0 }

Elapsed time: 6.689314ms


<console>:45: error: missing argument list for method time
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `time _` or `time(_)` instead of `time`.
       time
       ^
lastException: Throwable = null


true

# 8. Performance Testing - Wildcard Filter

In [17]:
// The customer table with 100,000 customers. 
val customerP = table_gen.random_table(100000, custColSchema, custColMap).cache
// The Offer table with 500 offers.
val offerP = table_gen.random_table(500, offerColSchema, offerColMap).cache

// memory info
val mb = 1024*1024
val runtime = Runtime.getRuntime
println("** Used Memory:  " + (runtime.totalMemory - runtime.freeMemory) / mb)
println("** Free Memory:  " + runtime.freeMemory / mb)
println("** Total Memory: " + runtime.totalMemory / mb)
println("** Max Memory:   " + runtime.maxMemory / mb)

** Used Memory:  362
** Free Memory:  325
** Total Memory: 688
** Max Memory:   910


customerP = [Cust_ID: int, Cust_Desc: string]
offerP = [Offer_ID: int, Cust_Desc_from_Offer: string]
mb = 1048576
runtime = java.lang.Runtime@5451b697


java.lang.Runtime@5451b697

In [18]:
time {
    customerP.crossJoin(offerP)
    .where(custDescIntersectWildcard($"Cust_Desc", $"Cust_Desc_from_Offer"))
    .select($"Cust_ID", $"Offer_ID", $"Cust_Desc", $"Cust_Desc_from_Offer")
    .orderBy($"Cust_ID", $"Offer_ID")
    .count
}

Elapsed time: 42566.034779ms


10443105

# 9. Performance Testing - ArrayType Filter

In [19]:
// Sleep for 5 seconds before executing the next command
Thread.sleep(5000)

// The Filtered table
time { customerP
      .withColumn("Cust_Desc_Array", split(col("Cust_Desc"), "\\,"))
      .crossJoin(offerP.withColumn("Cust_Desc_Offer_Array",
                                  split(col("Cust_Desc_from_Offer"), "\\,")))
      .where(custDescIntersect($"Cust_Desc_Array", $"Cust_Desc_Offer_Array"))
      .select($"Cust_ID", $"Offer_ID", $"Cust_Desc", $"Cust_Desc_from_Offer")
      .orderBy($"Cust_ID", $"Offer_ID")
      .count }

Elapsed time: 74927.646875ms


10443105

# 10. Performance Testing - MapType Filter

In [20]:
// Sleep for 5 seconds before executing the next command
Thread.sleep(5000)

// Obtain all offers
val allOfferP: Array[String] = offerP.select("Offer_ID").distinct.collect
    .map(x => x.toString.drop(1).dropRight(1))

// Declare list offer map
var offerListMapP = Map("" -> allOfferP)

// Find all list offer map
for (lx <- descList) {
    var LxOffer: Array[String] = offerP
        .withColumn("Cust_Desc_Offer_Array",
                    split(col("Cust_Desc_from_Offer"), "\\,"))
        .where(array_contains(col("Cust_Desc_Offer_Array"), lx))
        .select($"Offer_ID")
        .collect.toSeq.toArray.map(x => x.toString.drop(1).dropRight(1))
    offerListMapP += (lx -> LxOffer)
}

allOfferP = Array(1088, 1238, 1342, 1025, 1084, 1127, 1395, 1460, 1483, 1139, 1143, 1270, 1303, 1322, 1339, 1352, 1265, 1223, 1157, 1466, 1005, 1016, 1133, 1160, 1199, 1212, 1417, 1468, 1331, 1068, 1198, 1344, 1404, 1165, 1183, 1378, 1500, 1226, 1259, 1031, 1175, 1201, 1426, 1441, 1496, 1051, 1243, 1274, 1296, 1415, 1030, 1034, 1064, 1156, 1163, 1489, 1276, 1307, 1372, 1480, 1235, 1493, 1019, 1269, 1294, 1135, 1350, 1418, 1148, 1210, 1280, 1363, 1056, 1085, 1398, 1046, 1093, 1260, 1208, 1077, 1215, 1290, 1457, 1249, 1324, 1065, 1125, 1295, 1332, 1469, 1176, 1291, 1061, 1197, 1207, 1456, 1477, 1055, 1244, 1300, 1425, 1114, 1206, 1288, 1306, 1311, 1170, 1266, 1382, 1122, 1145, 1247, 1304, 1488, 1491, 1008, 1172, 1391, 1302, 1485, 1047, 1190, 1400, 1427, 1458, 1434, 1473, 11...


Array(1088, 1238, 1342, 1025, 1084, 1127, 1395, 1460, 1483, 1139, 1143, 1270, 1303, 1322, 1339, 1352, 1265, 1223, 1157, 1466, 1005, 1016, 1133, 1160, 1199, 1212, 1417, 1468, 1331, 1068, 1198, 1344, 1404, 1165, 1183, 1378, 1500, 1226, 1259, 1031, 1175, 1201, 1426, 1441, 1496, 1051, 1243, 1274, 1296, 1415, 1030, 1034, 1064, 1156, 1163, 1489, 1276, 1307, 1372, 1480, 1235, 1493, 1019, 1269, 1294, 1135, 1350, 1418, 1148, 1210, 1280, 1363, 1056, 1085, 1398, 1046, 1093, 1260, 1208, 1077, 1215, 1290, 1457, 1249, 1324, 1065, 1125, 1295, 1332, 1469, 1176, 1291, 1061, 1197, 1207, 1456, 1477, 1055, 1244, 1300, 1425, 1114, 1206, 1288, 1306, 1311, 1170, 1266, 1382, 1122, 1145, 1247, 1304, 1488, 1491, 1008, 1172, 1391, 1302, 1485, 1047, 1190, 1400, 1427, 1458, 1434, 1473, 11...

In [21]:
time { customerP.select(
    $"Cust_ID", 
    $"Cust_Desc", 
    listOfferMap(offerListMapP)($"Cust_Desc") as "Offers_Linked")
      .withColumn("Offers_Linked", explode($"Offers_Linked"))
      .join(offerP, col("Offer_ID") === col("Offers_Linked"), "inner")
      .select($"Cust_ID", $"Offer_ID", $"Cust_Desc", $"Cust_Desc_from_Offer")
      .orderBy("Cust_ID", "Offer_ID")
      .count
}

Elapsed time: 12167.463194ms


10443105

# 11. SQL Wildcard vs Scala Contains

In [22]:
// Sleep for 5 seconds before executing the next command
Thread.sleep(5000)

// Uncache the performance testing tables.
customerP.unpersist()
offerP.unpersist()

// Column schema for the customer table.
val custColSchemaW = Array(
    StructField("Cust_ID", IntegerType, true),
    StructField("Cust_Desc", StringType, true))

// Column mapping for the customer table.
// Can have up to 30 lists
val custColMapW = (x: Int) => Row(
    100000 + x,
    table_gen.random_array_to_string(rand, descList, 0, 10))

// Offer schema for the offer table.
val offerColSchemaW = Array(
    StructField("Offer_ID", IntegerType, true),
    StructField("Cust_Desc_from_Offer", StringType, true))

// Offer mapping for the offer table.
// Each offer can only be of one list.
val offerColMapW = (x: Int) => Row(
    1000 + x,
    table_gen.random_array_to_string(rand, descList, 1, 1))

// The customer table with 100,000 customers. 
val customerW = table_gen.random_table(100000, custColSchemaW, custColMapW).cache
// The Offer table with 1000 offers.
val offerW = table_gen.random_table(1000, offerColSchemaW, offerColMapW).cache

custColSchemaW = Array(StructField(Cust_ID,IntegerType,true), StructField(Cust_Desc,StringType,true))
custColMapW = > org.apache.spark.sql.Row = <function1>
offerColSchemaW = Array(StructField(Offer_ID,IntegerType,true), StructField(Cust_Desc_from_Offer,StringType,true))
offerColMapW = > org.apache.spark.sql.Row = <function1>
customerW = [Cust_ID: int, Cust_Desc: string]
offerW = [Offer_ID: int, Cust_Desc_from_Offer: string]


[Offer_ID: int, Cust_Desc_from_Offer: string]

In [23]:
// Sleep for 5 seconds before executing the next command
Thread.sleep(5000)

time {
    customerW.crossJoin(offerW)
    .where(custDescIntersectWildcard($"Cust_Desc", $"Cust_Desc_from_Offer"))
    .select($"Cust_ID", $"Offer_ID", $"Cust_Desc", $"Cust_Desc_from_Offer")
    .orderBy($"Cust_ID", $"Offer_ID")
    .count
}

Elapsed time: 61339.6458ms


28342849

In [24]:
// Sleep for 5 seconds before executing the next command
Thread.sleep(5000)

customerW.createOrReplaceTempView("customer")
offerW.createOrReplaceTempView("offer")

val wildcardSQL = """
    SELECT
        C.Cust_ID,
        C.Cust_Desc,
        O.Offer_ID,
        O.Cust_Desc_from_Offer
    FROM
        customer C
    CROSS JOIN
        offer O
    WHERE
        concat(',', upper(nvl(C.Cust_Desc, '')), ',') LIKE
            concat('%,', upper(nvl(O.Cust_Desc_from_Offer, '')), ',%') OR
        C.Cust_Desc = ''
    ORDER BY
        C.Cust_ID,
        O.Offer_ID"""

time {
    spark.sql(wildcardSQL).count
}

Elapsed time: 214425.422902ms


wildcardSQL = 


"
    SELECT
        C.Cust_ID,
        C.Cust_Desc,
        O.Offer_ID,
        O.Cust_Desc_from_Offer
    FROM
        customer C
    CROSS JOIN
        offer O
    WHERE
        concat(',', upper(nvl(C.Cust_Desc, '')), ',') LIKE
            concat('%,', upper(nvl(O.Cust_Desc_from_Offer, '')), ',%') OR
        C.Cust_Desc = ''
    ORDER BY
        C.Cust_ID,
        O.Offer_ID"


28342849

# 12. Conclusion

In the first part of our performance analysis, we can see that `MapType Filter` has the best performance in the filtering operation of interest. However, it should be noted that it is possible for `MapType Filter` to cause `OutOfMemoryError` against large data, which should be carefully assessed.

In the second part of the analysis, we compared filtering performance between `SQL wildcards filter` and `Scala Array contains`. The result shows that `Scala Array contains` is much faster than its counter part.