Guide {#guide .unnumbered}
=====
In this first chapter, we will cover some of the concepts and technologies that
are used during the course. We will introduce the following topics (not
necessarily in this order):
The GDELT project
: a large database of "human society", constructed of mentions of "people,
organizations, locations, themes, counts, images and emotions" around the
    planet. As mentioned before, we will use the GDELT database to construct a
histogram of the most important themes during a certain timespan.
Apache Spark
: a framework for processing large amounts of data on multiple machines in a
robust way. We will build our application for labs 1 and 2 using Spark.
Amazon Web Services
: or AWS, which provide theoretically unlimited compute infrastructure,
allowing us to process a dataset as large as the entire GDELT database in
lab 2.
Apache Kafka
: a framework for building so-called data pipelines, in which potentially
many producers and consumers process real-time, streaming data. In lab 3,
we will take the application from labs 1 and 2 and modify it to process
data in real-time, using Kafka.
Scala
: a programming language that runs on the Java Virtual Machine (JVM). This is
our (mandatory!) language of choice during the lab assignments. We will use
it to program for both Apache Spark and Apache Kafka.
Docker
: an application that allows the user to package and run software (like Spark
and Kafka and the programs we write for them) in an isolated environment: a
container.
The GDELT project
-----------------
During the lab, we will use the GDELT Global Knowledge Graph version 2.0 (GKG
v2.0). This database is basically a massive table that "connects every person,
organization, location, count, theme, news source, and event across the planet
into a single massive network that captures what's happening around the world,
what its context is and who's involved, and how the world is feeling about it,
every single day."
The GKG is updated every 15 minutes and published as a series of compressed
comma-separated values (CSV) files (in which the columns are actually separated
using tabs). The first segment of the version 2 database was published back in
2015 and at the time of writing (September 2019) it contains 157378 segments
with a total compressed file size of 1.4 terabytes (TB). Uncompressed, this
comes down to about 4.2 terabytes of raw data.
Read [this article] for a general introduction to the dataset, [this table] for
a quick overview of its columns and what they mean, and [this document] for a
more in depth description.
During the labs, we will use the `AllNames` column, which lists all "proper
names" (people, organizations, countries, etcetera) mentioned. It can be seen
as a summary of many of the other columns.
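To give an idea of the format: an `AllNames` field is a semicolon-delimited
list of name/character-offset pairs. A minimal sketch of extracting the names
from such a field (the sample value below is made up for illustration):

```scala
// Hypothetical AllNames value; real values come from the GKG CSV files.
val allNames = "United States,120;Apache Spark,305;United States,912"

// Each entry is a name/character-offset pair; keep only the names.
val names = allNames.split(";").map(_.split(",")(0)).toList

println(names) // List(United States, Apache Spark, United States)
```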
Docker
------
According to the [Docker Documentation]
> Docker is a platform for developers and sysadmins to develop, deploy, and run
> applications with containers. The use of Linux containers to deploy
> applications is called containerization. Containers are not new, but their
> use for easily deploying applications is. Containerization is increasingly
> popular because containers are:
>
> Flexible
>
> : Even the most complex applications can be containerized.
>
> Lightweight
>
> : Containers leverage and share the host kernel.
>
> Interchangeable
>
> : You can deploy updates and upgrades on-the-fly.
>
> Portable
>
> : You can build locally, deploy to the cloud, and run anywhere.
>
> Scalable
>
> : You can increase and automatically distribute container replicas.
>
> Stackable
>
> : You can stack services vertically and on-the-fly.
For this course, we use Docker primarily to ensure every student is using the
exact same platform for their applications, and to avoid certain
platform-specific issues and peculiarities.
A basic understanding of some [Docker] concepts helps in getting started with
this course. [Part 1: Orientation and setup] of the [Get Started Guide][Part 1:
Orientation and setup] covers the basic [Docker] concepts used in this course.
Before trying the lab assignments and tutorials in the next sections, make sure
you [Install Docker (stable)] and test your installation by running the simple
[Hello World image].
``` {.bash}
docker run hello-world
```
### Setting up Spark in Docker
In order to run Spark in a container, a `Dockerfile` is provided which can be
used to build images for `spark-submit` to run your Spark application,
`spark-shell` to run a Spark interactive shell, and the Spark history server to
view event logs from application runs. You need to build these images before
you get started. The Dockerfiles we provide assume that you run Docker from the
folder in which they are located. Don't move them around! They will stop
working.
To build a docker image from the Dockerfile, we use `docker build`:
``` {.bash}
docker build --target <target> -t <tag> .
```
Here `<target>` selects the target from the Dockerfile, `<tag>` sets the tag
for the resulting image, and the `.` sets the build context to the current
working directory.
We use `docker build` to build the images we need to use Spark and SBT.
- `sbt`[^sbt-image]
``` {.bash}
docker build \
--build-arg BASE_IMAGE_TAG="8" \
--build-arg SBT_VERSION="1.2.8" \
--build-arg SCALA_VERSION="2.11.12" \
-t hseeberger/scala-sbt \
github.com/hseeberger/scala-sbt.git#:debian
```
- `spark-shell`
``` {.bash}
docker build --target spark-shell -t spark-shell .
```
- `spark-submit`
``` {.bash}
docker build --target spark-submit -t spark-submit .
```
- `spark-history-server`
``` {.bash}
docker build --target spark-history-server -t spark-history-server .
```
You can then run the following commands from the Spark application root
(the folder containing the `build.sbt` file). Please make sure to use the
provided template project.
- Run SBT to package or test your application (`sbt <command>`)
``` {.bash}
docker run -it --rm -v "`pwd`":/root hseeberger/scala-sbt sbt
```
- Start a Spark shell (`spark-shell`)
``` {.bash}
docker run -it --rm -v "`pwd`":/io spark-shell
```
- Run your Spark application (`spark-submit`) (fill in the class name of your
application and the name of your project!)
``` {.bash}
docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events \
spark-submit --class <YOUR_CLASSNAME> \
target/scala-2.11/<YOUR_PROJECT_NAME>_2.11-1.0.jar
```
- Spawn the history server to view event logs, accessible at
[localhost:18080](http://localhost:18080)
``` {.bash}
docker run -it --rm -v "`pwd`"/spark-events:/spark-events \
-p 18080:18080 spark-history-server
```
The rest of the manual will not generally mention these Docker commands again,
so know that if we mention e.g. `spark-shell`, you should run the corresponding
`docker run` command listed above. You can create scripts or aliases for your
favorite shell to avoid having to type a lot.
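For example, you could define shell functions that wrap the `docker run`
commands above. A sketch for bash (the image tags are the ones built above;
adjust them if you used different tags):

```shell
# Example ~/.bashrc helpers wrapping the docker run commands above.
spark-shell() {
  docker run -it --rm -v "$PWD":/io spark-shell "$@"
}

sbt() {
  docker run -it --rm -v "$PWD":/root hseeberger/scala-sbt sbt "$@"
}
```

After sourcing these, `sbt package` or `spark-shell` in your project folder
runs the corresponding container.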
Scala
-----
Apache Spark, our big data framework of choice for this lab, is implemented in
Scala, a compiled language on the JVM that supports a mix of functional
and object-oriented programming. It is compatible with Java libraries. Some
reasons why Spark was written in Scala are:
1. Compiling to the JVM makes the codebase extremely portable and deploying
applications as easy as sending the Java bytecode (typically packaged in a
**J**ava **AR**chive format, or JAR). This simplifies deploying to cloud
provider big data platforms as we don't need specific knowledge of the
operating system, or even the underlying architecture.
2. Compared to Java, Scala has some advantages in supporting more complex
types, type inference, and anonymous functions[^java_lambdas]. Matei
Zaharia, Apache Spark's original author, has said the following about why
Spark was implemented in Scala in a [Reddit AMA]:
> At the time we started, I really wanted a PL that supports a
> language-integrated interface (where people write functions inline, etc),
> because I thought that was the way people would want to program these
> applications after seeing research systems that had it (specifically
> Microsoft's DryadLINQ). However, I also wanted to be on the JVM in order to
> easily interact with the Hadoop filesystem and data formats for that. Scala
> was the only somewhat popular JVM language then that offered this kind of
> functional syntax and was also statically typed (letting us have some control
> over performance), so we chose that. Today there might be an argument to make
> the first version of the API in Java with Java 8, but we also benefitted from
> other aspects of Scala in Spark, like type inference, pattern matching, actor
> libraries, etc.
[^java_lambdas]: Since Java 8, Java also supports anonymous functions, or
lambda expressions, but this version was not yet available at the time of
Spark's initial release.
Apache Spark provides interfaces to Scala, R, Java and Python, but we will be
using Scala to program in this lab. An introduction to Scala can be found on
the [Scala language site]. You can have a brief look at it, but you can also
pick up topics as you go through the lab.
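To illustrate the features mentioned above (type inference, anonymous
functions, and pattern matching), here is a small self-contained sketch:

```scala
// Type inference: the compiler infers List[Int] without annotations.
val numbers = List(1, 2, 3, 4, 5)

// An anonymous function passed to the higher-order method map.
val doubled = numbers.map(x => x * 2)

// Pattern matching with a guard.
val described = numbers.map {
  case n if n % 2 == 0 => s"$n is even"
  case n               => s"$n is odd"
}

println(doubled)        // List(2, 4, 6, 8, 10)
println(described.head) // 1 is odd
```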
Apache Spark
------------
Apache Spark provides a programming model for resilient distributed
shared memory. To elaborate: Spark allows you to program against
a *unified view* of memory (i.e. RDD or DataFrame), while the processing
happens *distributed* over *multiple nodes/machines/computers/servers*, being
able to compensate for _failures of these nodes_.
This allows us to define a computation and scale it over multiple machines
without having to think about communication, distribution of data, and
potential failures of nodes. This advantage comes at a cost: all applications
have to comply with Spark's (restricted) programming model.
The programming model Spark exposes is based around the MapReduce paradigm.
This is an important consideration when deciding whether to use Spark: does my
problem fit into this paradigm?
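The MapReduce paradigm can be sketched with plain Scala collections: map each
element to key/value records, group by key, then reduce per key. A tiny
(non-distributed) word count in this style:

```scala
// A word count in the MapReduce style on plain Scala collections.
val lines = Seq("spark is fast", "scala is fun")

val counts = lines
  .flatMap(_.split(" "))       // "map" phase: emit one record per word
  .map(word => (word, 1))      // key/value pairs
  .groupBy(_._1)               // "shuffle": group records by key
  .mapValues(_.map(_._2).sum)  // "reduce" phase: sum the counts per key

println(counts("is")) // 2
```

Spark's RDD API mirrors exactly this shape, but runs the phases distributed
over partitions.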
Modern Spark exposes two APIs around this programming model:
1. Resilient Distributed Datasets
2. Spark SQL Dataframe/Datasets
In the rest of this section, we will demonstrate a simple application with
implementations using both APIs.
### Resilient Distributed Datasets
![Illustration of the RDD abstraction, for an RDD with tuples of characters and
integers as elements.](./images/RDD){#fig:rdd_diagram}
RDDs are the original data abstraction used in Spark. Conceptually one can
think of these as a large, unordered list of Java/Scala/Python objects; let's
call these objects elements. This list of elements is divided into partitions
(which may still contain multiple elements), which can reside on different
machines. One can operate on these elements with a number of operations, which
can be subdivided into wide and narrow dependencies, see
[@tbl:narrow_wide_dependency]. An illustration of the RDD abstraction can be
seen in [@fig:rdd_diagram].
RDDs are immutable, which means that the elements cannot be altered without
creating a new RDD. Furthermore, the application of transformations (wide or
narrow) is subject to [lazy evaluation], meaning that the actual computation will be
delayed until results are requested (an action in Spark terminology). When
applying transformations, these form a directed acyclic graph (DAG) that
instructs workers which operations to perform on which elements to compute a
specific result. This can be seen in [@fig:rdd_diagram] as the arrows between
elements.
-------------------------------------------
Narrow Dependency          Wide Dependency
-------------------------- ----------------
`map`                      `coGroup`
`mapValues`                `groupByKey`
`flatMap`                  `reduceByKey`
`filter`                   `combineByKey`
`mapPartitions`            `distinct`
`mapPartitionsWithIndex`   `join`
`join` with sorted keys    `intersection`
                           `repartition`
                           `coalesce`
                           `sort`
-------------------------------------------
: List of wide and narrow dependencies for (pair) RDD operations
{#tbl:narrow_wide_dependency}
Now that you have an idea of what the abstraction is about, let's demonstrate
some example code with the Spark shell. _If you want to paste pieces of code
into the spark shell from this guide, it might be useful to copy from the
github version, and use the `:paste` command in the spark shell to paste the
code. Hit `ctrl+D` to stop pasting._
``` {.scala}
$ docker run -it --rm -v "`pwd`":/io spark-shell
19/09/08 14:00:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://af29447c6dcd:4040
Spark context available as 'sc' (master = local[*], app id = local-1567951261349).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark
res2: org.apache.spark.sql.SparkSession =
org.apache.spark.sql.SparkSession@48a32c4f
```
When opening a Spark shell, you get a SparkSession and a SparkContext object by
default. These contain the configuration of your session, i.e. whether
you are running in local or cluster mode, the name of your application, the
logging level, etc.
Going back to our shell, let's first create some sample data around which we
can demonstrate the RDD API. Here we create an infinite list of repeating
characters from 'a' to 'z'.
```{.scala}
scala> val charsOnce = ('a' to 'z').toStream
charsOnce: scala.collection.immutable.Stream[Char] = Stream(a, ?)
scala> val chars: Stream[Char] = charsOnce #::: chars
chars: Stream[Char] = Stream(a, ?)
```
Now we build a collection with the first 200000 integers, zipped with the
character stream. We display the first 30 results.
```{.scala}
scala> val rdd = sc.parallelize(chars.zip(1 to 200000), numSlices=20)
rdd: org.apache.spark.rdd.RDD[(Char, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.take(30)
res2: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (d,4), (e,5), (f,6),
(g,7), (h,8), (i,9), (j,10), (k,11), (l,12), (m,13), (n,14), (o,15), (p,16),
(q,17), (r,18), (s,19), (t,20), (u,21), (v,22), (w,23), (x,24), (y,25), (z,26),
(a,27), (b,28), (c,29), (d,30))
```
Let's dissect what just happened. We created a Scala object that is a list of
tuples of `Char`s and `Int`s in the statement `chars.zip(1 to 200000)`. With
`sc.parallelize` we are transforming a Scala sequence into an RDD. This allows
us to enter Spark's programming model. With the optional parameter `numSlices`
we indicate into how many partitions we want to subdivide the sequence.
Let's apply some (lazily evaluated) transformations to this RDD.
```{.scala}
scala> val mappedRDD = rdd.map({case (chr, num) => (chr, num+1)})
mappedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
MapPartitionsRDD[5] at map at <console>:25
```
We apply a `map` to the RDD, applying a function to all the elements in the
RDD. The function we apply pattern matches over the elements as being a tuple
of `(Char, Int)`, and adds one to the integer. Scala's syntax can be a bit
foreign, so if this is confusing, spend some time looking at tutorials and
messing around in the Scala interpreter.
You might have noticed that the transformation completed awfully fast. This is
Spark's [lazy evaluation] in action. No computation will be performed until
an action is applied.
```{.scala}
scala> val reducedRDD = rdd.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
ShuffledRDD[6] at reduceByKey at <console>:25
```
Now we apply a `reduceByKey` operation, grouping all of the identical keys together and
merging the results with the specified function, in this case the `+` operator.
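Conceptually, `reduceByKey(_ + _)` behaves like a `groupBy` followed by a fold
per key, as this non-distributed sketch on plain Scala collections shows
(Spark additionally distributes the work and avoids materializing the groups):

```scala
// Non-distributed equivalent of rdd.reduceByKey(_ + _).
val pairs = Seq(('a', 1), ('b', 2), ('a', 27), ('b', 28))

val reduced = pairs
  .groupBy(_._1)                        // collect all values per key
  .mapValues(_.map(_._2).reduce(_ + _)) // merge them with +

println(reduced('a')) // 28
```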
Now we will perform an action, which triggers the computation of the
transformations on the data. We will use the `collect` action, which
gathers all the results to the driver, leaving the Spark programming model and
returning to a Scala sequence. How many elements do you expect there to be in
this sequence after the previous transformations?
```{.scala}
scala> reducedRDD.collect
res3: Array[(Char, Int)] = Array((d,769300000), (x,769253844), (e,769307693),
(y,769261536), (z,769269228), (f,769315386), (g,769323079), (h,769330772),
(i,769138464), (j,769146156), (k,769153848), (l,769161540), (m,769169232),
(n,769176924), (o,769184616), (p,769192308), (q,769200000), (r,769207692),
(s,769215384), (t,769223076), (a,769276921), (u,769230768), (b,769284614),
(v,769238460), (w,769246152), (c,769292307))
```
Typically, we don't build the data first, but we actually load it from a
database or file system. Say we have some data in (multiple) files in a
specific format. As an example consider `sensordata.csv` (in the `example`
folder). We can load it as follows
```{.scala}
// sc.textFile can take multiple files as argument!
scala> val raw_data = sc.textFile("sensordata.csv")
raw_data: org.apache.spark.rdd.RDD[String] =
sensordata.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> raw_data.take(10).foreach(println)
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94
COHUTTA,3/10/14:1:02,9.67,1.731,882,0.52,87,1.79
COHUTTA,3/10/14:1:03,10.47,1.732,882,1.7,92,0.66
COHUTTA,3/10/14:1:05,9.56,1.734,883,1.35,99,0.68
COHUTTA,3/10/14:1:06,9.74,1.736,884,1.27,92,0.73
COHUTTA,3/10/14:1:08,10.44,1.737,885,1.34,93,1.54
COHUTTA,3/10/14:1:09,9.83,1.738,885,0.06,76,1.44
COHUTTA,3/10/14:1:11,10.49,1.739,886,1.51,81,1.83
COHUTTA,3/10/14:1:12,9.79,1.739,886,1.74,82,1.91
COHUTTA,3/10/14:1:13,10.02,1.739,886,1.24,86,1.79
```
We can process this data to filter only measurements on `3/10/14:1:01`.
```{.scala}
scala> val filterRDD = raw_data.map(_.split(","))
.filter(x => x(1) == "3/10/14:1:01")
filterRDD: org.apache.spark.rdd.RDD[Array[String]] =
MapPartitionsRDD[11] at filter at <console>:25
scala> filterRDD.foreach(a => println(a.mkString(" ")))
COHUTTA 3/10/14:1:01 10.27 1.73 881 1.56 85 1.94
LAGNAPPE 3/10/14:1:01 9.59 1.602 777 0.09 88 1.78
NANTAHALLA 3/10/14:1:01 10.47 1.712 778 1.96 76 0.78
CHER 3/10/14:1:01 10.17 1.653 777 1.89 96 1.57
THERMALITO 3/10/14:1:01 10.24 1.75 777 1.25 80 0.89
ANDOUILLE 3/10/14:1:01 10.26 1.048 777 1.88 94 1.66
BUTTE 3/10/14:1:01 10.12 1.379 777 1.58 83 0.67
MOJO 3/10/14:1:01 10.47 1.828 967 0.36 77 1.75
CARGO 3/10/14:1:01 9.93 1.903 778 0.55 76 1.44
BBKING 3/10/14:1:01 10.03 0.839 967 1.17 80 1.28
```
You might have noticed that this is a bit tedious to work with: we have to
convert everything to Scala objects, and aggregations rely on having a pair
RDD. That is fine when we have a single key, but juggling more complex
aggregations quickly becomes cumbersome.
### Dataframe and Dataset
Our previous example is quite a typical use case for Spark. We have a big data
store of some structured (tabular) format (be it csv, JSON, parquet, or
something else) that we would like to analyse, typically in some SQL-like
fashion. Manually applying operations to rows like this is both labour-intensive
and inefficient, because we have knowledge of the 'schema' of the data that goes
unused. This is where DataFrames come in. Spark has an optimized SQL query engine
that can optimize the compute path as well as provide a more efficient
representation of the rows when given a schema. From the
[Spark SQL, DataFrames and Datasets Guide]:
> Spark SQL is a Spark module for structured data processing. Unlike the basic
> Spark RDD API, the interfaces provided by Spark SQL provide Spark with more
> information about the structure of both the data and the computation being
> performed. Internally, Spark SQL uses this extra information to perform extra
> optimizations. There are several ways to interact with Spark SQL including
> SQL and the Dataset API. When computing a result the same execution engine is
> used, independent of which API/language you are using to express the
> computation. This unification means that developers can easily switch back
> and forth between different APIs based on which provides the most natural way
> to express a given transformation.
Under the hood, these are still immutable distributed collections of data (with
the same compute graph semantics), only now Spark can apply extra
optimizations because of the (structured) format.
Let's do the same analysis as last time using this API. First we will define a
schema. Let's take a look at a single row of the csv:
```
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94
```
So: first a string field, then a timestamp (date and time), followed by some
numeric fields. We can thus define the schema as such:
```{.scala}
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
```
If we import types first, and then enter this in our interactive shell we get
the following:
```{.scala}
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(sensorname,StringType,false),
StructField(timestamp,TimestampType,false), StructField(numA,DoubleType,false),
StructField(numB,DoubleType,false), StructField(numC,LongType,false),
StructField(numD,DoubleType,false), StructField(numE,LongType,false),
StructField(numF,DoubleType,false))
```
An overview of the different [Spark SQL types] can be found online. For the
timestamp field we need to specify the format according to the [Java
date format]—in our case `MM/dd/yy:hh:mm`. Tying this
all together we can build a Dataframe like so.
```{.scala}
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
// Exiting paste mode, now interpreting.
df: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: date ... 6 more fields]
scala> df.printSchema
root
|-- sensorname: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- numA: double (nullable = true)
|-- numB: double (nullable = true)
|-- numC: long (nullable = true)
|-- numD: double (nullable = true)
|-- numE: long (nullable = true)
 |-- numF: double (nullable = true)
scala> df.take(10).foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[COHUTTA,2014-03-10 01:02:00.0,9.67,1.731,882,0.52,87,1.79]
[COHUTTA,2014-03-10 01:03:00.0,10.47,1.732,882,1.7,92,0.66]
[COHUTTA,2014-03-10 01:05:00.0,9.56,1.734,883,1.35,99,0.68]
[COHUTTA,2014-03-10 01:06:00.0,9.74,1.736,884,1.27,92,0.73]
[COHUTTA,2014-03-10 01:08:00.0,10.44,1.737,885,1.34,93,1.54]
[COHUTTA,2014-03-10 01:09:00.0,9.83,1.738,885,0.06,76,1.44]
[COHUTTA,2014-03-10 01:11:00.0,10.49,1.739,886,1.51,81,1.83]
[COHUTTA,2014-03-10 01:12:00.0,9.79,1.739,886,1.74,82,1.91]
[COHUTTA,2014-03-10 01:13:00.0,10.02,1.739,886,1.24,86,1.79]
```
We can perform the same filtering operation as before in a couple of ways. We
can use rather error-prone SQL query strings (not recommended unless you
absolutely love SQL and like debugging these command strings; this took me
about 20 minutes to get right).
```{.scala}
scala> df.createOrReplaceTempView("sensor")
scala> val dfFilter = spark.sql("SELECT * FROM sensor
WHERE timestamp=TIMESTAMP(\"2014-03-10 01:01:00\")")
// I think the newline in the multiline string breaks it if you paste it
dfFilter: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]
```
A slightly more sane and type-safe way would be to do the following.
```{.scala}
scala> val dfFilter = df.filter("timestamp = TIMESTAMP(\"2014-03-10 01:01:00\")")
dfFilter: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]
```
But this is still quite error-prone, as these strings receive no
typechecking. That is not a big deal when writing queries in an
interactive environment on a small dataset, but a typo at the end of a
long-running job can mean two hours of your (and the cluster's) time wasted.
This is why the Spark community developed the Dataset abstraction. It is a sort
of middle ground between Dataframes and RDDs, where you get some of the type
safety of RDDs by operating on a [case class] (also known as a product type).
This allows us to use the compile-time typechecking on the product types,
whilst still allowing Spark to optimize the query and storage of the data by
making use of schemas.
Let's dive into some code. First, we need to define a product type for a row.
```{.scala}
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> :paste
// Entering paste mode (ctrl-D to finish)
case class SensorData (
sensorName: String,
timestamp: Timestamp,
numA: Double,
numB: Double,
numC: Long,
numD: Double,
numE: Long,
numF: Double
)
// Exiting paste mode, now interpreting.
defined class SensorData
```
Now we can convert a Dataframe (which actually is just an untyped Dataset) to a
typed Dataset using the `as` method.
```{.scala}
scala> :paste
// Entering paste mode (ctrl-D to finish)
val ds = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
.as[SensorData]
// Exiting paste mode, now interpreting.
ds: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
```
Now we can apply compile-time type-checked operations.
```{.scala}
scala> val dsFilter = ds.filter(a => a.timestamp ==
new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
dsFilter: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dsFilter.collect.foreach(println)
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)
```
This gives us stronger guarantees that our queries are valid (at least at the
type level).
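To see concretely what this buys us, here is a small Spark-free sketch (the
`Reading` case class and its values are made up for illustration): with an
untyped, string-keyed record a misspelled field name compiles fine and only
fails at runtime, while with a case class the same typo would be rejected by
the compiler.

```{.scala}
// Spark-free sketch of typed vs. untyped field access (illustrative names).
case class Reading(sensorName: String, numA: Double)

object TypedVsUntyped extends App {
  // Untyped: fields hide behind string keys, so a typo like "numa"
  // compiles fine and only throws NoSuchElementException when run.
  val untyped: Map[String, Any] = Map("sensorName" -> "COHUTTA", "numA" -> 10.27)

  // Typed: `typed.numa` would not even compile.
  val typed = Reading("COHUTTA", 10.27)
  println(typed.numA) // prints 10.27
}
```

Datasets work the same way: once the rows are `SensorData` values, a typo in a
field name inside `filter` or `map` is caught before the job ever starts.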
This was a brief overview of the two (or three) different Spark APIs. You can
always find more information in the programming guides for [RDDs] and
[Dataframes/Datasets] and in the [Spark documentation].
### Packaging your application using SBT
We showed how to run Spark in interactive mode. Now we will explain how to
build applications that can be submitted using the `spark-submit` command.
First, we will explain how to structure a Scala project, using the [SBT build
tool]. The typical project structure is
```
├── build.sbt
├── project
│ └── build.properties
└── src
└── main
└── scala
└── example.scala
```
This layout is typical for JVM languages. More directories are added under the
`scala` folder to mirror the package structure.
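For example, a class in a hypothetical package `example.util` would live at a
path like the following (`Helpers.scala` is a made-up file name):

```
src/main/scala/example/util/Helpers.scala
```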
The project's name, dependencies, and versioning are defined in the `build.sbt`
file. An example `build.sbt` file is
```
name := "Example"
scalaVersion := "2.11.12"
```
This specifies the Scala version of the project (2.11.12) and the name of the
project.
If you run `sbt` in this folder, it will generate the `project` directory and
`build.properties`. `build.properties` contains the SBT version that is used
to build the project, for backwards compatibility.
Open `example.scala` and add the following
```{.scala}
package example
object Example {
def main(args: Array[String]) {
println("Hello world!")
}
}
```
Start a `scala-sbt` container in the root folder (the one where `build.sbt` is located). This drops
you into SBT's interactive mode. We can compile the sources with the
`compile` command.
```
$ docker run -it --rm -v "`pwd`":/root hseeberger/scala-sbt sbt
Getting org.scala-sbt sbt 1.2.8 (this may take some time)...
...
[info] Loading settings for project root from build.sbt ...
[info] Set current project to Example (in build file:/root/)
[info] sbt server started at local:///root/.sbt/1.0/server/27dc1aa3fdf4049b492d/sock
sbt:Example> compile
...
[info] Done compiling.
[success] Total time: 0 s, completed Sep 8, 2019 2:17:14 PM
```
We can try to run the application by typing `run`.
```
sbt:Example> run
[info] Running example.Example
Hello world!
[success] Total time: 1 s, completed Sep 8, 2019 2:18:18 PM
```
Now let's add a function to `example.scala`.
```{.scala}
object Example {
def addOne(tuple: (Char, Int)) : (Char, Int) = tuple match {
case (chr, int) => (chr, int+1)
}
def main(args: Array[String]) {
println("Hello world!")
println(addOne('a', 1))
}
}
```
In your SBT session you can prepend any command with a tilde (`~`) to make it
re-run automatically on source changes.
```
sbt:Example> ~run
[info] Compiling 1 Scala source to ...
[info] Done compiling.
[info] Packaging ...
[info] Done packaging.
[info] Running example.Example
Hello world!
(a,2)
[success] Total time: 1 s, completed Sep 8, 2019 2:19:03 PM
1. Waiting for source changes in project hello... (press enter to interrupt)
```
We can also open an interactive session using SBT.
```
sbt:Example> console
[info] Starting scala interpreter...
Welcome to Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222).
Type in expressions for evaluation. Or try :help.
scala> example.Example.addOne('a', 1)
res1: (Char, Int) = (a,2)
scala> println("Interactive environment")
Interactive environment
```
To build Spark applications with SBT we need to include the required
dependencies (most notably Spark itself). Modify your `build.sbt` file like so
```
name := "Example"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
```
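A note on the `%%` operator: it makes SBT append the Scala binary version to
the artifact name, so with `scalaVersion := "2.11.12"` the first dependency
above resolves to the artifact `spark-core_2.11`. Written out with a plain
`%`, the equivalent would be:

```
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % sparkVersion
```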
We can now use Spark in the script. Modify `example.scala`.
```{.scala}
package example
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import java.sql.Timestamp
object ExampleSpark {
case class SensorData (
sensorName: String,
timestamp: Timestamp,
numA: Double,
numB: Double,
numC: Long,
numD: Double,
numE: Long,
numF: Double
)
def main(args: Array[String]) {
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)
val spark = SparkSession
.builder
.appName("Example")
.getOrCreate()
val sc = spark.sparkContext // If you need SparkContext object
import spark.implicits._
val ds = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
.as[SensorData]
val dsFilter = ds.filter(a => a.timestamp ==
new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
dsFilter.collect.foreach(println)
spark.stop
}
}
```
You can build a JAR using the `package` command in SBT. The JAR will be
located at `target/scala-version/project_name_version.jar`.
You can run the JAR via a `spark-submit` container (which runs Spark in local
mode). By mounting the `spark-events` directory, the event log of the
application run is stored so it can be inspected later using the Spark history
server.
```
$ docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events
spark-submit target/scala-2.11/example_2.11-0.1.0.jar
INFO:...
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)
INFO:...
```
By default, Spark's logging is quite verbose. You can change the [log levels
to warn] to reduce the output.
For development purposes, you can also try running the application from SBT
using the `run` command. You might run into trouble with threads here, which
can be solved by running the application in a forked process, enabled by
setting `fork in run := true` in `build.sbt`. You will also have to change the
log levels programmatically, if desired.
```{.scala}
import org.apache.log4j.{Level, Logger}
...
def main(args: Array[String]) {
...
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
...
}
```
You can also use this logger to log your application, which might be helpful
for debugging on the AWS cluster later on.
You can inspect the event log from the application run using the Spark history
server. Start a `spark-history-server` container from the project root folder
and mount the `spark-events` folder in the container.
```
$ docker run -it --rm -v "`pwd`"/spark-events/:/spark-events -p 18080:18080
spark-history-server
starting org.apache.spark.deploy.history.HistoryServer, logging to
/spark/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-d5dfa4949b86.out
Spark Command: /usr/local/openjdk-8/bin/java -cp /spark/conf/:/spark/jars/*
-Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================