##File System Command

In [0]:
%fs
ls 

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/foobar/,foobar/,0,0
dbfs:/scenarios/,scenarios/,0,0
dbfs:/udb/,udb/,0,0
dbfs:/user/,user/,0,0


In [0]:
%fs
ls dbfs:/FileStore/tables


path,name,size,modificationTime
dbfs:/FileStore/tables/SalesData.csv,SalesData.csv,617,1689689602000
dbfs:/FileStore/tables/employees.csv,employees.csv,61,1689692632000


In [0]:
%fs 
rm -r dbfs:/FileStore/tables/ScreenerData.csv

#Read CSV File

In [0]:
#Read flat file, define schema,filter out employee earning less than 20k, add column bonus 10% for each employee and calculate
# add new column the total salary after bonus, remove column salary, save the final df as parquet

from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import StructType,StructField, StringType,DoubleType

paySchema = StructType([ \
    StructField("Emp",StringType(),True), \
    StructField("Dept",StringType(),True), \
    StructField("Salary",DoubleType(),True) \
  ])

parquet_pay_path = "/FileStore/LakeHouse/EmpPay/"

empPayData = spark.read.csv(path="/FileStore/tables/EmpPay.csv", sep="|", header=True, schema=paySchema)

empPayData =empPayData.filter("Salary<20000")
empPayData=empPayData.withColumn("Bonus",col("Salary")*10/100)
empPayData =empPayData.withColumn("TotalSalary", col("Bonus") + col("Salary"))
empPayData =empPayData.drop("Salary")
empPayData= empPayData.withColumn("ingestionDate", current_timestamp())
empPayData= empPayData.withColumnRenamed("ingestionDate", "ImportDate")
#display(empPayData)

empPayData.write.mode("append").parquet(parquet_pay_path)
empParquetDF= spark.read.parquet(parquet_pay_path)
display(empParquetDF)



Emp,Dept,Bonus,TotalSalary,ImportDate
Rahul,Sales,1000.0,11000.0,
Bob,Finance,1200.0,13200.0,
Arun,Reporting,1800.0,19800.0,
Rahul,Sales,1000.0,11000.0,2023-07-21T10:53:18.853+0000
Bob,Finance,1200.0,13200.0,2023-07-21T10:53:18.853+0000
Arun,Reporting,1800.0,19800.0,2023-07-21T10:53:18.853+0000
Rahul,Sales,1000.0,11000.0,2023-07-25T16:27:49.061+0000
Bob,Finance,1200.0,13200.0,2023-07-25T16:27:49.061+0000
Arun,Reporting,1800.0,19800.0,2023-07-25T16:27:49.061+0000
Rahul,Sales,1000.0,11000.0,2023-07-21T10:52:56.844+0000


In [0]:
%fs
ls dbfs:/FileStore/LakeHouse/EmpPay

path,name,size,modificationTime
dbfs:/FileStore/LakeHouse/EmpPay/_SUCCESS,_SUCCESS,0,1689936678000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_1910535123153937502,_committed_1910535123153937502,221,1689936170000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_4121409593000796777,_committed_4121409593000796777,221,1689936047000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_4253268636208939365,_committed_4253268636208939365,232,1689935618000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_4307898832227351326,_committed_4307898832227351326,123,1689935232000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_6167242486210209442,_committed_6167242486210209442,221,1689935634000
dbfs:/FileStore/LakeHouse/EmpPay/_committed_7037928025282726053,_committed_7037928025282726053,221,1689936678000
dbfs:/FileStore/LakeHouse/EmpPay/_started_1910535123153937502,_started_1910535123153937502,0,1689936170000
dbfs:/FileStore/LakeHouse/EmpPay/_started_4121409593000796777,_started_4121409593000796777,0,1689936046000
dbfs:/FileStore/LakeHouse/EmpPay/_started_4253268636208939365,_started_4253268636208939365,0,1689935617000


In [0]:
#Define custom Schema
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType,DoubleType

dataSchema = StructType([ \
    StructField("date",StringType(),True), \
    StructField("SalesPerson",StringType(),True), \
    StructField("SalesID",IntegerType(),True), \
    StructField("ProductID", IntegerType(), True), \
    StructField("Product", StringType(), True), \
    StructField("Sales", DoubleType(), True) \
  ])


In [0]:
#salesDF = spark.read.csv("/FileStore/tables/SalesData.csv")

