# Install necessary libraries

In [None]:
%pip install psycopg2-binary python-dotenv

# Import necessary libraries

In [1]:
import os
import psycopg2
import csv
from typing import List
from dotenv import load_dotenv
from datetime import datetime

# Defining functions

### Selective Column export

In [9]:
def load_db_config() -> dict:
    """
    Build the database connection settings from environment variables.

    Calls ``load_dotenv()`` and then reads DB_NAME, DB_USERNAME, DB_PASS,
    DB_HOST and DB_PORT with ``os.getenv``. Values are returned exactly as
    read: strings, or ``None`` for a variable that is not set.

    Returns:
        dict with the keys 'dbname', 'user', 'password', 'host' and 'port'.
    """
    load_dotenv()
    # (connection keyword, environment variable that supplies it)
    env_names = (
        ('dbname', 'DB_NAME'),
        ('user', 'DB_USERNAME'),
        ('password', 'DB_PASS'),
        ('host', 'DB_HOST'),
        ('port', 'DB_PORT'),
    )
    return {key: os.getenv(env_name) for key, env_name in env_names}

def export_to_csv(columns: List[str], output_file: str = 'user_export.csv') -> None:
    """
    Export specified columns from the user table to a CSV file.

    Each column name is quoted as an SQL identifier via
    ``psycopg2.sql.Identifier`` rather than pasted into the query text. That
    keeps mixed-case names (e.g. ``isConfirmed``) from being folded to
    lowercase by PostgreSQL and prevents a crafted name from injecting SQL.
    Consequently every entry must be an exact, case-sensitive column name;
    expressions or ``*`` are not accepted.

    Database errors and any other exception are printed, not raised.

    Args:
        columns: List of column names to export
        output_file: Name of the output CSV file
    """
    # Local import: this cell only has `import psycopg2` in scope, and the
    # `sql` submodule is not loaded by that import alone.
    from psycopg2 import sql

    if not columns:
        # An empty list would otherwise build "SELECT  FROM public.user".
        print("Error: No columns specified for export.")
        return

    conn = None
    cursor = None
    try:
        # Connect to the database
        conn = psycopg2.connect(**load_db_config())
        cursor = conn.cursor()

        # Compose the query from quoted identifiers
        column_list = sql.SQL(', ').join(sql.Identifier(name) for name in columns)
        query = sql.SQL('SELECT {} FROM public.user').format(column_list)
        cursor.execute(query)

        # Write header followed by every fetched row
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            writer.writerows(cursor)

        print(f"Data successfully exported to {output_file}")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Handles start as None, so this is safe even if connect() failed.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

# Execution

In [10]:
if __name__ == "__main__":
    # Example usage: export four columns of public.user to the default
    # output file of export_to_csv (user_export.csv).
    # NOTE(review): "password" is one of the exported columns — presumably a
    # credential (hash); confirm it really belongs in a plain CSV export.
    columns_to_export = ["id", "email", "password", "role"]
    export_to_csv(columns_to_export)

Data successfully exported to user_export.csv


# Single table export

In [7]:
def load_db_config() -> dict:
    """
    Read the database connection settings from the environment.

    ``load_dotenv()`` is called first; each setting is then taken from
    ``os.getenv`` unchanged, so an unset variable yields ``None`` and the
    port is a string when present.

    NOTE(review): an identical ``load_db_config`` is defined in the
    "Selective Column export" cell; running this cell rebinds the name.

    Returns:
        dict with the keys 'dbname', 'user', 'password', 'host' and 'port'.
    """
    load_dotenv()
    return dict(
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USERNAME'),
        password=os.getenv('DB_PASS'),
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
    )

