## Creating a Spark session with the required configs

In [1]:
from pyspark.sql import SparkSession
from delta import*

# Directory intended to hold managed tables (e.g. those created via saveAsTable).
warehouse = "D:\\warehouse"

# Enable Delta Lake on the session: register its SQL extension and use the
# Delta catalog as the session catalog.
builder = (
    SparkSession.builder.appName("deltaexploration")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Fixed key: this was previously misspelled "spak.sql.warehouse.dir", which is
    # not a Spark setting, so the warehouse directory above was never applied.
    .config("spark.sql.warehouse.dir", warehouse)
)

# configure_spark_with_delta_pip wires the pip-installed Delta package into the
# builder; Hive support gives us a metastore for databases and tables.
spark = (
    configure_spark_with_delta_pip(builder)
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)

print(spark)

<pyspark.sql.session.SparkSession object at 0x000001B47906AE00>


Read the data with an explicit schema

In [2]:
# DDL-style schema string passed to the reader so column types are declared
# up front instead of being inferred from the CSV.
Schema = (
    "InvoiceNo String,StockCode String,Description String,Quantity Int,"
    "InvoiceDate String,UnitPrice Double,CustomerID long,Country String"
)

# Read the order CSV (first row is the header) using the schema above.
data = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(Schema)
    .option("path", "C:\\Users\\Admin\\Downloads\\order_data.csv")
    .load()
)

We are going to see:
1. How we can save the data as a Delta table
2. How we can create managed and unmanaged Delta tables using the Hive metastore

In [10]:
# Persist the DataFrame in Delta format under the output directory.
# 'append' mode adds the rows to whatever is already stored at that path.
(
    data.write
    .format('delta')
    .mode('append')
    .save("\\Lakehouse\\Delta\\output\\orders_delta")
)

In [11]:
# How to create an unmanaged table the SQL way.
# First, create a database dedicated to this Delta Lake exploration.
# IF NOT EXISTS keeps this cell re-runnable: a bare CREATE DATABASE raises an
# error when the database is already present in the metastore.

spark.sql("CREATE DATABASE IF NOT EXISTS deltadb")
spark.sql("use deltadb")


DataFrame[]

In [48]:
# Register a table over the external location we wrote to earlier. Because the
# table is external, dropping it from the metastore leaves the data files in
# that location untouched.
# IF NOT EXISTS keeps this cell re-runnable: without it, a second run fails
# because deltadb.orders is already registered.
# NOTE(review): the quadrupled backslashes become doubled backslashes in the SQL
# text, which the SQL parser is presumably unescaping to single ones - confirm.

spark.sql("""CREATE TABLE IF NOT EXISTS deltadb.orders(
          InvoiceNo STRING,
          StockCode STRING,
          Description STRING,
          Quantity INT,
          InvoiceDate STRING,
          UnitPrice DOUBLE,
          CustomerID LONG,
          Country STRING) 
          USING DELTA 
          LOCATION '\\\\Lakehouse\\\\Delta\\\\output\\\\orders_delta' """)

DataFrame[]

In [49]:
# Verify that the table has been created by listing the tables.
(
    spark.sql("show tables")
    .show()
)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| deltadb|   orders|      false|
+--------+---------+-----------+



In [50]:
# Query the table by name and preview its rows.

(
    spark.sql("select * from deltadb.orders ")
    .show()
)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536378|     null|PACK OF 60 DINOSA...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|     null|PACK OF 60 PINK P...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|    84991|60 TEATIME FAIRY ...|      24|01-12-2010 9.37|     0.55|     14688|United Kingdom|
|   536378|   84519A|TOMATO CHARLIE+LO...|       6|01-12-2010 9.37|     2.95|     14688|United Kingdom|
|   536378|   85183B|CHARLIE & LOLA WA...|      48|01-12-2010 9.37|     1.25|     14688|United Kingdom|
|   536378|   85071B|RED CHARLIE+LOLA ...|      96|01-12-2010 9.37|     0.38|     14688|United Kingdom|
|   536378|    21931|JUMBO STORAGE BAG...|      10|01-12-2010 9.

In [51]:
# Display the table's metadata without truncating values; the output lets us
# confirm that it is an external table.
(
    spark.sql(f"DESCRIBE  FORMATTED {'deltadb.orders'}")
    .show(truncate=False)
)

+----------------------------+-----------------------------------------------------------------+-------+
|col_name                    |data_type                                                        |comment|
+----------------------------+-----------------------------------------------------------------+-------+
|InvoiceNo                   |string                                                           |       |
|StockCode                   |string                                                           |       |
|Description                 |string                                                           |       |
|Quantity                    |int                                                              |       |
|InvoiceDate                 |string                                                           |       |
|UnitPrice                   |double                                                           |       |
|CustomerID                  |bigint                   

In [53]:
# The DataFrame writer API can create the table and insert the DataFrame's rows
# in a single step.
# - This makes life easy: the column names and data types are taken from the
#   Spark DataFrame, and the result is a managed table located in the Spark
#   warehouse directory.

(
    data.write
    .format('delta')
    .mode('append')
    .saveAsTable("deltadb.orders2")
)

In [57]:
# Display the metadata of the table created via saveAsTable, untruncated.
(
    spark.sql(f"DESCRIBE  FORMATTED {'deltadb.orders2'}")
    .show(truncate=False)
)

+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|InvoiceNo                   |string                                                          |       |
|StockCode                   |string                                                          |       |
|Description                 |string                                                          |       |
|Quantity                    |int                                                             |       |
|InvoiceDate                 |string                                                          |       |
|UnitPrice                   |double                                                          |       |
|CustomerID                  |bigint                            