diff --git a/README.md b/README.md index faae00f..0f066a6 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,14 @@ The csv column's will be team_captain, team_defend, team_str ``` +Up until now, ljson are handled. Jsons are handled if in the file indicated as input is in one of the following format: +- Json array with one element per line +- One json element in the first line + +TODO: + +It does not yet parse jsons formatted in another way. +But soon, it will be ## Installation @@ -44,14 +52,14 @@ optional arguments: every json in memory --sep SEP Separator used to create columns names --int_to_float Cast int to float - --path_output PATH_OUTPUT - Path output + --path_output PATH_OUTPUT Path output + --remove_null Remove null values (kept by default) + --is_json Indicate if input file is a json ``` Please refer to [here](https://github.com/Besedo/json-to-csv/examples) for examples. - ## Meta Distributed under the Apache license v2.0. See ``LICENSE`` for more information. diff --git a/json_to_csv/json_to_csv.py b/json_to_csv/json_to_csv.py index e45cdd1..dc70eb7 100644 --- a/json_to_csv/json_to_csv.py +++ b/json_to_csv/json_to_csv.py @@ -25,6 +25,7 @@ def get_args(): parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float") parser.add_argument("--path_output", type=str, help="Path output") parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)") + parser.add_argument("--is_json", action='store_true', default=False, help="Indicate if input file is a json") args = parser.parse_args() return args @@ -190,7 +191,78 @@ def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) return columns_list -def get_columns(list_data_paths, sep, logger, int_to_float, remove_null): +def read_jsons_chunks(file_object, chunk_size=10000): + """Lazy function to read a json by chunk. + Default chunk size: 10k""" + + # Parse the next real chunk_size lines + chunk = file_object.read(1000000) + data = [] + i = 0 + nb_bracket = 0 + nb_quotes = 0 + example = "" + count_escape_char = 0 + while True: + # Read cahracter by character + for k, c in enumerate(chunk): + # Check quoting + if c == '"': + # Check only when '"' is a delimiter of field or value in json + if count_escape_char % 2 == 0: + nb_quotes += 1 + # Check beginning of brackets + elif c == '{' and nb_quotes % 2 == 0: + # Check only when '{' is a delimiter of field or value in json + if count_escape_char % 2 == 0: + nb_bracket += 1 + # Check ending of brackets + elif c == '}' and nb_quotes % 2 == 0: + # Check only when '"' is a delimiter of field or value in json + if count_escape_char % 2 == 0: + nb_bracket -= 1 + # This means we finished to read one json + if nb_bracket == 0 and nb_quotes % 2 == 0: + example += c + data.append(json.loads(example)) + i += 1 + # When chunk_size jsons obtained, dump those + if i % chunk_size == 0: + yield(data) + data = [] + + # Initialize those + example = "" + continue + # If we are in between 2 json examples or at the beginning + elif c in ['[', ',', '\n'] and nb_bracket == 0 and nb_quotes % 2 == 0: + continue + # If we are at the end of the file + if c in [']', ''] and nb_bracket == 0 and nb_quotes % 2 == 0: + # If EOF obtained or end of jsonarray send what's left of the data + if example == "" or example == "]": + yield(data) + return + if c == "\\": + count_escape_char += 1 + else: + count_escape_char = 0 + # Append character to the json example + example += c + + # If at the end of the chunk, read new chunk + if k == len(chunk) - 1: + chunk = file_object.read(1000000) + # Keep what's left of the chunk + elif len(chunk) != 0: + chunk = chunk[k:] + # if k == 0 that means that we read the whole file + else: + break + + + +def get_columns(list_data_paths, sep, logger, int_to_float, remove_null, is_json): """ Get the columns created accordingly to a list of files containing json @@ -199,6 +271,7 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null): :param logger: logger (used to print) :param int_to_float: if set to true int will be casted to float :param remove_null: if set to true, will remove_null from json arrays + :param is_json: if set to true, inputs are considered as valid json :return: Exhaustive list of columns """ @@ -206,21 +279,40 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null): columns_list = [] j = 0 + chunk_size = 50000 for data_file in list_data_paths: logger.info(data_file) json_list = [] - with open(data_file) as f: - for i, line in enumerate(f): - j += 1 - if (j % 500000 == 0): + # If we deal with json (or json array) file + if is_json: + f = open(data_file) + # Read json file by chunk + for x in read_jsons_chunks(f, chunk_size=chunk_size): + if j!=0 and (j % chunk_size == 0): columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found') json_list = [] try: - json_list.append(json.loads(line)) + json_list.extend(x) + # Maximum of chunk_size elements were added + j += chunk_size except: - logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped") + logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped") continue + # If we deal with ljson + else: + with open(data_file) as f: + for i, line in enumerate(f): + j += 1 + if (j % 50000 == 0): + columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) + logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found') + json_list = [] + try: + json_list.append(json.loads(line)) + except: + logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped") + continue # A quicker solution would be to join directly to create a valid json if (len(json_list) > 0): columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) @@ -231,7 +323,7 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null): return columns_list -def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False): +def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False, is_json=False): """ Get dataframe from files containing one json per line @@ -242,30 +334,52 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep :param sep: separator to use when creating columns' names :param int_to_float: if set to true int will be casted to float :param remove_null: if set to true, will remove_null from json arrays + :param is_json: if set to true, inputs are considered as valid json :return: dataframe or nothing if the dataframe is generated while streaming the files """ json_list = [] j = 0 + chunk_size = 50000 for data_file in list_data_paths: logger.info(data_file) - with open(data_file) as f: - for i, line in enumerate(f): - j += 1 - if (j % 500000 == 0): + json_list = [] + # If we deal with json (or json array) file + if is_json: + f = open(data_file) + # Read json file by chunk + for x in read_jsons_chunks(f, chunk_size=chunk_size): + if j!=0 and (j % chunk_size == 0): logger.info('Iteration ' + str(j) + ': Creating sub dataframe') if columns: update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null) - json_list.clear() - - if (j % 100000 == 0): - logger.info(str(i) + ' documents processed') + json_list = [] try: - json_list.append(json.loads(line)) + json_list.extend(x) + # Maximum of chunk_size elements were added + j += chunk_size # -1 because we add 1 at the beginning of the loop except: logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped") continue + # If we deal with ljson + else: + with open(data_file) as f: + for i, line in enumerate(f): + j += 1 + if (j % 50000 == 0): + logger.info('Iteration ' + str(j) + ': Creating sub dataframe') + if columns: + update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null) + json_list.clear() + + if (j % 100000 == 0): + logger.info(str(i) + ' documents processed') + try: + json_list.append(json.loads(line)) + except: + logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped") + continue # A quicker solution would be to join directly to create a valid json logger.info('Convert to DataFrame') @@ -313,10 +427,10 @@ def main(): logger.info("Reading " + opt.path_data_jsonperline) data = [opt.path_data_jsonperline] - # Get list of columns if not in streaming + # Get list of columns if in streaming columns_list = None if opt.streaming: - columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null) + columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null, opt.is_json) # Sort columns in alphabetical order columns_list.sort() df = pd.DataFrame(columns=columns_list) @@ -326,7 +440,7 @@ def main(): df.to_csv(opt.path_output, encoding="utf-8", index=None, quoting=1) # Get dataframe - df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null) + df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null, is_json=opt.is_json) if not opt.streaming: logger.info("saving data to " + opt.path_output) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fb6c7ed --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pandas