#### In this project, we will use 3 API's: 
- Spark Sql API: interact with the dataframe as a Spark Sql Table (register the data location)
- Pyspark API: create a Spark Dataframe, write & read to a local disk, modify table, read of updated Delta Lake contents
- Delta Lake Python API: modify a table (upsert & merge)

In [1]:
pyspark_args_str = """
--packages "io.delta:delta-core_2.12:1.0.0"
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell
"""

In [2]:
pyspark_args_str

'\n--packages "io.delta:delta-core_2.12:1.0.0"\n--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"\n--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell\n'

In [3]:
import os
os.environ.get("PYSPARK_SUBMIT_ARGS")

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = pyspark_args_str

In [5]:
os.environ.get("PYSPARK_SUBMIT_ARGS")

'\n--packages "io.delta:delta-core_2.12:1.0.0"\n--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"\n--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell\n'

In [6]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .master("local[8]")
    .getOrCreate()
)

####  Create a DataFrame

In [26]:
kids = [
    {'name': 'Alice', 'age': 1},
    {'name': 'Berto', 'age': 7},
    {'name': 'Chen', 'age': 4},
    {'name': 'Dinesh', 'age': 6},
]

df = spark.createDataFrame(kids, schema="name STRING, age INT")

In [13]:
df.show()

+------+---+
|  name|age|
+------+---+
| Alice|  1|
| Berto|  7|
|  Chen|  4|
|Dinesh|  6|
+------+---+



#### Write data to a local disk

(
df.write
    .format("delta")
    .mode("overwrite")
    .option("path", "/home/jovyan/data/kids")
    .save()
)

#### Reading data from the local disk to verify

In [16]:
(
spark.read
    .format("delta")
    .option("path", "/home/jovyan/data/kids")
    .load()
    .show()
)

+------+---+
|  name|age|
+------+---+
|Dinesh|  6|
| Berto|  7|
| Alice|  1|
|  Chen|  4|
+------+---+



#### Creating a data location for the Spark Sql Table

In [17]:
spark.sql("""
  CREATE TABLE kids
  USING delta
  LOCATION '/home/jovyan/data/kids'
""")

DataFrame[]

In [18]:
spark.sql("DESCRIBE DETAIL kids").show()

+------+--------------------+------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|        name|description|            location|           createdAt|        lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|29bd423d-d7db-46b...|default.kids|       null|file:/home/jovyan...|2021-06-28 14:29:...|2021-06-28 14:29:...|              []|       5|       2978|        {}|               1|               2|
+------+--------------------+------------+-----------+--------------------+--------------------+--------------------+----------------+--------+-----------+----------+----------------+-------------

#### Read data from the table

spark.sql("SELECT * FROM kids").show()

#### Modifying the table using the Delta Lake Python API

In [20]:
update = spark.createDataFrame([
    {"name": "Berto", "age": 8},
    {"name": "Eva", "age": 0}
], schema="name STRING, age INT")

In [22]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col

#### Reference the location of the data and perform the update (upsert / merge)

In [23]:
deltaTable = DeltaTable.forPath(spark, "/home/jovyan/data/kids")

In [24]:
(
    deltaTable.alias("kids")
    .merge(
        update.alias("update"),
        "kids.name = update.name"
    )
    .whenMatchedUpdate(set={"age": col("update.age")})
    .whenNotMatchedInsert(values={"name": col("update.name"), "age": col("update.age")})
    .execute()
)

##### use Pyspark to read the contents of the updated Delta Table

In [25]:
spark.read.format("delta").option("path", "data/kids").load().show()
df = spark.read.format("delta").option("versionAsOf", 0).load("data/kids")
df.show()

+------+---+
|  name|age|
+------+---+
|Dinesh|  6|
| Berto|  8|
| Alice|  1|
|  Chen|  4|
|   Eva|  0|
+------+---+

+------+---+
|  name|age|
+------+---+
|Dinesh|  6|
| Berto|  7|
| Alice|  1|
|  Chen|  4|
+------+---+

