In [1]:
from pyspark.sql import SparkSession
import getpass
# OS user running this kernel (not referenced in the cells shown).
username = getpass.getuser()

# Create the notebook's Spark session, or reuse one that already exists,
# with YARN as the cluster manager.
spark = (
    SparkSession.builder
    .appName("Pyspark use-case-1")
    .master('yarn')
    .getOrCreate()
)

In [2]:
from pyspark.sql.functions import * 

In [3]:
# Load the raw transactions from CSV: no header row is expected and the
# column types are inferred by Spark.
transactions_df = (
    spark.read
    .format("csv")
    .option("InferSchema", "True")
    .option("Header", "False")
    .load("/user/itv007180/trans4")
)

In [4]:
# Preview the first 5 rows of the raw load to check the column names.
transactions_df.show(5)

+----------+----------+-----+-------------+
|       _c0|       _c1|  _c2|     exp_type|
+----------+----------+-----+-------------+
|CI6XLYUMQK|2019-03-16| 5.72|Entertainment|
|CI6XLYUMQK|2015-05-15|11.06|Entertainment|
|CI6XLYUMQK|2015-08-20|15.97|Entertainment|
|CI6XLYUMQK|2016-07-16|17.46|Entertainment|
|CI6XLYUMQK|2020-12-02| 8.24|Entertainment|
+----------+----------+-----+-------------+
only showing top 5 rows



In [5]:
# Give the positional CSV columns descriptive names.
# With Header=False Spark names columns by position (_c0, _c1, ...), so the
# fourth column would load as _c3. It is renamed here too because later cells
# filter and partition on "exp_type". withColumnRenamed is a no-op when the
# source column is absent, so this is harmless if the column is already
# called exp_type.
trans_df = (
    transactions_df
    .withColumnRenamed("_c0", "cus_id")
    .withColumnRenamed("_c1", "trans_date")
    .withColumnRenamed("_c2", "amount")
    .withColumnRenamed("_c3", "exp_type")
)

In [6]:
## filter and orderby in pyspark

In [7]:
# Show the five smallest 'Fines' transactions (ascending by amount).
(
    trans_df
    .where("exp_type = 'Fines'")
    .orderBy(asc("amount"))
    .show(5)
)

+----------+----------+------+--------+
|    cus_id|trans_date|amount|exp_type|
+----------+----------+------+--------+
|CNV42WMZMZ|2014-07-08| 21.25|   Fines|
|CCO1G75OUP|2019-07-11| 22.51|   Fines|
|CMP4FCQ42S|2018-04-26| 23.66|   Fines|
|CCO1G75OUP|2020-05-08|  24.3|   Fines|
|CCO1G75OUP|2019-07-03| 24.52|   Fines|
+----------+----------+------+--------+
only showing top 5 rows



In [8]:
## trans_df is used throughout the notebook, so we cache it to make the later steps run faster

In [9]:
# Mark trans_df for caching. cache() returns the DataFrame itself, which the
# notebook then displays because it is the last expression in the cell.
trans_df.cache()

cus_id,trans_date,amount,exp_type
CI6XLYUMQK,2019-03-16,5.72,Entertainment
CI6XLYUMQK,2015-05-15,11.06,Entertainment
CI6XLYUMQK,2015-08-20,15.97,Entertainment
CI6XLYUMQK,2016-07-16,17.46,Entertainment
CI6XLYUMQK,2020-12-02,8.24,Entertainment
CI6XLYUMQK,2018-03-27,9.58,Entertainment
CI6XLYUMQK,2020-03-19,10.43,Entertainment
CI6XLYUMQK,2019-07-21,29.25,Entertainment
CI6XLYUMQK,2018-11-22,12.55,Entertainment
CI6XLYUMQK,2019-12-07,10.82,Entertainment


In [10]:
from pyspark.sql import * 
from pyspark.sql.functions import * 

In [11]:
## To get TOP 5 Spenders in each category

In [12]:
# Window over each expense category, ordered from the largest amount down.
window_amount_desc = (
    Window
    .partitionBy("exp_type")
    .orderBy(col("amount").desc())
)

In [13]:
# Rank every transaction row within its exp_type by amount (highest first).
# NOTE: this ranks individual rows of trans_df, not per-customer totals.
# dense_rank gives tied amounts the same rank, so a later "Rank < 6" cut can
# keep more than five rows per category.
new_df = trans_df.withColumn("Rank",dense_rank().over(window_amount_desc))

