## Download and install SQL Server Express (MSSQL) for a local database

Website: https://www.microsoft.com/fr-fr/sql-server/sql-server-downloads

Direct download (SQL Server 2019): https://go.microsoft.com/fwlink/?linkid=866658

In [None]:
from msa.odbc import PyODBC

# Connection to the local SQL Server Express instance (Windows authentication).
# The first literal is a raw string: "\S" in "localhost\SQLEXPRESS01" is not a
# valid Python escape sequence (SyntaxWarning on 3.12+); r"..." keeps the
# backslash explicit without changing the resulting text.
# TrustServerCertificate=YES accepts the server's certificate without
# validating it (needed for ODBC Driver 18 against a self-signed local instance).
server = PyODBC(
    uri=r"Server=localhost\SQLEXPRESS01;Database=master;DRIVER={ODBC Driver 18 for SQL Server};"
        "Trusted_Connection=yes;TrustServerCertificate=YES;"
)

## Initialize test data

In [None]:
# (Re)create PYMSA_UNITTEST with one column per SQL Server type used below,
# then seed it with two rows.
with server.cursor() as c:
    # DROP TABLE IF EXISTS (SQL Server 2016+) replaces the former
    # try/except-pass, which also silently hid real failures (permissions,
    # locks, ...).
    c.execute("DROP TABLE IF EXISTS PYMSA_UNITTEST")
    c.execute("""CREATE TABLE PYMSA_UNITTEST (
ID int IDENTITY PRIMARY KEY,
int int,
bigint bigint,
bit bit,
decimal decimal,
float float,
real real,
date date,
datetime datetime,
datetime2 datetime2,
smalldatetime smalldatetime,
time time,
string varchar(64) not null,
binary varbinary(64)
)""")
    c.commit()
    # The table was just created, so it is already empty: no TRUNCATE needed.
    # One parameter set fills both VALUES tuples -> rows (1, '1') and (2, '2').
    c.executemany(
        "INSERT INTO [master].[dbo].[PYMSA_UNITTEST]([int],[string]) VALUES (?,?),(?,?)",
        [[1, '1', 2, '2']]
    )
    c.commit()

## Fetch

### Main thread

In [None]:
# Fetch every row and turn each returned row object into a plain list.
with server.cursor() as c:
    rows = c.execute("SELECT * from PYMSA_UNITTEST").fetchall()
    result = list(map(list, rows))
result

In [None]:
# Stream the result set as pyarrow.RecordBatch objects and materialize them
# into a list while the cursor is still open.
with server.cursor() as c:
    c.execute("SELECT * from PYMSA_UNITTEST")
    result = [*c.fetch_arrow_batches(n=10)]

In [None]:
# Read the whole result set into a single pyarrow.Table.
with server.cursor() as c:
    result = c.execute("SELECT * from PYMSA_UNITTEST").fetch_arrow()
result

In [None]:
# Expose a column subset as a pyarrow.RecordBatchReader.
# NOTE(review): the reader presumably streams from this cursor, so it should be
# consumed (e.g. read_all()) inside the with-block — confirm against msa docs.
reader_query = "SELECT int, string, date, float, real from PYMSA_UNITTEST"
with server.cursor() as c:
    c.execute(reader_query)
    pyarrow_batch_reader = c.reader()

### Concurrent cursor execution

In [None]:
import os

def r(x):
    """Result wrapper for ``server.map``: read an executed cursor into a pyarrow.Table."""
    return x.fetch_arrow()


# 1000 identical (args, kwargs) pairs, one fresh pair per call of cursor.execute.
call_arguments = [
    (["select * from PYMSA_UNITTEST"], {})
    for _ in range(1000)
]

tables = list(
    server.map(
        "execute",  # getattr(cursor, method) or callable like def func(cursor, *args, **kwargs)
        result_wrapper=r,
        arguments=call_arguments,
        concurrency=os.cpu_count(),  # default
    )
)
len(tables), tables[0]

## SQLTable

In [None]:
# Look up the table and read its Arrow schema while the connection is open.
with server.connect() as connection:
    table = connection.table(name="PYMSA_UNITTEST")
    schema_arrow = getattr(table, "schema_arrow")
schema_arrow

## Insert

### Classic INSERT INTO VALUES

In [None]:
import datetime

