This repository has been archived by the owner on Apr 4, 2021. It is now read-only.
/
EntitySpecification.twiki
1048 lines (893 loc) · 45.4 KB
/
EntitySpecification.twiki
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
---++ Contents
* <a href="#Cluster_Specification">Cluster Specification</a>
* <a href="#Feed_Specification">Feed Specification</a>
* <a href="#Process_Specification">Process Specification</a>
---++ Cluster Specification
The cluster XSD specification is available here:
A cluster contains different interfaces which are used by Falcon like readonly, write, workflow and messaging.
A cluster is referenced by feeds and processes which are on-boarded to Falcon by its name.
Following are the tags defined in a cluster.xml:
<verbatim>
<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
</verbatim>
The colo specifies the colo to which this cluster belongs to and name is the name of the cluster which has to
be unique.
---+++ Interfaces
A cluster has various interfaces as described below:
<verbatim>
<interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2" />
</verbatim>
A readonly interface specifies the endpoint for Hadoop's HFTP protocol,
this would be used in the context of feed replication.
<verbatim>
<interface type="write" endpoint="hdfs://localhost:8020" version="0.20.2" />
</verbatim>
A write interface specifies the interface to write to hdfs, it's endpoint is the value of fs.defaultFS.
Falcon uses this interface to write system data to hdfs and feeds referencing this cluster are written to hdfs
using the same write interface.
<verbatim>
<interface type="execute" endpoint="localhost:8021" version="0.20.2" />
</verbatim>
An execute interface specifies the interface for job tracker, it's endpoint is the value of mapreduce.jobtracker.address.
Falcon uses this interface to submit the processes as jobs on !JobTracker defined here.
<verbatim>
<interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0" />
</verbatim>
A workflow interface specifies the interface for workflow engine, example of its endpoint is the value for OOZIE_URL.
Falcon uses this interface to schedule the processes referencing this cluster on workflow engine defined here.
<verbatim>
<interface type="registry" endpoint="thrift://localhost:9083" version="0.11.0" />
</verbatim>
A registry interface specifies the interface for metadata catalog, such as Hive Metastore (or HCatalog).
Falcon uses this interface to register/de-register partitions for a given database and table. Also,
uses this information to schedule data availability events based on partitions in the workflow engine.
Although Hive metastore supports both RPC and HTTP, Falcon comes with an implementation for RPC over thrift.
For Hive HA mode, make sure the uris are separated with comma and you only add protocol "thrift://" at the beginning.
See below for an example of Hive HA mode:
<verbatim>
<interface type="registry" endpoint="thrift://c6402.ambari.apache.org:9083,c6403.ambari.apache.org:9083" version="0.11.0" />
</verbatim>
<verbatim>
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.6" />
</verbatim>
A messaging interface specifies the interface for sending feed availability messages, it's endpoint is broker url with tcp address.
---+++ Locations
A cluster has a list of locations defined:
<verbatim>
<location name="staging" path="/projects/falcon/staging" />
<location name="working" path="/projects/falcon/working" /> <!--optional-->
</verbatim>
Location has the name and the path, name is the type of locations .Allowed values of name are staging, temp and working.
Path is the hdfs path for each location.
Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon
should have read/write/execute permission on these locations.
These locations MUST be created prior to submitting a cluster entity to Falcon.
*staging* should have 777 permissions and is a mandatory location .The parent dirs must have execute permissions so multiple
users can write to this location. *working* must have 755 permissions and is a optional location.
If *working* is not specified, falcon creates a sub directory in the *staging* location with 755 perms.
The parent dir for *working* must have execute permissions so multiple
users can read from this location
---+++ ACL
A cluster has ACL (Access Control List) useful for implementing permission requirements
and provide a way to set different permissions for specific users or named groups.
<verbatim>
<ACL owner="test-user" group="test-group" permission="*"/>
</verbatim>
ACL indicates the Access control list for this cluster.
owner is the Owner of this entity.
group is the one which has access to read.
permission indicates the permission.
---+++ Custom Properties
A cluster has a list of properties:
A key-value pair, which are propagated to the workflow engine.
<verbatim>
<property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory" />
</verbatim>
Ideally JMS impl class name of messaging engine (brokerImplClass)
should be defined here.
---++ Datasource Specification
The datasource entity contains connection information required to connect to a data source like MySQL database.
The datasource XSD specification is available here:
A datasource contains read and write interfaces which are used by Falcon to import or export data from or to
datasources respectively. A datasource is referenced by feeds which are on-boarded to Falcon by its name.
Following are the tags defined in a datasource.xml:
<verbatim>
<datasource colo="west-coast" description="Customer database on west coast" type="mysql"
name="test-hsql-db" xmlns="uri:falcon:datasource:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
</verbatim>
The colo specifies the colo to which the datasource belongs to and name is the name of the datasource which has to
be unique.
---+++ Interfaces
A datasource has two interfaces as described below:
<verbatim>
<interface type="readonly" endpoint="jdbc:hsqldb:localhost/db"/>
</verbatim>
A readonly interface specifies the endpoint and protocol to connect to a datasource.
This would be used in the context of import from datasource into HDFS.
<verbatim>
<interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
</verbatim>
A write interface specifies the endpoint and protocol to to write to the datasource.
Falcon uses this interface to export data from hdfs to datasource.
<verbatim>
<credential type="password-text">
<userName>SA</userName>
<passwordText></passwordText>
</credential>
</verbatim>
A credential is associated with an interface (read or write) providing user name and password to authenticate
to the datasource.
<verbatim>
<credential type="password-text">
<userName>SA</userName>
<passwordFile>hdfs-file-path</passwordText>
</credential>
</verbatim>
The credential can be specified via a password file present in the HDFS. This file should only be accessible by
the user.
---++ Feed Specification
The Feed XSD specification is available here.
A Feed defines various attributes of feed like feed location, frequency, late-arrival handling and retention policies.
A feed can be scheduled on a cluster, once a feed is scheduled its retention and replication process are triggered in a given cluster.
<verbatim>
<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
</verbatim>
A feed should have a unique name and this name is referenced by processes as input or output feed.
---+++ Storage
Falcon introduces a new abstraction to encapsulate the storage for a given feed which can either be
expressed as a path on the file system, File System Storage or a table in a catalog such as Hive, Catalog Storage.
<verbatim>
<xs:choice minOccurs="1" maxOccurs="1">
<xs:element type="locations" name="locations"/>
<xs:element type="catalog-table" name="table"/>
</xs:choice>
</verbatim>
Feed should contain one of the two storage options. Locations on File System or Table in a Catalog.
---++++ File System Storage
<verbatim>
<clusters>
<cluster name="test-cluster">
<validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
<retention limit="days(10)" action="delete"/>
<sla slaLow="hours(3)" slaHigh="hours(4)"/>
<locations>
<location type="data" path="/hdfsDataLocation/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
<location type="stats" path="/projects/falcon/clicksStats" />
<location type="meta" path="/projects/falcon/clicksMetaData" />
</locations>
</cluster>
..... more clusters </clusters>
</verbatim>
Feed references a cluster by it's name, before submitting a feed all the referenced cluster should be submitted to Falcon.
type: specifies whether the referenced cluster should be treated as a source or target for a feed. A feed can have multiple source and target clusters. If the type of cluster is not specified then the cluster is not considered for replication.
Validity of a feed on cluster specifies duration for which this feed is valid on this cluster.
Retention specifies how long the feed is retained on this cluster and the action to be taken on the feed after the expiry of retention period.
The retention limit is specified by expression frequency(times), ex: if feed should be retained for at least 6 hours then retention's limit="hours(6)".
The field partitionExp contains partition tags. Number of partition tags has to be equal to number of partitions specified in feed schema. A partition tag can be a wildcard(*), a static string or an expression. Atleast one of the strings has to be an expression.
sla specifies sla for the feed on this cluster. This is an optional parameter and sla can be same or different from the
global sla tag (mentioned outside the clusters tag ). This tag provides the user to flexibility to have
different sla for different clusters e.g. in case of replication. If this attribute is missing then the default global
sla is picked from the feed definition.
Location specifies where the feed is available on this cluster. This is an optional parameter and path can be same or different from the global locations tag value ( it is mentioned outside the clusters tag ) . This tag provides the user to flexibility to have feed at different locations on different clusters. If this attribute is missing then the default global location is picked from the feed definition. Also the individual location tags data, stats, meta are optional.
<verbatim>
<location type="data" path="/projects/falcon/clicks" />
<location type="stats" path="/projects/falcon/clicksStats" />
<location type="meta" path="/projects/falcon/clicksMetaData" />
</verbatim>
A location tag specifies the type of location like data, meta, stats and the corresponding paths for them.
A feed should at least define the location for type data, which specifies the HDFS path pattern where the feed is generated
periodically. ex: type="data" path="/projects/TrafficHourly/${YEAR}-${MONTH}-${DAY}/traffic"
The granularity of date pattern in the path should be at least that of a frequency of a feed.
Other location type which are supported are stats and meta paths, if a process references a feed then the meta and stats
paths are available as a property in a process.
---++++ Catalog Storage (Table)
A table tag specifies the table URI in the catalog registry as:
<verbatim>
catalog:$database-name:$table-name#partition-key=partition-value);partition-key=partition-value);*
</verbatim>
This is modeled as a URI (similar to an ISBN URI). It does not have any reference to Hive or HCatalog. Its quite
generic so it can be tied to other implementations of a catalog registry. The catalog implementation specified
in the startup config provides implementation for the catalog URI.
Top-level partition has to be a dated pattern and the granularity of date pattern should be at least that
of a frequency of a feed.
<verbatim>
<xs:complexType name="catalog-table">
<xs:annotation>
<xs:documentation>
catalog specifies the uri of a Hive table along with the partition spec.
uri="catalog:$database:$table#(partition-key=partition-value);+"
Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}
</xs:documentation>
</xs:annotation>
<xs:attribute type="xs:string" name="uri" use="required"/>
</xs:complexType>
</verbatim>
Examples:
<verbatim>
<table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR};region=${region}" />
<table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</verbatim>
---+++ Partitions
<verbatim>
<partitions>
<partition name="country" />
<partition name="cluster" />
</partitions>
</verbatim>
A feed can define multiple partitions, if a referenced cluster defines partitions then the number of partitions in feed has to be equal to or more than the cluster partitions.
*Note:* This will only apply for !FileSystem storage but not Table storage as partitions are defined and maintained in
Hive (HCatalog) registry.
---+++ Groups
<verbatim>
<groups>online,bi</groups>
</verbatim>
A feed specifies a list of comma separated groups, a group is a logical grouping of feeds and a group is said to be
available if all the feeds belonging to a group are available. The frequency of all the feed which belong to the same group
must be same.
---+++ Availability Flags
<verbatim>
<availabilityFlag>_SUCCESS</availabilityFlag>
</verbatim>
An availabilityFlag specifies the name of a file which when present/created in a feeds data directory,
the feed is termed as available. ex: _SUCCESS, if this element is ignored then Falcon would consider the presence of feed's
data directory as feed availability.
---+++ Frequency
<verbatim>
<frequency>minutes(20)</frequency>
</verbatim>
A feed has a frequency which specifies the frequency by which this feed is generated.
ex: it can be generated every hour, every 5 minutes, daily, weekly etc.
valid frequency type for a feed are minutes, hours, days, months. The values can be negative, zero or positive.
---+++ SLA
<verbatim>
<sla slaLow="hours(40)" slaHigh="hours(44)" />
</verbatim>
A feed can have SLA and each SLA has two properties - slaLow and slaHigh. Both slaLow and slaHigh are written using
expressions like frequency. slaLow is intended to serve for alerting for feed instances which are in danger of missing their
availability SLAs. slaHigh is intended to serve for reporting the feeds which missed their SLAs. SLAs are relative to
feed instance time.
---+++ Import
<verbatim>
<import>
<source name="test-hsql-db" tableName="customer">
<extract type="full">
<mergepolicy>snapshot</mergepolicy>
</extract>
<fields>
<includes>
<field>id</field>
<field>name</field>
</includes>
</fields>
</source>
<arguments>
<argument name="--split-by" value="id"/>
<argument name="--num-mappers" value="2"/>
</arguments>
</import>
A feed can have an import policy associated with it. The souce name specified the datasource reference to the
datasource entity from which the data will be imported to HDFS. The tableName spcified the table or topic to be
imported from the datasource. The extract type specifies the pull mechanism (full or
incremental extract). Full extract method extracts all the data from the datasource. The incremental extraction
method feature implementation is in progress. The mergeplocy determines how the data is to be layed out on HDFS.
The snapshot layout creates a snapshot of the data on HDFS using the feed's location specification. Fields is used
to specify the projection columns. Feed import from database underneath uses sqoop to achieve the task. Any advanced
Sqoop options can be specified via the arguments.
---+++ Late Arrival
<verbatim>
<late-arrival cut-off="hours(6)" />
</verbatim>
A late-arrival specifies the cut-off period till which the feed is expected to arrive late and should be honored be processes referring to it as input feed by rerunning the instances in case the data arrives late with in a cut-off period.
The cut-off period is specified by expression frequency(times), ex: if the feed can arrive late
upto 8 hours then late-arrival's cut-off="hours(8)"
*Note:* This will only apply for !FileSystem storage but not Table storage until a future time.
---+++ Email Notification
<verbatim>
<notification type="email" to="bob@xyz.com"/>
</verbatim>
Specifying the notification element with "type" property allows users to receive email notification when a scheduled feed instance completes.
Multiple recipients of an email can be provided as comma separated addresses with "to" property.
To send email notification ensure that SMTP parameters are defined in Falcon startup.properties.
Refer to [[FalconEmailNotification][Falcon Email Notification]] for more details.
---+++ ACL
A feed has ACL (Access Control List) useful for implementing permission requirements
and provide a way to set different permissions for specific users or named groups.
<verbatim>
<ACL owner="test-user" group="test-group" permission="*"/>
</verbatim>
ACL indicates the Access control list for this cluster.
owner is the Owner of this entity.
group is the one which has access to read.
permission indicates the permission.
---+++ Custom Properties
<verbatim>
<properties>
<property name="tmpFeedPath" value="tmpFeedPathValue" />
<property name="field2" value="value2" />
<property name="queueName" value="hadoopQueue"/>
<property name="jobPriority" value="VERY_HIGH"/>
<property name="timeout" value="hours(1)"/>
<property name="parallel" value="3"/>
<property name="maxMaps" value="8"/>
<property name="mapBandwidth" value="1"/>
<property name="overwrite" value="true"/>
<property name="ignoreErrors" value="false"/>
<property name="skipChecksum" value="false"/>
<property name="removeDeletedFiles" value="true"/>
<property name="preserveBlockSize" value="true"/>
<property name="preserveReplicationNumber" value="true"/>
<property name="preservePermission" value="true"/>
<property name="order" value="LIFO"/>
</properties>
</verbatim>
A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties
available to user to specify the Hadoop job queue and priority, the same values are used by Falcon's launcher job.
"timeout", "parallel" and "order" are other special properties which decides replication instance's timeout value while
waiting for the feed instance, parallel decides the concurrent replication instances that can run at any given time and
order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY.
DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents
the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
used by each mapper during replication. "overwrite" represents overwrite destination during replication.
"ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents
bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the
destination but not in source during replication. "preserveBlockSize" represents preserving block size during
replication. "preserveReplicationNumber" represents preserving replication number during replication.
"preservePermission" represents preserving permission during
---+++ Lifecycle
<verbatim>
<lifecycle>
<retention-stage>
<frequency>hours(10)</frequency>
<queue>reports</queue>
<priority>NORMAL</priority>
<properties>
<property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property>
</properties>
</retention-stage>
</lifecycle>
</verbatim>
lifecycle tag is the new way to define various stages of a feed's lifecycle. In the example above we have defined a
retention-stage using lifecycle tag. You may define lifecycle at global level or a cluster level or both. Cluster level
configuration takes precedence and falcon falls back to global definition if cluster level specification is missing.
----++++ Retention Stage
As of now there are two ways to specify retention. One is through the <retention> tag in the cluster and another is the
new way through <retention-stage> tag in <lifecycle> tag. If both are defined for a feed, then the lifecycle tag will be
considered effective and falcon will ignore the <retention> tag in the cluster. If there is an invalid configuration of
retention-stage in lifecycle tag, then falcon will *NOT* fall back to retention tag even if it is defined and will
throw validation error.
In this new method of defining retention you can specify the frequency at which the retention should occur, you can
also define the queue and priority parameters for retention jobs. The default behavior of retention-stage is same as
the existing one which is to delete all instances corresponding to instance-time earlier than the duration provided in
"retention.policy.agebaseddelete.limit"
Property "retention.policy.agebaseddelete.limit" is a mandatory property and must contain a valid duration e.g. "hours(1)"
Retention frequency is not a mandatory parameter. If user doesn't specify the frequency in the retention stage then
it doesn't fallback to old retention policy frequency. Its default value is set to 6 hours if feed frequency is less
than 6 hours else its set to feed frequency as retention shouldn't be more frequent than data availability to avoid
wastage of compute resources.
In future, we will allow more customisation like customising how to choose instances to be deleted through this method.
---++ Process Specification
A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.
The different details of process are:
---+++ Name
Each process is identified with a unique name.
Syntax:
<verbatim>
<process name="[process name]">
...
</process>
</verbatim>
---+++ Tags
An optional list of comma separated tags which are used for classification of processes.
Syntax:
<verbatim>
...
<tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
</verbatim>
---+++ Pipelines
An optional list of comma separated word strings, specifies the data processing pipeline(s) to which this process belongs.
Only letters, numbers and underscore are allowed for pipeline string.
Syntax:
<verbatim>
...
<pipelines>test_Pipeline, dataReplication, clickStream_pipeline</pipelines>
</verbatim>
---+++ Cluster
The cluster on which the workflow should run. A process should contain one or more clusters. Cluster definition for the cluster name gives the end points for workflow execution, name node, job tracker, messaging and so on. Each cluster inturn has validity mentioned, which tell the times between which the job should run on that specified cluster.
Syntax:
<verbatim>
<process name="[process name]">
...
<clusters>
<cluster name="test-cluster1">
<validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/>
</cluster>
<cluster name="test-cluster2">
<validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/>
</cluster>
....
....
</clusters>
...
</process>
</verbatim>
---+++ Parallel
Parallel defines how many instances of the workflow can run concurrently. It should be a positive integer > 0.
For example, parallel of 1 ensures that only one instance of the workflow can run at a time. The next instance will start only after the running instance completes.
Syntax:
<verbatim>
<process name="[process name]">
...
<parallel>[parallel]</parallel>
...
</process>
</verbatim>
---+++ Order
Order defines the order in which the ready instances are picked up. The possible values are FIFO(First In First Out), LIFO(Last In First Out), and ONLYLAST(Last Only).
Syntax:
<verbatim>
<process name="[process name]">
...
<order>[order]</order>
...
</process>
</verbatim>
---+++ Timeout
A optional Timeout specifies the maximum time an instance waits for a dataset before being killed by the workflow engine, a time out is specified like frequency.
If timeout is not specified, falcon computes a default timeout for a process based on its frequency, which is six times of the frequency of process or 30 minutes if computed timeout is less than 30 minutes.
<verbatim>
<process name="[process name]">
...
<timeout>[timeunit]([frequency])</timeout>
...
</process>
</verbatim>
---+++ Frequency
Frequency defines how frequently the workflow job should run. For example, hours(1) defines the frequency as hourly, days(7) defines weekly frequency. The values for timeunit can be minutes/hours/days/months and the frequency number should be a positive integer > 0.
Syntax:
<verbatim>
<process name="[process name]">
...
<frequency>[timeunit]([frequency])</order>
...
</process>
</verbatim>
---+++ SLA
<verbatim>
<sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
</verbatim>
A process can have SLA which is defined by 2 optional attributes - shouldStartIn and shouldEndIn. All the attributes
are written using expressions like frequency. shouldStartIn is the time by which the process should have started.
shouldEndIn is the time by which the process should have finished.
---+++ Validity
Validity defines how long the workflow should run. It has 3 components - start time, end time and timezone. Start time and end time are timestamps defined in yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to compute the next instances starting from start time. The workflow will start at start time and end before end time specified on a given cluster. So, there will not be a workflow instance at end time.
Syntax:
<verbatim>
<process name="[process name]">
...
<validity start=[start time] end=[end time] timezone=[timezone]/>
...
</process>
</verbatim>
Examples:
<verbatim>
<process name="sample-process">
...
<frequency>days(1)</frequency>
<validity start="2012-01-01T00:40Z" end="2012-04-01T00:00" timezone="UTC"/>
...
</process>
</verbatim>
The daily workflow will start on Jan 1st 2012 at 00:40 UTC, it will run at 40th minute of every hour and the last instance will be at March 31st 2012 at 23:40 UTC.
<verbatim>
<process name="sample-process">
...
<frequency>hours(1)</frequency>
<validity start="2012-03-11T08:40Z" end="2012-03-12T08:00" timezone="PST8PDT"/>
...
</process>
</verbatim>
The hourly workflow will start on March 11th 2012 at 00:40 PST, the next instances will be at 01:40 PST, 03:40 PDT, 04:40 PDT and so on till 23:40 PDT. So, there will be just 23 instances of the workflow for March 11th 2012 because of DST switch.
---+++ Inputs
Inputs define the input data for the workflow. The workflow job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs and each of the input maps to a feed. The path and frequency of input data is picked up from feed definition. Each input should also define start and end instances in terms of [[FalconDocumentation][EL expressions]] and can optionally specify specific partition of input that the workflow requires. The components in partition should be subset of partitions defined in the feed.
For each input, Falcon will create a property with the input name that contains the comma separated list of input paths. This property can be used in workflow actions like pig scripts and so on.
Syntax:
<verbatim>
<process name="[process name]">
...
<inputs>
<input name=[input name] feed=[feed name] start=[start el] end=[end el] partition=[partition]/>
...
</inputs>
...
</process>
</verbatim>
Example:
<verbatim>
<feed name="feed1">
...
<partition name="isFraud"/>
<partition name="country"/>
<frequency>hours(1)</frequency>
<locations>
<location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
...
</locations>
...
</feed>
<process name="sample-process">
...
<inputs>
<input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/>
...
</inputs>
...
</process>
</verbatim>
The input for the workflow is a hourly feed and takes 0th and 1st hour data of today(the day when the workflow runs).
If the workflow is running for 2012-03-01T06:40Z, the inputs are /projects/bootcamp/feed1/2012-03-01-00/*/US and
/projects/bootcamp/feed1/2012-03-01-01/*/US. The property for this input is
input1=/projects/bootcamp/feed1/2012-03-01-00/*/US,/projects/bootcamp/feed1/2012-03-01-01/*/US
Also, feeds with Hive table storage can be used as inputs to a process. Several parameters from inputs are passed as
params to the user workflow or pig script.
<verbatim>
${wf:conf('falcon_input_database')} - database name associated with the feed for a given input
${wf:conf('falcon_input_table')} - table name associated with the feed for a given input
${wf:conf('falcon_input_catalog_url')} - Hive metastore URI for this input feed
${wf:conf('falcon_input_partition_filter_pig')} - value of ${coord:dataInPartitionFilter('$input', 'pig')}
${wf:conf('falcon_input_partition_filter_hive')} - value of ${coord:dataInPartitionFilter('$input', 'hive')}
${wf:conf('falcon_input_partition_filter_java')} - value of ${coord:dataInPartitionFilter('$input', 'java')}
</verbatim>
*NOTE:* input is the name of the input configured in the process, which is input.getName().
<verbatim><input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/></verbatim>
Example workflow configuration:
<verbatim>
<configuration>
<property>
<name>falcon_input_database</name>
<value>falcon_db</value>
</property>
<property>
<name>falcon_input_table</name>
<value>input_table</value>
</property>
<property>
<name>falcon_input_catalog_url</name>
<value>thrift://localhost:29083</value>
</property>
<property>
<name>falcon_input_storage_type</name>
<value>TABLE</value>
</property>
<property>
<name>feedInstancePaths</name>
<value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value>
</property>
<property>
<name>falcon_input_partition_filter_java</name>
<value>(ds='2012-04-21-00')</value>
</property>
<property>
<name>falcon_input_partition_filter_hive</name>
<value>(ds='2012-04-21-00')</value>
</property>
<property>
<name>falcon_input_partition_filter_pig</name>
<value>(ds=='2012-04-21-00')</value>
</property>
...
</configuration>
</verbatim>
---+++ Optional Inputs
User can mention one or more inputs as optional inputs. In such cases the job does not wait on those inputs which are mentioned as optional. If they are present it considers them otherwise continues with the mandatory ones. If some instances of the optional feed are present for the given data window, those are considered and passed on to the process. While checking for presence of an feed instance, Falcon looks for __availabilityFlag__ in the directory, if specified in the feed definition. If no __availabilityFlag__ is specified, presence of the instance directory is treated as indication of availability of data.
Example:
<verbatim>
<feed name="feed1">
...
<partition name="isFraud"/>
<partition name="country"/>
<frequency>hours(1)</frequency>
<locations>
<location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
...
</locations>
...
</feed>
<process name="sample-process">
...
<inputs>
<input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/>
<input name="input2" feed="feed2" start="today(0,0)" end="today(1,0)" partition="*/UK" optional="true" />
...
</inputs>
...
</process>
</verbatim>
*Note:* This is only supported for !FileSystem storage but not Table storage at this point.
---+++ Outputs
Outputs define the output data that is generated by the workflow. A process can define 0 or more outputs. Each output is mapped to a feed and the output path is picked up from feed definition. The output instance that should be generated is specified in terms of [[FalconDocumentation][EL expression]].
For each output, Falcon creates a property with output name that contains the path of output data. This can be used in workflows to store in the path.
Syntax:
<verbatim>
<process name="[process name]">
...
<outputs>
<output name=[input name] feed=[feed name] instance=[instance el]/>
...
</outputs>
...
</process>
</verbatim>
Example:
<verbatim>
<feed name="feed2">
...
<frequency>days(1)</frequency>
<locations>
<location type="data" path="/projects/bootcamp/feed2/${YEAR}-${MONTH}-${DAY}"/>
...
</locations>
...
</feed>
<process name="sample-process">
...
<outputs>
<output name="output1" feed="feed2" instance="today(0,0)"/>
...
</outputs>
...
</process>
</verbatim>
The output of the workflow is feed instance for today. If the workflow is running for 2012-03-01T06:40Z,
the workflow generates output /projects/bootcamp/feed2/2012-03-01. The property for this output that is available
for workflow is: output1=/projects/bootcamp/feed2/2012-03-01
Also, feeds with Hive table storage can be used as outputs to a process. Several parameters from outputs are passed as
params to the user workflow or pig script.
<verbatim>
${wf:conf('falcon_output_database')} - database name associated with the feed for a given output
${wf:conf('falcon_output_table')} - table name associated with the feed for a given output
${wf:conf('falcon_output_catalog_url')} - Hive metastore URI for the given output feed
${wf:conf('falcon_output_dataout_partitions')} - value of ${coord:dataOutPartitions('$output')}
</verbatim>
*NOTE:* output is the name of the output configured in the process, which is output.getName().
<verbatim><output name="output" feed="clicks-summary-table" instance="today(0,0)"/></verbatim>
Example workflow configuration:
<verbatim>
<configuration>
<property>
<name>falcon_output_database</name>
<value>falcon_db</value>
</property>
<property>
<name>falcon_output_table</name>
<value>output_table</value>
</property>
<property>
<name>falcon_output_catalog_url</name>
<value>thrift://localhost:29083</value>
</property>
<property>
<name>falcon_output_storage_type</name>
<value>TABLE</value>
</property>
<property>
<name>feedInstancePaths</name>
<value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value>
</property>
<property>
<name>falcon_output_dataout_partitions</name>
<value>'ds=2012-04-21-00'</value>
</property>
....
</configuration>
</verbatim>
---+++ Custom Properties
The properties are key value pairs that are passed to the workflow. These properties are optional and can be used
in workflow to parameterize the workflow.
Syntax:
<verbatim>
<process name="[process name]">
...
<properties>
<property name=[key] value=[value]/>
...
</properties>
...
</process>
</verbatim>
The following are some special properties, which when present are used by the Falcon's launcher job, the same property is also available in workflow which can be used to propagate to pig or M/R job.
<verbatim>
<property name="queueName" value="hadoopQueue"/>
<property name="jobPriority" value="VERY_HIGH"/>
<!-- This property is used to turn off JMS notifications for this process. JMS notifications are enabled by default. -->
<property name="userJMSNotificationEnabled" value="false"/>
</verbatim>
---+++ Workflow
The workflow defines the workflow engine that should be used and the path to the workflow on hdfs.
Libraries required can be specified using lib attribute in the workflow element and will be comma separated HDFS paths.
The workflow definition on hdfs contains the actual job that should run and it should confirm to
the workflow specification of the engine specified. The libraries required by the workflow should
be in lib folder inside the workflow path.
The properties defined in the cluster and cluster properties(nameNode and jobTracker) will also
be available for the workflow.
There are 4 engines supported today.
---++++ Oozie
As part of oozie workflow engine support, users can embed a oozie workflow.
Refer to oozie [[http://oozie.apache.org/docs/4.2.0/DG_Overview.html][workflow overview]] and
[[http://oozie.apache.org/docs/4.2.0/WorkflowFunctionalSpec.html][workflow specification]] for details.
Syntax:
<verbatim>
<process name="[process name]">
...
<workflow engine=[workflow engine] path=[workflow path] lib=[comma separated lib paths]/>
...
</process>
</verbatim>
Example:
<verbatim>
<process name="sample-process">
...
<workflow engine="oozie" path="/projects/bootcamp/workflow"/>
...
</process>
</verbatim>
This defines the workflow engine to be oozie and the workflow xml is defined at
/projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
Libraries path can be overridden using lib attribute. e.g.: lib="/projects/bootcamp/wf/libs,/projects/bootcamp/oozie/libs" in the workflow element.
---++++ Pig
Falcon also adds the Pig engine which enables users to embed a Pig script as a process.
Example:
<verbatim>
<process name="sample-process">
...
<workflow engine="pig" path="/projects/bootcamp/pig.script" lib="/projects/bootcamp/wf/libs,/projects/bootcamp/pig/libs"/>
...
</process>
</verbatim>
This defines the workflow engine to be pig and the pig script is defined at
/projects/bootcamp/pig.script.
Feeds with Hive table storage will send one more parameter apart from the general ones:
<verbatim>$input_filter</verbatim>
---++++ Hive
Falcon also adds the Hive engine as part of Hive Integration which enables users to embed a Hive script as a process.
This would enable users to create materialized queries in a declarative way.
Example:
<verbatim>
<process name="sample-process">
...
<workflow engine="hive" path="/projects/bootcamp/hive-script.hql"/>
...
</process>
</verbatim>
This defines the workflow engine to be hive and the hive script is defined at
/projects/bootcamp/hive-script.hql.
Feeds with Hive table storage will send one more parameter apart from the general ones:
<verbatim>$input_filter</verbatim>
---++++ Spark
Falcon also adds the Spark engine as part of Spark Integration which enables users to run the Java/Python Spark application as a process.
When "spark" workflow engine is mentioned spark related parameters must be provided through <spark-attributes>
Examples:
<verbatim>
<process name="spark-process">
...
<workflow engine="spark" path="/resources/action">
<spark-attributes>
<master>local</master>
<name>Spark WordCount</name>
<class>org.examples.WordCount</class>
<jar>/resources/action/lib/spark-application.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m</spark-opts>
</spark-attributes>
...
</process>
</verbatim>
This defines the workflow engine to be spark and Java/Python Spark application must be defined with "jar" option that need to be executed.
There is flexibility to override the Spark master through process entity either to "yarn-client" or "yarn-cluster", if spark interface is already defined in cluster entity.
Input and Output data to the Spark application will be set as argument when Spark workflow will be generated, if input and output feed entity is defined in the process entity.
In the set of arguments, first argument will always correspond to input feed, second argument will always correspond to output feed and then user's provided argument will be set.
For running the Spark SQL process entity, that read and write the data stored on Hive, the datanucleus jars under the $HIVE_HOME/lib directory and hive-site.xml
under $SPARK_HOME/conf/ directory need to be available on the driver and all executors launched by the YARN cluster.
The convenient way to do this is adding them through the --jars option and --file option of the spark-opts attribute.
Example:
<verbatim>
<process name="spark-process">
...
<workflow engine="spark" path="/resources/action">
<spark-attributes>
<master>local</master>
<name>Spark SQL</name>
<class>org.examples.SparkSQLProcessTable</class>
<jar>/resources/action/lib/spark-application.jar</jar>
<spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
</spark-attributes>
...
</process>
</verbatim>
Input and Output to the Spark SQL application will be set as argument when Spark workflow will be generated, if input and output feed entity is defined in the process entity.
If input feed is of table type, then input table partition, table name and database name will be set as input arguments. If output feed is of table type, then output table partition, table name and database name will be set as output arguments.
Once input and output arguments is set, then user's provided argument will be set.
---+++ Retry
Retry policy defines how the workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff(exponential backoff) and final. Depending on the delay and number of attempts, the workflow is re-tried after specific intervals. If user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
Syntax:
<verbatim>
<process name="[process name]">
...
<retry policy=[retry policy] delay=[retry delay] attempts=[retry attempts] onTimeout=[retry onTimeout]/>
...
</process>
</verbatim>
Examples:
<verbatim>
<process name="sample-process">
...
<retry policy="periodic" delay="minutes(10)" attempts="3" onTimeout="true"/>
...
</process>
</verbatim>
The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
*NOTE :* If user does a manual rerun with -force option (using the instance rerun API), then the runId will get reset and user might see more Falcon system retries than configured in the process definition.
To enable retries for instances for feeds, user will have to set the following properties in runtime.properties
<verbatim>
falcon.retry.policy=periodic
falcon.retry.delay=minutes(30)
falcon.retry.attempts=3
falcon.retry.onTimeout=false
<verbatim>
---+++ Late data
Late data handling defines how the late data should be handled. Each feed is defined with a late cut-off value which specifies the time till which late data is valid. For example, late cut-off of hours(6) means that data for nth hour can get delayed by upto 6 hours. Late data specification in process defines how this late data is handled.
Late data policy defines how frequently check is done to detect late data. The policies supported are: backoff, exp-backoff(exponention backoff) and final(at feed's late cut-off). The policy along with delay defines the interval at which late data check is done.
Late input specification for each input defines the workflow that should run when late data is detected for that input.
Syntax:
<verbatim>
<process name="[process name]">
...
<late-process policy=[late handling policy] delay=[delay]>
<late-input input=[input name] workflow-path=[workflow path]/>
...
</late-process>
...
</process>
</verbatim>