## Notebook Configuration

In [None]:
# Show the executor settings of the current session: instance count, cores per executor, memory per executor.
for conf_key in ("spark.executor.instances", "spark.executor.cores", "spark.executor.memory"):
    print(spark.conf.get(conf_key))

Note: If you request more vCores than the pool limit allows, or more than are currently available in the pool, you will get an exception. Reduce the number of vCores requested or increase the pool size.

In [None]:
%%configure -f
{
    "executorMemory": "28g",
    "executorCores": 4,
    "numExecutors": 4
}

In [None]:
# Re-read the executor settings to confirm the %%configure values above took effect.
for conf_key in ("spark.executor.instances", "spark.executor.cores", "spark.executor.memory"):
    print(spark.conf.get(conf_key))

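Autoscale (dynamic allocation of executors) for the session is configured through the `spark.dynamicAllocation.*` properties; see [Get started with autoscale](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-autoscale#get-started).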

In [None]:
%%configure -f
{
    "conf": {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "6"
    }
}

## Prepare data

In [None]:
from azureml.opendatasets import NycTlcYellow

from datetime import datetime
from dateutil import parser

# Initial load: NYC yellow-taxi trips for the 2010-01-01 .. 2010-02-28 window.
start_date, end_date = parser.parse('2010-01-01'), parser.parse('2010-02-28')

nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_df = nyc_tlc.to_spark_dataframe()

# Remove rows that are identical in every column, then spread the data over
# 32 partitions for the writes that follow.
nyc_tlc_df_clean = (
    nyc_tlc_df
    .drop_duplicates()
    .repartition(32)
)

## Converting Parquet to Delta

In [None]:
# Plain (unpartitioned) Parquet copy of the cleaned trips.
(nyc_tlc_df_clean.write
    .format("parquet")
    .mode("overwrite")
    .save("/data/deltademo/parquettbl"))

# Same data, laid out on disk in pickup-year / pickup-month partition directories.
(nyc_tlc_df_clean.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("puYear", "puMonth")
    .save("/data/deltademo/partitionedparquettbl"))

In [None]:
from delta.tables import DeltaTable

# Convert the unpartitioned Parquet directory written above into a Delta table in place.
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/data/deltademo/parquettbl`")

# For a partitioned Parquet directory the partition columns and their data types
# must be supplied explicitly, as comma-separated "name type" pairs.
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/data/deltademo/partitionedparquettbl`", "puYear int, puMonth int")

In [None]:
%%sql
-- SQL equivalent of the DeltaTable.convertToDelta calls in the previous cell.
-- Convert the unpartitioned Parquet table at '/data/deltademo/parquettbl' in place.
CONVERT TO DELTA parquet.`/data/deltademo/parquettbl`;

-- Convert the partitioned Parquet table at '/data/deltademo/partitionedparquettbl';
-- the partition columns and their data types must be listed explicitly, separated by commas.
-- NOTE(review): if the previous cell already ran, both paths are already Delta tables - confirm this is then a no-op.
CONVERT TO DELTA parquet.`/data/deltademo/partitionedparquettbl` PARTITIONED BY (puYear int, puMonth int);

## Working with Dataframe

In [None]:
# Write the cleaned trips as Delta files at explicit paths; they are registered as
# metastore tables further down. For managed tables instead, replace
# .save(<path>) with .saveAsTable("deltademo.deltataxitrips") and
# .saveAsTable("deltademo.deltapartitionedtaxitrips") respectively.
(nyc_tlc_df_clean.write
    .format("delta")
    .mode("overwrite")
    .save("/data/deltademo/deltataxitrips"))

# Partitioned variant, split by pickup year and month.
(nyc_tlc_df_clean.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("puYear", "puMonth")
    .save("/data/deltademo/deltapartitionedtaxitrips"))

In [None]:
%%sql
-- Reset: drop the deltademo database together with every table registered in it (CASCADE).
-- NOTE(review): the tables in this notebook are registered with an explicit LOCATION, so
-- presumably only the metastore entries are removed, not the Delta files - verify.
DROP DATABASE IF EXISTS deltademo CASCADE;

In [None]:
# Register the Delta directories written above as tables in the deltademo database.
# IF NOT EXISTS leaves already-registered tables untouched.
spark.sql("CREATE DATABASE IF NOT EXISTS deltademo")

taxi_trips_location = "/data/deltademo/deltataxitrips"
spark.sql(f"CREATE TABLE IF NOT EXISTS deltademo.deltataxitrips USING DELTA LOCATION '{taxi_trips_location}'")

partitioned_taxi_trips_location = "/data/deltademo/deltapartitionedtaxitrips"
spark.sql(f"CREATE TABLE IF NOT EXISTS deltademo.deltapartitionedtaxitrips USING DELTA LOCATION '{partitioned_taxi_trips_location}'")

In [None]:
%%sql
-- SQL form of the previous cell: register the Delta directories as tables in deltademo.
-- IF NOT EXISTS makes re-running this cell leave existing definitions untouched.
CREATE DATABASE IF NOT EXISTS deltademo;

-- Unpartitioned Delta table, pointing at the existing files (no data is copied).
CREATE TABLE IF NOT EXISTS deltademo.deltataxitrips
USING DELTA
LOCATION '/data/deltademo/deltataxitrips';

-- Table partitioned by puYear/puMonth; the partitioning comes from the Delta files at LOCATION.
CREATE TABLE IF NOT EXISTS deltademo.deltapartitionedtaxitrips
USING DELTA
LOCATION '/data/deltademo/deltapartitionedtaxitrips';

In [None]:
%%sql
-- Inspect both tables: DESCRIBE lists the columns and types; DESCRIBE DETAIL reports
-- Delta table metadata (location, partition columns, number of files, size, ...).
-- NOTE(review): depending on how the %%sql cell renders results, only the last statement's output may be shown.
DESCRIBE deltademo.deltataxitrips;
DESCRIBE DETAIL deltademo.deltataxitrips;

DESCRIBE deltademo.deltapartitionedtaxitrips;
DESCRIBE DETAIL deltademo.deltapartitionedtaxitrips;

## Data Merge

In [None]:
%%sql
-- Row count per pickup year/month before the incremental MERGE. ORDER BY makes the
-- output order deterministic so it lines up with the post-MERGE count further down.
SELECT puYear, puMonth, count(*)
FROM deltademo.deltapartitionedtaxitrips
GROUP BY puYear, puMonth
ORDER BY puYear, puMonth;

In [None]:
from azureml.opendatasets import NycTlcYellow
from datetime import datetime
from dateutil import parser

# Incremental batch for 2010-02-01 .. 2010-03-31: overlaps the February data of the
# initial load and adds March, so the MERGE below both updates and inserts.
start_date, end_date = parser.parse('2010-02-01'), parser.parse('2010-03-31')
nyc_tlc_incr = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_incr_df = nyc_tlc_incr.to_spark_dataframe()

# Same cleaning as the initial load, then expose the batch to SQL under the name used by the MERGE.
nyc_tlc_incr_df_clean = (
    nyc_tlc_incr_df
    .drop_duplicates()
    .repartition(32)
)
nyc_tlc_incr_df_clean.createOrReplaceTempView("deltaincrementaltaxitrips")

In [None]:
%%sql
-- Upsert the incremental batch (temp view deltaincrementaltaxitrips, pickup dates
-- 2010-02-01 .. 2010-03-31) into the partitioned Delta table. A trip is identified by
-- the combination of columns in the ON clause: matched trips get their remaining
-- columns refreshed from the source; unmatched trips are inserted.
-- NOTE(review): `=` never matches NULL, so a row with NULL in any key column is never
-- matched and is inserted again on every run. Consider `<=>` (null-safe equality) if
-- such rows occur - verify against the data.
-- NOTE(review): Delta MERGE fails if more than one source row matches the same target
-- row; drop_duplicates() on the source only removes rows identical in every column.
MERGE INTO deltademo.deltapartitionedtaxitrips target
USING deltaincrementaltaxitrips source
ON 
  source.vendorID = target.vendorID AND source.tpepPickupDateTime = target.tpepPickupDateTime 
  AND source.tpepDropoffDateTime = target.tpepDropoffDateTime AND source.startLat = target.startLat 
  AND source.startLon = target.startLon AND source.endLat = target.endLat AND source.endLon = target.endLon 
  AND source.passengerCount = target.passengerCount AND source.tripDistance = target.tripDistance 
  AND source.rateCodeId = target.rateCodeId AND source.paymentType = target.paymentType 
  AND source.totalAmount = target.totalAmount
  AND target.puYear IN (2010) AND target.puMonth IN (2,3) -- Partition pruning: only scan the target partitions the source batch covers
WHEN MATCHED THEN
  -- Refresh the non-key columns of an existing trip.
  UPDATE SET 
    target.puLocationId = source.puLocationId
    , target.doLocationId = source.doLocationId
    , target.storeAndFwdFlag = source.storeAndFwdFlag
    , target.fareAmount = source.fareAmount
    , target.extra = source.extra
    , target.mtaTax = source.mtaTax
    , target.improvementSurcharge = source.improvementSurcharge
    , target.tipAmount = source.tipAmount
    , target.tollsAmount = source.tollsAmount
WHEN NOT MATCHED
  -- New trip: insert every column, including the partition columns puYear/puMonth.
  THEN INSERT (
    target.vendorID, target.tpepPickupDateTime, target.tpepDropoffDateTime, target.passengerCount, target.tripDistance,
    target.puLocationId, target.doLocationId, target.startLon, target.startLat, target.endLon, target.endLat, target.rateCodeId,
    target.storeAndFwdFlag, target.paymentType, target.fareAmount, target.extra, target.mtaTax, target.improvementSurcharge, target.tipAmount,
    target.tollsAmount, target.totalAmount, target.puYear, target.puMonth)   
 VALUES (
   source.vendorID, source.tpepPickupDateTime, source.tpepDropoffDateTime, source.passengerCount, source.tripDistance,
   source.puLocationId, source.doLocationId, source.startLon, source.startLat, source.endLon, source.endLat, source.rateCodeId, 
   source.storeAndFwdFlag, source.paymentType, source.fareAmount, source.extra, source.mtaTax, source.improvementSurcharge, source.tipAmount, 
   source.tollsAmount, source.totalAmount, source.puYear, source.puMonth)

In [None]:
%%sql
-- Row count per pickup year/month after the MERGE, for comparison with the count taken before it.
SELECT puYear, puMonth, count(*)
FROM deltademo.deltapartitionedtaxitrips 
GROUP BY puYear, puMonth
ORDER BY puYear, puMonth;

In [None]:
from delta.tables import DeltaTable


def upsert_events(updates_df, events_path="/data/events/"):
    """Upsert ``updates_df`` into the Delta table at ``events_path``, keyed on ``eventId``.

    Generic DeltaTable (Python API) form of a MERGE: rows whose ``eventId`` already
    exists in the target get their ``data`` column overwritten from the update; all
    other update rows are inserted with their ``date``, ``eventId`` and ``data``.

    Args:
        updates_df: Spark DataFrame with at least ``date``, ``eventId`` and ``data`` columns.
        events_path: Path of the target Delta table.

    NOTE(review): template only. This notebook has no ``/data/events/`` table and no
    updates DataFrame, so the function is defined here but not called; calling it
    inline (as a bare cell) previously made "Run All" fail.
    """
    events = DeltaTable.forPath(spark, events_path)
    (events.alias("events")
        .merge(updates_df.alias("updates"), "events.eventId = updates.eventId")
        .whenMatchedUpdate(set={"data": "updates.data"})
        .whenNotMatchedInsert(values={
            "date": "updates.date",
            "eventId": "updates.eventId",
            "data": "updates.data",
        })
        .execute())

## Time Travel - History of changes

In [None]:
%%sql
-- Delete all trips picked up in January 2010. This commits a new table version; the
-- earlier versions remain readable via the time-travel cells below.
DELETE FROM deltademo.deltapartitionedtaxitrips
WHERE puYear = 2010 AND puMonth = 1;

In [None]:
%%sql
-- Row count per pickup year/month after the DELETE; the 2010/1 group should no longer appear.
SELECT puYear, puMonth, count(*)
FROM deltademo.deltapartitionedtaxitrips 
GROUP BY puYear, puMonth
ORDER BY puYear, puMonth;

In [None]:
%%sql
-- Commit history (one row per table version) of the partitioned table, addressed by path.
-- Table-name form: DESCRIBE HISTORY deltademo.deltapartitionedtaxitrips; append LIMIT 1 to get only the most recent operation.
DESCRIBE HISTORY delta.`/data/deltademo/deltapartitionedtaxitrips`; 

In [None]:
from delta.tables import DeltaTable

# Same commit history as DESCRIBE HISTORY, through the DeltaTable Python API.
# A dedicated name keeps this DeltaTable handle distinct from the DataFrames that
# the time-travel cells below assign to `deltapartitionedtaxitrips`.
partitioned_trips_table = DeltaTable.forPath(spark, '/data/deltademo/deltapartitionedtaxitrips')
display(partitioned_trips_table.history())

In [None]:
# Time travel by version number: read the table as of version 1 and count rows per pickup year/month.
deltapartitionedtaxitrips = (
    spark.read
    .format("delta")
    .option("versionAsOf", 1)
    .load("/data/deltademo/deltapartitionedtaxitrips")
)
trip_counts_by_month = deltapartitionedtaxitrips.groupBy("puYear", "puMonth").count()
display(trip_counts_by_month)

In [None]:
from delta.tables import DeltaTable

# Time travel by timestamp. A literal timestamp copied from one run's history is
# earlier than the first commit of any table created afterwards, and Delta rejects
# timestampAsOf values before the earliest available version. Instead, look up the
# commit time of version 1, so this cell reads the same snapshot as the versionAsOf
# example above.
table_path = "/data/deltademo/deltapartitionedtaxitrips"
version_one_commit = (
    DeltaTable.forPath(spark, table_path)
    .history()
    .where("version = 1")
    # Let Spark format the timestamp itself (session time zone) rather than
    # round-tripping it through a Python datetime.
    .selectExpr("CAST(timestamp AS STRING) AS commit_ts")
    .first()
)
if version_one_commit is None:
    raise ValueError(f"{table_path} has no version 1 in its history to time-travel to")

deltapartitionedtaxitrips = (
    spark.read.format("delta")
    .option("timestampAsOf", version_one_commit["commit_ts"])
    .load(table_path)
)
display(deltapartitionedtaxitrips.groupBy("puYear", "puMonth").count())

## VACUUM - Managing how much history of past data is kept

In [None]:
%%sql

-- Remove data files that are no longer needed by table versions inside the default
-- retention period (table addressed by its metastore name).
VACUUM deltademo.deltapartitionedtaxitrips;

-- The same for a path-based table, using either a quoted path or the delta.`<path>` identifier.
VACUUM '/data/deltademo/deltapartitionedtaxitrips'; 
VACUUM delta.`/data/deltademo/deltapartitionedtaxitrips`;

-- Remove files not required by versions more than 720 hours (30 days) old.
VACUUM delta.`/data/deltademo/deltapartitionedtaxitrips` RETAIN 720 HOURS;

-- Dry run: list the files that would be deleted, without deleting them.
-- NOTE(review): this runs after the vacuums above, so it only reports files that are still
-- eligible; run it first if the goal is to preview what the statements above delete.
VACUUM deltademo.deltapartitionedtaxitrips DRY RUN;

In [None]:
from delta.tables import DeltaTable

# A DeltaTable handle can be opened by path or by metastore name; both refer to the
# same table here. Only one is needed (the original opened both and discarded the first):
#   path-based equivalent: DeltaTable.forPath(spark, '/data/deltademo/deltapartitionedtaxitrips')
deltaTable = DeltaTable.forName(spark, 'deltademo.deltapartitionedtaxitrips')    # Hive metastore-based table

deltaTable.vacuum(720)     # remove files not required by versions more than 720 hours (30 days) old

deltaTable.vacuum()        # remove files not required by versions older than the default retention period

## File Compaction

In [None]:
# Compact one partition (pickup year 2010, month 3) into a fixed number of files.
# dataChange=false records the rewrite as a rearrangement of existing data, and
# replaceWhere restricts the overwrite to the rows selected by `partition`.
path = "/data/deltademo/deltapartitionedtaxitrips"
partition = "puYear = '2010' and puMonth = '3'"
numFilesPerPartition = 16

compacted = (
    spark.read.format("delta").load(path)
    .where(partition)
    .repartition(numFilesPerPartition)
)
(compacted.write
    .format("delta")
    .mode("overwrite")
    .option("dataChange", "false")
    .option("replaceWhere", partition)
    .save(path))

## Convert a Delta table to a Parquet table

You can convert a Delta table back to a plain Parquet table with the following steps:

1. If you have performed Delta Lake operations that can change the data files (for example, delete or merge), run vacuum with a retention of 0 hours to delete all data files that do not belong to the latest version of the table.
2. Delete the `_delta_log` directory in the table directory.

**NOTE:** Delta Lake has a safety check that prevents you from running a dangerous vacuum command. If you are certain that no operation on this table takes longer than the retention interval you plan to specify, you can turn off this safety check by setting the Apache Spark configuration property `spark.databricks.delta.retentionDurationCheck.enabled` to `false`. Choose an interval that is longer than the longest-running concurrent transaction and longer than the longest period that any stream can lag behind the most recent update to the table.



In [None]:
from delta.tables import DeltaTable

# Open the table by its metastore name
# (path-based equivalent: DeltaTable.forPath(spark, '/data/deltademo/deltapartitionedtaxitrips')).
deltaTable = DeltaTable.forName(spark, 'deltademo.deltapartitionedtaxitrips')    # Hive metastore-based table

# vacuum(0) keeps no history at all. Delta's retention-duration safety check rejects
# it while spark.databricks.delta.retentionDurationCheck.enabled is true, so disable
# the check only for this call and restore the previous setting afterwards.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(retention_check_key, "true")
spark.conf.set(retention_check_key, "false")
try:
    # Delete every data file that does not belong to the latest table version.
    deltaTable.vacuum(0)
finally:
    spark.conf.set(retention_check_key, previous_retention_check)

In [None]:
# Current value of Delta's retention-duration safety check, which guards VACUUM with very short retention periods.
spark.conf.get("spark.databricks.delta.retentionDurationCheck.enabled")

In [None]:
# Turn off Delta's retention-duration safety check so VACUUM accepts retention periods
# shorter than the configured minimum (e.g. 0 hours).
# NOTE(review): this stays in effect for the rest of the session; only do it when no
# concurrent transaction or lagging stream still needs older table versions.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")