#Basic Operations

##Libraries

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import column as col
from pyspark.sql.window import Window 
from pyspark.sql.functions import monotonically_increasing_id

##Sample Data Frame Creation

In [0]:
data = [(1,'giri',100),(2,'john',200),(3,'pranesh',150),(3,'pranesh',130)]
schema = ['id','name','salary']
df = spark.createDataFrame(data,schema)
df.display()

id,name,salary
1,giri,100
2,john,200
3,pranesh,150
3,pranesh,130


### 1) Show the DataFrame

In [0]:
df.show()
df.printSchema()
df.display()
df.show(truncate = False)

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
+---+-------+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



id,name,salary
1,giri,100
2,john,200
3,pranesh,150
3,pranesh,130


+---+-------+------+
|id |name   |salary|
+---+-------+------+
|1  |giri   |100   |
|2  |john   |200   |
|3  |pranesh|150   |
|3  |pranesh|130   |
+---+-------+------+



### 2) Select only name and salary

In [0]:

df.select(df.name,df.salary).show()


+-------+------+
|   name|salary|
+-------+------+
|   giri|   100|
|   john|   200|
|pranesh|   150|
|pranesh|   130|
+-------+------+



### 3) Rename column salary to income

In [0]:

df.select(df.id,df.name,df.salary.alias('income')).show()
df.withColumnRenamed('salary','income').show()

+---+-------+------+
| id|   name|income|
+---+-------+------+
|  1|   giri|   100|
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
+---+-------+------+

+---+-------+------+
| id|   name|income|
+---+-------+------+
|  1|   giri|   100|
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
+---+-------+------+



### 4) Add a new column salary_bonus = salary + 50

In [0]:
df.withColumn('salary_bonus',df.salary + 50).show()
df.withColumn('salary_bonus',col('salary') + 50).show()

+---+-------+------+------------+
| id|   name|salary|salary_bonus|
+---+-------+------+------------+
|  1|   giri|   100|         150|
|  2|   john|   200|         250|
|  3|pranesh|   150|         200|
|  3|pranesh|   130|         180|
+---+-------+------+------------+

+---+-------+------+------------+
| id|   name|salary|salary_bonus|
+---+-------+------+------------+
|  1|   giri|   100|         150|
|  2|   john|   200|         250|
|  3|pranesh|   150|         200|
|  3|pranesh|   130|         180|
+---+-------+------+------------+



### 5) Filter employees whose salary is greater than 150

In [0]:
df.filter(df.salary > 150).show()
df.filter(col('salary') > 150).show()
df.where('salary > 150').show()


+---+----+------+
| id|name|salary|
+---+----+------+
|  2|john|   200|
+---+----+------+

+---+----+------+
| id|name|salary|
+---+----+------+
|  2|john|   200|
+---+----+------+

+---+----+------+
| id|name|salary|
+---+----+------+
|  2|john|   200|
+---+----+------+



### 6) Find the total salary paid

In [0]:
df.agg(F.sum('salary')).show()
df.agg(F.sum('salary').alias('total_salary')).show()
df.select(F.sum('salary').alias('total_salary')).show()
df.groupBy().sum("salary").show()

+-----------+
|sum(salary)|
+-----------+
|        580|
+-----------+

+------------+
|total_salary|
+------------+
|         580|
+------------+

+------------+
|total_salary|
+------------+
|         580|
+------------+



### 7) Find the average salary

In [0]:
df.agg(F.avg('salary')).show()
df.agg(F.avg('salary').alias('avg_salary')).show()
df.select(F.avg('salary').alias('avg_salary')).show()
df.groupBy().avg('salary').show()

+-----------+
|avg(salary)|
+-----------+
|      145.0|
+-----------+

+----------+
|avg_salary|
+----------+
|     145.0|
+----------+

+----------+
|avg_salary|
+----------+
|     145.0|
+----------+

+-----------+
|avg(salary)|
+-----------+
|      145.0|
+-----------+



### 8) Find max salary for each name

In [0]:
df.groupBy('name').agg(F.max('salary').alias('max_salary')).show()

+-------+----------+
|   name|max_salary|
+-------+----------+
|   giri|       100|
|   john|       200|
|pranesh|       150|
+-------+----------+



### 9) Count how many records each name has

In [0]:
df.groupBy('name').agg(F.count('*').alias('name_count')).show()
df.groupBy('name').count().show()

+-------+----------+
|   name|name_count|
+-------+----------+
|   giri|         1|
|   john|         1|
|pranesh|         2|
+-------+----------+

+-------+-----+
|   name|count|
+-------+-----+
|   giri|    1|
|   john|    1|
|pranesh|    2|
+-------+-----+



### 10) Find the sum of salary per id

In [0]:
df.groupBy("id").agg(F.sum('salary').alias('total_Salary')).show()

+---+------------+
| id|total_Salary|
+---+------------+
|  1|         100|
|  2|         200|
|  3|         280|
+---+------------+



### 11) Find duplicate records (same id and name)

In [0]:
df1 = df.groupBy('id','name').agg(F.count('*').alias('name_count'))
df1.filter(df1.name_count > 1).show()

df.groupBy("id","name").count().filter("count > 1").show()

+---+-------+----------+
| id|   name|name_count|
+---+-------+----------+
|  3|pranesh|         2|
+---+-------+----------+

+---+-------+-----+
| id|   name|count|
+---+-------+-----+
|  3|pranesh|    2|
+---+-------+-----+



### 12) Remove exact duplicate rows

In [0]:
df.dropDuplicates().show()
df.distinct().show()


