In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Build (or fetch) the SparkSession. Databricks already provides one, and
# getOrCreate() hands back that running session when it exists.
# Shuffle partitions and default parallelism are both requested as "1".
spark = (
    SparkSession.builder
    .appName("pyspark-databricks")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.default.parallelism", "1")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext.
sc = spark.sparkContext

### Data Preparation

In [0]:
# Main transactions sample: one tuple per row, in the column order given to
# createDataFrame below. tdate is kept as a plain string (e.g. "06-26-2011"),
# and rows 5 and 8 have no product (None -> null).
data = [
    (0, "06-26-2011", 300.4, "Exercise", "GymnasticsPro", "cash"),
    (1, "05-26-2011", 200.0, "Exercise Band", "Weightlifting", "credit"),
    (2, "06-01-2011", 300.4, "Exercise", "Gymnastics Pro", "cash"),
    (3, "06-05-2011", 100.0, "Gymnastics", "Rings", "credit"),
    (4, "12-17-2011", 300.0, "Team Sports", "Field", "cash"),
    (5, "02-14-2011", 200.0, "Gymnastics", None, "cash"),
    (6, "06-05-2011", 100.0, "Exercise", "Rings", "credit"),
    (7, "12-17-2011", 300.0, "Team Sports", "Field", "cash"),
    (8, "02-14-2011", 200.0, "Gymnastics", None, "cash"),
]

# Only column names are supplied; Spark infers the column types from the values.
df = spark.createDataFrame(
    data,
    schema=["id", "tdate", "amount", "category", "product", "spendby"],
)
df.show()

# Second transactions sample with the same six columns as df; it is combined
# with df in the UNION / UNION ALL queries further down.
data2 = [
    (4, "12-17-2011", 300.0, "Team Sports", "Field", "cash"),
    (5, "02-14-2011", 200.0, "Gymnastics", None, "cash"),
    (6, "02-14-2011", 200.0, "Winter", None, "cash"),
    (7, "02-14-2011", 200.0, "Winter", None, "cash"),
]

df1 = spark.createDataFrame(
    data2,
    schema=["id", "tdate", "amount", "category", "product", "spendby"],
)
df1.show()

# Customer lookup rows (id, name); this is the `cust` side of the join examples.
data4 = [
    (1, "raj"),
    (2, "ravi"),
    (3, "sai"),
    (5, "rani"),
]

cust = spark.createDataFrame(data4, schema=["id", "name"])
cust.show()

# Product lookup rows (id, product); this is the `prod` side of the join examples.
data3 = [
    (1, "mouse"),
    (3, "mobile"),
    (7, "laptop"),
]

prod = spark.createDataFrame(data3, schema=["id", "product"])
prod.show()

# Make every DataFrame queryable from spark.sql() under a temp view whose name
# matches its Python variable name.
for view_name, frame in (("df", df), ("df1", df1), ("cust", cust), ("prod", prod)):
    frame.createOrReplaceTempView(view_name)


+---+----------+------+-------------+--------------+-------+
| id|     tdate|amount|     category|       product|spendby|
+---+----------+------+-------------+--------------+-------+
|  0|06-26-2011| 300.4|     Exercise| GymnasticsPro|   cash|
|  1|05-26-2011| 200.0|Exercise Band| Weightlifting| credit|
|  2|06-01-2011| 300.4|     Exercise|Gymnastics Pro|   cash|
|  3|06-05-2011| 100.0|   Gymnastics|         Rings| credit|
|  4|12-17-2011| 300.0|  Team Sports|         Field|   cash|
|  5|02-14-2011| 200.0|   Gymnastics|          null|   cash|
|  6|06-05-2011| 100.0|     Exercise|         Rings| credit|
|  7|12-17-2011| 300.0|  Team Sports|         Field|   cash|
|  8|02-14-2011| 200.0|   Gymnastics|          null|   cash|
+---+----------+------+-------------+--------------+-------+

+---+----------+------+-----------+-------+-------+
| id|     tdate|amount|   category|product|spendby|
+---+----------+------+-----------+-------+-------+
|  4|12-17-2011| 300.0|Team Sports|  Field|   cash

#### PySpark SQL queries begin...

In [0]:
# Every column and every row of the df view.
spark.sql(
    "SELECT * FROM df"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash


In [0]:
# Projection: keep only the id and tdate columns.
spark.sql(
    'SELECT id, tdate from df'
).display()

id,tdate
0,06-26-2011
1,05-26-2011
2,06-01-2011
3,06-05-2011
4,12-17-2011
5,02-14-2011
6,06-05-2011
7,12-17-2011
8,02-14-2011


In [0]:
# Exact-match filter on a single column (category equals 'Exercise').
spark.sql(
    "SELECT * FROM df WHERE category='Exercise'"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
6,06-05-2011,100.0,Exercise,Rings,credit


In [0]:
# Two conditions combined with AND: category 'Exercise' paid by 'cash'.
spark.sql(
    "SELECT id, tdate, category, spendby FROM df WHERE category='Exercise' and spendby='cash'"
).display()

id,tdate,category,spendby
0,06-26-2011,Exercise,cash
2,06-01-2011,Exercise,cash


In [0]:
# Multi-value filter: category is any one of the listed values.
spark.sql(
    "SELECT * FROM df WHERE category IN ('Exercise', 'Gymnastics')"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
8,02-14-2011,200.0,Gymnastics,,cash


In [0]:
# Pattern match: product contains the text 'Gymnastics' anywhere (% is the wildcard).
spark.sql(
    "SELECT * FROM df WHERE product like '%Gymnastics%'"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash


In [0]:
# Negated exact match: everything whose category is not exactly 'Exercise'
# (so 'Exercise Band' is still returned).
spark.sql(
    "SELECT * FROM df WHERE category != 'Exercise'"
).display()

id,tdate,amount,category,product,spendby
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash


In [0]:
# Exclude several categories at once with NOT IN.
spark.sql(
    "SELECT * FROM df WHERE category not in ('Exercise', 'Gymnastics')"
).display()

id,tdate,amount,category,product,spendby
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
4,12-17-2011,300.0,Team Sports,Field,cash
7,12-17-2011,300.0,Team Sports,Field,cash


In [0]:
# Rows where product is missing (IS NULL).
spark.sql(
    "SELECT * FROM df WHERE product is null"
).display()

id,tdate,amount,category,product,spendby
5,02-14-2011,200.0,Gymnastics,,cash
8,02-14-2011,200.0,Gymnastics,,cash


In [0]:
# Rows where product is present (IS NOT NULL).
spark.sql(
    "SELECT * FROM df WHERE product is not null"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash


In [0]:
# Largest id in df, returned as a single-row column named highest_value.
spark.sql(
    "SELECT max(id) as highest_value FROM df"
).display()

highest_value
8


In [0]:
# Smallest id in df. The result column is named lowest_value so it is not
# confused with the highest_value column produced by the max(id) query.
spark.sql("SELECT min(id) as lowest_value FROM df").display()

highest_value
0


In [0]:
# Total number of rows in df; without an alias the column is named count(1).
spark.sql(
    "SELECT count(1) FROM df"
).display()

count(1)
9


In [0]:
# Derive a flag column with CASE WHEN: status is 1 for cash payments, 0 otherwise.
spark.sql(
    "SELECT *,CASE WHEN spendby = 'cash' THEN 1 ELSE 0 END as status FROM df"
).display()

id,tdate,amount,category,product,spendby,status
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash,1
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit,0
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash,1
3,06-05-2011,100.0,Gymnastics,Rings,credit,0
4,12-17-2011,300.0,Team Sports,Field,cash,1
5,02-14-2011,200.0,Gymnastics,,cash,1
6,06-05-2011,100.0,Exercise,Rings,credit,0
7,12-17-2011,300.0,Team Sports,Field,cash,1
8,02-14-2011,200.0,Gymnastics,,cash,1


In [0]:
# Append condata = id, a '-' separator and category glued together by concat().
spark.sql(
    "SELECT *, concat(id,'-',category) as condata FROM df"
).display()

id,tdate,amount,category,product,spendby,condata
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash,0-Exercise
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit,1-Exercise Band
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash,2-Exercise
3,06-05-2011,100.0,Gymnastics,Rings,credit,3-Gymnastics
4,12-17-2011,300.0,Team Sports,Field,cash,4-Team Sports
5,02-14-2011,200.0,Gymnastics,,cash,5-Gymnastics
6,06-05-2011,100.0,Exercise,Rings,credit,6-Exercise
7,12-17-2011,300.0,Team Sports,Field,cash,7-Team Sports
8,02-14-2011,200.0,Gymnastics,,cash,8-Gymnastics


In [0]:
# concat_ws() takes the separator first and then the values to join; rows with
# a null product come out as just 'id-category' (see ids 5 and 8 in the output).
spark.sql(
    "SELECT id, category, product, concat_ws('-', id, category, product) as condata FROM df"
).display()

id,category,product,condata
0,Exercise,GymnasticsPro,0-Exercise-GymnasticsPro
1,Exercise Band,Weightlifting,1-Exercise Band-Weightlifting
2,Exercise,Gymnastics Pro,2-Exercise-Gymnastics Pro
3,Gymnastics,Rings,3-Gymnastics-Rings
4,Team Sports,Field,4-Team Sports-Field
5,Gymnastics,,5-Gymnastics
6,Exercise,Rings,6-Exercise-Rings
7,Team Sports,Field,7-Team Sports-Field
8,Gymnastics,,8-Gymnastics


In [0]:
# Show category next to its lower-cased form.
spark.sql(
    "SELECT category, lower(category) FROM df"
).display()

category,lower(category)
Exercise,exercise
Exercise Band,exercise band
Exercise,exercise
Gymnastics,gymnastics
Team Sports,team sports
Gymnastics,gymnastics
Exercise,exercise
Team Sports,team sports
Gymnastics,gymnastics


In [0]:
# Show category next to its upper-cased form.
spark.sql(
    "SELECT category, upper(category) FROM df"
).display()

category,upper(category)
Exercise,EXERCISE
Exercise Band,EXERCISE BAND
Exercise,EXERCISE
Gymnastics,GYMNASTICS
Team Sports,TEAM SPORTS
Gymnastics,GYMNASTICS
Exercise,EXERCISE
Team Sports,TEAM SPORTS
Gymnastics,GYMNASTICS


In [0]:
# ceil(): round amount up to the next whole number (300.4 -> 301).
spark.sql(
    "SELECT amount, ceil(amount) as updated_value FROM df"
).display()

amount,updated_value
300.4,301
200.0,200
300.4,301
100.0,100
300.0,300
200.0,200
100.0,100
300.0,300
200.0,200


In [0]:
# floor(): round amount down to the previous whole number (300.4 -> 300).
spark.sql(
    "SELECT amount, floor(amount) as updated_value FROM df"
).display()

amount,updated_value
300.4,300
200.0,200
300.4,300
100.0,100
300.0,300
200.0,200
100.0,100
300.0,300
200.0,200


In [0]:
# round() with no scale argument: nearest whole value, still shown as a
# decimal number (300.4 -> 300.0).
spark.sql(
    "SELECT amount, round(amount) as updated_value FROM df"
).display()

amount,updated_value
300.4,300.0
200.0,200.0
300.4,300.0
100.0,100.0
300.0,300.0
200.0,200.0
100.0,100.0
300.0,300.0
200.0,200.0


In [0]:
# coalesce() returns its first non-null argument, so 'NA' stands in wherever
# product is null.
spark.sql(
    "SELECT product, coalesce(product,'NA') FROM df"
).display()

product,"coalesce(product, NA)"
GymnasticsPro,GymnasticsPro
Weightlifting,Weightlifting
Gymnastics Pro,Gymnastics Pro
Rings,Rings
Field,Field
,
Rings,Rings
Field,Field
,


In [0]:
# trim(): strip leading and trailing spaces from product.
spark.sql(
    "SELECT trim(product) FROM df"
).display()

trim(product)
GymnasticsPro
Weightlifting
Gymnastics Pro
Rings
Field
""
Rings
Field
""


In [0]:
# Unique category values only.
spark.sql(
    "SELECT DISTINCT category FROM df"
).display()

category
Exercise
Exercise Band
Gymnastics
Team Sports


In [0]:
# Unique (category, spendby) combinations.
spark.sql(
    "SELECT DISTINCT category, spendby FROM df"
).display()

category,spendby
Exercise,cash
Exercise Band,credit
Gymnastics,credit
Team Sports,cash
Gymnastics,cash
Exercise,credit


In [0]:
# De-duplicate on whole rows; every row in df has its own id, so all nine remain.
spark.sql(
    "SELECT DISTINCT * FROM df"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash


In [0]:
# substring(product, 1, 10): up to the first ten characters of product
# (positions are 1-based).
spark.sql(
    "SELECT *,substring(product, 1, 10) as substring_value FROM df"
).display()

id,tdate,amount,category,product,spendby,substring_value
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash,Gymnastics
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit,Weightlift
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash,Gymnastics
3,06-05-2011,100.0,Gymnastics,Rings,credit,Rings
4,12-17-2011,300.0,Team Sports,Field,cash,Field
5,02-14-2011,200.0,Gymnastics,,cash,
6,06-05-2011,100.0,Exercise,Rings,credit,Rings
7,12-17-2011,300.0,Team Sports,Field,cash,Field
8,02-14-2011,200.0,Gymnastics,,cash,


In [0]:
# split() breaks product on single spaces; [0] keeps the first piece
# ('Gymnastics Pro' -> 'Gymnastics').
spark.sql(
    "SELECT *, split(product, ' ')[0] as split_data FROM df"
).display()

id,tdate,amount,category,product,spendby,split_data
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash,GymnasticsPro
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit,Weightlifting
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash,Gymnastics
3,06-05-2011,100.0,Gymnastics,Rings,credit,Rings
4,12-17-2011,300.0,Team Sports,Field,cash,Field
5,02-14-2011,200.0,Gymnastics,,cash,
6,06-05-2011,100.0,Exercise,Rings,credit,Rings
7,12-17-2011,300.0,Team Sports,Field,cash,Field
8,02-14-2011,200.0,Gymnastics,,cash,


In [0]:
# Re-display all four source DataFrames before the set and join examples.
for frame in (df, df1, cust, prod):
    frame.display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash


id,tdate,amount,category,product,spendby
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,02-14-2011,200.0,Winter,,cash
7,02-14-2011,200.0,Winter,,cash


id,name
1,raj
2,ravi
3,sai
5,rani


id,product
1,mouse
3,mobile
7,laptop


In [0]:
# UNION ALL: stack df1's rows under df's rows, keeping duplicates.
spark.sql(
    "SELECT * FROM df UNION ALL SELECT * FROM df1"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash
4,12-17-2011,300.0,Team Sports,Field,cash


In [0]:
# UNION: like UNION ALL, but rows that are identical in every column appear once.
spark.sql(
    "SELECT * FROM df UNION SELECT * FROM df1"
).display()

id,tdate,amount,category,product,spendby
0,06-26-2011,300.4,Exercise,GymnasticsPro,cash
1,05-26-2011,200.0,Exercise Band,Weightlifting,credit
2,06-01-2011,300.4,Exercise,Gymnastics Pro,cash
3,06-05-2011,100.0,Gymnastics,Rings,credit
4,12-17-2011,300.0,Team Sports,Field,cash
5,02-14-2011,200.0,Gymnastics,,cash
6,06-05-2011,100.0,Exercise,Rings,credit
7,12-17-2011,300.0,Team Sports,Field,cash
8,02-14-2011,200.0,Gymnastics,,cash
6,02-14-2011,200.0,Winter,,cash


In [0]:
# Total amount per category.
spark.sql(
    "SELECT category, sum(amount) as total_amount FROM df GROUP BY category"
).display()

category,total_amount
Exercise,700.8
Exercise Band,200.0
Gymnastics,500.0
Team Sports,600.0


In [0]:
# Total amount per (category, spendby) pair; the aggregate column is left unaliased.
spark.sql(
    "SELECT category, spendby, sum(amount) FROM df GROUP BY category, spendby"
).display()

category,spendby,sum(amount)
Exercise,cash,600.8
Exercise Band,credit,200.0
Gymnastics,credit,100.0
Team Sports,cash,600.0
Gymnastics,cash,400.0
Exercise,credit,100.0


In [0]:
# Two aggregates per (category, spendby) pair: summed amount and number of amounts.
spark.sql(
    "SELECT category, spendby, sum(amount) as total, count(amount) as count FROM df GROUP BY category, spendby"
).display()

category,spendby,total,count
Exercise,cash,600.8,2
Exercise Band,credit,200.0,1
Gymnastics,credit,100.0,1
Team Sports,cash,600.0,2
Gymnastics,cash,400.0,2
Exercise,credit,100.0,1


In [0]:
# Largest amount within each category.
spark.sql(
    "SELECT category, max(amount) as max_value FROM df GROUP BY category"
).display()

category,max_value
Exercise,300.4
Exercise Band,200.0
Gymnastics,200.0
Team Sports,300.0


In [0]:
# Largest amount per category, listed in reverse alphabetical order of category.
spark.sql(
    "SELECT category, max(amount) as max_value FROM df GROUP BY category ORDER BY category desc"
).display()

category,max_value
Team Sports,300.0
Gymnastics,200.0
Exercise Band,200.0
Exercise,300.4


In [0]:
# Number the rows of each category from the largest amount downward;
# row_number() gives tied amounts distinct, consecutive numbers.
# The outer ORDER BY sorts by category and then by rownumber: sorting by
# category alone would leave the display order inside a category unspecified.
spark.sql("SELECT category, amount, row_number() over(PARTITION BY category ORDER BY amount DESC) as rownumber FROM df ORDER BY category DESC, rownumber").display()

category,amount,rownumber
Team Sports,300.0,1
Team Sports,300.0,2
Gymnastics,200.0,1
Gymnastics,200.0,2
Gymnastics,100.0,3
Exercise Band,200.0,1
Exercise,300.4,1
Exercise,300.4,2
Exercise,100.0,3


In [0]:
# rank() per category by descending amount: ties share a rank and the next
# rank is skipped (1, 1, 3).
spark.sql(
    "SELECT category, amount, rank() OVER(PARTITION BY category ORDER BY amount DESC) as rank_value FROM df"
).display()

category,amount,rank_value
Exercise,300.4,1
Exercise,300.4,1
Exercise,100.0,3
Exercise Band,200.0,1
Gymnastics,200.0,1
Gymnastics,200.0,1
Gymnastics,100.0,3
Team Sports,300.0,1
Team Sports,300.0,1


In [0]:
# dense_rank() per category by descending amount: ties share a rank and no
# rank is skipped (1, 1, 2).
spark.sql(
    "SELECT category, amount, dense_rank() OVER (PARTITION BY category ORDER BY amount DESC) as dense_rank FROM df"
).display()

category,amount,dense_rank
Exercise,300.4,1
Exercise,300.4,1
Exercise,100.0,2
Exercise Band,200.0,1
Gymnastics,200.0,1
Gymnastics,200.0,1
Gymnastics,100.0,2
Team Sports,300.0,1
Team Sports,300.0,1


In [0]:
# lead(): the amount of the following row in the same category (ordered by
# descending amount); the last row of each category gets null.
spark.sql(
    "SELECT category, amount, lead(amount) OVER (PARTITION BY category ORDER BY amount DESC) as lead_datax FROM df"
).display()

category,amount,lead_datax
Exercise,300.4,300.4
Exercise,300.4,100.0
Exercise,100.0,
Exercise Band,200.0,
Gymnastics,200.0,200.0
Gymnastics,200.0,100.0
Gymnastics,100.0,
Team Sports,300.0,300.0
Team Sports,300.0,


In [0]:
# lag(): the amount of the preceding row in the same category (ordered by
# descending amount); the first row of each category gets null.
spark.sql(
    "SELECT category, amount, lag(amount) OVER (PARTITION BY category ORDER BY amount DESC) as lag_datax FROM df"
).display()

category,amount,lag_datax
Exercise,300.4,
Exercise,300.4,300.4
Exercise,100.0,300.4
Exercise Band,200.0,
Gymnastics,200.0,
Gymnastics,200.0,200.0
Gymnastics,100.0,200.0
Team Sports,300.0,
Team Sports,300.0,300.0


In [0]:
# HAVING filters after aggregation: keep only categories that occur more than once.
spark.sql(
    "SELECT category, count(category) as cnt FROM df GROUP BY category HAVING count(category)>1"
).display()

category,cnt
Exercise,3
Gymnastics,3
Team Sports,2


In [0]:
# Show the two lookup tables again right before the join examples.
for frame in (cust, prod):
    frame.display()

id,name
1,raj
2,ravi
3,sai
5,rani


id,product
1,mouse
3,mobile
7,laptop


In [0]:
# Inner join with table aliases: all cust columns plus the matching product.
# product is qualified with its alias (b.product), as in the other join
# queries, so the query stays unambiguous if cust ever gets a product column.
spark.sql("SELECT a.*, b.product FROM cust a JOIN prod b ON a.id == b.id").display()

id,name,product
1,raj,mouse
3,sai,mobile


In [0]:
# The same inner join written with full view names instead of aliases.
spark.sql(
    "SELECT cust.id, cust.name, prod.product FROM cust JOIN prod ON cust.id == prod.id"
).display()

id,name,product
1,raj,mouse
3,sai,mobile


In [0]:
# LEFT JOIN: every customer is kept; product is null where prod has no matching id.
spark.sql(
    "SELECT c.id, c.name, p.product FROM cust c LEFT JOIN prod p ON c.id == p.id"
).display()

id,name,product
1,raj,mouse
2,ravi,
3,sai,mobile
5,rani,


In [0]:
# RIGHT JOIN: every product is kept. id and name are read from cust, so both
# are null for a product with no matching customer (the laptop row).
spark.sql(
    "SELECT c.id, c.name, p.product FROM cust c RIGHT JOIN prod p ON c.id == p.id"
).display()

id,name,product
1.0,raj,mouse
3.0,sai,mobile
,,laptop


In [0]:
# CROSS JOIN: pair every customer with every product (no join condition).
spark.sql(
    "SELECT c.id, c.name, p.product FROM cust c CROSS JOIN prod p"
).display()

id,name,product
1,raj,mouse
1,raj,mobile
1,raj,laptop
2,ravi,mouse
2,ravi,mobile
2,ravi,laptop
3,sai,mouse
3,sai,mobile
3,sai,laptop
5,rani,mouse


In [0]:
# The same cross product driven from the prod side, reporting the product's id.
spark.sql(
    "SELECT p.id, c.name, p.product FROM prod p CROSS JOIN cust c"
).display()

id,name,product
1,raj,mouse
1,ravi,mouse
1,sai,mouse
1,rani,mouse
3,raj,mobile
3,ravi,mobile
3,sai,mobile
3,rani,mobile
7,raj,laptop
7,ravi,laptop


In [0]:
# FULL JOIN: keep unmatched rows from both sides. id and name come from cust,
# so they are null for a product that has no matching customer.
spark.sql(
    "SELECT c.id, c.name, p.product FROM cust c FULL JOIN prod p ON c.id == p.id"
).display()

id,name,product
1.0,raj,mouse
2.0,ravi,
3.0,sai,mobile
5.0,rani,
,,laptop


In [0]:
# LEFT ANTI JOIN: only the customers whose id has no match in prod.
spark.sql(
    "SELECT c.id, c.name FROM cust c LEFT ANTI JOIN prod p ON c.id == p.id"
).display()

id,name
2,ravi
5,rani