In [14]:
# Drop the 'Tax' category, then keep ranks 1 through 5 in each remaining one.
top_5_in_all_catgories = (
    new_df
    .where("exp_type != 'Tax'")
    .where("Rank < 6")
)

In [15]:
# Persist the per-category top ranks as CSV, replacing any earlier output.
(
    top_5_in_all_catgories.write
    .mode("overwrite")
    .format("csv")
    .option("path", "/user/itv007180/top_5_all_cat")
    .save()
)

In [16]:
# Preview 5 rows of the per-category top ranks.
top_5_in_all_catgories.show(5)

+----------+----------+------+--------+----+
|    cus_id|trans_date|amount|exp_type|Rank|
+----------+----------+------+--------+----+
|C1GEQYVLHW|2019-04-08|447.29|   Fines|   1|
|CS9WEZK0XZ|2018-01-11|444.17|   Fines|   2|
|C1GEQYVLHW|2019-06-14|438.99|   Fines|   3|
|CS9WEZK0XZ|2014-07-19|422.11|   Fines|   4|
|C094B01B0F|2019-03-20|410.12|   Fines|   5|
+----------+----------+------+--------+----+
only showing top 5 rows



In [17]:
## To get LEAST 5 Spenders in each category

In [18]:
# Window over each expense category, ordered from the smallest amount up.
window_amount_asc = (
    Window
    .partitionBy("exp_type")
    .orderBy(col("amount").asc())
)

In [19]:
# Rank every transaction row within its exp_type by amount (lowest first).
# NOTE: this ranks individual rows of trans_df, not per-customer totals.
# dense_rank gives tied amounts the same rank, so a later "Rank < 6" cut can
# keep more than five rows per category.
new_df1 = trans_df.withColumn("Rank",dense_rank().over(window_amount_asc))

In [20]:
# Drop the 'Tax' category, then keep ranks 1 through 5 in each remaining one.
last_5_in_all_catgories = (
    new_df1
    .where("exp_type != 'Tax'")
    .where("Rank < 6")
)

In [21]:
# Persist the per-category bottom ranks as CSV, replacing any earlier output.
(
    last_5_in_all_catgories.write
    .mode("overwrite")
    .format("csv")
    .option("path", "/user/itv007180/last_5_all_cat")
    .save()
)

In [22]:
# Preview 5 rows of the per-category bottom ranks.
last_5_in_all_catgories.show(5)

+----------+----------+------+--------+----+
|    cus_id|trans_date|amount|exp_type|Rank|
+----------+----------+------+--------+----+
|CNV42WMZMZ|2014-07-08| 21.25|   Fines|   1|
|CCO1G75OUP|2019-07-11| 22.51|   Fines|   2|
|CMP4FCQ42S|2018-04-26| 23.66|   Fines|   3|
|CCO1G75OUP|2020-05-08|  24.3|   Fines|   4|
|CCO1G75OUP|2019-07-03| 24.52|   Fines|   5|
+----------+----------+------+--------+----+
only showing top 5 rows



In [23]:
## To get TOP 5 Spenders ACROSS CATEGORIES

In [24]:
from pyspark.sql import * 
from pyspark.sql.functions import * 

In [25]:
## Spark SQL style
##trans_df.createOrReplaceTempView("trans7180")
##spark.sql("select * from trans7180 limit 3 ")

In [26]:
##spark.sql("select cus_id,sum(amount) as sum  from trans7180 group by cus_id order by sum  limit 5 ")

In [27]:
# Total spend per customer over all categories; keep the 5 largest totals.
top_5_all_catgories = (
    trans_df
    .groupBy("cus_id")
    .sum("amount")
    .orderBy(desc("sum(amount)"))
    .limit(5)
)

In [28]:
# Persist the overall top-5 customers as CSV, replacing any earlier output.
(
    top_5_all_catgories.write
    .mode("overwrite")
    .format("csv")
    .option("path", "/user/itv007180/top_5_all_catgories")
    .save()
)

In [29]:
# Display up to 20 rows; the frame was limited to 5 rows when it was built.
top_5_all_catgories.show(20)

