In [1]:
import pandas as pd
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.master("local[4]").appName("ClusterShop").getOrCreate()

# Read the Excel file using Pandas
df_pandas = pd.read_excel("Online Retail.xlsx",na_values='')

# Convert the Pandas DataFrame to PySpark DataFrame 
df_spark = spark.createDataFrame(df_pandas)

23/04/17 16:16:58 WARN Utils: Your hostname, khaldon-LENOVO resolves to a loopback address: 127.0.1.1, but we couldn't find any external IP address!
23/04/17 16:16:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/17 16:17:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/17 16:17:03 WARN MacAddressUtil: Failed to find a usable hardware address from the network interfaces; using random bytes: 20:9a:f3:07:3b:65:00:8d


In [2]:
# set log level to Error
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# Show the DataFrame
df_spark.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

                                                                                

### Data Summary

In [4]:
# Describe the DataFrame
df_spark.describe().show()




+-------+------------------+------------------+--------------------+------------------+-----------------+----------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|        UnitPrice|CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+-----------------+----------+-----------+
|  count|            541909|            541909|              541909|            541909|           541909|    541909|     541909|
|   mean|  559965.752026781|27623.240210938104|                 NaN|  9.55224954743324|4.611113626088897|       NaN|       null|
| stddev|13428.417280798658| 16799.73762842766|                 NaN|218.08115785023472|96.75985306117991|       NaN|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|            -80995|        -11062.06|   12346.0|  Australia|
|    max|           C581569|                 m|   wrongly sold sets|             80995|          

                                                                                

## Explore the data

### Percentage of missing values in each column

In [5]:
# percentage of missing values in each column of the DataFrame
from pyspark.sql.functions import col, count, isnan, when
# get columns excluding 'InvoiceDate'
cols = [col for col in df_spark.columns if col != 'InvoiceDate']


# count missing values in each column as a percentage of the total number of values and total number of values in each column
df_spark.select([count(when(~isnan(c), c)).alias(c) for c in cols]).show()
df_spark.select([count(when(isnan(c), c)).alias(c) for c in cols]).show()

                                                                                

+---------+---------+-----------+--------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+---------+----------+-------+
|   541909|   541909|     540455|  541909|   541909|    406829| 541909|
+---------+---------+-----------+--------+---------+----------+-------+





+---------+---------+-----------+--------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+---------+----------+-------+
|        0|        0|       1454|       0|        0|    135080|      0|
+---------+---------+-----------+--------+---------+----------+-------+



                                                                                

## Clean the data

In [6]:
import math

# filter out the missing values using map 
df_spark_filtered = df_spark.rdd.map(lambda x: x if not math.isnan(x['CustomerID'])  else None).filter(lambda x: x is not None).toDF()
df_spark_filtered.select([count(when(isnan(c), c)).alias(c) for c in cols]).show()




+---------+---------+-----------+--------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+---------+----------+-------+
|        0|        0|          0|       0|        0|         0|      0|
+---------+---------+-----------+--------+---------+----------+-------+



                                                                                

## Percentage of Cancelled Orders

In [7]:

unique_invoice = df_spark_filtered.select('InvoiceNo').distinct().count()
cancelled_invoice_count = df_spark_filtered.filter(df_spark_filtered['InvoiceNo'].startswith('C')).select('InvoiceNo').distinct().count()

# percentage of cancelled invoices
print("Percentage of cancelled invoices: ", (cancelled_invoice_count/unique_invoice)*100)



Percentage of cancelled invoices:  16.466876971608833


                                                                                

In [24]:
# Get all transactions with the same invoice number but starts with 'C'
# such that it has the same customer ID and StockCode and abs(Quantity) is the same
# and abs(UnitPrice) is the same

orderd_cancelled = df_spark_filtered.filter(df_spark_filtered['InvoiceNo'].startswith('C')).select('InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice').orderBy('InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice')

normal_orders = df_spark_filtered.filter(~df_spark_filtered['InvoiceNo'].startswith('C')).select('InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice').orderBy('InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice')

# count the number of rows in each DataFrame
print("Number of rows in orderd_cancelled: ", orderd_cancelled.count())
print("Number of rows in normal_orders: ", normal_orders.count())



                                                                                

Number of rows in orderd_cancelled:  8905


Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:617)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:574)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:532)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)

Number of rows in normal_orders:  397924


                                                                                

In [25]:
# convert the InvoiceDate column to a date
from pyspark.sql.functions import to_date
df_spark_filtered = df_spark_filtered.withColumn("InvoiceDate", to_date(df_spark_filtered.InvoiceDate, 'MM/dd/yyyy'))

In [26]:

from pyspark.sql.functions import col
from pyspark.sql.functions import round
from pyspark.sql.functions import month

# create a new column called TotalCost
df_spark_filtered = df_spark_filtered.withColumn("TotalCost", col("Quantity") * col("UnitPrice"))

