In [41]:
import pandas as pd
import pyperclip
import re

import argparse
from pathlib import Path
from typing import Optional
import time
from dataclasses import dataclass, field
import camelot.io as camelot
import gc


In [42]:


# Path to your CSV file
CSV_FILE_PATH = "data/contactors.csv"

# Path to the output SQL file
OUTPUT_SQL_FILE = "output/contactors.sql"

# Table name
TABLE_NAME = "main.contactor_type"

def generate_sql_from_csv():
    try:
        # Read the CSV file into a pandas DataFrame
        df = pd.read_csv(CSV_FILE_PATH)

        # Open the output file in write mode
        with open(OUTPUT_SQL_FILE, "w") as sql_file:
            for _, row in df.iterrows():
                # Generate the SQL INSERT statement for each row (excluding ID)
                sql_statement = f"""
                INSERT INTO {TABLE_NAME} (In_ac1, In_ac3, p_ac1, p_ac3, nc_aux_count, no_aux_count, control_voltage)
                VALUES ({row['In_ac1']}, {row['In_ac3']}, {row['p_ac1']}, {row['p_ac3']}, {row['nc_aux_count']}, {row['no_aux_count']}, '{row['control_voltage']}');
                """
                # Write the SQL statement to the file
                sql_file.write(sql_statement.strip() + "\n")

        print(f"SQL statements successfully written to {OUTPUT_SQL_FILE}")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    generate_sql_from_csv()

An error occurred: [Errno 2] No such file or directory: 'data/contactors.csv'


In [43]:

@dataclass
class Ctx:
    """Context manager for file paths with input/output separation"""
    _rootPath: Path = field(default_factory=lambda: Path('.').resolve())
    
    # Input related (private)
    _inputPathStr: str = field(default='data')
    _inputFilename: str = field(default="input_file.xlsx")
    
    # Output related (private)
    _outputPathStr: str = field(default="output")
    _outputFilename: str = field(default="output_file")
    _outputFileExtension: str = field(default=".xlsx")
    _outputCustomSuffix: Optional[str] = field(default=None)
    
    
    # Read-only properties
    @property
    def inputDirPath(self) -> Path:
        """Directory containing input files"""
        return self._rootPath / self._inputPathStr
    
    @property
    def inputFilePath(self) -> Path:
        """Full path to input file"""
        return self.inputDirPath / self._inputFilename
    
    @property
    def outputDirPath(self) -> Path:
        """Directory for output files"""
        return self._rootPath / self._outputPathStr
    
    @property
    def outputFilePath(self) -> Path:
        """Full path to output file with automatic timestamp if file exists"""
        base_name = self._outputFilename + self._outputFileExtension
        p = self.outputDirPath / base_name
        
        if not p.exists():
            return p
        else:
            if self._outputCustomSuffix:
                formatted = self._outputCustomSuffix
            else:
                localTime = time.localtime()
                formatted = time.strftime('%Y_%m_%d_%X', localTime)
                formatted = formatted.replace(":", "")
            
            stem = p.stem  
            suffix = p.suffix 
            new_filename = f"{stem}_{formatted}{suffix}"
            return p.with_name(new_filename)


ctx = Ctx()
ctx._inputPathStr = "data/tables"

# Get all .txt files in data/tables directory
tables_dir = ctx.inputDirPath
table_files = list(tables_dir.glob("*.txt"))

print(f"Found {len(table_files)} table file(s) in {tables_dir}")
print(f"Files: {[f.name for f in table_files]}\n")

ignore_keywords = [
    'ID',
    'constraint',
    'primary',
    'create',
    'table',
    'key'
]

# Process each file
all_schemas = {}

for table_file in table_files:
    print(f"\n{'='*60}")
    print(f"Processing: {table_file.name}")
    print(f"{'='*60}")
    
    # Step 1: Read the file
    ctx._inputFilename = table_file.name
    with open(ctx.inputFilePath, 'r') as f:
        read = f.read()
    
    print(f"\nSQL Content:\n{read}\n")
    
    # Step 2: Extract column definitions
    columns = re.findall(r'(\w+)\s+(\w+)', str(read))
    
    # Step 3: Remove ignored keywords
    columns = [col for col in columns if col[0] not in ignore_keywords]
    
    # Step 4: Convert to a dictionary
    schema = {col[0]: col[1] for col in columns}
    
    # Step 5: Print the processed schema
    print("Processed Schema:")
    for column, dtype in schema.items():
        print(f"  {column}: {dtype}")
    
    # Step 6: Create column list
    desired_columns = []
    line = ""
    for column, dtype in schema.items():
        desired_columns.append(column)
        line += f"{column}\t"
    
    print(f"\nColumns: {desired_columns}")
    print(f"Tab-separated: {line}")
    
    # Store schema
    all_schemas[table_file.stem] = {
        'schema': schema,
        'columns': desired_columns,
        'line': line
    }