+----------+-----------------+
|    cus_id|      sum(amount)|
+----------+-----------------+
|CZ5EYYQBK6|935036.2300000003|
|C1CCNU63BB|927497.9500000004|
|C0S8NLA7VV|927225.1400000007|
|CRQWFVDKNP|914698.4699999992|
|C42TSJYEHN|903543.2900000003|
+----------+-----------------+



In [30]:
# Total spend per customer over all categories; keep the 5 smallest totals.
last_5_all_catgories = (
    trans_df
    .groupBy("cus_id")
    .sum("amount")
    .orderBy(asc("sum(amount)"))
    .limit(5)
)

In [31]:
# Persist the overall bottom-5 customers as CSV, replacing any earlier output.
(
    last_5_all_catgories.write
    .format("csv")
    .mode("overwrite")
    .option("path", "/user/itv007180/last_5_all_catgories")
    .save()
)

In [32]:
# Display up to 20 rows; the frame was limited to 5 rows when it was built.
last_5_all_catgories.show(20)

+----------+------------------+
|    cus_id|       sum(amount)|
+----------+------------------+
|CBNBB92Q6A|20994.749999999996|
|CE7FXMQ88J|          30695.86|
|CUNXTMRUIS|30771.489999999987|
|CEDUGQN98O|          33483.53|
|CB0D9H1LSO|          35313.33|
+----------+------------------+



In [33]:
# One-row DataFrame holding the SQL current_date value.
# NOTE(review): this assignment rebinds `current_date`, a name that
# `from pyspark.sql.functions import *` also provides, and the resulting
# DataFrame is not used in the cells shown — confirm whether it is needed.
current_date = spark.sql("select current_date")

In [34]:
# Transactions dated within December 2020 (both bounds inclusive).
# The date range is hard-coded rather than derived from the current date.
last_month_trans_df = (
    trans_df
    .where("trans_date >= '2020-12-01'")
    .where("trans_date <= '2020-12-31'")
)

In [35]:
# Mark the December 2020 subset for caching; it feeds both the top-5 and the
# bottom-5 aggregations below. The returned DataFrame is displayed by the cell.
last_month_trans_df.cache()

cus_id,trans_date,amount,exp_type
CI6XLYUMQK,2020-12-02,8.24,Entertainment
CI6XLYUMQK,2020-12-11,9.32,Entertainment
CI6XLYUMQK,2020-12-23,8.61,Entertainment
CI6XLYUMQK,2020-12-04,7.68,Entertainment
CI6XLYUMQK,2020-12-24,12.53,Entertainment
CI6XLYUMQK,2020-12-24,7.09,Entertainment
CI6XLYUMQK,2020-12-06,12.1,Entertainment
CI6XLYUMQK,2020-12-03,12.63,Entertainment
CI6XLYUMQK,2020-12-14,8.81,Entertainment
CI6XLYUMQK,2020-12-03,8.92,Entertainment


In [36]:
# Preview 2 rows of the December 2020 subset.
last_month_trans_df.show(2)

+----------+----------+------+-------------+
|    cus_id|trans_date|amount|     exp_type|
+----------+----------+------+-------------+
|CI6XLYUMQK|2020-12-02|  8.24|Entertainment|
|CI6XLYUMQK|2020-12-11|  9.32|Entertainment|
+----------+----------+------+-------------+
only showing top 2 rows



In [37]:
# Total spend per customer in the selected month; keep the 5 largest totals.
last_month_top5 = (
    last_month_trans_df
    .groupBy("cus_id")
    .sum("amount")
    .orderBy(desc("sum(amount)"))
    .limit(5)
)

In [38]:
# Persist the month's top-5 customers as CSV, replacing any earlier output.
(
    last_month_top5.write
    .mode("overwrite")
    .format("csv")
    .option("path", "/user/itv007180/last_month_top5")
    .save()
)

In [39]:
# Display the month's top-5 customers.
last_month_top5.show(5)

+----------+------------------+
|    cus_id|       sum(amount)|
+----------+------------------+
|CX6ND8J2D3|18792.100000000002|
|CYEP9ZLAGW|18594.760000000002|
|C07NQ72YC9|          17976.95|
|C0U0KM6B0A|17868.899999999998|
|CB41XA8SQ7|          17479.93|
+----------+------------------+



In [40]:
## Last month last 5 customers