#salesDF = spark.read.csv("/FileStore/tables/SalesData.csv", header=True)

#salesDF = spark.read.csv("/FileStore/tables/SalesData.csv", header=True, inferSchema=True)

salesDF = spark.read.csv("/FileStore/tables/SalesData.csv", header=True, schema=dataSchema)

display(salesDF)



date,SalesPerson,SalesID,ProductID,Product,Sales
05/01/2023,Amber,3,2,Mortgage,17429.0
24/02/2023,Chantal,4,2,Mortgage,11978.0
17/05/2023,Natalia,9,4,Health insurance,24055.0
02/05/2023,Eric,10,3,Car insurance,5935.0
24/03/2023,Jeroen,5,2,Mortgage,25902.0
02/03/2023,Eric,10,3,Car insurance,1235.0
16/03/2023,Amber,3,2,Mortgage,2452.0
05/02/2023,Amber,3,2,Mortgage,11129.0
12/03/2023,Taylor,14,2,Mortgage,14028.0
22/03/2023,Sarah,4,3,Car insurance,23263.0


#### Transformation

In [0]:
from pyspark.sql.functions import col,lit, current_timestamp

salesDF = salesDF.withColumn("Sales", col("Sales").cast("integer"))
salesDF = salesDF.withColumn("ingestionDate", current_timestamp())
display(salesDF)

date,SalesPerson,SalesID,ProductID,Product,Sales,ingestionDate
05/01/2023,Amber,3,2,Mortgage,17429,2023-07-25T16:25:51.028+0000
24/02/2023,Chantal,4,2,Mortgage,11978,2023-07-25T16:25:51.028+0000
17/05/2023,Natalia,9,4,Health insurance,24055,2023-07-25T16:25:51.028+0000
02/05/2023,Eric,10,3,Car insurance,5935,2023-07-25T16:25:51.028+0000
24/03/2023,Jeroen,5,2,Mortgage,25902,2023-07-25T16:25:51.028+0000
02/03/2023,Eric,10,3,Car insurance,1235,2023-07-25T16:25:51.028+0000
16/03/2023,Amber,3,2,Mortgage,2452,2023-07-25T16:25:51.028+0000
05/02/2023,Amber,3,2,Mortgage,11129,2023-07-25T16:25:51.028+0000
12/03/2023,Taylor,14,2,Mortgage,14028,2023-07-25T16:25:51.028+0000
22/03/2023,Sarah,4,3,Car insurance,23263,2023-07-25T16:25:51.028+0000


#Write DataFrame to Parquet Format

