In [None]:
%%configure -f 
{ "numExecutors":4, "executorMemory":"1G", "executorCores":1, "driverMemory":"1G", "driverCores":1 }

In [None]:
/** One entry of the sample catalogue used throughout this notebook.
  *
  * @param category name of the category the item belongs to
  * @param name     display name of the item
  * @param price    price of the item
  */
case class Item(
    category: String,
    name: String,
    price: Double
)

In [None]:
import scala.collection.mutable.ListBuffer

// Five random UUID strings act as the category names of the sample data.
val categoryListBuffer: ListBuffer[String] = ListBuffer.fill(5)(java.util.UUID.randomUUID.toString)

val randomGenerator = scala.util.Random

// Twenty-five items named Item1..Item25, each with a randomly chosen category
// and a random price in [0, 100).
val itemListBuffer: ListBuffer[Item] = ListBuffer.tabulate(25) { offset =>
    // tabulate counts from 0, while the item names are 1-based.
    val i = offset + 1
    // Bound the draw by the actual number of categories so the random index can
    // never fall outside (or skip part of) the category buffer if its size changes.
    val category = categoryListBuffer(randomGenerator.nextInt(categoryListBuffer.length))
    Item(category, s"Item$i", randomGenerator.nextDouble() * 100)
}

In [None]:
// Turn the locally generated items into a DataFrame; the element type is spelled
// out so it is obvious that the columns come from the Item case class.
val items = sqlContext.createDataFrame[Item](itemListBuffer.toList)

// Show a small sample of the data.
items.take(5)

## <font color='red'>What not to do - Avoid using Collect, Loop and Broadcast</font> 

### <font color='red'>Collect</font> is a potential bottleneck at large data sizes

In [None]:
// Anti-pattern under demonstration: collect() brings every distinct category
// back to the driver as a local array of rows.
val uniqueCategoriesList = items.select(items("category")).distinct.collect()

In [None]:
import scala.collection.mutable.Map

// Driver-side lookup table: category name -> position of that category in
// uniqueCategoriesList (0-based, widened to Long).
var indexedUniqueCategoriesMap: Map[String, Long] = Map(
    uniqueCategoriesList.zipWithIndex.map { case (categoryRow, position) =>
        categoryRow.get(0).toString -> position.toLong
    }: _*
)

### <font color='red'>Broadcast</font> is a potential bottleneck at large data sizes

In [None]:
// Broadcast the category -> index lookup table and keep the returned handle.
// Tasks must read the table through `.value` on this handle; referencing the
// driver-side map directly from a closure would bypass the broadcast and
// serialise the map into the closure instead.
val indexedUniqueCategoriesBroadcast = sc.broadcast(indexedUniqueCategoriesMap)

In [None]:
import org.apache.spark.sql.functions._

// Resolves a category name to its index via the broadcast lookup table.
// Returns -1 when the category is not present in the table.
val lookupIndex: (String => Long) = (categoryName: String) =>
    indexedUniqueCategoriesBroadcast.value.getOrElse(categoryName, -1L)

// Spark SQL UDF wrapper so the lookup can be applied to a DataFrame column.
val lookupIndexFunction = udf(lookupIndex)

In [None]:
// Attach the looked-up category index to every item as a new "index" column.
// Declared as a `var` because later cells rebind indexedItems to the results of
// the alternative approaches.
var indexedItems = {
    val categoryIndex = lookupIndexFunction(items("category"))
    items.withColumn("index", categoryIndex)
}

// Show a small sample of the result.
indexedItems.take(5)

## <font color='green'>What to do - Always keep the data as a distributed DataFrame</font>

In [None]:
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}

// Distinct categories, with the column renamed so it does not clash with
// items("category") when the two DataFrames are combined later.
val uniqueCategories = items
    .select("category")
    .distinct
    .withColumnRenamed("category", "uniquecategory")

// Same rows as uniqueCategories, each prefixed with a Long "index" column taken
// from zipWithIndex on the underlying RDD.
val indexedUniqueCategories = {
    // Pair every row with its position and prepend that position to the row's values.
    val indexedRows = uniqueCategories.rdd.zipWithIndex().map {
        case (categoryRow, position) => Row.fromSeq(position +: categoryRow.toSeq)
    }
    // Schema: a non-nullable "index" column followed by the original columns.
    val indexedSchema = StructType(
        StructField("index", LongType, nullable = false) +: uniqueCategories.schema.fields)

    sqlContext.createDataFrame(indexedRows, indexedSchema)
}

// Show a small sample of the indexed categories.
indexedUniqueCategories.take(5)

### Method 1 - Using JOIN of two DataFrames

In [None]:
// Join each item with the row of indexedUniqueCategories that carries the same
// category, then drop the duplicated category column that came from the lookup side.
indexedItems = {
    val sameCategory = items("category") === indexedUniqueCategories("uniquecategory")
    items.join(indexedUniqueCategories, sameCategory).drop("uniquecategory")
}

// Show a small sample of the result.
indexedItems.take(5)

### Method 2 - Using DataFrame as lookup table in UDF  

In [None]:
// Looks up the index of a category by filtering the indexedUniqueCategories
// DataFrame on "uniquecategory" and reading the "index" column of the first match.
// first() is called unconditionally, so a category without a matching row is not
// handled here.
// NOTE(review): this performs DataFrame operations inside a function that is
// wrapped as a UDF below. Spark generally does not support using DataFrames from
// code running inside tasks, so this presumably only works when the projection
// happens to be evaluated on the driver — verify on a real cluster, and prefer the
// join-based Method 1 if it fails.
val getIndex: (String => Long) = (categoryName: String) => {

    indexedUniqueCategories.filter(indexedUniqueCategories("uniquecategory")
                            === categoryName).select("index").first().get(0).asInstanceOf[Long]
}

// Spark SQL UDF wrapper so the lookup can be applied to a DataFrame column.
val getIndexFunction = udf(getIndex)

In [None]:
// Rebuild indexedItems by applying the DataFrame-backed lookup UDF to the
// category column and storing the result in a new "index" column.
indexedItems = {
    val categoryIndex = getIndexFunction(items("category"))
    items.withColumn("index", categoryIndex)
}

// Show a small sample of the result.
indexedItems.take(5)

In [None]:
// Project the columns in a fixed order with the index first.
val reorderedIndexedItems = {
    val columnOrder = Seq("index", "category", "name", "price")
    indexedItems.select(columnOrder.head, columnOrder.tail: _*)
}

// Show a small sample of the reordered result.
reorderedIndexedItems.take(5)