In [None]:
%%spark
//Imports
    import org.apache.spark.sql.delta.DeltaLog
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.metadata.ParquetMetadata
    import org.apache.parquet.hadoop.metadata.FileMetaData
    import org.apache.parquet.hadoop.metadata.BlockMetaData
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.DataFrame
    import java.time.LocalDateTime
    import org.apache.spark.sql.functions.lit


/*****************************************************************************************************
    Instructions:
    - Connect Notebook to Lakehouse (does not have to be in same Workspace)
    - Update the deltaTable parameter immediately below these instructions with the name of the table to analyze
    - Review other parameters (append vs overwrite) 
    - Run
    - Review the four output tables whose names start with "zz_DeltaAnalyzerOutput"

    zz_DeltaAnalyzerOutput_parquetFiles
        This table has one row per Parquet file
        Ideally, there should not be thousands of these
        This table only uses parquet file metadata and should be quick to populate

    zz_DeltaAnalyzerOutput_rowGroups
        This table has one row per rowgroup and shows rowgroups for every parquet file
        Look for the number of rows per rowgroup.  Ideally this should be 1M to 16M rows (higher the better)
        This table only uses parquet file metadata and should be quick to populate

    zz_DeltaAnalyzerOutput_columnChunks
        One row per column/chunk within rowgroups
        Produces a large amount of output, with much more detail about dictionaries and compression
        This table only uses parquet file metadata and should be quick to populate
    
    zz_DeltaAnalyzerOutput_columns
        One row per column of the table
        Look to see how many unique values per column.  If using floating point, consider modifying parquet file to use DECIMAL(17,4)
        This table runs a compute query against the Delta table, so it may take time depending on the size of the Delta table

    Run 
    %%sql
    OPTIMIZE tablename vorder 


    Footnote:
        Useful doc
        https://www.javadoc.io/doc/org.apache.parquet/parquet-hadoop/latest/index.html

*****************************************************************************************************/

//Parameters
// Name of the Delta table to analyze.
val deltaTable: String = "fact_myevents_2bln"   

// Captured once so that every output row written by this run carries the same Timestamp value.
val timeStamp = LocalDateTime.now().toString
val overwriteOrAppend: String = "Append"     // "Append" or "Overwrite"
val printToScreen: Boolean = false              // true or false

// Fail fast on a mistyped save mode: without this check the mistake only surfaces when the
// results are written, i.e. after the whole analysis has already run.
require(
    Set("append", "overwrite", "ignore", "error", "errorifexists", "default").contains(overwriteOrAppend.toLowerCase),
    s"overwriteOrAppend must be a valid Spark save mode such as Append or Overwrite, but was: ${overwriteOrAppend}"
)

