In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode}
import java.sql.{Connection, DriverManager, Statement}
import scala.util.{Failure, Success, Try}

// Parameters supplied by the Workflow through notebook widgets.
// sp_name is read here but not referenced again in this cell.
val sp_name = dbutils.widgets.get("sp_name")
val env = dbutils.widgets.get("environment")

// Destination table in SQL Server and the query that produces its rows.
val table_name = "databricks.StateReportedCustomer"
val sql_query = "SELECT * FROM state_reporting_" + env + ".gold.customer"

// SQL Server credentials and host: one secret per environment, all stored in
// the same secret scope. Looked up in the order user, password, host.
val (username, password, host) = {
  val secretScope = "state_reporting"
  def envSecret(prefix: String) =
    dbutils.secrets.get(scope = secretScope, key = prefix + "_" + env)
  (envSecret("sql_server_user"), envSecret("sql_server_pass"), envSecret("sql_server_host"))
}

// JDBC connection settings. The SQL Server named instance is the environment name.
val database = "StateReporting"
val instanceName = env
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
// Assembled as <host>\<instance> followed by ';'-separated connection properties.
val url = Seq(
  s"jdbc:sqlserver://$host\\$instanceName",
  s"databaseName=$database",
  "encrypt=true",
  "trustServerCertificate=true"
).mkString(";")

/**
 * Removes every row from `tableName` on the SQL Server reachable through the
 * file-level `url`, `username` and `password`.
 *
 * Despite the name, this issues `DELETE FROM` rather than `TRUNCATE TABLE`.
 * NOTE(review): presumably chosen for permission or foreign-key reasons — confirm
 * before switching statements.
 *
 * The connection and the statement are each released in their own `finally`, so
 * the connection is closed even when creating or closing the statement fails.
 *
 * @param tableName table to empty; interpolated directly into the SQL text, so it
 *                  must come from trusted configuration, never from user input
 * @throws RuntimeException wrapping the underlying failure when connecting,
 *                          deleting or closing fails
 */
def truncateTable(tableName: String): Unit = {
  Try {
    val connection: Connection = DriverManager.getConnection(url, username, password)
    try {
      // Created inside the connection's try so a failure here cannot leak the connection.
      val statement: Statement = connection.createStatement()
      try {
        statement.executeUpdate(s"DELETE FROM $tableName")
        println(s"Table $tableName truncated successfully.")
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  } match {
    case Success(_) => println(s"Truncate table operation completed successfully.")
    case Failure(exception) =>
      println(s"Error truncating table: ${exception.getMessage}")
      throw new RuntimeException(s"Failed to truncate table: $tableName", exception)
  }
}

// Export pipeline: read the source query, reshape it to the SQL Server column
// layout, empty the target table, then append the new rows over JDBC.
// Any failure is logged and rethrown so the notebook run fails.
try {
  // Builders for the recurring projection shapes used below.
  def dateField(source: String, target: String) = to_date(col(source)).alias(target)
  def intField(source: String, target: String) = col(source).cast("int").alias(target)
  // Start position 0 is kept exactly as written in the original projection.
  def textField(source: String, maxLen: Int, target: String) =
    col(source).substr(0, maxLen).alias(target)
  def nullDate(target: String) = lit(null).cast("date").alias(target)

  val sourceRows = spark.sql(sql_query)

  // Target columns, in the order they are written to SQL Server.
  val targetColumns = Seq(
    col("customer_reporting_state_id").alias("CustomerReportingStateID"),
    col("customer_id").alias("CustomerID"),
    textField("state_code", 2, "StateCode"),
    intField("active_status", "ActiveStatus"),
    intField("customer_status", "CustomerStatus"),
    dateField("active_status_start_date", "ActiveStatusStartDate"),
    dateField("active_status_end_date", "ActiveStatusEndDate"),
    textField("report_status_cd", 30, "ReportStatusCD"),
    // These two columns are always written as NULL dates.
    nullDate("FirstReportDate"),
    nullDate("StopReportDate"),
    dateField("install_date", "InstallDate"),
    dateField("deinstall_date", "DeInstallDate"),
    dateField("create_date", "CreateDate"),
    textField("create_user", 50, "CreateUser"),
    dateField("modify_date", "ModifyDate"),
    textField("modify_user", 50, "ModifyUser"),
    textField("repeat_offender", 1, "RepeatOffender"),
    dateField("offense_date", "OffenseDate"),
    dateField("iid_start_date", "IIDStartDate"),
    dateField("iid_end_date", "IIDEndDate"),
    dateField("created_at", "CreationDate")
  )

  // Null status values are replaced with 0 after the projection.
  val exportDf: DataFrame = sourceRows
    .select(targetColumns: _*)
    .na.fill(Map("ActiveStatus" -> 0, "CustomerStatus" -> 0))

  // Bail out before the target table is emptied if there is nothing to load.
  if (exportDf.isEmpty) {
    throw new RuntimeException("No rows to insert")
  }

  // Empty the target table, then append the freshly projected rows.
  truncateTable(table_name)

  val jdbcOptions = Map(
    "url" -> url,
    "dbtable" -> table_name,
    "driver" -> driver,
    "user" -> username,
    "password" -> password
  )

  exportDf.write
    .format("jdbc")
    .options(jdbcOptions)
    .mode(SaveMode.Append)
    .save()

  println(s"Successfully inserted data into table $table_name")

} catch {
  case e: Exception =>
    println(s"Error processing DataFrame: ${e.getMessage}")
    throw e
}