In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [4]:
spark = SparkSession.builder.appName("work_with_file").getOrCreate()

In [17]:
dffile = spark.read.format("csv").option("header","true").load("sales_data.csv")

In [18]:
dfdd = dffile.dropDuplicates()

In [19]:
dfdd.show(3)



+---------+---------------+-----------+
|ProductID|TransactionDate|SalesAmount|
+---------+---------------+-----------+
|     2945|      2020-3-21|      461.9|
|     4281|      2010-4-13|     279.84|
|     2025|      2011-9-26|     105.09|
+---------+---------------+-----------+
only showing top 3 rows



                                                                                

In [22]:
df_valid_data = dfdd.na.fill(value=0,subset=["SalesAmount"])\
    .na.fill(value="",subset=["TransactionDate"])\
    .na.fill(value=0,subset=["ProductID"])

In [28]:
df_agg_date = df_valid_data.groupBy("ProductID").agg(
    F.round(F.sum("SalesAmount"),2).alias("Sum"),
    F.round(F.avg("SalesAmount"),2).alias("Avg"),
    F.count("SalesAmount").alias("Count")
)

In [29]:
df_agg_date.orderBy(F.col("Count").desc()).select("ProductID","Count").show(10)



+---------+-----+
|ProductID|Count|
+---------+-----+
|     4397|  250|
|     4436|  250|
|     3687|  249|
|      287|  248|
|     1398|  245|
|     4109|  244|
|     3799|  244|
|     1385|  243|
|       68|  243|
|     4854|  243|
+---------+-----+
only showing top 10 rows





In [30]:
df_agg_date.orderBy(F.col("Avg").desc()).select("ProductID","Avg").show(10)



+---------+------+
|ProductID|   Avg|
+---------+------+
|      268|300.82|
|     4373|290.31|
|     4461|289.66|
|     3696|289.04|
|     1831|287.13|
|     4534|286.56|
|     3859|285.27|
|     4203|285.05|
|      921|284.72|
|     2530|284.46|
+---------+------+
only showing top 10 rows



                                                                                

In [31]:
df_with_year_month = df_valid_data\
    .withColumn("Year",F.year("TransactionDate"))\
    .withColumn("Month",F.month("TransactionDate"))

In [34]:
df_with_year_month.groupBy("Year","Month").agg(
    F.round(F.sum("SalesAmount"),2).alias("Sum")
).show()



+----+-----+----------+
|Year|Month|       Sum|
+----+-----+----------+
|2022|   10|1512293.09|
|2012|   10|1531084.54|
|2010|   12| 1523618.7|
|2010|    7|1524210.42|
|2015|    2|1527827.76|
|2019|   10|1526347.56|
|2017|    3|1511505.63|
|2017|    8|1484452.14|
|2014|    4|1491672.65|
|2020|    6|1525667.85|
|2019|    5|1492053.63|
|2017|   10|1535215.63|
|2018|   10| 1520160.5|
|2021|    8|1495608.84|
|2015|   12|1529252.69|
|2016|    7|1556638.12|
|2016|   11|1510172.71|
|2021|    6|1531925.38|
|2023|    9|1532568.25|
|2023|    8|1536898.69|
+----+-----+----------+
only showing top 20 rows



                                                                                

In [50]:
win = Window.partitionBy().orderBy(F.col("TransactionDate").asc()).rowsBetween(Window.currentRow - 5,Window.currentRow)

df_valid_data = df_valid_data.filter(F.col("TransactionDate") > F.add_months(F.current_date(), -6))
df_valid_data = df_valid_data.withColumn("roll", F.round(F.avg("SalesAmount").over(win),2))

In [52]:
df_valid_data.show(50)

23/10/09 18:41:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------+---------------+-----------+------+
|ProductID|TransactionDate|SalesAmount|  roll|
+---------+---------------+-----------+------+
|     2714|      2023-10-1|      56.33| 56.33|
|     1069|      2023-10-1|     362.38|209.36|
|     3529|      2023-10-1|      45.77|154.83|
|      144|      2023-10-1|     323.33|196.95|
|     1151|      2023-10-1|     113.62|180.29|
|     4108|      2023-10-1|      249.7|191.86|
|      246|      2023-10-1|     499.53|265.72|
|     1708|      2023-10-1|     408.08|273.34|
|     4841|      2023-10-1|     174.55| 294.8|
|     2206|      2023-10-1|      28.39|245.64|
|     4771|      2023-10-1|      48.19|234.74|
|     2751|      2023-10-1|     216.67|229.24|
|     3273|      2023-10-1|      87.98|160.64|
|     3251|      2023-10-1|     172.31|121.35|
|     2553|      2023-10-1|     152.62|117.69|
|     3486|      2023-10-1|     109.14|131.15|
|     2198|      2023-10-1|      482.0|203.45|
|     1388|      2023-10-1|     271.22|212.55|
|     2934|  

In [46]:
!pwd

/home/jovyan