//Code

    // NOTE(review): the Lakehouse ID is hard-coded; the commented-out expression would instead read the
    // "trident.lakehouse.id" key from the Hadoop configuration — confirm which one is intended.
    val LakehouseID: String = "dc8e7126-8b8d-4065-a2c8-5d266e25e296" // sc.hadoopConfiguration.get("trident.lakehouse.id")
    val defaultFS: String = sc.hadoopConfiguration.get("fs.defaultFS")
    // Root folder of the table; file paths taken from the Delta log are appended to it.
    val deltaTablePath: String = s"${defaultFS}/${LakehouseID}/Tables/${deltaTable}/"

    val deltaLog: DeltaLog = DeltaLog.forTable(spark, deltaTablePath)
    // Not referenced anywhere else in this cell.
    val formatter = java.text.NumberFormat.getInstance

    // Refresh the log before its snapshot is read.
    deltaLog.update()
    println(s"Files: ${deltaLog.snapshot.allFiles.count}")

    // Driver-side accumulators, one tuple per output row. They are only ever appended to, never
    // reassigned, so they are declared as vals.
    // parquetFiles: TableName, Timestamp, Filename, vorder_enabled, vorder_level, RowGroups, CreatedBy
    val parquetFiles: ListBuffer[(String,String,String,String,String,Long,String)] = ListBuffer.empty
    // rowGroups: TableName, Timestamp, Filename, RowGroupID, TotalFileRowGroups, Rowcount, CompressedSize, UncompressedSize, CompressionRatio
    val rowGroups: ListBuffer[(String,String,String,Int,Int,Long,Long,Long,Float)] = ListBuffer.empty
    // columnChunks: TableName, Timestamp, Filename, ColumnChunkID, Path, Codec, PrimativeType, Statistics,
    //               TotalSize, TotalUncompressedSize, ValueCount, HasDict, DictOffset, Encodings
    val columnChunks: ListBuffer[(String,String,String,Int,String,String,String,String,Long,Long,Long,String,Long,String)] = ListBuffer.empty

    // Reads the footer of every Parquet file in the Delta snapshot and fills the three buffers:
    //   parquetFiles - one tuple per file
    //   rowGroups    - one tuple per row group
    //   columnChunks - one tuple per column chunk of every row group
    // The resulting value is the number of row groups summed over all files.
    val totalNumberRowGroups: Int = {
        // The Hadoop configuration does not change between files, so it is created once.
        val configuration = new Configuration()

        deltaLog.snapshot.allFiles.collect().map { file =>
            val path: String = file.path
            val parquetFileReader: ParquetFileReader = ParquetFileReader.open(configuration, new Path(deltaTablePath + path))

            // The reader holds an open file stream; close it even if reading the metadata fails,
            // otherwise one handle leaks per Parquet file.
            try {
                val rowGroupMeta = parquetFileReader.getRowGroups() //java.util.List
                val parquetFileMetaData: FileMetaData = parquetFileReader.getFileMetaData()
                val parquetFileMetaDataKeyValues = parquetFileMetaData.getKeyValueMetaData() // java.util.Map

                val rowGroupSize: Int = rowGroupMeta.size

                // Map.get yields null when a key is absent from the footer key/value metadata.
                parquetFiles += ((
                    deltaTable,
                    timeStamp,
                    path,
                    parquetFileMetaDataKeyValues.get("com.microsoft.parquet.vorder.enabled"),
                    parquetFileMetaDataKeyValues.get("com.microsoft.parquet.vorder.level"),
                    rowGroupSize.toLong,
                    parquetFileMetaData.getCreatedBy
                ))

                for (rowGroupNumber <- 0 until rowGroupSize) {
                    val rowGroup: BlockMetaData = rowGroupMeta.get(rowGroupNumber)
                    val rowGroupColumns = rowGroup.getColumns

                    for (columnChunkID <- 0 until rowGroupColumns.size) {
                        val columnStat = rowGroupColumns.get(columnChunkID)

                        columnChunks += ((
                            deltaTable,
                            timeStamp,
                            path,
                            rowGroupNumber,
                            columnStat.getPath().toString,
                            columnStat.getCodec().toString,
                            columnStat.getPrimitiveType().toString,
                            columnStat.getStatistics().toString().replace("\"","'"), // need to convert double quotes to single quotes for Power BI
                            columnStat.getTotalSize(),
                            columnStat.getTotalUncompressedSize(),
                            columnStat.getValueCount(),
                            columnStat.hasDictionaryPage().toString,
                            columnStat.getDictionaryPageOffset(),
                            columnStat.getEncodings().toString
                        ))
                    }

                    // Compressed bytes divided by uncompressed bytes (a fraction, not a percentage).
                    val compressionRatio: Float = rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize

                    rowGroups += ((
                        deltaTable,
                        timeStamp,
                        path,
                        rowGroupNumber,
                        rowGroupSize,
                        rowGroup.getRowCount,
                        rowGroup.getCompressedSize,
                        rowGroup.getTotalByteSize,
                        compressionRatio
                    ))
                }

                rowGroupSize
            } finally {
                parquetFileReader.close()
            }
        }.sum
    }

    //println("Total number of row groups: " + totalNumberRowGroups)

    // Column Names cannot have spaces!!
    // Output column names, listed in the same order as the tuple fields of the matching buffer.
    val parquetFileColumnNames: Seq[String] = Seq(
        "TableName","Timestamp","Filename","vorder_enabled","vorder_level","RowGroups","CreatedBy")
    val rowGroupColumnNames: Seq[String] = Seq(
        "TableName","Timestamp","Filename","RowGroupID","TotalFileRowGroups","Rowcount",
        "CompressedSize","UncompressedSize","CompressionRatio")
    val columnChunkColumnNames: Seq[String] = Seq(
        "TableName","Timestamp","Filename","ColumnChunkID","Path","Codec","PrimativeType","Statistics",
        "TotalSize","TotalUncompressedSize","ValueCount","HasDict","DictOffset","Encodings")

    // Turn each driver-side buffer into a DataFrame. The first two are vars because
    // table-level total columns are attached to them further down.
    var parquetFilesDF: DataFrame = parquetFiles.toDF(parquetFileColumnNames: _*)
    var rowGroupsDF: DataFrame = rowGroups.toDF(rowGroupColumnNames: _*)
    val columnChunksDF: DataFrame = columnChunks.toDF(columnChunkColumnNames: _*)

    /********************************************************************************************************************
        Create DF for one row per column

        For every column of the Delta table this records the distinct-value count, the Parquet primitive
        type, and the compressed / uncompressed size summed over all of the column's chunks.

        How the distinct counts are obtained depends on the total row count:
          - up to 1,000,000 rows  : exact counts for all columns in a single query, executed once
          - below 10,000,000 rows : exact count, one query per column
          - otherwise             : approx_count_distinct, one query per column
        In the two per-column modes, columns listed in skipDistinctCountColumns are not queried and are
        reported with a distinct count of 0.
    ********************************************************************************************************************/

        val columnList: Array[String] = spark.table(deltaTable).columns

        // Table-level totals are summed from the driver-side buffers, which hold the same rows as
        // rowGroupsDF / parquetFilesDF. Unlike a Spark sum aggregate, this yields 0 instead of null
        // when the table has no files.
        val totalRows: Long = rowGroups.map(_._6).sum           // _6 = Rowcount
        val tableSize: Long = rowGroups.map(_._7).sum           // _7 = CompressedSize
        val totalRowGroups: Long = parquetFiles.map(_._6).sum   // _6 = RowGroups

        // Per chunk path: (primitive type of the first chunk, total compressed size, total uncompressed size).
        // Computed in one pass instead of three filtered Spark jobs per column.
        val chunkStatsByPath: Map[String, (String, Long, Long)] =
            columnChunks
                .groupBy(_._5)                                  // _5 = Path, e.g. "[columnName]"
                .map { case (chunkPath, chunks) =>
                    // _7 = PrimativeType, _9 = TotalSize, _10 = TotalUncompressedSize
                    chunkPath -> ((chunks.head._7, chunks.map(_._9).sum, chunks.map(_._10).sum))
                }

        // Backtick-quote a column name so that names containing spaces, dots, reserved words or
        // backticks are still valid inside the generated SQL.
        def quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

        // Columns excluded from distinct counting in the per-column modes.
        val skipDistinctCountColumns: Set[String] = Set("requestPathUri","clientRequestId","fileSystemID")

        // Distinct counts, positionally aligned with columnList.
        val distinctCounts: Array[Long] =
            if (totalRows <= 1000000) {
                /* Using single query for all columns; the result row is fetched once and read by position */
                val aggregates: String =
                    columnList.map(column => s", count(distinct ${quoteIdentifier(column)})").mkString
                val counts: Row = spark.sql(s"select 1 as dummy${aggregates} from ${deltaTable}").first()
                columnList.indices.map(i => counts.getLong(i + 1)).toArray   // position 0 is the dummy column
            } else {
                val usePreciseCount: Boolean = totalRows < 10000000
                columnList.map { column =>
                    if (skipDistinctCountColumns.contains(column)) {
                        println("Skipping " + column)
                        0L
                    } else {
                        val aggregate: String =
                            if (usePreciseCount) {
                                println("Running Precise DCOUNT on " + column)
                                s"count(distinct ${quoteIdentifier(column)})"
                            } else {
                                println("Running Approx DCOUNT on " + column)
                                s"approx_count_distinct(${quoteIdentifier(column)})"
                            }
                        spark.sql(s"select ${aggregate} from ${deltaTable}").first().getLong(0)
                    }
                }
            }

        val columns: ListBuffer[(String,String,String,Long,String,Long,Long,Long,Long,Double,Double)] = ListBuffer.empty
        columns ++= columnList.zip(distinctCounts).map { case (column, distinctCount) =>
            // A column has no chunk under "[name]" when, for example, the table has no files or the
            // chunk path is not the plain column name; report it with zero size instead of failing.
            val (primativeType, columnSize, columnSizeUncompressed) =
                chunkStatsByPath.getOrElse(s"[${column}]", {
                    println("No Parquet column chunks found for " + column)
                    ("unknown", 0L, 0L)
                })

            (
                deltaTable,
                timeStamp,
                column,
                distinctCount,
                primativeType,
                columnSize,
                columnSizeUncompressed,
                totalRows,
                tableSize,
                distinctCount.toDouble/totalRows*100.0,   // NaN when the table has no rows
                columnSize.toDouble/tableSize*100.0
            )
        }

        val columnsDF: DataFrame = columns.toDF("TableName","Timestamp","ColumnName","DistinctCount","PrimitiveType","ColumnSize","ColumnSizeUncompressed","TotalRows","TableSize","CardinalityOfTotalRows","SizePercentOfTable")

    // Attach the table-level totals to the per-file and per-rowgroup frames so that each output
    // row also carries the totals of the whole table.
    val totalRowsColumn = lit(totalRows)
    val totalRowGroupsColumn = lit(totalRowGroups)

    parquetFilesDF = Seq(
            "TotalTableRows" -> totalRowsColumn,
            "TotalTableRowGroups" -> totalRowGroupsColumn
        ).foldLeft(parquetFilesDF) { case (frame, (name, value)) => frame.withColumn(name, value) }

    // RatioOfTotalTableRows is the row group's share of all table rows, in percent.
    rowGroupsDF = Seq(
            "TotalTableRows" -> totalRowsColumn,
            "RatioOfTotalTableRows" -> (col("Rowcount") * 100.0 / totalRowsColumn),
            "TotalTableRowGroups" -> totalRowGroupsColumn
        ).foldLeft(rowGroupsDF) { case (frame, (name, value)) => frame.withColumn(name, value) }


   // Display Dataframes to screen, but only when the printToScreen parameter is true.
   // Braces are required so that the condition guards all four calls, not just the first one.
        if (printToScreen) {
            display(parquetFilesDF)
            display(rowGroupsDF)
            display(columnChunksDF)
            display(columnsDF)
        }

    // Save Dataframe to Lakehouse files
    // Each result frame is written as a Delta table, in the listed order, using the save mode
    // chosen in the overwriteOrAppend parameter.
    Seq(
        parquetFilesDF -> "./Tables/zz_DeltaAnalyzerOutput_parquetFiles",
        rowGroupsDF    -> "./Tables/zz_DeltaAnalyzerOutput_rowGroups",
        columnChunksDF -> "./Tables/zz_DeltaAnalyzerOutput_columnChunks",
        columnsDF      -> "./Tables/zz_DeltaAnalyzerOutput_columns"
    ).foreach { case (resultFrame, targetPath) =>
        val writer = resultFrame.write
            .mode(overwriteOrAppend)
            .option("header", "true")
            .option("overwriteSchema", "true")
            .format("delta")
        writer.save(targetPath)
    }
