Inserting Data into Databricks via the databricks-sql-python library (Leveraging SQLALCHEMY) #299

Open · kopitb opened this issue Dec 1, 2023 · 5 comments
kopitb commented Dec 1, 2023

Issue Description: Inserting Data into Databricks via the databricks-sql-python library (Leveraging SQLALCHEMY)

Error Message:

```
DatabaseError: (databricks.sql.exc.ServerOperationError) Column id is not specified in INSERT
[SQL: INSERT INTO model_integrated (name) VALUES (%(name)s)]
[parameters: {'name': 'Loadsheetname'}]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
```

Overview:
I'm encountering an issue when attempting to insert records into a Databricks database using SQLAlchemy. The error suggests that the id column is not specified in the INSERT statement, leading to a ServerOperationError.

For what it's worth, this works perfectly fine when inserting into a PostgreSQL database.

Steps to Reproduce:

  1. Connect to Databricks using SQLAlchemy.
  2. Define SQLAlchemy models, including an auto-incrementing primary key (id) column.
  3. Attempt to insert records into the model_integrated table.
  4. Encounter the error above.

Expected Behavior:
I expect the records to be inserted successfully into the Databricks database, with the auto-incrementing id column generated by the database.

Environment:

Python Version: 3.11.4
Databricks-sql-python: 3.0.1

I have verified that a similar approach works for a PostgreSQL database but fails in Databricks.
The issue seems to be related to the auto-incrementing primary key behavior.
Code Snippet:

```python
import pandas as pd
import sqlalchemy.orm
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, Integer, String, ForeignKey, Numeric, DateTime
from databricks import sql

connection = sql.connect(
    server_hostname=HOST,
    http_path=HTTP_PATH,
    access_token=ACCESS_TOKEN)

print("Connection established")

# SQLAlchemy setup
Base = declarative_base()

# Model class for "model" table
class ModelIntegrated(Base):
    __tablename__ = 'model_integrated'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String)
    periods = relationship('PeriodIntegrated', backref='model_integrated')

# Model class for "period" table
class PeriodIntegrated(Base):
    __tablename__ = 'period_integrated'
    id = Column(Integer, primary_key=True, autoincrement=True)
    model_id = Column(Integer, ForeignKey('model_integrated.id'))
    name = Column(String)
    solution = Column(Numeric)
    start = Column(DateTime)
    end = Column(DateTime)
    period_order = Column(Integer)

catalog = "<catalog>"

conn_string = (
    "databricks://token:{token}@{host}?http_path={http_path}&catalog={catalog}&schema={schema}".format(
        token=ACCESS_TOKEN,
        host=HOST,
        catalog=catalog,
        http_path=HTTP_PATH,
        schema="<schema>"
    )
)
print("this is the conn_string", conn_string)

engine = create_engine(conn_string, echo=True)

print("engine executed")

Session = sessionmaker(bind=engine)
session = Session()
excel_file_path = "loadsheet.xlsx"
xls = pd.ExcelFile(excel_file_path)
for tab_name in xls.sheet_names:
    print(f"Processing tab: {tab_name}")
    df = pd.read_excel(excel_file_path, sheet_name=tab_name)

    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    print("Data in the tab:")
    print(df)

    if tab_name == 'Model':
        print("Processing Model data")
        for _, row in df.iterrows():
            # PostgreSQL solution
            model = ModelIntegrated(name=row['name'])
            session.add(model)
            session.commit()  # Commit the transaction

            # Retrieve the generated ID using a separate query
            model_id = session.query(ModelIntegrated.id).filter_by(name=row['name']).scalar()

            # session.flush()  # Get the auto-generated ID
            # model_id = model.id  # Retrieve the ID
            print(f"Inserted Model with name: {model.name}, ID: {model_id}")

    elif tab_name == 'Period':
        print("Processing Period data")

        # Sort the DataFrame by "start" dates in ascending order
        df_sorted = df.sort_values(by='start')

        # Add a new column "period_order" with ascending integer values
        df_sorted['period_order'] = range(1, len(df_sorted) + 1)

        for _, row in df_sorted.iterrows():
            period = PeriodIntegrated(
                model_id=model_id,
                name=row['name'],
                solution=row['solution'],
                start=datetime.strptime(row['start'], '%Y-%m-%d %I:%M:%S %p'),  # Convert to datetime
                end=datetime.strptime(row['end'], '%Y-%m-%d %I:%M:%S %p')  # Convert to datetime
            )

            # Set the "period_order" attribute with the value from the DataFrame
            period.period_order = row['period_order']

            session.add(period)
            print(f"Inserted Period with name: {period.name}, Period Order: {period.period_order}")

# Commit the changes
session.commit()
session.close()
```
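As a side note, the strptime format used above expects 12-hour timestamps with an AM/PM marker. A quick self-contained check of that format string (the sample timestamp is illustrative, not from the issue's Excel file; note that pandas often parses Excel dates into Timestamp objects already, in which case calling strptime on them would raise a TypeError):

```python
from datetime import datetime

# '%I' is the 12-hour clock field and '%p' the AM/PM marker.
dt = datetime.strptime("2023-12-01 09:30:00 AM", "%Y-%m-%d %I:%M:%S %p")
print(dt.hour, dt.minute)  # 9 30
```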

Note:
I have also reached out to the Databricks community for assistance.

Thank you,
Brent

@susodapop (Contributor) commented:
Thanks for the detailed write-up. There are a few things happening here.

First up, you cannot use autoincrement=True with the Databricks dialect. This is discussed in our dialect README. Support for it will come in a future release.

Second, it looks like you're using the old SQLAlchemy 1.x syntax for your model code. The dialect included in databricks-sql-connector>=3.0.0 is built for SQLAlchemy 2.x exclusively. It may work but we can't guarantee it. You can see an example of the new syntax in our e2e tests here.

Third, the actual exception is a syntax error. SQLAlchemy is writing an invalid SQL statement by omitting the column names in the INSERT. I'm not clear how this is happening as it's not something we observe in the 1000+ INSERT test cases that we run during development. But I wonder if you may be using an older SQLAlchemy version below 2.0.0.

Which SQLAlchemy version do you have installed?

@susodapop (Contributor) commented:
I tried reproducing this error locally with:

  • Python 3.11.4
  • SQLAlchemy==2.0.22
  • databricks-sql-connector==3.0.1

and am not able to make SQLAlchemy emit an INSERT statement that omits the column names.

Can you provide a reproducible example? FWIW: I don't think the Excel file has any bearing on this behaviour. In my attempts to reproduce I used both an Excel file as input and a randomly generated pandas dataframe. It worked as expected in both cases.

kopitb (Author) commented Dec 4, 2023

Thanks @susodapop. I was running SQLAlchemy==1.4.49.

I upgraded to 2.0.22, but I'm still facing the same issue...

I think the issue is that the Excel file contains no "id" column.

I want SQLAlchemy to add the "Model" to Databricks, have Databricks + SQLAlchemy issue an id (primary key), and then return that id so the subsequent tables can use it as a foreign key.

This works for a PostgreSQL database with the exact same data.
```python
for tab_name in xls.sheet_names:
    print(f"Processing tab: {tab_name}")
    df = pd.read_excel(excel_file_path, sheet_name=tab_name)

    # Convert column names to lowercase
    df.columns = df.columns.str.lower()
    print("Data in the tab:")
    print(df)

    if tab_name == 'Model':
        print("Processing Model data")
        for _, row in df.iterrows():
            model = ModelIntegrated(name=row['name'])
            session.add(model)
            session.flush()  # Get the auto-generated ID
            model_id = model.id  # Retrieve the ID
            print(f"Inserted Model with name: {model.name}, ID: {model_id}")
```
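Since the dialect does not support autoincrement yet (per the comment above), one possible workaround is to assign the primary key on the client so that every column appears in the INSERT. This is a hedged sketch, not the library's recommended approach: SQLite stands in for Databricks so the snippet runs anywhere, and the max(id) + 1 generator is naive and only safe with a single writer:

```python
# Workaround sketch: generate primary keys client-side so the INSERT
# statement names the id column explicitly.
from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class ModelIntegrated(Base):
    __tablename__ = "model_integrated"
    id = Column(Integer, primary_key=True, autoincrement=False)
    name = Column(String)

engine = create_engine("sqlite://")  # stand-in backend for the sketch
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

def next_id(session):
    """Naive max(id) + 1 generator; not safe with concurrent writers."""
    return (session.query(func.max(ModelIntegrated.id)).scalar() or 0) + 1

model = ModelIntegrated(id=next_id(session), name="Loadsheetname")
session.add(model)
session.commit()
print(model.id)  # 1 on an empty table
```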

Thank you,
Brent

@susodapop (Contributor) commented:
Just getting back to this after traveling.

Can you please provide a runnable reproduction? I have attempted to reproduce the issue going off the code you provided but the code works. I don't have access to your excel file (and as indicated above, I don't think the excel file is really the problem). Without reproduction steps we're blocked on implementing a fix.

kopitb (Author) commented Dec 12, 2023

Hi @susodapop, here is a runnable reproduction. You'll just need to fill in your Databricks placeholders and the location of the Excel file (attached).

```python
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Numeric, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

def process_excel_data(excel_file_path, host, http_path, access_token, catalog, schema):
    # Replace placeholders with actual values
    HOST = host
    HTTP_PATH = http_path
    ACCESS_TOKEN = access_token

    # SQLAlchemy setup
    Base = declarative_base()

    # Model class for "model" table
    class ModelIntegrated(Base):
        __tablename__ = 'model_integrated'

        id = Column(Integer, primary_key=True)
        name = Column(String)
        periods = relationship('PeriodIntegrated', backref='model_integrated')

    # Model class for "period" table
    class PeriodIntegrated(Base):
        __tablename__ = 'period_integrated'
        id = Column(Integer, primary_key=True)
        model_id = Column(Integer, ForeignKey('model_integrated.id'))
        name = Column(String)
        solution = Column(Numeric)
        start = Column(DateTime)
        end = Column(DateTime)
        period_order = Column(Integer)

    conn_string = (
        f"databricks://token:{ACCESS_TOKEN}@{HOST}?http_path={HTTP_PATH}&catalog={catalog}&schema={schema}"
    )

    engine = create_engine(conn_string, echo=True)

    Session = sessionmaker(bind=engine)
    session = Session()

    xls = pd.ExcelFile(excel_file_path)

    for tab_name in xls.sheet_names:
        print(f"Processing tab: {tab_name}")
        df = pd.read_excel(excel_file_path, sheet_name=tab_name)

        # Convert column names to lowercase
        df.columns = df.columns.str.lower()
        print("Data in the tab:")
        print(df)

        if tab_name == 'Model':
            print("Processing Model data")
            for _, row in df.iterrows():
                # PostgreSQL solution
                model = ModelIntegrated(name=row['name'])
                session.add(model)
                session.commit()  # Commit the transaction

                # Retrieve the generated ID using a separate query
                model_id = session.query(ModelIntegrated.id).filter_by(name=row['name']).scalar()

                print(f"Inserted Model with name: {model.name}, ID: {model_id}")

        elif tab_name == 'Period':
            print("Processing Period data")

            # Sort the DataFrame by "start" dates in ascending order
            df_sorted = df.sort_values(by='start')

            # Add a new column "period_order" with ascending integer values
            df_sorted['period_order'] = range(1, len(df_sorted) + 1)

            for _, row in df_sorted.iterrows():
                period = PeriodIntegrated(
                    model_id=model_id,
                    name=row['name'],
                    solution=row['solution'],
                    start=datetime.strptime(row['start'], '%Y-%m-%d %I:%M:%S %p'),  # Convert to datetime
                    end=datetime.strptime(row['end'], '%Y-%m-%d %I:%M:%S %p')  # Convert to datetime
                )

                # Set the "period_order" attribute with the value from the DataFrame
                period.period_order = row['period_order']

                session.add(period)
                print(f"Inserted Period with name: {period.name}, Period Order: {period.period_order}")

    # Commit the changes
    session.commit()
    session.close()

# Example usage
excel_path = "<location>/githubtest.xlsx"
process_excel_data(excel_path, "<your_host>", "<your_http_path>", "<your_access_token>", "<your_catalog>", "<your_schema>")
```

@kravets-levko added the bug ("Something isn't working") and sqlalchemy labels on Apr 17, 2024