def export_full_table_to_csv():
    """Export the entire user table to a CSV file with timestamp in filename.

    Runs ``SELECT * FROM public.user`` and writes the result to
    ``user_table_export_<YYYYmmdd_HHMMSS>.csv`` in the current directory.
    The header row is taken from ``cursor.description`` of that same query,
    so it always matches the data rows (no second catalog query that could
    disagree with the result set).

    Database errors and any other exception are printed, not raised.
    """
    conn = None
    cursor = None
    try:
        # Connect to the database
        conn = psycopg2.connect(**load_db_config())
        cursor = conn.cursor()

        # Execute the query for all data
        cursor.execute('SELECT * FROM public.user')

        # DB-API: the first item of every description entry is the column name
        columns = [column[0] for column in cursor.description]

        # Create filename with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'user_table_export_{timestamp}.csv'

        # Write header followed by every fetched row
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            writer.writerows(cursor)

        print(f"Full table successfully exported to {filename}")
        print(f"Total columns exported: {len(columns)}")
        print(f"Columns: {', '.join(columns)}")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Handles start as None, so this is safe even if connect() failed.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

# Run the full-table export when this cell is executed as the main module.
if __name__ == "__main__":
    export_full_table_to_csv()

Full table successfully exported to user_table_export_20241230_220628.csv
Total columns exported: 28
Columns: id, email, password, role, isConfirmed, isActive, fullName, profilePicture, countryId, phone, additionalPhone, address01, address02, dateOfBirth, gender, createdAt, updatedAt, brandId, verificationToken, userId, isDeleted, isUserRegisterForBrand, passWordChangeRequest, isOwner, isBiometric, newPassword, contactPersonId, isBlocked


# Multiple Tables Export

In [3]:
import os
import csv
import psycopg2
from psycopg2 import sql # For safe SQL identifiers
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Any # Added Dict, Any for type hints

# --- Helper Functions ---

def load_db_config() -> Dict[str, Any]:
    """Load database configuration from environment variables.

    Calls ``load_dotenv()`` and then reads DB_NAME, DB_USERNAME, DB_PASS and
    DB_HOST as-is. DB_PORT is converted to ``int`` and added under 'port'
    only when it is a valid, non-zero number; otherwise the key is omitted
    and, for a non-numeric value, a warning is printed.

    A warning naming the actual environment variable is printed for each of
    DB_NAME, DB_USERNAME and DB_HOST that is unset. A missing password is
    not reported.

    Returns:
        Dict with 'dbname', 'user', 'password', 'host' and optionally 'port'.
    """
    load_dotenv()

    # connection keyword -> environment variable that supplies it
    env_var_for = {
        'dbname': 'DB_NAME',
        'user': 'DB_USERNAME',
        'password': 'DB_PASS',
        'host': 'DB_HOST',
    }
    config: Dict[str, Any] = {key: os.getenv(env_name) for key, env_name in env_var_for.items()}

    db_port_str = os.getenv('DB_PORT')
    db_port = None
    if db_port_str:
        if db_port_str.isdigit():
            try:
                db_port = int(db_port_str)
            except ValueError:
                # str.isdigit() accepts characters such as '²' that int() rejects.
                db_port = None
        if db_port is None:
            print(f"Warning: DB_PORT ('{db_port_str}') is not a valid number. Using default PostgreSQL port 5432 if applicable by psycopg2.")
    if db_port:  # Only add port to config if it's valid
        config['port'] = db_port

    # Report missing essentials by their real variable names
    # (password excluded: PGPASSWORD or .pgpass might supply it).
    for key, env_name in env_var_for.items():
        if key != 'password' and config[key] is None:
            print(f"Warning: Environment variable for '{env_name}' is not set. Connection might fail.")

    return config

