# 02 - Load data into an Azure SQL partitioned table

Azure SQL supports [table and index partitioning](https://docs.microsoft.com/en-us/sql/relational-databases/partitions/partitioned-tables-and-indexes). If a table is partitioned, data can be loaded in parallel without locking the entire table. To load partitions in parallel, the source RDD/DataFrame/Dataset and the target Azure SQL table *MUST* have compatible partitions: each RDD partition must map to one or more Azure SQL partitions, and those Azure SQL partitions must not receive rows from any other RDD partition.
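A minimal, Spark-free sketch of the compatibility rule above. It assumes you already know, for each RDD partition, which Azure SQL partition numbers its rows fall into; the `PartitionCompatibility` object and its input map are hypothetical illustrations, not part of the connector.

```scala
// Sketch: Spark partitions and Azure SQL partitions are "compatible" when
// no Azure SQL partition receives rows from more than one Spark partition.
object PartitionCompatibility {

  /** `sqlPartsBySparkPartition` maps a Spark partition index to the set of
    * Azure SQL partition numbers its rows land in (hypothetical input; in a
    * real job it would have to be computed from the data). */
  def isCompatible(sqlPartsBySparkPartition: Map[Int, Set[Int]]): Boolean = {
    // Invert the mapping: for every SQL partition, collect the Spark partitions writing to it
    val writersPerSqlPartition: Map[Int, Set[Int]] =
      sqlPartsBySparkPartition.toSeq
        .flatMap { case (sparkPart, sqlParts) => sqlParts.map(sqlPart => sqlPart -> sparkPart) }
        .groupBy(_._1)
        .map { case (sqlPart, pairs) => sqlPart -> pairs.map(_._2).toSet }

    // Compatible only if every SQL partition has exactly one writer
    writersPerSqlPartition.values.forall(_.size == 1)
  }
}
```

For example, `Map(0 -> Set(1, 2), 1 -> Set(3))` is compatible, while `Map(0 -> Set(1, 2), 1 -> Set(2, 3))` is not, because Azure SQL partition 2 would be written by two RDD partitions.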

When the table is partitioned, data *can* be bulk loaded in parallel even if the table has indexes. On very large databases this is the recommended approach: the bulk load itself will be slower, but you won't need to create indexes after loading the data. Creating indexes on huge tables that are already loaded is a very expensive operation that you should avoid if possible.

The samples use the new [sql-spark-connector](https://github.com/microsoft/sql-spark-connector). The connector must be installed manually by importing the .jar file (available in the GitHub repo's releases) into the cluster.
Whenever we mention "row-store" indexes, we mean indexes that do not use the [column-store layout](https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-overview) to store their data.

This notebook contains two samples:

- Load data into a partitioned table with row-store indexes
- Load data into a partitioned table with column-store indexes

Databricks supported versions: Spark 2.4.5 and Scala 2.11

## Setup

Define the variables used throughout the script. Azure Key Vault is used to securely store sensitive data. More info here: [Create an Azure Key Vault-backed secret scope](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope)

```scala
val scope = "key-vault-secrets"

val storageAccount = "dmstore2"
val storageKey = dbutils.secrets.get(scope, "dmstore2-2")

val server = dbutils.secrets.get(scope, "srv001").concat(".database.windows.net")
val database = dbutils.secrets.get(scope, "db001")
val user = dbutils.secrets.get(scope, "dbuser001")
val password = dbutils.secrets.get(scope, "dbpwd001")
val table = "dbo.LINEITEM_LOADTEST"
```


Configure Spark to access Azure Blob Storage:

```scala
spark.conf.set(s"fs.azure.account.key.$storageAccount.blob.core.windows.net", storageKey)
```

Load the Parquet data generated by the `00-create-parquet-file` notebook, which contains LINEITEM data partitioned by year and month:

```scala
val li = spark.read.parquet(s"wasbs://tpch@$storageAccount.blob.core.windows.net/10GB/parquet/lineitem")
```

The loaded data is split into 20 DataFrame partitions:

```scala
li.rdd.getNumPartitions
```

Show the schema of the loaded data:

```scala
li.printSchema
```

All columns are shown as nullable, even those originally defined as NOT NULL, so this must be fixed to make sure the data can be loaded correctly.

The schema has to be defined explicitly because the connector is very sensitive to nullability, as described in the issue [Nullable column mismatch between Spark DataFrame & SQL Table Error](https://github.com/microsoft/sql-spark-connector/issues/5). The following cell creates the schema explicitly and applies it to the loaded data:

```scala
import org.apache.spark.sql.types._

// Same columns, in the same order, as the loaded Parquet data, but declared NOT NULL
val schema = StructType(
    StructField("L_ORDERKEY", IntegerType, false) ::
    StructField("L_PARTKEY", IntegerType, false) ::
    StructField("L_SUPPKEY", IntegerType, false) ::
    StructField("L_LINENUMBER", IntegerType, false) ::
    StructField("L_QUANTITY", DecimalType(15,2), false) ::
    StructField("L_EXTENDEDPRICE", DecimalType(15,2), false) ::
    StructField("L_DISCOUNT", DecimalType(15,2), false) ::
    StructField("L_TAX", DecimalType(15,2), false) ::
    StructField("L_RETURNFLAG", StringType, false) ::
    StructField("L_LINESTATUS", StringType, false) ::
    StructField("L_SHIPDATE", DateType, false) ::
    StructField("L_COMMITDATE", DateType, false) ::
    StructField("L_RECEIPTDATE", DateType, false) ::
    StructField("L_SHIPINSTRUCT", StringType, false) ::
    StructField("L_SHIPMODE", StringType, false) ::
    StructField("L_COMMENT", StringType, false) ::
    StructField("L_PARTITION_KEY", IntegerType, false) ::
    Nil)

// Re-apply the explicit schema to the loaded rows
val li2 = spark.createDataFrame(li.rdd, schema)
```

Make sure you create the following LINEITEM table, partitioned by L_PARTITION_KEY, in your Azure SQL database:

```sql
create partition function pf_LINEITEM(int)
as range left for values 
(
	199201,199202,199203,199204,199205,199206,199207,199208,199209,199210,199211,199212,
	199301,199302,199303,199304,199305,199306,199307,199308,199309,199310,199311,199312,
	199401,199402,199403,199404,199405,199406,199407,199408,199409,199410,199411,199412,
	199501,199502,199503,199504,199505,199506,199507,199508,199509,199510,199511,199512,
	199601,199602,199603,199604,199605,199606,199607,199608,199609,199610,199611,199612,
	199701,199702,199703,199704,199705,199706,199707,199708,199709,199710,199711,199712,
	199801,199802,199803,199804,199805,199806,199807,199808,199809,199810
);

create partition scheme ps_LINEITEM
as partition pf_LINEITEM
all to ([Primary])
;

create table [dbo].[LINEITEM_LOADTEST]
(
	[L_ORDERKEY] [int] not null,
	[L_PARTKEY] [int] not null,
	[L_SUPPKEY] [int] not null,
	[L_LINENUMBER] [int] not null,
	[L_QUANTITY] [decimal](15, 2) not null,
	[L_EXTENDEDPRICE] [decimal](15, 2) not null,
	[L_DISCOUNT] [decimal](15, 2) not null,
	[L_TAX] [decimal](15, 2) not null,
	[L_RETURNFLAG] [char](1) not null,
	[L_LINESTATUS] [char](1) not null,
	[L_SHIPDATE] [date] not null,
	[L_COMMITDATE] [date] not null,
	[L_RECEIPTDATE] [date] not null,
	[L_SHIPINSTRUCT] [char](25) not null,
	[L_SHIPMODE] [char](10) not null,
	[L_COMMENT] [varchar](44) not null,
	[L_PARTITION_KEY] [int] not null
) on ps_LINEITEM([L_PARTITION_KEY])
```
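To see how a given `L_PARTITION_KEY` value is routed by `pf_LINEITEM`, here is a small sketch of `RANGE LEFT` semantics: a boundary value belongs to the partition on its left, so the partition number is one plus the number of boundaries strictly below the value. The `RangeLeft` object is hypothetical, and `partitionKey` assumes the key is a `yyyyMM` value built from a date, which matches the boundary values but is not stated in this notebook.

```scala
import java.time.LocalDate

// Sketch of how pf_LINEITEM (RANGE LEFT) maps an L_PARTITION_KEY to a partition number
object RangeLeft {

  // Boundary values of pf_LINEITEM: every yyyyMM from 199201 to 199810 (82 values)
  val boundaries: Vector[Int] =
    (for {
      year  <- 1992 to 1998
      month <- 1 to 12
      key    = year * 100 + month
      if key <= 199810
    } yield key).toVector

  // RANGE LEFT: a boundary value belongs to the partition on its left,
  // so the partition number is 1 + the count of boundaries strictly below the key
  def partitionNumber(key: Int): Int = boundaries.count(_ < key) + 1

  // Hypothetical: builds a yyyyMM key from a date (the source column is an assumption)
  def partitionKey(date: LocalDate): Int = date.getYear * 100 + date.getMonthValue
}
```

With 82 boundaries the function defines 83 partitions: `partitionNumber(199201)` is 1, `partitionNumber(199202)` is 2, and any key above `199810` lands in partition 83.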

You can check that the Azure SQL table is partitioned by running the following T-SQL query:

```sql
SELECT
    schema_name(t.schema_id) as [schema_name],
    t.[name] as table_name,
    i.[name] as index_name,
    ps.[partition_id],
    ps.partition_number,
    p.data_compression_desc,
    i.[type_desc],    
    ps.row_count,
    (ps.used_page_count * 8.) / 1024. / 1024. as size_in_gb
from
    sys.dm_db_partition_stats as ps 
inner join  
    sys.partitions as p on ps.partition_id = p.partition_id
inner join
    sys.tables as t on t.object_id = ps.object_id
inner join
    sys.indexes as i on ps.object_id = i.object_id and ps.index_id = i.index_id
where
    t.[name] = 'LINEITEM_LOADTEST' and t.[schema_id] = schema_id('dbo')
order by
    [schema_name], table_name, index_name, partition_number
```

## Load data into a partitioned table with row-store indexes

On the target table, create a clustered index and two nonclustered indexes:

```sql
create clustered index IXC on dbo.[LINEITEM_LOADTEST] ([L_COMMITDATE]) 
on ps_LINEITEM([L_PARTITION_KEY]);

create unique nonclustered index IX1 on dbo.[LINEITEM_LOADTEST] ([L_ORDERKEY], [L_LINENUMBER], [L_PARTITION_KEY]) 
on ps_LINEITEM([L_PARTITION_KEY]);

create nonclustered index IX2 on dbo.[LINEITEM_LOADTEST] ([L_PARTKEY], [L_PARTITION_KEY]) 
on ps_LINEITEM([L_PARTITION_KEY]);
```

As the DataFrame and the Azure SQL table are both partitioned by L_PARTITION_KEY, there isn't much left to do: the connector takes care of everything. `tableLock` can be set to `false`, and the locks taken by the parallel writers will not interfere with each other.

```scala
val url = s"jdbc:sqlserver://$server;databaseName=$database;"

// overwrite + truncate empties the existing table instead of dropping and recreating it;
// partitions are aligned, so no table lock is needed
li2.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("overwrite")
  .option("truncate", "true")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .option("reliabilityLevel", "BEST_EFFORT")
  .option("tableLock", "false")
  .option("batchsize", "100000")
  .save()
```

## Load data into a partitioned table with column-store index

Empty the table, if needed, to speed up dropping the indexes:

```sql
truncate table dbo.[LINEITEM_LOADTEST];
```

Drop the previously created indexes, if needed:

```sql
drop index IXC on dbo.[LINEITEM_LOADTEST];
drop index IX1 on dbo.[LINEITEM_LOADTEST];
drop index IX2 on dbo.[LINEITEM_LOADTEST];
```

Then create a clustered columnstore index:

```sql
create clustered columnstore index IXCCS on dbo.[LINEITEM_LOADTEST]
on ps_LINEITEM([L_PARTITION_KEY]);
```

Load the data following the [columnstore data loading best practices](https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-data-loading-guidance), loading 1,048,576 rows at a time so that each batch lands directly in a compressed rowgroup. Locking the table is not required, so `tableLock` could also be set to `false`; this cell sets it to `true`. Data will be loaded in parallel, using as many Apache Spark workers as are available.
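A simplified model of why the batch size matters, assuming the documented columnstore thresholds: a batch of at least 102,400 rows goes straight into a compressed rowgroup, a smaller one lands in the delta store, and a rowgroup holds at most 1,048,576 rows. It looks at one Spark partition's rows at a time and ignores effects such as rowgroups being trimmed under memory pressure; `RowgroupPlan` is a hypothetical name.

```scala
// Sketch: how one Spark partition's rows are split into columnstore batches
object RowgroupPlan {
  val MaxRowgroupRows    = 1048576 // maximum rows in a columnstore rowgroup
  val MinCompressedBatch = 102400  // smaller batches go to the delta store

  final case class Plan(compressedBatches: Int, deltaStoreRows: Long)

  def plan(rows: Long, batchSize: Int): Plan = {
    require(rows >= 0, "row count must be non-negative")
    require(batchSize > 0 && batchSize <= MaxRowgroupRows, "batch size must be in 1..1048576")

    val fullBatches = rows / batchSize
    val tail        = rows % batchSize

    if (batchSize >= MinCompressedBatch) {
      // Every full batch is compressed; the last, partial batch only if it reaches the threshold
      if (tail >= MinCompressedBatch) Plan((fullBatches + 1).toInt, 0L)
      else Plan(fullBatches.toInt, tail)
    } else {
      // Batches below the threshold all land in the delta store
      Plan(0, rows)
    }
  }
}
```

For instance, `plan(2100000L, 1048576)` gives two compressed batches and 2,848 rows left in the delta store, whereas a batch size below 102,400 sends every row to the delta store first.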

```scala
val url = s"jdbc:sqlserver://$server;databaseName=$database;"

// 1048576 rows per batch = maximum rowgroup size, so each full batch is compressed directly
li2.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("overwrite")
  .option("truncate", "true")
  .option("url", url)
  .option("dbtable", "dbo.LINEITEM_LOADTEST")
  .option("user", user)
  .option("password", password)
  .option("reliabilityLevel", "BEST_EFFORT")
  .option("tableLock", "true")
  .option("batchsize", "1048576")
  .save()
```
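The two write cells share most of their connector options. A hypothetical helper like the one below keeps them in one place; the option names and values are exactly those used in the cells above, and only the helper itself is an addition.

```scala
// Hypothetical helper: builds the sql-spark-connector options shared by both load cells
object LoadOptions {
  def connectorOptions(url: String, table: String, user: String, password: String,
                       tableLock: Boolean, batchSize: Int): Map[String, String] = Map(
    "truncate"         -> "true",   // with mode("overwrite"), truncate instead of drop/recreate
    "url"              -> url,
    "dbtable"          -> table,
    "user"             -> user,
    "password"         -> password,
    "reliabilityLevel" -> "BEST_EFFORT",
    "tableLock"        -> tableLock.toString,
    "batchsize"        -> batchSize.toString
  )
}
```

Usage in the notebook: `li2.write.format("com.microsoft.sqlserver.jdbc.spark").mode("overwrite").options(LoadOptions.connectorOptions(url, table, user, password, tableLock = false, batchSize = 100000)).save()`, since `DataFrameWriter.options` accepts a `Map[String, String]`.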

## Additional Notes

Note that if you *can* set the `tableLock` option to `true`, performance may increase, as less overhead is required to manage locks.