ADF data flow's checkpoint key cannot be overridden #25380

Open · fabrideci opened this issue Mar 22, 2024 · 0 comments

fabrideci commented Mar 22, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Community Note

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • Please do not leave "+1" or "me too" comments; they generate extra noise for issue followers and do not help prioritize the request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment and review the contribution guide to help.

Terraform Version

1.7.4

AzureRM Provider Version

3.94.0

Affected Resource(s)/Data Source(s)

azurerm_data_factory_pipeline

Terraform Configuration Files

resource "azurerm_resource_group" "example" {
  name     = "example-resources"
  location = "West Europe"
}

resource "azurerm_storage_account" "example" {
  name                     = "issuewthchpkeyazrm"
  resource_group_name      = azurerm_resource_group.example.name
  location                 = azurerm_resource_group.example.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
  account_kind             = "StorageV2"
  access_tier              = "Cool"
  is_hns_enabled           = "true"
}

resource "azurerm_storage_data_lake_gen2_filesystem" "example" {
  name               = "example-fs"
  storage_account_id = azurerm_storage_account.example.id
}

resource "azurerm_data_factory" "example" {
  name                = "example"
  location            = azurerm_resource_group.example.location
  resource_group_name = azurerm_resource_group.example.name
}

resource "azurerm_data_factory_linked_service_data_lake_storage_gen2" "example" {
  name                 = "datalake"
  data_factory_id      = azurerm_data_factory.example.id
  use_managed_identity = true
  url                  = "https://${azurerm_storage_account.example.name}.dfs.core.windows.net"

  depends_on = [azurerm_storage_account.example]
}

resource "azurerm_data_factory_dataset_json" "dynamic_fs" {
  name                = "exampleDynamicDatasetJson"
  data_factory_id     = azurerm_data_factory.example.id
  linked_service_name = azurerm_data_factory_linked_service_data_lake_storage_gen2.example.name

  parameters = {
    "fileSystem" : ""
  }

  azure_blob_storage_location {
    container = "@dataset().fileSystem"
    path      = ""
    filename  = ""
  }

  encoding = "UTF-8"

  depends_on = [azurerm_storage_data_lake_gen2_filesystem.example]
}

resource "azurerm_data_factory_dataset_parquet" "dynamic_fs" {
  name                = "exampleDynamicDatasetParquet"
  data_factory_id     = azurerm_data_factory.example.id
  linked_service_name = azurerm_data_factory_linked_service_data_lake_storage_gen2.example.name

  parameters = {
    "fileSystem" : ""
  }

  azure_blob_storage_location {
    container = "@dataset().fileSystem"
  }

  compression_codec = "snappy"

  depends_on = [azurerm_storage_data_lake_gen2_filesystem.example]
}

resource "azurerm_data_factory_data_flow" "example" {
  name            = "exampleSinkDataflow"
  data_factory_id = azurerm_data_factory.example.id

  source {
    name = "sourceNewDataJsons"

    dataset {
      name = azurerm_data_factory_dataset_json.dynamic_fs.name
      parameters = {
        "fileSystem" = azurerm_storage_data_lake_gen2_filesystem.example.name
      }
    }
  }

  transformation {
    name = "removeDuplicateRows"
  }

  transformation {
    name = "addPkColumns"
  }

  sink {
    name = "sinkNewDataParquet"

    dataset {
      name = azurerm_data_factory_dataset_parquet.dynamic_fs.name
      parameters = {
        "fileSystem" = azurerm_storage_data_lake_gen2_filesystem.example.name
      }
    }
  }

  script = <<EOT
parameters{
	date as string,
	model as string,
	region as string
}
source(allowSchemaDrift: true,
	validateSchema: false,
	ignoreNoFilesFound: false,
	enableCdc: true,
	mode: 'read',
	skipInitialLoad: false,
	documentForm: 'documentPerLine',
	wildcardPaths:[("region={$region}/model={$model}/year={substring($date, 0, 4)}/month={substring($date, 6, 2)}/**/*.json")]) ~> sourceNewDataJsons
addPkColumns aggregate(groupBy(id,
		createdAt),
	each(match(!in(['id','createdAt'],name)), $$ = first($$)),
	partitionBy('hash', 1)) ~> removeDuplicateRows
sourceNewDataJsons derive(id = toString(byName('id')),
		region = $region,
		model = $model,
		year = substring($date, 0, 4),
		month = substring($date, 6, 2),
		createdAt = toString(byName('createdAt'))) ~> addPkColumns
removeDuplicateRows sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'parquet',
	umask: 0022,
	preCommands: [],
	postCommands: [],
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	partitionBy('key',
		0,
		region,
		model,
		year,
		month
	),
	fileSystem: '${azurerm_storage_data_lake_gen2_filesystem.example.name}',
	compressionCodec: 'snappy') ~> sinkNewDataParquet
EOT

  depends_on = [azurerm_data_factory_dataset_json.dynamic_fs, azurerm_data_factory_dataset_parquet.dynamic_fs]
}

resource "azurerm_data_factory_pipeline" "example" {
  name            = "Example pipeline"
  data_factory_id = azurerm_data_factory.example.id

  activities_json = <<JSON
[
    {
        "name": "Sink new telemetry data into a temporary Parquet",
        "type": "ExecuteDataFlow",
        "dependsOn": [],
        "policy": {
            "timeout": "0.12:00:00",
            "retry": 3,
            "retryIntervalInSeconds": 30,
            "secureOutput": false,
            "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
            "dataFlow": {
                "referenceName": "exampleSinkDataflow",
                "type": "DataFlowReference",
                "parameters": {
                    "date": {
                        "value": "'2024-03-22'",
                        "type": "Expression"
                    },
                    "model": {
                        "value": "'test_model'",
                        "type": "Expression"
                    },
                    "region": {
                        "value": "'test_region'",
                        "type": "Expression"
                    }
                },
                "datasetParameters": {
                    "sourceNewDataJsons": {
                        "fileSystem": "example-fs"
                    },
                    "sinkNewDataParquet": {
                        "fileSystem": "example-fs"
                    }
                }
            },
            "compute": {
                "coreCount": 32,
                "computeType": "MemoryOptimized"
            },
            "traceLevel": "Fine",
            "continuationSettings": {
                "customizedCheckpointKey": {
                    "value": "@base64(concat('test_region', 'test_model', substring(pipeline().parameters.date, 0, 4), substring(pipeline().parameters.date, 5, 2)))",
                    "type": "Expression"
                }
            }
        }
    }
]
  JSON
}

Debug Output/Panic Output

-

Expected Behaviour

The "Example pipeline" should run the data flow "exampleSinkDataflow" with an overridden checkpoint key - as recommended by Microsoft in case "you're using a dynamic pattern for your source tables or folders"

Actual Behaviour

Even though the terraform apply command goes through without printing any error or warning, the checkpoint key does not get overridden. Indeed, running a terraform plan command right afterwards once again proposes to update the "Example pipeline" activities JSON in place, re-adding the portion of code that overrides the checkpoint key (which should already have been there):

"continuationSettings": {
    "customizedCheckpointKey": {
        "value": "@base64(concat('test_region', 'test_model', substring(pipeline().parameters.date, 0, 4), substring(pipeline().parameters.date, 5, 2)))",
        "type": "Expression"
    }
}

Steps to Reproduce

  1. Run a terraform apply command with the configuration provided above
  2. In the ADF portal, check the activity block called "Sink new telemetry data into a temporary Parquet" to see whether the checkpoint key is overridden
  3. Run a terraform plan command to confirm that Terraform indeed detects that the portion of JSON needed to override the key is not present in ADF
  4. Run a terraform apply command again and observe that nothing changes

It's worth mentioning that there seems to be an ongoing bug in the ADF portal regarding this specific behaviour. I suppose it might conflict with integrations such as the AzureRM provider as well, although in theory it should affect the UI only.

After step 1, if you go into the ADF portal and apply the trick described in the Microsoft question mentioned above to make the checkpoint key option show up, the data pipeline works fine, but Terraform is still unable to detect that the checkpoint key is overridden and keeps proposing to update the pipeline in place to add the missing code. Trying to manually add the missing portion of JSON in the portal's pipeline editor doesn't work either: the checkpoint key option shows up, but the data pipeline then fails as if the key had never really been set.
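
As a stopgap, a sketch of a lifecycle block that could be added to the azurerm_data_factory_pipeline resource to silence the perpetual in-place update (note it hides all drift in the activities, not just this field, and does not make the override actually stick in ADF):

  lifecycle {
    # Workaround only: stops Terraform from re-sending activities_json on
    # every apply; the continuationSettings block is still dropped server-side.
    ignore_changes = [activities_json]
  }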

Important Factoids

No response

References

No response
