Database schemas for Airflow. One of the biggest issues with Apache Airflow is that it does not provide a good way to describe database schemas within the system. This leads to incorrect table definitions, schemas that are hard to extend, and difficulties keeping backwards compatibility for already existing pipelines.
This package was written to serve as an abstraction layer for table schemas, and it provides support for Presto, Apache Hive, and Amazon Redshift.
Installation is as simple as installing any other Python package.
$ pip install dbsa
The following column types are supported:
Data type | Presto support | Hive support | Redshift support |
---|---|---|---|
dbsa.Boolean | ✓ | ✓ | ✓ |
dbsa.Tinyint | ✓ | ✓ | ✓ |
dbsa.Smallint | ✓ | ✓ | ✓ |
dbsa.Integer | ✓ | ✓ | ✓ |
dbsa.Bigint | ✓ | ✓ | ✓ |
dbsa.Real | ✓ | ✓ | ✓ |
dbsa.Double | ✓ | ✓ | ✓ |
dbsa.Decimal | ✓ | ✓ | ✓ |
dbsa.Varchar | ✓ | ✓ | ✓ |
dbsa.Char | ✓ | ✓ | ✓ |
dbsa.Varbinary | ✓ | ✓ | |
dbsa.JSON | ✓ | ✓ AS Varchar | |
dbsa.Date | ✓ | ✓ | ✓ |
dbsa.Time | ✓ | | |
dbsa.Timestamp | ✓ | ✓ | ✓ |
dbsa.Array | ✓ | ✓ | |
dbsa.Map | ✓ | ✓ | |
dbsa.Row | ✓ | ✓ | |
dbsa.IPAddress | ✓ | ✓ AS Varchar | |
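These types are used as column declarations on dbsa.Table subclasses, a pattern the Metrics example later in this README shows in full. Below is a minimal, made-up sketch; the Users table and its columns are illustrative only, and the constructor arguments follow the pattern shown elsewhere in this README.
import dbsa

class Users(dbsa.Table):
    ds = dbsa.Partition(dbsa.Varchar(), comment='Daily partition, declared like in the Metrics example below.')
    user_id = dbsa.Bigint(comment='Numeric identifier of the user.')
    # dbsa.Boolean and dbsa.Date are assumed to accept a comment argument,
    # mirroring the column types used elsewhere in this README.
    is_active = dbsa.Boolean(comment='Whether the user is currently active.')
    signup_date = dbsa.Date(comment='Date the user signed up.')
    attributes = dbsa.Map(primitive_type=dbsa.Varchar(), data_type=dbsa.Varchar(), comment='Free-form key/value attributes.')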
The following table properties are supported:
Table property | Presto support | Hive support | Redshift support |
---|---|---|---|
dbsa.Format | ✓ | ✓ | |
dbsa.Bucket | ✓ | ✓ | |
dbsa.Sortkey | | | ✓ |
dbsa.DistributionKey | | | ✓ |
dbsa.DistributionStyle | | | ✓ |
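Table properties are attached to a table as class attributes, as the Metrics example below does with dbsa.Format. A minimal sketch; the constructor arguments of the Redshift-only properties are not documented in this README, so they are only hinted at in comments rather than guessed:
import dbsa

class Clicks(dbsa.Table):
    # Grounded in the Metrics example below: store the table as ORC for Presto/Hive.
    _format = dbsa.Format(format='ORC')
    # The Redshift-only properties from the matrix above are expected to attach the same way,
    # e.g. something along the lines of:
    #   _sortkey = dbsa.Sortkey(...)
    #   _distkey = dbsa.DistributionKey(...)
    ds = dbsa.Partition(dbsa.Varchar(), comment='Daily partition.')
    url = dbsa.Varchar(comment='Clicked URL.')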
You can set up a pii object to describe how you wish to handle your PII information stored on HDFS or within Redshift.
import dbsa
pii = dbsa.PII(
    EMAIL=dbsa.DataType(transform_on_insert="FUNC_SHA1({quoted_name} || CAST(created_at AS VARCHAR))"),
    IP_ADDRESS=dbsa.DataType(drop_on=dbsa.PII.DELETE),
    DEVICE_ID=dbsa.DataType(),
)
When you perform an INSERT statement, the transformations are applied automatically, and if a column drop was specified on INSERT, the values will be truncated.
If you set up transform_on_delete or drop-on-DELETE conditions, you must write a pipeline that specifies when the condition is met.
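Columns are tied to these classifications through the pii keyword argument, exactly as in the retention example at the end of this README. Below is a minimal sketch (the Signups table, its columns, and the dbsa.Timestamp constructor arguments are made up for illustration; the table definition pattern itself is introduced in the next section):
import dbsa

class Signups(dbsa.Table):
    ds = dbsa.Partition(dbsa.Varchar(), comment='Signup date partition.')
    # The EMAIL transform above references a created_at column, so one is declared here;
    # dbsa.Timestamp is assumed to accept a comment argument like the other types.
    created_at = dbsa.Timestamp(comment='Signup timestamp used by the EMAIL hashing transform.')
    # Hashed automatically whenever an INSERT statement is generated for this table.
    email = dbsa.Varchar(comment='Email address of the user', pii=pii.EMAIL)
    # Dropped when the DELETE condition of the PII policy is triggered.
    ip_address = dbsa.Varchar(comment='IP address of the signup request', pii=pii.IP_ADDRESS)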
I recommend creating separate files for each schema and namespace in the airflow/schemas folder. You can describe a table in the following way:
import dbsa
class Metrics(dbsa.Table):
    """
    You can add your table definition here, which will show up in the documentation automatically.
    It helps build a data dictionary of tables and columns, and also documents your codebase.
    """
    _format = dbsa.Format(format='ORC')
    ds = dbsa.Partition(dbsa.Varchar(), comment="Date the metrics are being generated.")
    aggregation = dbsa.Partition(dbsa.Varchar(), comment="Name of the aggregation. All metrics within an aggregation are populated at the same time - however aggregations can land at different times!")
    metric = dbsa.Varchar(comment='Name of a standalone metric. (e.g: visits)')
    dimensions = dbsa.Map(primitive_type=dbsa.Varchar(), data_type=dbsa.Varchar(), comment='Dimensions are used for the calculations')
    grouping_id = dbsa.Bigint(comment='Unique grouping identifier of the selected dimensions.')
    value = dbsa.Double(comment='Value of the metric.')
    proportion = dbsa.Double(comment='Proportion of the metric and the total value if it is applicable.')
    total = dbsa.Double(comment='Total value if it is applicable.')
This table definition is not bound to any dialect yet. To use the table, you must bind it to one. When creating the table instances, we must specify the name of the schema and fill in the missing partitions. dbsa will not quote your values, since you can use functions or UDFs, so please put quotes around your values where needed.
from dbsa import presto, hive
presto_tbl = presto.Table(Metrics(schema='default', ds="'2019-07-27'"))
hive_tbl = hive.Table(Metrics(schema='default', ds="'2019-07-27'"))
After that, we can start generating SQL queries based on the dialect.
print(presto_tbl.get_delete_current_partition(ignored_partitions=['aggregation']))
# DELETE FROM "default"."metrics"
# WHERE "ds" = '2019-07-27'
print(hive_tbl.get_delete_current_partition(ignored_partitions=['aggregation']))
# ALTER TABLE `default`.`metrics` DROP IF EXISTS PARTITION(
# `ds` = '2019-07-27'
# ) PURGE
As you can see, the dialects work cleanly, and we can even tell the statement to ignore our aggregation subpartition.
print(presto_tbl.get_create_table())
# CREATE TABLE IF NOT EXISTS "default"."metrics" (
# "metric" VARCHAR COMMENT 'Name of a standalone metric. (e.g: visits)',
# "dimensions" MAP(VARCHAR, VARCHAR) COMMENT 'Dimensions are used for the calculations',
# "grouping_id" BIGINT COMMENT 'Unique grouping identifier of the selected dimensions.',
# "value" DOUBLE COMMENT 'Value of the metric.',
# "proportion" DOUBLE COMMENT 'Proportion of the metric and the total value if it is applicable.',
# "total" DOUBLE COMMENT 'Total value if it is applicable.',
# "ds" VARCHAR COMMENT 'Date of the metrics are beging generated.',
# "aggregation" VARCHAR COMMENT 'Name of the aggregation. All metrics within an aggregation are populated at the same time - however aggregations can land at different times!'
# )
# COMMENT 'You can add your table definition here, which will show up in the documentation automatically.
# It helps build a data dictionary of tables and columns, and also documents your codebase.'
# WITH (
# partitioned_by = ARRAY[
# 'ds',
# 'aggregation'
# ],
# format = 'ORC'
# )
We support PartitionRetentionPolicy to set up retention for your tables. These policies are not enforced, however; you must write an Airflow pipeline to drop the old partitions (a sketch of such a pipeline follows the example below).
class IncomingEvents(dbsa.Table):
    """
    A good example of how you can set up a 30-day retention for one of your tables. It also shows
    how you can set up PII classification for your data.
    """
    _retention = dbsa.PartitionRetentionPolicy(earliest_partition={'ds': "'{{ macros.ds_add(ds, -30) }}'"})
    email = dbsa.Varchar(comment='Email address marked as PII', pii=pii.EMAIL)
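Below is a minimal sketch of the kind of Airflow DAG that could enforce the 30-day policy above. It assumes IncomingEvents also declares a ds partition (only the retention policy and the email column are shown above), that get_delete_current_partition() can be called without ignored_partitions, and that a Hive connection is configured for the standard Airflow HiveOperator; the DAG wiring is plain Airflow, not part of dbsa.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator  # Airflow 2.x provider import path
from dbsa import hive

with DAG(
    dag_id='incoming_events_retention',
    start_date=datetime(2019, 7, 1),
    schedule_interval='@daily',
) as dag:
    # IncomingEvents is the class defined above; in a real project it would live under
    # airflow/schemas and be imported here. Bind it one day past the earliest retained
    # partition, i.e. the partition that has just fallen outside the 30-day window.
    expired = hive.Table(IncomingEvents(schema='default', ds="'{{ macros.ds_add(ds, -31) }}'"))

    # Renders an ALTER TABLE ... DROP IF EXISTS PARTITION statement (see the Hive example
    # earlier in this README); the templated ds value is resolved by Airflow at runtime.
    HiveOperator(
        task_id='drop_expired_partition',
        hql=expired.get_delete_current_partition(),
    )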
To generate the documentation, you must pick a dialect and run the following command:
dbsa-markdown {presto|hive|redshift} file.py