# Upsert into a Delta Table

In [None]:
from utils.spark import get_spark

from pyspark.sql import functions as F
from delta import DeltaTable

In [None]:
spark = get_spark()

## Set up a Delta Table

Let's set up a simple Delta table from scratch.
We cheat a bit: instead of writing the schema by hand, we first create a DataFrame with data and let Spark infer the schema from it.
To create the Delta table, however, we use only the schema. The table itself starts out empty.
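To make "inferring a schema from sample rows" concrete, here is a toy plain-Python sketch. Every column name is mapped to a type name, and only that mapping (not the rows) is kept for the empty table. The helper `infer_schema` and the type-name table are hypothetical illustrations, not Spark's implementation. The one fact borrowed from Spark is that Python `int` values are inferred as `LongType` (`bigint`).

```python
# Hypothetical toy model of schema inference; this is not Spark's actual code.
# Python types mapped to Spark SQL type names (Spark infers int as LongType).
SPARK_TYPE_NAMES = {str: "string", int: "bigint"}


def infer_schema(rows):
    """Return {column: type_name}, failing if a column has conflicting types."""
    schema = {}
    for row in rows:
        for name, value in row.items():
            type_name = SPARK_TYPE_NAMES[type(value)]
            # setdefault records the first type seen; later rows must agree
            if schema.setdefault(name, type_name) != type_name:
                raise TypeError(f"conflicting types for column {name!r}")
    return schema


sample_rows = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "vw", "color": "red", "amount": 9000},
]

schema = infer_schema(sample_rows)
# The "table" keeps only the schema; the sample rows are not written to it.
empty_table = {"schema": schema, "rows": []}
print(schema)  # {'brand': 'string', 'color': 'string', 'amount': 'bigint'}
```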

In [None]:
df = spark.createDataFrame([
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
])

In [None]:
df.show()

In [None]:
df.schema

In [None]:
!rm -rf /data/vehicle-colors

In [None]:
deltaTable = (DeltaTable
                .createIfNotExists(spark)
                .location("/data/vehicle-colors")
                .addColumns(df.schema)
                .execute())

In [None]:
!ls /data/vehicle-colors

## Upsert data

We will now merge data into the table. A merge is also called an "upsert", short for update or insert.
Since the table is still empty, every row is new and is simply inserted.
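Conceptually, a merge pairs each source row with the target rows that satisfy the merge condition, which here is equality on `(brand, color)`. The plain-Python sketch below models only that matching step, and `split_by_match` is a hypothetical name rather than part of the Delta API. Against an empty target, every source row ends up unmatched and is therefore inserted.

```python
# Hypothetical plain-Python model of the matching step of a merge; not Delta code.


def key(row):
    # The merge condition compares brand and color.
    return (row["brand"], row["color"])


def split_by_match(target_rows, source_rows):
    """Split source rows into (matched, not_matched) by their key."""
    target_keys = {key(r) for r in target_rows}
    matched = [r for r in source_rows if key(r) in target_keys]
    not_matched = [r for r in source_rows if key(r) not in target_keys]
    return matched, not_matched


source = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
]

matched, not_matched = split_by_match([], source)  # empty target table
# Nothing matches, so an insert-all clause would insert all four rows.
target = [] + not_matched
print(len(matched), len(not_matched))  # 0 4
```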

In [None]:
# Match rows on the (brand, color) pair; the table is empty, so every row is inserted
deltaTable.alias("dt").merge(
        df.alias("new"),
        "dt.brand = new.brand and dt.color = new.color") \
        .whenNotMatchedInsertAll().execute()

In [None]:
deltaTable.toDF().show()

Let's now make a plot where we aggregate the amounts per brand.

In [None]:
deltaTable.toDF().groupBy("brand").agg(F.sum("amount").alias("total")).toPandas().plot(kind="bar", x="brand", y="total");
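The bar chart shows the sum of `amount` over all colors for each brand. As a cross-check, here is the same aggregation in plain Python over the four rows inserted above. It only mirrors what `groupBy("brand").agg(F.sum("amount"))` computes, not how Spark executes it, and it gives the expected bar heights.

```python
from collections import defaultdict

# The four rows inserted by the first merge.
rows = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
]

# Plain-Python counterpart of groupBy("brand").agg(F.sum("amount").alias("total"))
totals = defaultdict(int)
for row in rows:
    totals[row["brand"]] += row["amount"]

print(dict(totals))  # {'audi': 304000, 'vw': 21000}
```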

In the next merge we only insert new rows. This means the existing row for green Audis is not updated, even though `new_df` contains a new amount for it. Only the pink Audi is added.

In [None]:
new_df = spark.createDataFrame([
    {"brand": "audi", "color": "pink", "amount": 42},
    {"brand": "audi", "color": "green", "amount": 3000},
])

In [None]:
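# Insert-only merge: rows without a (brand, color) match are inserted,
# matching rows (here: the green Audi) are left untouched
deltaTable.alias("dt").merge(
        new_df.alias("new"),
        "dt.brand = new.brand and dt.color = new.color") \
        .whenNotMatchedInsert(values={
            "brand": "new.brand",
            "color": "new.color",
            "amount": "new.amount"
        }) \
        .execute()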

In [None]:
deltaTable.toDF().show()
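The insert-only merge above can be sketched in plain Python. The function `merge_insert_only` is hypothetical and is not the Delta API. Source rows whose `(brand, color)` key already exists in the target are skipped, so the green Audi keeps its old amount and only the pink Audi is added.

```python
# Hypothetical plain-Python model of a merge with only a "when not matched, insert" clause.


def merge_insert_only(target, source):
    existing = {(r["brand"], r["color"]) for r in target}
    # Only source rows with an unseen (brand, color) key are appended.
    inserted = [r for r in source if (r["brand"], r["color"]) not in existing]
    return target + inserted


target = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
]
new_rows = [
    {"brand": "audi", "color": "pink", "amount": 42},
    {"brand": "audi", "color": "green", "amount": 3000},
]

result = merge_insert_only(target, new_rows)
green_audi = [r for r in result if (r["brand"], r["color"]) == ("audi", "green")]
print(len(result), green_audi[0]["amount"])  # 5 300000
```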

In the next merge we additionally specify that matching rows should be updated:

In [None]:
# Upsert: insert unmatched rows and overwrite the amount of matched rows
deltaTable.alias("dt").merge(
        new_df.alias("new"),
        "dt.brand = new.brand and dt.color = new.color") \
        .whenNotMatchedInsert(values={
            "brand": "new.brand",
            "color": "new.color",
            "amount": "new.amount"
        }) \
        .whenMatchedUpdate(set={
            "amount": "new.amount"
        }) \
        .execute()

In [None]:
deltaTable.toDF().show()
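Adding an update clause turns the insert-only merge into a real upsert. In the plain-Python sketch below, matched rows take the source amount and unmatched source rows are appended. The function `upsert` is hypothetical. It assumes each `(brand, color)` key occurs at most once in the source, as in `new_df`; Delta itself rejects a merge in which several source rows match the same target row. Starting from the table after the insert-only merge, the green Audi drops from 300000 to 3000.

```python
# Hypothetical plain-Python model of an update clause plus an insert clause.
# Assumes every (brand, color) key occurs at most once in the source.


def key(row):
    return (row["brand"], row["color"])


def upsert(target, source):
    updates = {key(r): r["amount"] for r in source}
    # Matched rows: overwrite amount (like set={"amount": "new.amount"})
    merged = [
        {**r, "amount": updates[key(r)]} if key(r) in updates else r
        for r in target
    ]
    # Unmatched source rows: insert them
    existing = {key(r) for r in target}
    merged += [r for r in source if key(r) not in existing]
    return merged


# Table state after the insert-only merge (five rows).
target = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
    {"brand": "audi", "color": "pink", "amount": 42},
]
new_rows = [
    {"brand": "audi", "color": "pink", "amount": 42},
    {"brand": "audi", "color": "green", "amount": 3000},
]

result = upsert(target, new_rows)
print({key(r): r["amount"] for r in result}[("audi", "green")])  # 3000
```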

In [None]:
deltaTable.toDF().groupBy("brand").agg(F.sum("amount").alias("total")).toPandas().plot(kind="bar", x="brand", y="total");

We are happy with our new plot... but the customer says that yesterday there were more Audis.
So it would be useful to go back in history, reproduce yesterday's plot, and check whether she is right.

You can view the history of the Delta table with `history()`. The result is an ordinary DataFrame, so it can be sorted and converted to pandas like any other:

In [None]:
deltaTable.history().orderBy('version').toPandas()

In [None]:
# Version 0 is the empty table created above; version 1 is the state after the first merge
old_df = spark.read.format("delta").option("versionAsOf", 1).load("/data/vehicle-colors")

In [None]:
old_df.groupBy("brand").agg(F.sum("amount").alias("total")).toPandas().plot(kind="bar", x="brand", y="total");
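Time travel works because every committed write produces a new table version, and `versionAsOf` selects one of them. Delta's reader also accepts a `timestampAsOf` option, which maps more directly to "yesterday". The plain-Python model below keeps one snapshot per version, mirroring the four operations in this notebook. The dictionary layout and the helper names are hypothetical illustrations, not how Delta stores data, since Delta records changes in a transaction log. Comparing the Audi totals of version 1 and version 3 shows that the customer is right.

```python
# Hypothetical model: one full snapshot per table version (not Delta's storage format).
v1 = [
    {"brand": "audi", "color": "green", "amount": 300000},
    {"brand": "audi", "color": "blue", "amount": 4000},
    {"brand": "vw", "color": "red", "amount": 9000},
    {"brand": "vw", "color": "green", "amount": 12000},
]
v2 = v1 + [{"brand": "audi", "color": "pink", "amount": 42}]  # insert-only merge
v3 = [  # upsert: the green Audi amount is overwritten
    {**r, "amount": 3000} if (r["brand"], r["color"]) == ("audi", "green") else r
    for r in v2
]
versions = {0: [], 1: v1, 2: v2, 3: v3}  # version 0: the empty table after creation


def read_version_as_of(version):
    """Analogue of option("versionAsOf", version): return that snapshot."""
    return versions[version]


def brand_total(rows, brand):
    return sum(r["amount"] for r in rows if r["brand"] == brand)


print(brand_total(read_version_as_of(1), "audi"))  # 304000
print(brand_total(read_version_as_of(3), "audi"))  # 7042
```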