# pyspark installation

In [None]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 KB[0m [31m25.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=03717dd654b9fe1f920ab14991e2988d3b51436818278d8f62fa47d548019c33
 

 # Create spark session with hive enabled

In [None]:
from os.path import abspath

from pyspark.sql import SparkSession

# Absolute path of the directory used as the Spark SQL warehouse
warehouse_location = abspath('hive-warehouse')

# Build (or reuse) a local, Hive-enabled session. Dynamic partitioning is enabled
# in non-strict mode and CTAS into a non-empty location is allowed.
spark = (
    SparkSession.builder
    .master('local')
    .appName('sql')
    .config('spark.sql.warehouse.dir', warehouse_location)
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .config("spark.sql.legacy.allowNonEmptyLocationInCTAS", "true")
    .getOrCreate()
)

spark



# show databases

In [None]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



# creating new database

In [None]:
spark.sql('create database if not exists test3').show()

++
||
++
++



# creating table using specified file format:

In [None]:
spark.sql('create table if not exists test3.Employee_csv (Id Int ,Name string ,loc string) using csv')

DataFrame[]

In [None]:
# DDL of the table

spark.sql('show create table test3.Employee_csv ').show(truncate = False)

+----------------------------------------------------------------------------------------+
|createtab_stmt                                                                          |
+----------------------------------------------------------------------------------------+
|CREATE TABLE test3.employee_csv (\n  Id INT,\n  Name STRING,\n  loc STRING)\nUSING csv\n|
+----------------------------------------------------------------------------------------+



In [None]:
# insert data into table
spark.sql("insert into test3.employee_csv values(1,'Satish','hyd')")

DataFrame[]

In [None]:
ls /content/hive-warehouse/test3.db/employee_csv/

part-00000-00cefcd9-7160-44fc-9530-15c0bb8dea45-c000.csv  _SUCCESS


In [None]:
# show table
spark.sql("select * from test3.employee_csv ").show()

+---+------+---+
| Id|  Name|loc|
+---+------+---+
|  1|Satish|hyd|
+---+------+---+



# show tables in database

In [None]:
spark.sql('show tables in test3').show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|    test3|employee_csv|      false|
+---------+------------+-----------+



# describe database

In [None]:
spark.sql("describe database test3" ).show(truncate = False)

+--------------+-------------------------------------+
|info_name     |info_value                           |
+--------------+-------------------------------------+
|Namespace Name|test3                                |
|Comment       |                                     |
|Location      |file:/content/hive-warehouse/test3.db|
|Owner         |root                                 |
+--------------+-------------------------------------+



# create and describe database in specified location

In [None]:
spark.sql("create database if not exists sandeep location '/content/ext_db'")

DataFrame[]

In [None]:
spark.sql("describe database sandeep").show(truncate = False)

+--------------+--------------------+
|info_name     |info_value          |
+--------------+--------------------+
|Namespace Name|sandeep             |
|Comment       |                    |
|Location      |file:/content/ext_db|
|Owner         |root                |
+--------------+--------------------+



# internal table 

internal table /managed table -- default location (user/hive/warehouse/db/table_name)

In [None]:


spark.sql("create table if not exists student_int(id int ,Name string ,rollno int) ")

DataFrame[]

In [None]:
spark.sql("insert into student_int values(1,'sandeep',0382)")

DataFrame[]

In [None]:
spark.sql("insert into table student_int values(2,'satish',0561)")

DataFrame[]

In [None]:
spark.sql("select * from student_int").show(truncate = False)

+---+-------+------+
|id |Name   |rollno|
+---+-------+------+
|2  |satish |561   |
|1  |sandeep|382   |
+---+-------+------+



In [None]:
spark.sql("show create table student_int").show(truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                        |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE default.student_int (\n  id INT,\n  Name STRING,\n  rollno INT)\nUSING text\nTBLPROPERTIES (\n  'transient_lastDdlTime' = '1678688136')\n|
+------------------------------------------------------------------------------------------------------------------------------------------------------+



# External table creation


1. can create external table by using location
2. can create external table by using the external keyword together with location



In [None]:
# "if not exists" keeps this cell re-runnable, like the other create-table cells in this notebook.
spark.sql("create table if not exists student_ext(id int ,name string ,rollno int) location '/content/External/student_ext'")

DataFrame[]

In [None]:
spark.sql("show tables").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|student_ext|      false|
|  default|student_int|      false|
+---------+-----------+-----------+



In [None]:
spark.sql("describe table default.student_ext ").show(truncate = False)

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|id      |int      |null   |
|name    |string   |null   |
|rollno  |int      |null   |
+--------+---------+-------+



In [None]:
spark.sql("insert into table student_ext values(1,'sandeep',0382)")

DataFrame[]

In [None]:
spark.sql("insert into table student_ext values(2,'satish',0561)")

DataFrame[]

In [None]:
spark.sql("select * from student_ext").show(truncate = False)

+---+-------+------+
|id |name   |rollno|
+---+-------+------+
|2  |satish |561   |
|1  |sandeep|382   |
+---+-------+------+



In [None]:
spark.sql('show create table student_ext').show(truncate = False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                       |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE default.student_ext (\n  id INT,\n  name STRING,\n  rollno INT)\nUSING text\nLOCATION 'file:/content/External/student_ext'\nTBLPROPERTIES (\n  'transient_lastDdlTime' = '1678688139')\n|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


In [None]:
spark.sql("create external table if not exists family_ext (sno int ,name string ,age int ) location '/content/External/family_ext'")

DataFrame[]

In [None]:
spark.sql("show tables").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default| family_ext|      false|
|  default|student_ext|      false|
|  default|student_int|      false|
+---------+-----------+-----------+



In [None]:
spark.sql("insert into table family_ext values(1,'Krishna',60)")

DataFrame[]

In [None]:
spark.sql('select * from family_ext').show(truncate = False)

+---+-------+---+
|sno|name   |age|
+---+-------+---+
|1  |Krishna|60 |
+---+-------+---+



In [None]:
spark.sql('show create table family_ext').show(truncate = False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE default.family_ext (\n  sno INT,\n  name STRING,\n  age INT)\nUSING text\nLOCATION 'file:/content/External/family_ext'\nTBLPROPERTIES (\n  'transient_lastDdlTime' = '1678688141')\n|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



# Difference between internal and external table

If we drop an internal table, both the table metadata and the data are deleted (/content/hive-warehouse).

If we drop an external table, only the table metadata is dropped; the data remains in the external location (/content/External).

In [None]:
spark.sql('drop table student_int')

DataFrame[]

In [None]:
spark.sql('drop table student_ext')

DataFrame[]

# Views

View : window of a table or query , view does not store any data.

# types of views:

1. Normal View

2. Temporary view
*   global temporary view
*   temporary view

# Creating Normal view

In [None]:
# External table backing the view examples; "if not exists" keeps the cell re-runnable.
spark.sql("create external table if not exists test3.emp(id int,fname string,lname string,salary int,exp int) location '/content/External/emp'")

DataFrame[]

In [None]:
# Load the six sample employees with one multi-row INSERT: a single Spark job
# instead of six separate single-row inserts.
spark.sql("""
    insert into test3.emp(id,fname,lname,salary,exp) values
        (1,'Satish','Nookala',100000,7),
        (2,'Sandeep','Nookala',50000,4),
        (3,'krishna','Nookala',10000,2),
        (4,'padma','Nookala',1000,2),
        (5,'anusha','Nookala',100,1),
        (6,'swathi','Nookala',10,0)
""")

DataFrame[]

In [None]:
spark.sql('select * from test3.emp').show()

+---+-------+-------+------+---+
| id|  fname|  lname|salary|exp|
+---+-------+-------+------+---+
|  5| anusha|Nookala|   100|  1|
|  6| swathi|Nookala|    10|  0|
|  1| Satish|Nookala|100000|  7|
|  4|  padma|Nookala|  1000|  2|
|  3|krishna|Nookala| 10000|  2|
|  2|Sandeep|Nookala| 50000|  4|
+---+-------+-------+------+---+



In [None]:
spark.sql('select * from test3.emp order by id').show()

+---+-------+-------+------+---+
| id|  fname|  lname|salary|exp|
+---+-------+-------+------+---+
|  1| Satish|Nookala|100000|  7|
|  2|Sandeep|Nookala| 50000|  4|
|  3|krishna|Nookala| 10000|  2|
|  4|  padma|Nookala|  1000|  2|
|  5| anusha|Nookala|   100|  1|
|  6| swathi|Nookala|    10|  0|
+---+-------+-------+------+---+



In [None]:
# Create (or replace) a view from a select query: test3.emp ordered by id

spark.sql('create or replace view emp_view_id as select * from test3.emp order by id')

DataFrame[]

In [None]:
# Create (or replace) a view from a select query: test3.emp ordered by salary, highest first

spark.sql('create or replace view emp_view_salary as select * from test3.emp order by salary desc')


DataFrame[]

In [None]:
# DDL of view emp_view_salary
spark.sql('show create table emp_view_salary').show(truncate = False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE VIEW default.emp_view_salary (\n  id,\n  fname,\n  lname,\n  salary,\n  exp)\nTBLPROPERTIES (\n  'transient_lastDdlTime' = '1678688149')\nAS select * from test3.emp order by salary desc\n|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# DDL of view emp_view_id
spark.sql('show create table emp_view_id').show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE VIEW default.emp_view_id (\n  id,\n  fname,\n  lname,\n  salary,\n  exp)\nTBLPROPERTIES (\n  'transient_lastDdlTime' = '1678688148')\nAS select * from test3.emp order by id\n|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
spark.sql('select * from emp_view_salary').show(truncate = False)

+---+-------+-------+------+---+
|id |fname  |lname  |salary|exp|
+---+-------+-------+------+---+
|1  |Satish |Nookala|100000|7  |
|2  |Sandeep|Nookala|50000 |4  |
|3  |krishna|Nookala|10000 |2  |
|4  |padma  |Nookala|1000  |2  |
|5  |anusha |Nookala|100   |1  |
|6  |swathi |Nookala|10    |0  |
+---+-------+-------+------+---+



In [None]:
spark.sql('select * from emp_view_id').show(truncate = False)

+---+-------+-------+------+---+
|id |fname  |lname  |salary|exp|
+---+-------+-------+------+---+
|1  |Satish |Nookala|100000|7  |
|2  |Sandeep|Nookala|50000 |4  |
|3  |krishna|Nookala|10000 |2  |
|4  |padma  |Nookala|1000  |2  |
|5  |anusha |Nookala|100   |1  |
|6  |swathi |Nookala|10    |0  |
+---+-------+-------+------+---+



# global temporary view

In [None]:
 #global temporary view
spark.sql('create or replace global temporary view gtv_emp as select * from test3.emp')

DataFrame[]

In [None]:
 #global temporary view
spark.sql('select * from global_temp.gtv_emp').show()

+---+-------+-------+------+---+
| id|  fname|  lname|salary|exp|
+---+-------+-------+------+---+
|  5| anusha|Nookala|   100|  1|
|  6| swathi|Nookala|    10|  0|
|  1| Satish|Nookala|100000|  7|
|  4|  padma|Nookala|  1000|  2|
|  3|krishna|Nookala| 10000|  2|
|  2|Sandeep|Nookala| 50000|  4|
+---+-------+-------+------+---+



# temporary view

In [None]:
spark.sql('create or replace temporary view tv_emp as select * from test3.emp')

DataFrame[]

In [None]:
spark.sql('select * from tv_emp').show(truncate = False)

+---+-------+-------+------+---+
|id |fname  |lname  |salary|exp|
+---+-------+-------+------+---+
|5  |anusha |Nookala|100   |1  |
|6  |swathi |Nookala|10    |0  |
|1  |Satish |Nookala|100000|7  |
|4  |padma  |Nookala|1000  |2  |
|3  |krishna|Nookala|10000 |2  |
|2  |Sandeep|Nookala|50000 |4  |
+---+-------+-------+------+---+



# Difference between View, Temp View, Global Temp View

View: scope is the Spark catalog (metastore). The view definition is persisted, so it can be accessed from any session that uses the same metastore.

Temp View: scope is the Spark session that created it. The view is available only in that session, cannot be accessed from other sessions, and is dropped when the session ends.

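Global Temp View: scope is the Spark application. The view lives in the `global_temp` database (query it as `global_temp.<view_name>`), is available to every session of the same application, and is dropped when the application stops.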


# Constraints

1.   NOT NULL : indicates that values in a specific columns cannot be null
2.   CHECK: indicates that a specified Boolean must be true for each input row

# NOT NULL

In [None]:
spark.sql("create table if not exists test3.events( id long not null ,date string not null , location string,description string)")

DataFrame[]

In [None]:
spark.sql("insert into test3.events(id,date,location,description) values(1,current_Date,'Hyderabad','This sample data')")

DataFrame[]

In [None]:
spark.sql('select * from test3.events').show()

+---+----------+---------+----------------+
| id|      date| location|     description|
+---+----------+---------+----------------+
|  1|13-03-2023|Hyderabad|This sample data|
|  1|2023-03-13|Hyderabad|This sample data|
+---+----------+---------+----------------+



In [None]:
spark.sql('describe table test3.events').show()

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|         id|   bigint|   null|
|       date|   string|   null|
|   location|   string|   null|
|description|   string|   null|
+-----------+---------+-------+



# DML - data manipulation language

In [None]:
# test3.emp already exists from the "Creating Normal view" section with a different
# 5-column schema, so "IF NOT EXISTS" alone would silently skip this CREATE and the
# 8-value inserts that follow would fail. Drop the old definition first.
# NOTE: the views defined earlier over test3.emp (emp_view_id, emp_view_salary,
# gtv_emp, tv_emp) were built on the old schema and should not be used after this point.
spark.sql("DROP TABLE IF EXISTS TEST3.EMP")
spark.sql("CREATE TABLE IF NOT EXISTS TEST3.EMP(EMPNO DECIMAL(4) NOT NULL,ENAME VARCHAR(10),JOB VARCHAR(10),MGR DECIMAL(4),HIREDATE STRING,SAL DECIMAL,COMM DECIMAL) partitioned by (DEPTNO DECIMAL(4))")
 

In [None]:
spark.sql("INSERT INTO TEST3.EMP VALUES(7369,'SMITH','CLERK',7902,'17-12-1980',800,null,20)")
spark.sql("INSERT INTO TEST3.EMP VALUES(7499,'ALLEN','SALESMAN',7698,'20-02-1981',1600,300,30)")
spark.sql("INSERT INTO TEST3.EMP VALUES(7521,'WARD','SALESMAN',7698,'22-02-1981',1250,500,30)")
spark.sql("INSERT INTO TEST3.EMP VALUES(7566,'JONES','MANAGER',7839,'04-02-1981',2975,null,20)")

In [None]:
spark.sql('select * from test3.emp').show()

# create table as select query

In [None]:
# Create a new table with data using a select query (CTAS)
spark.sql('create table test3.emp_bkp as select * from test3.emp')

In [None]:
spark.sql('select * from test3.emp_bkp').show()

In [None]:
# Create a new table without data using a select query:
# the always-false predicate "1=2" selects no rows, so the new table is created empty.

spark.sql('create table test3.emp_bkp2 as select * from test3.emp where 1=2')

In [None]:
spark.sql('select * from test3.emp_bkp2').show()

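# Delete and truncate
Both are used to remove data from a table.

Delete: removes individual records that match a condition, or every record when no condition is given (in Spark SQL, `DELETE FROM` works only on v2 tables such as Delta Lake)

truncate: removes the entire table data; it can also remove the data of specific partitions by using the partition keyword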

# truncate

In [None]:
spark.sql('select * from test3.emp_bkp').show()

In [None]:
spark.sql('truncate table test3.emp_bkp').show()

In [None]:
spark.sql('truncate table test3.emp partition( DEPTNO=20)').show()

In [None]:
spark.sql('select * from test3.emp').show()

# Delete

In [None]:
# Give emp_ext its own directory: '/content/External/emp' is the location that was
# given to the external table test3.emp earlier in this notebook, so a CTAS there
# would write into a directory that already holds another table's data files.
spark.sql("create external table test3.emp_ext location '/content/External/emp_ext' as select * from test3.emp")

In [None]:
spark.sql('select * from test3.emp_ext').show()

In [None]:
# delete entire table data
# Spark SQL supports DELETE FROM only on v2 tables; emp_ext is a Hive table, so
# empty it by overwriting it with a query that returns no rows.
spark.sql('insert overwrite table test3.emp_ext select * from test3.emp_ext where 1=2').show()

In [None]:
# delete specific records only
# DELETE ... WHERE is not available for this Hive table; rewrite the table keeping
# every row that the delete condition (DEPTNO=20) does not match, including rows
# where DEPTNO is null.
spark.sql('insert overwrite table test3.emp_ext select * from test3.emp_ext where DEPTNO is null or DEPTNO <> 20').show()