# 1. PySpark installation, Spark session object creation and importing common functions

In [None]:
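# delta-spark 3.0.0 is built for Spark 3.5.x, so pin a matching pyspark version
# (pyspark 3.0.0 is not compatible with Delta Lake 3.0.0).
!pip install pyspark==3.5.0 delta-spark==3.0.0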

In [2]:
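from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

# Register Delta Lake's SQL extension (needed for Delta SQL commands such as MERGE INTO)
# and make the session catalog Delta-aware so saveAsTable(...) can create Delta tables.
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# Add the Delta Lake jars that match the installed delta-spark package, then start the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()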


# 2. Sample data creation & data profiling

In [22]:

# Target table: the existing ("old") data
emp_df = spark.createDataFrame([(101,'Alice',10000), (102,'Bob', 20000)]
                               , ["emp_id","name", "sal"])
emp_df.show()
emp_df.printSchema()

# Save as a managed Delta table; mode("overwrite") lets the cell be re-run
emp_df.write.format("delta").mode("overwrite").saveAsTable("emp_target_table")

# Incremental (delta-load) batch: 101 has a new salary, 103 is a new employee
emp_df2 = spark.createDataFrame([(101,'Alice',15000), (103,'Cob',50000)]
                                , ["emp_id","name", "sal"])

emp_df2.show()
emp_df2.printSchema()

emp_df2.write.format("delta").mode("overwrite").saveAsTable("emp_delta_table")


+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|10000|
|   102|  Bob|20000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)

+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|15000|
|   103|  Cob|50000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)



# 3. Handling updates and inserts (delta load) using the Delta Lake (third-party) API

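<table>
<tr><th>Before: emp_target_table (top) and emp_delta_table (bottom)</th><th>After the merge: emp_target_table</th></tr>
<tr>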
<td>
<pre>
+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|10000|
|   102|  Bob|20000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)

+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|15000|
|   103|  Cob|50000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
</pre>
</td>

  <td>
  <pre>
+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|15000|
|   103|  Cob|50000|
|   102|  Bob|20000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
</pre></td>

</tr></table>

**Update existing records and insert new records into the target table**
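To make the matched/not-matched rule concrete, here is a minimal plain-Python model of this upsert. It is an illustration only, not how Delta Lake implements MERGE. It assumes each table is a list of dicts with identical columns; `merge_upsert`, `emp_target` and `emp_delta` are hypothetical names, and the rows mirror the sample data above. Like Delta Lake, the model refuses a merge in which several source rows match the same target row. It compares keys with Python equality and does not reproduce SQL NULL semantics.

```python
from collections import defaultdict


def merge_upsert(target, source, key):
    """Illustrative model (hypothetical helper) of:
        MERGE INTO target USING source ON target.key = source.key
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    Rows are dicts sharing the same columns; `key` is the ON column.
    """
    # Group the incoming (source) rows by key.
    source_by_key = defaultdict(list)
    for row in source:
        source_by_key[row[key]].append(row)

    result = []
    for row in target:
        matches = source_by_key.get(row[key], [])
        if len(matches) > 1:
            # Delta Lake also fails when several source rows try to update one target row.
            raise ValueError(f"multiple source rows match target key {row[key]!r}")
        # WHEN MATCHED THEN UPDATE SET *: every column is taken from the source row.
        # Target rows without a match are kept unchanged.
        result.append(dict(matches[0]) if matches else dict(row))

    target_keys = {row[key] for row in target}
    for row in source:
        # WHEN NOT MATCHED THEN INSERT *: source rows with no target match are appended.
        if row[key] not in target_keys:
            result.append(dict(row))
    return result


# Hypothetical in-memory copies of the two sample tables.
emp_target = [
    {"emp_id": 101, "name": "Alice", "sal": 10000},
    {"emp_id": 102, "name": "Bob", "sal": 20000},
]
emp_delta = [
    {"emp_id": 101, "name": "Alice", "sal": 15000},
    {"emp_id": 103, "name": "Cob", "sal": 50000},
]

merged = merge_upsert(emp_target, emp_delta, "emp_id")
for row in sorted(merged, key=lambda r: r["emp_id"]):
    print(row)
```

Spark does not guarantee row order, which is why the notebook output can list 103 before 102. The model sorts only for display.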


# The point

In [20]:

spark.sql("""

MERGE INTO emp_target_table

USING emp_delta_table

ON emp_target_table.emp_id = emp_delta_table.emp_id


WHEN MATCHED THEN

  UPDATE SET *

WHEN NOT MATCHED
  THEN INSERT *""")

spark.sql("select * from emp_target_table").show()


+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|15000|
|   103|  Cob|50000|
|   102|  Bob|20000|
+------+-----+-----+



# The explanation

In [21]:
spark.sql("""

-- Target table: where the final result is written (the base/main table holding the old data)
MERGE INTO emp_target_table

-- Source table: the incremental ("delta") batch holding the latest data
USING emp_delta_table

-- Match condition: one or more columns used to pair source rows with target rows
ON emp_target_table.emp_id = emp_delta_table.emp_id

-- The source row matched an existing target row on the ON condition:
-- overwrite all target columns with the latest values from the source
WHEN MATCHED THEN
  UPDATE SET *

-- No target row matched this source row:
-- insert it into the target table as a new record
WHEN NOT MATCHED
  THEN INSERT *""")

final_df=spark.sql("select * from emp_target_table")
final_df.show()
final_df.printSchema()

+------+-----+-----+
|emp_id| name|  sal|
+------+-----+-----+
|   101|Alice|15000|
|   103|  Cob|50000|
|   102|  Bob|20000|
+------+-----+-----+

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
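The comments in the cell above describe how each source row is routed to a clause. The following rough plain-Python sketch shows only that routing, not the update itself. `classify_merge_rows` and the row lists are hypothetical. `on_cols` stands in for an ON condition made of equality tests on one or more columns joined with AND. As in SQL, a key containing NULL (`None`) never matches. The sketch also suggests why running the same MERGE again (as this cell repeats the previous one) returns the same three rows: after the first run every source key already exists in the target, so all rows take the WHEN MATCHED branch and nothing new is inserted.

```python
def classify_merge_rows(target, source, on_cols):
    """Sketch (hypothetical helper): split source rows into those that would
    hit WHEN MATCHED (update) and those that would hit WHEN NOT MATCHED (insert).
    on_cols models `ON t.c1 = s.c1 AND t.c2 = s.c2 ...`.
    """
    # Key tuples that exist in the target table.
    target_keys = {tuple(row[c] for c in on_cols) for row in target}

    matched, not_matched = [], []
    for row in source:
        key = tuple(row[c] for c in on_cols)
        # SQL `NULL = NULL` is not true, so a key containing None never matches.
        if None not in key and key in target_keys:
            matched.append(row)
        else:
            not_matched.append(row)
    return matched, not_matched


emp_delta = [
    {"emp_id": 101, "name": "Alice", "sal": 15000},
    {"emp_id": 103, "name": "Cob", "sal": 50000},
]
before = [
    {"emp_id": 101, "name": "Alice", "sal": 10000},
    {"emp_id": 102, "name": "Bob", "sal": 20000},
]
# Target contents after the first MERGE, as shown in the notebook output.
after_first_run = [
    {"emp_id": 101, "name": "Alice", "sal": 15000},
    {"emp_id": 103, "name": "Cob", "sal": 50000},
    {"emp_id": 102, "name": "Bob", "sal": 20000},
]

# First run: 101 is updated, 103 is inserted.
upd1, ins1 = classify_merge_rows(before, emp_delta, ["emp_id"])
# Second run with the same source: everything matches, nothing is inserted.
upd2, ins2 = classify_merge_rows(after_first_run, emp_delta, ["emp_id"])
# A two-column condition (emp_id AND name) routes these rows the same way.
upd3, ins3 = classify_merge_rows(before, emp_delta, ["emp_id", "name"])

print([r["emp_id"] for r in upd1], [r["emp_id"] for r in ins1])
print([r["emp_id"] for r in upd2], [r["emp_id"] for r in ins2])
```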



# The explanation

<table style="background:red; width:85%; border:1px solid black;">
<tr style="color:red">
  <td>1. What do you have (input)?</td>
  <td>cust_df</td>
</tr>
<tr>
  <td colspan="3"><code>cust_df.<..></code></td>
</tr>
<tr>
  <td>2. What do you want?</td>
  <td>a new column named outlet_tran_list</td>
  <td>then use withColumn</td>
</tr>
<tr>
<td colspan="3">cust_df<code>.withColumn("outlet_tran_list",<..>)</code></td>
</tr>
<tr>
  <td>3. What for?</td>
  <td>to store the tran_type values as a list</td>
  <td>then use collect_list</td>
</tr>
<tr>
  <td>4. From which column?</td>
  <td>tran_type</td>
  <td></td>
</tr>
<tr>
<td colspan="3">cust_df.withColumn("outlet_tran_list",<code> f.collect_list("tran_type").<..>)</code></td>
</tr>
<tr>
  <td>5. How do you want it?</td>
  <td>for each unique outlet_name value</td>
  <td>then use over(Window.partitionBy("outlet_name")).<..></td>
</tr>
<tr><td colspan="3">cust_df.withColumn("outlet_tran_list", f.collect_list("tran_type")<code>.over(Window.partitionBy("outlet_name"))</code>)</td></tr>
<tr>
  <td>6. Where do you want to store the result?</td>
  <td>outlet_df</td>
</tr>
<tr>
  <td colspan="3" style="background:red;"><code>outlet_df=</code>cust_df.withColumn("outlet_tran_list", f.collect_list("tran_type").over(Window.partitionBy("outlet_name")))
  </td>
</tr>
</table>
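The final expression in the table is a window function. Unlike `groupBy(...).agg(...)`, it keeps every row of `cust_df` and attaches to each row the list for its `outlet_name`. Running it in PySpark also needs `from pyspark.sql import Window`. The plain-Python model below illustrates those semantics using assumed sample data. `cust_rows` and `collect_list_over_partition` are hypothetical, and, like `collect_list`, the model skips null (`None`) values. Spark does not guarantee the order of elements within each list, whereas the model simply keeps input order.

```python
from collections import defaultdict


def collect_list_over_partition(rows, partition_col, value_col, out_col):
    """Model (hypothetical helper) of
        df.withColumn(out_col, f.collect_list(value_col).over(Window.partitionBy(partition_col)))
    Every input row is kept; each gets the list of value_col values from its partition.
    """
    # Pass 1: gather the non-null values of each partition (collect_list skips nulls).
    groups = defaultdict(list)
    for row in rows:
        if row[value_col] is not None:
            groups[row[partition_col]].append(row[value_col])

    # Pass 2: attach a copy of the partition's list to every row in that partition.
    return [{**row, out_col: list(groups[row[partition_col]])} for row in rows]


# Hypothetical sample rows standing in for cust_df.
cust_rows = [
    {"outlet_name": "north", "tran_type": "cash"},
    {"outlet_name": "north", "tran_type": "card"},
    {"outlet_name": "south", "tran_type": "cash"},
    {"outlet_name": "south", "tran_type": None},
]

outlet_rows = collect_list_over_partition(cust_rows, "outlet_name", "tran_type", "outlet_tran_list")
for row in outlet_rows:
    print(row)
```

The row count stays the same (four in, four out). That is the practical difference from a `groupBy` aggregation, which would return one row per outlet.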

The End
