# Databricks Delta Batch Operations - Append

Databricks® Delta allows you to read, write, and query data in data lakes efficiently.

## Datasets Used
We will use online retail datasets from
* `/mnt/training/online_retail` in the demo part and
* `/mnt/training/structured-streaming/events/` in the exercises

### Getting Started

Run the following cell to configure our "classroom."

In [3]:
%run ./Includes/Classroom-Setup

Set up relevant paths.

In [5]:
miniDataInputPath = "/mnt/training/online_retail/outdoor-products/outdoor-products-mini.csv"
genericDataPath = userhome + "/generic/customer-data/"
deltaDataPath = userhome + "/delta/customer-data/"
deltaIotPath = userhome + "/delta/iot-pipeline/"
print(deltaDataPath)
print(deltaIotPath)

Here, we add new data to the consumer product data.

Before we load data into non-Databricks Delta and Databricks Delta tables, do a simple pre-processing step:

* The column `StockCode` should be of type `String`.

In [7]:
from pyspark.sql.functions import col
newDataDF = (spark       
  .read                                              # Read a DataFrame from storage
  .option("inferSchema","true")                      # Infer schema
  .option("header","true")                           # File has a header
  .csv(miniDataInputPath)                                    # Path to file
  .withColumn("StockCode", col('StockCode').cast("String")) 
)
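
The lesson does not say why `StockCode` must be a `String`. One plausible reason, stated here as an assumption, is that schema inference on a small file can pick a numeric type when every code in that file happens to be all digits, while the larger table contains alphanumeric codes. The plain-Python sketch below imitates that with a deliberately naive inference rule; the sample values and both helper functions are hypothetical and are not part of Spark.

```python
import csv
import io


def infer_column_type(values):
    # Naive inference: call a column int only if every value is all digits.
    return int if all(value.isdigit() for value in values) else str


def read_column(csv_text, column):
    # Return the raw text values of one column from CSV text with a header.
    reader = csv.DictReader(io.StringIO(csv_text))
    return [row[column] for row in reader]


# Hypothetical samples: the small file has only numeric-looking codes.
mini_csv = "StockCode,Quantity\n22423,6\n21212,12\n"
full_csv = "StockCode,Quantity\n85123A,6\n22423,12\n"

mini_type = infer_column_type(read_column(mini_csv, "StockCode"))
full_type = infer_column_type(read_column(full_csv, "StockCode"))

# Keeping the small file's codes as text makes both sides agree on the type.
mini_as_text = [str(value) for value in read_column(mini_csv, "StockCode")]
```

Under that assumption, the explicit cast in the cell above keeps the appended rows compatible with the existing column type regardless of what inference would have chosen.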

Count the number of new items to be added to the production data.

In [9]:
newDataDF.count()

## APPEND Using Non-Databricks Delta Pipeline
Append to the production table.

In the next cell, load the new data in `parquet` format and save to `../generic/customer-data/`.

In [11]:
(newDataDF
  .write
  .format("parquet")
  .partitionBy("Country")
  .mode("append")
  .save(genericDataPath)
)

We expect to see `65499 + 36 = 65535` rows, but we do not.

We may even see an error message.

In [13]:
%sql
SELECT count(*) FROM customer_data

count(1)
65518



Strange: we got a count we were not expecting!

This is the <b>schema on read</b> problem. It means that as soon as you put data into a data lake, 
the schema is unknown <i>until</i> you perform a read operation.

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Repair the table again and count the number of records.

In [15]:
%sql
MSCK REPAIR TABLE customer_data;

SELECT count(*) FROM customer_data

count(1)
65535
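
To make the behavior above more concrete, here is a toy, plain-Python model. It assumes that the table's catalog keeps a list of known partition directories and that a query scans only those, so directories written straight to storage stay invisible until a repair step rescans. `ToyCatalog` and `append_rows` are hypothetical names for illustration; they are not Hive or Spark APIs, and the row counts are made up.

```python
import os
import tempfile


class ToyCatalog:
    """Hypothetical stand-in for a metastore that remembers partition directories."""

    def __init__(self, table_root):
        self.table_root = table_root
        self.known_partitions = set()

    def repair(self):
        # Rescan storage and register every partition directory found.
        self.known_partitions = {
            name for name in os.listdir(self.table_root)
            if os.path.isdir(os.path.join(self.table_root, name))
        }

    def count(self):
        # Count rows (one per line) only in partitions the catalog knows about.
        total = 0
        for name in self.known_partitions:
            part_dir = os.path.join(self.table_root, name)
            for file_name in os.listdir(part_dir):
                with open(os.path.join(part_dir, file_name)) as handle:
                    total += sum(1 for _ in handle)
        return total


def append_rows(table_root, country, rows, file_name):
    # Write rows straight to storage, bypassing the catalog.
    part_dir = os.path.join(table_root, "Country=" + country)
    os.makedirs(part_dir, exist_ok=True)
    with open(os.path.join(part_dir, file_name), "w") as handle:
        handle.writelines(row + "\n" for row in rows)


table_root = tempfile.mkdtemp()
append_rows(table_root, "France", ["a", "b", "c"], "part-0.txt")
catalog = ToyCatalog(table_root)
catalog.repair()
count_initial = catalog.count()

append_rows(table_root, "France", ["d"], "part-1.txt")     # existing partition
append_rows(table_root, "Peru", ["e", "f"], "part-0.txt")  # brand-new partition
count_before_repair = catalog.count()  # new partition is not yet registered
catalog.repair()
count_after_repair = catalog.count()   # all rows are now visible
```

In this model the count before the repair falls between the old and the new total, because rows added to an already registered partition are seen while rows in a new partition are not. That is one plausible reading of the intermediate count shown earlier, not a confirmed diagnosis.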


## APPEND Using Databricks Delta Pipeline

Next, repeat the process by writing to Databricks Delta format. 

In the next cell, load the new data in Databricks Delta format and save to `../delta/customer-data/`.

In [17]:
# Just in case it exists already.
dbutils.fs.rm(deltaDataPath, True)

In [18]:
(newDataDF
  .write
  .format("delta")
  .partitionBy("Country")
  .mode("append")
  .save(deltaDataPath)
)

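Perform a simple `count` query to verify the number of records.

The count should be `65535` when the Databricks Delta table already holds the 65499 production rows. In this run, `deltaDataPath` was removed before the write, so the table presumably contains only the 36 newly appended rows.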

In [20]:
%sql
SELECT count(*) FROM customer_data_delta

count(1)
36


## Exercise 1

0. Read the JSON data under `streamingEventPath` into a DataFrame
0. Add a `date` column using `to_date(from_unixtime(col("time").cast("Long"), "yyyy-MM-dd"))`
0. Add a `deviceId` column consisting of random numbers from 0 to 99 using this expression `expr("cast(rand(5) * 100 as int)")`
0. Use the `repartition` method to split the data into 200 partitions

Refer to the <a href="http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#" target="_blank">PySpark function documentation</a>.

In [22]:
# TODO
from pyspark.sql.functions import from_unixtime, col, expr, to_date


streamingEventPath = "/mnt/training/structured-streaming/events/"
rawDataDF = ( 
  spark
  .read
  .option("inferSchema", "true")
  .json(streamingEventPath)
  # Format as yyyy-MM-dd, the layout to_date parses by default
  .withColumn("date", to_date(from_unixtime(col('time').cast('Long'),'yyyy-MM-dd')))
  .withColumn("deviceId", expr("cast(rand(5)*100 as int)"))
  .repartition(200)
)

display(rawDataDF)

action,time,date,deviceId
Open,1469593050,2016-07-27,91
Open,1469652257,2016-07-27,91
Open,1469531853,2016-07-26,71
Close,1469681650,2016-07-28,96
Close,1469624898,2016-07-27,44
Close,1469634211,2016-07-27,84
Close,1469601888,2016-07-27,57
Close,1469653015,2016-07-27,1
Close,1469572689,2016-07-26,75
Close,1469653480,2016-07-27,72


In [23]:
# TEST - Run this cell to test your solution.
from pyspark.sql.types import StructField, StructType, StringType, LongType, DateType, IntegerType

expectedSchema = StructType([
   StructField("action",StringType(), True),
   StructField("time",LongType(), True),
   StructField("date",DateType(), True),
   StructField("deviceId",IntegerType(), True),
])

dbTest("Delta-03-schemas", set(expectedSchema), set(rawDataDF.schema))

print("Tests passed!")
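
As a cross-check on the `date` column, the sketch below redoes the conversion in plain Python for three `time` values taken from the output above. It assumes the values are seconds since the Unix epoch and that the conversion happens in UTC (Spark uses the session time zone, so results could differ elsewhere). It also shows why the solution formats the string as `yyyy-MM-dd`: a month-first string does not parse as an ISO date. `epoch_to_date` is a hypothetical helper, not a Spark function.

```python
from datetime import date, datetime, timezone


def epoch_to_date(epoch_seconds):
    # Interpret the value as seconds since 1970-01-01 UTC and keep only the date.
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).date()


# Three `time` values copied from the displayed rows.
sample_times = [1469593050, 1469531853, 1469681650]
dates = [epoch_to_date(t) for t in sample_times]

# An ISO-style string parses back into a date; a month-first string does not.
iso_text = dates[0].strftime("%Y-%m-%d")
us_text = dates[0].strftime("%m/%d/%Y")
parsed_iso = date.fromisoformat(iso_text)
try:
    date.fromisoformat(us_text)
    us_parse_failed = False
except ValueError:
    us_parse_failed = True
```

The computed dates agree with the `date` column shown for those rows, which suggests the notebook ran with a UTC session time zone.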

## Exercise 2

Write out the raw data in Databricks Delta format to `/delta/iot-pipeline/` and create a Databricks Delta table called `demo_iot_data_delta`.

Remember to
* partition by `date`
* save to `deltaIotPath`

In [25]:
# TODO
(rawDataDF
  .write
  .format("delta")
  .mode("overwrite")
  .partitionBy("date")
  .save(deltaIotPath)
)

spark.sql("""
    DROP TABLE IF EXISTS demo_iot_data_delta 
""")

spark.sql("""
    CREATE TABLE demo_iot_data_delta
    USING DELTA
    LOCATION '{}'
""".format(deltaIotPath))

In [26]:
# TEST - Run this cell to test your solution.
try:
  tableExists = (spark.table("demo_iot_data_delta") is not None)
except Exception:
  tableExists = False
  
dbTest("Delta-03-tableExists", True, tableExists)  

print("Tests passed!")
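
To picture what partitioning by `date` does to the files on storage, the plain-Python sketch below groups a few rows from Exercise 1 into Hive-style `column=value` directory names, which is the convention Spark uses for partitioned writes. It is an illustration only: `partition_dirs` is a hypothetical helper, and real writes also produce data files and, for Delta, a transaction log that are not modeled here.

```python
from collections import defaultdict

# (action, time, date, deviceId) rows copied from the Exercise 1 output.
rows = [
    ("Open", 1469593050, "2016-07-27", 91),
    ("Open", 1469531853, "2016-07-26", 71),
    ("Close", 1469681650, "2016-07-28", 96),
    ("Close", 1469624898, "2016-07-27", 44),
]


def partition_dirs(rows, column_name, column_index):
    # Group rows by partition value; the value moves into the directory name,
    # so it is dropped from the row stored under that directory.
    grouped = defaultdict(list)
    for row in rows:
        directory = "{}={}".format(column_name, row[column_index])
        grouped[directory].append(row[:column_index] + row[column_index + 1:])
    return dict(grouped)


layout = partition_dirs(rows, "date", 2)
for directory in sorted(layout):
    print(directory, layout[directory])
```

A query that filters on `date` can then skip whole directories, which is the usual motivation for choosing it as the partition column.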

## Exercise 3

Create a new DataFrame with columns `action`, `time`, `date` and `deviceId`. The columns contain the following data:

* `action` contains the value `Open`
* `time` contains the Unix time cast into a long integer `cast(1529091520 as bigint)`
* `date` contains `cast('2018-06-01' as date)`
* `deviceId` contains a random number from 0 to 499 given by `expr("cast(rand(5) * 500 as int)")`

In [28]:
# TODO
from pyspark.sql.functions import expr

newDataDF = (
  spark
  .range(10000)
  .repartition(200)
  .selectExpr("'Open' as action", "cast(1529091520 as bigint) as time", "cast('2018-06-01' as date) as date")
  .withColumn("deviceId", expr("cast(rand(5) * 500 as int)"))
)

display(newDataDF)

action,time,date,deviceId
Open,1529091520,2018-06-01,43
Open,1529091520,2018-06-01,289
Open,1529091520,2018-06-01,348
Open,1529091520,2018-06-01,219
Open,1529091520,2018-06-01,247
Open,1529091520,2018-06-01,296
Open,1529091520,2018-06-01,289
Open,1529091520,2018-06-01,3
Open,1529091520,2018-06-01,3
Open,1529091520,2018-06-01,415


In [29]:
# TEST - Run this cell to test your solution.
total = newDataDF.count()

dbTest("Delta-03-newDataDF-count", 10000, total)

print("Tests passed!")

## Exercise 4

Append new data to `demo_iot_data_delta`.

* Use `append` mode
* Save to `deltaIotPath`

In [31]:
(newDataDF
  .write
  .format("delta")
  .partitionBy("date")
  .mode("append")
  .save(deltaIotPath)
)


In [32]:
# TEST - Run this cell to test your solution.
# Despite the test label, this counts rows in the table, not files.
numRows = spark.sql("SELECT count(*) as total FROM demo_iot_data_delta").collect()[0][0]

dbTest("Delta-03-numFiles", 110000, numRows)

print("Tests passed!")

## Summary
With Databricks Delta, you can easily append new data without schema-on-read issues.

## Review Questions
**Q:** What parameter do you need in order to add data to an existing dataset in a Delta table?<br>
**A:** 
`df.write...mode("append").save("..")`

**Q:** What's the difference between `.mode("append")` and `.mode("overwrite")`?<br>
**A:** `append` atomically adds new data to an existing Databricks Delta table and `overwrite` atomically replaces all of the data in a table.
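
A minimal sketch of the difference in row counts, using a plain Python list as a stand-in for the table. The figure of 100000 existing rows is inferred from the Exercise 4 test, which expects 110000 rows after appending 10000; the `write` function is hypothetical and does not model atomicity or storage.

```python
def write(table_rows, new_rows, mode):
    # Return the table contents after a write; the input list is left untouched.
    if mode == "append":
        return table_rows + new_rows
    if mode == "overwrite":
        return list(new_rows)
    raise ValueError("unsupported mode: " + mode)


existing = ["event"] * 100000   # assumed size of the table before Exercise 4
new = ["event"] * 10000         # size of newDataDF in Exercise 3

after_append = write(existing, new, "append")
after_overwrite = write(existing, new, "overwrite")
print(len(after_append), len(after_overwrite))
```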

**Q:** I've just repaired `myTable` using `MSCK REPAIR TABLE myTable;`
How do I verify that the repair worked?<br>
**A:** `SELECT count(*) FROM myTable` and make sure the count is what I expected


  
**Q:** In Exercises 1 and 3, why did we use `.withColumn(.. cast(rand(5) ..)`, i.e. pass a seed to the `rand()` function?<br>
**A:** In order to ensure we get the SAME set of pseudo-random numbers every time, on every cluster.
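
The same idea can be shown with Python's standard `random` module, used here only as an analogy: Spark's `rand(seed)` is a different generator, and this sketch makes no claim about how Spark distributes its sequence across partitions. `device_ids` is a hypothetical helper.

```python
import random


def device_ids(seed, count, upper):
    # A generator created with a fixed seed replays the same sequence each time.
    generator = random.Random(seed)
    return [int(generator.random() * upper) for _ in range(count)]


first_run = device_ids(5, 10, 100)
second_run = device_ids(5, 10, 100)
print(first_run == second_run)
```

Scaling a value in `[0, 1)` by `upper` and truncating mirrors `cast(rand(5) * 100 as int)`, so every result lies between 0 and `upper - 1`.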

## Next Steps

Start the next lesson, [Upsert]($./04-Upsert).

## Additional Topics & Resources

* <a href="https://docs.azuredatabricks.net/delta/delta-batch.html" target="_blank">Table Batch Reads and Writes</a>