// Adapted from SparkSQL8, but written as a script for easy use with the
// spark-shell command.
import util.Verse
import org.apache.spark.sql.DataFrame
// For HDFS:
// val inputRoot = "hdfs://my_name_node_server:8020"
val inputRoot = "."
val inputPath = s"$inputRoot/data/kjvdat.txt"
// The Spark Shell and the SBT console already provide a SparkSession as `spark`
// and a SparkContext as `sc`.
// Regex to match the fields separated by "|".
// Also strips the trailing "~" in the KJV file; the text group is non-greedy
// ("(.*?)") so the optional "~" is actually consumed and dropped.
val lineRE = """^\s*([^|]+)\s*\|\s*(\d+)\s*\|\s*(\d+)\s*\|\s*(.*?)~?\s*$""".r
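// For illustration (an assumption about the data file, inferred from the regex):
// a kjvdat.txt line looks schematically like "Gen|1|1|In the beginning ...~".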
// Use flatMap to effectively remove bad lines.
val versesRDD = sc.textFile(inputPath).flatMap {
  case lineRE(book, chapter, verse, text) =>
    Seq(Verse(book, chapter.toInt, verse.toInt, text))
  case line =>
    Console.err.println(s"Unexpected line: $line")
    Nil  // or use Seq.empty[Verse]. It will be eliminated by flattening.
}
// Create a DataFrame and register it as a temporary "view".
val verses = spark.createDataFrame(versesRDD)
verses.createOrReplaceTempView("kjv_bible")
verses.cache()
// Print the first 20 rows. Pass an integer argument to show a different number
// of rows:
verses.show()
verses.show(10)
verses.show(10, truncate = false) // wider output
val godVerses = spark.sql("SELECT * FROM kjv_bible WHERE text LIKE '%God%'")
println("Number of verses that mention God: "+godVerses.count())
godVerses.show(truncate = false)
println("The query plan:")
godVerses.queryExecution // Compare with godVerses.explain(true)
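// Run the comparison suggested above; explain(true) prints the parsed,
// analyzed, optimized, and physical plans to stdout:
godVerses.explain(true)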
// Use the DataFrame API:
val godVersesDF = verses.filter(verses("text").contains("God"))
println("The query plan:")
godVersesDF.queryExecution
println("Number of verses that mention God: "+godVersesDF.count())
godVersesDF.show()
// Use GROUP BY and a column alias.
val counts = spark.sql("SELECT book, COUNT(*) as count FROM kjv_bible GROUP BY book")
counts.show(100) // print the first 100 rows, but there are only 66 books/records...
// Exercise: Sort the output by the book names. Sort by the counts.
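// A minimal sketch for the exercise (one approach among several):
counts.orderBy($"book").show(100)        // sort by book name
counts.orderBy($"count".desc).show(100)  // sort by descending count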
// Use "coalesce" when you have too many small partitions. The integer
// passed to "coalesce" is the number of output partitions (1 in this case).
val counts1 = counts.coalesce(1)
val nPartitions = counts.rdd.partitions.size
val nPartitions1 = counts1.rdd.partitions.size
println(s"counts.count (can take a while, # partitions = $nPartitions):")
println(s"result: ${counts.count}")
println(s"counts1.count (usually faster, # partitions = $nPartitions1):")
println(s"result: ${counts1.count}")
// DataFrame version:
val countsDF = verses.groupBy("book").count()
countsDF.show(100)
countsDF.count
// Exercise: Sort the last output by the book names, then by the counts. How much overhead does this add?
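// A sketch for the exercise; explain(true) shows the extra Exchange (shuffle)
// stage that a global sort introduces, which is the main overhead to look for:
countsDF.orderBy($"book").show(100)
countsDF.orderBy($"count".desc).explain(true)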
// Aggregations, a la data warehousing:
// NOTE: the following import is here so this script works in Spark Shell, but
// it's already done in the setup for the SBT Console:
import org.apache.spark.sql.functions._ // for min, max, etc.
verses.groupBy("book").agg(
  max(verses("chapter")),
  max(verses("verse")),
  count(verses("*"))
).sort($"count(1)".desc, $"book").show(100)
// Alternative way of referencing columns in "verses":
verses.groupBy("book").agg(
  max($"chapter"),
  max($"verse"),
  count($"*")
).sort($"count(1)".desc, $"book").show(100)
// With just a single grouping column, cube and rollup add little, but with
// more grouping columns over a bigger dataset, you could compute cubes and
// rollups, too.
verses.cube("book").agg(
  max($"chapter"),
  max($"verse"),
  count($"*")
).sort($"count(1)".desc, $"book").show(100)
verses.rollup("book").agg(
  max($"chapter"),
  max($"verse"),
  count($"*")
).sort($"count(1)".desc, $"book").show(100)
// Alternatively, map each field to the name of an aggregation method to apply
// to it, though this form allows at most one method per field.
verses.rollup("book").agg(Map(
  "chapter" -> "max",
  "verse"   -> "max",
  "*"       -> "count"
)).sort($"count(1)".desc, $"book").show(100)
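// A sketch (not in the original script) that aliases the aggregate columns,
// so the sort key gets a friendlier name than "count(1)":
verses.groupBy("book").agg(
  max($"chapter").as("max_chapter"),
  max($"verse").as("max_verse"),
  count($"*").as("verse_count")
).sort($"verse_count".desc, $"book").show(100)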
// Exercise: Try a JOIN with the "abbrevs_to_names" data to convert the book
// abbreviations to full titles, using either a SQL query or the DataFrame API.
// (See solns/SparkSQL-..-script.scala)
// Here is some setup code to load and parse the dataset:
val abbrevsNamesPath = s"$inputRoot/data/abbrevs-to-names.tsv"
case class Abbrev(abbrev: String, name: String)
val abbrevNamesRDD = sc.textFile(abbrevsNamesPath).flatMap { line =>
  val ary = line.split("\t")
  if (ary.length != 2) {
    Console.err.println(s"Unexpected line: $line")
    Nil  // or use Seq.empty[Abbrev]. It will be eliminated by flattening.
  } else {
    Seq(Abbrev(ary(0), ary(1)))
  }
}
val abbrevNames = spark.createDataFrame(abbrevNamesRDD)
abbrevNames.createOrReplaceTempView("abbrevs_to_names")
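// A sketch of one possible solution to the JOIN exercise above (the solns/
// script may differ), first with SQL over the two temporary views, then with
// the DataFrame API:
val versesWithNames = spark.sql("""
  SELECT name, chapter, verse, text
  FROM kjv_bible k JOIN abbrevs_to_names a ON k.book = a.abbrev""")
versesWithNames.show(10, truncate = false)
val versesWithNamesDF = verses.join(abbrevNames, $"book" === $"abbrev").select("name", "chapter", "verse", "text")
versesWithNamesDF.show(10, truncate = false)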
// Exercise: Play with other methods in the DataFrame DSL.
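// A few starting points (a sketch; see the Dataset API docs for many more):
verses.select($"book", $"chapter").distinct().show(10)
verses.where($"chapter" === 1).limit(5).show()
verses.describe("chapter", "verse").show()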