In [41]:
# Total spend per customer in the selected month; keep the 5 smallest totals.
last_month_last5 = (
    last_month_trans_df
    .groupBy("cus_id")
    .sum("amount")
    .orderBy(asc("sum(amount)"))
    .limit(5)
)

In [42]:
# Persist the month's bottom-5 customers as CSV, replacing any earlier output.
(
    last_month_last5.write
    .format("csv")
    .mode("overwrite")
    .option("path", "/user/itv007180/last_month_last5")
    .save()
)

In [43]:
# Display the month's bottom-5 customers.
last_month_last5.show(5)

+----------+------------------+
|    cus_id|       sum(amount)|
+----------+------------------+
|CNV42WMZMZ|1160.6600000000003|
|CBYTIV8QZO|            1169.7|
|CTS8BEXUAZ|1301.5999999999997|
|C24QN6BC73|1420.5300000000002|
|C9FRDEZIZQ|1490.1499999999996|
+----------+------------------+



In [44]:
# Transactions dated within calendar year 2020 (both bounds inclusive).
# The date range is hard-coded rather than derived from the current date.
last_one_yr_trans_df = (
    trans_df
    .where("trans_date >= '2020-01-01'")
    .where("trans_date <= '2020-12-31'")
)

In [45]:
# Per-customer total for the year, keeping customers whose total exceeds 50000.
fee_reversal_cus_list = (
    last_one_yr_trans_df
    .groupBy("cus_id")
    .sum("amount")
    .where("sum(amount) > 50000")
)

In [46]:
# Preview 2 rows of the fee-reversal customer list.
fee_reversal_cus_list.show(2)

+----------+------------------+
|    cus_id|       sum(amount)|
+----------+------------------+
|C6RE4TBRF6|          86678.42|
|C23VF1KQQA|112629.48000000001|
+----------+------------------+
only showing top 2 rows



In [47]:
## Below are some additional points on the alias function, the round function, and Adaptive Query Execution (AQE)

In [48]:
## Usage of alias function

In [49]:
# Select both columns, exposing the aggregate under the shorter name "sum".
new_test_df = fee_reversal_cus_list.select(
    col("cus_id"),
    col("sum(amount)").alias("sum"),
)

In [50]:
## Usage of round function

In [51]:
# Show one row with the total rounded to 2 decimal places.
# `round` here is the pyspark.sql.functions version brought in by the star
# import, not Python's builtin.
(
    new_test_df
    .select(col("cus_id"), round(col("sum"), 2))
    .show(1)
)

+----------+-------------+
|    cus_id|round(sum, 2)|
+----------+-------------+
|C23VF1KQQA|    112629.48|
+----------+-------------+
only showing top 1 row



In [52]:
## Let's check the AQE status and enable it if necessary for better query performance. Without AQE, the above query will in general result in 200 partitions because groupBy is a WIDE TRANSFORMATION. AQE is enabled by DEFAULT from Spark 3.2 onwards

In [53]:
# Read the current value of the AQE switch for this session.
spark.conf.get('spark.sql.adaptive.enabled')

'false'

In [54]:
## Set the SPARK.SQL.ADAPTIVE.ENABLED parameter to TRUE to enable AQE in pyspark. For this to take effect, we ideally need to set this parameter before our groupBy statement

In [55]:
# Enable Adaptive Query Execution for this Spark session.
spark.conf.set('spark.sql.adaptive.enabled',True)

In [56]:
# Read the setting back to confirm the change.
spark.conf.get('spark.sql.adaptive.enabled')

'true'

In [57]:
# Persist the fee-reversal customer list as CSV, replacing any earlier output.
(
    fee_reversal_cus_list.write
    .mode("overwrite")
    .format("csv")
    .option("path", "/user/itv007180/fee_reversal_cus_list")
    .save()
)

In [58]:
# Preview 5 rows of the fee-reversal customer list.
fee_reversal_cus_list.show(5)

+----------+------------------+
|    cus_id|       sum(amount)|
+----------+------------------+
|C6RE4TBRF6|          86678.42|
|C23VF1KQQA|112629.48000000001|
|C3NH8CDGWM| 66399.02000000002|
|CO5MBA8HIG|61563.299999999996|
|C23Z7NYX5G|          75646.28|
+----------+------------------+
only showing top 5 rows

