Make Airflow code Pylint 2.8 compatible (#15534)

potiuk committed Apr 27, 2021
1 parent e229f35 commit 4b031d3
Showing 46 changed files with 418 additions and 396 deletions.
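Editor's note: the bulk of the diff below deals with the consider-using-with check introduced in pylint 2.8. Files, subprocesses, zip archives, and temporary files that were previously opened and closed by hand are now owned by "with" blocks, and a targeted "# pylint: disable=consider-using-with" comment is added where a resource intentionally outlives the enclosing scope. A minimal sketch of the recurring pattern, not code from this commit (the log path is made up):

    # Before: pylint 2.8 flags the bare open() because every code path
    # must remember to close the handle.
    log_file = open('/tmp/example.log', 'w+')
    try:
        log_file.write('hello\n')
    finally:
        log_file.close()

    # After: the context manager closes the file even if the body raises.
    with open('/tmp/example.log', 'w+') as log_file:
        log_file.write('hello\n')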
28 changes: 12 additions & 16 deletions airflow/cli/commands/celery_command.py
@@ -156,24 +156,20 @@ def worker(args):
     if args.daemon:
         # Run Celery worker as daemon
         handle = setup_logging(log_file)
-        stdout = open(stdout, 'w+')
-        stderr = open(stderr, 'w+')

-        if args.umask:
-            umask = args.umask
+        with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
+            if args.umask:
+                umask = args.umask

-        ctx = daemon.DaemonContext(
-            files_preserve=[handle],
-            umask=int(umask, 8),
-            stdout=stdout,
-            stderr=stderr,
-        )
-        with ctx:
-            sub_proc = _serve_logs(skip_serve_logs)
-            worker_instance.run(**options)
-
-        stdout.close()
-        stderr.close()
+            ctx = daemon.DaemonContext(
+                files_preserve=[handle],
+                umask=int(umask, 8),
+                stdout=stdout_handle,
+                stderr=stderr_handle,
+            )
+            with ctx:
+                sub_proc = _serve_logs(skip_serve_logs)
+                worker_instance.run(**options)
     else:
         # Run Celery worker in the same process
         sub_proc = _serve_logs(skip_serve_logs)
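A side note on the daemon branches here and in the kerberos and scheduler commands below: a single "with" statement can bind several resources at once, so both log handles are opened, handed to the DaemonContext, and closed together. A tiny sketch with made-up file names:

    with open('/tmp/out.log', 'w+') as stdout_handle, open('/tmp/err.log', 'w+') as stderr_handle:
        stdout_handle.write('stdout goes here\n')
        stderr_handle.write('stderr goes here\n')
    # Both handles are closed when the block exits, even on an exception.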
12 changes: 6 additions & 6 deletions airflow/cli/commands/dag_command.py
@@ -195,19 +195,19 @@ def dag_show(args):
 def _display_dot_via_imgcat(dot: Dot):
     data = dot.pipe(format='png')
     try:
-        proc = subprocess.Popen("imgcat", stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        with subprocess.Popen("imgcat", stdout=subprocess.PIPE, stdin=subprocess.PIPE) as proc:
+            out, err = proc.communicate(data)
+            if out:
+                print(out.decode('utf-8'))
+            if err:
+                print(err.decode('utf-8'))
     except OSError as e:
         if e.errno == errno.ENOENT:
             raise SystemExit(
                 "Failed to execute. Make sure the imgcat executables are on your systems \'PATH\'"
             )
         else:
             raise
-    out, err = proc.communicate(data)
-    if out:
-        print(out.decode('utf-8'))
-    if err:
-        print(err.decode('utf-8'))


 def _save_dot_to_file(dot: Dot, filename: str):
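For the subprocess conversions it helps to recall that Popen has supported the context manager protocol since Python 3.2: on exit it closes the pipes it created and waits for the child process, which replaces the manual cleanup. A small sketch with a placeholder command:

    import subprocess

    with subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE) as proc:
        out, _ = proc.communicate()
    # The pipe is closed and the child has been reaped by this point.
    print(out.decode().strip())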
18 changes: 9 additions & 9 deletions airflow/cli/commands/info_command.py
@@ -188,17 +188,17 @@ def __init__(self, anonymizer):
     def _get_version(cmd: List[str], grep: Optional[bytes] = None):
         """Return tools version."""
         try:
-            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
+                stdoutdata, _ = proc.communicate()
+                data = [f for f in stdoutdata.split(b"\n") if f]
+                if grep:
+                    data = [line for line in data if grep in line]
+                if len(data) != 1:
+                    return "NOT AVAILABLE"
+                else:
+                    return data[0].decode()
         except OSError:
             return "NOT AVAILABLE"
-        stdoutdata, _ = proc.communicate()
-        data = [f for f in stdoutdata.split(b"\n") if f]
-        if grep:
-            data = [line for line in data if grep in line]
-        if len(data) != 1:
-            return "NOT AVAILABLE"
-        else:
-            return data[0].decode()

     @staticmethod
     def _task_logging_handler():
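Note that the early returns moved inside the "with" block above remain safe: leaving a "with" block via return (or an exception) still runs the context manager's cleanup. A condensed, illustrative variant (not the real method signature):

    import subprocess

    def first_line(cmd):
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
            stdoutdata, _ = proc.communicate()
            lines = [line for line in stdoutdata.split(b"\n") if line]
            if not lines:
                return "NOT AVAILABLE"  # cleanup still runs on this early return
            return lines[0].decode()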
23 changes: 9 additions & 14 deletions airflow/cli/commands/kerberos_command.py
@@ -34,19 +34,14 @@ def kerberos(args):
         pid, stdout, stderr, _ = setup_locations(
             "kerberos", args.pid, args.stdout, args.stderr, args.log_file
         )
-        stdout = open(stdout, 'w+')
-        stderr = open(stderr, 'w+')
-
-        ctx = daemon.DaemonContext(
-            pidfile=TimeoutPIDLockFile(pid, -1),
-            stdout=stdout,
-            stderr=stderr,
-        )
-
-        with ctx:
-            krb.run(principal=args.principal, keytab=args.keytab)
-
-        stdout.close()
-        stderr.close()
+        with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(pid, -1),
+                stdout=stdout_handle,
+                stderr=stderr_handle,
+            )
+
+            with ctx:
+                krb.run(principal=args.principal, keytab=args.keytab)
     else:
         krb.run(principal=args.principal, keytab=args.keytab)
23 changes: 9 additions & 14 deletions airflow/cli/commands/scheduler_command.py
@@ -42,20 +42,15 @@ def scheduler(args):
             "scheduler", args.pid, args.stdout, args.stderr, args.log_file
         )
         handle = setup_logging(log_file)
-        stdout = open(stdout, 'w+')
-        stderr = open(stderr, 'w+')
-
-        ctx = daemon.DaemonContext(
-            pidfile=TimeoutPIDLockFile(pid, -1),
-            files_preserve=[handle],
-            stdout=stdout,
-            stderr=stderr,
-        )
-        with ctx:
-            job.run()
-
-        stdout.close()
-        stderr.close()
+        with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(pid, -1),
+                files_preserve=[handle],
+                stdout=stdout_handle,
+                stderr=stderr_handle,
+            )
+            with ctx:
+                job.run()
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
4 changes: 2 additions & 2 deletions airflow/cli/commands/webserver_command.py
@@ -480,5 +480,5 @@ def monitor_gunicorn(gunicorn_master_pid: int):
                 monitor_gunicorn(gunicorn_master_proc.pid)

         else:
-            gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True)
-            monitor_gunicorn(gunicorn_master_proc.pid)
+            with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
+                monitor_gunicorn(gunicorn_master_proc.pid)
1 change: 1 addition & 0 deletions airflow/hooks/subprocess.py
@@ -62,6 +62,7 @@ def pre_exec():

         self.log.info('Running command: %s', command)

+        # pylint: disable=consider-using-with
         self.sub_process = Popen(  # pylint: disable=subprocess-popen-preexec-fn
             command,
             stdout=PIPE,
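This hunk shows the other half of the pattern: when the process handle must outlive the method that creates it (here it is stored on the hook and terminated later, for example from on_kill), a "with" block cannot bound its lifetime, so the check is silenced for that one statement instead. A simplified stand-in, not the real hook:

    import signal
    from subprocess import PIPE, STDOUT, Popen

    class LongRunningCommand:
        """Hypothetical example: the child process outlives a single method call."""

        def __init__(self):
            self.sub_process = None

        def start(self, command):
            # pylint: disable=consider-using-with
            self.sub_process = Popen(command, stdout=PIPE, stderr=STDOUT)

        def stop(self):
            # Called much later, so start() could not have used a with block.
            if self.sub_process:
                self.sub_process.send_signal(signal.SIGTERM)
                self.sub_process.wait()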
76 changes: 38 additions & 38 deletions airflow/models/dagbag.py
@@ -316,45 +316,45 @@ def _load_modules_from_file(self, filepath, safe_mode):

     def _load_modules_from_zip(self, filepath, safe_mode):
         mods = []
-        current_zip_file = zipfile.ZipFile(filepath)
-        for zip_info in current_zip_file.infolist():
-            head, _ = os.path.split(zip_info.filename)
-            mod_name, ext = os.path.splitext(zip_info.filename)
-            if ext not in [".py", ".pyc"]:
-                continue
-            if head:
-                continue
-
-            if mod_name == '__init__':
-                self.log.warning("Found __init__.%s at root of %s", ext, filepath)
-
-            self.log.debug("Reading %s from %s", zip_info.filename, filepath)
-
-            if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
-                # todo: create ignore list
-                # Don't want to spam user with skip messages
-                if not self.has_logged:
-                    self.has_logged = True
-                    self.log.info(
-                        "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
-                    )
-                continue
-
-            if mod_name in sys.modules:
-                del sys.modules[mod_name]
+        with zipfile.ZipFile(filepath) as current_zip_file:
+            for zip_info in current_zip_file.infolist():
+                head, _ = os.path.split(zip_info.filename)
+                mod_name, ext = os.path.splitext(zip_info.filename)
+                if ext not in [".py", ".pyc"]:
+                    continue
+                if head:
+                    continue
+
+                if mod_name == '__init__':
+                    self.log.warning("Found __init__.%s at root of %s", ext, filepath)
+
+                self.log.debug("Reading %s from %s", zip_info.filename, filepath)
+
+                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
+                    # todo: create ignore list
+                    # Don't want to spam user with skip messages
+                    if not self.has_logged:
+                        self.has_logged = True
+                        self.log.info(
+                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
+                        )
+                    continue
+
+                if mod_name in sys.modules:
+                    del sys.modules[mod_name]

-            try:
-                sys.path.insert(0, filepath)
-                current_module = importlib.import_module(mod_name)
-                mods.append(current_module)
-            except Exception as e:  # pylint: disable=broad-except
-                self.log.exception("Failed to import: %s", filepath)
-                if self.dagbag_import_error_tracebacks:
-                    self.import_errors[filepath] = traceback.format_exc(
-                        limit=-self.dagbag_import_error_traceback_depth
-                    )
-                else:
-                    self.import_errors[filepath] = str(e)
+                try:
+                    sys.path.insert(0, filepath)
+                    current_module = importlib.import_module(mod_name)
+                    mods.append(current_module)
+                except Exception as e:  # pylint: disable=broad-except
+                    self.log.exception("Failed to import: %s", filepath)
+                    if self.dagbag_import_error_tracebacks:
+                        self.import_errors[filepath] = traceback.format_exc(
+                            limit=-self.dagbag_import_error_traceback_depth
+                        )
+                    else:
+                        self.import_errors[filepath] = str(e)
         return mods

     def _process_modules(self, filepath, mods, file_last_changed_on_disk):
1 change: 0 additions & 1 deletion airflow/operators/bash.py
@@ -149,7 +149,6 @@ def __init__(
         self.skip_exit_code = skip_exit_code
         if kwargs.get('xcom_push') is not None:
            raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead")
-        self.sub_process = None

     @cached_property
     def subprocess_hook(self):
29 changes: 14 additions & 15 deletions airflow/providers/amazon/aws/operators/s3_file_transform.py
@@ -135,25 +135,24 @@ def execute(self, context):
             f_source.flush()

             if self.transform_script is not None:
-                process = subprocess.Popen(
+                with subprocess.Popen(
                     [self.transform_script, f_source.name, f_dest.name, *self.script_args],
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT,
                     close_fds=True,
-                )
-
-                self.log.info("Output:")
-                for line in iter(process.stdout.readline, b''):
-                    self.log.info(line.decode(self.output_encoding).rstrip())
-
-                process.wait()
-
-                if process.returncode:
-                    raise AirflowException(f"Transform script failed: {process.returncode}")
-                else:
-                    self.log.info(
-                        "Transform script successful. Output temporarily located at %s", f_dest.name
-                    )
+                ) as process:
+                    self.log.info("Output:")
+                    for line in iter(process.stdout.readline, b''):
+                        self.log.info(line.decode(self.output_encoding).rstrip())
+
+                    process.wait()
+
+                    if process.returncode:
+                        raise AirflowException(f"Transform script failed: {process.returncode}")
+                    else:
+                        self.log.info(
+                            "Transform script successful. Output temporarily located at %s", f_dest.name
+                        )

             self.log.info("Uploading transformed file to S3")
             f_dest.flush()
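A remark on the loop that moved inside the new "with" block: iterating over iter(process.stdout.readline, b'') streams the script's merged output line by line as it is produced, rather than buffering it all the way communicate() would. A short sketch with a placeholder shell command:

    import subprocess

    with subprocess.Popen(
        ["sh", "-c", "echo one; echo two"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as process:
        for line in iter(process.stdout.readline, b''):
            print(line.decode().rstrip())
        process.wait()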
20 changes: 10 additions & 10 deletions airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -121,16 +121,15 @@ def execute(self, context) -> None:
         table = AwsDynamoDBHook().get_conn().Table(self.dynamodb_table_name)
         scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
         err = None
-        f = NamedTemporaryFile()
-        try:
-            f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table)
-        except Exception as e:
-            err = e
-            raise e
-        finally:
-            if err is None:
-                _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix)
-            f.close()
+        with NamedTemporaryFile() as f:
+            try:
+                f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table)
+            except Exception as e:
+                err = e
+                raise e
+            finally:
+                if err is None:
+                    _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix)

     def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
         while True:
@@ -150,5 +149,6 @@ def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
             if getsize(temp_file.name) >= self.file_size:
                 _upload_file_to_s3(temp_file, self.s3_bucket_name, self.s3_key_prefix)
                 temp_file.close()
+                # pylint: disable=consider-using-with
                 temp_file = NamedTemporaryFile()
         return temp_file
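The second hunk is another case where the check is disabled rather than fixed: the temporary file is rotated inside the loop (closed once it reaches the size limit, then replaced with a fresh one), so its lifetime spans iterations and cannot be tied to a single "with" block. A simplified sketch of the rotation, not the operator's real logic (records are assumed to be bytes):

    from tempfile import NamedTemporaryFile

    def spill_in_chunks(records, chunk_size=1000):
        # pylint: disable=consider-using-with
        temp_file = NamedTemporaryFile()
        for count, record in enumerate(records, start=1):
            temp_file.write(record)
            if count % chunk_size == 0:
                temp_file.close()  # previous chunk handed off elsewhere
                # pylint: disable=consider-using-with
                temp_file = NamedTemporaryFile()
        return temp_file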
1 change: 1 addition & 0 deletions airflow/providers/apache/beam/hooks/beam.py
@@ -94,6 +94,7 @@ def __init__(
         self.log.info("Running command: %s", " ".join(shlex.quote(c) for c in cmd))
         self.process_line_callback = process_line_callback
         self.job_id: Optional[str] = None
+        # pylint: disable=consider-using-with
         self._proc = subprocess.Popen(
             cmd,
             shell=False,
41 changes: 20 additions & 21 deletions airflow/providers/apache/hive/transfers/hive_to_mysql.py
@@ -96,33 +96,32 @@ def execute(self, context):
         if self.hive_conf:
             hive_conf.update(self.hive_conf)
         if self.bulk_load:
-            tmp_file = NamedTemporaryFile()
-            hive.to_csv(
-                self.sql,
-                tmp_file.name,
-                delimiter='\t',
-                lineterminator='\n',
-                output_header=False,
-                hive_conf=hive_conf,
-            )
+            with NamedTemporaryFile() as tmp_file:
+                hive.to_csv(
+                    self.sql,
+                    tmp_file.name,
+                    delimiter='\t',
+                    lineterminator='\n',
+                    output_header=False,
+                    hive_conf=hive_conf,
+                )
+                mysql = self._call_preoperator()
+                mysql.bulk_load(table=self.mysql_table, tmp_file=tmp_file.name)
         else:
             hive_results = hive.get_records(self.sql, hive_conf=hive_conf)
-
-        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
-
-        if self.mysql_preoperator:
-            self.log.info("Running MySQL preoperator")
-            mysql.run(self.mysql_preoperator)
-
-        self.log.info("Inserting rows into MySQL")
-        if self.bulk_load:
-            mysql.bulk_load(table=self.mysql_table, tmp_file=tmp_file.name)
-            tmp_file.close()
-        else:
+            mysql = self._call_preoperator()
             mysql.insert_rows(table=self.mysql_table, rows=hive_results)

         if self.mysql_postoperator:
             self.log.info("Running MySQL postoperator")
             mysql.run(self.mysql_postoperator)

         self.log.info("Done.")
+
+    def _call_preoperator(self):
+        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+        if self.mysql_preoperator:
+            self.log.info("Running MySQL preoperator")
+            mysql.run(self.mysql_preoperator)
+        self.log.info("Inserting rows into MySQL")
+        return mysql

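One reason mysql.bulk_load() moved inside the "with" block in the bulk-load branch: NamedTemporaryFile(delete=True), the default, removes the file as soon as it is closed, so anything that reads tmp_file.name must run before the block exits. A quick illustration (POSIX behaviour assumed):

    import os
    from tempfile import NamedTemporaryFile

    with NamedTemporaryFile() as tmp_file:
        tmp_file.write(b"1\tfoo\n")
        tmp_file.flush()
        assert os.path.exists(tmp_file.name)  # still on disk inside the block
    assert not os.path.exists(tmp_file.name)  # deleted once the block exits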