
Installing pyspark and creating a spark session.

In [6]:
## pip install pyspark
!pip install -q pyspark

## Creating Spark Session
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("test-app").getOrCreate()

This notebook covers some window functions in PySpark SQL.

We will look at two types of window functions, with examples of each.

1. Ranking functions
2. Aggregate functions


In [None]:
## Let's create some data first

columns= ["employee_name", "department", "salary"]
salaryData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("Jordon", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

df_salary = spark.createDataFrame(data = salaryData, schema = columns)
df_salary.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|       Jordon|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



### RANKING WINDOW FUNCTION

In [None]:
## A simple example is row_number() over a window that is partitioned by one column and ordered by another
## In this example department is the partitioning column and salary is the ordering column

## USING ROW_NUMBER()
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank
windowSpec  = Window.partitionBy("department").orderBy("salary")

df_salary.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|Jordon       |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In the example above, notice that in the Sales department James and Jordon have the same salary of 3000, and Robert and Saif have the same salary of 4100; however, row_number() still gives each of them a different number.

If we want tied rows to share the same rank, we can use a ranking function. There are two of them among the window functions:

rank() - Gives tied rows the same rank, but leaves a gap in the sequence after a tie.
dense_rank() - Gives tied rows the same rank without leaving any gaps in the sequence.
Let's see examples on the same dataset.

In [None]:
## USING RANK()

df_salary.withColumn("rank",rank().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
|James        |Sales     |3000  |1   |
|Jordon       |Sales     |3000  |1   |
|Robert       |Sales     |4100  |3   |
|Saif         |Sales     |4100  |3   |
|Michael      |Sales     |4600  |5   |
+-------------+----------+------+----+



In [None]:
## USING DENSE_RANK()
df_salary.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|Jordon       |Sales     |3000  |1         |
|Robert       |Sales     |4100  |2         |
|Saif         |Sales     |4100  |2         |
|Michael      |Sales     |4600  |3         |
+-------------+----------+------+----------+
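
To make the difference between the three ranking functions concrete, here is a plain-Python sketch (no Spark needed) that numbers a list of values already sorted within one partition. The helper name `rank_rows` is made up for this illustration and is not part of PySpark; it only mimics the numbering rules shown in the outputs above.

```python
# Plain-Python sketch of how row_number, rank and dense_rank number rows
# that are already sorted by salary within a single partition.
def rank_rows(sorted_values):
    """Return (row_number, rank, dense_rank) tuples for pre-sorted values."""
    results = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position  # rank jumps to the current position, leaving gaps after ties
            dense += 1       # dense_rank only ever advances by one
            previous = value
        results.append((position, rank, dense))
    return results


# Salaries of the Sales department, sorted ascending as in windowSpec
sales_salaries = [3000, 3000, 4100, 4100, 4600]
for salary, numbers in zip(sales_salaries, rank_rows(sales_salaries)):
    print(salary, numbers)
```

For the Sales salaries this gives ranks 1, 1, 3, 3, 5 and dense ranks 1, 1, 2, 2, 3, matching the tables above.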



### AGGREGATE WINDOW FUNCTION

Some of the popular ones are: sum, avg, min, max

In [None]:
## Aggregate functions
## Let's compute the average, sum, min and max salary for each department

# Import the functions module under an alias so that sum, min and max
# do not shadow the Python built-ins of the same name
from pyspark.sql import functions as F

windowSpecAgg = Window.partitionBy("department")

# A window aggregate is repeated on every row of its partition, so we keep
# only the first row of each department (row == 1) to get one line per department
df_salary.withColumn("row", F.row_number().over(windowSpec)) \
  .withColumn("avg", F.avg(F.col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", F.sum(F.col("salary")).over(windowSpecAgg)) \
  .withColumn("min", F.min(F.col("salary")).over(windowSpecAgg)) \
  .withColumn("max", F.max(F.col("salary")).over(windowSpecAgg)) \
  .where(F.col("row") == 1).select("department", "avg", "sum", "min", "max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+
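
As a cross-check of the numbers above, the same per-department aggregates can be sketched in plain Python. This is only an illustration of what partitioning by department and aggregating the salaries amounts to; the helper name `department_summary` is hypothetical and not a PySpark function.

```python
from collections import defaultdict

# Same rows as df_salary: (employee_name, department, salary)
rows = [
    ("James", "Sales", 3000), ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
    ("Jordon", "Sales", 3000), ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100),
]

# Collect the salaries of each department (the "partition")
by_department = defaultdict(list)
for _name, department, salary in rows:
    by_department[department].append(salary)


def department_summary(salaries):
    """Compute the four aggregates for one partition."""
    return {
        "avg": sum(salaries) / len(salaries),
        "sum": sum(salaries),
        "min": min(salaries),
        "max": max(salaries),
    }


summary = {dept: department_summary(s) for dept, s in sorted(by_department.items())}
for dept, stats in summary.items():
    print(dept, stats)
```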



In [None]:
## Stopping the Spark session - it's good practice to stop the Spark session once done
spark.stop()

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample DataFrame
data = [("John", 25), ("Alice", 30), ("Bob", 22)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Define a Python function
def double_age(age):
    return age * 2

# Register the Python function as a UDF
double_age_udf = udf(double_age, IntegerType())

# Apply the UDF to a DataFrame
df_result = df.withColumn("DoubleAge", double_age_udf("Age"))

# Show the result
df_result.show()


+-----+---+---------+
| Name|Age|DoubleAge|
+-----+---+---------+
| John| 25|       50|
|Alice| 30|       60|
|  Bob| 22|       44|
+-----+---+---------+



In [7]:
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, round
from pyspark.sql.types import StructType, StructField, IntegerType, DateType

# Initialize Spark session
spark = SparkSession.builder.appName("average_selling_price").getOrCreate()

# Data for Prices and Units Sold
prices_data = [(1, datetime.date(2019, 2, 17), datetime.date(2019, 2, 28), 5),
(1, datetime.date(2019, 3, 1), datetime.date(2019, 3, 22), 20),
(2, datetime.date(2019, 2, 1), datetime.date(2019, 2, 20), 15),
(2, datetime.date(2019, 2, 21), datetime.date(2019, 3, 31), 30)]

units_sold_data = [(1, datetime.date(2019, 2, 25), 100),
(1, datetime.date(2019, 3, 1), 15),
(2, datetime.date(2019, 2, 10), 200),
(2, datetime.date(2019, 3, 22), 15)]

# Schemas
prices_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("price", IntegerType(), True)
])
units_sold_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("purchase_date", DateType(), True),
    StructField("units", IntegerType(), True)
])


df_prices = spark.createDataFrame(data=prices_data, schema=prices_schema)
df_units_sold = spark.createDataFrame(data=units_sold_data, schema=units_sold_schema)


In [8]:
df_prices.show()

+----------+----------+----------+-----+
|product_id|start_date|  end_date|price|
+----------+----------+----------+-----+
|         1|2019-02-17|2019-02-28|    5|
|         1|2019-03-01|2019-03-22|   20|
|         2|2019-02-01|2019-02-20|   15|
|         2|2019-02-21|2019-03-31|   30|
+----------+----------+----------+-----+



In [9]:
df_units_sold.show()

+----------+-------------+-----+
|product_id|purchase_date|units|
+----------+-------------+-----+
|         1|   2019-02-25|  100|
|         1|   2019-03-01|   15|
|         2|   2019-02-10|  200|
|         2|   2019-03-22|   15|
+----------+-------------+-----+



In [13]:
# Write a solution in PySpark to find the average selling price for each product. average_price should be rounded to 2 decimal places.

In [18]:
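# Match each sale to the price period that contains its purchase date.
# Note: filtering on purchase_date after the left join drops rows without a
# matching sale (null purchase_date), so this behaves like an inner join.
df_joined = (
    df_prices.join(df_units_sold, "product_id", "left")
    .where(col("purchase_date").between(col("start_date"), col("end_date")))
)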

In [19]:
df_joined.distinct().show()

+----------+----------+----------+-----+-------------+-----+
|product_id|start_date|  end_date|price|purchase_date|units|
+----------+----------+----------+-----+-------------+-----+
|         1|2019-02-17|2019-02-28|    5|   2019-02-25|  100|
|         1|2019-03-01|2019-03-22|   20|   2019-03-01|   15|
|         2|2019-02-01|2019-02-20|   15|   2019-02-10|  200|
|         2|2019-02-21|2019-03-31|   30|   2019-03-22|   15|
+----------+----------+----------+-----+-------------+-----+



In [29]:
df_average = df_joined.groupby("product_id").agg(
    round(sum(col("units") * col("price"))/sum("units"),2).alias("average_price")
)
df_average.show()

+----------+-------------+
|product_id|average_price|
+----------+-------------+
|         1|         6.96|
|         2|        16.05|
+----------+-------------+
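
The average selling price here is a weighted average: total revenue (units times the price valid on the purchase date) divided by total units. A plain-Python sketch of the same arithmetic, using the data from the cells above, may help to check the result by hand. The variable names are chosen for this illustration only.

```python
from collections import defaultdict
from datetime import date

# (product_id, start_date, end_date, price)
prices = [
    (1, date(2019, 2, 17), date(2019, 2, 28), 5),
    (1, date(2019, 3, 1), date(2019, 3, 22), 20),
    (2, date(2019, 2, 1), date(2019, 2, 20), 15),
    (2, date(2019, 2, 21), date(2019, 3, 31), 30),
]
# (product_id, purchase_date, units)
units_sold = [
    (1, date(2019, 2, 25), 100),
    (1, date(2019, 3, 1), 15),
    (2, date(2019, 2, 10), 200),
    (2, date(2019, 3, 22), 15),
]

revenue = defaultdict(int)
units_total = defaultdict(int)
for product_id, purchase_date, units in units_sold:
    for price_product_id, start_date, end_date, price in prices:
        # Same condition as the join plus between(): inclusive on both ends
        if product_id == price_product_id and start_date <= purchase_date <= end_date:
            revenue[product_id] += units * price
            units_total[product_id] += units

average_price = {
    pid: round(revenue[pid] / units_total[pid], 2) for pid in sorted(units_total)
}
print(average_price)  # {1: 6.96, 2: 16.05}
```

For product 1 this is (100 * 5 + 15 * 20) / 115 = 800 / 115, and for product 2 it is (200 * 15 + 15 * 30) / 215 = 3450 / 215.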



In [38]:
# Task - You are provided with two datasets, branch1 and branch2, representing information about students
# and their marks in different subjects across different branches. Your goal is to combine these datasets
# into one final dataset. Missing text information should be shown as 'unknown', and
# missing numerical information should be shown as -9999.

# PySpark code
#--------------------
# Import only the names that are used, instead of wildcard imports that
# shadow Python built-ins such as sum, min, max and round
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Define the schema for branch1 DataFrame
schema_branch1 = StructType([
    StructField("Branch", StringType(), True),
    StructField("Student", StringType(), True),
    StructField("Maths_marks", IntegerType(), True)
])

# Create branch1 DataFrame
branch1_data = [("Delhi", "Neha", 90)]
branch1 = spark.createDataFrame(branch1_data, schema_branch1)

# Define the schema for branch2 DataFrame
schema_branch2 = StructType([
    StructField("Student", StringType(), True),
    StructField("Branch", StringType(), True),
    StructField("Science_marks", IntegerType(), True),
    StructField("Maths_marks", IntegerType(), True)
])

# Create branch2 DataFrame
branch2_data = [
    ("Arav", "Kolkata", 79, 83),
    (None, "Kolkata", 89, 73)
]
branch2 = spark.createDataFrame(branch2_data, schema_branch2)
# unionByName is used in the following cells to merge the two DataFrames


In [39]:
branch1.show()

+------+-------+-----------+
|Branch|Student|Maths_marks|
+------+-------+-----------+
| Delhi|   Neha|         90|
+------+-------+-----------+



In [35]:
branch2.show()

+-------+-------+-------------+-----------+
|Student| Branch|Science_marks|Maths_marks|
+-------+-------+-------------+-----------+
|   Arav|Kolkata|           79|         83|
|   NULL|Kolkata|           89|         73|
+-------+-------+-------------+-----------+



In [44]:
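# branch1 has no Science_marks column, so add it as a null integer column
# so that both DataFrames have the same columns before unionByName
branch1_new = branch1.withColumn("Science_marks", lit(None).cast("int"))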

In [48]:
merged = branch2.unionByName(branch1_new)

In [49]:
merged.dtypes

[('Student', 'string'),
 ('Branch', 'string'),
 ('Science_marks', 'int'),
 ('Maths_marks', 'int')]

In [54]:
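# Replace nulls according to each column's type:
# -9999 for integer columns and 'unknown' for string columns
for col_name, col_type in merged.dtypes:
  if col_type == 'int':
    merged = merged.fillna({col_name: -9999})
  elif col_type == 'string':
    merged = merged.fillna({col_name: 'unknown'})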



In [55]:
merged.show()

+-------+-------+-------------+-----------+
|Student| Branch|Science_marks|Maths_marks|
+-------+-------+-------------+-----------+
|   Arav|Kolkata|           79|         83|
|unknown|Kolkata|           89|         73|
|   Neha|  Delhi|        -9999|         90|
+-------+-------+-------------+-----------+
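
The merge-and-fill logic above can also be sketched in plain Python, which may make the two steps easier to follow: align rows by column name, then substitute a default that depends on the column type. The function name `merge_and_fill` and the dictionaries are assumptions made for this illustration; they are not PySpark APIs.

```python
# Rows as dictionaries keyed by column name; branch1 lacks Science_marks
branch1 = [{"Branch": "Delhi", "Student": "Neha", "Maths_marks": 90}]
branch2 = [
    {"Student": "Arav", "Branch": "Kolkata", "Science_marks": 79, "Maths_marks": 83},
    {"Student": None, "Branch": "Kolkata", "Science_marks": 89, "Maths_marks": 73},
]

# Column types as reported by merged.dtypes in the notebook
column_types = {
    "Student": "string",
    "Branch": "string",
    "Science_marks": "int",
    "Maths_marks": "int",
}
# Default value for each type, as required by the task
defaults = {"string": "unknown", "int": -9999}


def merge_and_fill(*tables):
    """Stack the tables by column name and replace missing values by type."""
    merged = []
    for table in tables:
        for row in table:
            filled = {}
            for column, column_type in column_types.items():
                value = row.get(column)  # None when the column is absent or null
                filled[column] = defaults[column_type] if value is None else value
            merged.append(filled)
    return merged


merged_rows = merge_and_fill(branch2, branch1)
for row in merged_rows:
    print(row)
```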

