In [1]:
import pandas as pd
import numpy as np
import re, os, json, glob
from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError

In [40]:
def map_dtype_to_sql(dtype, sample_values=None):
    dtype = str(dtype)
    if 'int' in dtype:
        return 'int'
    elif 'float' in dtype:
        return 'int'  # or 'float' if needed
    elif 'datetime' in dtype:
        return 'datetime'
    # Try to infer datetime from string/object
    elif dtype == 'object' and sample_values is not None:
        # Only check non-null values
        sample_values = [v for v in sample_values if pd.notnull(v)]
        success_count = 0
        for v in sample_values[:10]:  # sample first 10 non-null entries
            try:
                pd.to_datetime(v, dayfirst=True)
                success_count += 1
            except Exception:
                pass
        if success_count >= 8:  # 80%+ look like datetimes
            return 'datetime'
        else:
            return 'string'
    else:
        return 'string'

csv_files = glob.glob("*.csv")
col_dict = {}

for file in csv_files:
    try:
        df = pd.read_csv(file, nrows=100)
        filename_only = os.path.splitext(os.path.basename(file))[0]
        filename_only = filename_only.replace("_cleaned", "")
        col_types = {}
        for col, dtype in df.dtypes.items():
            sample_values = df[col].dropna().astype(str).tolist()[:10]
            col_types[col] = map_dtype_to_sql(dtype, sample_values)
        col_dict[filename_only] = col_types
    except Exception as e:
        print(f"Error reading {file}: {e}")

with open("column.json", "w") as f:
    json.dump(col_dict, f, indent=4)

print("Column names and SQL types extracted from all CSV files and saved to column.json")

Column names and SQL types extracted from all CSV files and saved to column.json


In [25]:
db_url = "postgresql+psycopg2://postgres:1234@localhost:5432/postgres"

db_name = "food_db"

engine = create_engine(db_url, isolation_level='AUTOCOMMIT')

with engine.connect() as conn:
    result = conn.execute(text(f"SELECT 1 FROM pg_database WHERE datname = '{db_name}'"))
    exists = result.scalar()
    if not exists:
        conn.execute(text(f'CREATE DATABASE "{db_name}"'))
        print(f"Database {db_name} created..!!")
    else:
        print(f"Database {db_name} already exists!!")

Database food_db already exists!!


In [44]:
db_url = "postgresql+psycopg2://postgres:1234@localhost:5432/food_db"
engine = create_engine(db_url)

with open('column.json') as f:
    schemas = json.load(f)

type_map = {
    'int': 'INTEGER',
    'string': 'VARCHAR',
    'datetime': 'TIMESTAMP'
}

with engine.begin() as conn:
    for table_name, columns in schemas.items():
        # Force lowercase for table name
        table_name_lc = table_name.lower()

        # Build column definitions (lowercase)
        col_defs = []
        for col_name, col_type in columns.items():
            col_name_lc = col_name.lower()
            sql_type = type_map.get(col_type.lower(), 'VARCHAR')
            if sql_type == 'VARCHAR':
                sql_type = 'VARCHAR(255)'
            col_defs.append(f"{col_name_lc} {sql_type}")  # no quotes

        col_defs_str = ", ".join(col_defs)
        create_table_sql = f'CREATE TABLE IF NOT EXISTS {table_name_lc} ({col_defs_str});'

        print(f"Creating table {table_name_lc}...")
        conn.execute(text(create_table_sql))

print("Tables created successfully.")

Creating table claim_data...
Creating table food_data...
Creating table provider_data...
Creating table receiver_data...
Tables created successfully.


In [2]:
db_url = "postgresql+psycopg2://postgres:1234@localhost:5432/food_db"
engine = create_engine(db_url)

with open('column.json') as f:
    schemas = json.load(f)
    
file_to_table = {
    "claim_data_cleaned.csv": "claim_data",
    "food_data_cleaned.csv": "food_data",
    "provider_data_cleaned.csv": "provider_data",
    "receiver_data_cleaned.csv": "receiver_data"
}

# Loop through all tables in schema
for csv_file, table_name in file_to_table.items():
    print(f"Loading {csv_file} into {table_name}...")
    df = pd.read_csv(csv_file)
    
    
    # Keep only columns in schema, in correct order
    schema_cols = list(schemas[table_name].keys())
    df = df[schema_cols]
    
    # Type conversion based on JSON schema
    for col, sql_type in schemas[table_name].items():
        if sql_type == 'datetime':
            df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True)
        elif sql_type == 'int':
            df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
        else:
            df[col] = df[col].astype(str)
    
    # Insert into SQL
    df.to_sql(table_name, engine, if_exists='append', index=False)
    print(f"✅ Data inserted into {table_name}")

Loading claim_data_cleaned.csv into claim_data...
✅ Data inserted into claim_data
Loading food_data_cleaned.csv into food_data...
✅ Data inserted into food_data
Loading provider_data_cleaned.csv into provider_data...
✅ Data inserted into provider_data
Loading receiver_data_cleaned.csv into receiver_data...
✅ Data inserted into receiver_data
