Skip to content

Commit

Permalink
--TPCH Chnages
Browse files Browse the repository at this point in the history
 1. Memsql column tables PART and PARTSUPP will have same column as shard key as well as Columnstore key to get benefit of indexing in columnstore
 2. PART, PARTSUPP, CUSTOMER tables will act as column store data.
 3. Created extra tables called LINEITEM_PART and ORDER_CUST where LINEITEM is colocated with PART and ORDER is colocated with CUSTOMER. This is done as an option to column table indexing. This is just a workaround
 4. Earlier date handling was not efficient for snappy queries. As of now provided crude way to handle it efficiently.
 5. To avoid costly joins on tables, ordering of the tables in FROM clause is changed for multiple queries
  • Loading branch information
Kishor Bachhav committed Jul 26, 2016
1 parent 0daea47 commit 6153979
Show file tree
Hide file tree
Showing 12 changed files with 2,248 additions and 2,150 deletions.
Expand Up @@ -23,7 +23,8 @@ object TPCHColumnPartitionedTable {
"P_CONTAINER VARCHAR(10) NOT NULL,"+
"P_RETAILPRICE DECIMAL(15,2) NOT NULL,"+
"P_COMMENT VARCHAR(23) NOT NULL," +
"KEY (P_PARTKEY) USING CLUSTERED COLUMNSTORE)"
"KEY (P_PARTKEY) USING CLUSTERED COLUMNSTORE,"+
"SHARD KEY (P_PARTKEY))"
)
println("Created Table PART")
}
Expand All @@ -36,7 +37,8 @@ object TPCHColumnPartitionedTable {
"PS_AVAILQTY INTEGER NOT NULL," +
"PS_SUPPLYCOST DECIMAL(15,2) NOT NULL," +
"PS_COMMENT VARCHAR(199) NOT NULL," +
"KEY (PS_PARTKEY) USING CLUSTERED COLUMNSTORE)"
"KEY (PS_PARTKEY) USING CLUSTERED COLUMNSTORE,"+
"SHARD KEY (PS_PARTKEY))"
// stmt.execute("CREATE TABLE PARTSUPP ( " +
// "PS_PARTKEY INTEGER NOT NULL," +
// "PS_SUPPKEY INTEGER NOT NULL," +
Expand Down
@@ -0,0 +1,80 @@
package io.snappydata.benchmark.memsql

import java.io.{PrintStream, FileOutputStream}
import java.sql.DriverManager
import java.util.Date

/**
* Created by kishor on 21/7/16.
*/
object ConcurrentMemsql {



def main(args: Array[String]): Unit = {

val host = args(0)
val port = 3306
val dbName = "TPCH"
val user = "root"
val password = ""

val readerThread = new Thread(new Runnable {
def run() {
Class.forName("com.mysql.jdbc.Driver")
val dbAddress = "jdbc:mysql://" + host + ":" + port + "/"
val conn = DriverManager.getConnection(dbAddress, user, password)
val stmt = conn.createStatement
stmt.execute("USE " + dbName)
val avgFileStream = new FileOutputStream(new java.io.File(s"reader.out"))
val avgPrintStream = new PrintStream(avgFileStream)
for (i <- 1 to 100000) {

var starttime = System.nanoTime()
// val rs = stmt.executeQuery("select count(*) as counter from PARTSUPP where ps_suppkey = 18692 and Ps_partkey = 7663535; ")
val rs = stmt.executeQuery("select PS_AVAILQTY as counter from PARTSUPP where ps_suppkey = 18692 and PS_partkeY = 653535")
var count = 0
while (rs.next()) {
count = rs.getInt("counter")
//just iterating over result
//count+=1
}
var timetaken = (System.nanoTime() - starttime)/1000

avgPrintStream.println(s"Total time taken $timetaken results : $count ${new Date()} ")

}
avgPrintStream.close()
}
}).start()

val writerThread = new Thread(new Runnable {
def run() {
Class.forName("com.mysql.jdbc.Driver")
val dbAddress = "jdbc:mysql://" + host + ":" + port + "/"
val conn = DriverManager.getConnection(dbAddress, user, password)
val stmt = conn.createStatement
stmt.execute("USE " + dbName)
val avgFileStream = new FileOutputStream(new java.io.File(s"writer.out"))
val avgPrintStream = new PrintStream(avgFileStream)
var startCounter = 7653535
avgPrintStream.println(s"insertion started ${new Date()}")
for (i <- 1 to 100000) {
startCounter+=1
try {
var starttime = System.nanoTime()
// val rs = stmt.execute(s"insert into PARTSUPP values ($startCounter, 18692 , 2, 4.11, 'aa') ")
val rs = stmt.execute(s"update PARTSUPP set PS_AVAILQTY = PS_AVAILQTY +1")
} catch {
case e => avgPrintStream.println(e)
}
}

avgPrintStream.println(s"insertion ended ${new Date()}")
avgPrintStream.close()

}

}).start()
}
}
Expand Up @@ -76,7 +76,7 @@ object TPCH_Memsql {
}
}else{
var totalTimeForLast5Iterations:Long = 0
for (i <- 1 to 3) {
for (i <- 1 to 4) {
val startTime = System.currentTimeMillis()
rs = queryExecution(queryNumber, stmt)
//rs = stmt.executeQuery(query)
Expand All @@ -86,7 +86,7 @@ object TPCH_Memsql {
val endTime = System.currentTimeMillis()
val iterationTime = endTime - startTime
queryPrintStream.println(s"$iterationTime")
if (i > 1) {
if (i > 2) {
totalTimeForLast5Iterations += iterationTime
}
if (queryNumber.equals("q13")) {
Expand All @@ -97,7 +97,7 @@ object TPCH_Memsql {
}
}
queryPrintStream.println(s"${totalTimeForLast5Iterations / 2}")
avgPrintStream.println(s"$queryNumber,${totalTimeForLast5Iterations / 2}")
avgPrintStream.println(s"$queryNumber,${totalTimeForLast5Iterations /2}")
}
println(s"Finished executing $queryNumber")

Expand Down
Expand Up @@ -11,7 +11,7 @@ object TPCH_Memsql_Query {

val host = args(0)
val queries:Array[String] = args(1).split(",")
val port = 3306
val port = 3307
val dbName = "TPCH"
val user = "root"
val password = ""
Expand All @@ -21,7 +21,7 @@ object TPCH_Memsql_Query {
val conn = DriverManager.getConnection(dbAddress, user, password)
val stmt = conn.createStatement

val isResultCollection = false
val isResultCollection =false

stmt.execute("USE " + dbName)

Expand Down
Expand Up @@ -9,7 +9,7 @@ object TPCH_Memsql_Tables {
def main(args: Array[String]) {

val host = args(0)
val port = 3306
val port = 3307
val dbName = "TPCH"
val user = "root"
val password = ""
Expand All @@ -29,17 +29,17 @@ object TPCH_Memsql_Tables {

TPCHReplicatedTable.createSupplierTable_Memsql(stmt)

TPCHRowPartitionedTable.createPartTable_Memsql(stmt)

TPCHRowPartitionedTable.createPartSuppTable_Memsql(stmt)

TPCHRowPartitionedTable.createCustomerTable_Memsql(stmt)

// TPCHColumnPartitionedTable.createPartTable_Memsql(stmt)
// TPCHRowPartitionedTable.createPartTable_Memsql(stmt)
//
// TPCHColumnPartitionedTable.createPartSuppTable_Memsql(stmt)
// TPCHRowPartitionedTable.createPartSuppTable_Memsql(stmt)
//
// TPCHColumnPartitionedTable.createCustomerTable_Memsql(stmt)
// TPCHRowPartitionedTable.createCustomerTable_Memsql(stmt)
//
TPCHColumnPartitionedTable.createPartTable_Memsql(stmt)

TPCHColumnPartitionedTable.createPartSuppTable_Memsql(stmt)

TPCHColumnPartitionedTable.createCustomerTable_Memsql(stmt)

TPCHColumnPartitionedTable.createOrderTable_Memsql(stmt)

Expand All @@ -55,4 +55,4 @@ object TPCH_Memsql_Tables {


}
}
}

0 comments on commit 6153979

Please sign in to comment.