From f48ab1a4b187b3f870e40c50071ab18db2f6f8b2 Mon Sep 17 00:00:00 2001 From: JB Desbas Date: Sun, 3 Dec 2023 18:03:37 +0100 Subject: [PATCH] standardize : allow users to specify output encoding (#118) * add target encoding * docstring and change arg name * fix error when write to_file, factorize 'target_encoding or encoding' * target_encoding -> target-encoding * test target_encoding * test target_encoding raise UnicodeEncodeError * revert unnecessary changes * test target_encoding2 * add detected encoding assertion * fix formatting errors with black * add open encoding --- clevercsv/console/commands/standardize.py | 29 ++++++-- tests/test_unit/test_console.py | 84 +++++++++++++++++++++++ 2 files changed, 108 insertions(+), 5 deletions(-) diff --git a/clevercsv/console/commands/standardize.py b/clevercsv/console/commands/standardize.py index 002eb7b..f25c9e4 100644 --- a/clevercsv/console/commands/standardize.py +++ b/clevercsv/console/commands/standardize.py @@ -69,6 +69,16 @@ def register(self) -> None: ), default=[], ) + self.add_argument( + "-E", + "--target-encoding", + help="Set the encoding of the output file(s)", + description=( + "If omitted, the output file encoding will be the same " + "as that of the original file."
+ ), + type=str, + ) self.add_argument( "-i", "--in-place", @@ -115,6 +125,7 @@ def handle(self) -> int: encodings = self.args.encoding num_chars = parse_int(self.args.num_chars, "num-chars") in_place = self.args.in_place + target_encoding = self.args.target_encoding if in_place and outputs: print( @@ -154,6 +165,7 @@ def handle(self) -> int: encoding=encoding, verbose=verbose, num_chars=num_chars, + target_encoding=target_encoding, ) if retval > 0 and global_retval == 0: global_retval = retval @@ -168,8 +180,10 @@ def handle_path( encoding: Optional[str] = None, num_chars: Optional[int] = None, verbose: bool = False, + target_encoding: Optional[str] = None, ) -> int: encoding = encoding or get_encoding(path) + target_encoding = target_encoding or encoding dialect = detect_dialect( path, num_chars=num_chars, encoding=encoding, verbose=verbose ) @@ -178,10 +192,10 @@ def handle_path( return 1 if self.args.in_place: - return self._in_place(path, dialect, encoding) + return self._in_place(path, dialect, encoding, target_encoding) elif output is None: return self._to_stdout(path, dialect, encoding) - return self._to_file(path, output, dialect, encoding) + return self._to_file(path, output, dialect, encoding, target_encoding) def _write_transposed( self, @@ -224,7 +238,11 @@ def _write_to_stream( self._write_direct(path, stream, dialect, encoding) def _in_place( - self, path: StrPath, dialect: SimpleDialect, encoding: Optional[str] + self, + path: StrPath, + dialect: SimpleDialect, + encoding: Optional[str], + target_encoding: Optional[str], ) -> int: """In-place mode overwrites the input file, if necessary @@ -235,7 +253,7 @@ def _in_place( """ tmpfd, tmpfname = tempfile.mkstemp(prefix="clevercsv_", suffix=".csv") - tmpid = os.fdopen(tmpfd, "w", newline="", encoding=encoding) + tmpid = os.fdopen(tmpfd, "w", newline="", encoding=target_encoding) self._write_to_stream(path, tmpid, dialect, encoding) tmpid.close() @@ -263,7 +281,8 @@ def _to_file( output: StrPath, 
dialect: SimpleDialect, encoding: Optional[str], + target_encoding: Optional[str], ) -> int: - with open(output, "w", newline="", encoding=encoding) as fp: + with open(output, "w", newline="", encoding=target_encoding) as fp: self._write_to_stream(path, fp, dialect, encoding) return 0 diff --git a/tests/test_unit/test_console.py b/tests/test_unit/test_console.py index ef67f2d..44dd098 100644 --- a/tests/test_unit/test_console.py +++ b/tests/test_unit/test_console.py @@ -21,6 +21,7 @@ from clevercsv._types import _DialectLike from clevercsv.console import build_application from clevercsv.dialect import SimpleDialect +from clevercsv.encoding import get_encoding from clevercsv.write import writer TableType = List[List[Any]] @@ -640,3 +641,86 @@ def test_standardize_in_place_multi_noop(self) -> None: self.assertEqual(contents, exp) finally: any(map(os.unlink, tmpfnames)) + + def test_standardize_target_encoding(self) -> None: + table: TableType = [["Å", "B", "C"], ["é", "ü", "中"], [4, 5, 6]] + dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="") + encoding = "utf-8" + tmpfname = self._build_file(table, dialect, encoding=encoding) + + tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv") + os.close(tmpfd) + + application = build_application() + tester = Tester(application) + tester.test_command( + "standardize", ["-o", tmpoutname, "-E", "utf-8", tmpfname] + ) + + # Excel format (i.e. 
RFC4180) *requires* CRLF + crlf = "\r\n" + exp = crlf.join(["Å,B,C", "é,ü,中", "4,5,6", ""]) + with open(tmpoutname, "r", newline="", encoding="utf-8") as fp: + output = fp.read() + + try: + self.assertEqual(exp, output) + finally: + os.unlink(tmpfname) + os.unlink(tmpoutname) + + def test_standardize_target_encoding2(self) -> None: + table: TableType = [["A", "B", "C"], ["é", "è", "à"], [4, 5, 6]] + dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="") + encoding = "latin-1" + tmpfname = self._build_file(table, dialect, encoding=encoding) + self.assertEqual( + "ISO-8859-1", get_encoding(tmpfname, try_cchardet=False) + ) + tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv") + os.close(tmpfd) + + application = build_application() + tester = Tester(application) + tester.test_command( + "standardize", + ["-o", tmpoutname, "-e", "latin-1", "-E", "utf-8", tmpfname], + ) + + # Excel format (i.e. RFC4180) *requires* CRLF + crlf = "\r\n" + exp = crlf.join(["A,B,C", "é,è,à", "4,5,6", ""]) + + self.assertEqual("utf-8", get_encoding(tmpoutname, try_cchardet=False)) + with open(tmpoutname, "r", newline="", encoding="utf-8") as fp: + output = fp.read() + + try: + self.assertEqual(exp, output) + + finally: + os.unlink(tmpfname) + os.unlink(tmpoutname) + + def test_standardize_target_encoding_raise_UnicodeEncodeError( + self, + ) -> None: + table: TableType = [["Å", "B", "C"], ["é", "ü", "中"], [4, 5, 6]] + dialect = SimpleDialect(delimiter=";", quotechar="", escapechar="") + encoding = "utf-8" + tmpfname = self._build_file(table, dialect, encoding=encoding) + + tmpfd, tmpoutname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv") + os.close(tmpfd) + + application = build_application() + tester = Tester(application) + try: + with self.assertRaises(UnicodeEncodeError): + tester.test_command( + "standardize", + ["-o", tmpoutname, "-E", "latin-1", tmpfname], + ) + finally: + os.unlink(tmpfname) + os.unlink(tmpoutname)