In [0]:
product_data = [
(2,"samsung","01-01-1995",11000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-01-2006",15000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(3,"oneplus","01-01-2010",23000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]

product_df = spark.createDataFrame(data=product_data,schema=product_schema)

product_df.show()


+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         2|     samsung|01-01-1995|  11000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-01-2006|  15000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         3|     oneplus|01-01-2010|  23000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
window = Window.partitionBy("product_id").orderBy("sales_date")

In [0]:
#If you compare to see the latest sales column to the actual latest sale, it is not correct at all
product_df.withColumn("first_sales",first("sales").over(window))\
  .withColumn("latest_sales",last("sales").over(window)).show()

+----------+------------+----------+-------+-----------+------------+
|product_id|product_name|sales_date|  sales|first_sales|latest_sales|
+----------+------------+----------+-------+-----------+------------+
|         1|      iphone|01-01-2006|  15000|      15000|       15000|
|         1|      iphone|01-02-2023|1300000|      15000|     1300000|
|         1|      iphone|01-03-2023|1600000|      15000|     1600000|
|         1|      iphone|01-04-2023|1700000|      15000|     1700000|
|         1|      iphone|01-05-2023|1200000|      15000|     1200000|
|         1|      iphone|01-06-2023|1100000|      15000|     1100000|
|         2|     samsung|01-01-1995|  11000|      11000|       11000|
|         2|     samsung|01-02-2023|1120000|      11000|     1120000|
|         2|     samsung|01-03-2023|1080000|      11000|     1080000|
|         2|     samsung|01-04-2023|1800000|      11000|     1800000|
|         2|     samsung|01-05-2023| 980000|      11000|      980000|
|         2|     sam

In [0]:
#So we solved the above problem by using rowsBetween cause by default spark takes up Unbounded preceding which just takes the values before the selected. Instead we defined both Unbounded preceding and unbounded Following so as to tell spark to look up both sides of the selected value 
window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

In [0]:
product_df.withColumn("first_sales",first("sales").over(window))\
  .withColumn("latest_sales",last("sales").over(window)).show()

+----------+------------+----------+-------+-----------+------------+
|product_id|product_name|sales_date|  sales|first_sales|latest_sales|
+----------+------------+----------+-------+-----------+------------+
|         1|      iphone|01-01-2006|  15000|      15000|     1100000|
|         1|      iphone|01-02-2023|1300000|      15000|     1100000|
|         1|      iphone|01-03-2023|1600000|      15000|     1100000|
|         1|      iphone|01-04-2023|1700000|      15000|     1100000|
|         1|      iphone|01-05-2023|1200000|      15000|     1100000|
|         1|      iphone|01-06-2023|1100000|      15000|     1100000|
|         2|     samsung|01-01-1995|  11000|      11000|     1100000|
|         2|     samsung|01-02-2023|1120000|      11000|     1100000|
|         2|     samsung|01-03-2023|1080000|      11000|     1100000|
|         2|     samsung|01-04-2023|1800000|      11000|     1100000|
|         2|     samsung|01-05-2023| 980000|      11000|     1100000|
|         2|     sam

In [0]:
emp_data = [(1,"manish","11-07-2023","10:20"),
        (1,"manish","11-07-2023","11:20"),
        (2,"rajesh","11-07-2023","11:20"),
        (1,"manish","11-07-2023","11:50"),
        (2,"rajesh","11-07-2023","13:20"),
        (1,"manish","11-07-2023","19:20"),
        (2,"rajesh","11-07-2023","17:20"),
        (1,"manish","12-07-2023","10:32"),
        (1,"manish","12-07-2023","12:20"),
        (3,"vikash","12-07-2023","09:12"),
        (1,"manish","12-07-2023","16:23"),
        (3,"vikash","12-07-2023","18:08")]

emp_schema = ["id", "name", "date", "time"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

emp_df.show()

+---+------+----------+-----+
| id|  name|      date| time|
+---+------+----------+-----+
|  1|manish|11-07-2023|10:20|
|  1|manish|11-07-2023|11:20|
|  2|rajesh|11-07-2023|11:20|
|  1|manish|11-07-2023|11:50|
|  2|rajesh|11-07-2023|13:20|
|  1|manish|11-07-2023|19:20|
|  2|rajesh|11-07-2023|17:20|
|  1|manish|12-07-2023|10:32|
|  1|manish|12-07-2023|12:20|
|  3|vikash|12-07-2023|09:12|
|  1|manish|12-07-2023|16:23|
|  3|vikash|12-07-2023|18:08|
+---+------+----------+-----+



In [0]:
#Converted to timestamp, to connect or add 2 columns that is date and time togather 
emp_df = emp_df.withColumn ("timestamp" ,
from_unixtime (unix_timestamp (expr ("CONCAT(date, ' ', time) "),"dd-MM-yyyy HH: mm")))

In [0]:
window = Window.partitionBy("id","date").orderBy("date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

Running Sales



In [0]:
product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         2|     samsung|01-01-1995|  11000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-01-2006|  15000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         3|     oneplus|01-01-2010|  23000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [0]:
#Creating a new window to get running_sum by adding the 2 values above the specified value 
window = Window.partitionBy("product_id","sales_date").orderBy("sales_date").rowsBetween(-2,0)

In [0]:
#getting running sum of 2 values after defining the window above to sum the 2 values above it
product_df.withColumn("running_sum",sum("sales").over(window)).show()

+----------+------------+----------+-------+-----------+
|product_id|product_name|sales_date|  sales|running_sum|
+----------+------------+----------+-------+-----------+
|         1|      iphone|01-01-2006|  15000|      15000|
|         1|      iphone|01-02-2023|1300000|    1300000|
|         1|      iphone|01-03-2023|1600000|    1600000|
|         1|      iphone|01-04-2023|1700000|    1700000|
|         1|      iphone|01-05-2023|1200000|    1200000|
|         1|      iphone|01-06-2023|1100000|    1100000|
|         2|     samsung|01-01-1995|  11000|      11000|
|         2|     samsung|01-02-2023|1120000|    1120000|
|         2|     samsung|01-03-2023|1080000|    1080000|
|         2|     samsung|01-04-2023|1800000|    1800000|
|         2|     samsung|01-05-2023| 980000|     980000|
|         2|     samsung|01-06-2023|1100000|    1100000|
|         3|     oneplus|01-01-2010|  23000|      23000|
|         3|     oneplus|01-02-2023|1120000|    1120000|
|         3|     oneplus|01-03-