# This is the code from the Zeppelin notebook cells I used to run the benchmark.
# I share it in this form because Zeppelin notebooks are hard to share (GitHub does not render them, unlike Jupyter notebooks).
# The code can also easily be replicated as a real application (jar file, ...); you just have to initialize a Spark context and a SQL context for each task.
# Note that I used Spark version 1.6.2.
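# For reference, a minimal sketch of that initialization for a standalone Spark 1.6 application
# (the app name and master URL below are placeholders, not part of the original benchmark;
# inside Zeppelin, sc and sqlContext are already provided):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("csv-benchmark") // hypothetical app name
  .setMaster("local[*]")       // placeholder; point this at your cluster's master URL
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)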
## 1. Complex select statement
// Pick the dataset size to benchmark (uncomment exactly one):
val filename = "lp100k.csv"
//val filename = "lp1m.csv"
//val filename = "lp5m.csv"
//val filename = "lp10m.csv"
//val filename = "lpdata.csv"
val prefix = "file:///"
//val prefix = "hdfs://"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + filename)
df.registerTempTable("df")
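# Note: on Spark 1.x the CSV reader above comes from the external spark-csv package, so a
# standalone run needs it on the classpath, e.g. spark-submit --packages com.databricks:spark-csv_2.10:1.5.0
# (the version and Scala suffix are assumptions; match them to your Spark build).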
val start = System.currentTimeMillis;
println("working on it...")
sqlContext.sql("select * from df where Voertuigsoort like '%Personenauto%' and (Merk like 'MERCEDES-BENZ' or Merk like 'BMW' ) and Inrichting like '%hatchback%' ").collect()
val end = System.currentTimeMillis;
val divider : Float = 1000;
val time1 = (end - start)/divider
println("Time elapsed: " + time1 + "s")
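# The same start/stop timing boilerplate repeats in every task below; a small helper along these
# lines (a sketch, not part of the original benchmark) would factor it out:
def timed[A](label: String)(block: => A): A = {
  val start = System.currentTimeMillis
  val result = block // forces the action, e.g. a collect()
  println(label + " took " + (System.currentTimeMillis - start) / 1000f + " s")
  result
}
// usage: timed("complex select") { sqlContext.sql("select * from df where ...").collect() }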
## 2. Sorting the dataset
// Pick the dataset size to benchmark (uncomment exactly one):
val filename = "lp100k.csv"
//val filename = "lp1m.csv"
//val filename = "lp5m.csv"
//val filename = "lp10m.csv"
//val filename = "lpdata.csv"
val prefix = "file:///"
//val prefix = "hdfs://"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + filename)
df.registerTempTable("df")
val start = System.currentTimeMillis;
println("working on it...")
// Note: the column name needs backticks; with single quotes Spark orders by the string literal
// 'Datum tenaamstelling', which is a no-op.
sqlContext.sql("select * from df order by `Datum tenaamstelling` ").collect()
val end = System.currentTimeMillis;
val divider : Float = 1000;
val time2 = (end - start)/divider
println("Time elapsed: " + time2 + "s")
## 3. Joining the dataset
// Pick the dataset size to benchmark (uncomment exactly one):
val filename = "lp100k.csv"
//val filename = "lp1m.csv"
//val filename = "lp5m.csv"
//val filename = "lp10m.csv"
//val filename = "lpdata.csv"
val prefix = "file:///"
//val prefix = "hdfs://"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + filename)
df.registerTempTable("df")
val join_df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + "join_data.csv")
join_df.registerTempTable("join_df")
val start = System.currentTimeMillis;
println("working on it...")
sqlContext.sql("select * from df join join_df on df.Inrichting = join_df.type ").collect()
val end = System.currentTimeMillis;
val divider : Float = 1000;
val time3 = (end - start)/divider
println("Time elapsed: " + time3 + "s")
## 4. Self join
// Pick the dataset size to benchmark (uncomment exactly one):
val filename = "lp100k.csv"
//val filename = "lp1m.csv"
//val filename = "lp5m.csv"
//val filename = "lp10m.csv"
//val filename = "lpdata.csv"
val prefix = "file:///"
//val prefix = "hdfs://"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + filename)
df.registerTempTable("df")
val start = System.currentTimeMillis;
println("working on it...")
df.join(df, Seq("Kenteken")).collect() // self join on the Kenteken (license plate) column
val end = System.currentTimeMillis;
val divider : Float = 1000;
val time4 = (end - start)/divider
println("Time elapsed: " + time4 + "s")
## 5. Grouping the data
// Pick the dataset size to benchmark (uncomment exactly one):
val filename = "lp100k.csv"
//val filename = "lp1m.csv"
//val filename = "lp5m.csv"
//val filename = "lp10m.csv"
//val filename = "lpdata.csv"
val prefix = "file:///"
//val prefix = "hdfs://"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter",",")
.option("inferSchema", "true") // Automatically infer data types
.option("mode", "DROPMALFORMED")
.load(prefix + filename)
df.registerTempTable("df")
val start = System.currentTimeMillis;
println("working on it...")
sqlContext.sql("select count(*) from df group by Merk").collect() // row count per brand (Merk)
val end = System.currentTimeMillis;
val divider : Float = 1000;
val time5 = (end - start)/divider
println("Time elapsed: " + time5 + "s")