diff --git a/README.md b/README.md
index faae00f..0f066a6 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,14 @@ The csv columns will be team_captain, team_defend, team_str
 ```
 
+Until now, only ljson (one JSON object per line) was handled. JSON files are now handled too, as long as the input file is in one of the following formats:
+- a JSON array with one element per line
+- a single JSON object on the first line
+
+TODO:
+
+JSON formatted in any other way is not parsed yet,
+but support is planned.
 
 ## Installation
 
@@ -44,14 +52,14 @@ optional arguments:
                         every json in memory
   --sep SEP             Separator used to create columns names
   --int_to_float        Cast int to float
-  --path_output PATH_OUTPUT
-                        Path output
+  --path_output PATH_OUTPUT Path output
+  --remove_null         Remove null values (kept by default)
+  --is_json             Indicate if input file is a json
 ```
 
 Please refer to [here](https://github.com/Besedo/json-to-csv/examples) for examples.
 
-
 ## Meta
 
 Distributed under the Apache license v2.0. See ``LICENSE`` for more information.
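Note: to illustrate the two accepted layouts documented above (field names and values are made up for the example), a file passed with --is_json can be a JSON array whose opening bracket sits alone on the first line, one element per line (trailing commas are stripped by the reader):

```
[
{"name": "alice", "score": 1},
{"name": "bob", "score": 2}
]
```

or a single JSON object on the first line:

```
{"name": "alice", "score": 1}
```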
diff --git a/json_to_csv/json_to_csv.py b/json_to_csv/json_to_csv.py
index e45cdd1..68c5efa 100644
--- a/json_to_csv/json_to_csv.py
+++ b/json_to_csv/json_to_csv.py
@@ -25,6 +25,7 @@ def get_args():
     parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
     parser.add_argument("--path_output", type=str, help="Path output")
     parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)")
+    parser.add_argument("--is_json", action='store_true', default=False, help="Indicate if input file is a json")
 
     args = parser.parse_args()
     return args
@@ -190,7 +191,38 @@ def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null):
     return columns_list
 
-def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
+def read_jsons_chunks(file_object, chunk_size=10000):
+    """Lazy generator that reads a json file chunk by chunk.
+    Default chunk size: 10k"""
+    # Check the first character of the file:
+    # if it is "[", we have a json array
+    first_line = file_object.readline()
+    if first_line[0] == '[':
+        while True:
+            # Parse the next chunk_size lines
+            data = []
+            for i in range(chunk_size):
+                # This works for a json array with one element per line
+                # TODO Make it work with no assumption on the json layout
+                # Strip the trailing comma and the newline
+                line = file_object.readline().strip(',\n')
+                # On EOF or at the end of the json array, send what is left
+                if line == "" or line == "]":
+                    yield data
+                    return
+                else:
+                    data.append(json.loads(line))
+            if not data:
+                break
+            yield data
+    # End of file already reached
+    elif file_object.read() == ']':
+        return None
+    # Otherwise the file holds a single json on its first line
+    else:
+        yield [json.loads(first_line)]
+
+def get_columns(list_data_paths, sep, logger, int_to_float, remove_null, is_json):
     """
     Get the columns created accordingly to a list of files containing json
 
@@ -199,6 +231,7 @@
     :param logger: logger (used to print)
     :param int_to_float: if set to true int will be casted to float
     :param remove_null: if set to true, will remove_null from json arrays
+    :param is_json: if set to true, inputs are parsed as json (json array or single json) instead of ljson
     :return: Exhaustive list of columns
     """
 
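Note: a minimal sketch of how the new read_jsons_chunks generator is consumed (the file name is hypothetical; each yielded chunk is a list of already-parsed objects):

```python
from json_to_csv.json_to_csv import read_jsons_chunks

with open("data.json") as f:  # hypothetical JSON-array input file
    for chunk in read_jsons_chunks(f, chunk_size=1000):
        # Each chunk holds up to chunk_size parsed JSON objects.
        print(len(chunk))
```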
@@ -206,21 +239,40 @@
     columns_list = []
     j = 0
+    chunk_size = 50000
     for data_file in list_data_paths:
         logger.info(data_file)
         json_list = []
-        with open(data_file) as f:
-            for i, line in enumerate(f):
-                j += 1
-                if (j % 500000 == 0):
+        # If we deal with a json (or json array) file
+        if is_json:
+            f = open(data_file)
+            # Read the json file chunk by chunk
+            for x in read_jsons_chunks(f, chunk_size=chunk_size):
+                if j != 0 and (j % chunk_size == 0):
                     columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
                     logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
                     json_list = []
                 try:
-                    json_list.append(json.loads(line))
+                    json_list.extend(x)
+                    # At most chunk_size elements were added
+                    j += chunk_size
                 except:
-                    logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                    logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                     continue
+            f.close()
+        # If we deal with ljson
+        else:
+            with open(data_file) as f:
+                for i, line in enumerate(f):
+                    j += 1
+                    if (j % 50000 == 0):
+                        columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
+                        logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
+                        json_list = []
+                    try:
+                        json_list.append(json.loads(line))
+                    except:
+                        logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                        continue
 
     # A quicker solution would be to join directly to create a valid json
     if (len(json_list) > 0):
         columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
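Note: in streaming mode the tool now makes two passes over the input, and the patched get_columns above is the first of them. A sketch of an equivalent direct call, assuming a JSON-array input file named data.json (logger setup is illustrative):

```python
import logging

from json_to_csv.json_to_csv import get_columns

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# First pass: collect the exhaustive, sorted column list.
columns = sorted(get_columns(["data.json"], sep=".", logger=logger,
                             int_to_float=False, remove_null=False, is_json=True))
```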
@@ -231,7 +283,7 @@
     return columns_list
 
-def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False):
+def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False, is_json=False):
     """
     Get dataframe from files containing one json per line
 
@@ -242,30 +294,51 @@
     :param sep: separator to use when creating columns' names
     :param int_to_float: if set to true int will be casted to float
     :param remove_null: if set to true, will remove_null from json arrays
+    :param is_json: if set to true, inputs are parsed as json (json array or single json) instead of ljson
     :return: dataframe or nothing if the dataframe is generated while streaming the files
     """
 
     json_list = []
     j = 0
+    chunk_size = 50000
     for data_file in list_data_paths:
         logger.info(data_file)
-        with open(data_file) as f:
-            for i, line in enumerate(f):
-                j += 1
-                if (j % 500000 == 0):
+        json_list = []
+        # If we deal with a json (or json array) file
+        if is_json:
+            f = open(data_file)
+            # Read the json file chunk by chunk
+            for x in read_jsons_chunks(f, chunk_size=chunk_size):
+                if j != 0 and (j % chunk_size == 0):
+                    update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
                     logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
-                    if columns:
-                        update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
-                        json_list.clear()
-
-                if (j % 100000 == 0):
-                    logger.info(str(i) + ' documents processed')
+                    json_list = []
                 try:
-                    json_list.append(json.loads(line))
+                    json_list.extend(x)
+                    # At most chunk_size elements were added
+                    j += chunk_size
                 except:
-                    logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                    logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                     continue
+            f.close()
+        # If we deal with ljson
+        else:
+            with open(data_file) as f:
+                for i, line in enumerate(f):
+                    j += 1
+                    if (j % 50000 == 0):
+                        logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
+                        if columns:
+                            update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
+                            json_list.clear()
+
+                    if (j % 100000 == 0):
+                        logger.info(str(i) + ' documents processed')
+                    try:
+                        json_list.append(json.loads(line))
+                    except:
+                        logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
+                        continue
 
     # A quicker solution would be to join directly to create a valid json
     logger.info('Convert to DataFrame')
@@ -313,10 +386,10 @@ def main():
     logger.info("Reading " + opt.path_data_jsonperline)
     data = [opt.path_data_jsonperline]
 
-    # Get list of columns if not in streaming
+    # Get list of columns if in streaming
     columns_list = None
     if opt.streaming:
-        columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null)
+        columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null, opt.is_json)
         # Sort columns in alphabetical order
         columns_list.sort()
         df = pd.DataFrame(columns=columns_list)
@@ -326,7 +399,7 @@ def main():
         df.to_csv(opt.path_output, encoding="utf-8", index=None, quoting=1)
 
     # Get dataframe
-    df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null)
+    df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null, is_json=opt.is_json)
 
     if not opt.streaming:
         logger.info("saving data to " + opt.path_output)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..fb6c7ed
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+pandas
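Note: the new requirements.txt lists pandas as the only dependency (pip install -r requirements.txt). A sketch of the non-streaming path with the new flag, mirroring what main() does (file names hypothetical, logger as in the previous sketch):

```python
import logging

from json_to_csv.json_to_csv import get_dataframe

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Non-streaming: parse the whole JSON-array file and get a DataFrame back.
df = get_dataframe(["data.json"], columns=None, path_csv="out.csv", logger=logger,
                   sep=".", int_to_float=False, remove_null=False, is_json=True)
df.to_csv("out.csv", encoding="utf-8", index=None, quoting=1)
```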