# Summary
print(f"\n{'='*60}")
print(f"SUMMARY: Processed {len(all_schemas)} table(s)")
print(f"{'='*60}")

for table_name, data in all_schemas.items():
    print(f"\n{table_name}:")
    print(f"  Columns ({len(data['columns'])}): {', '.join(data['columns'])}")

# Copy last table's columns to clipboard
if all_schemas:
    last_table = list(all_schemas.keys())[-1]
    pyperclip.copy(all_schemas[last_table]['line'])
    print(f"\n'{last_table}' schema copied to clipboard!")




Found 2 table file(s) in C:\Users\35850\Desktop\repositories\DevTools\image_tables_into_xlsx\data\tables
Files: ['circuit_breaker_types.txt', 'contactor_types.txt']


Processing: circuit_breaker_types.txt

SQL Content:
create table main.circuit_breaker_type
(
    ID     integer
        constraint circuit_breaker_type_pk
            primary key,
    curve  text    not null,
    phases integer not null,
    "In"   integer not null,
    Ik     integer not null
);

create unique index main.circuit_breaker_type_curve_Ik_In_phases_uindex
    on main.circuit_breaker_type (curve, Ik, "In", phases);



Processed Schema:
  curve: text
  not: null
  phases: integer
  integer: not
  Ik: integer
  index: main
  circuit_breaker_type_curve_Ik_In_phases_uindex: on

Columns: ['curve', 'not', 'phases', 'integer', 'Ik', 'index', 'circuit_breaker_type_curve_Ik_In_phases_uindex']
Tab-separated: curve	not	phases	integer	Ik	index	circuit_breaker_type_curve_Ik_In_phases_uindex	

Processing: contactor_types.tx

In [44]:
df = pd.read_excel("data/contactors_AC3_AND_AC1.xlsx")

# Export the specified columns to a CSV file
filtered_df = df[all_schemas['contactor_types']['columns']]

# Store the filtered DataFrame in a variable
csv_data = filtered_df.to_csv(index=False)

# Export the filtered DataFrame to a CSV file
filtered_df.to_csv("output/tables/filtered_contactors.csv", index=False)




# Get columns from schema
columns = list(schema.keys())

print(f"Generating SQL INSERTs for {len(df)} rows...")

# Setup output file
ctx._outputFilename = f"test_inserts"
ctx._outputPathStr = 'output/tables'
ctx._outputFileExtension = ".sql"
output_sql_path = ctx.outputFilePath

# Generate SQL statements
with open(output_sql_path, "w") as sql_file:
    for idx, row in df.iterrows():
        # Build values dynamically based on data types
        values = []
        for col in columns:
            if col not in row:
                print(f"Warning: Column '{col}' not found in CSV, skipping row {idx}")
                continue
            
            value = row[col]
            
            # Handle NULL values
            if pd.isna(value):
                values.append("NULL")
            # Handle text types
            elif schema[col].lower() in ['text', 'varchar', 'char', 'string']:
                # Escape single quotes
                escaped_value = str(value).replace("'", "''")
                values.append(f"'{escaped_value}'")
            # Handle numeric types
            else:
                values.append(str(value))
        
        # Create SQL INSERT statement
        columns_str = ", ".join(columns)
        values_str = ", ".join(values)
        
        sql_statement = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        sql_file.write(sql_statement + "\n")

print(f"SQL statements successfully written to: {output_sql_path}")
print(f"Total statements: {len(df)}")

Generating SQL INSERTs for 18 rows...
SQL statements successfully written to: C:\Users\35850\Desktop\repositories\DevTools\image_tables_into_xlsx\output\tables\test_inserts_2026_02_17_22.19.01.sql
Total statements: 18


In [45]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import sqlite3
import json

app = FastAPI(title="Database Query API", version="1.0.0")

# Database configuration
DATABASE_PATH = "database.db"

class QueryRequest(BaseModel):
    query: str
    params: Optional[List[Any]] = None

class QueryResponse(BaseModel):
    data: List[Dict[str, Any]]
    columns: List[str]
    row_count: int

def get_db_connection():
    """Get database connection"""
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        conn.row_factory = sqlite3.Row  # Enable dict-like access
        return conn
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database connection failed: {str(e)}")

@app.get("/")
async def root():
    return {"message": "Database Query API"}

@app.get("/tables")
async def get_tables():
    """Get all table names in the database"""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [row[0] for row in cursor.fetchall()]
        return {"tables": tables}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.get("/tables/{table_name}/schema")
async def get_table_schema(table_name: str):
    """Get schema information for a specific table"""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info({table_name});")
        schema = cursor.fetchall()
        if not schema:
            raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found")
        
        columns = []
        for col in schema:
            columns.append({
                "name": col[1],
                "type": col[2],
                "nullable": not col[3],
                "default": col[4],
                "primary_key": bool(col[5])
            })
        
        return {"table": table_name, "columns": columns}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.get("/tables/{table_name}/data")
