Skip to content

Commit

Permalink
fix(files): streamed files now will run in binary
Browse files Browse the repository at this point in the history
Sometimes decoding into utf8 throws errors from windows created files
  • Loading branch information
christopherpickering committed Feb 13, 2023
1 parent f1ebfd9 commit a2e7b1e
Showing 1 changed file with 77 additions and 66 deletions.
143 changes: 77 additions & 66 deletions runner/scripts/em_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,77 +153,88 @@ def save(self) -> Tuple[str, str, str]:
os.rename(self.data_file.name, new_data_file_name)
self.data_file.name = new_data_file_name # type: ignore[misc]

with open(self.data_file.name, "r", newline="") as data_file:
reader = csv.reader(data_file)

with open(self.file_path, mode="w") as myfile:
# if csv (1) or text (2) and had delimiter

if (
self.task.destination_file_type_id == 1
or self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) and (
self.task.destination_ignore_delimiter is None
or self.task.destination_ignore_delimiter != 1
):
wrtr = (
csv.writer(
myfile,
delimiter=str(self.task.destination_file_delimiter)
.encode("utf-8")
.decode("unicode_escape"),
quoting=self.__quote_level(),
quotechar=self.__quotechar(),
# if ignore delimiter is checked, then use binary to copy contents
if self.task.destination_ignore_delimiter == 1:
with open(self.data_file.name, "rb") as data_file:
with open(self.file_path, mode="wb") as myfile:
for line in data_file:
myfile.write(line)

# otherwise use unicode.
else:
with open(self.data_file.name, "r", newline="") as data_file:
reader = csv.reader(data_file)

with open(self.file_path, mode="w") as myfile:
# if csv (1) or text (2) and had delimiter

if (
self.task.destination_file_type_id == 1
or self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) and (
self.task.destination_ignore_delimiter is None
or self.task.destination_ignore_delimiter != 1
):
wrtr = (
csv.writer(
myfile,
delimiter=str(self.task.destination_file_delimiter)
.encode("utf-8")
.decode("unicode_escape"),
quoting=self.__quote_level(),
quotechar=self.__quotechar(),
)
if self.task.destination_file_delimiter is not None
and len(self.task.destination_file_delimiter) > 0
and (
self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) # txt or other
else csv.writer(
myfile,
quoting=self.__quote_level(),
quotechar=self.__quotechar(),
)
)
if self.task.destination_file_delimiter is not None
and len(self.task.destination_file_delimiter) > 0
and (
self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) # txt or other
else csv.writer(
for row in reader:
new_row = [
(x.strip('"').strip("'") if isinstance(x, str) else x)
for x in row
]

if (
self.task.destination_file_type_id == 1
or self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) and (
self.task.destination_file_line_terminator is not None
and self.task.destination_file_line_terminator != ""
):
new_row.append(
self.task.destination_file_line_terminator
)

wrtr.writerow(new_row)

# if xlxs (3)
elif self.task.destination_file_type_id == 3:
wrtr = csv.writer(
myfile,
dialect="excel",
quoting=self.__quote_level(),
quotechar=self.__quotechar(),
)
)
for row in reader:
new_row = [
(x.strip('"').strip("'") if isinstance(x, str) else x)
for x in row
]

if (
self.task.destination_file_type_id == 1
or self.task.destination_file_type_id == 2
or self.task.destination_file_type_id == 4
) and (
self.task.destination_file_line_terminator is not None
and self.task.destination_file_line_terminator != ""
):
new_row.append(self.task.destination_file_line_terminator)

wrtr.writerow(new_row)

# if xlxs (3)
elif self.task.destination_file_type_id == 3:
wrtr = csv.writer(
myfile,
dialect="excel",
quoting=self.__quote_level(),
quotechar=self.__quotechar(),
)
for row in reader:
new_row = [
(x.strip('"').strip("'") if isinstance(x, str) else x)
for x in row
]
wrtr.writerow(new_row)

else:
for line in data_file:
myfile.write(line)
for row in reader:
new_row = [
(x.strip('"').strip("'") if isinstance(x, str) else x)
for x in row
]
wrtr.writerow(new_row)

else:
for line in data_file:
myfile.write(line)

RunnerLog(
self.task,
Expand Down

0 comments on commit a2e7b1e

Please sign in to comment.