# round the TotalCost column to 2 decimal places
df_spark_filtered = df_spark_filtered.withColumn("TotalCost", round(df_spark_filtered.TotalCost, 2))
df_spark_filtered.show()




+---------+---------+--------------------+--------+-----------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|TotalCost|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|   17850.0|United Kingdom|     15.3|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|    20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|   17850.0|United Kingdom|     22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|    20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|   17850.0|United Kingdom|    20.34|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|   17850.0|United Kingdom|     15.3|
|   536365

In [27]:
# Basket size
basket_price = df_spark_filtered.groupBy('CustomerID','InvoiceNo').agg({'TotalCost': 'sum'}).withColumnRenamed('sum(TotalCost)', 'TotalCost').sort('CustomerID')
basket_price.show()



+----------+---------+------------------+
|CustomerID|InvoiceNo|         TotalCost|
+----------+---------+------------------+
|   12346.0|   541431|           77183.6|
|   12346.0|  C541433|          -77183.6|
|   12347.0|   537626|            711.79|
|   12347.0|   542237|            475.39|
|   12347.0|   573511|1294.3200000000002|
|   12347.0|   549222|            636.25|
|   12347.0|   556201|            382.52|
|   12347.0|   562032|            584.91|
|   12347.0|   581180|224.82000000000002|
|   12348.0|   541998|227.43999999999997|
|   12348.0|   539318| 892.8000000000001|
|   12348.0|   548955|             367.0|
|   12348.0|   568172|             310.0|
|   12349.0|   577609|           1757.55|
|   12350.0|   543037|334.40000000000003|
|   12352.0|   567505|            366.25|
|   12352.0|   547390|160.32999999999998|
|   12352.0|   544156|296.49999999999994|
|   12352.0|   568699|            266.25|
|   12352.0|  C545329|            -463.8|
+----------+---------+------------

                                                                                

### Analyze description Column

In [33]:
# analyze the description column 


# This function takes as input the dataframe and analyzes the content of the Description column by performing the following operations:

#     extract the names (proper, common) appearing in the products description
#     for each name, I extract the root of the word and aggregate the set of names associated with this particular root
#     count the number of times each root appears in the dataframe
#     when several words are listed for the same root, I consider that the keyword associated with this root is the shortest name (this systematically selects the singular when there are singular/plural variants)

from nltk import word_tokenize
from nltk.stem import SnowballStemmer

import nltk
nltk.download('punkt')
# pos_tag
nltk.download('averaged_perceptron_tagger')

def analyze_description(df):
    # get the description column
    description = df.select('Description').rdd.map(lambda x: x[0]).collect()
    # tokenize the description column
    tokens = [word_tokenize(x) for x in description if x is not None]
    
    # get nouns and proper nouns
    tokens = [[x[0] for x in nltk.pos_tag(y) if x[1] == 'NN' or x[1] == 'NNP'] for y in tokens]
    
    # get the root of each word
    stemmer = SnowballStemmer('english')
    stems = [[stemmer.stem(x) for x in y] for y in tokens]
    # create a dictionary that maps each root to the set of words associated with this root
    stem_dict = {}
    for i in range(len(stems)):
        for j in range(len(stems[i])):
            if stems[i][j] not in stem_dict:
                stem_dict[stems[i][j]] = set()
            stem_dict[stems[i][j]].add(tokens[i][j])
    # count the number of times each root appears in the dataframe
    stem_count = {}
    for i in range(len(stems)):
        for j in range(len(stems[i])):
            if stems[i][j] not in stem_count:
                stem_count[stems[i][j]] = 0
            stem_count[stems[i][j]] += 1
    # create a dictionary that maps each root to the shortest word associated with this root
    stem_shortest = {}
    for key in stem_dict:
        stem_shortest[key] = min(stem_dict[key], key=len)
    return stem_dict, stem_count, stem_shortest


print("Analyzing the description column...")
stem_dict, stem_count, stem_shortest = analyze_description(df_spark_filtered)
print("Done!")

# get the top 10 most frequent roots
top_10 = sorted(stem_count.items(), key=lambda x: x[1], reverse=True)[:10]
print("Top 10 most frequent roots: ", top_10)


[nltk_data] Downloading package punkt to /home/khaldon/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/khaldon/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


Analyzing the description column...


                                                                                

In [None]:
# Horizontal bar chart of the roots and their frequency in stem_count
import matplotlib.pyplot as plt
import numpy as np

# get the roots and their frequency in stem_count
sorted_stem_count = sorted(stem_count.items(), key=lambda x: x[1], reverse=True)
roots = [x[0] for x in sorted_stem_count]
counts = [x[1] for x in sorted_stem_count]

# plot the roots and their frequency in stem_count
plt.figure(figsize=(10, 5))
plt.barh(roots, counts)
plt.title('Top 10 most frequent roots')
plt.xlabel('Frequency')
plt.ylabel('Root')
plt.show()