def export_single_table_to_csv(table_name: str, output_dir: str, db_config: Dict[str, Any]) -> bool:
    """
    Export a single specified table to a CSV file with a timestamp in the filename.

    Opens its own connection from ``db_config``, reads the column names of
    ``public.<table_name>`` from ``information_schema.columns`` for the
    header, then writes the rows of ``SELECT *`` to
    ``<output_dir>/<table_name>_export_<YYYYmmdd_HHMMSS>.csv``.
    Errors are printed, not raised.

    Args:
        table_name: Table name in the ``public`` schema. It is compared as a
            string and quoted as an identifier, so it must match exactly.
        output_dir: Directory to write into; it is not created here.
        db_config: Keyword arguments for ``psycopg2.connect``; 'dbname' and
            'user' must be present and non-empty.

    Returns:
        True when the table was exported, and also when it was skipped
        because no columns were found for it. False on missing configuration
        or on a database, file-system or unexpected error.
    """
    conn = None
    cursor = None

    # Required settings, paired with the environment variable that supplies
    # each one so the error points at the right name
    # (host, port, password can be optional depending on setup).
    required_for_connection = (('dbname', 'DB_NAME'), ('user', 'DB_USERNAME'))
    for key, env_name in required_for_connection:
        if not db_config.get(key):
            print(f"Error: Database configuration for '{key}' ({env_name}) is missing. Cannot connect for table '{table_name}'.")
            return False

    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()

        # Look up the column names; the table name is passed as a bound
        # parameter, never interpolated into the SQL text.
        cursor.execute(
            sql.SQL("""
                SELECT column_name 
                FROM information_schema.columns 
                WHERE table_schema = 'public' 
                AND table_name = %s
                ORDER BY ordinal_position;
            """), (table_name,)
        )

        columns_result = cursor.fetchall()
        if not columns_result:
            print(f"Warning: Table '{table_name}' not found in schema 'public', or it has no columns. Skipping.")
            return True  # Counted as success so one absent table does not fail a batch
        columns = [col[0] for col in columns_result]

        # Create filename with timestamp (output_dir must already exist)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = os.path.join(output_dir, f'{table_name}_export_{timestamp}.csv')

        # sql.Identifier quotes the name, which also preserves mixed-case table names
        query_data = sql.SQL("SELECT * FROM public.{}").format(sql.Identifier(table_name))
        cursor.execute(query_data)

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)  # Write header
            for row_data in cursor:
                writer.writerow(row_data)  # Write data

        print(f"Table '{table_name}' successfully exported to {filename}")
        return True

    except psycopg2.Error as e:
        print(f"Database error for table '{table_name}': {e}")
        return False
    except IOError as e:
        print(f"File system error for table '{table_name}' (e.g., cannot write file): {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred while exporting table '{table_name}': {e}")
        return False
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# --- Main Script for Exporting the Predefined Set of Tables ---

# Tables exported by export_all_predefined_tables(), in export order.
# NOTE(review): spellings such as "request_maintanainance" are kept exactly as
# they were; presumably they mirror the real table names — verify against the
# schema before "correcting" any of them.
TABLES_TO_EXPORT_PREDEFINED = [
    "user",
    "general_settings",
    "Subscription",
    "FailedPayment",
    "paid_user_info",
    "PaymentDetails",
    "car_info",
    "insurance_details",
    "registration_details",
    "request_maintanainance",
    "car_expense",
    "inspection_report",
    "reminder",
    "entry_inspection",
    "task",
    "maintanainance_history",
    "maintenance_alert",
    "vehicle_document_alerts",
    "connected_car",
    "brand",
    "location",
    "location_city",
    "designation",
    "employee",
]

