-
Notifications
You must be signed in to change notification settings - Fork 3
/
BuildVocabulary.kt
88 lines (81 loc) · 3.34 KB
/
BuildVocabulary.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package cc.datafabric
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.Flatten
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.util.GcsUtil
import org.apache.beam.sdk.util.gcsfs.GcsPath
import org.apache.beam.sdk.values.KV
import org.tartarus.Stemmer
import java.nio.channels.Channels
import java.util.Scanner
/** Shared factory through which the pipeline's DoFns create a GCS utility from the job's pipeline options. */
val gcsFactory: GcsUtil.GcsUtilFactory = GcsUtil.GcsUtilFactory()
/**
 * Builds and runs the `build-vocabulary` Beam pipeline, which counts how often each word stem
 * occurs in each input file. Stems are computed with the Porter stemmer, see [Stemmer].
 *
 * Pipeline stages, in order:
 *  1. take the source location from the pipeline options and expand it into individual paths;
 *  2. open every path, split its content on runs of characters other than `A-Z`/`a-z`,
 *     lower-case and stem each token, and emit it keyed by (file name, stem);
 *  3. group by that key and format one `<file name>/<stem>/<occurrences>` line per group;
 *  4. write the lines to the destination from the pipeline options with a `.txt` suffix.
 *
 * NOTE: no parts of a sentence are filtered out: articles, initials and so on remain in the output.
 *
 * @param args command-line arguments handed to [DataFlowDefaultOptionsBuilder.build]
 */
fun main(args: Array<String>) {
    val options = DataFlowDefaultOptionsBuilder.build(args)
    options.jobName = "build-vocabulary"
    // NOTE(review): the right-hand side of the assignment below is missing from this file as
    // received, so the line is kept verbatim; restore the template location before compiling.
    options.templateLocation =
    val p = Pipeline.create(options)
    p
        .apply(Create.ofProvider(options.getSource(), StringUtf8Coder.of()))
        .apply(
            ParDo.of(
                object : DoFn<String, List<@JvmSuppressWildcards String>>() {
                    /** Expands the incoming source location and emits the matching paths as strings. */
                    @ProcessElement
                    fun processElement(c: ProcessContext) {
                        val matches = gcsFactory
                            .create(c.pipelineOptions)
                            .expand(GcsPath.fromUri(c.element()))
                            .map { it.toString() }
                        c.output(matches)
                    }
                })
        )
        .apply(Flatten.iterables())
        .apply(
            ParDo.of(object : DoFn<String, KV<KV<String, String>, String>>() {
                // NOTE(review): held as a DoFn field, so Stemmer presumably has to be
                // serializable together with this DoFn — confirm.
                val stemmer = Stemmer()

                /** Emits `((file name, stem), stem)` for every token found in the file at the given path. */
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    val path = c.element()
                    // The file name is the same for every token, so compute it once per file.
                    val fileName = path.substringAfterLast("/")
                    val channel = gcsFactory.create(c.pipelineOptions).open(GcsPath.fromUri(path))
                    // `use` closes the scanner when the block exits, normally or by exception;
                    // closing the scanner closes the stream and the channel underneath it.
                    Scanner(Channels.newInputStream(channel)).use { scanner ->
                        scanner.useDelimiter("[^A-Za-z]+")
                        while (scanner.hasNext()) {
                            // Locale.ROOT keeps lower-casing independent of the worker's default locale.
                            val letters = scanner.next().toLowerCase(java.util.Locale.ROOT).toCharArray()
                            stemmer.add(letters, letters.size)
                            stemmer.stem()
                            val stem = String(stemmer.resultBuffer, 0, stemmer.resultLength)
                            c.output(KV.of(KV.of(fileName, stem), stem))
                        }
                    }
                }
            })
        )
        .apply(GroupByKey.create())
        .apply(
            ParDo.of(object : DoFn<KV<KV<String, String>, Iterable<@JvmSuppressWildcards String>>, String>() {
                /** Formats one grouped entry as `<file name>/<stem>/<number of occurrences>`. */
                @ProcessElement
                fun processElement(c: ProcessContext) {
                    val group = c.element()
                    val fileAndStem = checkNotNull(group.key) { "grouped element has no key" }
                    val fileName = checkNotNull(fileAndStem.key) { "grouped key has no file name" }
                    // count() walks the grouped values without first copying them into a list.
                    val occurrences = group.value.count()
                    c.output("$fileName/${fileAndStem.value}/$occurrences")
                }
            })
        )
        .apply(
            TextIO
                .write()
                .to(options.getDestination())
                .withSuffix(".txt")
        )
    p.run()
}