# Insert one row with insert_pylist, read the table back as Arrow batches, then
# truncate and re-insert those batches (including their ID values) with
# insert_arrow, and finally read everything back.
with server.connect(timeout=30) as connection:  # timeout in seconds
    with connection.cursor() as c:
        table = c.table(name="PYMSA_UNITTEST")

        c.insert_pylist(
            table=table,
            # NOTE(review): a datetime goes into the varchar `string` column;
            # presumably converted to text on insert — confirm intended.
            rows=[[datetime.datetime.now(), datetime.datetime.now(), 1, b"bytes"]],
            columns=["string", "datetime2", "int", "binary"],
            commit=True,
            # Rows per commit: max 1000, default 1. Values > 1 are faster,
            # but rows may be inserted in random order.
            commit_size=10,
        )
        c.execute("SELECT * from PYMSA_UNITTEST")
        batches = list(c.fetch_arrow_batches(n=10))

    table.truncate()
    with table.cursor as c:
        # The batches carry explicit ID values for the IDENTITY column.
        c.set_identity_insert(table, True)
        c.insert_arrow(
            table=table,
            data=batches,  # pyarrow.RecordBatch, pyarrow.RecordBatchReader, pyarrow.Table or Iterable[RecordBatch]
            cast=True,
            safe=True,
            commit=True,
            delayed_check_constraints=True,  # delay constraints check at the end
            check_constraints=False,  # check table constraints on insert
        )

    # Context-managed cursor: previously this cursor was created and never closed.
    with connection.cursor() as c:
        result = c.execute("SELECT * from PYMSA_UNITTEST").fetchall()
result

### Bulk Insert with CSV

In [None]:
# Read the whole table as Arrow, then append it back twice: once through
# insert_arrow(bulk=True) and once through bulk_insert_arrow (both appear to
# use the CSV bulk path), so on success the table holds three copies of the rows.
with server.connect() as connection:
    table = connection.table(name="PYMSA_UNITTEST")
    # Context-managed cursor: previously this cursor was created and never closed.
    with connection.cursor() as c:
        data = c.execute("SELECT * from PYMSA_UNITTEST").fetch_arrow()

    table.insert_arrow(data, commit=True, bulk=True)
    table.bulk_insert_arrow(data, commit=True)

### Insert Parquet: prepare sample data

In [None]:
import pyarrow, io
import pyarrow.parquet as p

def gen_data(n: int):
    """Return an ``n``-row pyarrow.Table for the Parquet insert demos.

    Columns:
        string:  "test0", "test1", ..., "test{n-1}" (non-nullable string)
        dropped: the constant "dropped" (non-nullable string); PYMSA_UNITTEST
                 has no column of that name.
    """
    string_values = pyarrow.array([f"test{i}" for i in range(n)])
    dropped_values = pyarrow.array(["dropped"] * n)
    schema = pyarrow.schema([
        pyarrow.field("string", pyarrow.string(), nullable=False),
        pyarrow.field("dropped", pyarrow.string(), nullable=False),
    ])
    return pyarrow.Table.from_arrays([string_values, dropped_values], schema=schema)

# Serialize a 10-row sample into an in-memory Parquet file, then rewind it so
# the next cell reads from the start.
buf = io.BytesIO()
p.write_table(table=gen_data(10), where=buf)
buf.seek(0)

### Insert parquet file

In [None]:
from pyarrow.fs import LocalFileSystem

# Load the in-memory Parquet buffer into an emptied PYMSA_UNITTEST and show it.
with server.connect() as connection:
    # SQLTable methods build default cursor with connection.cursor()
    table = connection.table(name="PYMSA_UNITTEST")

    table.truncate()

    # Rewind first: reading the BytesIO advances its position, so without this
    # a re-run of the cell would start reading at the end of the buffer.
    buf.seek(0)
    table.insert_parquet_file(
        buf,  # or filesystem path, file-object, Native file like pyarrow.parquet.write_table
        batch_size=65536,  # default = 65536
        commit=True,
        bulk=False,  # CSV bulk insert
        filesystem=LocalFileSystem(),  # pyarrow.fs.FileSystem, default = LocalFileSystem()
    )
    # or with cursor.insert_parquet_file(table, buf, ...)

    # Context-managed cursor: previously this cursor was created and never closed.
    with connection.cursor() as c:
        result = c.execute("SELECT * from PYMSA_UNITTEST").fetch_arrow()
result.to_pandas()

### Insert parquet dir

In [None]:
import pyarrow.parquet as p
import os

# Build a small on-disk Parquet dataset:
#   parquets/part1.parquet, parquets/part2.parquet               (1 and 2 rows)
#   parquets/partition=1/part1.parquet, parquets/partition=2/part2.parquet
base_dir = "parquets"
os.makedirs(base_dir, exist_ok=True)