async def get_table_data(table_name: str, limit: Optional[int] = 100, offset: Optional[int] = 0):
    """Get data from a specific table with pagination"""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        
        # TODO: Replace with actual query - PLACEHOLDER
        query = f"SELECT * FROM {table_name} LIMIT ? OFFSET ?"
        cursor.execute(query, (limit, offset))
        
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        data = [dict(row) for row in rows]
        
        return QueryResponse(
            data=data,
            columns=columns,
            row_count=len(data)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.post("/query")
async def execute_query(request: QueryRequest):
    """Execute a custom SQL query"""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        
        # TODO: Add your custom queries here - PLACEHOLDER
        if request.params:
            cursor.execute(request.query, request.params)
        else:
            cursor.execute(request.query)
        
        # Handle SELECT queries
        if request.query.strip().upper().startswith('SELECT'):
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            data = [dict(row) for row in rows]
            
            return QueryResponse(
                data=data,
                columns=columns,
                row_count=len(data)
            )
        else:
            # Handle INSERT, UPDATE, DELETE queries
            conn.commit()
            return {"message": "Query executed successfully", "rows_affected": cursor.rowcount}
            
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.get("/contactors")
async def get_contactors():
    """Get all contactors - PLACEHOLDER QUERY"""
    # TODO: Replace with your actual contactor query
    query = "SELECT * FROM contactor_type LIMIT 50"
    
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        data = [dict(row) for row in rows]
        
        return QueryResponse(
            data=data,
            columns=columns,
            row_count=len(data)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.get("/contactors/{contactor_id}")
async def get_contactor_by_id(contactor_id: int):
    """Get specific contactor by ID - PLACEHOLDER QUERY"""
    # TODO: Replace with your actual contactor by ID query
    query = "SELECT * FROM contactor_type WHERE id = ?"
    
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, (contactor_id,))
        row = cursor.fetchone()
        
        if not row:
            raise HTTPException(status_code=404, detail=f"Contactor with ID {contactor_id} not found")
        
        columns = [description[0] for description in cursor.description]
        data = dict(row)
        
        return {"contactor": data}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

# PLACEHOLDER ENDPOINTS - Add your specific queries here
@app.get("/custom/query1")
async def custom_query_1():
    """Custom query placeholder 1"""
    # TODO: Add your custom query logic here
    query = "SELECT COUNT(*) as total FROM sqlite_master WHERE type='table'"
    
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchone()
        return {"result": dict(result)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()

@app.get("/custom/query2")
async def custom_query_2():
    """Custom query placeholder 2"""
    # TODO: Add your custom query logic here
    pass

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

KeyboardInterrupt: 

In [None]:
import requests
import pandas as pd
BASE_URL = "http://localhost:3000/api/circuit-breaker-types"

response = requests.get(f"{BASE_URL}/")

df = pd.DataFrame(response)

if response.status_code == 200:
    print("Response:", response.json())
else:
    print(f"Error: {response.status_code}, {response.text}")

In [None]:
import requests
import pandas as pd
import pyperclip
BASE_URL = "http://localhost:3000/api/circuit-breaker-types"
import time
response = requests.get(f"{BASE_URL}/")
data = response.json()
df = pd.DataFrame(data)

def map_to_order_number(row) -> str:
    order_number = "A9F0"
    match row['curve']:
        case 'B':
            order_number += "3"
        case 'C':
            order_number += "4"

    match row['phases']:
        case 1:
            order_number += "1"
        case 2:
            order_number += "2"
        case 3:
            order_number += "3"

    order_number += str(row['In']).zfill(2)
    return order_number
    

df['order_number'] = df.apply(map_to_order_number, axis=1)

columns = [
    "part_type", "manufacturer_id", "order_number", "sahkonumero",
    "price", "description", "created_date", "is_active", "type_id"
]

test = pd.read_excel("output/processed_circuit_breakers.xlsx")
test.rename(columns={test.columns[0]: "order_number"}, inplace=True)
print(test.columns[0])
part_base_df = pd.DataFrame(columns=columns)


merged_df = pd.merge(part_base_df, test, on="order_number", how="right")

part_base_df['type_id'] = df['ID']
part_base_df['manufacturer_id'] = 2
part_base_df['order_number'] = df['order_number']
part_base_df['sahkonumero'] = merged_df['sahkonumero_y']
part_base_df['price'] = 9999
part_base_df['description'] = merged_df['description_y']
part_base_df['part_type'] = 'circuit_breaker'
part_base_df['is_active'] = 1
# part_base_df['created_date'] = time.localtime()
TABLE_NAME = "parts_base"
sql_queries = ""
for _, row in part_base_df.iterrows():
    # Dynamically build the VALUES part of the query
    values = []
    for col in part_base_df.columns:
        value = row[col]
        if pd.isna(value):  # Handle NULL values
            values.append("NULL")
        elif isinstance(value, str):  # Handle strings
            values.append(f"'{value.replace('\'', '\'\'')}'")  # Escape single quotes
        else:  # Handle numbers
            values.append(str(value))
    
    # Construct the SQL query
    columns_str = ", ".join(part_base_df.columns)
    values_str = ", ".join(values)
    sql_query = f"INSERT INTO {TABLE_NAME} ({columns_str}) VALUES ({values_str});"
    
    # Append the query to the string
    sql_queries += sql_query + "\n"

# Copy the generated SQL queries to the clipboard
pyperclip.copy(sql_queries)
if response.status_code == 200:
    print("Response:", response.json())
else:
    print(f"Error: {response.status_code}, {response.text}")





# df2 = pd.read_excel('output/processed_circuit_breakers.xlsx')


In [None]:
import requests
import pandas as pd
import re
df = pd.read_excel("data/circuit_breakers.xlsx")

## A9F03101	iC60N johdonsuojak 1P B 1A 6kA	1	32 580 01	B

#split description 
str1 = "iC60N johdonsuojak 1P B 1A 6kA"

def extract_decimal(cell: str):
    split = cell.split()
    numbers = re.findall(r'\d+', str(split[4])) if len(split) > 4 else []
    if len(numbers) == 2:
        decimal_str = f"{numbers[0]}.{numbers[1]}"
        try:
            return float(decimal_str)
        except ValueError:
            return None
    elif len(numbers) == 1:
        try:
            return int(numbers[0])
        except ValueError:
            return None
    else:
        return None

df['in'] = df['description'].apply(extract_decimal)


columns = [
    "part_type", "manufacturer_id", "order_number", "sahkonumero",
    "price", "description", "created_date", "is_active", "type_id"
]





BASE_URL = "http://localhost:3000/api/circuit-breaker-types"


response = requests.get(f"{BASE_URL}/")
data = response.json()
database = pd.DataFrame(data)


    

In [11]:
import requests
import pandas as pd
import pyperclip
BASE_URL = "http://localhost:3000/api/circuit-breaker-types"
import time
response = requests.get(f"{BASE_URL}/")
data = response.json()
df = pd.DataFrame(data)

def map_to_order_number(row) -> str:
    order_number = "A9F0"
    match row['curve']:
        case 'B':
            order_number += "3"
        case 'C':
            order_number += "4"

    match row['phases']:
        case 1:
            order_number += "1"
        case 2:
            order_number += "2"
        case 3:
            order_number += "3"

    order_number += str(int(row['In'])).zfill(2)
    return order_number
    

df['order_number'] = df.apply(map_to_order_number, axis=1)

columns = [
    "part_type", "manufacturer_id", "order_number", "sahkonumero",
    "price", "description", "created_date", "is_active", "type_id"
]


test = pd.read_excel("output/processed_circuit_breakers.xlsx")
test.rename(columns={test.columns[0]: "order_number"}, inplace=True)
print(test.columns[0])


merged_df = pd.merge(df, test, on="order_number", how="right")

columns2 = [
    "ID",
    "curve",
    "phases",
    "In",
    "Ik"
]

temp = pd.DataFrame(columns=columns2)
temp['curve'] = merged_df['curve_y']
temp['phases'] = merged_df['phases_y']
temp['In'] = merged_df['numbers']
temp['Ik'] = 6000
table_name = "circuit_breaker_types"

# Initialize an empty string to store the SQL statements
sql_statements = ""

# Iterate over the rows of the DataFrame
for _, row in temp.iterrows():
    # Extract values for each column
    values = [
        "NULL" if pd.isna(row["ID"]) else row["ID"],  # Handle NULL for ID
        f"'{row['curve']}'",  # Text values need to be quoted
        row["phases"],
        row["In"],
        row["Ik"]
    ]
    
    # Convert the values list to a comma-separated string
    values_str = ", ".join(map(str, values))
    
    # Create the SQL INSERT statement
    sql_statement = f"INSERT INTO {table_name} (ID, curve, phases, \"In\", Ik) VALUES ({values_str});"
    
    # Append the statement to the collection
    sql_statements += sql_statement + "\n"

# Print or save the generated SQL statements
print(sql_statements)
pyperclip.copy(sql_statements)
nan_id_rows = merged_df[merged_df['ID'].isna()]

order_number
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 1.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 2.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 2.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 4.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 4.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 6.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 6.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 10.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 10.0, 6000);
INSERT INTO circuit_breaker_types (ID, curve, phases, "In", Ik) VALUES (NULL, 'B', 1, 16.0, 6000);
INSE