In [0]:
# List the raw CSV files available in the data lake landing folder.
csv_dir = "dbfs:/FileStore/tables/datalake/csv"
dbutils.fs.ls(csv_dir)


Out[32]: [FileInfo(path='dbfs:/FileStore/tables/datalake/csv/categories.csv', name='categories.csv', size=432, modificationTime=1737052601000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/customers.csv', name='customers.csv', size=12345, modificationTime=1737052602000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/employees.csv', name='employees.csv', size=4593, modificationTime=1737052602000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/orderdetails.csv', name='orderdetails.csv', size=46804, modificationTime=1737052599000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/orders.csv', name='orders.csv', size=136684, modificationTime=1737052599000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/products.csv', name='products.csv', size=4773, modificationTime=1737052600000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/shippers.csv', name='shippers.csv', size=148, modificationTime=1737052600000),
 FileInfo(path='dbfs:/FileStore/tables/datalake/csv/s

In [0]:
# Convert every CSV file in the landing folder into its own Delta table.
# Each "<name>.csv" is written to "<delta_path>/<name>.delta".

# Get Csv Files
files = dbutils.fs.ls("dbfs:/FileStore/tables/datalake/csv")
# Directory to save our Datalake Delta
delta_path = "dbfs:/FileStore/tables/datalake/delta"
datalake_extension = "delta"
csv_suffix = ".csv"

for file in files:
    # Skip anything that is not a CSV file *before* reading it. Without this
    # guard a non-CSV entry would be parsed as CSV and saved under the name
    # left over from the previous iteration (overwriting that table), or raise
    # NameError if it happened to be the first entry.
    if not file.name.endswith(csv_suffix):
        continue

    file_path = file.path
    # Table name = file name without the ".csv" extension
    file_name_without_extension = file.name[: -len(csv_suffix)]

    # The source files use ";" as delimiter and carry a header row;
    # column types are inferred from the data.
    dataframe = (
        spark.read.format("csv")
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .load(file_path)
    )

    # Write to Delta ("overwrite" mode replaces whatever is already at the target path)
    dataframe.write.format("delta").mode("overwrite").save(
        f"{delta_path}/{file_name_without_extension}.{datalake_extension}"
    )


In [0]:
# Open the categories Delta table so its schema can be reused for the new row
categories_delta = "dbfs:/FileStore/tables/datalake/delta/categories.delta"
df = spark.read.format("delta").load(categories_delta)

# Build a one-row DataFrame with that schema and append it to the table
# ("append" mode adds to the existing data instead of replacing it)
new_rows = [(9, "coffee", "Moka pot, Aeropress, cappuccino")]
new = spark.createDataFrame(new_rows, df.schema)
new.write.format("delta").mode("append").save(categories_delta)

In [0]:
# Upsert (MERGE): update the row when the key already exists, insert it otherwise.
# Import only the name that is used instead of `from delta.tables import *`,
# so the notebook namespace is not polluted.
from delta.tables import DeltaTable

# Loading tables as DeltaTable again
deltaTable_orders = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orders.delta")
deltaTable_order_details = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orderdetails.delta")

# New values to be inserted
new_order = spark.createDataFrame(
    [(11078, "ALFKI", 1, "2023-08-01")],
    ["OrderID", "CustomerID", "EmployeeID", "OrderDate"],
)
new_order_details = spark.createDataFrame(
    [(11078, 1, 18, 3)],
    ["OrderID", "ProductID", "UnitPrice", "Quantity"],
)

# Orders are matched on OrderID alone.
(
    deltaTable_orders.alias("orders")
    .merge(
        new_order.alias("newOrder"),
        "orders.OrderID = newOrder.OrderID",
    )
    .whenMatchedUpdate(
        set={
            "CustomerID": "newOrder.CustomerID",
            "EmployeeID": "newOrder.EmployeeID",
            "OrderDate": "newOrder.OrderDate",
        }
    )
    .whenNotMatchedInsert(
        values={
            "OrderID": "newOrder.OrderID",
            "CustomerID": "newOrder.CustomerID",
            "EmployeeID": "newOrder.EmployeeID",
            "OrderDate": "newOrder.OrderDate",
        }
    )
    .execute()
)

# Order details are matched on the composite key (OrderID, ProductID).
(
    deltaTable_order_details.alias("order_details")
    .merge(
        new_order_details.alias("newOrderDetails"),
        "order_details.OrderID = newOrderDetails.OrderID AND order_details.ProductID = newOrderDetails.ProductID",
    )
    .whenMatchedUpdate(
        set={
            "UnitPrice": "newOrderDetails.UnitPrice",
            "Quantity": "newOrderDetails.Quantity",
        }
    )
    .whenNotMatchedInsert(
        values={
            "OrderID": "newOrderDetails.OrderID",
            "ProductID": "newOrderDetails.ProductID",
            "UnitPrice": "newOrderDetails.UnitPrice",
            "Quantity": "newOrderDetails.Quantity",
        }
    )
    .execute()
)

In [0]:
# Look up order 11078 in the orders Delta table and print the matching rows
orders_delta = "dbfs:/FileStore/tables/datalake/delta/orders.delta"
df_orders = (
    spark.read.format("delta")
    .load(orders_delta)
    .filter("OrderID == 11078")
)
df_orders.show()

+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|OrderID|CustomerID|EmployeeID|          OrderDate|RequiredDate|ShippedDate|ShipVia|Freight|ShipName|ShipAddress|ShipCity|ShipRegion|ShipPostalCode|ShipCountry|
+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+
|  11078|     ALFKI|         1|2023-08-01 00:00:00|        null|       null|   null|   null|    null|       null|    null|      null|          null|       null|
+-------+----------+----------+-------------------+------------+-----------+-------+-------+--------+-----------+--------+----------+--------------+-----------+



In [0]:
# Look up the detail line (order 11078, product 1) in the order-details
# Delta table and print the matching rows
order_details_delta = "dbfs:/FileStore/tables/datalake/delta/orderdetails.delta"
df_order_details = (
    spark.read.format("delta")
    .load(order_details_delta)
    .filter("OrderID == 11078")
    .filter("ProductID == 1")
)
df_order_details.show()

+-------+---------+---------+--------+--------+
|OrderID|ProductID|UnitPrice|Quantity|Discount|
+-------+---------+---------+--------+--------+
|  11078|        1|     18.0|       3|    null|
+-------+---------+---------+--------+--------+



In [0]:
# Register every Delta table in the data lake folder as a temporary view, join
# them with SQL and store the result as a new Delta table.
# Temporary views are only accessible within the session they are created in;
# they let you query the data with SQL as if it were a table.

# Directory Datalake Delta
delta_file = "dbfs:/FileStore/tables/datalake/delta/"
delta_path = dbutils.fs.ls(delta_file)
datalake_extension = "delta"
table_suffix = f".{datalake_extension}"

for file in delta_path:
    # Directory entries are listed with a trailing "/" (e.g. "orders.delta/").
    # Strip it explicitly rather than slicing off a fixed number of characters,
    # which would corrupt the path / view name of any entry shaped differently.
    table_dir_name = file.name.rstrip("/")
    if not table_dir_name.endswith(table_suffix):
        # Not a "<name>.delta" table directory - nothing to register.
        continue

    df = spark.read.format("delta").load(file.path.rstrip("/"))
    # View name = directory name without the ".delta" suffix
    df.createOrReplaceTempView(table_dir_name[: -len(table_suffix)])

# All joins are inner joins, so an order only appears in the result when it has
# matching rows in every joined table.
join_query = """
SELECT orderdetails .OrderID AS OrderID, orderdetails .Quantity , orderdetails .UnitPrice as UnitPrice,
products.ProductID as ProductID,  products.ProductName as Product, suppliers.CompanyName AS Suppliers,  
employees.LastName as Employee, orders.OrderDate as Date, customers.CompanyName as Customer
FROM orders
JOIN orderdetails  ON orders.OrderID = orderdetails .OrderID
JOIN products ON orderdetails .ProductID = products.ProductID
JOIN categories ON products.CategoryID = categories.CategoryID
JOIN suppliers ON products.SupplierID = suppliers.SupplierID
JOIN employees ON orders.EmployeeID = employees.EmployeeID
JOIN shippers ON orders.ShipVia = shippers.ShipperID
JOIN customers ON orders.CustomerID = customers.CustomerID
"""

df_result = spark.sql(join_query)

# Write result in a new Delta Table ("overwrite" replaces a previous result)
df_result.write.format("delta").mode("overwrite").save(f"{delta_file}join.{datalake_extension}")

In [0]:
# Load the joined Delta table and expose it to SQL as the temp view "OrdersJoin"
join_delta = "dbfs:/FileStore/tables/datalake/delta/join.delta"
df = spark.read.format("delta").load(join_delta)
df.createOrReplaceTempView("OrdersJoin")

# Query a single order line through the view and print it
lookup_sql = "SELECT * FROM OrdersJoin WHERE OrderID = 10248 AND ProductID =11 "
results = spark.sql(lookup_sql)
results.show()

+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+
|OrderID|Quantity|UnitPrice|ProductID|       Product|           Suppliers|Employee|               Date|            Customer|
+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+
|  10248|      12|     14.0|       11|Queso Cabrales|Cooperativa de Qu...|Buchanan|2020-07-04 00:00:00|Vins et alcools C...|
+-------+--------+---------+---------+--------------+--------------------+--------+-------------------+--------------------+



In [0]:
# Clean up: recursively delete the Delta output folder and everything in it
delta_root = "dbfs:/FileStore/tables/datalake/delta/"
dbutils.fs.rm(delta_root, recurse=True)