In [27]:
import pandas               as pd
import numpy                as np
from pyspark.sql            import SparkSession
from pyspark.sql.functions  import DataFrame
from pyspark.sql.session    import SparkSession

In [28]:
# Create (or reuse, via getOrCreate) a Spark session with the Delta Lake SQL
# extension and catalog registered, pulling delta-core from Maven.
# NOTE(review): delta-core 1.0.x targets Spark 3.1 / Scala 2.12 -- confirm it
# matches the installed Spark version.
spark = (
    SparkSession.builder
    .appName("CDC - Deltalake_CDC")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [29]:
def read_data(path: str, format: str, session: "SparkSession | None" = None) -> "DataFrame":
    """Load the data at ``path`` into a Spark DataFrame.

    Args:
        path (str): Location of the data to load (file or directory).
        format (str): Spark data-source name handed to
            ``DataFrameReader.format``, e.g. ``"csv"`` or ``"delta"``.
        session (SparkSession, optional): Session used for reading. When
            omitted, the notebook-level ``spark`` session is used, so existing
            two-argument calls behave exactly as before.

    Returns:
        DataFrame: The loaded data.

    Note:
        The ``inferSchema`` and ``header`` options are always set. They matter
        for CSV-style sources; NOTE(review): other sources (e.g. Delta) are
        presumably unaffected by them -- confirm for any new format.
    """
    # Fall back to the global session so the helper stays a drop-in replacement.
    reader = (session if session is not None else spark).read
    return (
        reader.format(format)
        .option("inferSchema", "true")
        .option("header", "true")
        .load(path)
    )

In [30]:
# JVM-side log verbosity for this session; "INFO" is very chatty -- raise to
# "WARN" if driver logs start drowning out the notebook output.
spark.sparkContext.setLogLevel(logLevel="INFO")

In [31]:
from delta.tables import *
from pyspark.sql.functions import *

# Read the source CSV with spark.read and save it in Delta format

In [32]:
# Location of the raw CSV extract that seeds the Delta table.
csvFilePath = "/media/hdd-1/mygit/CDC/DATA_EX.csv"
# Read it through the shared helper so the CSV options (header, inferSchema)
# are defined in one place.
# NOTE(review): the variable name looks like a leftover from another example;
# it holds the contents of DATA_EX.csv.
departureDelays = read_data(csvFilePath, "csv")

In [33]:
# Persist the CSV data as a Delta table, overwriting whatever is at the path.
# Each run commits a new table version (see the WRITE entries in history()).
departureDelays.write.save(
    "/media/hdd-1/mygit/CDC/DATA_EX.delta",
    format="delta",
    mode="overwrite",
)

In [34]:
# Load the Delta table written above back into a Spark DataFrame.
df = read_data(path="/media/hdd-1/mygit/CDC/DATA_EX.delta", format="delta")

In [35]:
# Show any rows with DN_CODE 44 in the table as loaded.
df.filter(df.DN_CODE == 44).show()

+-------+---------+-------+------+-----+
|DN_CODE|DN_CODLFA|DN_NAME|COD_UN|LEVEL|
+-------+---------+-------+------+-----+
+-------+---------+-------+------+-----+



# STEP MERGE

Step 1: Convert the Spark DataFrame into a pandas DataFrame

In [36]:
# Collect the Spark DataFrame to the driver as pandas, then keep the rows
# whose DN_CODE is strictly positive.
dt1 = df.toPandas()
positive_code = dt1["DN_CODE"] > 0
dt2 = dt1.loc[positive_code]
type(dt2)


pandas.core.frame.DataFrame

Step 2: Convert the pandas DataFrame into a NumPy array and then into a list of rows

In [37]:
# Turn the filtered pandas rows into plain Python lists (one list per row) so
# they can be fed to spark.createDataFrame. pandas recommends to_numpy() over
# the legacy `.values` attribute; the result is the same array.
vls = dt2.to_numpy().tolist()

In [38]:
cols = df.columns
items = vls
# Rebuild a Spark DataFrame from the filtered rows, reusing the source column
# names; column types are re-inferred from the Python values.
merge_table = spark.createDataFrame(items, cols)
# Only the cell's last expression is displayed, so a standalone count() here
# would just run an extra Spark job whose result is thrown away.
merge_table.sort(merge_table.DN_CODE.desc()).show()

+-------+---------+--------------------+-------------+-----+
|DN_CODE|DN_CODLFA|             DN_NAME|       COD_UN|LEVEL|
+-------+---------+--------------------+-------------+-----+
|     43|    Salim|        SALINOMYCINE|        MG/KG|   41|
|     42|    Lasal|          LASALOCIDE|        MG/KG|   40|
|     41|    MoneN|           MONENSINA|        MG/KG|   39|
|     39|       FB|         GROSS FIBER|         % MS|   38|
|     38|     NFDN|NITROGEN LINKED T...|         % MS|   37|
|     37|  Lignina|             LIGNINE|         % MS|   32|
|     36|      FDA|FIBER IN ACID DET...|         % MS|   36|
|     35|   CINZAS|                 ASH|         % MS|   35|
|     34|      PDR|                 PDR|         % PB|   34|
|     33|      ENN|NON-NITROGEN EXTR...|         % MS|   31|
|     31|     VitE|           VITAMIN E|     UI/KG MS|   30|
|     30|     VitD|           D VITAMIN|1000 UI/KG MS|   29|
|     29|     VitA|           VITAMIN A|1000 UI/KG MS|   28|
|     28|       Cr|     

In [39]:
# Merge candidates ordered by DN_CODE, highest first (same view as the previous cell).
merge_table.orderBy(merge_table["DN_CODE"].desc()).show()

+-------+---------+--------------------+-------------+-----+
|DN_CODE|DN_CODLFA|             DN_NAME|       COD_UN|LEVEL|
+-------+---------+--------------------+-------------+-----+
|     43|    Salim|        SALINOMYCINE|        MG/KG|   41|
|     42|    Lasal|          LASALOCIDE|        MG/KG|   40|
|     41|    MoneN|           MONENSINA|        MG/KG|   39|
|     39|       FB|         GROSS FIBER|         % MS|   38|
|     38|     NFDN|NITROGEN LINKED T...|         % MS|   37|
|     37|  Lignina|             LIGNINE|         % MS|   32|
|     36|      FDA|FIBER IN ACID DET...|         % MS|   36|
|     35|   CINZAS|                 ASH|         % MS|   35|
|     34|      PDR|                 PDR|         % PB|   34|
|     33|      ENN|NON-NITROGEN EXTR...|         % MS|   31|
|     31|     VitE|           VITAMIN E|     UI/KG MS|   30|
|     30|     VitD|           D VITAMIN|1000 UI/KG MS|   29|
|     29|     VitA|           VITAMIN A|1000 UI/KG MS|   28|
|     28|       Cr|     

In [40]:
# Handle on the Delta table at this path; used below for history() and merge().
deltaTable = DeltaTable.forPath(
    sparkSession=spark,
    path="/media/hdd-1/mygit/CDC/DATA_EX.delta",
)

In [41]:
# Commit log of the table: one row per version (WRITE, MERGE, ...), newest first.
deltaTable.history(limit=None).show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      2|2021-07-26 12:05:22|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|          null|        false|{numFiles -> 1, n...|        null|
|      1|2021-07-26 12:04:08|  null|    null|    MERGE|{predicate -> (up...|null|    null|     null|          0|          null|        false|{numTargetRowsCop...|        null|
|      0|2021-07-26 12:02:13|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|        

In [42]:
# Upsert merge_table into the Delta table, matching rows on DN_CODE.
# For a matched row the first whenMatched clause whose condition holds is
# applied, so the conditional delete is checked before the update-all.
#
# The delete condition compares the integer key with 1 explicitly. Writing it
# as `updates.DN_CODE = true` relies on Spark implicitly casting `true` to 1,
# which strict (ANSI) type coercion may reject; the meaning is unchanged here.
# FIXME(review): this deletes only the matched row whose DN_CODE is 1 --
# confirm that is the intended CDC delete rule.
(
    deltaTable.alias("deltaTable")
    .merge(merge_table.alias("updates"), "updates.DN_CODE = deltaTable.DN_CODE")
    .whenMatchedDelete(condition="updates.DN_CODE = 1")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [43]:
# Commit log after the merge; the newest entry should be the MERGE just run.
deltaTable.history().show(n=20, truncate=True)

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      3|2021-07-26 12:05:28|  null|    null|    MERGE|{predicate -> (up...|null|    null|     null|          2|          null|        false|{numTargetRowsCop...|        null|
|      2|2021-07-26 12:05:22|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          1|          null|        false|{numFiles -> 1, n...|        null|
|      1|2021-07-26 12:04:08|  null|    null|    MERGE|{predicate -> (up...|null|    null|     null|          0|        

In [44]:
# Current snapshot of the Delta table as a Spark DataFrame.
delta = deltaTable.toDF()
# Row count after the merge; compare with the version-0 count further below.
delta.count()

39

In [45]:
# Is DN_CODE 44 present in the current (post-merge) snapshot?
delta.filter("DN_CODE = 44").show()

+-------+---------+-------+------+-----+
|DN_CODE|DN_CODLFA|DN_NAME|COD_UN|LEVEL|
+-------+---------+-------+------+-----+
+-------+---------+-------+------+-----+



In [46]:
# Time travel: read the table as it was at version 0 (the first WRITE).
historical = (
    spark.read
    .format("delta")
    .option("versionAsOf", "0")
    .load("/media/hdd-1/mygit/CDC/DATA_EX.delta")
)
historical.count()

41

In [47]:
# DN_CODE 44 as it existed in version 0 of the table.
historical.filter(historical["DN_CODE"] == 44).show()

+-------+---------+--------------+------+-----+
|DN_CODE|DN_CODLFA|       DN_NAME|COD_UN|LEVEL|
+-------+---------+--------------+------+-----+
|     44|   Virgin|VIRGINIAMICINE| MG/KG|   33|
+-------+---------+--------------+------+-----+

