# Spark Partitioning and Optimization (b) - Exercises with Results

## Exercise 4

#### Task 1
##### Load data from the file `creditcard.csv` as a DataFrame called creditcard.

#### Result:

In [None]:
val data_dir = "/FileStore/tables"

In [None]:
val creditcard = spark.read.format("csv")          
  .option("inferSchema", "true")             
  .option("header", "true")                    
  .load(data_dir + "/creditcard.csv")

#### Task 2
##### Print the schema of creditcard.

#### Result:

In [None]:
creditcard.printSchema()

#### Task 3
##### Since our creditcard data doesn’t have a unique identifier for each transaction, let’s generate one. We will assume that each row is unique.
##### We will use the `monotonically_increasing_id()` function (the older `monotonicallyIncreasingId` alias is deprecated) to generate a “uniqueID” column and save the result as the creditcard_withId variable.
##### Run the code:

```
val creditcard_withId = creditcard.withColumn("uniqueID", monotonically_increasing_id())
```

#### Result:

In [None]:
val creditcard_withId = creditcard.withColumn("uniqueID", monotonically_increasing_id())
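
The generated IDs are unique and increasing, but not consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The following is a minimal sketch in plain Scala (no Spark needed) of how such an ID decomposes; the helper names `composeId`, `partitionOf`, and `recordOf` are hypothetical and exist only for illustration.

```scala
// Bit layout documented for monotonically_increasing_id():
// upper 31 bits = partition ID, lower 33 bits = record number within the partition.
val RecordBits = 33
val RecordMask = (1L << RecordBits) - 1

// Hypothetical helpers, for illustration only.
def composeId(partition: Int, record: Long): Long = (partition.toLong << RecordBits) | record
def partitionOf(id: Long): Int = (id >>> RecordBits).toInt
def recordOf(id: Long): Long = id & RecordMask

val firstOfPartition0 = composeId(0, 0L)   // 0
val firstOfPartition1 = composeId(1, 0L)   // 8589934592
println(s"$firstOfPartition0, $firstOfPartition1")
```

This is why the first row of the second partition gets an ID in the billions even when the DataFrame is small.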

#### Task 4
##### Print the schema of the resulting DataFrame.
##### Count the total number of rows in the creditcard_withId DataFrame.
##### Create a PairRDD from the first 10000 rows of the creditcard_withId DataFrame so that each element of the RDD has the following format: (uniqueID, class).
##### Name the PairRDD creditcard_select_rdd. Make sure to convert the uniqueID column from Long to String using the toString method.
##### Take the first 20 records and print them out.

#### Result:

In [None]:
creditcard_withId.printSchema()

In [None]:
creditcard_withId.count()

In [None]:
val creditcard_select_rdd = creditcard_withId
                        .limit(10000)
                        .select($"uniqueID", $"Class")
                        .map(row => (row.getLong(0).toString, row.getInt(1)))
                        .rdd

In [None]:
creditcard_select_rdd
.take(20)
.foreach(println)

#### Task 5
##### Create a Map from creditcard_select_rdd: sum up all class tags for each unique ID using reduceByKey (this step is unnecessary, since we assumed all rows are unique, but it’s a nice way to practice), and then convert the result to a Map with String keys and Int values. Save the resulting Map to the creditcard_map variable.

#### Result:

In [None]:
val creditcard_map: Map[String, Int] = creditcard_select_rdd
                                       .reduceByKey(_ + _)
                                       .collectAsMap()
                                       .toMap
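
To see what `reduceByKey` does when keys actually repeat, here is a small sketch on hypothetical data. The function name `sumByKeySketch`, the sample pairs, and the local `SparkSession` settings are assumptions for illustration; in a notebook the existing `spark` session would be reused by `getOrCreate()`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: sums the values of duplicate keys and returns a driver-side Map.
def sumByKeySketch(spark: SparkSession): Map[String, Int] = {
  // Hypothetical (uniqueID, class) pairs; ID "2" is duplicated on purpose.
  val pairs = spark.sparkContext.parallelize(Seq(("1", 0), ("2", 1), ("2", 1), ("3", 0)))
  // reduceByKey is a lazy transformation; collectAsMap() is the action that runs the job.
  pairs.reduceByKey(_ + _).collectAsMap().toMap
}

val spark = SparkSession.builder().master("local[2]").appName("reduceByKey-sketch").getOrCreate()
val summed = sumByKeySketch(spark)
println(summed)
```

Note that `collectAsMap()` brings every key to the driver, so it is only appropriate when the result is small, as with the 10000 rows used here.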

#### Task 6
##### Using creditcard_map, look up the first 5000 transactions by their unique ID. Make sure to convert the uniqueID to String when you perform the lookup!
##### Create a broadcast variable from creditcard_map, perform the same lookup, and compare performance (use the Spark UI to check the task duration and memory usage).

#### Result:

In [None]:
val isfraud_transaction = creditcard_withId
.limit(5000)
.select($"uniqueID")
.map(_.getLong(0).toString)
.rdd
.map(creditcard_map)
.collect

In [None]:
val creditcard_map_br = sc.broadcast(creditcard_map)

In [None]:
val isfraud_account_br = creditcard_withId
.limit(5000)
.select("uniqueID")
.map(_.getLong(0).toString)
.rdd
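.map(id => creditcard_map_br.value(id)) // read .value inside the lambda; passing creditcard_map_br.value directly would evaluate it on the driver and ship the whole Map in the closure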
.collect
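
The difference between the two lookups lies in what the task closure captures. The sketch below contrasts them on a tiny hypothetical lookup table; `lookupBothWays` and the sample data are assumptions, not part of the class code. Both variants return the same values; what differs is how the map reaches the executors, which is what the Spark UI comparison is meant to reveal.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper returning the results of both lookup styles.
def lookupBothWays(spark: SparkSession): (Array[Int], Array[Int]) = {
  val sc = spark.sparkContext
  val lookup = Map("1" -> 0, "2" -> 1, "3" -> 0)   // hypothetical lookup table
  val lookupBr = sc.broadcast(lookup)
  val ids = sc.parallelize(Seq("1", "2", "3", "2"))

  // Closure capture: `lookup` itself is serialized into every task.
  val viaClosure = ids.map(id => lookup(id)).collect()

  // Broadcast: the lambda captures only the handle; .value is resolved on the executor.
  val viaBroadcast = ids.map(id => lookupBr.value(id)).collect()

  lookupBr.destroy()   // release the broadcast once it is no longer needed
  (viaClosure, viaBroadcast)
}

val spark = SparkSession.builder().master("local[2]").appName("broadcast-sketch").getOrCreate()
val (viaClosure, viaBroadcast) = lookupBothWays(spark)
println(viaClosure.sameElements(viaBroadcast))
```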

#### Task 7
##### Free up the memory by removing the creditcard_map_br variable.

#### Result:

In [None]:
creditcard_map_br.unpersist
creditcard_map_br.destroy

#### Task 8
##### Take creditcard_withId and compute the total amount of all fraudulent transactions.
##### Compute the same total again, this time using an accumulator.

#### Result:

In [None]:
val credit_fraud = creditcard_withId
.filter($"Class" > 0)
.select($"Amount")
.map(_.getDouble(0))
.reduce(_ + _)

In [None]:
val credit_fraud_acc = sc.doubleAccumulator("TotalFraudAmount")

creditcard_withId
.filter($"Class" > 0)
.select($"Amount")
.map(_.getDouble(0))
.foreach(amt => credit_fraud_acc.add(amt))

// Read the accumulated total on the driver once the action has finished
credit_fraud_acc.value
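
An accumulator is written to by the tasks and read only on the driver, after an action has run. The following sketch shows that pattern on hypothetical amounts; the function name `sumWithAccumulator`, the accumulator name, and the sample values are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: sums the given amounts through a DoubleAccumulator.
def sumWithAccumulator(spark: SparkSession, amounts: Seq[Double]): Double = {
  val sc = spark.sparkContext
  val acc = sc.doubleAccumulator("SketchTotal")   // hypothetical accumulator name
  // foreach is an action; updates made inside actions are applied once per task.
  sc.parallelize(amounts).foreach(amt => acc.add(amt))
  acc.sum   // read on the driver, after the action has finished
}

val spark = SparkSession.builder().master("local[2]").appName("accumulator-sketch").getOrCreate()
val total = sumWithAccumulator(spark, Seq(1.5, 2.5, 7.0))
println(total)
```

Updating an accumulator inside a transformation such as `map` is less safe, because a re-executed task may apply its updates more than once.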

#### Task 9
##### Check the current Hadoop configuration.
##### Load it into the fs variable as we did in class and list the directories in the FileSystem home directory. Don’t forget to import FileSystem and Path from the org.apache.hadoop.fs package.
##### Copy the createDir, deleteDir, getArrayOfSubDir, and getArrayOfFiles functions from the class code and run them.
##### Create tmp_dir_path variable from the Path to tmp directory in Hadoop home as we did in class.
##### Create ex directory within the tmp and encode the path into ex_dir variable.

#### Result:

In [None]:
sc.hadoopConfiguration

In [None]:
import org.apache.hadoop.fs.{FileSystem, Path}

val home_dir_path = "file:///"

val fs = FileSystem.get(sc.hadoopConfiguration)

fs.listStatus(new Path(home_dir_path))
.foreach(x => println(x.getPath))

In [None]:
def createDir(dirName: String): Unit = {
    val dir = new Path(dirName)
    if(!fs.exists(dir)){
        fs.mkdirs(dir)
    }else{
        println("Directory already exists! Couldn't make a new one with the same name :(")
    }
}

def deleteDir(dirName: String): Unit = {
    val dir = new Path(dirName)
    if(fs.exists(dir)){
        fs.delete(dir, true)
    }else{
        println("Directory doesn't exist! Couldn't delete it :(")
    }
}

def getArrayOfSubDir(dirName: String): Array[String] = {
    fs.listStatus(new Path(dirName))
      .filter(_.isDirectory)
      .map(_.getPath.toString)
}

def getArrayOfFiles(dirName: String): Array[String] = {
    fs.listStatus(new Path(dirName))
      .filter(!_.isDirectory)
      .map(_.getPath.toString)
}

In [None]:
val tmp_dir_path = home_dir_path + "tmp" // home_dir_path already ends with "/"

fs.listStatus(new Path(tmp_dir_path))
.foreach(x => println(x.getPath))

In [None]:
val ex_dir = tmp_dir_path + "/ex"

createDir(ex_dir)
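
The same `FileSystem` calls can be tried safely against a scratch directory before touching shared storage. The sketch below uses the local file system and a JVM-created temporary directory; the names `localFs`, `scratch`, and `exPath` are hypothetical and not part of the class code.

```scala
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Local file system handle; no cluster or SparkContext is required.
val localFs = FileSystem.getLocal(new Configuration())

// Scratch directory created by the JVM, so nothing outside it is touched.
val scratch = new Path(Files.createTempDirectory("fs-sketch").toString)
val exPath = new Path(scratch, "ex")

val created = localFs.mkdirs(exPath)            // create the nested "ex" directory
val subDirs = localFs.listStatus(scratch)       // list the scratch directory
  .filter(_.isDirectory)
  .map(_.getPath.getName)
val deleted = localFs.delete(scratch, true)     // recursive delete, as in deleteDir

println(subDirs.mkString(", "))
```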

#### Task 10
##### Repartition `creditcard_withId` in memory by the `Class` column and write it to the `ex_test1` directory on disk, partitioned by `Class`. List the directories in the `ex_test1` folder.
##### List all files in the `ex_test1` and the generated nested folders.

#### Result:

In [None]:
creditcard_withId.repartition($"Class")
.write
.partitionBy("Class")
.parquet(ex_dir + "/ex_test1")

fs.listStatus(new Path(ex_dir + "/ex_test1"))
.foreach(x => println(x.getPath))

In [None]:
val test1_dir = getArrayOfSubDir(ex_dir + "/ex_test1")

test1_dir
.foreach(println)

test1_dir
.flatMap(getArrayOfFiles(_))
.foreach(println)
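
On disk, `partitionBy` encodes each distinct value of the partition column as a `<column>=<value>` directory. A small sketch on a three-row hypothetical DataFrame makes the layout visible; `partitionDirs`, the sample rows, and the temporary output location are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical helper: writes a tiny partitioned dataset and returns its subdirectory names.
def partitionDirs(spark: SparkSession): Seq[String] = {
  import spark.implicits._
  val out = Files.createTempDirectory("partition-sketch").resolve("out").toString
  // Hypothetical rows with the same column names as the exercise data.
  val df = Seq((1L, 9.99, 0), (2L, 120.0, 1), (3L, 5.0, 0)).toDF("uniqueID", "Amount", "Class")
  df.repartition($"Class").write.partitionBy("Class").parquet(out)
  // Keep directories only, which skips marker files such as _SUCCESS.
  new java.io.File(out).listFiles.filter(_.isDirectory).map(_.getName).sorted.toSeq
}

val spark = SparkSession.builder().master("local[2]").appName("partitionBy-sketch").getOrCreate()
val dirs = partitionDirs(spark)
dirs.foreach(println)
```

The partition column itself is not stored inside the Parquet files; Spark reconstructs it from the directory names when the data is read back.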

## Exercise 5

#### Task 1
##### Perform several tests as shown in class where you:
- Repartition data in memory
- Then partition files on disk so that several files are created within each Class folder
- Experiment with the number of lines per file (Hint: keep your number in the thousands, otherwise there will be too many to print out in console)

##### Compare your results and discuss within your group.

#### Result:

In [None]:
creditcard_withId.repartition(8)
.write
.partitionBy("Class")
.parquet(ex_dir + "/test2")

val test2_dir = getArrayOfSubDir(ex_dir + "/test2")

test2_dir
.foreach(println)

test2_dir
.flatMap(getArrayOfFiles(_))
.foreach(println)

In [None]:
creditcard_withId.repartition(4, $"Class", rand()) // rand() spreads each Class over several in-memory partitions
.write
.partitionBy("Class")
.parquet(ex_dir + "/test3")

val test3_dir = getArrayOfSubDir(ex_dir + "/test3")

test3_dir
.foreach(println)

test3_dir
.flatMap(getArrayOfFiles(_))
.foreach(println)

In [None]:
creditcard_withId.repartition($"Class")
.write
.option("maxRecordsPerFile", 5000) //<- set the maximum number of records per file
.partitionBy("Class")
.parquet(ex_dir + "/test4")

val test4_dir = getArrayOfSubDir(ex_dir + "/test4")

test4_dir
.foreach(println)

test4_dir
.flatMap(getArrayOfFiles(_))
.foreach(println)
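
With `repartition($"Class")`, all rows of one class end up in a single in-memory partition, so one task writes each `Class` folder and `maxRecordsPerFile` alone decides how many files it contains. A back-of-the-envelope sketch of that arithmetic follows; the function `filesForTask` and the row counts are hypothetical, not measured from `creditcard.csv`.

```scala
// Files written by one task for one partition value when maxRecordsPerFile is set:
// the rows are split into chunks of at most `maxRecords` rows each.
def filesForTask(rows: Long, maxRecords: Long): Long =
  if (rows == 0) 0L else (rows + maxRecords - 1) / maxRecords   // ceiling division

// Hypothetical row counts, chosen only to illustrate the calculation.
val legitRows = 42000L
val fraudRows = 300L

println(filesForTask(legitRows, 5000L))  // 9
println(filesForTask(fraudRows, 5000L))  // 1
```

When the data is spread over several in-memory partitions instead, as with `repartition(8)`, each task writes its own files per class, so the count per folder can be higher than this estimate.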

#### Task 2
##### Delete ex_dir and unpersist any data you may have cached.

#### Result: 

In [None]:
deleteDir(ex_dir)

// Drop anything that may still be cached in this session
spark.catalog.clearCache()

#### Task 3
##### Run the following code as a user-defined function to measure the time it takes Spark to perform a computation (we used it in class).

```
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1000000 + "ms")
    result
}
```

#### Result: 

In [None]:
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1000000 + "ms")
    result
}
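
Because Spark transformations are lazy, the timed block should end in an action (`count`, `reduce`, `collect`, and so on); otherwise only the construction of the query plan is measured. The sketch below is a variant of the helper that returns the elapsed time instead of printing it, which makes runs easier to compare. The name `timed` is hypothetical, and the workload is plain Scala so that it runs without Spark.

```scala
// Variant of the timing helper that also returns the elapsed milliseconds.
def timed[R](block: => R): (R, Long) = {
  val t0 = System.nanoTime()
  val result = block                      // the by-name argument is evaluated here
  val elapsedMs = (System.nanoTime() - t0) / 1000000
  (result, elapsedMs)
}

// Stand-in workload; in the exercise this would be a Spark action.
val (total, ms) = timed { (1L to 1000000L).sum }
println(s"sum = $total, elapsed = $ms ms")
```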

#### Task 4
##### Run several combined tests on the creditcard_withId data using the techniques you have learned in class. Compare their performance by using the time function and/or reviewing the Spark UI job details for each test.
##### Discuss within your group.

#### Result:

In [None]:
time {
    creditcard_withId
    .filter($"Class" < 1)
    .select($"Amount")
    .map(_.getDouble(0))
    .reduce(_ + _)
}

In [None]:
time {
    creditcard_withId
    .filter($"Class" < 1)
    .repartition(8)
    .select($"Amount")
    .map(_.getDouble(0))
    .reduce(_ + _)
}

In [None]:
creditcard_withId.cache()
creditcard_withId.count()

time {
    creditcard_withId
    .filter($"Class" < 1)
    .repartition(8)
    .select($"Amount")
    .map(_.getDouble(0))
    .reduce(_ + _)
}

In [None]:
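val total_not_fraud_acc = sc.doubleAccumulator("TotalNotFraudAmount")

time {
    creditcard_withId // same (cached) DataFrame as in the previous tests, so the timings are comparable
    .filter($"Class" < 1)
    .repartition(8)
    .select($"Amount")
    .map(_.getDouble(0))
    .foreach(amt => total_not_fraud_acc.add(amt))
}

total_not_fraud_acc.value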
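
Since the later tests run against a cached DataFrame, it can help to check whether a DataFrame is currently persisted and to release it once the comparison is done. This is a minimal sketch on a synthetic DataFrame; `cacheLifecycle` is a hypothetical helper and `spark.range` stands in for the credit card data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: reports whether the DataFrame is persisted before and after unpersist.
def cacheLifecycle(spark: SparkSession): (Boolean, Boolean) = {
  val df = spark.range(0, 100000).toDF("n")
  df.cache()                              // lazy: only marks the plan for caching
  df.count()                              // action that materializes the cache
  val cachedBefore = df.storageLevel != StorageLevel.NONE
  df.unpersist(blocking = true)           // release the cached blocks
  val cachedAfter = df.storageLevel != StorageLevel.NONE
  (cachedBefore, cachedAfter)
}

val spark = SparkSession.builder().master("local[2]").appName("cache-sketch").getOrCreate()
val (cachedBefore, cachedAfter) = cacheLifecycle(spark)
println(s"cached before unpersist: $cachedBefore, after: $cachedAfter")
```

When comparing timings, keep in mind that the first run after `cache()` also pays for filling the cache, which is why the exercise calls `count()` before the timed block.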