In [1]:
filesPath = "/home/anil/Anil_Created_Docs/01.02.BID_DATA/PySPark/Spark-The-Definitive-Guide-master/data"

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(filesPath + "/retail-data/all/*.csv")\
.coalesce(5)

In [4]:
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [5]:
df.createOrReplaceTempView("dfTable1")

In [7]:
spark.sql("select * from dfTable1 Limit 2").show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+



In [8]:
df.count()

541909

In [9]:
df.show(4, truncate=False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
only showing top 4 rows



In [10]:
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [11]:
df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [12]:
df_num = spark.range(1,10)

In [13]:
df_num.select("*")\
.orderBy(col("id").desc())\
.select(last("id"))\
.show()

+---------------+
|last(id, false)|
+---------------+
|              1|
+---------------+



In [14]:
df.select(count("Quantity").alias("Total_Count")\
         ,sum("Quantity").alias("Total_Qty")\
         ,avg("Quantity").alias("Avg_Qty")\
         ,expr("mean(Quantity) as Mean_Qty")\
         )\
.selectExpr("Total_Qty/Total_Count"
            ,"Avg_Qty"
            ,"Mean_Qty")\
.show()

+-------------------------+----------------+----------------+
|(Total_Qty / Total_Count)|         Avg_Qty|        Mean_Qty|
+-------------------------+----------------+----------------+
|         9.55224954743324|9.55224954743324|9.55224954743324|
+-------------------------+----------------+----------------+



In [15]:
df.select("Country")\
.show(30)

+--------------+
|       Country|
+--------------+
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|United Kingdom|
|        France|
|        France|
|        France|
|        France|
+--------------+
only showing top 30 rows



In [16]:
df.limit(30).select(collect_list("Country"))\
.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_list(Country)                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+-------------------------------------------------------------------------------------------------

In [17]:
df.limit(30).select(collect_set("Country"))\
.show(truncate=False)

+------------------------+
|collect_set(Country)    |
+------------------------+
|[France, United Kingdom]|
+------------------------+



In [18]:
df.groupBy("InvoiceNo")\
.agg(count("Quantity").alias("quan"),\
     expr("count(Quantity)"))\
.show(4)

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
+---------+----+---------------+
only showing top 4 rows



In [4]:
dfWithDate = df.withColumn("date",to_date("InvoiceDate","MM/d/yyyy H:mm"))

In [5]:
dfWithDate.show(4,truncate=False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |date      |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+----------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|2010-12-01|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|2010-12-01|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01|
+---------+---------+-----------------------------------+--------+--------------+---------+------

In [21]:
from pyspark.sql import Window

In [22]:
windowSpec = Window\
.partitionBy("CustomerID","date")\
.orderBy(col("Quantity").desc())\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [23]:
maxPurchaseQty = max(col("Quantity")).over(windowSpec)
sumPurchaseQty = sum(col("Quantity")).over(windowSpec)

In [24]:
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [25]:
windowSpec1 = Window\
.partitionBy("CustomerID")\
.orderBy(col("Quantity").desc())\
.rangeBetween(Window.unboundedPreceding, Window.currentRow)


In [26]:
sumPurchaseQtyRangeBet = sum(col("Quantity")).over(windowSpec)

In [27]:
dfWithDate.groupBy("CustomerID").agg(countDistinct(col("date")).alias("dateCount")).orderBy(col("dateCount").desc()).show(20)

+----------+---------+
|CustomerID|dateCount|
+----------+---------+
|      null|      277|
|     14911|      146|
|     12748|      115|
|     17841|      113|
|     15311|       91|
|     14606|       89|
|     13089|       83|
|     12971|       72|
|     16422|       67|
|     14527|       64|
|     13408|       55|
|     13798|       53|
|     14156|       51|
|     14646|       47|
|     15189|       46|
|     16029|       45|
|     13767|       42|
|     15039|       42|
|     16133|       41|
|     13694|       41|
+----------+---------+
only showing top 20 rows



In [28]:
dfWithDate.where(col("CustomerID").isin(16133,13694)).show()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----------+
|   536437|    21154|RED RETROSPOT OVE...|     200| 12/1/2010 12:12|     1.06|     13694|United Kingdom|2010-12-01|
|   536437|    22189|CREAM HEART CARD ...|      72| 12/1/2010 12:12|     3.39|     13694|United Kingdom|2010-12-01|
|   536437|    17021|NAMASTE SWAGAT IN...|     600| 12/1/2010 12:12|     0.24|     13694|United Kingdom|2010-12-01|
|   536437|    22059|CERAMIC STRAWBERR...|      48| 12/1/2010 12:12|     1.25|     13694|United Kingdom|2010-12-01|
|   536437|    22188|BLACK HEART CARD ...|      36| 12/1/2010 12:12|     3.39|     13694|United Kingdom|2010-12-01|
|   536437|    84678|CLASSICAL ROSE SM...|      48| 12/1/2010 12:12|    

In [29]:
dfWithDate\
.where(col("CustomerID").isin(16133,13694))\
.select("CustomerID"\
        ,"date"\
        ,"Quantity"\
        ,sumPurchaseQty.alias("sumPurchaseQty")\
                  ,sumPurchaseQtyRangeBet.alias("sumPurchaseQtyRangeBet")\
                  ,maxPurchaseQty.alias("maxPurchaseQty") \
                  ,purchaseDenseRank.alias("purchaseDenseRank") \
                  ,purchaseRank.alias("purchaseRank") \
                 ).show(60)

+----------+----------+--------+--------------+----------------------+--------------+-----------------+------------+
|CustomerID|      date|Quantity|sumPurchaseQty|sumPurchaseQtyRangeBet|maxPurchaseQty|purchaseDenseRank|purchaseRank|
+----------+----------+--------+--------------+----------------------+--------------+-----------------+------------+
|     13694|2011-02-22|     192|           192|                   192|           192|                1|           1|
|     13694|2011-02-22|     144|           336|                   336|           192|                2|           2|
|     13694|2011-02-22|     144|           480|                   480|           192|                2|           2|
|     13694|2011-02-22|     144|           624|                   624|           192|                2|           2|
|     13694|2011-02-22|     120|           744|                   744|           192|                3|           5|
|     13694|2011-02-22|     120|           864|                 

In [6]:
dfNoNull = dfWithDate.drop()

In [7]:
dfNoNull.createOrReplaceTempView("dfNoNull")

In [8]:
spark.sql("SHOW TABLES").show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [34]:
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfnonull
Where CustomerId Not in (18287,18283)
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
""").show()

+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18282|    23295|            8|
|     18282|    23187|           43|
|     18282|    23175|            4|
|     18282|    23174|            4|
|     18282|    22818|           12|
|     18282|    22699|            6|
|     18282|    22424|            1|
|     18282|    22423|            2|
|     18282|    22089|            6|
|     18282|    21270|            1|
|     18282|    21109|            2|
|     18282|    21108|            9|
|     18281|    23209|           10|
|     18281|    23008|            1|
|     18281|    23007|            1|
|     18281|    22716|           12|
|     18281|    22467|            6|
|     18281|    22037|           12|
|     18281|    22028|           12|
|     18280|    82484|            3|
+----------+---------+-------------+
only showing top 20 rows



In [42]:
spark.sql("""
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
Where CustomerId Not in (18287,18283)
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),(customerID),())
ORDER BY CustomerId DESC NULLS LAST, stockCode DESC NULLS FIRST
""").show()

+----------+---------+-------------+
|customerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18282|     null|           98|
|     18282|    23295|            8|
|     18282|    23187|           43|
|     18282|    23175|            4|
|     18282|    23174|            4|
|     18282|    22818|           12|
|     18282|    22699|            6|
|     18282|    22424|            1|
|     18282|    22423|            2|
|     18282|    22089|            6|
|     18282|    21270|            1|
|     18282|    21109|            2|
|     18282|    21108|            9|
|     18281|     null|           54|
|     18281|    23209|           10|
|     18281|    23008|            1|
|     18281|    23007|            1|
|     18281|    22716|           12|
|     18281|    22467|            6|
|     18281|    22037|           12|
+----------+---------+-------------+
only showing top 20 rows



In [15]:
dfWithDate_Filtered = dfWithDate.where(col("StockCode").isin('22752','22749','71053'))

In [31]:
rolledUpDF = dfWithDate_Filtered.rollup("StockCode","Country").agg(sum("Quantity"),avg("Quantity").alias("Avg_Qty"))\
.selectExpr("StockCode","Country","`sum(Quantity)` as tot_qty","Round(Avg_Qty) as AvgQty")\
.orderBy(desc_nulls_first("StockCode"), desc_nulls_first("Country"))

rolledUpDF.show(40)

+---------+---------------+-------+------+
|StockCode|        Country|tot_qty|AvgQty|
+---------+---------------+-------+------+
|     null|           null|   5952|   5.0|
|    71053|           null|   1911|   5.0|
|    71053| United Kingdom|   1845|   5.0|
|    71053|    Switzerland|     12|  12.0|
|    71053|          Spain|      6|   3.0|
|    71053|           EIRE|     48|   5.0|
|    22752|           null|   1666|   4.0|
|    22752|    Unspecified|      5|   3.0|
|    22752| United Kingdom|   1399|   4.0|
|    22752|    Switzerland|     12|   3.0|
|    22752|         Sweden|      2|   2.0|
|    22752|          Spain|      3|   2.0|
|    22752|         Norway|     24|  24.0|
|    22752|    Netherlands|     26|  13.0|
|    22752|        Germany|     81|   7.0|
|    22752|         France|     72|   7.0|
|    22752|           EIRE|     14|   4.0|
|    22752|        Denmark|     24|  24.0|
|    22752|        Belgium|      2|   2.0|
|    22752|        Austria|      2|   2.0|
|    22749|

In [47]:
cubeDF = dfWithDate_Filtered\
.where(col("StockCode") == '71053')\
.cube("StockCode","Country").agg(sum("Quantity"))\
.selectExpr("StockCode","Country","`sum(Quantity)` as tot_qty")\
.orderBy(desc_nulls_last( "StockCode"), col("country"))

cubeDF.show(40)

+---------+--------------+-------+
|StockCode|       Country|tot_qty|
+---------+--------------+-------+
|    71053|          null|   1911|
|    71053|          EIRE|     48|
|    71053|         Spain|      6|
|    71053|   Switzerland|     12|
|    71053|United Kingdom|   1845|
|     null|          null|   1911|
|     null|          EIRE|     48|
|     null|         Spain|      6|
|     null|   Switzerland|     12|
|     null|United Kingdom|   1845|
+---------+--------------+-------+



In [50]:
cubeDF1 = dfWithDate_Filtered.cube("StockCode","Country").agg(grouping_id(), sum("Quantity"))\
.orderBy(expr("grouping_id()").desc())

cubeDF1.show()

+---------+---------------+-------------+-------------+
|StockCode|        Country|grouping_id()|sum(Quantity)|
+---------+---------------+-------------+-------------+
|     null|           null|            3|         5952|
|     null| United Kingdom|            2|         5087|
|     null|        Austria|            2|            2|
|     null|           EIRE|            2|           96|
|     null|         Israel|            2|            4|
|     null|    Netherlands|            2|          122|
|     null|    Switzerland|            2|           28|
|     null|         Cyprus|            2|            7|
|     null|          Italy|            2|           16|
|     null|         Sweden|            2|           14|
|     null|        Belgium|            2|            2|
|     null|          Spain|            2|           15|
|     null|        Denmark|            2|           24|
|     null|      Australia|            2|          240|
|     null|Channel Islands|            2|       

In [51]:
cubeDF1\
.where(col("StockCode").isNotNull())\
.show()

+---------+--------------+-------------+-------------+
|StockCode|       Country|grouping_id()|sum(Quantity)|
+---------+--------------+-------------+-------------+
|    71053|          null|            1|         1911|
|    22749|          null|            1|         2375|
|    22752|          null|            1|         1666|
|    22749|   Netherlands|            0|           96|
|    22749|         Spain|            0|            6|
|    22752|       Denmark|            0|           24|
|    22749|          EIRE|            0|           34|
|    71053|United Kingdom|            0|         1845|
|    22752|   Switzerland|            0|           12|
|    22752|   Unspecified|            0|            5|
|    22752|        Sweden|            0|            2|
|    22749|        Sweden|            0|           12|
|    22749|        Israel|            0|            4|
|    22749|         Italy|            0|           16|
|    22752|         Spain|            0|            3|
|    22749

In [52]:
cubeDF1\
.where(col("country").isNotNull())\
.show()

+---------+---------------+-------------+-------------+
|StockCode|        Country|grouping_id()|sum(Quantity)|
+---------+---------------+-------------+-------------+
|     null|           EIRE|            2|           96|
|     null|         Sweden|            2|           14|
|     null|        Belgium|            2|            2|
|     null| United Kingdom|            2|         5087|
|     null|         Israel|            2|            4|
|     null|    Unspecified|            2|            5|
|     null|          Japan|            2|           16|
|     null|        Denmark|            2|           24|
|     null|         Cyprus|            2|            7|
|     null|        Germany|            2|           89|
|     null|    Switzerland|            2|           28|
|     null|         Norway|            2|           24|
|     null|        Austria|            2|            2|
|     null|          Spain|            2|           15|
|     null|          Italy|            2|       

In [67]:
cubeDF1\
.where(col("country").isNotNull() & col("date").isNotNull())\
.show()

+----------+---------------+-------------+-------------+
|      date|        Country|grouping_id()|sum(Quantity)|
+----------+---------------+-------------+-------------+
|2011-02-21|         France|            0|          334|
|2011-03-17| United Kingdom|            0|        18037|
|2011-03-23| United Kingdom|            0|         8311|
|2011-04-05|          Japan|            0|         1626|
|2011-08-21|         France|            0|          270|
|2011-10-21| United Kingdom|            0|        20318|
|2011-11-07| United Kingdom|            0|        25943|
|2011-01-05|        Germany|            0|          -56|
|2011-02-07|      Australia|            0|           58|
|2011-03-29|    Netherlands|            0|        12186|
|2011-05-18| United Kingdom|            0|        17366|
|2011-05-25| United Kingdom|            0|         9639|
|2011-06-19|         France|            0|           63|
|2011-07-13|        Belgium|            0|           13|
|2011-09-23|         France|   

In [68]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [69]:
pivoted.columns

['date',
 'Australia_sum(CAST(Quantity AS BIGINT))',
 'Australia_sum(UnitPrice)',
 'Australia_sum(CAST(CustomerID AS BIGINT))',
 'Austria_sum(CAST(Quantity AS BIGINT))',
 'Austria_sum(UnitPrice)',
 'Austria_sum(CAST(CustomerID AS BIGINT))',
 'Bahrain_sum(CAST(Quantity AS BIGINT))',
 'Bahrain_sum(UnitPrice)',
 'Bahrain_sum(CAST(CustomerID AS BIGINT))',
 'Belgium_sum(CAST(Quantity AS BIGINT))',
 'Belgium_sum(UnitPrice)',
 'Belgium_sum(CAST(CustomerID AS BIGINT))',
 'Brazil_sum(CAST(Quantity AS BIGINT))',
 'Brazil_sum(UnitPrice)',
 'Brazil_sum(CAST(CustomerID AS BIGINT))',
 'Canada_sum(CAST(Quantity AS BIGINT))',
 'Canada_sum(UnitPrice)',
 'Canada_sum(CAST(CustomerID AS BIGINT))',
 'Channel Islands_sum(CAST(Quantity AS BIGINT))',
 'Channel Islands_sum(UnitPrice)',
 'Channel Islands_sum(CAST(CustomerID AS BIGINT))',
 'Cyprus_sum(CAST(Quantity AS BIGINT))',
 'Cyprus_sum(UnitPrice)',
 'Cyprus_sum(CAST(CustomerID AS BIGINT))',
 'Czech Republic_sum(CAST(Quantity AS BIGINT))',
 'Czech Republic_

In [55]:
dfWithDate_Filtered.groupBy().avg("Quantity","UnitPrice").collect()

[Row(avg(Quantity)=4.9068425391591095, avg(UnitPrice)=6.118788128606764)]