In [1]:
import glob
import os
import json
import re
import pandas as pd

In [2]:
def get_column_names(schemas, ds_name, sorting_key='column_position'):
    column_details = schemas[ds_name]
    columns = sorted(column_details, key=lambda col: col[sorting_key])
    return [col['column_name'] for col in columns]


In [3]:
def read_csv(file, schemas):
    file_path_list = re.split(r'[\\/]', file)
    ds_name = file_path_list[-2]
    file_name = file_path_list[-1]
    
    columns = get_column_names(schemas, ds_name)
    df = pd.read_csv(file, names=columns)
    
    return df

In [4]:
def file_converter(ds_name):
    src_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db'
    tgt_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db_json'
    
    # Load schema safely
    with open(f'{src_base_dir}/schemas.json') as f:
        schemas = json.load(f)
    
    # Get all files matching part-* pattern
    files = glob.glob(f'{src_base_dir}/{ds_name}/part-*')
    
    for file in files:
        print(f'Processing {file}')
        df = read_csv(file, schemas)
        file_name = re.split(r'[\\/]', file)[-1]
        to_json(df, tgt_base_dir, ds_name, file_name)

# You still need to define the to_json() function, e.g.:
def to_json(df, base_dir, ds_name, file_name):
    os.makedirs(f'{base_dir}/{ds_name}', exist_ok=True)
    output_path = f'{base_dir}/{ds_name}/{file_name}.json'
    df.to_json(output_path, orient='records', lines=True)

In [5]:
def process_file(ds_name=None):
    src_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db'
    tgt_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db_json'
    
    with open(f'{src_base_dir}/schemas.json') as f:
        schemas = json.load(f)
    
    dataset_list = list(schemas.keys()) if ds_name is None else [ds_name] if isinstance(ds_name, str) else ds_name
    
    for dataset in dataset_list:
        print(f'Processing {dataset}')
        file_converter(src_base_dir, tgt_base_dir, dataset)

In [6]:
process_file()

Processing departments


TypeError: file_converter() takes 1 positional argument but 3 were given

In [None]:
ds_name = 'orders'

file_converter(ds_name)

Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/orders\part-00000


In [None]:
import os
import json
import glob
import re
import pandas as pd

# ----------------------------
# Helper: Get ordered column names from schema
# ----------------------------
def get_column_names(schemas, ds_name, sorting_key='column_position'):
    column_details = schemas[ds_name]
    columns = sorted(column_details, key=lambda col: col[sorting_key])
    return [col['column_name'] for col in columns]

# ----------------------------
# Helper: Read CSV with columns from schema
# ----------------------------
def read_csv(file, schemas):
    file_path_list = re.split(r'[\\/]', file)
    ds_name = file_path_list[-2]
    
    columns = get_column_names(schemas, ds_name)
    df = pd.read_csv(file, names=columns)
    
    return df

# ----------------------------
# Helper: Write DataFrame to JSON file
# ----------------------------
def to_json(df, base_dir, ds_name, file_name):
    os.makedirs(f'{base_dir}/{ds_name}', exist_ok=True)
    output_path = f'{base_dir}/{ds_name}/{file_name}.json'
    df.to_json(output_path, orient='records', lines=True)

# ----------------------------
# File Converter for one dataset
# ----------------------------
def file_converter(ds_name):
    src_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db'
    tgt_base_dir = 'data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db_json'
    
    with open(f'{src_base_dir}/schemas.json') as f:
        schemas = json.load(f)
    
    files = glob.glob(f'{src_base_dir}/{ds_name}/part-*')
    
    for file in files:
        print(f'Processing {file}')
        df = read_csv(file, schemas)
        file_name = re.split(r'[\\/]', file)[-1]
        to_json(df, tgt_base_dir, ds_name, file_name)

# ----------------------------
# Process one or all datasets
# ----------------------------
def process_files(ds_names=None):
    src = 'data/retail_db'
    tgt = 'data/retail_db_json'
    with open(f'{src}/schemas.json') as f:
        schemas = json.load(f)
    ds_names = ds_names or schemas.keys()
    for name in ds_names:
        print(f'Processing {name}')
        file_converter(src, tgt, name)


In [None]:
process_file()


=== Processing Dataset: departments ===
Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/departments\part-00000

=== Processing Dataset: categories ===
Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/categories\part-00000

=== Processing Dataset: orders ===
Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/orders\part-00000

=== Processing Dataset: products ===
Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/products\part-00000

=== Processing Dataset: customers ===
Processing data-engineering-essentials/035 Python Essentials for Data Engineers/05 Project 1 - File Format Converter/data/retail_db/customers\part-00000

=== Processing Dataset: order_items ===
P

In [None]:
schemas = json.load(open('data-engineering-essentials/035 Python Essentials for Data Engineers/apps/file-format-converter/data/retail_db/schemas.json'))

In [None]:
schemas.keys()

dict_keys(['departments', 'categories', 'orders', 'products', 'customers', 'order_items'])