+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
+---+-------+------+

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
+---+-------+------+



### 13) Assign a row_number for each name ordered by salary descending

In [0]:
window_spec = Window.partitionBy("id").orderBy(F.col('salary').desc())
df.withColumn("row_number", F.row_number().over(window_spec)).show()

df4 = df.withColumn("unique_id",monotonically_increasing_id())
df4.show()

+---+-------+------+----------+
| id|   name|salary|row_number|
+---+-------+------+----------+
|  1|   giri|   100|         1|
|  2|   john|   200|         1|
|  3|pranesh|   150|         1|
|  3|pranesh|   130|         2|
+---+-------+------+----------+

+---+-------+------+---------+
| id|   name|salary|unique_id|
+---+-------+------+---------+
|  1|   giri|   100|        0|
|  2|   john|   200|        1|
|  3|pranesh|   150|        2|
|  3|pranesh|   130|        3|
+---+-------+------+---------+



### 14) From duplicates, keep only the highest salary

In [0]:
window_spec = Window.partitionBy('id').orderBy(F.col('salary').desc())
df.withColumn("row_number",F.row_number().over(window_spec)).filter("row_number = 1").show()
df.withColumn("row_number",F.row_number().over(window_spec)).filter(F.col("row_number") == 1).show()

+---+-------+------+----------+
| id|   name|salary|row_number|
+---+-------+------+----------+
|  1|   giri|   100|         1|
|  2|   john|   200|         1|
|  3|pranesh|   150|         1|
+---+-------+------+----------+

+---+-------+------+----------+
| id|   name|salary|row_number|
+---+-------+------+----------+
|  1|   giri|   100|         1|
|  2|   john|   200|         1|
|  3|pranesh|   150|         1|
+---+-------+------+----------+



### 15) Find the second highest salary overall


In [0]:
window_spec = Window.partitionBy().orderBy(F.col('salary').desc())
df.withColumn("rank",F.dense_rank().over(window_spec)).filter(F.col("rank") == 2).show()

window_spec = Window.orderBy(F.col("salary").desc())
df.withColumn("rank", F.dense_rank().over(window_spec)) \
  .filter(F.col("rank") == 2) \
  .show()




+---+-------+------+----+
| id|   name|salary|rank|
+---+-------+------+----+
|  3|pranesh|   150|   2|
+---+-------+------+----+

+---+-------+------+----+
| id|   name|salary|rank|
+---+-------+------+----+
|  3|pranesh|   150|   2|
+---+-------+------+----+



### 16) Find employees whose salary is less than the average salary

In [0]:
avg_salary = df.agg(F.avg('salary')).collect()[0][0]
df.filter(df.salary < avg_salary).show()
avg_salary = df.select(F.avg("salary")).first()[0]
df.filter(F.col("salary") < avg_salary).show()

df.withColumn("avg_salary", F.avg("salary").over(Window.partitionBy()))\
    .filter(F.col("salary") < F.col("avg_salary"))\
    .drop("avg_salary").show()

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  3|pranesh|   130|
+---+-------+------+

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  3|pranesh|   130|
+---+-------+------+





+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  1|   giri|   100|
|  3|pranesh|   130|
+---+-------+------+



### 17) Create a new column salary_category:

salary >= 150 → "High"

salary < 150 → "Low"

Try writing the PySpark code using withColumn()

In [0]:
df.withColumn("salary_category", F.when(F.col("salary") < 150, "Low")\
                                        .when(F.col("salary") >= 150, "High")\
                                        .otherwise("None"))\
  .show()

+---+-------+------+---------------+
| id|   name|salary|salary_category|
+---+-------+------+---------------+
|  1|   giri|   100|            Low|
|  2|   john|   200|           High|
|  3|pranesh|   150|           High|
|  3|pranesh|   130|            Low|
+---+-------+------+---------------+



### 18) Top 1 salary per name using window function?

In [0]:
window_spec = Window.partitionBy("name").orderBy(F.col('salary').desc())
df.withColumn("row_number",F.row_number().over(window_spec)).filter("row_number = 1").show()

df.withColumn("rn", F.row_number().over(window_spec)) \
  .filter(F.col("rn") == 1) \
  .drop("rn") \
  .show()

+---+-------+------+----------+
| id|   name|salary|row_number|
+---+-------+------+----------+
|  1|   giri|   100|         1|
|  2|   john|   200|         1|
|  3|pranesh|   150|         1|
+---+-------+------+----------+



### 19) Sort the DataFrame by salary descending

In [0]:
df.orderBy(F.col("salary").desc()).show()
df.sort(F.col("salary").desc()).show()
df.orderBy("salary", ascending=False).show()

+---+-------+------+
| id|   name|salary|
+---+-------+------+
|  2|   john|   200|
|  3|pranesh|   150|
|  3|pranesh|   130|
|  1|   giri|   100|
+---+-------+------+



### 20) Convert this DataFrame to Spark SQL and write a SQL query to get name and max salary

In [0]:
df.createOrReplaceTempView("Data")
spark.sql("select name, max(salary) AS max_salary from Data group by name").show()

+-------+----------+
|   name|max_salary|
+-------+----------+
|   giri|       100|
|   john|       200|
|pranesh|       150|
+-------+----------+



In [0]:
df1 = df.groupBy(df.id,df.name).sum('salary').alias('SumSalary')
df1.display()
df2 = df1.withColumnRenamed('sum(salary)','sumSalary')
df2.display()
df2.filter(df2.sumSalary > 200).display()