# Initialization
The initialization is performed in 3 steps:
* Import the Spark and the PostgreSQL driver libraries
* Import the TDM library
* Setup of the database parameters (to be modified according to your installation)

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.4`
import $ivy.`org.postgresql:postgresql:42.2.5`

In [None]:
// Locate the TDM assembly jar in the notebook's working directory and add it
// to the interpreter classpath so that the `tdm` packages can be imported later.
val path = java.nio.file.Paths.get(
  new java.io.File(".").getCanonicalPath + "/TDM-assembly-0.2.0.jar"
)
val x = ammonite.ops.Path(path)
interp.load.cp(x)

In [None]:
// PostgreSQL credentials and JDBC URL. These are placeholders: adapt them to
// your own installation before running the following cells.
val user: String = "user"
val password: String = "password"
val url: String = "jdbc:postgresql://localhost:5432/db"

// Names of the two tables created and filled by the import cells, and read
// by the experiment queries.
val tweetTable: String = "tweet"
val hashtagTable: String = "tweet_hashtag"

# Import data
Anonymized CSV files (tweets and hashtags, each split into several parts) are provided to reproduce the experiment. This step consists of creating the tables and inserting the data into your database.

In [None]:
import java.sql._
import java.util.Properties

// Load the PostgreSQL JDBC driver class explicitly before asking
// DriverManager for a connection.
Class.forName("org.postgresql.Driver")

// Credentials passed to the driver when opening the connection.
val props: Properties = new Properties()
Seq("user" -> user, "password" -> password).foreach { case (key, value) =>
  props.setProperty(key, value)
}

// Connection reused by the table-creation and CSV-import cells.
val connection: Connection = DriverManager.getConnection(url, props)

## Create tables

In [None]:
// DDL for the tweet table: one row per tweet, keyed by the tweet id.
val createTableTweet: String = s"""
    CREATE TABLE ${tweetTable} (
        id TEXT PRIMARY KEY,
        from_user_id TEXT,
        time BIGINT
    )
"""

// DDL for the hashtag table: one row per (tweet, hashtag) pair.
val createTableHashtag: String = s"""
    CREATE TABLE ${hashtagTable} (
        tweet_id TEXT,
        hashtag TEXT,
        PRIMARY KEY (tweet_id, hashtag)
    )
"""

// Run both DDL statements. The statement is closed in a `finally` so that it
// is released even when one of the CREATE TABLE commands fails.
val stmt: Statement = connection.createStatement()
try {
  stmt.execute(createTableTweet)
  stmt.execute(createTableHashtag)
} finally {
  stmt.close()
}

## Import CSV

In [None]:
import java.io.FileReader
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// PostgreSQL COPY API bound to the JDBC connection opened earlier.
val cm: CopyManager = new CopyManager(connection.asInstanceOf[BaseConnection])

// (target table, CSV part file) pairs, loaded in this order.
val csvParts: Seq[(String, String)] = Seq(
  tweetTable -> "tweets_part1.csv",
  tweetTable -> "tweets_part2.csv",
  tweetTable -> "tweets_part3.csv",
  hashtagTable -> "hashtags_part1.csv",
  hashtagTable -> "hashtags_part2.csv"
)

// Stream each file into its table. Every reader is closed in a `finally`
// so that no file handle is leaked, even when a COPY fails.
csvParts.foreach { case (table, file) =>
  val reader = new FileReader(file)
  try {
    cm.copyIn(s"COPY ${table} FROM STDIN", reader)
  } finally {
    reader.close()
  }
}

# Experiment
After having created the tables and having filled them, we carry out two experiments, each run with Spark and TDM: 
* The first one to measure the global execution time by forcing the computation of operators at each step,
* The second one to measure the global execution time by forcing the computation of operators only at the last step.

Before launching the experiments for different volumes of data, it is necessary to define the general parameters: 
* Dimensions of the tensors,
* Queries to be run against the database,
* SparkSession initialization.

Starting from 2 tensors:
* U of order 1 that contains user id as dimension and the number of published tweets by each user as tensor's values,
* UHT of order 3 that contains user id, hashtag and time as dimensions, and the number of times a user has used a hashtag per time slice (1 hour).

We apply 2 operators:
* A selection on U to keep users who have published more than 100 tweets,
* A natural join between UHT and U to keep only the most active users.

In the following experiments, we vary the size of U from 0 to 10^6 in steps of 100,000, and we repeat each execution 5 times to produce an average time.

## Initialisation

In [None]:
import tdm._
import tdm.core._
import java.util.Properties
import org.apache.spark.sql._

// Spark session running in local mode ("local[*]" master); an already
// existing session is reused by getOrCreate().
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Connection settings shared by the TDM tensor builders and the Spark JDBC
// reader: credentials plus the JDBC URL.
val props: Properties = new Properties()
Seq("user" -> user, "password" -> password, "url" -> url).foreach { case (key, value) =>
  props.setProperty(key, value)
}

// Tensor dimensions used by the TDM experiments. Each one is bound to a query
// column when a tensor is built with TensorBuilder.addDimension.
// User: String-valued dimension, bound to the "user" / "u" columns.
object User extends TensorDimension[String]
// Hashtag: String-valued dimension, bound to the "ht" column.
object Hashtag extends TensorDimension[String]
// Time: Long-valued dimension, bound to the "t" column.
object Time extends TensorDimension[Long]
// Retweet: String-valued dimension; not referenced by any cell of this notebook.
object Retweet extends TensorDimension[String]

// Source query for tensor U: one row per user (from_user_id) with the number
// of tweets of that user as `freq`. The experiments append " LIMIT <n>" to it
// to vary the size of U.
val queryUser = s"""
  SELECT DISTINCT from_user_id AS user, COUNT(*)::INTEGER AS freq
    FROM ${tweetTable} t 
    GROUP BY from_user_id
  """

// Source query for tensor UHT: joins tweets with their hashtags and counts,
// as `freq`, the rows per (user, hashtag, time / 3600) group; only groups with
// more than 5 rows are kept.
// NOTE(review): dividing by 3600 presumably turns a timestamp in seconds into
// 1-hour slices — confirm the unit of the `time` column.
val queryUserHashtagTime = s"""
  SELECT t.from_user_id AS u, ht.hashtag AS ht, FLOOR(t.time / (3600)) AS t, COUNT(*)::INTEGER AS freq
    FROM ${tweetTable} t INNER JOIN
        ${hashtagTable} ht ON t.id = ht.tweet_id
    GROUP BY u, ht, t
    HAVING COUNT(*) > 5
  """

## Force compute at each step

### TDM

In [None]:
// Average wall-clock time (ms) of the TDM pipeline, one entry per size of
// tensor U, with count() called after each step to force its computation.
var timeTDMForceCompute = Seq[Int]()

for (i <- 0 to 1000000 by 100000) {
    val nbIterations = 5
    // Sum of the durations (ms) of the iterations run so far for this size.
    var totalTime = 0
    println(i)
    for (_ <- 0 until nbIterations) {
        val startTime = System.currentTimeMillis()

        // Tensor U, limited to the first `i` rows returned by queryUser.
        // NOTE(review): the Spark counterpart of this experiment calls count()
        // on the user DataFrame right after loading it; no count() is called
        // on tensorUserAll here — confirm that this asymmetry is intended.
        val tensorUserAll = TensorBuilder[Int](props)
            .addDimension(User, "user")
            .build(queryUser + " LIMIT " + i, "freq")

        // Tensor UHT (user x hashtag x time), forced right after loading.
        val tensorUserHashtagTime = TensorBuilder[Int](props)
            .addDimension(User, "u")
            .addDimension(Hashtag, "ht")
            .addDimension(Time, "t")
            .build(queryUserHashtagTime, "freq")
        tensorUserHashtagTime.count()

        // Selection: keep the users whose tweet count is greater than 100.
        val tensorUser = tensorUserAll.selection(_ > 100)
        tensorUser.count()

        // Natural join of UHT with the selected users.
        val tensorFinal = tensorUserHashtagTime.naturalJoin(tensorUser)
        tensorFinal.count()

        val elapsed = (System.currentTimeMillis() - startTime).toInt
        totalTime += elapsed

        // Report the duration of this iteration only (not the running total),
        // with sub-second precision.
        println(s"Execution time: ${elapsed / 1000.0}s")
    }
    timeTDMForceCompute = timeTDMForceCompute :+ (totalTime / nbIterations)
}

### Spark

In [None]:
import org.apache.spark.sql.functions.col

// JDBC reader configured from `props`. option() is called here for its effect
// on jdbcReader: the loads below rely on these options being set on it.
val jdbcReader = spark.read.format("jdbc")
props.forEach((key, value) => jdbcReader.option(key.toString, value.toString))

// Average wall-clock time (ms) of the Spark pipeline, one entry per size of
// the user DataFrame, with count() called after each step to force its
// computation.
var timeSparkForceCompute = Seq[Int]()

for (i <- 0 to 1000000 by 100000) {
    val nbIterations = 5
    // Sum of the durations (ms) of the iterations run so far for this size.
    var totalTime = 0
    println(i)
    for (_ <- 0 until nbIterations) {
        val startTime = System.currentTimeMillis()

        // Users, limited to the first `i` rows returned by queryUser.
        val dfUserAll = jdbcReader.option("query", queryUser + " LIMIT " + i).load() // user - freq
        dfUserAll.count()

        val dfUserHashtagTime = jdbcReader.option("query", queryUserHashtagTime).load() // u - ht - t - freq
        dfUserHashtagTime.count()

        // Selection: keep the users whose tweet count is greater than 100.
        val index = dfUserAll.columns.indexOf("freq")
        val dfUser = dfUserAll.filter(r => r.get(index).asInstanceOf[Int] > 100)
        dfUser.count()

        // Join on the user id; the users' `freq` column is dropped first so
        // that only one `freq` column remains in the result.
        val dfFinal = dfUserHashtagTime.join(dfUser.drop("freq"), col("u") === col("user"))
        dfFinal.count()

        val elapsed = (System.currentTimeMillis() - startTime).toInt
        totalTime += elapsed

        // Report the duration of this iteration only (not the running total),
        // with sub-second precision.
        println(s"Execution time: ${elapsed / 1000.0}s")
    }

    timeSparkForceCompute = timeSparkForceCompute :+ (totalTime / nbIterations)
}

## Force compute only at the last step

### TDM

In [None]:
// Average wall-clock time (ms) of the TDM pipeline, one entry per size of
// tensor U, with count() called only on the final tensor.
var timeTDM = Seq[Int]()

for (i <- 0 to 1000000 by 100000) {
    println(i)
    val nbIterations = 5
    // Sum of the durations (ms) of the iterations run so far for this size.
    var totalTime = 0
    for (_ <- 0 until nbIterations) {
        val startTime = System.currentTimeMillis()

        // Tensor U, limited to the first `i` rows returned by queryUser.
        val tensorUserAll = TensorBuilder[Int](props)
            .addDimension(User, "user")
            .build(queryUser + " LIMIT " + i, "freq")

        // Tensor UHT (user x hashtag x time).
        val tensorUserHashtagTime = TensorBuilder[Int](props)
            .addDimension(User, "u")
            .addDimension(Hashtag, "ht")
            .addDimension(Time, "t")
            .build(queryUserHashtagTime, "freq")

        // Selection: keep the users whose tweet count is greater than 100.
        val tensorUser = tensorUserAll.selection(_ > 100)

        // Natural join of UHT with the selected users; count() is the only
        // call made on a result in this iteration.
        val tensorFinal = tensorUserHashtagTime.naturalJoin(tensorUser)
        tensorFinal.count()

        val elapsed = (System.currentTimeMillis() - startTime).toInt
        totalTime += elapsed

        // Report the duration of this iteration only (not the running total),
        // with sub-second precision.
        println(s"Execution time: ${elapsed / 1000.0}s")
    }

    timeTDM = timeTDM :+ (totalTime / nbIterations)
}

### Spark

In [None]:
import org.apache.spark.sql.functions.col

// JDBC reader configured from `props`. option() is called here for its effect
// on jdbcReader: the loads below rely on these options being set on it.
val jdbcReader = spark.read.format("jdbc")
props.forEach((key, value) => jdbcReader.option(key.toString, value.toString))

// Average wall-clock time (ms) of the Spark pipeline, one entry per size of
// the user DataFrame, with count() called only on the final DataFrame.
var timeSpark = Seq[Int]()

for (i <- 0 to 1000000 by 100000) {
    println(i)
    val nbIterations = 5
    // Sum of the durations (ms) of the iterations run so far for this size.
    var totalTime = 0
    for (_ <- 0 until nbIterations) {
        val startTime = System.currentTimeMillis()

        // Users, limited to the first `i` rows returned by queryUser.
        val dfUserAll = jdbcReader.option("query", queryUser + " LIMIT " + i).load() // user - freq

        val dfUserHashtagTime = jdbcReader.option("query", queryUserHashtagTime).load() // u - ht - t - freq

        // Selection: keep the users whose tweet count is greater than 100.
        val index = dfUserAll.columns.indexOf("freq")
        val dfUser = dfUserAll.filter(r => r.get(index).asInstanceOf[Int] > 100)

        // Join on the user id; count() is the only call made on a result in
        // this iteration.
        val dfFinal = dfUserHashtagTime.join(dfUser.drop("freq"), col("u") === col("user"))
        dfFinal.count()

        val elapsed = (System.currentTimeMillis() - startTime).toInt
        totalTime += elapsed

        // Report the duration of this iteration only (not the running total),
        // with sub-second precision.
        println(s"Execution time: ${elapsed / 1000.0}s")
    }

    timeSpark = timeSpark :+ (totalTime / nbIterations)
}

# Results
For the 4 cases (experiments defined in section 3), we summarize the results with curves showing how the execution time evolves as a function of the size of the tensor U.

In [None]:
import $ivy.`org.plotly-scala::plotly-almond:0.7.2`

import plotly._
import plotly.element._
import plotly.layout._
import plotly.Almond._

In [None]:
// Sizes of tensor U used in the experiments (x axis).
val x = 0 to 1000000 by 100000

// Converts average times from milliseconds to seconds. Floating-point division
// keeps sub-second precision; integer division would truncate every value to
// a whole number of seconds.
def toSeconds(timesMs: Seq[Int]): Seq[Double] = timesMs.map(_ / 1000.0)

// One curve per experiment.
val plot = Seq(
    Scatter(
        x, toSeconds(timeTDM), name = "Execution with TDM"
    ),
    Scatter(
        x, toSeconds(timeSpark), name = "Execution with Spark"
    ),
    Scatter(
        x, toSeconds(timeTDMForceCompute), name = "Forced execution with TDM"
    ),
    Scatter(
        x, toSeconds(timeSparkForceCompute), name = "Forced execution with Spark"
    )
)

// The y axis is fixed to the 0-100 s range.
plot.plot(title = "Execution time", 
          xaxis = Axis(title = "Size of Tensor U"),
          yaxis = Axis(title = "Time (s)", range = (0.0, 100.0)))