def export_all_predefined_tables():
    """
    Export every table in TABLES_TO_EXPORT_PREDEFINED to its own CSV file.

    The database configuration is loaded once. A directory named
    ``<dbname>_predefined_set_export_<YYYYmmdd_HHMMSS>`` is created in the
    current working directory, and ``export_single_table_to_csv`` is called
    for each table with that directory. A summary is printed at the end.
    Returns ``None`` in all cases; problems are reported via ``print``.
    """
    table_names = TABLES_TO_EXPORT_PREDEFINED
    print(f"Starting export for {len(table_names)} predefined tables...")

    # One configuration load for the whole batch
    db_config = load_db_config()
    db_name = db_config.get('dbname')
    if not db_name:
        print("Error: DB_NAME is not configured. Cannot proceed with export.")
        return

    # Batch directory: database name plus the start time of this run
    batch_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    export_dir = f"{db_name}_predefined_set_export_{batch_stamp}"

    try:
        os.makedirs(export_dir, exist_ok=True)
        print(f"All CSVs will be saved in directory: '{export_dir}'")
    except OSError as e:
        print(f"Error: Could not create export directory '{export_dir}': {e}. Aborting export.")
        return

    succeeded = 0
    failed = 0
    for table_name in table_names:
        print(f"\nProcessing table: {table_name}...")
        exported = export_single_table_to_csv(table_name, output_dir=export_dir, db_config=db_config)
        if exported:
            succeeded += 1
            continue
        failed += 1
        print(f"Failed to export table: {table_name}")

    print("\n--- Export Summary for Predefined Set ---")
    print(f"Output Directory: {export_dir}")
    print(f"Total tables attempted: {len(table_names)}")
    print(f"Successfully exported: {succeeded}")
    print(f"Failed to export: {failed}")

    if failed:
        print("Some tables from the predefined set failed to export. Please review the logs above for details.")
    elif succeeded:
        print("All tables from the predefined set were exported successfully.")
    elif table_names:
        # Both counters are zero although the list is non-empty; every
        # iteration increments one counter, so this is not expected to run.
        print("No tables were processed (list might be empty or all tables skipped).")
    else:
        print("The list of tables to export was empty.")

In [4]:
# --- Execute the export ---
# Running this cell starts the batch export of the predefined tables.
# The database settings are read from environment variables (DB_NAME,
# DB_USERNAME, DB_PASS, DB_HOST, DB_PORT), so make sure they are set up first.

# Export every table in TABLES_TO_EXPORT_PREDEFINED; progress and a summary are printed.
export_all_predefined_tables()

print("\nScript execution finished.")

Starting export for 24 predefined tables...
All CSVs will be saved in directory: 'fleetblox-dev_predefined_set_export_20250521_201126'

Processing table: user...
Table 'user' successfully exported to fleetblox-dev_predefined_set_export_20250521_201126\user_export_20250521_201127.csv

Processing table: general_settings...
Table 'general_settings' successfully exported to fleetblox-dev_predefined_set_export_20250521_201126\general_settings_export_20250521_201127.csv

Processing table: Subscription...
Table 'Subscription' successfully exported to fleetblox-dev_predefined_set_export_20250521_201126\Subscription_export_20250521_201128.csv

Processing table: FailedPayment...
Table 'FailedPayment' successfully exported to fleetblox-dev_predefined_set_export_20250521_201126\FailedPayment_export_20250521_201129.csv

Processing table: paid_user_info...
Table 'paid_user_info' successfully exported to fleetblox-dev_predefined_set_export_20250521_201126\paid_user_info_export_20250521_201130.csv

Pr

# Full Database Export

In [None]:
import os
import subprocess
from datetime import datetime
from dotenv import load_dotenv

def load_db_config() -> dict:
    """Load database configuration from environment variables.

    Calls ``load_dotenv()`` and then reads DB_NAME, DB_USERNAME, DB_PASS and
    DB_HOST unchanged. DB_PORT is converted to ``int`` and stored under
    'port' only when it is a valid, non-zero number; a non-numeric value
    produces a warning and the key is left out.

    For each of DB_NAME, DB_USERNAME and DB_HOST that is unset, a warning
    naming that environment variable is printed. A missing password is not
    reported.

    Returns:
        dict with 'dbname', 'user', 'password', 'host' and optionally 'port'.
    """
    load_dotenv()

    # (connection keyword, environment variable that supplies it)
    settings = (
        ('dbname', 'DB_NAME'),
        ('user', 'DB_USERNAME'),
        ('password', 'DB_PASS'),
        ('host', 'DB_HOST'),
    )
    config = {key: os.getenv(env_name) for key, env_name in settings}

    db_port_str = os.getenv('DB_PORT')
    db_port = None
    if db_port_str:
        if db_port_str.isdigit():
            try:
                db_port = int(db_port_str)
            except ValueError:
                # str.isdigit() accepts characters such as '²' that int() rejects.
                db_port = None
        if db_port is None:
            print(f"Warning: DB_PORT ('{db_port_str}') is not a valid number. Using default port if applicable.")
    if db_port:
        config['port'] = db_port

    for key, env_name in settings:
        # Password can be handled by PGPASSWORD or .pgpass, so it is not reported.
        if key != 'password' and config[key] is None:
            print(f"Warning: Environment variable '{env_name}' (config key '{key}') is not set.")

    return config

