Dev #23 (Merged)

14 changes: 11 additions & 3 deletions README.md
@@ -19,6 +19,14 @@ The csv column's will be
team_captain, team_defend, team_str
```

Until now, only ljson (one json per line) was handled. Json files are now handled as well, provided the input file is in one of the following formats (illustrated below):
- A json array with one element per line
- A single json element on the first line

TODO:

Jsons formatted in any other way are not yet parsed, but support is planned.
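
For illustration, a hypothetical input in the first supported format (a json array with one element per line) could look like this; the keys are made up to match the team example above:

```
[
{"team": {"captain": "john", "defend": "sam", "str": 5}},
{"team": {"captain": "anna", "defend": "lee", "str": 3}}
]
```

The second format would be the same content written as a single json element on the first line of the file.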

## Installation

@@ -44,14 +52,14 @@ optional arguments:
                        every json in memory
  --sep SEP             Separator used to create columns names
  --int_to_float        Cast int to float
  --path_output PATH_OUTPUT
                        Path output
  --path_output PATH_OUTPUT Path output
  --remove_null         Remove null values (kept by default)
  --is_json             Indicate if input file is a json
```

Please refer to the [examples](https://github.com/Besedo/json-to-csv/examples).
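
For instance, a hypothetical invocation on a json-array file might look like the following (the exact name of the input-path argument is defined in `get_args`, which this diff only partially shows, so treat `--path_data_jsonperline` as an assumption):

```
python json_to_csv/json_to_csv.py --path_data_jsonperline data.json --is_json --sep _ --path_output out.csv
```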



## Meta

Distributed under the Apache license v2.0. See ``LICENSE`` for more information.
154 changes: 134 additions & 20 deletions json_to_csv/json_to_csv.py
@@ -25,6 +25,7 @@ def get_args():
parser.add_argument("--int_to_float", action='store_true', default=False, help="Cast int to float")
parser.add_argument("--path_output", type=str, help="Path output")
parser.add_argument("--remove_null", action='store_true', default=False, help="Remove null values (kept by default)")
parser.add_argument("--is_json", action='store_true', default=False, help="Indicate if input file is a json")

args = parser.parse_args()
return args
@@ -190,7 +191,78 @@ def update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
    return columns_list


def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
def read_jsons_chunks(file_object, chunk_size=10000):
    """Lazy function to read a file of jsons chunk by chunk.
    Default chunk size: 10k jsons"""

    # Read the next raw block of characters
    chunk = file_object.read(1000000)
    data = []
    i = 0
    nb_bracket = 0
    nb_quotes = 0
    example = ""
    count_escape_char = 0
    while True:
        # Read character by character
        for k, c in enumerate(chunk):
            # Check quoting
            if c == '"':
                # Count only when '"' is a delimiter of field or value in json
                if count_escape_char % 2 == 0:
                    nb_quotes += 1
            # Check beginning of brackets
            elif c == '{' and nb_quotes % 2 == 0:
                # Count only when '{' is a delimiter of field or value in json
                if count_escape_char % 2 == 0:
                    nb_bracket += 1
            # Check ending of brackets
            elif c == '}' and nb_quotes % 2 == 0:
                # Count only when '}' is a delimiter of field or value in json
                if count_escape_char % 2 == 0:
                    nb_bracket -= 1
                # This means we finished reading one json
                if nb_bracket == 0 and nb_quotes % 2 == 0:
                    example += c
                    data.append(json.loads(example))
                    i += 1
                    # When chunk_size jsons are accumulated, yield them
                    if i % chunk_size == 0:
                        yield data
                        data = []

                    # Reset the current example
                    example = ""
                    continue
            # If we are in between 2 json examples or at the beginning
            elif c in ['[', ',', '\n'] and nb_bracket == 0 and nb_quotes % 2 == 0:
                continue
            # If we are at the end of the file
            if c in [']', ''] and nb_bracket == 0 and nb_quotes % 2 == 0:
                # If EOF or the end of the json array is reached, send what's left of the data
                if example == "" or example == "]":
                    yield data
                    return
            if c == "\\":
                count_escape_char += 1
            else:
                count_escape_char = 0
            # Append character to the current json example
            example += c

        # If at the end of the chunk, read a new chunk
        if k == len(chunk) - 1:
            chunk = file_object.read(1000000)
        # Keep what's left of the chunk
        elif len(chunk) != 0:
            chunk = chunk[k:]
        # if the chunk is empty, we have read the whole file
        else:
            break


def get_columns(list_data_paths, sep, logger, int_to_float, remove_null, is_json):
"""
Get the columns created accordingly to a list of files containing json

@@ -199,28 +271,48 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
    :param logger: logger (used to print)
    :param int_to_float: if set to true int will be cast to float
    :param remove_null: if set to true, will remove null values from json arrays
    :param is_json: if set to true, inputs are considered as valid json

    :return: Exhaustive list of columns
    """

    columns_list = []

    j = 0
    chunk_size = 50000
    for data_file in list_data_paths:
        logger.info(data_file)
        json_list = []
        with open(data_file) as f:
            for i, line in enumerate(f):
                j += 1
                if (j % 500000 == 0):
        # If we deal with a json (or json array) file
        if is_json:
            f = open(data_file)
            # Read the json file chunk by chunk
            for x in read_jsons_chunks(f, chunk_size=chunk_size):
                if j != 0 and (j % chunk_size == 0):
                    columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
                    logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
                    json_list = []
                try:
                    json_list.append(json.loads(line))
                    json_list.extend(x)
                    # A maximum of chunk_size elements were added
                    j += chunk_size
                except:
                    logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                    logger.info("Json in line " + str(j) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                    continue
        # If we deal with ljson
        else:
            with open(data_file) as f:
                for i, line in enumerate(f):
                    j += 1
                    if (j % 50000 == 0):
                        columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
                        logger.info('Iteration ' + str(j) + ': Updating columns ===> ' + str(len(columns_list)) + ' columns found')
                        json_list = []
                    try:
                        json_list.append(json.loads(line))
                    except:
                        logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                        continue
    # A quicker solution would be to join directly to create a valid json
    if (len(json_list) > 0):
        columns_list = update_columns_list(columns_list, json_list, sep, int_to_float, remove_null)
@@ -231,7 +323,7 @@ def get_columns(list_data_paths, sep, logger, int_to_float, remove_null):
    return columns_list


def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False):
def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep='.', int_to_float=False, remove_null=False, is_json=False):
"""
Get dataframe from files containing one json per line

@@ -242,30 +334,52 @@ def get_dataframe(list_data_paths, columns=None, path_csv=None, logger=None, sep
    :param sep: separator to use when creating columns' names
    :param int_to_float: if set to true int will be cast to float
    :param remove_null: if set to true, will remove null values from json arrays
    :param is_json: if set to true, inputs are considered as valid json

    :return: dataframe or nothing if the dataframe is generated while streaming the files
    """

    json_list = []
    j = 0
    chunk_size = 50000
    for data_file in list_data_paths:
        logger.info(data_file)
        with open(data_file) as f:
            for i, line in enumerate(f):
                j += 1
                if (j % 500000 == 0):
        json_list = []
        # If we deal with a json (or json array) file
        if is_json:
            f = open(data_file)
            # Read the json file chunk by chunk
            for x in read_jsons_chunks(f, chunk_size=chunk_size):
                if j != 0 and (j % chunk_size == 0):
                    logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
                    if columns:
                        update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
                        json_list.clear()

                    if (j % 100000 == 0):
                        logger.info(str(i) + ' documents processed')
                    json_list = []
                try:
                    json_list.append(json.loads(line))
                    json_list.extend(x)
                    # A maximum of chunk_size elements were added
                    j += chunk_size
                except:
                    logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                    continue
        # If we deal with ljson
        else:
            with open(data_file) as f:
                for i, line in enumerate(f):
                    j += 1
                    if (j % 50000 == 0):
                        logger.info('Iteration ' + str(j) + ': Creating sub dataframe')
                        if columns:
                            update_csv(path_csv, json_list, columns, sep, int_to_float, remove_null)
                            json_list.clear()

                    if (j % 100000 == 0):
                        logger.info(str(i) + ' documents processed')
                    try:
                        json_list.append(json.loads(line))
                    except:
                        logger.info("Json in line " + str(i) + " (in file: " + data_file + ") does not seem well formed. Example was skipped")
                        continue

    # A quicker solution would be to join directly to create a valid json
    logger.info('Convert to DataFrame')
@@ -313,10 +427,10 @@ def main():
logger.info("Reading " + opt.path_data_jsonperline)
data = [opt.path_data_jsonperline]

# Get list of columns if not in streaming
# Get list of columns if in streaming
columns_list = None
if opt.streaming:
columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null)
columns_list = get_columns(data, opt.sep, logger, opt.int_to_float, opt.remove_null, opt.is_json)
# Sort columns in alphabetical order
columns_list.sort()
df = pd.DataFrame(columns=columns_list)
@@ -326,7 +440,7 @@ def main():
        df.to_csv(opt.path_output, encoding="utf-8", index=None, quoting=1)

    # Get dataframe
    df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null)
    df = get_dataframe(data, columns=columns_list, path_csv=opt.path_output, logger=logger, sep=opt.sep, int_to_float=opt.int_to_float, remove_null=opt.remove_null, is_json=opt.is_json)

    if not opt.streaming:
        logger.info("saving data to " + opt.path_output)
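As a quick way to sanity-check the new chunked reader outside the converter, here is a minimal sketch. It assumes the module is importable as `json_to_csv.json_to_csv` (not confirmed by this diff), and the sample data is made up; behavior follows the code above, where a final, possibly empty, batch is yielded when the closing `]` is reached.

```
import io

from json_to_csv.json_to_csv import read_jsons_chunks

# A json array with one element per line, as described in the README
raw = '[\n{"team": {"captain": "john"}},\n{"team": {"captain": "anna"}}\n]'

# chunk_size=1 forces one json per yielded batch
for batch in read_jsons_chunks(io.StringIO(raw), chunk_size=1):
    print(batch)
```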
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
pandas
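
With the new requirements file in place, the dependency can presumably be installed the usual way:

```
pip install -r requirements.txt
```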