### *Window Functions*
PySpark window functions perform calculations such as rank and row number over a group, frame, or collection of rows and return a result for each row 
individually. They are increasingly used for data transformations. This section covers the concept of window functions, their syntax, and how to use them with PySpark 
SQL and the PySpark DataFrame API. 

### *Ranking Functions*
A ranking function returns the rank of each row within its partition (group). It numbers the rows in the result column according to the order defined with 
orderBy(), separately for each partition defined with Window.partitionBy(). In SQL, the partitioning and ordering are specified in the OVER clause.
Examples: row_number(), rank(), dense_rank().
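
The OVER clause mentioned above is SQL syntax. As a rough illustration of what it looks like, the sketch below runs a ranking query with `PARTITION BY` and `ORDER BY` using Python's built-in `sqlite3` module, so it needs no Spark cluster. This is SQLite, not Spark SQL, and it assumes a SQLite build with window-function support (3.25 or newer). The table name `employees` and the aliases `salary_rank` and `salary_dense_rank` are made up for this example. The same query text would be expected to work with `spark.sql()` after registering the DataFrame as a temporary view, but that is not exercised here.

```python
import sqlite3

# Same sample rows as the DataFrame used in this notebook.
rows = [
    ("James", "Developer", 3000),
    ("Michael", "Testing", 2000),
    ("Tony", "Developer", 4000),
    ("Parker", "Support", 3000),
    ("Stephen", "Developer", 4000),
    ("Clark", "Support", 3500),
    ("Bruce", "Testing", 3000),
    ("Allen", "Developer", 3500),
    ("Loki", "Support", 3000),
    ("Buttowski", "Developer", 5000),
]

# In-memory SQLite database; "employees" is a hypothetical table name.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (Name TEXT, Role TEXT, Salary INTEGER)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?)", rows)

# The OVER clause defines the window: one partition per Role,
# ordered by Salary inside each partition.
query = """
SELECT Name, Role, Salary,
       rank()       OVER (PARTITION BY Role ORDER BY Salary) AS salary_rank,
       dense_rank() OVER (PARTITION BY Role ORDER BY Salary) AS salary_dense_rank
FROM employees
ORDER BY Role, Salary, Name
"""
ranked = conn.execute(query).fetchall()

for row in ranked:
    print(row)
```

Each output row carries its own rank, which is the defining property of a window function: rows are not collapsed the way they are with GROUP BY.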

In [0]:
# Sample data and column names for the DataFrame

data = [
    ("James", "Developer", 3000),
    ("Michael", "Testing", 2000),
    ("Tony", "Developer", 4000),
    ("Parker", "Support", 3000),
    ("Stephen", "Developer", 4000),
    ("Clark", "Support", 3500),
    ("Bruce", "Testing", 3000),
    ("Allen", "Developer", 3500),
    ("Loki", "Support", 3000),
    ("Buttowski", "Developer", 5000),
]

schema = ["Name", "Role", "Salary"]

In [0]:
df = spark.createDataFrame(data, schema)

In [0]:
df.display()

Name,Role,Salary
James,Developer,3000
Michael,Testing,2000
Tony,Developer,4000
Parker,Support,3000
Stephen,Developer,4000
Clark,Support,3500
Bruce,Testing,3000
Allen,Developer,3500
Loki,Support,3000
Buttowski,Developer,5000


In [0]:
from pyspark.sql.window import Window

#### *To apply a window function to a group of rows, first partition the data, i.e. define the groups of rows, using Window.partitionBy(). The row_number(), rank(), and dense_rank() functions additionally require the partitioned data to be ordered, using orderBy() (the ORDER BY clause in SQL).* 

In [0]:
window = Window.partitionBy("Role").orderBy("Salary")
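
As a mental model for this window specification, the plain-Python sketch below groups the sample rows by role and sorts each group by salary. It is only an illustration of the partition-then-order idea and not how Spark executes a window. The helper name `partition_and_order` is hypothetical.

```python
from collections import defaultdict

# Same sample rows as the DataFrame used in this notebook.
rows = [
    ("James", "Developer", 3000),
    ("Michael", "Testing", 2000),
    ("Tony", "Developer", 4000),
    ("Parker", "Support", 3000),
    ("Stephen", "Developer", 4000),
    ("Clark", "Support", 3500),
    ("Bruce", "Testing", 3000),
    ("Allen", "Developer", 3500),
    ("Loki", "Support", 3000),
    ("Buttowski", "Developer", 5000),
]


def partition_and_order(rows):
    """Group rows by Role, then sort each group by Salary (ascending)."""
    groups = defaultdict(list)
    for name, role, salary in rows:
        groups[role].append((name, role, salary))
    # Python's sorted() is stable, so tied salaries keep their input order.
    # Spark gives no such guarantee for rows that tie on the ordering column.
    return {role: sorted(group, key=lambda r: r[2]) for role, group in groups.items()}


partitions = partition_and_order(rows)
for role, group in partitions.items():
    print(role, [salary for _, _, salary in group])
```

Every ranking function in the following cells is evaluated independently inside each of these ordered groups.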

In [0]:
from pyspark.sql.functions import row_number, rank, dense_rank

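#### *row_number()*
The row_number() function assigns a sequential number, starting at 1, to each row within its window partition. Rows that tie on the ordering column (here Tony and Stephen, both at 4000) still receive distinct numbers, and the order among such ties is not guaranteed. 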

In [0]:
# row_number()
df.withColumn("Row_number", row_number().over(window)).display()

Name,Role,Salary,Row_number
James,Developer,3000,1
Allen,Developer,3500,2
Tony,Developer,4000,3
Stephen,Developer,4000,4
Buttowski,Developer,5000,5
Parker,Support,3000,1
Loki,Support,3000,2
Clark,Support,3500,3
Michael,Testing,2000,1
Bruce,Testing,3000,2


#### *rank()*
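The rank() function assigns a rank to each row within its window partition. Tied rows receive the same rank, and the ranks that follow skip ahead, leaving gaps: the two Developers earning 4000 both get rank 3, and the next row gets rank 5.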

In [0]:
# rank() 
df.withColumn("rank", rank().over(window)).display()

Name,Role,Salary,rank
James,Developer,3000,1
Allen,Developer,3500,2
Tony,Developer,4000,3
Stephen,Developer,4000,3
Buttowski,Developer,5000,5
Parker,Support,3000,1
Loki,Support,3000,1
Clark,Support,3500,3
Michael,Testing,2000,1
Bruce,Testing,3000,2


#### *dense_rank()*
The dense_rank() function also assigns a rank to each row within its window partition. It behaves like rank() with one difference: it leaves no gaps after ties. The two Developers earning 4000 both get rank 3, and the next row gets rank 4.

In [0]:
# dense_rank() 
df.withColumn("dense_rank", dense_rank().over(window)).display()

Name,Role,Salary,dense_rank
James,Developer,3000,1
Allen,Developer,3500,2
Tony,Developer,4000,3
Stephen,Developer,4000,3
Buttowski,Developer,5000,4
Parker,Support,3000,1
Loki,Support,3000,1
Clark,Support,3500,2
Michael,Testing,2000,1
Bruce,Testing,3000,2
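
To make the difference in tie handling between the three ranking functions concrete, here is a plain-Python sketch that applies each rule to the sorted Developer salaries. The helpers `py_row_number`, `py_rank`, and `py_dense_rank` are hypothetical names written for this illustration. They mimic the behaviour seen in the outputs above and are not part of PySpark.

```python
def py_row_number(values):
    """Sequential numbering: ties still get distinct numbers."""
    return list(range(1, len(values) + 1))


def py_rank(values):
    """Ties share a rank; the next distinct value jumps to its position (gaps)."""
    ranks = []
    for i, value in enumerate(values):
        if i > 0 and value == values[i - 1]:
            ranks.append(ranks[-1])  # same value as previous row: reuse its rank
        else:
            ranks.append(i + 1)  # 1-based position in the ordered partition
    return ranks


def py_dense_rank(values):
    """Ties share a rank; the next distinct value gets the next rank (no gaps)."""
    ranks = []
    for i, value in enumerate(values):
        if i == 0:
            ranks.append(1)
        elif value == values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks


# Developer partition from the sample data, already ordered by Salary.
developer_salaries = [3000, 3500, 4000, 4000, 5000]

print(py_row_number(developer_salaries))  # [1, 2, 3, 4, 5]
print(py_rank(developer_salaries))        # [1, 2, 3, 3, 5]
print(py_dense_rank(developer_salaries))  # [1, 2, 3, 3, 4]
```

All three helpers expect the values to be sorted already, mirroring the orderBy("Salary") part of the window.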


#### *withColumn()*
PySpark withColumn() is a DataFrame transformation used to change the values of an existing column, convert its data type, or create a new column. It returns a new DataFrame and leaves the original unchanged. The examples in this notebook show these commonly used column operations.

In [0]:
from pyspark.sql.functions import col, lit

In [0]:
# Convert the data type of a column: 'Salary' is 'long' before the cast and 'integer' after it.
df.printSchema()
df1 = df.withColumn("Salary", col("Salary").cast("integer"))
df1.display()
df1.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Salary: long (nullable = true)



Name,Role,Salary
James,Developer,3000
Michael,Testing,2000
Tony,Developer,4000
Parker,Support,3000
Stephen,Developer,4000
Clark,Support,3500
Bruce,Testing,3000
Allen,Developer,3500
Loki,Support,3000
Buttowski,Developer,5000


root
 |-- Name: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Salary: integer (nullable = true)



#### *withColumn() can also change the values of an existing column. Pass the existing column name as the first argument and the new value as the second argument. Note that the second argument must be a Column expression.*

In [0]:
# Updating a column value in the table.
df.withColumn("Salary", col("Salary") * 2).display()

Name,Role,Salary
James,Developer,6000
Michael,Testing,4000
Tony,Developer,8000
Parker,Support,6000
Stephen,Developer,8000
Clark,Support,7000
Bruce,Testing,6000
Allen,Developer,7000
Loki,Support,6000
Buttowski,Developer,10000


#### *To create a new column, pass the new column's name as the first argument to withColumn(). Make sure the name is not already present in the DataFrame; if it is, withColumn() replaces the values of that existing column instead.*

In [0]:
# Creating a new column in the table.
df2 = df.withColumn("Country", lit("India"))
df2.display()

Name,Role,Salary,Country
James,Developer,3000,India
Michael,Testing,2000,India
Tony,Developer,4000,India
Parker,Support,3000,India
Stephen,Developer,4000,India
Clark,Support,3500,India
Bruce,Testing,3000,India
Allen,Developer,3500,India
Loki,Support,3000,India
Buttowski,Developer,5000,India


In [0]:
# Create a new column from an existing column in the table.
df2.withColumn("Bonus", col("Salary") / 10).display()

Name,Role,Salary,Country,Bonus
James,Developer,3000,India,300.0
Michael,Testing,2000,India,200.0
Tony,Developer,4000,India,400.0
Parker,Support,3000,India,300.0
Stephen,Developer,4000,India,400.0
Clark,Support,3500,India,350.0
Bruce,Testing,3000,India,300.0
Allen,Developer,3500,India,350.0
Loki,Support,3000,India,300.0
Buttowski,Developer,5000,India,500.0


#### *To rename an existing column, use the withColumnRenamed() function on the DataFrame. Here the 'Country' column is renamed to 'Nation'.*

In [0]:
df2.withColumnRenamed("Country", "Nation").display()

Name,Role,Salary,Nation
James,Developer,3000,India
Michael,Testing,2000,India
Tony,Developer,4000,India
Parker,Support,3000,India
Stephen,Developer,4000,India
Clark,Support,3500,India
Bruce,Testing,3000,India
Allen,Developer,3500,India
Loki,Support,3000,India
Buttowski,Developer,5000,India
