In [None]:
# NOTE: `glob` ships with the Python standard library, so no install step is
# needed; nothing in the visible cells imports it either.
#!pip install glob

In [1]:
import sqlite3

db_path = "demo.db"
output_file = "demo_dump.sql"

# Write the first four rows of every table listed in sqlite_master to a text
# file. The try/finally guarantees the database handle is released even when
# a query or a file write fails part-way through the dump.
conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    with open(output_file, "w", encoding="utf-8") as f:
        # List all tables
        f.write("-- List of tables\n")
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [row[0] for row in cursor.fetchall()]
        for table in tables:
            f.write(f"\n-- Table: {table}\n")
            # Identifiers cannot be bound as SQL parameters, so the name is
            # double-quoted (embedded quotes doubled). This keeps tables whose
            # names contain spaces or SQL keywords from breaking the query.
            quoted_table = '"' + table.replace('"', '""') + '"'
            # Get first 4 rows
            cursor.execute(f"SELECT * FROM {quoted_table} LIMIT 4;")
            rows = cursor.fetchall()
            for row in rows:
                # Each row is written as the repr of its tuple, one per line.
                f.write(f"{row}\n")
finally:
    conn.close()

print(f"Written to {output_file}")



Written to demo_dump.sql


In [1]:
import sqlite3
import json

def get_complete_schema(db_path):
    """Print and return the schema of every user table in a SQLite database.

    For each table whose name does not match ``sqlite_%`` the function prints
    the CREATE statement, the columns, the foreign keys, the indexes and up to
    three sample rows, and collects the same information in a dictionary.

    Args:
        db_path: Path of the SQLite database file to inspect.

    Returns:
        dict: Maps each table name to a dict with the keys
        ``'create_sql'`` (text from ``sqlite_master.sql``),
        ``'columns'`` (rows of ``PRAGMA table_info``),
        ``'foreign_keys'`` (rows of ``PRAGMA foreign_key_list``),
        ``'indexes'`` (rows of ``PRAGMA index_list``) and
        ``'sample_data'`` (up to three row tuples).

    Raises:
        sqlite3.Error: Propagated from any failing query; the connection is
            closed before the exception leaves this function.
    """

    def quote_ident(name):
        # Identifiers cannot be passed as bound parameters, so quote them:
        # wrap in double quotes and double any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    schema_info = {}

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Get all tables
        cursor.execute("""
            SELECT name FROM sqlite_master 
            WHERE type='table' AND name NOT LIKE 'sqlite_%'
            ORDER BY name
        """)
        tables = [row[0] for row in cursor.fetchall()]

        for table in tables:
            quoted_table = quote_ident(table)
            print(f"\n-- Table: {table}")

            # Get CREATE TABLE statement
            cursor.execute("""
                SELECT sql FROM sqlite_master 
                WHERE type='table' AND name=?
            """, (table,))
            create_sql = cursor.fetchone()[0]
            print(f"CREATE: {create_sql}")

            # Get column info; rows are (cid, name, type, notnull, dflt_value, pk)
            cursor.execute(f"PRAGMA table_info({quoted_table})")
            columns = cursor.fetchall()

            print("COLUMNS:")
            for col in columns:
                print(f"  {col[1]} {col[2]} {'NOT NULL' if col[3] else 'NULL'} {'PK' if col[5] else ''}")

            # Get foreign keys; fk[3] is the local column, fk[2]/fk[4] the target
            cursor.execute(f"PRAGMA foreign_key_list({quoted_table})")
            foreign_keys = cursor.fetchall()

            if foreign_keys:
                print("FOREIGN KEYS:")
                for fk in foreign_keys:
                    print(f"  {fk[3]} -> {fk[2]}.{fk[4]}")

            # Get indexes
            cursor.execute(f"PRAGMA index_list({quoted_table})")
            indexes = cursor.fetchall()

            if indexes:
                print("INDEXES:")
                for idx in indexes:
                    cursor.execute(f"PRAGMA index_info({quote_ident(idx[1])})")
                    # The column name is NULL for rowid/expression index
                    # members; substitute a label so str.join() cannot fail.
                    cols = [
                        col[2] if col[2] is not None else "<rowid/expr>"
                        for col in cursor.fetchall()
                    ]
                    print(f"  {idx[1]}: {', '.join(cols)} {'UNIQUE' if idx[2] else ''}")

            # Sample data (first 3 rows)
            cursor.execute(f"SELECT * FROM {quoted_table} LIMIT 3")
            # Take the names from the result set itself so they always line up
            # with the values of each returned row.
            col_names = [desc[0] for desc in cursor.description]
            sample_data = cursor.fetchall()

            if sample_data:
                print("SAMPLE DATA:")
                for row in sample_data:
                    row_dict = dict(zip(col_names, row))
                    print(f"  {row_dict}")

            # Store structured info
            schema_info[table] = {
                'create_sql': create_sql,
                'columns': columns,
                'foreign_keys': foreign_keys,
                'indexes': indexes,
                'sample_data': sample_data
            }
    finally:
        # Release the handle even when one of the queries above raised.
        conn.close()

    return schema_info

