In [3]:
import math
import os
import time
from multiprocessing import Pool

import pandas as pd
from databricks import sql

In [2]:

def execute_with_retries(cursor, query, max_retries=5, delay=5):
    """Execute ``query`` on ``cursor``, retrying transient request failures.

    A ``sql.exc.RequestError`` whose message contains "429" (rate limited) or
    "503" (service unavailable) is treated as transient. The function waits
    ``delay`` seconds and tries again, for at most ``max_retries`` attempts in
    total. Any other ``RequestError`` is re-raised immediately.

    Args:
        cursor: Object with an ``execute(query)`` method (a Databricks cursor).
        query: SQL text to execute.
        max_retries: Maximum number of attempts.
        delay: Seconds to sleep between attempts.

    Raises:
        sql.exc.RequestError: For errors that are not 429/503.
        Exception: When every attempt failed with a transient error, or when
            ``max_retries`` < 1. The last error, if any, is chained as
            ``__cause__``.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            cursor.execute(query)
            return
        except sql.exc.RequestError as e:
            last_error = e
            error_message = str(e)
            print(f"Query failed (Attempt {attempt}/{max_retries}): {error_message}")

            if "429" not in error_message and "503" not in error_message:
                # Not a rate-limit / availability problem: retrying won't help.
                raise
            # Only back off if another attempt will actually follow.
            if attempt < max_retries:
                time.sleep(delay)

    # last_error is None only when max_retries < 1. Initialising it up front
    # avoids the UnboundLocalError the original raised in that case.
    raise Exception(
        f"Max retries exceeded for query execution. Last error: {last_error}"
    ) from last_error

def escape_sql_value(value):
    """Render one Python value as a Databricks (Spark) SQL literal.

    - ``None``, float NaN, and any value whose ``str()`` is empty, whitespace,
      or spells "null" (any case) become the bare keyword ``NULL``.
    - ``int``/``float`` values (including ``bool``, which subclasses ``int``)
      are emitted unquoted.
    - Everything else is converted with ``str()`` and wrapped in single
      quotes. Backslash, single quote, newline, carriage return and tab are
      backslash-escaped.

    Returns:
        str: SQL literal text suitable for a VALUES list.
    """
    if value is None:
        return "NULL"
    # pandas represents missing numbers as NaN. str(nan) == "nan" would
    # otherwise be emitted as an unquoted, invalid SQL token.
    if isinstance(value, float) and math.isnan(value):
        return "NULL"
    if str(value).strip().lower() in {"null", ""}:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)  # Keep numbers unquoted

    text = str(value)
    # Databricks/Spark string literals use backslash as the escape character.
    # A doubled quote is NOT an escape on Spark 3.x: 'it''s' lexes as the
    # adjacent literals 'it' and 's', which are concatenated to "its". Use \'
    # instead. Backslashes are escaped first so that the backslashes
    # introduced by the later replacements are not doubled.
    text = (
        text.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )
    return f"'{text}'"

def process_chunk(chunk_number, chunk, schema, table_name, connection_details):
    """Insert one DataFrame chunk into ``schema.table_name`` with a single INSERT.

    ``upload_file_to_databricks`` dispatches this to pool workers. Each call
    therefore opens its own connection, and always closes it, even if the
    INSERT fails.

    Args:
        chunk_number: Index of this chunk within the upload. Not used here;
            kept for interface compatibility.
        chunk: DataFrame slice. Its columns must line up positionally with
            the target table, because the INSERT has no column list.
        schema: Target schema name. Interpolated verbatim into SQL, so it
            must be trusted.
        table_name: Target table name. Interpolated verbatim into SQL, so it
            must be trusted.
        connection_details: Keyword arguments passed to ``sql.connect``.

    Returns:
        int: Number of rows sent. Returns 0 for an empty chunk, in which case
        no connection is opened.
    """
    # itertuples(index=False, name=None) yields plain tuples of the cell
    # values. Unlike iterrows(), it does not build a Series per row, so it
    # neither upcasts ints to floats in mixed-dtype rows (5 -> "5.0") nor
    # pays the per-row Series cost.
    values_list = [
        f"({', '.join(escape_sql_value(val) for val in row)})"
        for row in chunk.itertuples(index=False, name=None)
    ]
    if not values_list:
        return 0

    insert_query = f"""
        INSERT INTO {schema}.{table_name}
        VALUES {', '.join(values_list)}
        """

    connection = sql.connect(**connection_details)
    try:
        cursor = connection.cursor()
        try:
            execute_with_retries(cursor, insert_query)
        finally:
            cursor.close()
    finally:
        connection.close()
    return len(values_list)

def upload_file_to_databricks(df, table_name, schema, chunk_size, num_processes, connection_details=None):
    """Create ``schema.table_name`` if it is missing, then bulk-insert ``df``.

    The table is created as a Delta table with one STRING column per
    DataFrame column. Rows are inserted in slices of ``chunk_size``, one
    INSERT per slice, dispatched to a pool of ``num_processes`` worker
    processes.

    Args:
        df: DataFrame to upload. Missing values (NaN/None/NA) are sent as NULL.
        table_name: Target table name. Interpolated verbatim into SQL, so it
            must be trusted.
        schema: Target schema name. Interpolated verbatim into SQL, so it
            must be trusted.
        chunk_size: Rows per INSERT statement. Must be >= 1.
        num_processes: Size of the multiprocessing pool.
        connection_details: Keyword arguments for ``sql.connect``. If omitted,
            the notebook-level ``connection_details`` dict is used, which
            preserves the previous behaviour.

    Raises:
        ValueError: If ``chunk_size`` < 1, ``df`` has no columns, or no
            connection details are available.
        RuntimeError: If one or more chunks failed to insert. The first
            failure is chained as ``__cause__``.
    """
    if connection_details is None:
        # Backward compatibility: fall back to the module/notebook global.
        connection_details = globals().get("connection_details")
    if connection_details is None:
        raise ValueError("connection_details must be provided")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    if len(df.columns) == 0:
        raise ValueError("cannot create a table from a DataFrame with no columns")

    # Replace NaN/NA with None so escape_sql_value emits NULL. Cast to object
    # first: on float columns, `where(..., None)` stores NaN again.
    df = df.astype(object).where(pd.notnull(df), None)

    # Backticks inside an identifier are escaped by doubling them.
    quoted_columns = ["`" + str(col).replace("`", "``") + "`" for col in df.columns]
    select_list = ", ".join(f"CAST(NULL AS STRING) AS {quoted}" for quoted in quoted_columns)
    create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table_name} 
        USING delta 
        TBLPROPERTIES (
            'delta.columnMapping.mode' = 'name',
            'delta.minReaderVersion' = '2',
            'delta.minWriterVersion' = '5'
        )
        AS SELECT {select_list} WHERE 1=0
    """

    connection = sql.connect(**connection_details)
    try:
        cursor = connection.cursor()
        try:
            execute_with_retries(cursor, create_table_query)
        finally:
            cursor.close()
    finally:
        connection.close()

    chunks = [df.iloc[start:start + chunk_size] for start in range(0, len(df), chunk_size)]

    # NOTE(review): with the 'spawn' start method (the default on Windows and
    # macOS), workers cannot look up functions defined in a notebook's
    # __main__. This pool presumably relies on 'fork' (Linux); confirm for
    # your platform.
    failures = []
    with Pool(processes=num_processes) as pool:
        pending = [
            (i, pool.apply_async(process_chunk, args=(i, chunk, schema, table_name, connection_details)))
            for i, chunk in enumerate(chunks)
        ]
        pool.close()
        # AsyncResult.get() re-raises the worker's exception. Without it,
        # every failed chunk would be dropped silently.
        for i, async_result in pending:
            try:
                async_result.get()
            except Exception as exc:
                failures.append((i, exc))
        pool.join()

    if failures:
        failed_ids = ", ".join(str(i) for i, _ in failures)
        first_error = failures[0][1]
        raise RuntimeError(
            f"{len(failures)} of {len(chunks)} chunks failed to insert "
            f"(chunk numbers: {failed_ids}). First error: {first_error}"
        ) from first_error

In [None]:
# Databricks configuration.
# Connection values are read from environment variables so the access token is
# never written into the notebook. The placeholders are only fallbacks.
connection_details = {
    "server_hostname": os.environ.get("DATABRICKS_SERVER_HOSTNAME", "<DATABRICKS_HOSTNAME>"),
    "http_path": os.environ.get("DATABRICKS_HTTP_PATH", "<DATABRICKS_HTTP_PATH>"),
    "access_token": os.environ.get("DATABRICKS_TOKEN", "<DATABRICKS_ACCESS_TOKEN>"),
}

file_path = "<LOCAL_FILE_PATH>"
table_name = "<TABLE_NAME>"
schema_name = "<SCHEMA_NAME>"
CHUNK_SIZE = 5000     # rows per INSERT statement
NUM_PROCESSES = 10    # parallel worker processes -- control parallelism here

# Read every column as text; the target table declares all columns as STRING.
df = pd.read_csv(file_path, dtype=str)

try:
    upload_file_to_databricks(
        df,
        table_name,
        schema=schema_name,
        chunk_size=CHUNK_SIZE,
        num_processes=NUM_PROCESSES,
    )
except Exception as e:
    print(f"Failed to upload file: {e}")