In [1]:
import glob
import os
import json
import pandas as pd
import re

In [2]:
def get_column_names(schemas, ds_name, sorting_key='column_position'):
    """Return the column names of one dataset in schema order.

    Parameters
    ----------
    schemas : mapping
        Maps a dataset name to a sequence of per-column mappings. Each
        per-column mapping must contain ``'column_name'`` and the key
        named by ``sorting_key``.
    ds_name : str
        Dataset whose columns are requested.
    sorting_key : str, default ``'column_position'``
        Field of each per-column mapping used to order the columns.

    Returns
    -------
    list
        The ``'column_name'`` values, ordered by ascending ``sorting_key``.

    Raises
    ------
    KeyError
        If ``ds_name`` is missing from ``schemas`` or a column entry lacks
        one of the required keys.
    """
    ordered_entries = list(schemas[ds_name])
    # list.sort is stable, so entries with equal keys keep their schema order.
    ordered_entries.sort(key=lambda entry: entry[sorting_key])

    names = []
    for entry in ordered_entries:
        names.append(entry['column_name'])
    return names

In [3]:
def read_csv(file, schemas):
    """Read one CSV part file into a DataFrame, naming columns from the schema.

    The dataset name is taken from the path component immediately before
    the file name (``.../<ds_name>/<file>``), and the column names for that
    dataset are looked up via ``get_column_names``.

    Parameters
    ----------
    file : str
        Path to the CSV file; may use ``/``, ``\\`` or a mix of both as
        separators.
    schemas : mapping
        Schema mapping passed through to ``get_column_names``.

    Returns
    -------
    pandas.DataFrame
        The file contents with columns named according to the schema.

    Raises
    ------
    IndexError
        If ``file`` has no parent-directory component to use as dataset name.
    KeyError
        If the derived dataset name is not present in ``schemas``.
    """
    # Raw string so the character class is exactly "/ or \" without relying on
    # an invalid string escape (which warns on current Python versions).
    path_parts = re.split(r'[/\\]', file)
    ds_name = path_parts[-2]
    columns = get_column_names(schemas, ds_name)
    # Column names are supplied explicitly via `names`.
    return pd.read_csv(file, names=columns)

In [4]:
def to_json(df, tgt_base_dir, ds_name, file_name):
    """Write a DataFrame as line-delimited JSON under the target directory.

    The output is written to ``<tgt_base_dir>/<ds_name>/<file_name>``; the
    dataset directory is created first if it does not exist yet.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to serialise.
    tgt_base_dir : str
        Root directory for converted output.
    ds_name : str
        Dataset name, used as the sub-directory.
    file_name : str
        Name of the file to write inside the dataset directory.
    """
    dataset_dir = f'{tgt_base_dir}/{ds_name}'
    os.makedirs(dataset_dir, exist_ok=True)
    # orient='records' with lines=True emits one JSON object per row, per line.
    df.to_json(f'{dataset_dir}/{file_name}', orient='records', lines=True)
    

In [5]:
def file_converter(tgt_base_dir, src_base_dir, ds_name):
    """Convert every ``part-*`` CSV file of one dataset to line-delimited JSON.

    Loads ``<src_base_dir>/schemas.json``, then for each file matching
    ``<src_base_dir>/<ds_name>/part-*`` reads it with ``read_csv`` and writes
    it with ``to_json`` under ``<tgt_base_dir>/<ds_name>/`` using the same
    file name.

    Parameters
    ----------
    tgt_base_dir : str
        Root directory for the JSON output.
    src_base_dir : str
        Root directory holding ``schemas.json`` and the dataset folders.
    ds_name : str
        Dataset (sub-directory) to convert.

    Raises
    ------
    FileNotFoundError
        If ``schemas.json`` does not exist under ``src_base_dir``.
    """
    # Context manager guarantees the schema file handle is closed.
    with open(f'{src_base_dir}/schemas.json') as schema_file:
        schemas = json.load(schema_file)

    # Sorted so files are processed in a deterministic order; glob itself
    # returns matches in arbitrary filesystem order.
    files = sorted(glob.glob(f'{src_base_dir}/{ds_name}/part-*'))

    for file in files:
        print(f'Processing: {file}')
        df = read_csv(file, schemas)
        # glob may return OS-native separators (e.g. '\' on Windows) mixed
        # with the '/' from the pattern, so split on either one.
        file_name = re.split(r'[/\\]', file)[-1]
        to_json(df, tgt_base_dir, ds_name, file_name)

In [6]:
def process_files(ds_names=None, src_base_dir='retail_db', tgt_base_dir='retail_db_json'):
    """Convert the CSV part files of the selected datasets to JSON.

    Parameters
    ----------
    ds_names : iterable of str, optional
        Datasets to convert. When omitted or empty, every dataset listed as
        a key in ``<src_base_dir>/schemas.json`` is converted.
    src_base_dir : str, default ``'retail_db'``
        Directory containing ``schemas.json`` and one folder per dataset.
    tgt_base_dir : str, default ``'retail_db_json'``
        Directory under which the converted files are written.

    Raises
    ------
    FileNotFoundError
        If ``schemas.json`` does not exist under ``src_base_dir``.
    """
    # Context manager guarantees the schema file handle is closed.
    with open(f'{src_base_dir}/schemas.json') as schema_file:
        schemas = json.load(schema_file)

    # Falsy ds_names (None or an empty collection) means "all datasets".
    if not ds_names:
        ds_names = schemas.keys()

    for ds_name in ds_names:
        print(f'Processing {ds_name}')
        file_converter(tgt_base_dir, src_base_dir, ds_name)

In [7]:
# No dataset names given, so every dataset listed in schemas.json is converted.
process_files()

Processing orders
Processing: retail_db/orders\part-00000
Processing order_items
Processing: retail_db/order_items\part-00000