for part in (1, 2):
    partition_dir = f"{base_dir}/partition={part}"
    os.makedirs(partition_dir, exist_ok=True)

    p.write_table(gen_data(part), f"{base_dir}/part{part}.parquet")
    p.write_table(gen_data(part), f"{partition_dir}/part{part}.parquet")

In [None]:
import pyarrow.compute as pc

# Load the Parquet directory into an emptied table, filtering at read time and
# again per batch, then show the result.
with server.cursor() as cursor:
    table = cursor.table("PYMSA_UNITTEST")
    table.truncate()

    def keep_test0_rows(batch):
        """batch_apply hook: keep only rows whose ``string`` equals "test0"."""
        mask = pc.equal(batch['string'], 'test0')
        kept = batch.filter(mask)
        print(kept.num_rows)  # rows kept from this batch
        return kept

    cursor.insert_parquet_dir(
        table,
        base_dir,  # or filesystem dir path only
        batch_size=65536,  # default
        commit=True,
        bulk=False,  # CSV bulk insert
        # see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
        file_filters=[('string', 'in', ['test0', 'test1'])],  # filter on file read
        batch_apply=keep_test0_rows,  # apply on each batch
        filesystem=LocalFileSystem(),  # pyarrow.fs.FileSystem, default = LocalFileSystem()
    )
    # or with cursor.insert_parquet_dir(table, "path/to/dir", ...)

    arrow_result = cursor.execute("SELECT * from PYMSA_UNITTEST").fetch_arrow()
    result = arrow_result.to_pandas()
result

In [None]:
import shutil
# Delete the sample Parquet directory tree written earlier in the notebook.
shutil.rmtree(path=base_dir)

## SQLView: inherits from SQLTable
So you can insert into it too.

In [None]:
# Create (or redefine) a view over two table columns and load it as an SQLView.
# CREATE OR ALTER VIEW (SQL Server 2016 SP1+) keeps the cell re-runnable: a
# plain CREATE VIEW fails once the view exists, and the init cell only drops
# the table, not the view.
with server.connect() as connection:
    # Context-managed cursor: previously this cursor was created and never closed.
    with connection.cursor() as c:
        c.execute("CREATE OR ALTER VIEW vPYMSA_UNITTEST AS SELECT string, int from PYMSA_UNITTEST")

    table = connection.view("vPYMSA_UNITTEST")
    # persist: presumably caches the schema so it can still be read after the
    # connection is closed (see the last line) — confirm against msa docs.
    table.schema_arrow
table.schema_arrow

## SQLIndex

In [None]:
# Create an index on (string, int) and commit it.
with server.cursor() as c:
    table = c.table("PYMSA_UNITTEST")
    # Print the method's own documentation (lists the accepted arguments).
    print(c.create_table_index.__doc__)
    index_columns = ["string", "int"]
    c.create_table_index(table=table, type="", columns=index_columns)
    c.commit()

In [None]:
# Indexes currently known for the table from the previous cell.
current_indexes = table.indexes
current_indexes

### Insert with indexes

In [None]:
# Reload the earlier Arrow batches with secondary indexes disabled (primary key
# kept), then rebuild the indexes.
with server.cursor() as c:
    table.truncate()
    c.set_identity_insert(table, True)
    c.disable_table_all_indexes(table, except_primary_key=True)
    try:
        c.insert_arrow(
            table=table,
            data=batches,  # pyarrow.RecordBatch, pyarrow.RecordBatchReader, pyarrow.Table or Iterable[RecordBatch]
            cast=True,
            safe=True,
            commit=True,
        )
    finally:
        # Rebuild even when the insert raises: a disabled SQL Server index is
        # neither maintained nor usable until it is rebuilt.
        c.rebuild_table_all_indexes(table)

In [None]:
# Read every row back through a context-managed cursor (as in the Insert
# section) so the cursor is closed instead of leaked.
with table.cursor as c:
    rows = c.execute("SELECT * from PYMSA_UNITTEST").fetchall()
rows

## Drop SQLTable

In [None]:
# Clean up the objects this notebook created in the master database: the
# vPYMSA_UNITTEST view (if present) and the PYMSA_UNITTEST table.
with server.connect() as connection:
    with connection.cursor() as c:
        # Drop the view first; dropping only the table would leave the view
        # behind, referencing a table that no longer exists.
        c.execute("DROP VIEW IF EXISTS vPYMSA_UNITTEST")
        c.commit()
    connection.table(name="PYMSA_UNITTEST").drop()