# Script entry point: analyse the demo database and keep a JSON copy of it.
if __name__ == "__main__":
    db_path = "demo.db"  # Point this at the database you want to inspect
    schema = get_complete_schema(db_path)

    # Serialise first, then write in one go; default=str stringifies any
    # value the JSON encoder cannot represent natively.
    serialized_schema = json.dumps(schema, indent=2, default=str)
    with open('schema_analysis.json', 'w') as f:
        f.write(serialized_schema)

    print("\nSchema analysis saved to schema_analysis.json")


-- Table: Account
CREATE: CREATE TABLE `Account` (`name` text not null, `rootType` text not null, `parentAccount` text, `accountType` text, `isGroup` boolean default '0', `createdBy` text not null, `modifiedBy` text not null, `created` datetime not null, `modified` datetime not null, `lft` integer not null, `rgt` integer not null, foreign key(`parentAccount`) references `Account`(`name`) on delete RESTRICT on update CASCADE, primary key (`name`))
COLUMNS:
  name TEXT NOT NULL PK
  rootType TEXT NOT NULL 
  parentAccount TEXT NULL 
  accountType TEXT NULL 
  isGroup boolean NULL 
  createdBy TEXT NOT NULL 
  modifiedBy TEXT NOT NULL 
  created datetime NOT NULL 
  modified datetime NOT NULL 
  lft INTEGER NOT NULL 
  rgt INTEGER NOT NULL 
FOREIGN KEYS:
  parentAccount -> Account.name
INDEXES:
  sqlite_autoindex_Account_1: name UNIQUE
