## Spark Hive Table CodeGen to SQL OD
- This notebook imports two datasets from Azure Open Datasets: NYC Green Taxi data and US Population by County
- A Hive table is created over each imported dataset. Most customers overlay Hive tables on their data lake assets
    - The taxi dataset is partitioned by year and month (puYear, puMonth) so you can see how the code is generated for partitions
    - The population dataset is not partitioned
    - The Hive tables must be created as "STORED AS PARQUET" (the generated views read the files with FORMAT='PARQUET')
- A script then reads the Hive table metadata and generates the SQL on-demand (SQL OD) CREATE VIEW statements
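The partition handling rests on Spark's folder layout: `partitionBy` writes one `column=value` folder level per partition column, so a generated SQL OD view can replace each level with a `column=*` wildcard in the `BULK` path and read the value back with `r.filepath(n)`, which returns the text matched by the n-th wildcard. A minimal Scala sketch of that mapping, assuming the table location and the partition columns (with their T-SQL types) are already known; `PartitionPaths` and its methods are hypothetical names, not part of the notebook:

```scala
// Hypothetical helper: builds the OPENROWSET BULK path and the partition SELECT list
// for a Hive table whose partitions are stored as column=value folders.
object PartitionPaths {
  // One "col=*" wildcard level per partition column, then every Parquet file in the leaf folder
  def bulkPath(location: String, partitionCols: Seq[String]): String =
    location.stripSuffix("/") + partitionCols.map(c => s"/$c=*").mkString + "/*.parquet"

  // r.filepath(n) is 1-based: the i-th partition column is read from the i-th wildcard
  // and cast to its T-SQL type
  def partitionSelect(partitionCols: Seq[(String, String)]): Seq[String] =
    partitionCols.zipWithIndex.map { case ((name, sqlType), i) =>
      s"CAST(r.filepath(${i + 1}) AS $sqlType) AS $name"
    }
}
```

For the taxi table, `PartitionPaths.bulkPath(location, Seq("puYear", "puMonth"))` yields `<location>/puYear=*/puMonth=*/*.parquet`, and `puYear` comes from `r.filepath(1)`. With no partition columns the path is just `<location>/*.parquet`.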

### Steps to run
- Import this notebook into Synapse
- In Cells 4, 5, 10 and 11, change the "abfss://" paths (container `data-lake` in storage account `paternostrosynapse`) to point to your own storage
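Those paths follow the ADLS Gen2 URI format `abfss://<container>@<account>.dfs.core.windows.net/<path>`, so only the container and account parts need to change. If you prefer to set them once in the Scala cells, a small helper like the hypothetical `LakePaths` below could build and sanity-check the paths. This sketch does not cover the `%%sql` cells (5 and 11), whose LOCATION clauses you would still edit directly.

```scala
// Hypothetical helper for ADLS Gen2 paths: abfss://<container>@<account>.dfs.core.windows.net/<path>
object LakePaths {
  def abfss(container: String, account: String, path: String): String =
    s"abfss://$container@$account.dfs.core.windows.net/${path.stripPrefix("/")}"

  // Splits an abfss URI into (container, account, path); None for anything else (e.g. wasbs://)
  private val Pattern = """abfss://([^@/]+)@([^./]+)\.dfs\.core\.windows\.net/?(.*)""".r

  def parse(uri: String): Option[(String, String, String)] = uri match {
    case Pattern(container, account, path) => Some((container, account, path))
    case _                                 => None
  }
}
```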

### Creating the SQL OD views
- Copy the generated code from Cell 13, open the Develop hub, and create a new SQL script.
    - NOTE: Skip the first line of the output, "dfTable: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [database: string, tableName: string ... 1 more field]", which is Spark echoing the dfTable declaration
- Paste the code
- Create a new SQL OD database: `CREATE DATABASE DataLake` (you can name it whatever you like)
- Run the script against that SQL OD database (DataLake)
- Uncomment and run the SELECT TOP 10 * statements to verify that the views work
- You can now query the SQL OD views from any tool that supports SQL Server (Excel, Java, Python, .NET, Power BI, etc.)
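As one example of the last step, a JVM client can query the views over JDBC. A sketch in Scala, assuming the Microsoft JDBC Driver for SQL Server (mssql-jdbc) is on the classpath, SQL authentication, and the serverless endpoint naming `<workspace>-ondemand.sql.azuresynapse.net`; `SqlOdClient`, the workspace name, and the credentials are placeholders:

```scala
import java.sql.DriverManager

// Hypothetical client for querying the generated SQL OD views over JDBC.
object SqlOdClient {
  // Assumed serverless endpoint naming: <workspace>-ondemand.sql.azuresynapse.net, port 1433
  def jdbcUrl(workspace: String, database: String): String =
    s"jdbc:sqlserver://$workspace-ondemand.sql.azuresynapse.net:1433;databaseName=$database;encrypt=true"

  // Prints up to 10 rows of a view, one pipe-separated line per row.
  // Needs the mssql-jdbc driver on the classpath and a login that can read the underlying storage.
  def printTop10(url: String, user: String, password: String, view: String): Unit = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val rs = conn.createStatement().executeQuery(s"SELECT TOP 10 * FROM $view")
      val columnCount = rs.getMetaData.getColumnCount
      while (rs.next()) {
        println((1 to columnCount).map(i => rs.getString(i)).mkString(" | "))
      }
    } finally {
      conn.close() // also closes the statement and result set
    }
  }
}
```

Usage would look like `SqlOdClient.printTop10(SqlOdClient.jdbcUrl("myworkspace", "DataLake"), user, password, "us_population_by_county")`.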

### Hive metastore sync
- Also note: if you open the Data hub, select Workspace, and filter the databases to "default (Spark)", you will also see these tables. You can technically query them with SQL OD, because the Spark Hive metastore is kept in sync with SQL OD (only for tables that are STORED AS PARQUET). The reason you might not want to use these synchronized tables is that all strings are exposed as VARCHAR(MAX), which gives you less control over the data types. Using the smallest data type possible is always best.
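To pick something smaller than VARCHAR(MAX), or than the fixed VARCHAR(255) the code generator uses, you can size each string column from its longest value. T-SQL sizes VARCHAR(n) in bytes and caps n at 8000, so this minimal sketch measures UTF-8 byte length, assuming a UTF-8 collation where a non-ASCII character takes more than one byte. `VarcharSizing` is a hypothetical helper that works on sample values rather than on the real tables:

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: smallest VARCHAR(n) that holds every sampled value.
object VarcharSizing {
  // VARCHAR(n) is sized in bytes, so measure the UTF-8 encoded length, not the character count
  def utf8Length(s: String): Int = s.getBytes(StandardCharsets.UTF_8).length

  // n must be between 1 and 8000; longer values need VARCHAR(MAX). Nulls are ignored.
  def varcharFor(values: Seq[String]): String = {
    val longest = values.filter(_ != null).map(utf8Length).foldLeft(1)((a, b) => math.max(a, b))
    if (longest > 8000) "VARCHAR(MAX)" else s"VARCHAR($longest)"
  }
}
```

Against the full data, the same maximum could come from Spark's `octet_length` function, e.g. `max(octet_length(countyName))`, which returns byte length rather than character count.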

## Taxi Data (Partitioned)
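In this section `partitionBy("puYear", "puMonth")` writes a folder tree of the form `nyx_taxi_green/puYear=<year>/puMonth=<month>/part-...parquet`. Because those files are written outside Hive, `MSCK REPAIR TABLE` has to discover the partitions from the folder names, and SQL OD later reads the same names through its wildcards. A minimal sketch of recovering partition values from such a path; `PartitionFolders` is hypothetical, and it ignores the escaping Spark applies to special characters in partition values:

```scala
// Hypothetical helper: reads Hive-style partition values (column=value folders) from a file path.
object PartitionFolders {
  // Returns column -> value pairs in folder order, e.g. puYear before puMonth.
  // Values are returned as raw text; this sketch does not decode Spark's escaping.
  def partitionValues(path: String): Seq[(String, String)] =
    path.split("/").toSeq
      .filter(_.contains("="))
      .map { segment =>
        val i = segment.indexOf('=')
        segment.substring(0, i) -> segment.substring(i + 1)
      }
}
```

A non-partitioned path such as the population table's returns an empty sequence.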


In [None]:
%%spark
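// Azure Open Datasets: read the NYC Green Taxi data (Parquet).
// The SAS token is set to an empty string because the Azure Open Datasets container is publicly readable.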
val wasbs_path = "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/green"
spark.conf.set("fs.azure.sas.nyctlc.azureopendatastorage.blob.core.windows.net","")
val df = spark.read.parquet(wasbs_path)
display(df)

In [None]:
%%spark
// Write the data to your data lake (ADLS Gen2) as Parquet, partitioned into puYear=<year>/puMonth=<month> folders
df.write.partitionBy("puYear", "puMonth").parquet("abfss://data-lake@paternostrosynapse.dfs.core.windows.net/nyx_taxi_green")

In [None]:
%%sql
-- Create an external table over the files in your data lake. STORED AS PARQUET matches the Parquet files Spark wrote (Parquet is Spark's default format).
CREATE EXTERNAL TABLE nyx_taxi_green 
    (doLocationId STRING, dropoffLatitude DOUBLE, dropoffLongitude DOUBLE, extra DOUBLE, fareAmount DOUBLE, 
    improvementSurcharge STRING, lpepDropoffDatetime TIMESTAMP, lpepPickupDatetime TIMESTAMP, mtaTax DOUBLE,
    passengerCount INT, paymentType INT, pickupLatitude DOUBLE, pickupLongitude DOUBLE, puLocationId STRING, 
    rateCodeID INT, storeAndFwdFlag STRING, tipAmount DOUBLE, tollsAmount DOUBLE, 
    totalAmount DOUBLE, tripDistance DOUBLE, tripType INT, vendorID INT)
PARTITIONED BY (puYear INT, puMonth INT)
STORED AS PARQUET
LOCATION 'abfss://data-lake@paternostrosynapse.dfs.core.windows.net/nyx_taxi_green/'

-- NOTE:
-- If you get the error "The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx-wx---;", grant "other" access to /tmp.
-- Open your data lake, find the /tmp folder, and grant Read, Write, and Execute to Other. Apply it as both an Access and a Default permission.
-- Do the same for the /tmp/hive folder.

In [None]:
%%sql
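-- The data was written directly to storage rather than with INSERT INTO, so the metastore does not know about the partition folders yet.
-- MSCK REPAIR TABLE scans the table location and registers the partitions it finds.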
MSCK REPAIR TABLE nyx_taxi_green

In [None]:
%%sql
-- Make sure it works!
SELECT *
  FROM nyx_taxi_green
 LIMIT 10

## Population Data (Non-Partitioned)
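For this non-partitioned table the code generator has no `filepath` columns to add: the `BULK` path is just the table location plus `/*.parquet`, and every column goes into the `WITH` clause after type mapping. A sketch of that mapping step, using the generator's mappings (string to VARCHAR(255), double to FLOAT, timestamp to DATETIME2) plus float to REAL and boolean to BIT; `SqlTypes` is a hypothetical name, and unmapped types are passed through upper-cased:

```scala
// Hypothetical helper: maps Hive/Spark column types to T-SQL and builds an OPENROWSET WITH clause.
object SqlTypes {
  // Only a few types are mapped; others (int, bigint, decimal(p,s), date, ...) are passed
  // through upper-cased, since T-SQL accepts those type names unchanged.
  def toSqlType(hiveType: String): String = hiveType.toLowerCase match {
    case "string"    => "VARCHAR(255)"
    case "double"    => "FLOAT"
    case "float"     => "REAL"
    case "boolean"   => "BIT"
    case "timestamp" => "DATETIME2"
    case other       => other.toUpperCase
  }

  // One "  name TYPE" line per column, comma-separated, wrapped in WITH ( ... )
  def withClause(columns: Seq[(String, String)]): String =
    columns.map { case (name, hiveType) => s"  $name ${toSqlType(hiveType)}" }
      .mkString("WITH (\n", ",\n", "\n)")
}
```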

In [None]:
%%spark
// Azure Open Datasets: read the US Population by County data (Parquet).
val wasbs_path = "wasbs://censusdatacontainer@azureopendatastorage.blob.core.windows.net/release/us_population_county/"
spark.conf.set("fs.azure.sas.censusdatacontainer.azureopendatastorage.blob.core.windows.net","")
val df = spark.read.parquet(wasbs_path)
display(df)

In [None]:
%%spark
// Write the data to your data lake (ADLS Gen2) as Parquet (no partitioning)
df.write.parquet("abfss://data-lake@paternostrosynapse.dfs.core.windows.net/us_population_by_county")

In [None]:
%%sql
-- Create an external table over the files in your data lake. STORED AS PARQUET matches the Parquet files Spark wrote (Parquet is Spark's default format).
CREATE EXTERNAL TABLE us_population_by_county 
    (decennialTime STRING, stateName STRING, countyName STRING, population INT, race STRING, sex STRING, minAge INT, maxAge INT, year INT)
STORED AS PARQUET
LOCATION 'abfss://data-lake@paternostrosynapse.dfs.core.windows.net/us_population_by_county/'

-- NOTE:
-- If you get the error "The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwx-wx---;", grant "other" access to /tmp.
-- Open your data lake, find the /tmp folder, and grant Read, Write, and Execute to Other. Apply it as both an Access and a Default permission.
-- Do the same for the /tmp/hive folder.

In [None]:
%%sql
-- Make sure it works!
SELECT *
  FROM us_population_by_county
 LIMIT 10

In [None]:
%%spark

// Generate SQL on-demand (SQL OD) CREATE VIEW statements from the Hive table metadata.
// You can run this for all tables, but it is better to test with a few first.
val dfTable = spark.sql("SHOW TABLES").filter("tableName == 'nyx_taxi_green' OR tableName == 'us_population_by_county'")

for (row <- dfTable.collect()) {
    // Map Hive/Spark types to T-SQL types (MORE WORK needs to be done here, this is just a few mappings).
    // Types not listed (int, bigint, decimal(p,s), date, ...) are passed through; T-SQL accepts those names as-is.
    def toSqlType(hiveType: String): String = hiveType.toLowerCase match {
        case "string"    => "VARCHAR(255)"  // NOTE: a fixed size that was picked because it covered most of the string sizes
        case "double"    => "FLOAT"
        case "float"     => "REAL"
        case "boolean"   => "BIT"
        case "timestamp" => "DATETIME2"
        case other       => other
    }

    // SHOW TABLES returns (database, tableName, isTemporary)
    val database = row.getString(0)
    val tableName = row.getString(1)
    val qualifiedName = if (database.isEmpty) tableName else database + "." + tableName

    // Table root folder (abfss://...) from the detailed table information
    val location = spark.sql("DESCRIBE EXTENDED " + qualifiedName)
        .filter("col_name == 'Location'")
        .select("data_type")
        .collect()
        .map(_.getString(0))
        .headOption
        .getOrElse("")

    // DESCRIBE lists the data columns (partition columns included), then a "# Partition Information"
    // row, a "# col_name" header row, and the partition columns a second time.
    val dataColumns = scala.collection.mutable.ArrayBuffer[(String, String)]()
    val partitionColumns = scala.collection.mutable.ArrayBuffer[(String, String)]()
    var processingPartitions = false

    for (col <- spark.sql("DESCRIBE " + qualifiedName).collect()) {
        val colName = Option(col.getString(0)).getOrElse("").trim
        val dataType = toSqlType(Option(col.getString(1)).getOrElse("").trim)

        if (colName == "# Partition Information" || colName == "# col_name") {
            processingPartitions = true
        } else if (colName.nonEmpty && !colName.startsWith("#")) {
            if (processingPartitions) partitionColumns += ((colName, dataType))
            else dataColumns += ((colName, dataType))
        }
    }

    // Partition values come from the folder names, so partition columns are removed from the WITH clause
    val partitionNames = partitionColumns.map(_._1).toSet
    val withColumns = dataColumns.filterNot { case (name, _) => partitionNames.contains(name) }

    // r.filepath(n) returns the value matched by the n-th wildcard in the BULK path
    val sqlODPartitionSelect = partitionColumns.zipWithIndex.map { case ((name, sqlType), i) =>
        "   CAST(r.filepath(" + (i + 1) + ") AS " + sqlType + ") AS " + name + ",\n"
    }.mkString
    val sqlODPartitionLocation = partitionColumns.map { case (name, _) => "/" + name + "=*" }.mkString
    val sqlODWithColumns = withColumns.map { case (name, sqlType) => "  " + name + " " + sqlType }.mkString(",\n")

    val separator = "-----------------------------------------------------------------------------------------------\n"
    val sqlOD = new StringBuilder()
    sqlOD.append(separator)
    sqlOD.append("-- " + tableName + "\n")
    sqlOD.append(separator)
    sqlOD.append("IF EXISTS (SELECT * FROM sys.objects WHERE name ='" + tableName + "')\n")
    sqlOD.append("  BEGIN\n")
    sqlOD.append("  DROP VIEW " + tableName + ";\n")
    sqlOD.append("  END\n")
    sqlOD.append("GO\n\n")
    sqlOD.append("CREATE VIEW " + tableName + " AS\n")
    sqlOD.append("SELECT\n")
    sqlOD.append(sqlODPartitionSelect)
    sqlOD.append("  *\n")
    sqlOD.append("FROM OPENROWSET(\n")
    sqlOD.append("   BULK '" + location.stripSuffix("/") + sqlODPartitionLocation + "/*.parquet',\n")
    sqlOD.append("   FORMAT='PARQUET'\n")
    sqlOD.append(")\n")
    sqlOD.append("WITH (\n")
    sqlOD.append(sqlODWithColumns + "\n")
    sqlOD.append(") AS [r];\n")
    sqlOD.append("GO\n\n")
    sqlOD.append("-- SELECT TOP 10 * FROM " + tableName + ";\n")
    sqlOD.append(separator)
    sqlOD.append("\n\n")

    println(sqlOD.toString())
}
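The generated script separates statements with `GO`, which client tools such as SSMS and the Synapse Studio script editor treat as a batch separator; it is not a T-SQL statement the server itself understands. If you wanted to run the output from code (for example over JDBC) instead of pasting it, you would first split it into batches. A minimal sketch, with `GoBatches` as a hypothetical name; it does not handle `GO <count>` or `GO` appearing inside comments or string literals:

```scala
// Hypothetical helper: splits a T-SQL script into batches on lines that contain only GO.
object GoBatches {
  // GO is matched case-insensitively after trimming; empty batches are dropped.
  def split(script: String): Seq[String] = {
    val batches = scala.collection.mutable.ArrayBuffer(new StringBuilder)
    for (line <- script.split("\n", -1)) {
      if (line.trim.equalsIgnoreCase("GO")) batches += new StringBuilder
      else batches.last.append(line).append("\n")
    }
    batches.map(_.toString.trim).filter(_.nonEmpty).toSeq
  }
}
```

Each returned batch (the `IF EXISTS ... DROP VIEW` block, the `CREATE VIEW`, and so on) would then be executed as a separate statement.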