package demo.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// intentionally verbose so it reads more easily;
// it shouldn't actually all be in one file either
// ===========================================================================
object Inputs {

  trait HasGene   { val gene:   String }
  trait HasSample { val sample: String }

  sealed trait F
    extends HasGene
    with    HasSample
  // ---------------------------------------------------------------------------
  case class F1(
      gene:   String,
      sample: String,
      other1: String)
    extends F

  object F1 {
    def apply(line: String): F1 = { // factory of F1s
      val it = line.split("\t").iterator
      F1(
        gene   = it.next, // TODO: should add checks
        sample = it.next,
        other1 = it.next)
    }
  }
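
  // a minimal usage sketch (the input line below is hypothetical, just the
  // three tab-separated columns the factory expects):
  //   F1("BRCA2\tsampleA\tfoo") == F1(gene = "BRCA2", sample = "sampleA", other1 = "foo")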
  // ---------------------------------------------------------------------------
  case class F2(
      gene:    String,
      sample:  String,
      other21: String,
      other22: String)
    extends F

  object F2 {
    def apply(line: String): F2 = { // factory of F2s
      val it = line.split("\t").iterator
      F2(
        gene    = it.next,
        sample  = it.next,
        other21 = it.next,
        other22 = it.next)
    }
  }
}
// ===========================================================================
object Outputs {
  import Inputs._

  // ---------------------------------------------------------------------------
  case class GeneCentric(
      gene:    String,
      samples: Iterable[SampleCentric])

  // ---------------------------------------------------------------------------
  case class SampleCentric(
      sample: String,
      extras: Iterable[Extra])

  // ---------------------------------------------------------------------------
  sealed trait Extra

  object Extra {

    case class Extra1(
        other1: String)
      extends Extra

    case class Extra2(
        other21: String,
        other22: String)
      extends Extra

    // factory of "extras" (either an Extra1 or an Extra2,
    // based on the runtime type of the f we get)
    def apply(f: F): Extra =
      // this pattern match is safe because F is sealed
      // (the compiler will warn if we forget a case);
      // pattern matching is one of the most powerful Scala constructs, see
      // http://alvinalexander.com/scala/using-match-expression-like-switch-statement
      f match {
        case F1(_, _, other1)           => Extra1(other1)
        case F2(_, _, other21, other22) => Extra2(other21, other22)
      }
  }
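
  // a minimal sketch of the dispatch above (values are hypothetical):
  //   Extra(F1("g", "s", "foo"))        == Extra1("foo")
  //   Extra(F2("g", "s", "bar", "baz")) == Extra2("bar", "baz")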
}
// ===========================================================================
object Demo extends App { // i.e. the main entry point (via the "App" trait)
  import Inputs._
  import Outputs._
  import Outputs.Extra._

  // ---------------------------------------------------------------------------
  // these are simply type aliases, used for illustration purposes here
  type GenericSeq[A]      = RDD[A]
  type GenericIterable[A] = RDD[A]

  val sc =
    new SparkContext(
      new SparkConf()
        .setAppName("demo")
        .setMaster("local"))
  // ---------------------------------------------------------------------------
  // read the lines and turn each into an F1/F2;
  // "apply()" is the factory method that every case class companion provides
  val f1s: GenericSeq[F] = readIn(args(0)).map(F1.apply)
  val f2s: GenericSeq[F] = readIn(args(1)).map(F2.apply)
  // ---------------------------------------------------------------------------
  val genes: GenericSeq[(String /* gene */, Map[String /* sample */, Iterable[F]])] =
    f1s

      // combine both files
      .union(f2s)

      // group by "gene", since both types are guaranteed
      // to have this property (they transitively extend HasGene via F)
      .groupBy(_.gene)

      // ignore the key for now and focus on the values (the groupings)
      .mapValues(geneGroup =>
        geneGroup
          // within each such grouping,
          // do the same thing but with "sample"
          .groupBy(_.sample))
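
  // a sketch of the resulting shape (gene/sample names are hypothetical):
  //   ("BRCA2", Map(
  //     "sampleA" -> Iterable(F1(...), F2(...)),
  //     "sampleB" -> Iterable(F2(...))))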
  // ---------------------------------------------------------------------------
  val geneCentrics: GenericIterable[GeneCentric] =
    genes
      .map { case (gene, samples) =>
        gene ->
          samples
            .mapValues(fs =>
              // lastly, extract the last bits of interest
              fs.map(Extra.apply))

            // we can now build the sample-centric objects
            // (they do not capture the parent gene, though they could);
            // note that this uses a Scala trick ("tupled") to pass
            // a tuple straight to the primary constructor
            .map((SampleCentric.apply _).tupled) }

      // we can now build the gene-centric objects
      .map((GeneCentric.apply _).tupled)
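
  // the "tupled" trick above, in isolation (values are hypothetical):
  //   val pair = ("sampleA", Iterable(Extra1("foo")))
  //   (SampleCentric.apply _).tupled(pair) == SampleCentric("sampleA", Iterable(Extra1("foo")))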
  // ---------------------------------------------------------------------------
  // write it all out as one giant JSON array
  val fw = new java.io.FileWriter("/tmp/demo.json")

  fw.write( // TODO: a scalable solution should stream instead
    geneCentrics
      .map(geneCentric =>
        net.liftweb.json.Serialization.writePretty
          (geneCentric)
          (net.liftweb.json.DefaultFormats))
      .collect()
      .mkString("[", ",", "]"))

  fw.close()
  sc.stop()

  println("done.")
  // ===========================================================================
  def readIn(filePath: String): GenericSeq[String] =
    sc
      .textFile(filePath)

      // drop the header line; not quite as clean as the pure Scala version,
      // but I didn't want to complicate things too much here
      .filter(!_.startsWith("gene\t"))
}
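
// a minimal run sketch (the jar name and input paths below are assumptions,
// not from the source); each input is expected to be a TSV whose header
// starts with "gene\t":
//   spark-submit --class demo.spark.Demo demo.jar /path/to/f1.tsv /path/to/f2.tsv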