
Feature Appflow start-flow command additional parameters #6632

Closed
guilhermeneves opened this issue Dec 31, 2021 · 6 comments
Labels
appflow closing-soon This issue will automatically close in 4 days unless further comments are made. feature-request A feature should be added or improved. response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days.

Comments

@guilhermeneves

guilhermeneves commented Dec 31, 2021

Hi AWS team,

Could you check whether it's possible to add more options to the aws appflow CLI tool? We need an option on the start-flow command that lets us run an incremental load from an orchestrator tool.

Currently we're using aws appflow delete-flow and aws appflow create-flow to achieve this, but it's not a good solution because the flow run history is removed after the delete.

The idea is to have a command like:
aws appflow start-flow --flow-name <NameFlow> --filter 1 --filter-value '2021-12-10 00:00:00' --filter-condition 'is after'

Any question please let me know.

Thanks,
Guilherme

@guilhermeneves guilhermeneves added feature-request A feature should be added or improved. needs-triage This issue or PR still needs to be triaged. labels Dec 31, 2021
@tim-finnigan tim-finnigan added appflow and removed needs-triage This issue or PR still needs to be triaged. labels Jan 3, 2022
@tim-finnigan
Contributor

Hi @guilhermeneves, thanks for reaching out. These are the current parameter options for the start-flow command: https://awscli.amazonaws.com/v2/documentation/api/latest/reference/appflow/start-flow.html

You're asking for --filter, --filter-value, and --filter-condition options to be added to that command. Can you describe your use case in a bit more detail and explain what you're trying to do?

@tim-finnigan tim-finnigan added the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Jan 3, 2022
@guilhermeneves
Author

Hi @tim-finnigan,

We're using AWS AppFlow with Control-M as the orchestrator. We create all the flows in AppFlow manually beforehand and then schedule them with Control-M so the data is processed after each AppFlow run. The problem is that with start-flow I can only do full loads of Salesforce tables, and since some tables are big it takes a long time to collect the data. We found that with the CLI's create-flow and delete-flow commands, as described in this request (#5156), it's possible to pull delta data instead of full loads, but we'd like to keep using start-flow and stop-flow because they preserve the record of each flow run in the AWS console.

Any question please let me know.

Thanks,
Guilherme

@github-actions github-actions bot removed the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Jan 3, 2022
@tim-finnigan
Contributor

Thanks @guilhermeneves for providing more context. I'm not familiar with Control-M, so I'm not sure I fully understand your use case. I also didn't see Control-M listed among the applications supported by AppFlow: https://docs.aws.amazon.com/appflow/latest/userguide/app-specific.html

Can you explain a little more about how you are managing flows now and what your request is?

You mentioned wanting to track flow history. Can you use the list-flows or describe-flow-execution-records commands to accomplish this? I’m not sure exactly how those filter options you proposed would be used.

@tim-finnigan tim-finnigan added the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Jan 3, 2022
@github-actions

github-actions bot commented Jan 8, 2022

Greetings! It looks like this issue hasn’t been active in longer than five days. We encourage you to check if this is still an issue in the latest release. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or upvote with a reaction on the initial post to prevent automatic closure. If the issue is already closed, please feel free to open a new one.

@github-actions github-actions bot added the closing-soon This issue will automatically close in 4 days unless further comments are made. label Jan 8, 2022
@guilhermeneves
Author

Thanks @tim-finnigan for the guidance on the list-flows and describe-flow-execution-records commands. Control-M is an orchestrator tool similar to AWS Step Functions, AWS Data Pipeline, Airflow, Prefect, NiFi, etc. We have to make it work with Control-M because it's the tool our architecture team chose for orchestrating data pipelines at the company. Basically, we built these steps in Control-M using the AWS CLI with bash scripting (Control-M has a job type that lets us run bash commands):

We achieved what we wanted with the commands already available: describe-flow, create-flow, delete-flow, and describe-flow-execution-records. The tip about describe-flow-execution-records helped a lot, and being able to pass a JSON string with the filter parameters via --cli-input-json on create-flow made it possible to pull Salesforce data incrementally instead of doing a full load every time.

Below are the steps we followed, in case anyone is interested in the future:

1. Check whether a flow from the previous run is still running:

```bash
is_running=$(aws appflow describe-flow --flow-name {{flowName}} | jq '.lastRunExecutionDetails')
```
2. Convert the last hour (start and end dates) to UNIX epoch timestamps in milliseconds to filter the last hour in the batch load. (We're in UTC-3, so we subtract 3 hours; if you're in another timezone, adjust accordingly.)

```bash
delta_date_start=$([[ $delta_date != "null" ]] && date -d ${delta_date} +"%s%3N" || date -d "-4 hours" +%s%3N )
delta_date_end=$(date -d "-3 hours" +%s%3N)
```
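As a sanity check on the epoch conversion above (assuming GNU coreutils date, which supports -d and the %N specifier), a fixed UTC timestamp converts deterministically:

```shell
# Convert a fixed UTC date to a UNIX epoch timestamp in milliseconds.
# -u pins the conversion to UTC so the result is reproducible anywhere.
epoch_ms=$(date -u -d "2021-12-10 00:00:00" +%s%3N)
echo "$epoch_ms"   # 1639094400000
```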
3. Build the JSON input variable used to create a new AppFlow flow that extracts the data with a filter (delta load within the start/end range). Note: flowName, sourceConnection, bucketName, bucketPrefix, prefixRawPathFormat, and deltaFieldDate are Control-M parameters; we use them so jobs can be created dynamically via Control-M Application Integrator.

```bash
JSON_INPUT="{
    \"flowName\": \"{{flowName}}\",
    \"description\": \"{{flowName}}\",
    \"triggerConfig\": {
        \"triggerType\": \"OnDemand\"
    },
    \"sourceFlowConfig\": {
        \"connectorType\": \"Salesforce\",
        \"connectorProfileName\": \"{{sourceConnection}}\",
        \"sourceConnectorProperties\": {
            \"Salesforce\": {
                \"object\": \"{{tableName}}\",
                \"enableDynamicFieldUpdate\": true,
                \"includeDeletedRecords\": false
            }
        }
    },
    \"destinationFlowConfigList\": [
        {
            \"connectorType\": \"S3\",
            \"destinationConnectorProperties\": {
                \"S3\": {
                    \"bucketName\": \"{{bucketName}}\",
                    \"bucketPrefix\": \"{{bucketPrefix}}\",
                    \"s3OutputFormatConfig\": {
                        \"fileType\": \"CSV\",
                        \"prefixConfig\": {
                            \"prefixType\": \"PATH_AND_FILENAME\",
                            \"prefixFormat\": \"{{prefixRawPathFormat}}\"
                        },
                        \"aggregationConfig\": {
                            \"aggregationType\": \"SingleFile\"
                        }
                    }
                }
            }
        }
    ],
    \"tasks\": [
        {
            \"sourceFields\": [\"{{deltaFieldDate}}\"],
            \"connectorOperator\": {
                \"Salesforce\": \"PROJECTION\"
            },
            \"taskType\": \"Filter\",
            \"taskProperties\": {}
        },
        {
            \"sourceFields\": [\"{{deltaFieldDate}}\"],
            \"connectorOperator\": {
                \"Salesforce\": \"NO_OP\"
            },
            \"destinationField\": \"{{deltaFieldDate}}\",
            \"taskType\": \"Map\",
            \"taskProperties\": {
                \"DESTINATION_DATA_TYPE\": \"{{deltaFieldDate}}\",
                \"SOURCE_DATA_TYPE\": \"{{deltaFieldDate}}\"
            }
        },
        {
            \"sourceFields\": [\"{{deltaFieldDate}}\"],
            \"connectorOperator\": {
                \"Salesforce\": \"BETWEEN\"
            },
            \"taskType\": \"Filter\",
            \"taskProperties\": {
                \"DATA_TYPE\": \"datetime\",
                \"LOWER_BOUND\": \"$delta_date_start\",
                \"UPPER_BOUND\": \"$delta_date_end\"
            }
        },
        {
            \"sourceFields\": [],
            \"connectorOperator\": {
                \"Salesforce\": \"NO_OP\"
            },
            \"taskType\": \"Map_all\",
            \"taskProperties\": {}
        }
    ],
    \"tags\": {
        \"Project\": \"Data-Ingestion\",
        \"env\": \"PRD\"
    }
}"
```
4. Check whether the user wants a full or incremental load of the Salesforce table and adjust the JSON variable accordingly. deltaFieldDate is the table field that controls the delta load; if it isn't specified, the script falls back to a full load:

```bash
JSON_INPUT=$([[ {{deltaFieldDate}} != "NOVALUE" ]] && echo $JSON_INPUT || echo $JSON_INPUT | jq 'del(.tasks[0:3])')
```
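To illustrate what that jq expression does (the mock JSON below is a made-up, stripped-down version of the flow definition, not real output): del(.tasks[0:3]) removes the three delta-only tasks and keeps only the final Map_all task, leaving a plain full-load definition:

```shell
# Hypothetical, stripped-down mock of the flow definition's tasks array.
mock='{"tasks":[{"taskType":"Filter"},{"taskType":"Map"},{"taskType":"Filter"},{"taskType":"Map_all"}]}'
# del(.tasks[0:3]) drops the first three entries of the tasks array.
remaining=$(echo "$mock" | jq -c 'del(.tasks[0:3])')
echo "$remaining"   # {"tasks":[{"taskType":"Map_all"}]}
```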
5. Check whether the flow already exists; if so, re-create it, otherwise just create it:

```bash
if [[ -z "$is_running" ]]
then
   echo "CREATING FLOW..."
   aws appflow create-flow --cli-input-json "$JSON_INPUT"
else
   echo "REMOVING FLOW..."
   aws appflow delete-flow --flow-name {{flowName}} --force-delete
   sleep 60
   echo "CREATING FLOW..."
   aws appflow create-flow --cli-input-json "$JSON_INPUT"
fi
sleep 2
```
6. Start the data extraction by starting the flow:

```bash
aws appflow start-flow --flow-name {{flowName}}
sleep 7
```
7. Collect the number of records processed so the EMR PySpark step knows whether to run. (This step is important because a delta load can return no data; in that case AppFlow doesn't create the CSV and downstream processing isn't needed.)

```bash
records_processed=$(aws appflow describe-flow-execution-records --flow-name {{flowName}} --max-results=1 | jq '.flowExecutions' | jq '.[0]' | jq '.executionResult.recordsProcessed')

if [[ $records_processed -ge 1 ]]
then
   aws s3 mv $s3_old_path_file $s3_new_path_file
   echo "> INF FILE NAME:"$s3_new_path_file
   echo "> INF PATH:s3://"{{bucketName}}"/"{{bucketPrefix}}"/"{{flowName}}
else
   echo "File not processed, due to 0 records ingested"
   echo "> INF FILE NAME:NODATA"
   echo "> INF PATH:NODATA"
fi
```
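As an aside, the three chained jq invocations above can be collapsed into a single filter. The payload here is a mock shaped like a describe-flow-execution-records response (the recordsProcessed value is made up):

```shell
# Hypothetical mock of the describe-flow-execution-records response.
mock='{"flowExecutions":[{"executionResult":{"recordsProcessed":42}}]}'
# One jq path expression instead of three chained jq calls.
records=$(echo "$mock" | jq '.flowExecutions[0].executionResult.recordsProcessed')
echo "$records"   # 42
```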

That's how we gave our orchestrator tool the ability to dynamically choose between full and incremental loads with AppFlow. I'm closing this request, since full/incremental loads are achievable with the current CLI commands.

Thanks again @tim-finnigan for the quick answers here, I really appreciate your help and your availability to look at this issue.

Regards,
Guilherme Neves

@github-actions

github-actions bot commented Jan 9, 2022

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.
