Skip to content
Merged

Dev #13

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions json_to_csv/json_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def get_args():
parser.add_argument("--path_data_jsonperline", type=str, help="File or folder of files containing one json per line")
parser.add_argument("--streaming", action='store_true', default=False, help="Create the csv in a stream way instead of loading every json in memory")
parser.add_argument("--sep", default='.', help="Separator used to create columns' names")
parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
parser.add_argument("--path_output", type=str, help="Path output")
parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)")

args = parser.parse_args()
return args

Expand All @@ -48,14 +50,15 @@ def setup_custom_logger(name):
return logger


def _flatten(d, parent_key='', sep='_', int_to_float=False):
def _flatten(d, parent_key='', sep='_', int_to_float=False, remove_null=False):
"""
Flatten a nested dictionary to one leve dictionary (recursive function)

:param d: dictionary
:param parent_key: parent_key used to create field name
:param sep: separator of nested fields
:param int_to_float: if set tu true int will be casted to float
:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: list of jsons flattened
"""
Expand All @@ -69,14 +72,16 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False):
for w in v:
my_elems_w = []
if isinstance(w, dict):
my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float).items())
my_elems_w.extend(_flatten(w, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items())
elif isinstance(w, str):
my_elems.append('"' + w + '"')
continue
elif w != None:
my_elems.append(w)
continue
else:
if not remove_null:
my_elems.append('null')
continue
# Put in in alphabetical order
my_elems_w = sorted(my_elems_w, key=lambda tup: tup[0])
Expand All @@ -102,7 +107,7 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False):
my_elems = '[' + ','.join(my_elems) + ']'
items.append((new_key, my_elems))
elif isinstance(v, dict):
items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float).items())
items.extend(_flatten(v, new_key, sep=sep, int_to_float=int_to_float, remove_null=remove_null).items())
else:
if isinstance(v, int) and int_to_float:
items.append((new_key, float(v)))
Expand All @@ -112,56 +117,57 @@ def _flatten(d, parent_key='', sep='_', int_to_float=False):
return dict(items)


def _transform_jsons(json_list, sep, int_to_float):
def _transform_jsons(json_list, sep, int_to_float, remove_null):
"""
Transform list of jsons by flattening those

:param json_list: list of jsons
:param sep: separator to use when creating columns' names
:param int_to_float: if set tu true int will be casted to float

:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: list of jsons flattened
"""

# Transform
new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float) for j in json_list]
new_jsons = [_flatten(j, sep=sep, int_to_float=int_to_float, remove_null=remove_null) for j in json_list]
return new_jsons


def update_df_list(df_list, json_list, sep, int_to_float):
def update_df_list(df_list, json_list, sep, int_to_float, remove_null):
"""
Update list of dataframes with list of jsons

:param df_list: list of dataframes
:param json_list: list of jsons
:param sep: separator to use when creating columns' names
:param int_to_float: if set tu true int will be casted to float

:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: list of dataframes udpated
"""

data = _transform_jsons(json_list, sep, int_to_float)
data = _transform_jsons(json_list, sep, int_to_float, remove_null)
df = pd.DataFrame(data)

df_list.append(df)

return df_list


def update_csv(path_csv, json_list, columns, sep, int_to_float):
def update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null):
"""
Append a csv with json list

:param path_csv: path to csv to append
:param json_list: list of json files
:param columns: list of columns to dump (order is important)
:param sep: separator to use when creating columns' names
:param int_to_float: if set tu true int will be casted to float
:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays
"""

data = _transform_jsons(json_list, sep, int_to_float)
data = _transform_jsons(json_list, sep, int_to_float, remove_null)
df = pd.DataFrame(data)


Expand All @@ -179,7 +185,7 @@ def update_csv(path_csv, json_list, columns, sep, int_to_float):
return


def update_columns_list(columns_list, json_list, sep):
def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null):
"""
Update columns list with new json information
Sometimes jsons do not have the same fields
Expand All @@ -188,10 +194,12 @@ def update_columns_list(columns_list, json_list, sep):
:param columns_list: list of columns to update
:param json_list: list of jsons
:param sep: separator to use when creating columns' names
:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: list of columns updated
"""
data = _transform_jsons(json_list, sep)
data = _transform_jsons(json_list, sep, int_to_float, remove_null)
cols = []
for js in data:
cols.extend(js.keys())
Expand All @@ -200,13 +208,15 @@ def update_columns_list(columns_list, json_list, sep):
return columns_list


def get_columns(list_data_paths, sep, logger):
def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
"""
Get the columns created accordingly to a list of files containing json

:param list_data_paths: list of files containing one json per line
:param sep: separator to use when creating columns' names
:param logger: logger (used to print)
:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: Exhaustive list of columns
"""
Expand All @@ -221,7 +231,7 @@ def get_columns(list_data_paths, sep, logger):
for i, line in enumerate(f):
j += 1
if (j % 500000 == 0):
columns_list = update_columns_list(columns_list, json_list, sep)
columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
json_list = []
try:
Expand All @@ -231,15 +241,15 @@ def get_columns(list_data_paths, sep, logger):
continue
# A quicker solution would be to join directly to create a valid json
if (len(json_list) > 0):
columns_list = update_columns_list(columns_list, json_list, sep)
columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')

# Concatenate the dataframes created
logger.info('Full column\'s list obtained: ' + str(len(columns_list)) + ' fields found')
return columns_list


def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False):
def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False):
"""
Get dataframe from files containing one json per line

Expand All @@ -248,8 +258,8 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
:param path_csv: path to csv output if streaming
:param logger: logger (used to print)
:param sep: separator to use when creating columns' names
:param int_to_float: if set tu true int will be casted to float

:param int_to_float: if set to true int will be casted to float
:param remove_null: if set to true, will remove_null from json arrays

:return: dataframe or nothing if the dataframe is generated while streaming the files
"""
Expand All @@ -264,7 +274,7 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
if (j % 500000 == 0):
logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
if columns:
update_csv(path_csv, json_list, columns, sep, int_to_float)
update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
json_list.clear()

if (j % 100000 == 0):
Expand All @@ -280,13 +290,13 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
if (len(json_list) > 0):
logger.info('Iteration ' + str(j) + ': Creating last sub dataframe')
if columns:
logger.info("updating csv with new data" + path_csv)
update_csv(path_csv, json_list, columns, sep, int_to_float)
logger.info("updating csv with new data " + path_csv)
update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
json_list.clear()

if not columns:
# Concatenate the dataframes created
list_of_dfs = update_df_list([], json_list, sep, int_to_float)
list_of_dfs = update_df_list([], json_list, sep, int_to_float, remove_null)
logger.info('Concatenate ' + str(len(list_of_dfs)) + ' DataFrames')
df = pd.concat(list_of_dfs)

Expand Down Expand Up @@ -324,7 +334,7 @@ def main():
# Get list of columns if not in streaming
columns_list = None
if opt.streaming:
columns_list = get_columns(data, opt.sep, logger)
columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null)
# Sort columns in alphabetical order
columns_list.sort()
df = pd.DataFrame(columns=columns_list)
Expand All @@ -334,7 +344,7 @@ def main():
df.to_csv(opt.path_output, encoding="utf-8", index=None)

# Get dataframe
df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float)
df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null)

if not opt.streaming:
logger.info("saving data to " + opt.path_output)
Expand Down