Skip to content

Commit

Permalink
[datafactory] fix data flow update does not work (#7062)
Browse files Browse the repository at this point in the history
  • Loading branch information
JessicaLHartog committed Jan 25, 2024
1 parent b9719fb commit 9101c64
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 81 deletions.
3 changes: 3 additions & 0 deletions src/datafactory/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Release History
===============
1.0.1
* Fix broken dataflow update command

1.0.0
+++++
* Support --enable-self-contained-interactive-authoring in integration-runtime self-hosted create
Expand Down
151 changes: 71 additions & 80 deletions src/datafactory/azext_datafactory/manual/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,50 @@ def datafactory_data_flow_delete(
)


def get_type_properties_from_properties(properties, create=True):
logger = get_logger()
type_properties_key = "typeProperties"
sources_key = "sources"
sources_value = []
sinks_key = "sinks"
sinks_value = []
transformations_key = "transformations"
transformations_value = []
script_lines_key = "scriptLines"
script_lines_value = []
if not create:
logger.warning(
"If the definition is not correct here, the resource in ADF may be malformed"
)
if type_properties_key in properties.keys():
logger.warning(
'Any malformed "%s" sub-item will result in an '
'incomplete definition once viewed on ADF.', type_properties_key
)
if ((sources_key in properties[type_properties_key].keys() or
sinks_key in properties[type_properties_key].keys())
and (script_lines_key not in properties[type_properties_key].keys())):
logger.warning(
"Not including a scriptLines in this case may result "
"in a malformed data flow"
)
if sources_key in properties[type_properties_key].keys():
sources_value = properties[type_properties_key][sources_key]
if sinks_key in properties[type_properties_key].keys():
sinks_value = properties[type_properties_key][sinks_key]
if transformations_key in properties[type_properties_key].keys():
transformations_value = properties[type_properties_key][transformations_key]
if script_lines_key in properties[type_properties_key].keys():
script_lines_value = properties[type_properties_key][script_lines_key]
type_properties = {
sources_key: sources_value,
sinks_key: sinks_value,
transformations_key: transformations_value,
script_lines_key: script_lines_value
}
return type_properties


def datafactory_data_flow_create(
client,
resource_group_name,
Expand All @@ -97,68 +141,28 @@ def datafactory_data_flow_create(
flow_type,
if_match=None,
):
logger = get_logger()
data_flow = {}
data_flow["properties"] = {}
data_flow_properties = {}
if flow_type not in DATA_FLOW_SUBTYPES:
raise CLIError(
f"Not a valid type of dataflow. Valid choices: {DATA_FLOW_SUBTYPES}"
)

data_flow["properties"]["type"] = flow_type
data_flow["properties"]["description"] = (
data_flow_properties["type"] = flow_type
data_flow_properties["description"] = (
properties["description"] if "description" in properties.keys() else ""
)
data_flow["properties"]["folder"] = (
data_flow_properties["folder"] = (
{"name": properties["folder"]["name"]}
if "folder" in properties.keys()
else None
)
data_flow["properties"]["annotations"] = (
data_flow_properties["annotations"] = (
properties["annotations"] if "annotations" in properties.keys() else []
)
data_flow["properties"]["typeProperties"] = {}
if "typeProperties" not in properties.keys():
data_flow["properties"]["typeProperties"] = {
"sources": [],
"sinks": [],
"transformations": [],
"scriptLines": [],
}
else:
logger.warning(
'Any malformed "typeProperty" sub-item will result in an incomplete definition '
"once viewed on ADF."
)
if (
"sources" in properties["typeProperties"].keys()
or "sinks" in properties["typeProperties"].keys()
) and ("scriptLines" not in properties["typeProperties"].keys()):

logger.warning(
"Not including a scriptLines in this case may result "
"in a malformed data flow"
)
data_flow["properties"]["typeProperties"]["sources"] = (
properties["typeProperties"]["sources"]
if "sources" in properties["typeProperties"].keys()
else []
)
data_flow["properties"]["typeProperties"]["sinks"] = (
properties["typeProperties"]["sinks"]
if "sinks" in properties["typeProperties"].keys()
else []
)
data_flow["properties"]["typeProperties"]["transformations"] = (
properties["typeProperties"]["transformations"]
if "transformations" in properties["typeProperties"].keys()
else []
)
data_flow["properties"]["typeProperties"]["scriptLines"] = (
properties["typeProperties"]["scriptLines"]
if "scriptLines" in properties["typeProperties"].keys()
else []
)
data_flow_properties["typeProperties"] = (
get_type_properties_from_properties(properties=properties, create=True)
)
data_flow = {"properties": data_flow_properties}
return client.create_or_update(
resource_group_name=resource_group_name,
factory_name=factory_name,
Expand All @@ -175,49 +179,36 @@ def datafactory_data_flow_update(
data_flow_name,
properties,
):
logger = get_logger()
data_flow_data = {}
data_flow_data["properties"] = {}

data_flow_properties = {}
# Avoid creating in the update command
try:
client.get(resource_group_name, factory_name, data_flow_name)
except Exception as e:
raise CLIError(
f"No data flow with this name `{data_flow_name}` exists - no update performed"
f"No data flow with this name '{data_flow_name}' exists - no update performed"
) from e

if "name" in properties.keys():
if "name" in properties.keys() and properties["name"] != data_flow_name:
raise CLIError(
"Do not update the name of the data flow via CLI - chance of naming collision"
)

if "type" not in properties.keys():
raise CLIError(
"Data flow type not defined in properties"
)
data_flow_properties["type"] = properties["type"]
if "annotations" in properties.keys():
data_flow_data["properties"]["annotations"] = properties["annotations"]
data_flow_properties["annotations"] = properties["annotations"]
if "description" in properties.keys():
data_flow_data["properties"]["description"] = properties["description"]
if "typeProperties" in properties.keys():
logger.warning(
"If the definition is not correct here, the resource in ADF may be malformed"
)

if "sinks" in properties["typeProperties"].keys():
data_flow_data["properties"]["typeProperties"]["sinks"] = properties[
"typeProperties"
]["sinks"]
if "sources" in properties["typeProperties"].keys():
data_flow_data["properties"]["typeProperties"]["sources"] = properties[
"typeProperties"
]["sources"]
if "transformations" in properties["typeProperties"].keys():
data_flow_data["properties"]["typeProperties"][
"transformations"
] = properties["typeProperties"]["transformations"]
if "scriptLines" in properties["typeProperties"].keys():
data_flow_data["properties"]["typeProperties"]["scriptLines"] = properties[
"typeProperties"
]["scriptLines"]

data_flow_properties["description"] = properties["description"]
if "folder" in properties.keys():
data_flow_properties["folder"] = {
"name": properties["folder"]["name"]
}
data_flow_properties["typeProperties"] = (
get_type_properties_from_properties(properties=properties, create=False)
)
data_flow_data = {"properties": data_flow_properties}
return client.create_or_update(
resource_group_name=resource_group_name,
factory_name=factory_name,
Expand Down
2 changes: 1 addition & 1 deletion src/datafactory/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from setuptools import setup, find_packages

# HISTORY.rst entry.
VERSION = "0.1.0"
VERSION = "1.0.1"
try:
from azext_datafactory.manual.version import VERSION
except ImportError:
Expand Down

0 comments on commit 9101c64

Please sign in to comment.