## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/Raw.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

id,first_name,last_name,age,city
1,Bordy,Donohue,9,Sirnasari
2,Moise,Byrkmyr,6,Marsa Alam
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu


In [0]:
# Create a view or table

temp_table_name = "Raw_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `Raw_csv`

id,first_name,last_name,age,city
1,Bordy,Donohue,9,Sirnasari
2,Moise,Byrkmyr,6,Marsa Alam
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Raw_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
%sql
CREATE OR REPLACE TABLE raw_data_target (
  id int,
  name string,
  last_name string,
  age int,
  city string
 )
using DELTA
LOCATION "/FileStore/tables/delta_merge"

In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city


###method 1

In [0]:
#creating a temp table
df.createOrReplaceTempView("Source_view")

In [0]:
%sql
select * from Source_view

id,first_name,last_name,age,city
1,Bordy,Donohue,9,Sirnasari
2,Moise,Byrkmyr,6,Marsa Alam
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu


In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city


In [0]:
%sql
merge into raw_data_target as target
using Source_view as source 
on target.id=source.id 
when matched 
then update set 
  target.city=source.city

when not matched then 
insert (id,name,last_name,age,city) values(source.id, source.first_name, source.last_name, source.age, source.city)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10,0,0,10


In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city
1,Bordy,Donohue,9,Sirnasari
2,Moise,Byrkmyr,6,Marsa Alam
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu


###scd-type 2

In [0]:
data=[(2,'Moise','Byrkmyr',6,'Banglore'),(18,'Virat','kohli',34,'Delhi')]

df2=spark.createDataFrame(data,schema=df.columns)
display(df2)

id,first_name,last_name,age,city
2,Moise,Byrkmyr,6,Banglore
18,Virat,kohli,34,Delhi


Out[24]: ['id', 'first_name', 'last_name', 'age', 'city']

In [0]:
df2.createOrReplaceTempView("Source_view_1")

In [0]:
%sql
select * from Source_view_1

id,first_name,last_name,age,city
2,Moise,Byrkmyr,6,Banglore
18,Virat,kohli,34,Delhi


In [0]:
%sql
merge into raw_data_target as target
using Source_view_1 as source 
on target.id=source.id 
when matched 
then update set 
  target.city=source.city

when not matched then 
insert (id,name,last_name,age,city) values(source.id, source.first_name, source.last_name, source.age, source.city)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city
1,Bordy,Donohue,9,Sirnasari
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu
2,Moise,Byrkmyr,6,Banglore


-----

###with python

In [0]:
data_py=[(1,'Rohit','Sharma',36,'Mumbai'),(2,'Virat','kohli',34,'Delhi')]
df_py=spark.createDataFrame(data_py,schema=df.columns)
display(df2)

id,first_name,last_name,age,city
2,Moise,Byrkmyr,6,Banglore
18,Virat,kohli,34,Delhi


In [0]:
from delta.tables import *
df_delta=DeltaTable.forPath(spark,"/FileStore/tables/delta_merge")

In [0]:
type(df_delta)

Out[44]: delta.tables.DeltaTable

In [0]:
df_delta.alias("target").merge(
    source=df_py.alias("source"),
    condition="source.id=target.id"
    ).whenMatchedUpdate(set=
                       {
                           "city":"source.city"
                       }
     ).whenNotMatchedInsert(values=
                           {
                              "target.id":"source.id",
                               "target.name":"source.first_name",
                               "target.last_name":"source.last_name",
                               "target.age":"source.age",
                               "target.city":"source.city"
                           }).execute()

In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu
1,Bordy,Donohue,9,Mumbai
2,Moise,Byrkmyr,6,Delhi


In [0]:
%sql
select * from raw_data_target

id,name,last_name,age,city
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu
1,Bordy,Donohue,9,Mumbai
2,Moise,Byrkmyr,6,Delhi


In [0]:
df_cnv = sqlContext.table("raw_data_target")

In [0]:
display(df_cnv)

id,name,last_name,age,city
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu
1,Bordy,Donohue,9,Mumbai
2,Moise,Byrkmyr,6,Delhi


In [0]:
df_cnv=df_cnv.orderBy("id")
display(df_cnv)

id,name,last_name,age,city
1,Bordy,Donohue,9,Mumbai
2,Moise,Byrkmyr,6,Delhi
3,Gare,Cotmore,15,Santiago
4,Lanie,Duchatel,10,Julayjilah
5,Alfreda,Tavernor,8,Montaigu
6,Margalo,Lorne,15,Sigou
7,Orren,MacConnell,10,Shimen
8,Gherardo,Shepherd,12,Borovan
9,Pia,Fulloway,14,Gentilly
10,Mile,Dawidowitsch,13,Xinpu