SAMPLE DATA:
  {'name': 'Application of Funds (Assets)', 'rootType': 'Asset', 'parentAccount': None, 'accountType': '', 'isGroup': 1, 'createdBy': 'i77146

In [3]:
#short schema 

import sqlite3

def analyze_frappe_patterns(db_path):
    """Split the tables of a SQLite database into child and master tables.

    A table is reported as a child table when it has both a ``parent`` and a
    ``parenttype`` column; every other table is reported as a master table.
    For each child table up to three rows of its link columns are printed,
    followed by a summary of both groups.

    Args:
        db_path: Path of the SQLite database file to inspect.

    Returns:
        dict: ``{'child_tables': [...], 'master_tables': [...]}`` where every
        entry is ``{'table': name, 'columns': [column names], 'pattern': ...}``
        with ``pattern`` being ``'child_table'`` or ``'master_table'``.

    Raises:
        sqlite3.Error: Propagated from any failing query; the connection is
            closed before the exception leaves this function.
    """

    def quote_ident(name):
        # Identifiers cannot be bound as parameters; double-quote them and
        # double any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    # Columns shown in the per-table sample, in display order.
    link_columns = ('parent', 'parenttype', 'parentfield')

    child_tables = []
    master_tables = []

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Get all tables
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
        tables = [row[0] for row in cursor.fetchall()]

        for table in tables:
            quoted_table = quote_ident(table)

            # Check column names
            cursor.execute(f"PRAGMA table_info({quoted_table})")
            columns = [col[1] for col in cursor.fetchall()]

            # Detect Frappe child table pattern
            if 'parent' in columns and 'parenttype' in columns:
                child_tables.append({
                    'table': table,
                    'columns': columns,
                    'pattern': 'child_table'
                })

                # Get sample to understand relationship. Only the link columns
                # that exist are selected: `parentfield` is not part of the
                # classification test, so it may be absent from the table.
                select_list = ", ".join(
                    quote_ident(name) for name in link_columns if name in columns
                )
                cursor.execute(f"SELECT {select_list} FROM {quoted_table} LIMIT 3")
                samples = cursor.fetchall()
                print(f"\nCHILD TABLE: {table}")
                print(f"Links: {samples}")

            else:
                master_tables.append({
                    'table': table, 
                    'columns': columns,
                    'pattern': 'master_table'
                })
    finally:
        # Release the handle even when one of the queries above raised.
        conn.close()

    print(f"\nSUMMARY:")
    print(f"Child Tables: {[t['table'] for t in child_tables]}")
    print(f"Master Tables: {[t['table'] for t in master_tables]}")

    return {'child_tables': child_tables, 'master_tables': master_tables}

# Quick analysis: classify the tables of demo.db. The call prints its report
# and returns {'child_tables': [...], 'master_tables': [...]}, kept here in
# `patterns` for later inspection.
patterns = analyze_frappe_patterns("demo.db")


SUMMARY:
Child Tables: []
Master Tables: ['PrintTemplate', 'Color', 'Currency', 'NumberSeries', 'Account', 'AccountingLedgerEntry', 'Lead', 'Address', 'ItemGroup', 'UOM', 'UOMConversionItem', 'LoyaltyProgram', 'LoyaltyPointEntry', 'CollectionRulesItems', 'Payment', 'PaymentMethod', 'PaymentFor', 'JournalEntry', 'JournalEntryAccount', 'ItemEnquiry', 'CouponCode', 'AppliedCouponCodes', 'PriceList', 'PriceListItem', 'PricingRule', 'PricingRuleItem', 'PricingRuleDetail', 'Tax', 'TaxDetail', 'TaxSummary', 'Location', 'StockLedgerEntry', 'StockMovement', 'StockMovementItem', 'Batch', 'SerialNumber', 'CustomForm', 'CustomField', 'POSProfile', 'POSOpeningShift', 'POSClosingShift', 'ERPNextSyncQueue', 'FetchFromERPNextQueue', 'IntegrationErrorLog', 'SalesInvoice', 'PurchaseInvoice', 'SalesQuote', 'SalesInvoiceItem', 'PurchaseInvoiceItem', 'SalesQuoteItem', 'Shipment', 'ShipmentItem', 'PurchaseReceipt', 'PurchaseReceiptItem', 'ClosingAmounts', 'ClosingCash', 'DefaultCashDenominations', 'Opening

In [4]:
import sqlite3
import json
import uuid

def convert_frappe_to_universal(db_path, output_file="universal_data.json", child_table_patterns=None):
    """Flatten every table of a SQLite database into a list of records.

    Tables that have ``parent`` and ``parenttype`` columns plus either a
    ``parentfield`` or an ``idx`` column are treated as child tables. A child
    table is converted only when a pattern is known for it; each of its rows
    becomes ``{'doctype': 'Child', 'name': None, 'data': {...}}`` where
    ``data`` holds ``parent``, ``parenttype``, ``link_doctype``, ``link_name``
    and the non-NULL additional fields named by the pattern. Child tables
    without a pattern are skipped and reported on stdout.

    Every other table is converted row by row into
    ``{'doctype': <table>, 'name': <name or a fresh UUID>, 'data': {...}}``
    where ``data`` omits ``name``, ``createdBy``, ``modifiedBy``, ``created``
    and ``modified``.

    Args:
        db_path: Path of the SQLite database file to read.
        output_file: Path of the JSON file the records are written to.
        child_table_patterns: Optional mapping of child table name to a
            pattern dict with ``'link_field'``, ``'additional_fields'`` and
            either ``'link_doctype'`` (fixed target) or
            ``'link_doctype_field'`` (column holding the target). When
            omitted, the built-in patterns below are used.

    Returns:
        list: The records that were written to ``output_file``.

    Raises:
        sqlite3.Error: Propagated from any failing query.
        KeyError: If a pattern names a link column the table does not have.
    """

    def quote_ident(name):
        # Identifiers cannot be bound as parameters; double-quote them and
        # double any embedded double quote.
        return '"' + name.replace('"', '""') + '"'

    if child_table_patterns is None:
        # Define child table patterns based on your schema
        child_table_patterns = {
            'JournalEntryAccount': {
                'link_doctype': 'Account',
                'link_field': 'account',
                'additional_fields': ['debit', 'credit', 'idx']
            },
            'SalesInvoiceItem': {
                'link_doctype': 'Item', 
                'link_field': 'item',
                'additional_fields': ['description', 'rate', 'quantity', 'amount', 'tax', 'idx']
            },
            'PurchaseInvoiceItem': {
                'link_doctype': 'Item',
                'link_field': 'item', 
                'additional_fields': ['description', 'rate', 'quantity', 'amount', 'tax', 'idx']
            },
            'TaxDetail': {
                'link_doctype': 'TaxComponent',
                'link_field': 'account',
                'additional_fields': ['rate', 'idx']
            },
            'TaxSummary': {
                'link_doctype': 'TaxComponent', 
                'link_field': 'account',
                'additional_fields': ['rate', 'amount', 'idx']
            },
            'PaymentFor': {
                'link_doctype_field': 'referenceType',  # Dynamic link
                'link_field': 'referenceName',
                'additional_fields': ['amount', 'idx']
            }
        }

    # Bookkeeping columns that are left out of a master record's data.
    system_fields = ('name', 'createdBy', 'modifiedBy', 'created', 'modified')

    universal_records = []

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Get all table names
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
        tables = [row[0] for row in cursor.fetchall()]

        for table in tables:
            print(f"Processing table: {table}")
            quoted_table = quote_ident(table)

            # Get all columns for this table
            cursor.execute(f"PRAGMA table_info({quoted_table})")
            columns = {col[1] for col in cursor.fetchall()}

            # Check if this is a child table
            is_child_table = ('parent' in columns and 
                             'parenttype' in columns and 
                             ('parentfield' in columns or 'idx' in columns))

            if is_child_table and table not in child_table_patterns:
                # No conversion rule for this table: its rows are not
                # exported, so say so instead of dropping them silently.
                print(f"  Skipping child table without a known pattern: {table}")
                continue

            cursor.execute(f"SELECT * FROM {quoted_table}")
            # Names come from the result set so they line up with each row.
            col_names = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()

            if is_child_table:
                # Handle known child table patterns
                pattern = child_table_patterns[table]

                for row in rows:
                    row_dict = dict(zip(col_names, row))

                    # Build child record
                    child_data = {
                        'parent': row_dict['parent'],
                        'parenttype': row_dict['parenttype']
                    }

                    # Handle dynamic vs static link doctype
                    if 'link_doctype_field' in pattern:
                        child_data['link_doctype'] = row_dict[pattern['link_doctype_field']]
                    else:
                        child_data['link_doctype'] = pattern['link_doctype']

                    child_data['link_name'] = row_dict[pattern['link_field']]

                    # Add additional fields (NULL values are left out)
                    for field in pattern['additional_fields']:
                        if field in row_dict and row_dict[field] is not None:
                            child_data[field] = row_dict[field]

                    universal_records.append({
                        'doctype': 'Child',
                        'name': None,
                        'data': child_data
                    })

            else:
                # Handle master/transaction tables
                for row in rows:
                    row_dict = dict(zip(col_names, row))

                    # Extract name; rows without one get a random UUID
                    doc_name = row_dict.get('name') or str(uuid.uuid4())

                    # Remove system fields from data
                    clean_data = {k: v for k, v in row_dict.items() 
                                 if k not in system_fields}

                    universal_records.append({
                        'doctype': table,
                        'name': doc_name,
                        'data': clean_data
                    })
    finally:
        # Everything is in memory now (or a query failed): release the handle.
        conn.close()

    # Save to JSON
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(universal_records, f, indent=2, default=str)

    print(f"\nConversion complete!")
    print(f"Total records: {len(universal_records)}")
    print(f"Output saved to: {output_file}")

    # Show summary
    doctype_counts = {}
    for record in universal_records:
        doctype = record['doctype']
        doctype_counts[doctype] = doctype_counts.get(doctype, 0) + 1

    print("\nRecord counts by doctype:")
    for doctype, count in sorted(doctype_counts.items()):
        print(f"  {doctype}: {count}")

    return universal_records

# Script entry point: convert the demo database, then show a short preview.
if __name__ == "__main__":
    records = convert_frappe_to_universal("demo.db")

    # Only the leading five records are listed, numbered from 1.
    print("\nFirst 5 records:")
    for position, record in enumerate(records[:5], start=1):
        print(f"{position}. {record['doctype']}: {record['name']}")
        if record['doctype'] != 'Child':
            continue
        # Child records additionally show which parent links to which doctype.
        payload = record['data']
        print(f"   {payload['parent']} -> {payload['link_doctype']}")

Processing table: PrintTemplate
Processing table: Color
Processing table: Currency
Processing table: NumberSeries
Processing table: Account
Processing table: AccountingLedgerEntry
Processing table: Lead
Processing table: Address
Processing table: ItemGroup
Processing table: UOM
Processing table: UOMConversionItem
Processing table: LoyaltyProgram
Processing table: LoyaltyPointEntry
Processing table: CollectionRulesItems
Processing table: Payment
Processing table: PaymentMethod
Processing table: PaymentFor
Processing table: JournalEntry
Processing table: JournalEntryAccount
Processing table: ItemEnquiry
Processing table: CouponCode
Processing table: AppliedCouponCodes
Processing table: PriceList
Processing table: PriceListItem
Processing table: PricingRule
Processing table: PricingRuleItem
Processing table: PricingRuleDetail
Processing table: Tax
Processing table: TaxDetail
Processing table: TaxSummary
Processing table: Location
Processing table: StockLedgerEntry
Processing table: StockM

In [2]:
import sqlite3

# Connect to SQLite DB. Everything needed from it is copied into `tables` and
# `fk_map`, so the connection is closed as soon as both are built (and also
# when a query fails).
conn = sqlite3.connect("demo.db")
try:
    cursor = conn.cursor()

    # Get all tables
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
    tables = [row[0] for row in cursor.fetchall()]

    # Collect foreign key relationships
    fk_map = {table: [] for table in tables}
    for table in tables:
        # Identifiers cannot be bound as parameters; double-quote the name
        # (embedded quotes doubled) so unusual table names do not break the
        # statement.
        quoted_table = '"' + table.replace('"', '""') + '"'
        cursor.execute(f"PRAGMA foreign_key_list({quoted_table});")
        for fk in cursor.fetchall():
            # fk format: (id, seq, table, from, to, on_update, on_delete, match)
            fk_map[table].append({
                "from_column": fk[3],
                "to_table": fk[2],
                "to_column": fk[4],
                "on_update": fk[5],
                "on_delete": fk[6]
            })
finally:
    conn.close()

# Depth-first printout of the tables reachable from `table` through foreign keys
def print_schema_tree(table, level=0, visited=None):
    """Print `table` and, indented beneath it, the tables it references.

    Args:
        table: Name of the table to print at this node.
        level: Nesting depth; each level adds two spaces of indentation.
        visited: Set of table names already printed in this traversal. A new
            set is created when it is omitted, and the set is updated in
            place so a table is printed at most once per traversal.
    """
    seen = set() if visited is None else visited
    print("  " * level + f"- {table}")
    seen.add(table)
    for edge in fk_map.get(table, []):
        target = edge["to_table"]
        # Checked at iteration time: an earlier sibling's subtree may already
        # have printed this target.
        if target in seen:
            continue
        print_schema_tree(target, level + 1, seen)

# Print all schema trees: one tree per table, rooted at that table. The call
# passes no `visited` set, so every tree starts its traversal from scratch and
# a table can show up again under several roots.
for table in tables:
    print_schema_tree(table)



- PrintTemplate
- Color
- Currency
- NumberSeries
- Account
- AccountingLedgerEntry
  - Account
  - Party
    - LoyaltyProgram
    - Lead
      - Address
    - Currency
- Lead
  - Address
- Address
- ItemGroup
  - Tax
- UOM
- UOMConversionItem
  - UOM
- LoyaltyProgram
  - Account
- LoyaltyPointEntry
  - SalesInvoice
    - LoyaltyProgram
      - Account
    - SalesQuote
      - Currency
      - PriceList
      - NumberSeries
    - Shipment
      - Party
        - Lead
          - Address
- CollectionRulesItems
- Payment
  - PaymentMethod
    - Account
  - Party
    - LoyaltyProgram
    - Lead
      - Address
    - Currency
  - NumberSeries
- PaymentMethod
  - Account
- PaymentFor
- JournalEntry
  - NumberSeries
- JournalEntryAccount
  - Account
- ItemEnquiry
- CouponCode
  - PricingRule
    - UOM
    - Item
      - Tax
      - Account
      - ItemGroup
    - NumberSeries
- AppliedCouponCodes
  - CouponCode
    - PricingRule
      - UOM
      - Item
        - Tax
        - Account
      

In [None]:
import sqlite3
import json
from collections import defaultdict

# Connect to your DB. All rows and foreign keys are copied into memory below,
# so the connection is closed once loading finishes (and also on failure).
conn = sqlite3.connect("demo.db")
try:
    cursor = conn.cursor()

    # Get tables
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
    tables = [row[0] for row in cursor.fetchall()]

    # Build foreign key map (only tables that declare foreign keys get an entry)
    fk_map = defaultdict(list)
    for table in tables:
        # Identifiers cannot be bound as parameters; double-quote the name
        # (embedded quotes doubled) so unusual table names do not break the
        # statement.
        quoted_table = '"' + table.replace('"', '""') + '"'
        cursor.execute(f"PRAGMA foreign_key_list({quoted_table});")
        for fk in cursor.fetchall():
            fk_map[table].append({
                "from_column": fk[3],
                "to_table": fk[2],
                "to_column": fk[4]
            })

    # Collect all rows per table as {column name: value} dicts
    table_rows = {}
    for table in tables:
        quoted_table = '"' + table.replace('"', '""') + '"'
        cursor.execute(f"SELECT * FROM {quoted_table}")
        columns = [desc[0] for desc in cursor.description]
        table_rows[table] = [dict(zip(columns, row)) for row in cursor.fetchall()]
finally:
    conn.close()

# Function to recursively build JSON for a root table
def build_json(table, parent_key=None, parent_value=None, _active=frozenset()):
    """Return the rows of `table` as dicts with their referenced rows nested.

    Reads the module-level `table_rows` (table name -> list of row dicts) and
    `fk_map` (table name -> list of foreign-key dicts). For every selected row
    the rows it points to are looked up recursively and stored under the name
    of the referenced table:

    * each foreign key in `fk_map[table]` is followed from the row's
      `from_column` value to the rows of `to_table` whose `to_column` matches;
    * a row that has both a `parentSchemaName` and a `parent` field is linked
      to the rows of the table named by `parentSchemaName` whose `name`
      equals the row's `parent`.

    Args:
        table: Name of the table whose rows are returned.
        parent_key: Optional column to filter on; when falsy, all rows of
            `table` are returned.
        parent_value: Value `parent_key` must equal for a row to be kept.
        _active: Internal. Lookups currently being expanded on this recursion
            path; used to stop cyclic references from recursing forever.

    Returns:
        list: One dict per selected row (a copy of the row plus nested lists).
    """
    # A lookup that is already being expanded further up the call chain would
    # repeat itself endlessly (e.g. a row referencing itself); cut it here.
    lookup = (table, parent_key, parent_value)
    if lookup in _active:
        return []
    _active = _active | {lookup}

    result = []
    for row in table_rows.get(table, []):
        # If parent filtering is applied, skip rows not matching
        if parent_key and row.get(parent_key) != parent_value:
            continue

        # Build JSON for current row
        obj = dict(row)

        # (target table, target column, value to match) for every link to follow
        links = [
            (fk["to_table"], fk["to_column"], row.get(fk["from_column"]))
            for fk in fk_map.get(table, [])
        ]
        # NOTE(review): if a foreign key was recorded with to_column=None, the
        # filter above is disabled and every row of the target table matches —
        # confirm whether the schema contains such keys.

        # Handle implicit parent-child fields if exist: the row names its
        # parent's table in `parentSchemaName` and its parent's name in `parent`.
        if 'parentSchemaName' in row and 'parent' in row:
            links.append((row['parentSchemaName'], 'name', row['parent']))

        nested = {}
        for target_table, target_key, value in links:
            # A NULL reference points at nothing; do not match it against
            # rows whose key happens to be NULL as well.
            if value is None:
                continue
            matches = build_json(target_table, target_key, value, _active)
            if matches:
                # Several links may point at the same table (e.g. two foreign
                # keys to one table); keep all of them instead of letting the
                # last one overwrite the others.
                nested.setdefault(target_table, []).extend(matches)

        # Store under referenced table name
        obj.update(nested)

        result.append(obj)
    return result

# Example: build all SalesInvoices as JSON
invoices_json = build_json('SalesInvoice')
# default=str stringifies values the JSON encoder cannot represent natively
# (e.g. bytes read from BLOB columns) instead of raising TypeError.
print(json.dumps(invoices_json, indent=2, default=str))
