# Conforming data changes to Delta Format
A parameterized notebook can be called from a pipeline, which passes in parameters that control what the notebook runs.

The first cell is our parameter cell. The value of current_version will be overridden by our pipeline at run time.

In [10]:
current_version = "1974"

This Python dictionary maps each of our parquet files to the join criteria used for its merge operation.

A merge uses matching criteria to determine whether a row already exists in the target table. Because the primary key columns differ from table to table, we need to provide a mapping.

In [11]:
control_object = {
    "appInventory.parquet": "source.Id = target.Id",
    "appKiosk.parquet": "source.Id = target.Id",
    "appPurchaseLineItems.parquet": "source.Id = target.Id",
    "appPurchases.parquet": "source.Id = target.Id",
    "appPurchaseUser.parquet": "source.PurchasesId = target.PurchasesId AND source.PurchasingUsersId = target.PurchasingUsersId",
    "appPurchaseUserCreditCard.parquet": "source.PurchasesId = target.PurchasesId AND source.PaymentCardsId = target.PaymentCardsId",
    "appRentals.parquet":  "source.Id = target.Id",
    "appReturns.parquet": "source.Id = target.Id",
    "appUserAddresses.parquet":"source.Id = target.Id",
    "appUserCreditCards.parquet":"source.Id = target.Id",
    "appUserReviews.parquet":"source.Id = target.Id",
    "appUsers.parquet":"source.Id = target.Id",
    "appUserSubscriptionStatus.parquet":"source.Id = target.Id",
    "dboactors.parquet": "source.actor_id = target.actor_id",
    "dboactorstomoviesjoin.parquet": "source.actor_id = target.actor_id AND source.movie_id = target.movie_id",
    "dbocriticreviews.parquet": "source.critic_review_id = target.critic_review_id",
    "dbodirectors.parquet": "source.director_id = target.director_id",
    "dbodirectorstomoviesjoin.parquet": "source.director_id = target.director_id AND source.movie_id = target.movie_id",
    "dbogenres.parquet": "source.genre_id = target.genre_id",
    "dbomovies.parquet": "source.movie_id = target.movie_id"
}
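
The child notebook that consumes these conditions is not shown here. As a minimal sketch, assuming it exposes the incoming parquet data as a temporary view and runs a Delta Lake `MERGE INTO` through Spark SQL, a condition from control_object could be slotted into the statement as follows. The helper `build_merge_sql` and the view name `source_updates` are hypothetical, and deletes are not handled.

```python
def build_merge_sql(table_name: str, match_method: str,
                    source_view: str = "source_updates") -> str:
    """Build a Delta Lake MERGE statement (hypothetical helper, not from the original notebook).

    The aliases `source` and `target` must match the aliases used in the
    condition strings stored in control_object.
    """
    return (
        f"MERGE INTO {table_name} AS target\n"
        f"USING {source_view} AS source\n"
        f"ON {match_method}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Example with a composite key taken from the mapping above
print(build_merge_sql(
    "dboactorstomoviesjoin",
    "source.actor_id = target.actor_id AND source.movie_id = target.movie_id",
))
```

Inside a Spark session the resulting string could be passed to `spark.sql(...)`. Keeping the condition as plain text is what lets a single child notebook serve tables with single and composite keys.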


Similar to the Full Load process, we use the runMultiple method provided by the mssparkutils library to run these jobs in parallel, which should speed up the ingestion process.

Note that the current_version variable selects the directory we load from.

In [None]:
from notebookutils import mssparkutils

# Replace this with the directory your change files were loaded into.
# The f-string also works if the pipeline passes current_version as a number.
directory = f'Files/ChangeTrackingDeltas/{current_version}'

# List every file in that directory so we can iterate over them
parquet_files = mssparkutils.fs.ls(directory)

# Build one activity per file. The table name is the part of the file name before the first dot,
# and match_method is the merge condition looked up in control_object (a file with no entry raises KeyError).
# This changes the existing tables with the data that is read, make sure that is intended.

p_notebooks = []

for parquet in parquet_files:
    print(parquet.path)
    table_name = parquet.name.split('.')[0]
    notebook = {
        "name": table_name,
        "path": "DeltaSingleTable_ForParallel",
        "args": {
            "parquet_name": parquet.name,
            "parquet_path": parquet.path,
            "match_method": control_object[parquet.name]
        },
    }
    p_notebooks.append(notebook)

notebooks = {
    "activities": p_notebooks
}

mssparkutils.notebook.runMultiple(notebooks, {"displayDAGViaGraphviz": True})
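
The loop above fails with a KeyError on the first file that has no entry in control_object. As a rough sketch of one alternative, the activity list can be built by a plain function that checks every file up front and reports all missing mappings at once. The function name `build_activities` and the sample path below are illustrative assumptions; the real paths come from mssparkutils.fs.ls.

```python
def build_activities(files, control_object,
                     notebook_path="DeltaSingleTable_ForParallel"):
    """Build the activity list for runMultiple (hypothetical helper).

    files: iterable of (file_name, file_path) pairs.
    Raises KeyError listing every file that has no merge condition.
    """
    files = list(files)  # allow generators, since we iterate twice
    missing = [name for name, _ in files if name not in control_object]
    if missing:
        raise KeyError("No merge condition defined for: " + ", ".join(missing))

    return [
        {
            "name": name.split(".")[0],  # table name = file name before the first dot
            "path": notebook_path,
            "args": {
                "parquet_name": name,
                "parquet_path": path,
                "match_method": control_object[name],
            },
        }
        for name, path in files
    ]


# Illustrative input; in the notebook this would come from
# [(f.name, f.path) for f in parquet_files]
sample_files = [("dbomovies.parquet", "Files/ChangeTrackingDeltas/1974/dbomovies.parquet")]
sample_control = {"dbomovies.parquet": "source.movie_id = target.movie_id"}
notebooks = {"activities": build_activities(sample_files, sample_control)}
print(notebooks)
```

Because the function touches no Spark or Fabric APIs, it can be tested outside the notebook before the resulting dictionary is passed to runMultiple.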