In [0]:
#importing modules
from pyspark.sql import SparkSession
from delta.tables import *

In [0]:
#creating delta tables
Dd = DeltaTable.create(spark) \
    .tableName('employe_demo') \
    .addColumn('emp_id','INT') \
    .addColumn('emp_Name','STRING') \
    .addColumn('gender','STRING') \
    .addColumn('salary','INT') \
    .addColumn('dept','STRING') \
    .property('description','table created for demo purpose') \
    .location('/FileStore/table/delta/createtable1') \
    .execute() 

In [0]:
%sql
select * from employe_demo

emp_id,emp_Name,gender,salary,dept
300,charan,M,6000,sales
300,charan,M,6000,sales
300,charan,M,6000,sales
300,charan,M,6000,sales
100,shiva,M,2000,IT
200,sai,M,8000,HR


In [0]:
%sql
insert into employe_demo values(100,'shiva','M',2000,'IT');

num_affected_rows,num_inserted_rows
1,1


In [0]:
display(spark.sql('select * from employe_demo'))

emp_id,emp_Name,gender,salary,dept
300,niranjan,M,9000,De
100,shiva,M,2000,IT
400,setha,F,10000,BI
200,sai,M,8000,HR
200,sai,M,8000,HR


In [0]:
from pyspark.sql.types import IntegerType,StringType,StructType,StructField
employe_data = [(200,'sai','M',8000,'HR'),(300,'niranjan','M',9000,'De'),(400,'setha','F',10000,'BI')]

employe_schema = StructType([ \
    StructField('emp_id',IntegerType(),False), \
    StructField('emp_name',StringType(),True), \
    StructField('gender',StringType(),True), \
    StructField('salary',IntegerType(),True), \
    StructField('dept',StringType(),True) \
])

df = spark.createDataFrame(data=employe_data, schema = employe_schema)
display(df)

emp_id,emp_name,gender,salary,dept
200,sai,M,8000,HR
300,niranjan,M,9000,De
400,setha,F,10000,BI


In [0]:
deltadf =df.write.format('delta').mode('append').saveAsTable('employe_demo')

In [0]:
employe_data =[(300,'charan','M',6000,'sales')]

employe_schema = StructType([ \
    StructField('emp_id',IntegerType(),False), \
    StructField('emp_name',StringType(),True), \
    StructField('gender',StringType(),True), \
    StructField('salary',StringType(),True), \
    StructField('dept',StringType(),True) \
])

df1 = spark.createDataFrame(data=employe_data, schema= employe_schema)
display(df1)

emp_id,emp_name,gender,salary,dept
300,charan,M,6000,sales


In [0]:
df1.write.insertInto('employe_demo',overwrite=False)

In [0]:
%sql
select * from employe_demo

emp_id,emp_Name,gender,salary,dept
300,charan,M,6000,sales
300,niranjan,M,9000,De
300,niranjan,M,9000,De
100,shiva,M,2000,IT
400,setha,F,10000,BI
400,setha,F,10000,BI
200,sai,M,8000,HR
200,sai,M,8000,HR
200,sai,M,8000,HR


In [0]:
df1.createOrReplaceTempView('delta_data')

In [0]:
# querying the temp view delta_data
%sql
select * from delta_data

emp_id,emp_name,gender,salary,dept
300,charan,M,6000,sales


In [0]:
%sql
insert into employe_demo
select * from delta_data

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from employe_demo

In [0]:
spark.sql('insert into employe_demo select * from delta_data')

Out[14]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
select * from employe_demo

emp_id,emp_Name,gender,salary,dept
300,charan,M,6000,sales
300,charan,M,6000,sales
300,charan,M,6000,sales
300,niranjan,M,9000,De
300,niranjan,M,9000,De
100,shiva,M,2000,IT
400,setha,F,10000,BI
400,setha,F,10000,BI
200,sai,M,8000,HR
200,sai,M,8000,HR


In [0]:
%sql
delete from employe_demo where emp_id =100

num_affected_rows
1


In [0]:
%sql
Delete from delta.`/FileStore/table/delta/createtable1` where emp_id =200

num_affected_rows
3


In [0]:
spark.sql("delete from employe_demo where emp_id == 300")

Out[18]: DataFrame[num_affected_rows: bigint]

In [0]:
spark.sql("delete from employe_demo where emp_id == 300 and gender = 'M'")

Out[29]: DataFrame[num_affected_rows: bigint]

In [0]:
%sql
select * from employe_demo

emp_id,emp_Name,gender,salary,dept
400,setha,F,10000,BI
400,setha,F,10000,BI
