diff --git a/json_to_csv/json_to_csv.py b/json_to_csv/json_to_csv.py index cfc918b..1851598 100644 --- a/json_to_csv/json_to_csv.py +++ b/json_to_csv/json_to_csv.py @@ -22,8 +22,10 @@ def get_args(): parser.add_argument("--path_data_jsonperline", type=str, help="File or folder of files containing one json per line") parser.add_argument("--streaming", action='store_true', default=False, help="Create the csv in a stream way instead of loading every json in memory") parser.add_argument("--sep", default='.', help="Separator used to create columns' names") - parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float") + parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float") parser.add_argument("--path_output", type=str, help="Path output") + parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)") + args = parser.parse_args() return args @@ -48,14 +50,15 @@ def setup_custom_logger(name): return logger -def _flatten(d, parent_key='', sep='_', int_to_float=False): +def _flatten(d, parent_key='', sep='_', int_to_float=False, remove_null=False): """ Flatten a nested dictionary to one leve dictionary (recursive function) :param d: dictionary :param parent_key: parent_key used to create field name :param sep: separator of nested fields - :param int_to_float: if set tu true int will be casted to float + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: list of jsons flattened """ @@ -69,7 +72,7 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False): for w in v: my_elems_w = [] if isinstance(w, dict): - my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float).items()) + my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items()) elif isinstance(w, str): my_elems.append('"' + w + '"') continue @@ -77,6 +80,8 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False): my_elems.append(w) continue else: + if not remove_null: + my_elems.append('null') continue # Put in in alphabetical order my_elems_w = sorted(my_elems_w, key=lambda tup: tup[0]) @@ -102,7 +107,7 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False): my_elems = '[' + ','.join(my_elems) + ']' items.append((new_key, my_elems)) elif isinstance(v, dict): - items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float).items()) + items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items()) else: if isinstance(v, int) and int_to_float: items.append((new_key, float(v))) @@ -112,37 +117,37 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False): return dict(items) -def _transform_jsons(json_list, sep, int_to_float): +def _transform_jsons(json_list, sep, int_to_float, remove_null): """ Transform list of jsons by flattening those :param json_list: list of jsons :param sep: separator to use when creating columns' names - :param int_to_float: if set tu true int will be casted to float - + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: list of jsons flattened """ # Transform - new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float) for j in json_list] + new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float, remove_null=remove_null) for j in json_list] return new_jsons -def update_df_list(df_list, json_list, sep, int_to_float): +def update_df_list(df_list, json_list, sep, int_to_float, remove_null): """ Update list of dataframes with list of jsons :param df_list: list of dataframes :param json_list: list of jsons :param sep: separator to use when creating columns' names - :param int_to_float: if set tu true int will be casted to float - + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: list of dataframes udpated """ - data = _transform_jsons(json_list, sep, int_to_float) + data = _transform_jsons(json_list, sep, int_to_float, remove_null) df = pd.DataFrame(data) df_list.append(df) @@ -150,7 +155,7 @@ def update_df_list(df_list, json_list, sep, int_to_float): return df_list -def update_csv(path_csv, json_list, columns, sep, int_to_float): +def update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null): """ Append a csv with json list @@ -158,10 +163,11 @@ def update_csv(path_csv, json_list, columns, sep, int_to_float): :param json_list: list of json files :param columns: list of columns to dump (order is important) :param sep: separator to use when creating columns' names - :param int_to_float: if set tu true int will be casted to float + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays """ - data = _transform_jsons(json_list, sep, int_to_float) + data = _transform_jsons(json_list, sep, int_to_float, remove_null) df = pd.DataFrame(data) @@ -179,7 +185,7 @@ def update_csv(path_csv, json_list, columns, sep, int_to_float): return -def update_columns_list(columns_list, json_list, sep): +def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null): """ Update columns list with new json information Sometimes jsons do not have the same fields @@ -188,10 +194,12 @@ def update_columns_list(columns_list, json_list, sep): :param columns_list: list of columns to update :param json_list: list of jsons :param sep: separator to use when creating columns' names + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: list of columns updated """ - data = _transform_jsons(json_list, sep) + data = _transform_jsons(json_list, sep, int_to_float, remove_null) cols = [] for js in data: cols.extend(js.keys()) @@ -200,13 +208,15 @@ def update_columns_list(columns_list, json_list, sep): return columns_list -def get_columns(list_data_paths, sep, logger): +def get_columns(list_data_paths, sep, logger, int_to_float, remove_null): """ Get the columns created accordingly to a list of files containing json :param list_data_paths: list of files containing one json per line :param sep: separator to use when creating columns' names :param logger: logger (used to print) + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: Exhaustive list of columns """ @@ -221,7 +231,7 @@ def get_columns(list_data_paths, sep, logger): for i, line in enumerate(f): j += 1 if (j % 500000 == 0): - columns_list = update_columns_list(columns_list, json_list, sep) + columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found') json_list = [] try: @@ -231,7 +241,7 @@ def get_columns(list_data_paths, sep, logger): continue # A quicker solution would be to join directly to create a valid json if (len(json_list) > 0): - columns_list = update_columns_list(columns_list, json_list, sep) + columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null) logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found') # Concatenate the dataframes created @@ -239,7 +249,7 @@ def get_columns(list_data_paths, sep, logger): return columns_list -def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False): +def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False): """ Get dataframe from files containing one json per line @@ -248,8 +258,8 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep :param path_csv: path to csv output if streaming :param logger: logger (used to print) :param sep: separator to use when creating columns' names - :param int_to_float: if set tu true int will be casted to float - + :param int_to_float: if set to true int will be casted to float + :param remove_null: if set to true, will remove_null from json arrays :return: dataframe or nothing if the dataframe is generated while streaming the files """ @@ -264,7 +274,7 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep if (j % 500000 == 0): logger.info('Iteration ' + str(j) + ': Creating sub dataframe') if columns: - update_csv(path_csv, json_list, columns, sep, int_to_float) + update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null) json_list.clear() if (j % 100000 == 0): @@ -280,13 +290,13 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep if (len(json_list) > 0): logger.info('Iteration ' + str(j) + ': Creating last sub dataframe') if columns: - logger.info("updating csv with new data" + path_csv) - update_csv(path_csv, json_list, columns, sep, int_to_float) + logger.info("updating csv with new data " + path_csv) + update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null) json_list.clear() if not columns: # Concatenate the dataframes created - list_of_dfs = update_df_list([], json_list, sep, int_to_float) + list_of_dfs = update_df_list([], json_list, sep, int_to_float, remove_null) logger.info('Concatenate ' + str(len(list_of_dfs)) + ' DataFrames') df = pd.concat(list_of_dfs) @@ -324,7 +334,7 @@ def main(): # Get list of columns if not in streaming columns_list = None if opt.streaming: - columns_list = get_columns(data, opt.sep, logger) + columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null) # Sort columns in alphabetical order columns_list.sort() df = pd.DataFrame(columns=columns_list) @@ -334,7 +344,7 @@ def main(): df.to_csv(opt.path_output, encoding="utf-8", index=None) # Get dataframe - df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float) + df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null) if not opt.streaming: logger.info("saving data to " + opt.path_output)