In [1]:
 from pyspark.sql import SparkSession
 import getpass
 username = getpass.getuser()
 spark= SparkSession. \
 builder. \
 config('spark.ui.port','0'). \
 config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
 enableHiveSupport(). \
 master('yarn'). \
 getOrCreate()

In [2]:
spark

In [3]:
# ! hadoop fs -ls -h /public/trendytech/datasets/order_data.csv
! hadoop fs -head /public/trendytech/datasets/order_data.csv
# ! hadoop fs -ls -h /public/trendytech/datasets/order_data.csv

﻿InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536378,,PACK OF 60 DINOSAUR CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,,PACK OF 60 PINK PAISLEY CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,84991,60 TEATIME FAIRY CAKE CASES,24,01-12-2010 9.37,0.55,14688,United Kingdom
536378,84519A,TOMATO CHARLIE+LOLA COASTER SET,6,01-12-2010 9.37,2.95,14688,United Kingdom
536378,85183B,CHARLIE & LOLA WASTEPAPER BIN FLORA,48,01-12-2010 9.37,1.25,14688,United Kingdom
536378,85071B,RED CHARLIE+LOLA PERSONAL DOORSIGN,96,01-12-2010 9.37,0.38,14688,United Kingdom
536378,21931,JUMBO STORAGE BAG SUKI,10,01-12-2010 9.37,1.95,14688,United Kingdom
536378,21929,JUMBO BAG PINK VINTAGE PAISLEY,10,01-12-2010 9.37,1.95,14688,United Kingdom
536380,22961,JAM MAKING SET PRINTED,24,01-12-2010 9.41,1.45,17809,United Kingdom
536381,22139,RETROSPOT TEA SET CERAMIC 11 PC ,23,01-12-2010 9.41,4.25,15311,United Kingdom
536381,84854,GIRLY PINK TOOL SET,5,01-

## Aggregations:

    Simple_agg,  Grouping _agg,  Window_agg

In [4]:
# Infering schema
orders_df = spark.read \
.format("csv") \
.option("inferSchema", "true") \
.option("header","true") \
.load("/public/trendytech/datasets/order_data.csv")

