-
Notifications
You must be signed in to change notification settings - Fork 2
/
ingestion_pipeline.conf
134 lines (114 loc) · 3.35 KB
/
ingestion_pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# General options
pramen {
environment.name = "MyEnv (dev)"
pipeline.name = "Example ingestion pipeline"
bookkeeping.enabled = false
email.if.no.changes = false
# A temporaty directory on HDFS or S3 depending on Hadoop configuration
temporary.directory = "/tmp"
}
# Metastore
pramen.metastore {
tables = [
{
# Minimal definition
name = "table1"
format = "delta"
path = "/data/lake/metastore/table1"
},
{
# Extended definition
name = "table2"
description = "Test table2"
format = "parquet"
path = "/data/lake/metastore/table2"
records.per.partition = 1000000
information.date.start = "2022-01-01"
}
]
}
# Sources
pramen.sources = [
{
# A source can be refered to by name
name = "postgre_events"
# This is a JDBC source, the data is coming from a relational database.
factory.class = "za.co.absa.pramen.core.source.JdbcSource"
# Define the connection string
jdbc = {
driver = "org.postgresql.Driver"
connection.primary.url = "jdbc:postgresql://myserver:5432/my_db1"
user = "my_user1"
password = "my_password1"
}
# Specifies the minimum records the data should have to be considered having data
minimum.records = 1
# Consider the pipeline as failed if at least one table has no data at the scheduled time.
# Useful for auto-retrying ingestion pipelines.
fail.if.no.data = false
# This is an event database that has an information date column
has.information.date.column = true
information.date.column = "info_date"
information.date.type = "date"
# Date format in Java format notation
information.date.format = "yyyy-MM-dd"
},
{
name = "postgre_snapshot"
factory.class = "za.co.absa.pramen.core.source.JdbcSource"
jdbc = {
driver = "org.postgresql.Driver"
connection.primary.url = "jdbc:postgresql://myserver:5432/my_db2"
user = "my_user2"
password = "my_password2"
}
has.information.date.column = false
}
]
# Email notification configuration. When 'send.to' is empty no emeils will be generated or sent.
mail {
smtp.host = "my.smtp.server.com"
smtp.port = "25"
smtp.auth = "false"
send.from = "Pramen <pramen.noreply@example.com>"
send.to = "email1@example.com, email2@example.com"
}
# The pipeline
pramen.operations = [
{
name = "Events sourcing"
type = "ingestion"
schedule.type = "daily"
source = "postgre_events"
tables = [
{
input.db.table = table1
output.metastore.table = table1
}
]
},
{
name = "Entities sourcing"
type = "ingestion"
schedule.type = "daily"
source = "postgre_entities"
tables = [
{
input.db.table = table2
output.metastore.table = table2
}
]
}
]