Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions json_to_csv/json_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def get_args():
parser.add_argument("--path_data_jsonperline", type=str, help="File or folder of files containing one json per line")
parser.add_argument("--streaming", action='store_true', default=False, help="Create the csv in a stream way instead of loading every json in memory")
parser.add_argument("--sep", default='.', help="Separator used to create columns' names")
parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
parser.add_argument("--path_output", type=str, help="Path output")
parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)")

args = parser.parse_args()
return args

Expand All @@ -48,7 +50,7 @@ def setup_custom_logger(name):
return logger


def _flatten(d, parent_key='', sep='_', int_to_float=False):
def _flatten(d, parent_key='', sep='_', int_to_float=False, remove_null=False):
"""
    Flatten a nested dictionary to one level dictionary (recursive function)

Expand All @@ -69,14 +71,17 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False):
for w in v:
my_elems_w = []
if isinstance(w, dict):
my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float).items())
my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items())
elif isinstance(w, str):
my_elems.append('"' + w + '"')
continue
elif w != None:
my_elems.append(w)
continue
else:
if not remove_null:
my_elems.append('null')

continue
                    # Put it in alphabetical order
my_elems_w = sorted(my_elems_w, key=lambda tup: tup[0])
Expand All @@ -102,17 +107,19 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False):
my_elems = '[' + ','.join(my_elems) + ']'
items.append((new_key, my_elems))
elif isinstance(v, dict):
items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float).items())
items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items())
else:
if isinstance(v, int) and int_to_float:
items.append((new_key, float(v)))
else:
if v != None:
items.append((new_key, v))
elif not remove_null:
items.append((new_key, 'null'))
return dict(items)


def _transform_jsons(json_list, sep, int_to_float):
def _transform_jsons(json_list, sep, int_to_float, remove_null):
"""
Transform list of jsons by flattening those

Expand All @@ -125,11 +132,11 @@ def _transform_jsons(json_list, sep, int_to_float):
"""

# Transform
new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float) for j in json_list]
new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float, remove_null=remove_null) for j in json_list]
return new_jsons


def update_df_list(df_list, json_list, sep, int_to_float):
def update_df_list(df_list, json_list, sep, int_to_float, remove_null):
"""
Update list of dataframes with list of jsons

Expand All @@ -142,15 +149,15 @@ def update_df_list(df_list, json_list, sep, int_to_float):
    :return: list of dataframes updated
"""

data = _transform_jsons(json_list, sep, int_to_float)
data = _transform_jsons(json_list, sep, int_to_float, remove_null)
df = pd.DataFrame(data)

df_list.append(df)

return df_list


def update_csv(path_csv, json_list, columns, sep, int_to_float):
def update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null):
"""
Append a csv with json list

Expand All @@ -161,7 +168,7 @@ def update_csv(path_csv, json_list, columns, sep, int_to_float):
    :param int_to_float: if set to true int will be cast to float
"""

data = _transform_jsons(json_list, sep, int_to_float)
data = _transform_jsons(json_list, sep, int_to_float, remove_null)
df = pd.DataFrame(data)


Expand Down Expand Up @@ -239,7 +246,7 @@ def get_columns(list_data_paths, sep, logger):
return columns_list


def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False):
def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False):
"""
Get dataframe from files containing one json per line

Expand All @@ -264,7 +271,7 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
if (j % 500000 == 0):
logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
if columns:
update_csv(path_csv, json_list, columns, sep, int_to_float)
update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
json_list.clear()

if (j % 100000 == 0):
Expand All @@ -281,12 +288,12 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
logger.info('Iteration ' + str(j) + ': Creating last sub dataframe')
if columns:
logger.info("updating csv with new data" + path_csv)
update_csv(path_csv, json_list, columns, sep, int_to_float)
update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
json_list.clear()

if not columns:
# Concatenate the dataframes created
list_of_dfs = update_df_list([], json_list, sep, int_to_float)
list_of_dfs = update_df_list([], json_list, sep, int_to_float, remove_null)
logger.info('Concatenate ' + str(len(list_of_dfs)) + ' DataFrames')
df = pd.concat(list_of_dfs)

Expand Down Expand Up @@ -334,7 +341,7 @@ def main():
df.to_csv(opt.path_output, encoding="utf-8", index=None)

# Get dataframe
df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float)
df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null)

if not opt.streaming:
logger.info("saving data to " + opt.path_output)
Expand Down