In [5]:
orders_df.show(truncate=False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|536378   |null     |PACK OF 60 DINOSAUR CAKE CASES     |24      |01-12-2010 9.37|0.55     |14688     |United Kingdom|
|536378   |null     |PACK OF 60 PINK PAISLEY CAKE CASES |24      |01-12-2010 9.37|0.55     |14688     |United Kingdom|
|536378   |84991    |60 TEATIME FAIRY CAKE CASES        |24      |01-12-2010 9.37|0.55     |14688     |United Kingdom|
|536378   |84519A   |TOMATO CHARLIE+LOLA COASTER SET    |6       |01-12-2010 9.37|2.95     |14688     |United Kingdom|
|536378   |85183B   |CHARLIE & LOLA WASTEPAPER BIN FLORA|48      |01-12-2010 9.37|1.25     |14688     |United Kingdom|
|536378   |85071B   |RED CHARLIE+LOLA PERSONAL D

In [6]:
orders_df.printSchema()
# But the date fileds are string, so trying to enforme schema

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
from pyspark.sql import *
from pyspark.sql.functions import *

In [8]:
# Simple aggregartions - Finally 1 output
#1) Similary we can perform using spark sql by creating a temp table

orders_df.select(count('*').alias("ttl_counts") \
                 ,countDistinct("InvoiceNo").alias("unique_invoice_counts") \
                 ,avg("UnitPrice").alias("avg_unit_price")
                 ).show()

+----------+---------------------+------------------+
|ttl_counts|unique_invoice_counts|    avg_unit_price|
+----------+---------------------+------------------+
|    541782|                25858|4.6115653233219325|
+----------+---------------------+------------------+



In [9]:
# 2)grouping aggregations
summary_df = orders_df.groupBy("country","invoiceno") \
.agg(sum("quantity").alias("total_quantity"),sum(expr("quantity * unitprice")).alias("invoice_value")).sort("invoiceno")

In [10]:
summary_df.show()

+--------------+---------+--------------+------------------+
|       country|invoiceno|total_quantity|     invoice_value|
+--------------+---------+--------------+------------------+
|United Kingdom|   536378|           242|192.78000000000003|
|United Kingdom|   536380|            24|              34.8|
|United Kingdom|   536381|           198|449.97999999999996|
|United Kingdom|   536382|           134|430.59999999999997|
|United Kingdom|   536384|           190|             489.6|
|United Kingdom|   536385|            53|            130.85|
|United Kingdom|   536386|           236|508.20000000000005|
|United Kingdom|   536387|          1440|           3193.92|
|United Kingdom|   536388|           108|            226.14|
|     Australia|   536389|           107|            358.25|
|United Kingdom|   536390|          1568|           1825.74|
|United Kingdom|   536392|           103|318.14000000000004|
|United Kingdom|   536393|             8|              79.6|
|United Kingdom|   53639

In [11]:
# 3)window aggreagtion 
orders_wn_df = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header", "true") \
.load("/public/trendytech/datasets/windowdata.csv")

In [12]:
orders_wn_df.sort("country").show()

+---------------+-------+-----------+-------------+------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------------+-------+-----------+-------------+------------+
|      Australia|     49|          1|          214|       258.9|
|      Australia|     48|          1|          107|      358.25|
|      Australia|     50|          2|          133|      387.95|
|        Austria|     50|          2|            3|      257.04|
|        Bahrain|     51|          1|           54|      205.74|
|        Belgium|     48|          1|          528|       346.1|
|        Belgium|     50|          2|          285|      625.16|
|        Belgium|     51|          2|          942|      838.65|
|Channel Islands|     49|          1|           80|      363.53|
|         Cyprus|     50|          1|          917|     1590.82|
|        Denmark|     49|          1|          454|      1281.5|
|        Finland|     50|          1|         1254|       892.8|
|         France|     49|

In [13]:
# 4). calculate the running total of invoice value over a given window.
#     sol:
#         define a window, order based a column, define a widow size , use this window on the df

In [14]:
mywindow  = Window.partitionBy("country") \
.orderBy("weeknum") \
.rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [15]:
result_df = orders_wn_df.withColumn("running_ttl",sum("invoicevalue").over(mywindow))

In [16]:
result_df.show()

+-------+-------+-----------+-------------+------------+------------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|       running_ttl|
+-------+-------+-----------+-------------+------------+------------------+
| Sweden|     50|          3|         3714|      2646.3|            2646.3|
|Germany|     48|         11|         1795|     3309.75|           3309.75|
|Germany|     49|         12|         1852|     4521.39|           7831.14|
|Germany|     50|         15|         1973|     5065.79|          12896.93|
|Germany|     51|          5|         1103|     1665.91|          14562.84|
| France|     48|          4|         1299|     2808.16|           2808.16|
| France|     49|          9|         2303|     4527.01|           7335.17|
| France|     50|          6|          529|      537.32|           7872.49|
| France|     51|          5|          847|     1702.87|           9575.36|
|Belgium|     48|          1|          528|       346.1|             346.1|
|Belgium|   

In [17]:
# .rowsBetween(Window.unboundedPreceding,Window.currentRow)
# unboundedPreceding - will include considers values in partition from the 1st values
    # for Germany 
    # 3309.75 = 1st value
    #  3309.75+4521.39 = 2nd value
    #  3309.75+4521.39+ 5065.79  = 3rd value 

# .rowsBetween(-2,Window.currentRow) (3 rows will be summed, current value and 2 above)
    # if you had a 4th value for Germany - then 4th+3r+2nd
    #  i.e 4521.39+ 5065.79+1665.91 = 14562.84

# Window Functions

In [18]:
# 5) Generate rank , dense_rank, row_number based on the invoice_value for each country 
orders_wn1_df = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header", "true") \
.load("/public/trendytech/datasets/windowdatamodified.csv")

In [19]:
orders_wn1_df.show()

+--------------+-------+-----------+-------------+------------+
|       country|weeknum|numinvoices|totalquantity|invoicevalue|
+--------------+-------+-----------+-------------+------------+
|         Spain|     49|          1|           67|      174.72|
|       Germany|     48|         11|         1795|      1600.0|
|     Lithuania|     48|          3|          622|     1598.06|
|       Germany|     49|         12|         1852|      1800.0|
|       Bahrain|     51|          1|           54|      205.74|
|       Iceland|     49|          1|          319|      711.79|
|         India|     51|          5|           95|       300.0|
|     Australia|     50|          2|          133|      387.95|
|         Italy|     49|          1|           -2|       -17.0|
|         India|     49|          5|         1280|      3284.1|
|         Spain|     50|          2|          400|     1049.01|
|United Kingdom|     51|        200|        28782|    75103.46|
|        Norway|     49|          1|    

In [20]:
# from pyspark.sql import *
from pyspark.sql.functions import *

In [21]:
window1 = Window.partitionBy("country").orderBy("invoicevalue")
orders_wn1_df.withColumn("rnk",rank().over(window1)).show()
# rank - rank being skipped

+-------+-------+-----------+-------------+------------+---+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rnk|
+-------+-------+-----------+-------------+------------+---+
| Sweden|     50|          3|         3714|      2646.3|  1|
|Germany|     48|         11|         1795|      1600.0|  1|
|Germany|     51|          5|         1103|      1600.0|  1|
|Germany|     49|         12|         1852|      1800.0|  3|
|Germany|     50|         15|         1973|      1800.0|  3|
| France|     51|          5|          847|       500.0|  1|
| France|     49|          9|         2303|       500.0|  1|
| France|     48|          4|         1299|       500.0|  1|
| France|     50|          6|          529|      537.32|  4|
|Belgium|     50|          2|          285|      625.16|  1|
|Belgium|     48|          1|          528|       800.0|  2|
|Belgium|     51|          2|          942|       800.0|  2|
|Finland|     50|          1|         1254|       892.8|  1|
|  India|     51|       

In [22]:
orders_wn1_df.withColumn("dn_rnk",dense_rank().over(window1)).show()
# dense_rnk  - Not skipping the values

+-------+-------+-----------+-------------+------------+------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|dn_rnk|
+-------+-------+-----------+-------------+------------+------+
| Sweden|     50|          3|         3714|      2646.3|     1|
|Germany|     48|         11|         1795|      1600.0|     1|
|Germany|     51|          5|         1103|      1600.0|     1|
|Germany|     49|         12|         1852|      1800.0|     2|
|Germany|     50|         15|         1973|      1800.0|     2|
| France|     51|          5|          847|       500.0|     1|
| France|     49|          9|         2303|       500.0|     1|
| France|     48|          4|         1299|       500.0|     1|
| France|     50|          6|          529|      537.32|     2|
|Belgium|     50|          2|          285|      625.16|     1|
|Belgium|     48|          1|          528|       800.0|     2|
|Belgium|     51|          2|          942|       800.0|     2|
|Finland|     50|          1|         12

In [23]:
orders_wn1_df.withColumn("row_no",row_number().over(window1)).show()
# row_number - generating cotinous values

+-------+-------+-----------+-------------+------------+------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|row_no|
+-------+-------+-----------+-------------+------------+------+
| Sweden|     50|          3|         3714|      2646.3|     1|
|Germany|     48|         11|         1795|      1600.0|     1|
|Germany|     51|          5|         1103|      1600.0|     2|
|Germany|     49|         12|         1852|      1800.0|     3|
|Germany|     50|         15|         1973|      1800.0|     4|
| France|     51|          5|          847|       500.0|     1|
| France|     49|          9|         2303|       500.0|     2|
| France|     48|          4|         1299|       500.0|     3|
| France|     50|          6|          529|      537.32|     4|
|Belgium|     50|          2|          285|      625.16|     1|
|Belgium|     48|          1|          528|       800.0|     2|
|Belgium|     51|          2|          942|       800.0|     3|
|Finland|     50|          1|         12

In [24]:
# Lead and lag
# 6)compare the invoice values for a country with its previous week

In [25]:
wind_3 = Window.partitionBy("country").orderBy("weeknum")
orders_wn1_df.withColumn("previous_week",lag("invoicevalue").over(wind_3)).show()

+-------+-------+-----------+-------------+------------+-------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|
+-------+-------+-----------+-------------+------------+-------------+
| Sweden|     50|          3|         3714|      2646.3|         null|
|Germany|     48|         11|         1795|      1600.0|         null|
|Germany|     49|         12|         1852|      1800.0|       1600.0|
|Germany|     50|         15|         1973|      1800.0|       1800.0|
|Germany|     51|          5|         1103|      1600.0|       1800.0|
| France|     48|          4|         1299|       500.0|         null|
| France|     49|          9|         2303|       500.0|        500.0|
| France|     50|          6|          529|      537.32|        500.0|
| France|     51|          5|          847|       500.0|       537.32|
|Belgium|     48|          1|          528|       800.0|         null|
|Belgium|     50|          2|          285|      625.16|        800.0|
|Belgi

In [26]:
# example for lead
wind_3 = Window.partitionBy("country").orderBy("weeknum")
orders_wn1_df.withColumn("next_week",lead("invoicevalue").over(wind_3)).show()

+-------+-------+-----------+-------------+------------+---------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|next_week|
+-------+-------+-----------+-------------+------------+---------+
| Sweden|     50|          3|         3714|      2646.3|     null|
|Germany|     48|         11|         1795|      1600.0|   1800.0|
|Germany|     49|         12|         1852|      1800.0|   1800.0|
|Germany|     50|         15|         1973|      1800.0|   1600.0|
|Germany|     51|          5|         1103|      1600.0|     null|
| France|     48|          4|         1299|       500.0|    500.0|
| France|     49|          9|         2303|       500.0|   537.32|
| France|     50|          6|          529|      537.32|    500.0|
| France|     51|          5|          847|       500.0|     null|
|Belgium|     48|          1|          528|       800.0|   625.16|
|Belgium|     50|          2|          285|      625.16|    800.0|
|Belgium|     51|          2|          942|       800.0|     n

In [27]:
# 7) Build a df with sample data ,with proper schema, strip the month from the date filed

In [28]:
 logs_data = [("INFO","2015-8-8 20:49:22"),
 ("WARN","2015-1-14 20:05:00"),
 ("INFO","2017-6-14 00:08:35"),
 ("INFO","2016-1-18 11:50:14"),
 ("DEBUG","2017-7-1 12:55:02"),
 ("INFO","2014-2-26 12:34:21"),
 ("INFO","2015-7-12 11:13:47"),
 ("INFO","2017-4-15 01:20:18"),
 ("DEBUG","2016-11-2 20:19:23"),
 ("INFO","2012-8-20 10:09:44")] 

In [29]:
sample_df = spark.createDataFrame(logs_data).toDF('log_level','log_time')
sample_df.show()

+---------+------------------+
|log_level|          log_time|
+---------+------------------+
|     INFO| 2015-8-8 20:49:22|
|     WARN|2015-1-14 20:05:00|
|     INFO|2017-6-14 00:08:35|
|     INFO|2016-1-18 11:50:14|
|    DEBUG| 2017-7-1 12:55:02|
|     INFO|2014-2-26 12:34:21|
|     INFO|2015-7-12 11:13:47|
|     INFO|2017-4-15 01:20:18|
|    DEBUG|2016-11-2 20:19:23|
|     INFO|2012-8-20 10:09:44|
+---------+------------------+



In [30]:
sample_df_t1 = sample_df.withColumn("log_time",to_timestamp("log_time"))

In [31]:
sample_df_t1.show()

+---------+-------------------+
|log_level|           log_time|
+---------+-------------------+
|     INFO|2015-08-08 20:49:22|
|     WARN|2015-01-14 20:05:00|
|     INFO|2017-06-14 00:08:35|
|     INFO|2016-01-18 11:50:14|
|    DEBUG|2017-07-01 12:55:02|
|     INFO|2014-02-26 12:34:21|
|     INFO|2015-07-12 11:13:47|
|     INFO|2017-04-15 01:20:18|
|    DEBUG|2016-11-02 20:19:23|
|     INFO|2012-08-20 10:09:44|
+---------+-------------------+



In [32]:
sample_df_t1.printSchema()

root
 |-- log_level: string (nullable = true)
 |-- log_time: timestamp (nullable = true)



In [33]:
sample_df_mnth = sample_df_t1.withColumn('mnth',month("log_time"))

In [34]:
sample_df_mnth.show()

+---------+-------------------+----+
|log_level|           log_time|mnth|
+---------+-------------------+----+
|     INFO|2015-08-08 20:49:22|   8|
|     WARN|2015-01-14 20:05:00|   1|
|     INFO|2017-06-14 00:08:35|   6|
|     INFO|2016-01-18 11:50:14|   1|
|    DEBUG|2017-07-01 12:55:02|   7|
|     INFO|2014-02-26 12:34:21|   2|
|     INFO|2015-07-12 11:13:47|   7|
|     INFO|2017-04-15 01:20:18|   4|
|    DEBUG|2016-11-02 20:19:23|  11|
|     INFO|2012-08-20 10:09:44|   8|
+---------+-------------------+----+



In [35]:
# count of logs at a month level
sample_df_mnth.groupBy("log_level","mnth").agg(count("mnth").alias("counts")).sort("log_level").show()

+---------+----+------+
|log_level|mnth|counts|
+---------+----+------+
|    DEBUG|   7|     1|
|    DEBUG|  11|     1|
|     INFO|   2|     1|
|     INFO|   8|     2|
|     INFO|   7|     1|
|     INFO|   1|     1|
|     INFO|   4|     1|
|     INFO|   6|     1|
|     WARN|   1|     1|
+---------+----+------+



In [36]:
# 8) Calculate the running_total month wise
 revenue_data = [(2000,"01-Jan-2020"),
 (3000,"02-Jan-2020"),
 (45000,"22-Jan-2020"),
 (40000,"02-Feb-2020"),
 (13000,"01-Mar-2020")]

In [37]:
raw_df = spark.createDataFrame(revenue_data)

In [38]:
trans_df = raw_df.toDF("revenue","Date")

In [39]:
trans_df1 = trans_df.withColumn("Date",to_date("Date","dd-MMM-yyyy"))

In [40]:
trans_df1.show()

+-------+----------+
|revenue|      Date|
+-------+----------+
|   2000|2020-01-01|
|   3000|2020-01-02|
|  45000|2020-01-22|
|  40000|2020-02-02|
|  13000|2020-03-01|
+-------+----------+



In [41]:
trans_df1.printSchema()

root
 |-- revenue: long (nullable = true)
 |-- Date: date (nullable = true)



In [42]:
trans_mnth = trans_df1.withColumn("mnth", month("date"))
trans_mnth.show()

+-------+----------+----+
|revenue|      Date|mnth|
+-------+----------+----+
|   2000|2020-01-01|   1|
|   3000|2020-01-02|   1|
|  45000|2020-01-22|   1|
|  40000|2020-02-02|   2|
|  13000|2020-03-01|   3|
+-------+----------+----+



In [43]:
df_grp = trans_mnth.groupBy("mnth").agg(sum("revenue").alias("ttl_revn")).sort("mnth")

In [44]:
df_grp.show()

+----+--------+
|mnth|ttl_revn|
+----+--------+
|   1|   50000|
|   2|   40000|
|   3|   13000|
+----+--------+



In [45]:
mywin = Window.orderBy("mnth").rowsBetween(Window.unboundedPreceding,Window.currentRow)

In [46]:
df_grp.withColumn("running_ttlt",sum("ttl_revn").over(mywin)).show()

+----+--------+------------+
|mnth|ttl_revn|running_ttlt|
+----+--------+------------+
|   1|   50000|       50000|
|   2|   40000|       90000|
|   3|   13000|      103000|
+----+--------+------------+



In [47]:
spark.stop()