# Installation Guide

To install the VaultSpeed SDK python package, download the pip.conf file from https://app.vaultspeed.com/artifacts by clicking the download button next the row for "vaultspeed-sdk".
When using Windows, rename it to pip.ini. for macOS or Linux, leave file extension as-is.
In the file, replace \<USERNAME\> and \<PASSWORD\> with your VaultSpeed Web App credentials. Make sure to escape the ´@´ symbol in your username, change it to ´%40´. 
If your password contains special characters, make sure to escape or encode them when needed, it will be part of a basic authentication URL so needs to be URL encoded.

If you are using a Python virtual environment, copy the config file to the directory where you have created your virtual Python environment. You can check whether it was picked up correctly by running ´pip config list´, this should return a list containing the same URL as in the config file.
After this, you can run ´pip install vaultspeed-sdk´ to install the SDK package. The package can be updated by running ´pip install vaultspeed-sdk --upgrade´.

For other tools, search for "Manage Python package repositories" (e.g. https://www.jetbrains.com/help/pycharm/installing-uninstalling-and-upgrading-packages.html#custom-repositories). 
Depending on the tool you might have to enter the URL from the config file in its entirety, or sometimes you can also enter the username and password separately if there is a "Basic HTTP" or "Basic Authentication" option. 
In this the URL would look similar to: ´https://app.vaultspeed.com/api/artifacts/pip/vaultspeed-sdk/simple´, and in this case you also do not have to URL encode your username and password.
After setting up the Package repository, the package ´vaultspeed-sdk´ should appear in the list of available packages in your tool.

# Getting started

To use the SDK, we must first authenticate to the VaultSpeed API. This is usually done by providing a username and password.


In [None]:
from vaultspeed_sdk.client import UserPasswordAuthentication

auth = UserPasswordAuthentication(api_url="https://app.vaultspeed.com/api", username="name@company", password="****")

All interactions with the VaultSpeed API are handled by the Client class. When creating a client, you have to pass authentication. In this case, we will use the one we just created. It is also possible to change the number of retries and the timeout time for the API calls.

In [None]:
from vaultspeed_sdk.client import Client, TaskConfig

client = Client(base_url="https://app.vaultspeed.com/api", auth=auth, retries=2, timeout = 120, caller="docs", task_config=TaskConfig(polling_interval=10, timeout=0, queue_timeout=600, show_progress=True))

The final part of the setup is the initialization of the System class. The system is the starting point for all interactions with VaultSpeed.

In [None]:
from vaultspeed_sdk.system import System

system = System(client=client)


From the system, you can access the system parameters, the database links, the projects, etc. From the project, you can then access the project parameters, the sources, the data vaults, and so on. The full structure with all the dependencies can be found at the end of this chapter.

The system parameters can be viewed as follows:

In [None]:
sys_param = system.parameters
for k, p in sys_param.items():
    print(p)


A parameter can be accessed and updated in multiple ways.


In [None]:
sys_param["USE_FMC"].value = "Y"
sys_param.DELETE_FLAG_POSITIVE_VALUE.value = "true"
sys_param.DELETE_FLAG_NEGATIVE_VALUE.value = "false"

The parameters are an instance of a Map. This is a special version of a python dictionary used by the VaultSpeed SDK. The main difference between a Map and a dict are:
* Elements can be accessed as attributes as well as with the standard dict methods.This is mainly useful when interacting with these objects from a Notebook or the python debugger since, in those cases, your IDE could provide auto-completion of the keys.
* Looping over the map returns the values instead of the keys.

When you try to set a parameter to a value that is not supported, an exception will be raised.

In [None]:
from vaultspeed_sdk.exceptions.allowed_values import AllowedValuesException

try:
    sys_param["USE_FMC"].value = "true"
except AllowedValuesException as e:
    print(e)

The SDK contains a set of specific exceptions, which can be found in vaultspeed_sdk.exceptions. Any error caught by the SDK will raise one of these exceptions.

To apply the parameter changes, we have to execute the save method.

In [None]:
sys_param.save()

From the System, you can also get a list of all the projects, get a specific project by name, create new projects, or delete a project.

In [None]:
moto_project = system.create_project(name="moto", description="example project")
dummy_project = system.create_project(name="dummy", description="dummy project")

for project in system.projects:
    print(project)

system.delete_project(dummy_project)

system.get_project(name="moto")


How the get_project method deals with a project that can not be found can be manipulated with the check attribute. If the check is true, an error is raised if the source is not found. Otherwise, None will be returned.
This allows for creating a simple create if not exist statement.

In [None]:
print(system.get_project(name="dummy", check=False))
try:
    system.get_project(name="dummy", check=True)
except KeyError as e:
    print(e)

project = system.get_project(name="moto", check=False) or system.create_project(name="moto", description="example project")
project

This pattern of the list property and the get, create, and delete methods is used across the SDK.

With a project instance, we can then view and update the project's properties.

In [None]:
print(project.name)
print(project.description)
project.description = "SDK example project"
print(project.description)

From the project, we can access the project parameters, these work in the same way as the system parameters.

Similarly to how we could access the projects from the system, we can access the sources and data vaults from the project.

In [None]:
print(project.parameters)
project.parameters.STORE_BK_FIELDS_IN_SAT.value = "Y"
project.parameters.save()

In [None]:
project.sources

In [None]:
project.data_vaults

# logging

The VaultSpeed SDK uses the standard logging package, so to change the log level add the following lines at the top of your script.

In [None]:
import logging

logging.basicConfig(level=logging.INFO)

# Source

Before creating a source, we first need to create a database link.

The Model used in this example can be found on the VaultSpeed GitHub.

In [None]:
from vaultspeed_sdk.database_link import DatabaseLinkTypes
from vaultspeed_sdk.models.metadata.database_type import DatabaseTypes

source_db_link = (system.get_database_link("vs_postgres", check=False) or
                  system.create_database_link(
                      name="vs_postgres",
                      link_type=DatabaseLinkTypes.AGENT,
                      database_type=DatabaseTypes.POSTGRESQL
                  ))
print(source_db_link)

The package vaultspeed_sdk.models.metadata contains a set of classes storing metadata that are used across the SDK, such as the database types. Each of those consists of an enum like DatabaseTypes and a class DatabaseType. The enum is used for inputs, like in the source creation above.
While the other class is used internally to retrieve identifiers and other properties from the API.

If we want to see if a certain database type can be used without the extended agent, we can retrieve this info as follows.

In [None]:
from vaultspeed_sdk.models.metadata.database_type import (DatabaseType, DatabaseTypes)

print(DatabaseType.get(client, DatabaseTypes.SNOWFLAKE).standard_agent)
print(DatabaseType.get(client, DatabaseTypes.JDBC).standard_agent)

We can now create a new source with this link.

In [None]:
from vaultspeed_sdk.models.metadata.cdc_type import CdcTypes
from vaultspeed_sdk.models.metadata.database_type import DatabaseTypes
from vaultspeed_sdk.models.metadata.source_type import SourceTypes

source = project.get_source("moto_sales", check=False) or \
         project.create_source(
             name="moto_sales",
             short_name="ms",
             bk_name="moto_sales",
             record_name="moto_sales",
             physical_schema="moto_sales",
             cdc_type=CdcTypes.CDC,
             database_link=source_db_link,
             src_type=SourceTypes.AGENT,
             database_type=DatabaseTypes.POSTGRESQL
         )
print(source)

We can also update one of the properties of the source.

In [None]:
source.bk_name = "sales"

Just like the project and system, we can now also change the source parameters.

In [None]:
source.parameters.USE_SOURCE_UK_AS_BK = "Y"
source.parameters.save()

The next step in the source configuration is the harvesting of source objects and selecting the objects we want to use in our release.
This will launch an agent task, so make sure that your agent is running and it can take half a minute before it finishes.

In [None]:
source_objects = source.get_source_objects(refresh=True)
print(source_objects)

The refresh attribute controls whether the object list is harvested through the agent or returns the already harvested objects. Since this is a new source, we should enable refresh.
Now that we harvested the objects, we can select the objects we want to use in our release. Note that in the output above, all object have "enabled: False", meaning that they are not selected.
In our case, we want to select all object, except the ones starting with "jrn_".

In [None]:
# select all objects
source.select_source_objects(source_objects, selected=True)

# Disable object that start with "jrn_", we show 2 methods:

# 1. one-by-one -> many API calls
for src_obj in source_objects:
    if src_obj.name.startswith("jrn_"):
        src_obj.disable()

# 2. in bulk
jrn_source_objects = [src_obj for src_obj in source_objects if src_obj.name.startswith("jrn_")]
print(jrn_source_objects)
source.select_source_objects(jrn_source_objects, selected=False)
print()
print(source.get_source_objects())

Next, we will configure some exclusions and a pattern removal

In [None]:
# exclude specific objects
source.exclude_object(pattern="jrn_%", reason="exclude jrn_ prefixed tables")

print(source.object_exclusions)

# exclude attributes
source.exclude_attribute(pattern="update_user", reason="internal value, not needed in DV")

print(source.attribute_exclusions)

# remove object patterns
source.remove_name_pattern(pattern="moto_", reason="example")

patterns = source.name_removal_patterns

print(source.name_removal_patterns)

## Source Release

We are now ready to create a source release and configure our source model.
This will launch an agent task, so give it a minute to run.

In [None]:
from vaultspeed_sdk.source.release import ReleaseParts

src_rel = source.create_release(number=1, comment="1", keep=ReleaseParts.ALL, import_src_mtd=True)
print(source.releases)

To see which objects are in the release, run the following.

In [None]:
print(src_rel.objects)

The source objects and their attributes can be retrieved in multiple ways, similar to the parameters.
The get_all method allows us to retrieve all objects matching a list of names.
Note that the attribute access (dot notation) is only possible if your object/attribute names are valid python identifiers.

In [None]:
print(src_rel.objects["addresses"])
print(src_rel.objects.get("addresses"))
print(src_rel.objects.addresses)
print(src_rel.objects.addresses.attributes.address_number)
print(src_rel.objects.addresses.attributes["address_number"])
print(src_rel.objects.get_all(["product_features", "product_feature_cat"]))

Next, we can start the configuration of our source model.
We will start by doing some mass updates of objects and attributes.

In [None]:
from vaultspeed_sdk.source.release import AttributeProperties

# do a mass update to make all attributes matching a pattern to be marked as non-historic
src_rel.mass_update_attributes_by_pattern(pattern="update_timestamp", attr_property=AttributeProperties.NON_HISTORIC, value=True)

# mass update some properties for a set of objects
src_rel.mass_update_objects_multi_active(multi_active=True, objects=src_rel.objects.get_all(["product_features", "product_feature_cat"]))
src_rel.mass_update_objects_cdc(cdc_type=CdcTypes.INCR, objects=src_rel.objects.get_all(["invoices"]))

Next, lets do some object and attribute configurations.

In [None]:
from vaultspeed_sdk.models.metadata.object_type import SourceObjectTypes

addresses = src_rel.objects["addresses"]

# update an object parameter
addresses.parameters.STORE_BK_FIELDS_IN_SAT = "Y"
addresses.parameters.save()
print(addresses.parameters)

print(f"addresses has CDC type: {addresses.cdc_type.value}")

# set some business keys
print(f"addresses attributes:")
for attr in addresses.attributes:
    if attr.name in ("street_name", "street_number", "postal_code", "city"):
        attr.business_key = True
    print(attr)

# update the abbreviated name of an attribute
parts = src_rel.objects["parts"]
parts.attributes["part_language_code"].abbreviated_name = "part_langua_code"

# change an object type
src_rel.objects.payments.object_type = SourceObjectTypes.NHL
src_rel.objects.customers.attributes.national_person_id.universal_identifier = True

Creating new relationships can be done as follows.

In [None]:
parts = src_rel.objects["parts"]
part_attr = parts.attributes
print(f"parts: {part_attr}")
codes_to_language = src_rel.objects["codes_to_language"]
ctl_attr = codes_to_language.attributes
print(f"codes_to_language: {ctl_attr}")
# create a relationship with multiple attributes
parts.create_relationship(codes_to_language, [(part_attr.part_number, ctl_attr.code), (part_attr.part_language_code, ctl_attr.language_code)])
for rel in parts.relationships:
    print(rel)
    print(rel.attributes)

Now let's create some driving key relationships.

In [None]:
pfcr = src_rel.objects["product_feat_class_rel"]
pfcr.object_type = SourceObjectTypes.LND
print(pfcr)

# create a driving key relationship
pfcr.create_relationship(src_rel.objects.product_features,
                         [(pfcr.attributes.product_feature_id, src_rel.objects.product_features.attributes.product_feature_id)], driving_key=True)
# create a normal relationship with a specific name
pfcr.create_relationship(src_rel.objects.product_feature_class,
                         [(pfcr.attributes.product_feature_class_id, src_rel.objects.product_feature_class.attributes.product_feature_class_id)],
                         name="relProdFeatClass")
print(pfcr.relationships)

# set the relationship on the product_id attribute to be a driving key
for rel in pfcr.relationships:
    if any([att for att in rel.attributes if att.attribute_name == "product_id"]):
        rel.driving_key = True
        print(rel)

Object splitting is not yet available in the SDK.

We have done all the needed configuration to our source model, so that means that we can now save the source model.

In [None]:
from vaultspeed_sdk.exceptions.release_issue import ReleaseIssue

try:
    src_rel.save_model()
except ReleaseIssue as e:
    print(e)

It looks like there are still some issue with our model, lets resolve them.

In [None]:
print(f"faulty objects:")
for obj in src_rel.objects:
    if not obj.valid:
        print(f"issue with {obj.name} is {obj.check_result}")

# fixing issues
src_rel.objects["product_features"].attributes["product_feature_language_code"].subsequence_attribute = True
src_rel.objects["product_feature_cat"].attributes["prod_feat_cat_language_code"].subsequence_attribute = True
src_rel.objects["payments"].create_relationship(src_rel.objects.customers,
                                                [(src_rel.objects.payments.attributes.customer_number,
                                                  src_rel.objects.customers.attributes.customer_number)])
src_rel.objects["payments"].create_relationship(src_rel.objects.invoices,
                                                [(src_rel.objects.payments.attributes.invoice_number,
                                                  src_rel.objects.invoices.attributes.invoice_number)])

ca = src_rel.objects["cust_addresses"]
ca.object_type = SourceObjectTypes.LND
ca.multi_active = True
ca.attributes["address_type"].subsequence_attribute = True
ca.create_relationship(src_rel.objects.customers, [(ca.attributes.customer_number, src_rel.objects.customers.attributes.customer_number)])
ca.create_relationship(src_rel.objects.addresses, [(ca.attributes.address_number, src_rel.objects.addresses.attributes.address_number)])

In [None]:
print("faulty objects:")
for obj in src_rel.objects:
    if not obj.valid:
        print(f"issue with {obj.name} is {obj.check_result}")

Now we can actually save our model.

In [None]:
src_rel.save_model()
print(f"saved: {src_rel.saved}, editable: {src_rel.editable}, locked: {src_rel.locked}")

We are now no longer allowed to modify the objects in the release.

In [None]:
from requests import HTTPError
from vaultspeed_sdk.exceptions.forbidden_action import ForbiddenActionException

try:
    src_rel.objects.get("parts").short_name = "prts"
except HTTPError as e:
    print(e)

After saving the release, we can set its HUBs to multi or single master and split some satellites.

In [None]:
from vaultspeed_sdk.models.metadata.hub_type import HubTypes

print(src_rel.hubs)
for hub in src_rel.hubs:
    if hub.name in (
    "hub_addresses", "hub_invoice_lines", "hub_invoices", "hub_parts", "hub_product_feature_cat", "hub_product_feature_class", "hub_product_features",
    "hub_product_sensors"):
        hub.hub_type = HubTypes.SINGLE_MASTER


In [None]:
print(src_rel.sats)
s_cust = src_rel.sats["sat_ms_customers"]
print(s_cust)
print(s_cust.attributes)
s_cust.create_split("name", "type", [attr for attr in s_cust.attributes if attr.name in ("first_name", "last_name", "gender")])
s_cust.create_split("birth", "type", [attr for attr in s_cust.attributes if attr.name in ("birthdate", "update_timestamp")])
print(s_cust.splits)
# add an extra attribute to an existing split
split = s_cust.splits["name"]
print(split.attributes)
split.attributes = list(split.attributes) + [attr for attr in s_cust.attributes if attr.name == "update_timestamp"]
print(split.attributes)

Now we can Lock the release.

In [None]:
src_rel.lock()
print(f"release locked: {src_rel.locked}")

We are no longer allowed to modify the HUBs and SATs of this release after locking.

In [None]:
try:
    src_rel.hubs.get("hub_addresses").hub_type = HubTypes.MULTI_MASTER
except ForbiddenActionException as e:
    print(e)

## Second Source
let's quickly configure a second source, so that we can integrate them in a Data Vault.

In [None]:
source = project.get_source("moto_mktg", check=False) or \
         project.create_source(
             name="moto_mktg",
             short_name="mm",
             bk_name="moto_mktg",
             record_name="moto_mktg",
             physical_schema="moto_mktg",
             cdc_type=CdcTypes.CDC,
             database_link=source_db_link,
             src_type=SourceTypes.AGENT,
             database_type=DatabaseTypes.POSTGRESQL
         )
source.parameters.USE_SOURCE_UK_AS_BK = "Y"
source.parameters.save()
source_objects = source.get_source_objects(refresh=True)
source.select_source_objects(source_objects, selected=True)
source.exclude_object(pattern="jrn_%", reason="exclude jrn_ prefixed tables")
source.exclude_attribute(pattern="update_user", reason="internal value, not needed in DV")

src_rel = source.create_release(number=1, comment="1", keep=ReleaseParts.ALL, import_src_mtd=True)
src_rel.mass_update_attributes_by_pattern(pattern="update_timestamp", attr_property=AttributeProperties.NON_HISTORIC, value=True)

In [None]:
camp_part_cont = src_rel.objects["camp_part_cont"]
campaigns = src_rel.objects["campaigns"]
camp_moto_chan_region = src_rel.objects["camp_moto_chan_region"]
camp_moto_channel = src_rel.objects["camp_moto_channel"]

camp_part_cont.object_type = SourceObjectTypes.LND
camp_part_cont.create_relationship(campaigns, [
    (camp_part_cont.attributes.campaign_code, campaigns.attributes.campaign_code),
    (camp_part_cont.attributes.campaign_start_date, campaigns.attributes.campaign_start_date)
])

camp_moto_chan_region.object_type = SourceObjectTypes.LND
camp_moto_chan_region.multi_active = True
camp_moto_chan_region.attributes["region"].subsequence_attribute = True
camp_moto_chan_region.create_relationship(campaigns, [
    (camp_moto_chan_region.attributes.campaign_code, campaigns.attributes.campaign_code),
    (camp_moto_chan_region.attributes.campaign_start_date, campaigns.attributes.campaign_start_date)
])
camp_moto_chan_region.create_relationship(src_rel.objects["channels"], [
    (camp_moto_chan_region.attributes.channel_id, src_rel.objects["channels"].attributes.channel_id)
])
camp_moto_chan_region.create_relationship(src_rel.objects["motorcycles"], [
    (camp_moto_chan_region.attributes.motorcycle_id, src_rel.objects["motorcycles"].attributes.motorcycle_id)
])

camp_moto_channel.object_type = SourceObjectTypes.LND
camp_moto_channel.multi_active = True
camp_moto_channel.parameters.STORE_BK_FIELDS_IN_SAT = "Y"
camp_moto_channel.attributes["from_date"].subsequence_attribute = True
camp_moto_channel.create_relationship(campaigns, [
    (camp_moto_channel.attributes.campaign_code, campaigns.attributes.campaign_code),
    (camp_moto_channel.attributes.campaign_start_date, campaigns.attributes.campaign_start_date)
])
camp_moto_channel.create_relationship(src_rel.objects["channels"], [
    (camp_moto_channel.attributes.channel_id, src_rel.objects["channels"].attributes.channel_id)
])

src_rel.objects.e_mails.object_type = SourceObjectTypes.SAT
src_rel.objects.phones.object_type = SourceObjectTypes.SAT
src_rel.objects.party_contacts.object_type = SourceObjectTypes.LND
src_rel.objects.campaign_motorcycles.object_type = SourceObjectTypes.LND

src_rel.save_model()

In [None]:
for hub in src_rel.hubs:
    if hub.name in ("hub_addresses", "hub_contacts", "hub_campaigns", "hub_channels"):
        hub.hub_type = HubTypes.SINGLE_MASTER

scm = src_rel.sats["lds_mm_campaign_motorcycles"]
scm.create_split("class", "type", [attr for attr in scm.attributes if attr.name in ("motorcycle_class_desc", "motorcycle_subclass_desc", "update_timestamp")])
scm.create_split("emo", "type", [attr for attr in scm.attributes if attr.name in ("motorcycle_comment", "motorcycle_emotion_desc", "update_timestamp")])
src_rel.lock()

# Data Vault

Now that we have a locked source release, we can create our Data Vault.

In [None]:
dv = project.create_data_vault(code="moto", name="moto_sf", database_type=DatabaseTypes.SNOWFLAKE)
print(project.data_vaults)

Existing data vaults can be retrieved by name or code.
Note that the code does not have to be unique, meaning that there can be multiple DVs with the same code, in those cases only the last DV with that code will be returned.
To get all of them, filter the project.data_vaults list.

In [None]:
print(project.get_data_vault(name="moto_sf"))
print(project.get_data_vault(code="moto"))
print([dv for dv in project.data_vaults if dv.code == "moto"])

Modify some DV parameters next.

In [None]:
dv.parameters.USE_MERGE_STATEMENT = "Y"
print(dv.parameters)

## Data Vault Release
Next, we will create a Data Vault Release and start configuring it.

In [None]:
from vaultspeed_sdk.models.util import get_last

# get the latest release for all our sources
latest_source_releases = []
for src in project.sources:
    if src.name in ("moto_sales", "moto_mktg"):
        locked_releases = [rel for rel in src.releases if rel.locked]
        if locked_releases:
            # we can use here the utility function "get_last" which will retrieve the element from 
            # the list with the largest value for its "date" attribute
            latest_source_releases.append(get_last(locked_releases))

print(latest_source_releases)
# create a new Data Vault Release
dv_release = dv.create_release(source_releases=latest_source_releases, name="v1", number=1, comment="first release")

print(dv.releases)

First, we will take a look at all the hubs in our Data Vault, and the groups that are automatically created by VaultSpeed.

In [None]:
grouped_hubs = dv_release.grouped_hubs
ungrouped_hubs = dv_release.ungrouped_hubs
print(grouped_hubs)
print(ungrouped_hubs)
print(grouped_hubs["hub_addresses"].elements)

We can now create a new hub group and configure it.

In [None]:
logging.basicConfig(level=logging.DEBUG)
cust = dv_release.create_hub_group(abbreviated_name="customers", short_name="customers", hubs=[ungrouped_hubs.hub_customers, ungrouped_hubs.hub_party])
print(cust)
print(cust.elements)

# change the short name
cust.short_name = "cust"
# enable business key concatenation
addr = dv_release.grouped_hubs["hub_addresses"]
print(addr.is_concat_necessary)
addr.business_key_concat = True

We can also create empty groups, these can later be filled in when we add more sources.

In [None]:
dv_release.create_hub_group("employees", "employees", [])

We can also add hubs to a group, or move hubs between groups.

In [None]:
products = dv_release.create_hub_group(abbreviated_name="products", short_name="products", hubs=[ungrouped_hubs.hub_products])

cust.add_hub_element(ungrouped_hubs.hub_motorcycles)
print(cust.elements)

hub_motorcycles = [he for he in cust.elements if he.name == "hub_motorcycles"][0]
cust.transfer_hub_element(hub_motorcycles, products)
print(cust.elements)
print(products.elements)

Finally, we can set one of the sources for addresses to be a slave source.

In [None]:
addr = grouped_hubs["hub_addresses"]
print(addr.elements)
mktg_addr = [he for he in addr.elements if he.source_name == "moto_mktg"][0]
print(mktg_addr.possible_master_slave_settings)

mktg_addr.set_master_slave(mktg_addr.possible_master_slave_settings["Slave of hub_addresses from source moto_sales"])

# alternatively, it can also be done based on a specific source object
moto_sales = project.get_source("moto_sales")
for src in mktg_addr.possible_master_slave_settings:
    if src == moto_sales:
        mktg_addr.set_master_slave(src)

Besides hubs, we also have Links and SATs which we can manipulate.

In [None]:
print(dv_release.links)
dv_release.links["lnk_cust_addresses_customerinvoiceaddressid"].short_name = "invoice"
dv_release.links["lnk_cust_addresses_customershiptoaddressid"].short_name = "ship"
print(dv_release.many_to_many_links)
party_contacts = dv_release.many_to_many_links["lnd_party_contacts"]
party_contacts.abbreviated_name = "customers_contacts"
party_contacts.short_name = "cust_cont"
dv_release.many_to_many_links["lnd_cust_addresses"].abbreviated_name = "customer_addresses"
print(dv_release.non_historical_links)

The last part of our DV which we can modify is the data types, their mappings and the exception values.

In [None]:
from vaultspeed_sdk.models.metadata.data_type_subgroup import DataTypeSubgroups

print(dv_release.data_types)
# create a new data type
if "gps" not in dv_release.data_types:
    gps = dv_release.create_data_type(name="gps", datatype_subgroup=DataTypeSubgroups.STRING, null_value="(0, 0, 0)", unknown_value="(-1, -1, -1)")
else:
    gps = dv_release.data_types["gps"]
print(gps)

# update a data type
dv_release.data_types["NUMBER"].max_data_length = 32

# update the data type mappings
print(dv_release.data_type_mappings)
dv_release.data_type_mappings["TIMESTAMP"].data_type_target = dv_release.data_types["TIMESTAMP_TZ"]
dv_release.data_type_mappings["BPCHAR"].data_length_target = "2*x"

# update the special values
special_values = dv_release.special_values
print(special_values)
special_values["VARCHAR"].null_value = "~NL~"
special_values["VARCHAR"].unknown_value = "~UN~"
special_values["DATE"].null_value = "31/12/2499"
special_values["DATE"].unknown_value = "31/12/2599"

Now that our DV has been configured, we can lock it.

In [None]:
dv_release.lock()

After locking, we can take a look at the completed DV structure (objects only for now).

In [None]:
dv_objects = dv_release.objects
print(dv_objects)

# Business Vault
After completing the Data Vault, we can now start on defining the Business Vault.
First we create a new release.

In [None]:
bv_release = dv_release.create_business_vault_release(name="v1", comment="first release")

Let's start by setting our business views.

In [None]:
business_views = bv_release.business_views
print(business_views)

# Change the business name and generate option for a Business View
business_views["sat_mm_products"].business_name = "sat_moto_products"
business_views["lks_ms_invoicelines_parts"].generate = False

print(business_views.sat_mm_products.business_name)
print(business_views.lks_ms_invoicelines_parts.generate)

# Get the attributes of a Business View and print them out
business_view_attributes = business_views["sat_mm_products"].attributes
print(business_view_attributes)

# Change the business name and generate option for a Business View Attribute
business_view_attributes["motorcycle_name"].business_name = "motorcycle_display_name"
business_view_attributes["update_timestamp"].generate = False

print(business_view_attributes["motorcycle_name"].business_name)
print(business_view_attributes["update_timestamp"].generate)

Next, we can create some PITs

In [None]:

from vaultspeed_sdk.business_vault.pit import FrequencyType, PitType, TimestampColumnTypes

snap_pit = bv_release.create_pit(
    name="daily_load",
    snapshot_frequency=1,
    frequency_type=FrequencyType.DAY,
    pit_type=PitType.SNAPSHOT,
    timestamp_type=TimestampColumnTypes.LOAD_TIMESTAMP,
    tables=[dv_objects.hub_contacts]
)
print(snap_pit)

det_pit = bv_release.create_pit(
    name="detailed_transactions",
    pit_type=PitType.DETAIL,
    timestamp_type=TimestampColumnTypes.TRANS_TIMESTAMP,
    tables=[dv_objects.lnd_campaign_motorcycles]
)

# Add and remove an extra DV object to an existing PIT template
det_pit.add_dv_object(dv_objects.lnk_cust_addresses)
det_pit.remove_dv_object(dv_objects.lnd_campaign_motorcycles)

print(bv_release.pits)

The DV objects have a function which allows you to look at which hubs are connected to each hub, and via which links. This can then be used to build up the bridges.

In [None]:
neighbours = dv_objects["hub_contacts"].get_adjacent_objects()
for neighbour in neighbours:
    print(f"hub: {neighbour[1]} via link: {neighbour[0]}")

Bridges can then be created as follows.

In [None]:
bridge_path = [dv_objects.hub_contacts, dv_objects.lnd_customers_contacts, dv_objects.hub_customers]
new_bridge = bv_release.create_bridge(name="contact_address", objects=bridge_path, objects_with_bks=[dv_objects["hub_customers"]],
                                      create_bridge_hk=True)
print(new_bridge.elements)
# complete the bridge by adding extra HUBs
new_path = new_bridge.elements + [dv_objects.lnk_cust_addresses, dv_objects.hub_addresses]
new_bridge.set_elements(objects=new_path, objects_with_bks=[be for be in new_bridge.elements if be.include_bk])

# remove the bridge hash key
new_bridge.create_bridge_hk = False

print(bv_release.bridges)

Finally, we will lock the business vault

In [None]:
bv_release.lock()
print(bv_release.locked)

# Generation
Now that our business vault has been completed, we can generate code.
A full generation can be done as follows.

In [None]:

from vaultspeed_sdk.models.metadata.load_type import LoadTypes
from vaultspeed_sdk.models.metadata.etl_generation_type import EtlGenerationTypes

my_ddls = system.generate_ddl(
    bv_release=bv_release,
    etl_generation_type=EtlGenerationTypes.SNOWFLAKESQL,
    load_type=LoadTypes.ALL
)

print(my_ddls)

my_etls = system.generate_etl(
    bv_release=bv_release,
    etl_generation_type=EtlGenerationTypes.SNOWFLAKESQL,
    load_type=LoadTypes.ALL
)

print(my_etls)

Creating a delta generation is done in a similar method.

In [None]:
from datetime import datetime

old_bv_release = bv_release

# set our DV release to a prod release
dv_release.upgrade_to_production(datetime.now())

# create a new DV release and make a change to a parameter
new_dv_release = dv.create_release(latest_source_releases, number=2, name="v2", comment="second release")
dv.parameters.ADD_RECORD_SOURCE_ATTRIBUTE = True
new_dv_release.lock()
new_bv_release = new_dv_release.business_vault_releases[0]

my_deltas = system.generate_delta(
    old_bv_release=old_bv_release,
    new_bv_release=new_bv_release,
    etl_generation_type=EtlGenerationTypes.SNOWFLAKESQL
)

print(my_deltas)

To deploy the generated code, we have multiple options, we can create a db link and deploy the code through that.
It's also possible to just download the code and store it locally. This processes the file names similar to the GIT deploy, meaning that it removes the generation ids and the timestamps from the file names, as well as putting it in the correct sub-folders.
The other options of a git deploy or a deployment with a custom script are also available.

In [None]:
from pathlib import Path

target_link = system.get_database_link(name="sf", check=False) or system.create_database_link(name="sf", link_type=DatabaseLinkTypes.AGENT, database_type=DatabaseTypes.SNOWFLAKE)

for ddl in my_ddls:
    ddl.deploy_to_target(target_link)

for etl in my_etls:
    etl.download_files_to(path=Path("."), keep_zip=False)

# FMC
We can also use the SDK to create all the FMC flows we need to load our Data Vault.

In [None]:
from datetime import timezone
from vaultspeed_sdk.fmc import FlowTypes

for load_type in [LoadTypes.INIT, LoadTypes.INCR]:
    for source in project.sources:
        dv.create_fmc_flow(
            name=f"{source.name}_{load_type.value.lower()}",
            description=f"{source.name}_{load_type.value.lower()}",
            start_date=datetime.now(timezone.utc).replace(minute=0, hour=0, second=0, microsecond=0),
            concurrency=4,
            flow_type=FlowTypes.FL,
            load_type=load_type,
            group_tasks=False,
            dv_connection_name="dv",
            source=source,
            schedule_interval="\"@hourly\"",
            src_connection_name="src"
        )

    dv.create_fmc_flow(
        name=f"{dv.code}_BV_{load_type.value.lower()}",
        description=f"{dv.code}_BV_{load_type.value.lower()}",
        start_date=datetime.now(timezone.utc).replace(minute=0, hour=0, second=0, microsecond=0),
        concurrency=4,
        flow_type=FlowTypes.BV,
        load_type=load_type,
        group_tasks=False,
        dv_connection_name="dv",
        schedule_interval="timedelta(hours=1)"
    )

print(dv.fmc_flows)

# Update Flows
for flow in dv.fmc_flows:
    # change the connection name for all flows
    flow.dv_connection_name = dv.code + "_target"
    if flow.load_type == LoadTypes.INCR:
        # increase the concurrency for all incremental loads
        flow.concurrency = 8

# increase the load frequency of the moto sales incremental load
dv.get_fmc_flow(name="moto_sales_incr").schedule_interval = "timedelta(minutes=30)"

In [None]:
from vaultspeed_sdk.models.base_generation import Generation
from typing import List

generations: List[Generation] = []
for flow in dv.fmc_flows:
    print(f"available generations for flow {flow.name}:")
    etl_generations = flow.etl_generations
    for gen in etl_generations:
        print(gen)
    # generate FMC code for the first ETL generation (not the second one, which was our delta example)
    if etl_generations:
        fmc_generation = flow.generate(get_last(etl_generations))
        generations.append(fmc_generation)
    else:
        print("No valid ETL generations where found for this FMC workflow")

# View  FMC generations
for flow in dv.fmc_flows:
    print(f"generations of flow {flow.name}:")
    for gen in flow.generations:
        print(gen)

# deploy/download FMC code
for generation in generations:
    generation.download_files_to(path=Path("."), keep_zip=False)

# VaultSpeed Studio
To show how the Studio part of the SDK works, we are going to start by defining some signatures, and then we are going to create a template for a calculated satellite.

We start by creating a new Business Vault release, since the previous one is already locked.

In [None]:
bv_release = dv_release.create_business_vault_release(name="v2", comment="adding studio templates")


## Signatures
We will start by defining a new layer/schema in which we can place attributes generated by a template.

In [None]:
info_layer = bv_release.create_signature_layer(name="information_mar", order=2)
print(info_layer)

We have made a type, so we will now update the properties of this layer to fix this.

In [None]:
info_layer.name = "information_mart"
info_layer.order = 1

print(bv_release.signature_layers)

Next, we can create a signature object.

In [None]:
dim_obj = bv_release.create_signature_object(name="DIM_LVL1")
print(bv_release.signature_objects)

We can now assign this signature to one of our physical objects from the raw and business vault.
These are our physical objects:

In [None]:
bv_release.objects

Let's assign our new signature object to one of the satellites.

In [None]:
products = bv_release.objects["sat_ms_products"]
products.add_signature(dim_obj)
print(products.signatures)

# remove a signature
products.remove_signature(dim_obj)
print(products.signatures)

# add it back, with this method of assigning signatures, multiple can be added at once
products.signatures = [dim_obj]
print(products.signatures)

The exact same can be done for the signature attributes as well.

In [None]:
# signature
dim_key = bv_release.create_signature_attribute(name="SRC_DIM_KEY")
print(bv_release.signature_attributes)

# physical attributes
address_attributes = bv_release.objects["hub_addresses"].attributes
print(address_attributes)
addr_hk = address_attributes["addresses_hkey"]

addr_hk.add_signature(dim_key)
print(addr_hk.signatures)

addr_hk.remove_signature(dim_key)
print(addr_hk.signatures)

addr_hk.signatures = [dim_key]
print(addr_hk.signatures)

# note that the BV objects don't have attributes on which signatures can be defined
print("bridge_contact_address" in bv_release.objects)

## Templates
Next up, we will create a template which, for each satellite, contains only the active records, indicates how long ago the last change was, the day on which this change happened, and finally we will also add some hashed attributes.

First define a new template build on top of our satellites, we will virtualize the result in views that will be called `curr_<sat_name>`.

In [None]:
from vaultspeed_sdk.models.metadata.storage_type import StorageTypes
from vaultspeed_sdk.models.metadata.object_type import ObjectTypes

template = bv_release.create_template(name="current_sat", description="views that contain only the currently active records", prefix="CURR",
                                      signature_name="CURR", storage_type=StorageTypes.VIEW, load_type=LoadTypes.ALL, base_type=ObjectTypes.SAT)
print(bv_release.templates)

Now, we add the template code.

In [None]:
template.template_etl = """
template curr_sat

comp_group_start max_ld_group INL_V_GRP
componentGroupConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = Y)]
componentgrouprepeatedbycomponent SAT

    consists of Aggregated inline_view max_ld
    componentrepeatedbycomponent SAT
    connectsFrom (SAT_MLD_SRC)

        Aggregated Artifact MAX_LOAD_TIMESTAMP
            expressedBy MAX(SAT_MLD_SRC.LOAD_TIMESTAMP)
            expressionRepeatedByColumn SAT_MLD_SRC.LOAD_TIMESTAMP

        Attribute OBJECT_H_KEY
            expressedBy SAT_MLD_SRC.OBJECT_H_KEY
            expressionRepeatedByColumn SAT_MLD_SRC.OBJECT_H_KEY

    consists of source table SAT_MLD_SRC
    componentrepeatedbycomponent SAT

comp_group_end

comp_group_start curr_sat_group INS_GRP
componentgrouprepeatedbycomponent CURR

    consists of target table curr_sat
    componentrepeatedbycomponent CURR
    connectsFrom (sat_filter)

        Attribute OBJECT_H_KEY
            expressedBy sat_src.OBJECT_H_KEY
            expressionRepeatedByColumn sat_src.OBJECT_H_KEY

        Attribute LOAD_CYCLE_ID
            expressedBy sat_src.LOAD_CYCLE_ID
            expressionRepeatedByColumn sat_src.LOAD_CYCLE_ID

        Attribute LOAD_TIMESTAMP
            expressedBy sat_src.LOAD_TIMESTAMP
            expressionRepeatedByColumn sat_src.LOAD_TIMESTAMP

        Attribute TRANS_TIMESTAMP
            expressedBy sat_src.TRANS_TIMESTAMP
            expressionRepeatedByColumn sat_src.TRANS_TIMESTAMP

        Attribute BUSINESS_SRC_KEY
            expressedBy sat_src.BUSINESS_SRC_KEY
            expressionRepeatedByColumn sat_src.BUSINESS_SRC_KEY

        Attribute PRIMARY_KEY
            expressedBy sat_src.PRIMARY_KEY
            expressionRepeatedByColumn sat_src.PRIMARY_KEY

        Attribute FOREIGN_KEY
            expressedBy sat_src.FOREIGN_KEY
            expressionRepeatedByColumn sat_src.FOREIGN_KEY

        Attribute OTHER_ATTR
            expressedBy sat_src.OTHER_ATTR
            expressionRepeatedByColumn sat_src.OTHER_ATTR

        Attribute TIME_SINCE_CHANGE
            expressedBy CURRENT_TIMESTAMP - sat_src.LOAD_TIMESTAMP

        Attribute SECRET_BK
            expressedBy HASHFUNC[sat_src.SECRET_BK]
            expressionRepeatedByColumn sat_src.SECRET_BK

        Attribute DAY_OF_CHANGE
            expressedBy date_trunc(#day#, sat_src.LOAD_TIMESTAMP)
            expressionRepeatedByColumn sat_src.LOAD_TIMESTAMP

    consists of source table sat_src
    componentrepeatedbycomponent SAT

    consists of inner join sat_max_ld_join
    componentConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = Y)]
    componentrepeatedbycomponent SAT
    connectsFrom (sat_src)
    connectsFrom (max_ld)

        Artifact GENERAL_EXPRESSION
            expressedBy sat_src.OBJECT_H_KEY = max_ld.OBJECT_H_KEY AND sat_src.LOAD_TIMESTAMP = max_ld.MAX_LOAD_TIMESTAMP

    consists of joined inline_view max_ld
    componentConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = Y)]
    componentrepeatedbycomponent SAT

    consists of filter sat_filter
    componentrepeatedbycomponent SAT
    connectsFrom (sat_max_ld_join)
                connectionConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = Y)]
                (sat_src)
                connectionConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = N)]

        Artifact GENERAL_EXPRESSION
            group_1 expressedBy sat_src.LOAD_END_TIMESTAMP = GTIMECAST[@#CURRENT_RECORD_LOAD_END_DATE#] and
            expressionConditionedBy [(TAB SAT : INSERT_ONLY_LOGIC = N)]
            expressionRepeatedByColumn sat_src.LOAD_END_TIMESTAMP

            group_2 expressedBy sat_src.DELETE_FLAG = GCASTFRMT[@#DELETE_FLAG_NEGATIVE_VALUE#]
            expressionRepeatedByColumn sat_src.DELETE_FLAG

comp_group_end
"""

We want to generate these calculated SATs for all satellites of the moto sales source.
The template dependencies contains a list of all object of the base type of the template.
The linked property of a dependency enables the generation for that object.

For each of the objects that we add, we will also print out its dependencies, these are the other objects that are available in the template.
Since our template is based on a SAT, this will include just the HUBs.

In [None]:
for dep in template.dependencies:
    print(dep)
    if "sat_ms" in dep.name and dep.tab_type == ObjectTypes.SAT:
        dep.linked = True
        print(dep.references)

The last thing we need to do it to define the attributes of the target object of the template.
To see which attributes are available to be used in the template we can look at the following.

In [None]:
template.attributes

Since our base type is a SAT, we will see all SAT and HUB signature attributes, as well as the user defined signature attributes on those objects.
Let's start by adding all the SAT attributes to the target of or template.
These are the so called "existing" attributes of the template.

In [None]:
for attr in template.attributes:
    if attr.object_type == ObjectTypes.SAT:
        template.create_target_attribute_from_existing(attr)
print(template.target_attributes)

Because our calculated SAT contains only active records we can remove the end date and the delete flag.

In [None]:
from vaultspeed_sdk.business_vault.template_target_attribute import TemplateTargetExistingAttribute

attr_to_remove = [attr for attr in template.attributes if attr.object_type == ObjectTypes.SAT and attr.name in ("LOAD_END_TIMESTAMP", "DELETE_FLAG")]
for attr in template.target_attributes:
    if isinstance(attr, TemplateTargetExistingAttribute) and attr.attribute_type in attr_to_remove:
        template.delete_target_attribute(attr)

We can now add a new attribute to the current sat which will store how long ago the last change was
In this case we do not need to base it on an existing attribute

In [None]:
template.create_target_attribute_unique(name="TIME_SINCE_CHANGE", data_type=dv_release.data_types["TIMESTAMP_NTZ"])

We can also create a new attribute which contains the load date truncated to the day.
In this case we could base the attribute on the load date. Then we do not have to specify the data type, it will use the same type of the load date attribute, unless specified otherwise.

In [None]:
template.create_target_attribute_unique(name="DAY_OF_CHANGE", attribute_type=template.get_attribute("LOAD_TIMESTAMP", ObjectTypes.SAT),
                                        data_type=dv_release.data_types["TIMESTAMP_NTZ"])

We can also add a new attribute for each business key that we tag as being secret, these attributes will contain hashed business keys.

In [None]:
secret_signature = bv_release.create_signature_attribute(name="SECRET_BK")
secret_attributes = [bv_release.objects["sat_ms_customers_name"].attributes["national_person_id"],
                     bv_release.objects["sat_ms_customers_birth"].attributes["national_person_id"]]
for attr in secret_attributes:
    attr.add_signature(secret_signature)

for attr in template.attributes:
    if attr == secret_signature:
        # if we don't provide a data type, our new attributes will inherit their data type from the underlying business key
        # so, since our attribute will contain a hash in hex format, we need to specify the data type and the data length
        template.create_target_attribute_non_unique(attr, prefix="secret", data_type=dv_release.data_types["VARCHAR"], data_length="64")

print(template.target_attributes)

To verify that our template is working properly we can generate some example code for one of our SATs

In [None]:
example_code, generation = system.generate_template_example(bv_release=bv_release, template=template, base_object=template.dependencies["sat_ms_addresses"],
                                                            etl_type=EtlGenerationTypes.SNOWFLAKESQL)
print(example_code)