In [1]:
# # **Loading Hive Tables and Data Preparation for Analysis**

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Build (or reuse) the Hive-enabled Spark session used by every later cell.
# Each option is passed as a separate (key, value) pair: a single
# "key=value" string would be stored as an option *name* with no usable
# value, so the setting it was meant to carry would never take effect.
spark = (
    SparkSession.builder
    .appName("Online Retail Analysis")
    .config("spark.ui.port", "0")
    .config("spark.sql.catalogImplementation", "hive")
    .config(
        "spark.sql.warehouse.dir",
        "hdfs://nameservice1/user/itv003722/warehouse/online_retail.db",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()
)
# 'OFF' disables Spark log output entirely for this notebook session.
spark.sparkContext.setLogLevel('OFF')

In [33]:
# Load the Hive table `online_retail.online_retail` as a Spark DataFrame.
Rdf = spark.table('online_retail.online_retail')

In [34]:
# Register the DataFrame as temp view 'Rdf' so it can be queried via spark.sql.
# The view is bound to the DataFrame as it is at this point; rebinding the
# Python variable `Rdf` later does not by itself change what the view returns.
Rdf.createOrReplaceTempView('Rdf')

In [35]:
# Quick look at the raw table: sample rows, column types, then total row count
# (the count is the cell's last expression, so it is what the notebook echoes).
Rdf.show()
Rdf.printSchema()
Rdf.count()

+----------+----------+--------------------+--------+---------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|   invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+---------------+----------+-----------+--------------+
|     36365|    85123A|WHITE HANGING HEA...|       6|2010-12-01 8:26|      2.55|      17850|United Kingdom|
|    536365|     71053| WHITE METAL LANTERN|       6|2010-12-01 8:26|      3.39|      17850|United Kingdom|
|    536365|    84406B|CREAM CUPID HEART...|       8|2010-12-01 8:26|      2.75|      17850|United Kingdom|
|    536365|    84029G|KNITTED UNION FLA...|       6|2010-12-01 8:26|      3.39|      17850|United Kingdom|
|    536365|    84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 8:26|      3.39|      17850|United Kingdom|
|    536365|     22752|SET 7 BABUSHKA NE...|       2|2010-12-01 8:26|      7.65|      17850|United Kingdom|
|    536365|     21730|GLASS

541909

In [36]:
# Descriptive statistics, transposed so that each source column becomes a row.
described = Rdf.describe().toPandas().T
# After transposing, the first row holds the statistic names
# (count, mean, stddev, min, max): promote it to the header, then drop it.
described.columns = described.iloc[0]
summary = described.drop(described.index[0])
summary

summary,count,mean,stddev,min,max
invoice_no,532620,559964.7247099245,13447.583077573228,36365,581587
stock_code,541909,27623.240210938104,16799.737628427636,10002,m
description,541909,20713.0,,,wrongly sold sets
quantity,541909,9.55224954743324,218.08115785023452,-80995,80995
invoice_date,541909,,,2010-12-01 10:03,2011-12-09 9:57
unit_price,541909,4.6111136260830365,96.75985306117936,-11062.06,38970.0
customer_id,406829,15287.690570239583,1713.600303321604,12346,18287
country,541909,,,Australia,Unspecified


In [37]:
def datashape(data):
    """Print the number of rows and columns of ``data``.

    ``data`` must provide a ``count()`` method (row count) and a ``columns``
    sequence, as a Spark DataFrame does. Nothing is returned; the shape is
    written to stdout.
    """
    n_rows = data.count()
    n_cols = len(data.columns)
    print("Data shape (rows, columns):", n_rows, "x", n_cols)
    
# Shape of the table as loaded, before any cleaning.
datashape(Rdf)

Data shape (rows, columns): 541909 x 8


In [40]:
# Count the missing entries (NULL or NaN) in every column.
missing_per_column = [
    F.count(F.when(F.isnan(name) | F.col(name).isNull(), name)).alias(name)
    for name in Rdf.columns
]
Rdf.select(missing_per_column)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
9289,0,0,0,0,0,135080,0


In [41]:
# Cross-check via SQL: number of rows in the 'Rdf' view with no invoice number.
spark.sql("""

select count(*) from Rdf where invoice_no is null
""").show()

+--------+
|count(1)|
+--------+
|    9289|
+--------+



In [42]:
# Inspect a sample of the rows whose invoice_no is NULL.
spark.sql("""

select * from Rdf where invoice_no is null
""").show(10)

+----------+----------+--------------------+--------+----------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|    invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+----------------+----------+-----------+--------------+
|      null|     22556|PLASTERS IN TIN C...|     -12|2010-12-01 10:24|      1.65|      17548|United Kingdom|
|      null|     21984|PACK OF 12 PINK P...|     -24|2010-12-01 10:24|      0.29|      17548|United Kingdom|
|      null|     21983|PACK OF 12 BLUE P...|     -24|2010-12-01 10:24|      0.29|      17548|United Kingdom|
|      null|     21980|PACK OF 12 RED RE...|     -24|2010-12-01 10:24|      0.29|      17548|United Kingdom|
|      null|     21484|CHICK GREY HOT WA...|     -12|2010-12-01 10:24|      3.45|      17548|United Kingdom|
|      null|     22557|PLASTERS IN TIN V...|     -12|2010-12-01 10:24|      1.65|      17548|United Kingdom|
|      null|     22

In [43]:
# Remove cancelled invoices (invoice numbers starting with "C").
# invoice_no is a numeric (long) column, so the "C..." identifiers are already
# NULL in this table. The previous negated startswith() dropped them only as a
# side effect of SQL three-valued logic; the NULL check is now explicit, and the
# prefix test is kept (on a string cast) in case the column is ever loaded as text.
invoice_no = F.col("invoice_no")
Rdf = Rdf.filter(
    invoice_no.isNotNull() & ~invoice_no.cast("string").startswith("C")
)
# Re-register the temp view so spark.sql("... from Rdf") cells see the same
# filtered rows as the DataFrame API cells.
Rdf.createOrReplaceTempView('Rdf')

In [44]:
# Shape after removing the cancelled invoices.
datashape(Rdf)

Data shape (rows, columns): 532620 x 8


In [45]:
# Drop rows whose invoice_no is NULL and keep the result: previously the
# filtered frame was only displayed, so `Rdf` itself was never changed.
# Rows with a NULL customer_id are intentionally left in place here; the
# original note attributes them to wholesaler sales, and later cells still
# inspect them.
Rdf = Rdf.na.drop(subset=["invoice_no"])
Rdf.show(truncate=False)

+----------+----------+-----------------------------------+--------+---------------+----------+-----------+--------------+
|invoice_no|stock_code|description                        |quantity|invoice_date   |unit_price|customer_id|country       |
+----------+----------+-----------------------------------+--------+---------------+----------+-----------+--------------+
|36365     |85123A    |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 8:26|2.55      |17850      |United Kingdom|
|536365    |71053     |WHITE METAL LANTERN                |6       |2010-12-01 8:26|3.39      |17850      |United Kingdom|
|536365    |84406B    |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 8:26|2.75      |17850      |United Kingdom|
|536365    |84029G    |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 8:26|3.39      |17850      |United Kingdom|
|536365    |84029E    |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 8:26|3.39      |17850      |United Kingdom|
|536365    |2275

In [46]:
# Re-check the shape after the previous cleaning step.
datashape(Rdf)

Data shape (rows, columns): 532620 x 8


In [47]:
# Re-count the missing entries (NULL or NaN) per column after cleaning.
missing_per_column = [
    F.count(F.when(F.isnan(name) | F.col(name).isNull(), name)).alias(name)
    for name in Rdf.columns
]
Rdf.select(missing_per_column)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
0,0,0,0,0,0,134694,0


In [48]:
# Inspect a sample of rows that have no customer_id.
# NOTE(review): this reads the 'Rdf' temp view, which returns whatever
# DataFrame was last registered under that name - confirm it matches the
# current `Rdf` variable before comparing with the counts above.
spark.sql("""

select * from Rdf where customer_id is null
""").show(10)

+----------+----------+--------------------+--------+----------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|    invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+----------------+----------+-----------+--------------+
|    536414|     22139|                    |      56|2010-12-01 11:52|       0.0|       null|United Kingdom|
|    536544|     21773|DECORATIVE ROSE B...|       1|2010-12-01 14:32|      2.51|       null|United Kingdom|
|    536544|     21774|DECORATIVE CATS B...|       2|2010-12-01 14:32|      2.51|       null|United Kingdom|
|    536544|     21786|  POLKADOT RAIN HAT |       4|2010-12-01 14:32|      0.85|       null|United Kingdom|
|    536544|     21787|RAIN PONCHO RETRO...|       2|2010-12-01 14:32|      1.66|       null|United Kingdom|
|    536544|     21790|  VINTAGE SNAP CARDS|       9|2010-12-01 14:32|      1.66|       null|United Kingdom|
|    536544|     21

In [52]:
# Convert the quantity column to a float type.
quantity_as_float = Rdf["quantity"].cast("Float")
Rdf = Rdf.withColumn("quantity", quantity_as_float)

In [53]:
# Parse the invoice_date strings into a timestamp column, producing `df`,
# the frame used by the rest of the analysis.
# to_timestamp() already returns a timestamp, so the extra .cast('timestamp')
# was redundant; the dangling trailing line-continuation has been removed too.
df = Rdf.withColumn('invoice_date', F.to_timestamp('invoice_date'))

In [54]:
# Confirm the column types after the quantity cast and timestamp parsing.
df.printSchema()

root
 |-- invoice_no: long (nullable = true)
 |-- stock_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: float (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- country: string (nullable = true)



In [55]:
# Number of records per invoice timestamp, earliest ten timestamps first.
(
    df.groupBy("invoice_date")
    .count()
    .orderBy(F.col("invoice_date").asc())
    .limit(10)
)

invoice_date,count
2010-12-01 08:26:00,7
2010-12-01 08:28:00,2
2010-12-01 08:34:00,16
2010-12-01 08:35:00,1
2010-12-01 08:45:00,20
2010-12-01 09:00:00,1
2010-12-01 09:01:00,2
2010-12-01 09:02:00,16
2010-12-01 09:09:00,1
2010-12-01 09:32:00,18


In [56]:
# Earliest invoice timestamp in the data.
df.agg(F.min("invoice_date")).show()

+-------------------+
|  min(invoice_date)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



In [57]:
# Latest invoice timestamp in the data.
df.agg(F.max("invoice_date")).show()

+-------------------+
|  max(invoice_date)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



In [17]:
# Number of distinct countries in the 'Rdf' temp view.
spark.sql("""

select count(distinct country) from Rdf
""").show()

+-----------------------+
|count(DISTINCT country)|
+-----------------------+
|                     38|
+-----------------------+



In [13]:
# List the distinct country values (show() prints only the first 20 rows).
spark.sql("""

select distinct country from Rdf
""").show()

+------------------+
|           country|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|               RSA|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|           Denmark|
|         Hong Kong|
|           Iceland|
|            Israel|
+------------------+
only showing top 20 rows



In [9]:
# Look at a few rows recorded under the country name "EIRE".
is_eire = F.col("country") == "EIRE"
df.where(is_eire).limit(5)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
536540,22968,ROSE COTTAGE KEEP...,4.0,2010-12-01 14:05:00,9.95,14911,EIRE
536540,85071A,BLUE CHARLIE+LOLA...,6.0,2010-12-01 14:05:00,2.95,14911,EIRE
536540,85071C,"""CHARLIE+LOLA""""EX...",6.0,2010-12-01 14:05:00,2.55,14911,EIRE
536540,22355,CHARLOTTE BAG SUK...,50.0,2010-12-01 14:05:00,0.85,14911,EIRE
536540,21579,LOLITA DESIGN C...,6.0,2010-12-01 14:05:00,2.25,14911,EIRE


In [58]:
# Replace the country name "EIRE" with "Ireland".
# The replacement is restricted to the `country` column: without `subset`,
# DataFrame.replace rewrites an exact "EIRE" value in every string column.
df = df.replace('EIRE', 'Ireland', subset=['country'])

In [59]:
# Ensure the quantity column of `df` has a float type
# (it is already float if the earlier cast on `Rdf` has been run).
quantity_as_float = df["quantity"].cast("Float")
df = df.withColumn("quantity", quantity_as_float)

In [60]:
# Rows invoiced strictly between the two timestamps below (both bounds exclusive).
window_start = '2010-12-01 08:00:00'
window_end = '2010-12-15 12:00:00'
invoice_ts = F.col("invoice_date")
df.filter((invoice_ts > window_start) & (invoice_ts < window_end))

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
36365,85123A,WHITE HANGING HEA...,6.0,2010-12-01 08:26:00,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEART...,8.0,2010-12-01 08:26:00,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLA...,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE...,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
536365,22752,SET 7 BABUSHKA NE...,2.0,2010-12-01 08:26:00,7.65,17850,United Kingdom
536365,21730,GLASS STAR FROSTE...,6.0,2010-12-01 08:26:00,4.25,17850,United Kingdom
536366,22633,HAND WARMER UNION...,6.0,2010-12-01 08:28:00,1.85,17850,United Kingdom
536366,22632,HAND WARMER RED P...,6.0,2010-12-01 08:28:00,1.85,17850,United Kingdom
536367,84879,ASSORTED COLOUR B...,32.0,2010-12-01 08:34:00,1.69,13047,United Kingdom


In [61]:
# Add the week-of-year number of each invoice, for weekly sales analysis.
week_number = F.weekofyear(F.col("invoice_date"))
df = df.withColumn("weekofyear", week_number)
df.show()

+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+----------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|weekofyear|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+----------+
|     36365|    85123A|WHITE HANGING HEA...|     6.0|2010-12-01 08:26:00|      2.55|      17850|United Kingdom|        48|
|    536365|     71053| WHITE METAL LANTERN|     6.0|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|        48|
|    536365|    84406B|CREAM CUPID HEART...|     8.0|2010-12-01 08:26:00|      2.75|      17850|United Kingdom|        48|
|    536365|    84029G|KNITTED UNION FLA...|     6.0|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|        48|
|    536365|    84029E|RED WOOLLY HOTTIE...|     6.0|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|        48|
|    536365|    

In [62]:
# Add an `amount` column (quantity x unit price, rounded to 2 decimals),
# used below to compute revenue.
# The column is referenced by its exact schema name `quantity`; the previous
# "Quantity" resolved only because Spark's analyzer is case-insensitive by
# default and would fail with spark.sql.caseSensitive=true.
line_total = F.col("quantity") * F.col("unit_price")
df = df.withColumn("amount", F.round(line_total, 2))
df.limit(5)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,weekofyear,amount
560729,22705,WRAP GREEN PEARS,25.0,2011-07-20 14:39:00,0.42,17511,United Kingdom,29,10.5
560729,22704,WRAP RED APPLES,25.0,2011-07-20 14:39:00,0.42,17511,United Kingdom,29,10.5
560729,22710,WRAP I LOVE LONDON,25.0,2011-07-20 14:39:00,0.42,17511,United Kingdom,29,10.5
560729,23241,TREASURE TIN GYMK...,6.0,2011-07-20 14:39:00,2.08,17511,United Kingdom,29,12.48
560729,23242,TREASURE TIN BUFF...,6.0,2011-07-20 14:39:00,2.08,17511,United Kingdom,29,12.48


In [63]:
# Total revenue per country, showing the five highest.
revenue_by_country = df.groupBy("country").agg(
    F.round(F.sum("amount"), 2).alias("Total Revenue")
)
revenue_by_country.orderBy(F.col("Total Revenue").desc()).show(5)

+--------------+-------------+
|       country|Total Revenue|
+--------------+-------------+
|United Kingdom|   9014127.87|
|   Netherlands|    285446.34|
|       Ireland|    283453.96|
|       Germany|    228867.14|
|        France|    209715.11|
+--------------+-------------+
only showing top 5 rows



This shows that the United Kingdom is the top country by revenue.

In [71]:
# Sales activity over time: total amount per (invoice timestamp, invoice),
# listed in chronological order.
amount_per_invoice = df.groupBy("invoice_date", "invoice_no").agg(
    F.round(F.sum("amount"), 2).alias("Total Amount")
)
amount_per_invoice.orderBy(F.col("invoice_date").asc()).show(20)

+-------------------+----------+------------+
|       invoice_date|invoice_no|Total Amount|
+-------------------+----------+------------+
|2010-12-01 08:26:00|     36365|        15.3|
|2010-12-01 08:26:00|    536365|      123.82|
|2010-12-01 08:28:00|    536366|        22.2|
|2010-12-01 08:34:00|    536368|       70.05|
|2010-12-01 08:34:00|    536367|      278.73|
|2010-12-01 08:35:00|    536369|       17.85|
|2010-12-01 08:45:00|    536370|      855.86|
|2010-12-01 09:00:00|    536371|       204.0|
|2010-12-01 09:01:00|    536372|        22.2|
|2010-12-01 09:02:00|    536373|      259.86|
|2010-12-01 09:09:00|    536374|       350.4|
|2010-12-01 09:32:00|    536375|      259.86|
|2010-12-01 09:32:00|    536376|       328.8|
|2010-12-01 09:34:00|    536377|        22.2|
|2010-12-01 09:37:00|    536378|      444.98|
|2010-12-01 09:41:00|    536381|      449.98|
|2010-12-01 09:41:00|    536379|       -27.5|
|2010-12-01 09:41:00|    536380|        34.8|
|2010-12-01 09:45:00|    536382|  

In [72]:
# Add the hour of day of each invoice, for the hourly sales analysis.
invoice_hour = F.hour(F.col("invoice_date"))
df = df.withColumn("hour", invoice_hour)

In [73]:
# Preview the first five rows, now including the derived columns.
df.show(5)

+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+----------+------+----+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|weekofyear|amount|hour|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+----------+------+----+
|     36365|    85123A|WHITE HANGING HEA...|     6.0|2010-12-01 08:26:00|      2.55|      17850|United Kingdom|        48|  15.3|   8|
|    536365|     71053| WHITE METAL LANTERN|     6.0|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|        48| 20.34|   8|
|    536365|    84406B|CREAM CUPID HEART...|     8.0|2010-12-01 08:26:00|      2.75|      17850|United Kingdom|        48|  22.0|   8|
|    536365|    84029G|KNITTED UNION FLA...|     6.0|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|        48| 20.34|   8|
|    536365|    84029E|RED WOOLLY HOTTIE...|     6.0|20

In [74]:
# Total amount per (hour of day, customer), listed from the earliest hour.
amount_per_hour = df.groupBy("hour", "customer_id").agg(
    F.round(F.sum("amount"), 2).alias("Total Amount")
)
amount_per_hour.orderBy(F.col("hour").asc()).show(20)

+----+-----------+------------+
|hour|customer_id|Total Amount|
+----+-----------+------------+
|   6|      14305|        4.25|
|   7|      12736|       234.0|
|   7|      14619|      394.44|
|   7|      12823|       535.5|
|   7|      15694|       306.4|
|   7|      14911|       539.0|
|   7|      13090|       160.6|
|   7|      15838|       277.5|
|   7|      13098|     1974.06|
|   7|      15189|      459.45|
|   7|      16684|      1494.0|
|   7|      16612|      317.36|
|   7|      13026|      170.64|
|   7|      15505|      880.58|
|   7|      13741|      200.17|
|   7|      16422|      385.14|
|   7|      13953|       500.0|
|   7|      12775|       419.8|
|   7|      17679|      348.91|
|   7|      18061|       213.8|
+----+-----------+------------+
only showing top 20 rows



In [75]:
# Look at a few sales rows for stock code 85123A.
is_target_stock = F.col("stock_code") == "85123A"
df.where(is_target_stock).limit(5)

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country,weekofyear,amount,hour
36365,85123A,WHITE HANGING HEA...,6.0,2010-12-01 08:26:00,2.55,17850,United Kingdom,48,15.3,8
536373,85123A,WHITE HANGING HEA...,6.0,2010-12-01 09:02:00,2.55,17850,United Kingdom,48,15.3,9
536375,85123A,WHITE HANGING HEA...,6.0,2010-12-01 09:32:00,2.55,17850,United Kingdom,48,15.3,9
536390,85123A,WHITE HANGING HEA...,64.0,2010-12-01 10:19:00,2.55,17511,United Kingdom,48,163.2,10
536394,85123A,WHITE HANGING HEA...,32.0,2010-12-01 10:39:00,2.55,13408,United Kingdom,48,81.6,10


In [76]:
# Number of line items (non-null stock codes) on each invoice.
basket_size = F.count("stock_code").alias("Basket Size")
df.groupBy("invoice_no").agg(basket_size).show()

+----------+-----------+
|invoice_no|Basket Size|
+----------+-----------+
|    560996|        101|
|    561032|          6|
|    561473|         25|
|    562162|         14|
|    562933|        290|
|    563540|         30|
|    564921|          1|
|    565456|         50|
|    565556|          1|
|    565735|          1|
|    565750|         10|
|    566486|         13|
|    566571|         75|
|    566924|          1|
|    567201|          1|
|    567816|         21|
|    568387|         22|
|    568402|         14|
|    568706|         17|
|    568715|         37|
+----------+-----------+
only showing top 20 rows



In [77]:
# Largest quantities sold per (invoice, product description), top five.
# The grouping column uses its exact schema name `description`; the previous
# "Description" resolved only under Spark's default case-insensitive analysis.
# The alias text is left exactly as it was so the displayed header is unchanged.
quantity_per_item = df.groupBy("invoice_no", "description").agg(
    F.sum("quantity").alias("Total Qunatity sold")
)
quantity_per_item.orderBy(F.col("Total Qunatity sold").desc()).show(5)

+----------+--------------------+-------------------+
|invoice_no|         Description|Total Qunatity sold|
+----------+--------------------+-------------------+
|    581483|"PAPER CRAFT , LI...|            80995.0|
|    541431|MEDIUM CERAMIC TO...|            74215.0|
|    578841|ASSTD DESIGN 3D P...|            12540.0|
|    542504|                    |             5568.0|
|    573008|WORLD WAR 2 GLIDE...|             4800.0|
+----------+--------------------+-------------------+
only showing top 5 rows

