# In [None]:
import os
import time
import urllib.parse

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import JSONB

# --- LOAD FUNCTION (Simplified: Append-Only) ---
def load_json_to_postgres(json_file_path, table_name, engine, chunk_size=50000):
    """
    Append the records of a line-delimited JSON file to a SQL table.

    The file is read in chunks of ``chunk_size`` lines and each chunk is
    written with ``DataFrame.to_sql(..., if_exists='append')``.

    This function APPENDS data. It does NOT truncate the table.
    It assumes the table already exists.

    Progress and errors are reported with ``print``; a missing file or a
    failure while reading/inserting is reported and swallowed, never raised.

    Args:
        json_file_path (str): The full path to the .json file (one JSON
            object per line).
        table_name (str): The destination table name.
        engine: The connection object handed to ``DataFrame.to_sql`` as
            ``con`` (a SQLAlchemy engine in this script).
        chunk_size (int): Number of lines to read per chunk.

    Returns:
        None
    """

    print("-" * 50)
    print(f"Starting load for table: '{table_name}'")

    # --- Check if file exists (a simple, essential check) ---
    if not os.path.exists(json_file_path):
        print(f"  ERROR: File not found at: {json_file_path}")
        print(f"  Skipping load for table '{table_name}'.")
        print("-" * 50)
        return

    start_time = time.time()
    total_rows = 0

    # 'tb_business' carries two columns that are written with the JSONB type;
    # every other table lets pandas pick the column types.
    special_dtypes = {}
    if table_name == 'tb_business':
        special_dtypes = {'attributes': JSONB, 'hours': JSONB}

    try:
        print(f"  ... Reading file {json_file_path}...")

        # With chunksize set, read_json returns a JsonReader. Using it as a
        # context manager closes the underlying file even when a chunk fails
        # to parse or to insert, instead of leaving the handle open.
        with pd.read_json(json_file_path, lines=True, chunksize=chunk_size) as reader:
            for chunk in reader:

                # Load the chunk into the SQL table
                chunk.to_sql(
                    table_name,
                    con=engine,
                    if_exists='append',  # <-- Key logic: always append
                    index=False,
                    dtype=special_dtypes
                )

                total_rows += len(chunk)
                print(f"  ... {total_rows} rows loaded into '{table_name}'...")

        end_time = time.time()
        print(f"Load for table '{table_name}' successful.")
        print(f"Total {total_rows} rows inserted.")
        print(f"Load time: {end_time - start_time:.2f} seconds.")

    except Exception as e:
        # Best-effort load: report the failure and let the caller continue
        # with the next file.
        print(f"ERROR during load for table '{table_name}': {e}")
        if total_rows:
            # Earlier chunks were written before the failure; since the load
            # is append-only, re-running without cleanup would duplicate them.
            print(
                f"  WARNING: {total_rows} rows had already been appended to "
                f"'{table_name}' before the failure (partial load)."
            )

    print("-" * 50)

# --- 1. GLOBAL SETTINGS ---
# Placeholder credential: replace it locally and avoid committing a real password.
DB_PASSWORD = "DB_PASSWORD"
# Folder that holds the JSON files named in the load map below.
BASE_DATA_FOLDER = "D:/Yelp-JSON"

# --- 2. LOAD MAP (Define files and tables) ---
# This dictionary maps the source JSON filename to the destination table name.
files_to_load = {
    "yelp_academic_dataset_user.json": "tb_user",
    "yelp_academic_dataset_business.json": "tb_business",
    "yelp_academic_dataset_checkin.json": "tb_checkin",
    "yelp_academic_dataset_tip.json": "tb_tip",
    "yelp_academic_dataset_review.json": "tb_review",
}

# --- 3. DATABASE CONNECTION ---
# This script assumes the database (yelp_db) and tables ALREADY EXIST.
# The password is percent-encoded so that characters such as '@', ':' or '/'
# are not mistaken for URL delimiters when the URL is parsed.
db_url = (
    f'postgresql://postgres:{urllib.parse.quote(DB_PASSWORD, safe="")}'
    '@localhost:5432/yelp_db'
)
engine = None

try:
    engine = create_engine(db_url)
    # Test the connection
    with engine.connect() as connection:
        print("Database connection successful!")

except Exception as e:
    print(f"CRITICAL ERROR: Could not connect to database: {e}")
    print("Check DB_PASSWORD or if the Postgres server is running.")
    # create_engine() may already have succeeded when connect() fails; reset
    # the name so code guarded by `if engine:` does not run against an
    # unreachable database.
    engine = None
# --- 4. MAIN LOAD LOOP ---
# Runs only when the connection step left a usable engine object behind.
if engine:
    print("\nStarting batch load process (APPEND-ONLY)...")
    print("NOTE: Run TRUNCATE in DBeaver first if you want a fresh load.")
    overall_start_time = time.time()

    # Walk the load map in definition order: resolve each source file
    # against the base data folder, then append it to its destination table.
    for file_name, table_name in files_to_load.items():
        full_file_path = os.path.join(BASE_DATA_FOLDER, file_name)
        load_json_to_postgres(
            json_file_path=full_file_path,
            table_name=table_name,
            engine=engine,
        )

    overall_end_time = time.time()
    print("\nBatch load process finished.")
    print(f"Total operation time: {overall_end_time - overall_start_time:.2f} seconds.")
