Skip to content

Commit

Permalink
EC2 CreateInstance: terminate instances in on_kill (#36828)
Browse files Browse the repository at this point in the history
  • Loading branch information
AchimGaedkeLynker committed Jan 18, 2024
1 parent 9563dc5 commit d18c01a
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions airflow/providers/amazon/aws/operators/ec2.py
Expand Up @@ -183,22 +183,36 @@ def execute(self, context: Context):
MaxCount=self.max_count,
**self.config,
)["Instances"]
instance_ids = []
for instance in instances:
instance_ids.append(instance["InstanceId"])
self.log.info("Created EC2 instance %s", instance["InstanceId"])

instance_ids = self._on_kill_instance_ids = [instance["InstanceId"] for instance in instances]
for instance_id in instance_ids:
self.log.info("Created EC2 instance %s", instance_id)

if self.wait_for_completion:
ec2_hook.get_waiter("instance_running").wait(
InstanceIds=[instance["InstanceId"]],
InstanceIds=[instance_id],
WaiterConfig={
"Delay": self.poll_interval,
"MaxAttempts": self.max_attempts,
},
)

# leave "_on_kill_instance_ids" in place for finishing post-processing
return instance_ids

def on_kill(self) -> None:
instance_ids = getattr(self, "_on_kill_instance_ids", [])

if instance_ids:
self.log.info("on_kill: Terminating instance/s %s", ", ".join(instance_ids))
ec2_hook = EC2Hook(
aws_conn_id=self.aws_conn_id,
region_name=self.region_name,
api_type="client_type",
)
ec2_hook.conn.terminate_instances(InstanceIds=instance_ids)
super().on_kill()


class EC2TerminateInstanceOperator(BaseOperator):
"""
Expand Down

0 comments on commit d18c01a

Please sign in to comment.