def _remove_partial_dump(path: str) -> None:
    """Delete whatever a failed pg_dump run left at `path` (a file or a directory)."""
    import shutil  # local import: only needed on this cleanup path

    try:
        if os.path.isdir(path):
            shutil.rmtree(path)
        elif os.path.exists(path):
            os.remove(path)
        else:
            return
        print(f"Removed incomplete output '{path}'.")
    except OSError as e:
        print(f"Warning: could not remove incomplete output '{path}': {e}")


def export_entire_database_pg_dump(output_format: str = 'custom'):
    """
    Export the entire database using pg_dump.
    Requires pg_dump to be in the system's PATH.

    pg_dump writes the output itself (``-f``), so nothing is created when
    pg_dump cannot be started, and an incomplete file or directory left by a
    failed run is removed instead of remaining as an apparent backup.
    The password, if configured, is passed through the PGPASSWORD environment
    variable of the child process and never appears on the command line.
    All problems are reported via ``print``; nothing is raised or returned.

    Args:
        output_format (str): 'custom' (for .dump, compressed, pg_restore),
                             'plain' (for .sql, text SQL commands),
                             'directory' (for parallel dumps).
                             Defaults to 'custom'.
    """
    db_config = load_db_config()

    if not db_config.get('dbname'):
        print("Error: DB_NAME is not configured. Cannot proceed.")
        return

    # output_format -> (pg_dump format flag, suffix appended to the output name)
    pg_dump_formats = {
        'custom': ('-Fc', '.dump'),   # compressed archive for pg_restore
        'plain': ('-Fp', '.sql'),     # plain SQL text
        'directory': ('-Fd', ''),     # directory archive, so no extension
    }
    selected = pg_dump_formats.get(output_format) if isinstance(output_format, str) else None
    if selected is None:
        print(f"Error: Unsupported output format '{output_format}'. Choose 'custom', 'plain', or 'directory'.")
        return
    format_option, suffix = selected

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # For the directory format this names the directory pg_dump will create.
    filename = f"{db_config['dbname']}_fulldb_export_{timestamp}{suffix}"

    existed_before = os.path.exists(filename)
    if output_format == 'directory' and existed_before:
        print(f"Error: Output directory '{filename}' already exists.")
        return

    pg_dump_command = ['pg_dump']

    # Add connection parameters
    if db_config.get('host'):
        pg_dump_command.extend(['-h', db_config['host']])
    if db_config.get('port'):
        pg_dump_command.extend(['-p', str(db_config['port'])])
    if db_config.get('user'):
        pg_dump_command.extend(['-U', db_config['user']])

    pg_dump_command.extend([format_option, '-f', filename, db_config['dbname']])

    # Environment variables for pg_dump
    env = os.environ.copy()
    if db_config.get('password'):
        env['PGPASSWORD'] = db_config['password']
        # Note: It's generally more secure to use a .pgpass file for pg_dump
        # or rely on other authentication methods (like peer authentication if applicable).

    try:
        print(f"Starting database dump for '{db_config['dbname']}'...")
        print(f"Command (password hidden): {' '.join(pg_dump_command)}")

        completed = subprocess.run(
            pg_dump_command, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # errors='replace': server messages are not guaranteed to be valid UTF-8
        stdout_text = completed.stdout.decode(errors='replace') if completed.stdout else ''
        stderr_text = completed.stderr.decode(errors='replace') if completed.stderr else ''

        if completed.returncode == 0:
            print(f"Database '{db_config['dbname']}' successfully exported to '{filename}'")
            if stderr_text:
                print(f"pg_dump warnings/messages:\n{stderr_text}")
        else:
            print(f"Error during pg_dump (return code: {completed.returncode}):")
            print(f"STDOUT:\n{stdout_text if stdout_text else 'N/A'}")
            print(f"STDERR:\n{stderr_text if stderr_text else 'N/A'}")
            # Only remove output this run created, never a pre-existing file.
            if not existed_before:
                _remove_partial_dump(filename)

    except FileNotFoundError:
        print("Error: 'pg_dump' command not found. Make sure PostgreSQL client tools are installed and in your PATH.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        if not existed_before:
            _remove_partial_dump(filename)

# --- Single-table CSV export (used by export_all_tables_to_csvs and by menu option 3) ---
import csv
import psycopg2
from psycopg2 import sql

def export_table_to_csv(table_name: str, output_dir: str = "."):
    """
    Export one table of the 'public' schema to a timestamped CSV file.

    Loads the database configuration itself, opens a connection, reads the
    column names from ``information_schema.columns`` for the header and
    writes the rows of ``SELECT *`` to
    ``<output_dir>/<table_name>_export_<YYYYmmdd_HHMMSS>.csv``.
    ``output_dir`` is created if needed. Errors are printed, not raised.

    Args:
        table_name: Name of the table to export.
        output_dir: Target directory; defaults to the current directory.

    Returns:
        True when the table was exported or skipped because no columns were
        found for it; False when configuration is missing or an error occurred.
    """
    conn = None
    cursor = None
    try:
        db_config = load_db_config()
        if not (db_config.get('dbname') and db_config.get('user')):
            print("Error: Database name or user not configured. Please check your .env file.")
            return False  # Indicate failure

        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()

        # Table name is bound as a parameter, not interpolated.
        column_query = """
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = 'public' 
            AND table_name = %s
            ORDER BY ordinal_position;
        """
        cursor.execute(column_query, (table_name,))

        columns = [record[0] for record in cursor.fetchall()]
        if not columns:
            print(f"Warning: Table '{table_name}' not found in schema 'public', or it has no columns. Skipping.")
            return True  # Not an error, just skipping

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Ensure output_dir exists
        os.makedirs(output_dir, exist_ok=True)
        filename = os.path.join(output_dir, f'{table_name}_export_{timestamp}.csv')

        # Identifier quoting keeps the table name out of the raw SQL text
        select_all = sql.SQL("SELECT * FROM public.{}").format(sql.Identifier(table_name))
        cursor.execute(select_all)

        with open(filename, 'w', newline='', encoding='utf-8') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(columns)
            csv_writer.writerows(cursor)

        print(f"Table '{table_name}' successfully exported to {filename}")
        return True  # Indicate success

    except psycopg2.Error as db_error:
        print(f"Database error for table {table_name}: {db_error}")
        return False
    except OSError as io_error:
        print(f"File system error for table {table_name} (e.g., cannot write file): {io_error}")
        return False
    except Exception as error:
        print(f"An unexpected error occurred for table {table_name}: {error}")
        return False
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# Method 2: Python script to export all tables to individual CSV files (Data Only, No Schema)
# This is NOT a full database backup, but can be useful if you just need data in CSVs.

def export_all_tables_to_csvs():
    """
    Export every table listed in ``pg_catalog.pg_tables`` for schema 'public'.

    Creates ``<dbname>_all_tables_csv_<YYYYmmdd_HHMMSS>`` in the current
    working directory, calls ``export_table_to_csv`` once per table with that
    directory, and prints a summary. Data only; no schema is exported.
    Returns ``None``; problems are reported via ``print``.
    """
    conn = None
    cursor = None
    db_config = load_db_config()
    db_name = db_config.get('dbname')
    if not db_name:
        print("Error: DB_NAME is not configured. Cannot proceed.")
        return

    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()

        # Get all table names in the 'public' schema
        list_tables_sql = """
            SELECT tablename 
            FROM pg_catalog.pg_tables 
            WHERE schemaname = 'public';
        """
        cursor.execute(list_tables_sql)
        tables = [record[0] for record in cursor.fetchall()]

        if len(tables) == 0:
            print("No tables found in the 'public' schema.")
            return

        # One directory per export batch
        batch_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        base_export_dir = f"{db_name}_all_tables_csv_{batch_stamp}"
        os.makedirs(base_export_dir, exist_ok=True)
        print(f"Exporting all tables to directory: {base_export_dir}")

        outcomes = []
        for table_name in tables:
            print(f"\nExporting table: {table_name}...")
            outcomes.append(bool(export_table_to_csv(table_name, output_dir=base_export_dir)))
        success_count = sum(outcomes)
        fail_count = len(outcomes) - success_count

        print("\n--- Export Summary ---")
        print(f"Successfully exported tables: {success_count}")
        print(f"Failed to export tables: {fail_count}")
        if fail_count:
            print("Some tables failed to export. Check logs above.")
        else:
            print("All tables exported successfully to CSVs.")

    except psycopg2.Error as db_error:
        print(f"Database connection or initial query error: {db_error}")
    except Exception as error:
        print(f"An unexpected error occurred: {error}")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


if __name__ == "__main__":
    # Interactive menu: ask which export to run, then dispatch to it.
    print(
        "Choose an export method:",
        "1. Full database backup using pg_dump (recommended for backups)",
        "2. Export all tables to individual CSV files (data only)",
        "3. Export a single table to CSV",
        sep="\n",
    )

    choice = input("Enter your choice (1, 2, or 3): ").strip()

    if choice == '2':
        export_all_tables_to_csvs()
    elif choice == '3':
        target_table = input("Enter the name of the single table to export: ").strip()
        if target_table:
            # Basic check only: the export still runs after the warning.
            if not target_table.replace('_', '').isalnum():
                print(f"Warning: Table name '{target_table}' may contain invalid characters.")
            export_table_to_csv(target_table)  # Uses current dir by default
        else:
            print("No table name provided. Exiting.")
    elif choice == '1':
        print(
            "\nChoose pg_dump output format:",
            "  c. Custom (.dump - compressed, for pg_restore)",
            "  p. Plain SQL (.sql - text SQL commands)",
            "  d. Directory (outputs to a directory, good for parallel)",
            sep="\n",
        )
        format_choice_input = input("Enter format (c, p, d) [default: c]: ").strip().lower()

        # An empty answer (just Enter) and 'c' both select the default format.
        if format_choice_input in ('', 'c'):
            dump_format = 'custom'
        elif format_choice_input == 'p':
            dump_format = 'plain'
        elif format_choice_input == 'd':
            dump_format = 'directory'
        else:
            print("Invalid format choice. Using 'custom'.")
            dump_format = 'custom'

        export_entire_database_pg_dump(output_format=dump_format)
    else:
        print("Invalid choice. Exiting.")

Choose an export method:
1. Full database backup using pg_dump (recommended for backups)
2. Export all tables to individual CSV files (data only)
3. Export a single table to CSV
Exporting all tables to directory: ninja-dev_all_tables_csv_20250517_230044

Exporting table: _prisma_migrations...
Table '_prisma_migrations' successfully exported to ninja-dev_all_tables_csv_20250517_230044\_prisma_migrations_export_20250517_230045.csv

Exporting table: task...
Table 'task' successfully exported to ninja-dev_all_tables_csv_20250517_230044\task_export_20250517_230045.csv

Exporting table: otp...
Table 'otp' successfully exported to ninja-dev_all_tables_csv_20250517_230044\otp_export_20250517_230046.csv

Exporting table: purchase_plan...
Table 'purchase_plan' successfully exported to ninja-dev_all_tables_csv_20250517_230044\purchase_plan_export_20250517_230046.csv

Exporting table: newLetter...
Table 'newLetter' successfully exported to ninja-dev_all_tables_csv_20250517_230044\newLetter_export_