Skip to content

Commit

Permalink
standardize : allow users to specify output encoding (#118)
Browse files Browse the repository at this point in the history
* add target encoding

* docstring and change arg name

* fix error when write to_file, factorize 'target_encoding or encoding'

* target_encoding -> target-encoding

* test target_encoding

* test target_encoding raise UnicodeEncodeError

* revert unnecessary changes

* test target_encoding2

* add detected encoding assertion

* fix formating errors with black

* add open encoding
  • Loading branch information
jbdesbas committed Dec 3, 2023
1 parent 7201c3c commit f48ab1a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 5 deletions.
29 changes: 24 additions & 5 deletions clevercsv/console/commands/standardize.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ def register(self) -> None:
),
default=[],
)
self.add_argument(
"-E",
"--target-encoding",
help="Set the encoding of the output file(s)",
description=(
"If ommited, the output file encoding while be the same "
"as that of the original file."
),
type=str,
)
self.add_argument(
"-i",
"--in-place",
Expand Down Expand Up @@ -115,6 +125,7 @@ def handle(self) -> int:
encodings = self.args.encoding
num_chars = parse_int(self.args.num_chars, "num-chars")
in_place = self.args.in_place
target_encoding = self.args.target_encoding

if in_place and outputs:
print(
Expand Down Expand Up @@ -154,6 +165,7 @@ def handle(self) -> int:
encoding=encoding,
verbose=verbose,
num_chars=num_chars,
target_encoding=target_encoding,
)
if retval > 0 and global_retval == 0:
global_retval = retval
Expand All @@ -168,8 +180,10 @@ def handle_path(
encoding: Optional[str] = None,
num_chars: Optional[int] = None,
verbose: bool = False,
target_encoding: Optional[str] = None,
) -> int:
encoding = encoding or get_encoding(path)
target_encoding = target_encoding or encoding
dialect = detect_dialect(
path, num_chars=num_chars, encoding=encoding, verbose=verbose
)
Expand All @@ -178,10 +192,10 @@ def handle_path(
return 1

if self.args.in_place:
return self._in_place(path, dialect, encoding)
return self._in_place(path, dialect, encoding, target_encoding)
elif output is None:
return self._to_stdout(path, dialect, encoding)
return self._to_file(path, output, dialect, encoding)
return self._to_file(path, output, dialect, encoding, target_encoding)

def _write_transposed(
self,
Expand Down Expand Up @@ -224,7 +238,11 @@ def _write_to_stream(
self._write_direct(path, stream, dialect, encoding)

def _in_place(
self, path: StrPath, dialect: SimpleDialect, encoding: Optional[str]
self,
path: StrPath,
dialect: SimpleDialect,
encoding: Optional[str],
target_encoding: Optional[str],
) -> int:
"""In-place mode overwrites the input file, if necessary
Expand All @@ -235,7 +253,7 @@ def _in_place(
"""
tmpfd, tmpfname = tempfile.mkstemp(prefix="clevercsv_", suffix=".csv")
tmpid = os.fdopen(tmpfd, "w", newline="", encoding=encoding)
tmpid = os.fdopen(tmpfd, "w", newline="", encoding=target_encoding)
self._write_to_stream(path, tmpid, dialect, encoding)
tmpid.close()

Expand Down Expand Up @@ -263,7 +281,8 @@ def _to_file(
output: StrPath,
dialect: SimpleDialect,
encoding: Optional[str],
target_encoding: Optional[str],
) -> int:
with open(output, "w", newline="", encoding=encoding) as fp:
with open(output, "w", newline="", encoding=target_encoding) as fp:
self._write_to_stream(path, fp, dialect, encoding)
return 0
84 changes: 84 additions & 0 deletions tests/test_unit/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from clevercsv._types import _DialectLike
from clevercsv.console import build_application
from clevercsv.dialect import SimpleDialect
from clevercsv.encoding import get_encoding
from clevercsv.write import writer

TableType = List[List[Any]]
Expand Down Expand Up @@ -640,3 +641,86 @@ def test_standardize_in_place_multi_noop(self) -> None:
self.assertEqual(contents, exp)
finally:
any(map(os.unlink, tmpfnames))

def test_standardize_target_encoding(self) -> None:
table: TableType = [["Å", "B", "C"], ["é", "ü", "中"], [4, 5, 6]]
dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="")
encoding = "utf-8"
tmpfname = self._build_file(table, dialect, encoding=encoding)

tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv")
os.close(tmpfd)

application = build_application()
tester = Tester(application)
tester.test_command(
"standardize", ["-o", tmpoutname, "-E", "utf-8", tmpfname]
)

# Excel format (i.e. RFC4180) *requires* CRLF
crlf = "\r\n"
exp = crlf.join(["Å,B,C", "é,ü,中", "4,5,6", ""])
with open(tmpoutname, "r", newline="", encoding="utf-8") as fp:
output = fp.read()

try:
self.assertEqual(exp, output)
finally:
os.unlink(tmpfname)
os.unlink(tmpoutname)

def test_standardize_target_encoding2(self) -> None:
table: TableType = [["A", "B", "C"], ["é", "è", "à"], [4, 5, 6]]
dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="")
encoding = "latin-1"
tmpfname = self._build_file(table, dialect, encoding=encoding)
self.assertEqual(
"ISO-8859-1", get_encoding(tmpfname, try_cchardet=False)
)
tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv")
os.close(tmpfd)

application = build_application()
tester = Tester(application)
tester.test_command(
"standardize",
["-o", tmpoutname, "-e", "latin-1", "-E", "utf-8", tmpfname],
)

# Excel format (i.e. RFC4180) *requires* CRLF
crlf = "\r\n"
exp = crlf.join(["A,B,C", "é,è,à", "4,5,6", ""])

self.assertEqual("utf-8", get_encoding(tmpoutname, try_cchardet=False))
with open(tmpoutname, "r", newline="", encoding="utf-8") as fp:
output = fp.read()

try:
self.assertEqual(exp, output)

finally:
os.unlink(tmpfname)
os.unlink(tmpoutname)

def test_standardize_target_encoding_raise_UnicodeEncodeError(
self,
) -> None:
table: TableType = [["Å", "B", "C"], ["é", "ü", "中"], [4, 5, 6]]
dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="")
encoding = "utf-8"
tmpfname = self._build_file(table, dialect, encoding=encoding)

tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv")
os.close(tmpfd)

application = build_application()
tester = Tester(application)
try:
with self.assertRaises(UnicodeEncodeError):
tester.test_command(
"standardize",
["-o", tmpoutname, "-E", "latin-1", tmpfname],
)
finally:
os.unlink(tmpfname)
os.unlink(tmpoutname)

0 comments on commit f48ab1a

Please sign in to comment.