In [0]:
%python
parquet_path = "/FileStore/LakeHouse/Sales"
#salesDF.write.mode("append").parquet(parquet_path)
salesDF.write.mode("overwrite").parquet(parquet_path)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3773630394137448>:3[0m
[1;32m      1[0m parquet_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/FileStore/LakeHouse/Sales[39m[38;5;124m"[39m
[1;32m      2[0m [38;5;66;03m#salesDF.write.mode("append").parquet(parquet_path)[39;00m
[0;32m----> 3[0m salesDF[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mparquet(parquet_path)

[0;31mNameError[0m: name 'salesDF' is not defined

In [0]:
%fs
ls dbfs:/FileStore/LakeHouse/Sales

path,name,size,modificationTime
dbfs:/FileStore/LakeHouse/Sales/_committed_2274677832709507115,_committed_2274677832709507115,222,1689839201000
dbfs:/FileStore/LakeHouse/Sales/_committed_2341592772876137263,_committed_2341592772876137263,222,1689689648000
dbfs:/FileStore/LakeHouse/Sales/_committed_6303353588364184694,_committed_6303353588364184694,223,1689690274000
dbfs:/FileStore/LakeHouse/Sales/_committed_6595619218373784889,_committed_6595619218373784889,123,1689839257000
dbfs:/FileStore/LakeHouse/Sales/_committed_7477800962496022049,_committed_7477800962496022049,331,1689680332000
dbfs:/FileStore/LakeHouse/Sales/_started_2274677832709507115,_started_2274677832709507115,0,1689839199000
dbfs:/FileStore/LakeHouse/Sales/_started_369032418936463276,_started_369032418936463276,0,1689688518000
dbfs:/FileStore/LakeHouse/Sales/_started_6595619218373784889,_started_6595619218373784889,0,1689839257000
dbfs:/FileStore/LakeHouse/Sales/part-00000-tid-2274677832709507115-04af446d-be47-459d-959f-c6d077f039d5-13-1-c000.snappy.parquet,part-00000-tid-2274677832709507115-04af446d-be47-459d-959f-c6d077f039d5-13-1-c000.snappy.parquet,2316,1689839201000
dbfs:/FileStore/LakeHouse/Sales/part-00000-tid-6595619218373784889-46707170-7ede-4f96-b8c9-6430f82d3cbe-16-1-c000.snappy.parquet,part-00000-tid-6595619218373784889-46707170-7ede-4f96-b8c9-6430f82d3cbe-16-1-c000.snappy.parquet,2316,1689839257000


# Read parquet File

In [0]:
parquetDF= spark.read.parquet(parquet_path)
display(parquetDF)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3313888537580529>:1[0m
[0;32m----> 1[0m parquetDF[38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mparquet(parquet_path)
[1;32m      2[0m display(parquetDF)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_coun

####Narrow Transformation

In [0]:
from pyspark.sql.functions import filter,col

#saleFilterDF = salesDF.filter("Product ='Mortgage'")

#saleFilterDF = salesDF.filter(col('Product')=='Mortgage')

#saleFilterDF = salesDF.where("Product ='Mortgage'")

#saleFilterDF = salesDF.where(col('Product')=='Mortgage')

display(saleFilterDF)

date,SalesPerson,SalesID,ProductID,Product,Sales,ingestionDate
05/01/2023,Amber,3,2,Mortgage,17429,2023-07-21T07:15:23.307+0000
24/02/2023,Chantal,4,2,Mortgage,11978,2023-07-21T07:15:23.307+0000
24/03/2023,Jeroen,5,2,Mortgage,25902,2023-07-21T07:15:23.307+0000
16/03/2023,Amber,3,2,Mortgage,2452,2023-07-21T07:15:23.307+0000
05/02/2023,Amber,3,2,Mortgage,11129,2023-07-21T07:15:23.307+0000
12/03/2023,Taylor,14,2,Mortgage,14028,2023-07-21T07:15:23.307+0000
12/02/2023,Taylor,14,2,Mortgage,19078,2023-07-21T07:15:23.307+0000


#### Wide Transformation

In [0]:
from pyspark.sql.functions import col,min,max

salesGroupDF = salesDF.groupBy(col("Product"))

#display(salesGroupDF.count())

#display(salesGroupDF.min('Sales').alias("MinSales"))


#display(salesGroupDF.min('Sales').withColumnRenamed("min(Sales)", "Sales"))

#display(salesGroupDF.min('Sales').select("Product", col("min(Sales)").alias("min_sales")))


Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-1050553210641014>", line 13, in <module>
    display(salesGroupDF.min('Sales').max("Sales").select("Product", col("min(Sales)").alias("min_sales")))
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 2964, in __getattr__
    raise AttributeError(
AttributeError: 'DataFrame' object has no attribute 'max'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 1997, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/



In [0]:
from pyspark.sql.functions import sum,avg,max,min
salesDF.groupBy("Product") \
    .agg(sum("Sales").alias("sum_sales"), \
         avg("Sales").alias("avg_sales"), \
         min("Sales").alias("min_sales"), \
         max("Sales").alias("max_max") \
     ) \
    .show(truncate=False)

+----------------+---------+------------------+---------+-------+
|Product         |sum_sales|avg_sales         |min_sales|max_max|
+----------------+---------+------------------+---------+-------+
|Car insurance   |7170     |3585.0            |1235     |5935   |
|Mortgage        |101996   |14570.857142857143|2452     |25902  |
|Health insurance|24055    |24055.0           |24055    |24055  |
| Car insurance  |97359    |24339.75          |21507    |29577  |
+----------------+---------+------------------+---------+-------+



In [0]:
salesDF.createOrReplaceTempView("SaleTempView")

spark.sql("select Product, sum(Sales) as sum_Sales from SaleTempView group by Product").show()

+----------------+---------+
|         Product|sum_Sales|
+----------------+---------+
|   Car insurance|     7170|
|        Mortgage|   101996|
|Health insurance|    24055|
|   Car insurance|    97359|
+----------------+---------+



#### Window Function

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec  = Window.partitionBy("Product").orderBy("Sales")

salesDF.withColumn("row_number",row_number().over(windowSpec)).show(truncate=False)

+----------+-----------+-------+---------+----------------+-----+-----------------------+----------+
|date      |SalesPerson|SalesID|ProductID|Product         |Sales|ingestionDate          |row_number|
+----------+-----------+-------+---------+----------------+-----+-----------------------+----------+
|08/05/2023|Tara       |8      |3        | Car insurance  |21507|2023-07-21 07:34:33.787|1         |
|22/05/2023|Sarah      |4      |3        | Car insurance  |23012|2023-07-21 07:34:33.787|2         |
|22/03/2023|Sarah      |4      |3        | Car insurance  |23263|2023-07-21 07:34:33.787|3         |
|08/03/2023|Tara       |8      |3        | Car insurance  |29577|2023-07-21 07:34:33.787|4         |
|02/03/2023|Eric       |10     |3        |Car insurance   |1235 |2023-07-21 07:34:33.787|1         |
|02/05/2023|Eric       |10     |3        |Car insurance   |5935 |2023-07-21 07:34:33.787|2         |
|17/05/2023|Natalia    |9      |4        |Health insurance|24055|2023-07-21 07:34:33.787|1 

In [0]:
from pyspark.sql.functions import rank
salesDF.withColumn("rank",rank().over(windowSpec)).show()

+----------+-----------+-------+---------+----------------+-----+--------------------+----+
|      date|SalesPerson|SalesID|ProductID|         Product|Sales|       ingestionDate|rank|
+----------+-----------+-------+---------+----------------+-----+--------------------+----+
|08/05/2023|       Tara|      8|        3|   Car insurance|21507|2023-07-21 06:54:...|   1|
|22/05/2023|      Sarah|      4|        3|   Car insurance|23012|2023-07-21 06:54:...|   2|
|22/03/2023|      Sarah|      4|        3|   Car insurance|23263|2023-07-21 06:54:...|   3|
|08/03/2023|       Tara|      8|        3|   Car insurance|29577|2023-07-21 06:54:...|   4|
|02/03/2023|       Eric|     10|        3|   Car insurance| 1235|2023-07-21 06:54:...|   1|
|02/05/2023|       Eric|     10|        3|   Car insurance| 5935|2023-07-21 06:54:...|   2|
|17/05/2023|    Natalia|      9|        4|Health insurance|24055|2023-07-21 06:54:...|   1|
|16/03/2023|      Amber|      3|        2|        Mortgage| 2452|2023-07-21 06:5

#### JOIN

### Write dataframe to managed Table###

In [0]:
%fs
ls dbfs:/user/hive/warehouse/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/salesmanage222/,salesmanage222/,0,0


##### delete folder

In [0]:

%fs 
rm -r dbfs:/user/hive/warehouse/

In [0]:
parquetDF.write\
.format("parquet")\
.mode("overwrite")\
.option("overwriteSchema", "true")\
.option("mergeSchema", "true")\
.saveAsTable("SalesManage")

In [0]:
saleTable = spark.table("SalesManage")
display(saleTable)

Date,SalesPerson,SalesID,ProductID,Product,Sales,ingestionDate
05/01/2023,Amber,3,2,Mortgage,17429,2023-07-18T14:24:32.348+0000
24/02/2023,Chantal,4,2,Mortgage,11978,2023-07-18T14:24:32.348+0000
17/05/2023,Natalia,9,4,Health insurance,24055,2023-07-18T14:24:32.348+0000
02/05/2023,Eric,10,3,Car insurance,5935,2023-07-18T14:24:32.348+0000
24/03/2023,Jeroen,5,2,Mortgage,25902,2023-07-18T14:24:32.348+0000
02/03/2023,Eric,10,3,Car insurance,1235,2023-07-18T14:24:32.348+0000
16/03/2023,Amber,3,2,Mortgage,2452,2023-07-18T14:24:32.348+0000
05/02/2023,Amber,3,2,Mortgage,11129,2023-07-18T14:24:32.348+0000
12/03/2023,Taylor,14,2,Mortgage,14028,2023-07-18T14:24:32.348+0000
22/03/2023,Sarah,4,3,Car insurance,23263,2023-07-18T14:24:32.348+0000


In [0]:
%sql
select * from default.SalesManage

Date,SalesPerson,SalesID,ProductID,Product,Sales,ingestionDate
05/01/2023,Amber,3,2,Mortgage,17429,2023-07-18T14:24:32.348+0000
24/02/2023,Chantal,4,2,Mortgage,11978,2023-07-18T14:24:32.348+0000
17/05/2023,Natalia,9,4,Health insurance,24055,2023-07-18T14:24:32.348+0000
02/05/2023,Eric,10,3,Car insurance,5935,2023-07-18T14:24:32.348+0000
24/03/2023,Jeroen,5,2,Mortgage,25902,2023-07-18T14:24:32.348+0000
02/03/2023,Eric,10,3,Car insurance,1235,2023-07-18T14:24:32.348+0000
16/03/2023,Amber,3,2,Mortgage,2452,2023-07-18T14:24:32.348+0000
05/02/2023,Amber,3,2,Mortgage,11129,2023-07-18T14:24:32.348+0000
12/03/2023,Taylor,14,2,Mortgage,14028,2023-07-18T14:24:32.348+0000
22/03/2023,Sarah,4,3,Car insurance,23263,2023-07-18T14:24:32.348+0000


In [0]:
%sql
describe extended default.SalesManage

col_name,data_type,comment
Date,string,
SalesPerson,string,
SalesID,string,
ProductID,string,
Product,string,
Sales,int,
ingestionDate,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


#Write Data Frame to External Table

In [0]:
external_table_path = "/FileStore/tables/employees.csv"

dbutils.widgets.text("external_table_path", external_table_path)

In [0]:
from pyspark.sql.functions import col,lit, current_timestamp

empDF = spark.read.csv(external_table_path, header=True)
empDF = empDF.withColumn("ingestionDate", current_timestamp())
display(empDF)

name,department,ingestionDate
alex,HR,2023-07-18T14:53:48.552+0000
robin,IT,2023-07-18T14:53:48.552+0000
james,HR,2023-07-18T14:53:48.552+0000
mark,SALES,2023-07-18T14:53:48.552+0000
thh,HR,2023-07-18T14:53:48.552+0000


In [0]:
%sql
CREATE TABLE employeeExt USING CSV LOCATION "${external_table_path}"

In [0]:
%sql
describe extended default.employeeExt

col_name,data_type,comment
_c0,string,
_c1,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,default,
Table,employeeext,
Owner,root,
Created Time,Tue Jul 18 14:43:37 UTC 2023,
Last Access,UNKNOWN,


In [0]:
%sql
refresh dbfs:/FileStore/tables/

In [0]:
%sql
select * from employeeExt

_c0,_c1
name,department
alex,HR
robin,IT


In [0]:
%sql
CREATE TABLE employeeExt2
USING csv
OPTIONS (
  path "${external_table_path}",
  header 'true',
  inferSchema 'true'
)

In [0]:
%sql
select * from default.employeeExt2

name,department
alex,HR
robin,IT


##Create Schema or Database

In [0]:
%sql
CREATE DATABASE test

In [0]:
%sql
CREATE TABLE test.employeeExt
USING csv
OPTIONS (
  path "${external_table_path}",
  header 'true',
  inferSchema 'true'
)

In [0]:
%sql
select * from test.employeeExt

name,department
alex,HR
robin,IT


In [0]:
%sql
describe extended test.employeeExt

col_name,data_type,comment
name,string,
department,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,test,
Table,employeeext,
Owner,root,
Created Time,Tue Jul 18 14:44:38 UTC 2023,
Last Access,UNKNOWN,


####Create Temp view to use data between python and sql command

In [0]:
# Create a view or table
empDF.createOrReplaceTempView("tempEmp")

In [0]:
%sql
select * from tempEmp


name,department,ingestionDate
alex,HR,2023-07-18T15:02:30.866+0000
robin,IT,2023-07-18T15:02:30.866+0000


In [0]:
%sql

CREATE VIEW getempVW AS
select * from test.employeeExt


In [0]:
%sql
REFRESH TABLE test.employeeExt;

In [0]:
%sql
select * from getempVW

name,department
alex,HR
robin,IT
james,HR
mark,SALES


####Get List Of Table from Specify Database

In [0]:
table_list = [t for t in spark.catalog.listTables("default") if t.tableType=='EXTERNAL' or t.tableType=='MANAGED']
for t in table_list:
    print(t.name)

employee11
employeeext
employeeext2
employees1_managedtable
table1


##### DROP TABLE

In [0]:
%sql
DROP TABLE default.table1


In [0]:
table_list = [t for t in spark.catalog.listTables("default") if t.tableType=='EXTERNAL' or t.tableType=='MANAGED']
for t in table_list:
    print(t.name)

####DROP DATABASE

In [0]:
%sql
DROP DATABASE test cascade