# Individual Assignment Video Presentation: https://drive.google.com/file/d/1Ne4azMrF_ScRcSIzIkMrWnlUy0emc3BV/view?usp=sharing

### Extra video walking through this notebook's Spark code (code only, no explanation of the project): https://drive.google.com/file/d/1ebKJJSnGheQXzn1d95VlESo4LdL0K5XP/view?usp=sharing

This data set was obtained from https://www.kaggle.com/jillwang87/online-retail-ii?select=online_retail_10_11.csv and was originally published on the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/Online+Retail+II

"Data Set Information:

This Online Retail II data set contains all the transactions occurring for a UK-based and registered, non-store online retail between 01/12/2010 and 09/12/2011. The company mainly sells unique all-occasion gift-ware. Many customers of the company are wholesalers.


Attribute Information:

- InvoiceNo: Invoice number. Nominal. A 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation.
- StockCode: Product (item) code. Nominal. A 5-digit integral number uniquely assigned to each distinct product.
- Description: Product (item) name. Nominal.
- Quantity: The quantities of each product (item) per transaction. Numeric.
- InvoiceDate: Invoice date and time. Numeric. The day and time when a transaction was generated.
- UnitPrice: Unit price. Numeric. Product price per unit in sterling (£).
- CustomerID: Customer number. Nominal. A 5-digit integral number uniquely assigned to each customer.
- Country: Country name. Nominal. The name of the country where a customer resides."

(from UCI Machine Learning Repository)
                                                                               
This notebook analyzes the data in the Online Retail II data set and answers several related business questions.

# A. Spark environment setup

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
from IPython.display import display, Markdown
import pyspark.sql.functions as F
# Import only the functions we use; a wildcard import would shadow Python built-ins such as sum, max and round
from pyspark.sql.functions import (col, concat_ws, count, countDistinct, date_format,
                                   desc, first, lit, split, when)
from pyspark.sql import Row
from pyspark.sql.types import (DateType, DoubleType, IntegerType, StringType,
                               StructField, StructType)

# B. Read DataFrame from CSV

In [4]:
df = spark.read \
          .option("inferSchema", "true") \
          .option("header", "true") \
          .csv("online_retail_10_11.csv")

In [5]:
# Get a general idea of what the data frame looks like
df.show(10, False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |22752    |SET 7 BABUSHKA NESTING BOXES       

# C. Data set metadata analysis
Display schema and size of the DataFrame

In [6]:
# See how many rows the data frame has
df.count()

541910

In [7]:
# See the Schema of the data frame
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



# D. Data cleaning and transformation

Some rows have a null CustomerID. Those rows cannot be attributed to a customer, so we filter them out.
If the Description is null, it is impossible to tell what exactly was bought, so for the purposes of our analysis we also remove rows without descriptions.

In [8]:
# Check for nulls in CustomerId and Description
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["CustomerId", "Description"]]).show()


+----------+-----------+
|CustomerId|Description|
+----------+-----------+
|    135080|       1454|
+----------+-----------+



In [9]:
# Getting rid of the rows with nulls for CustomerId and Description
df = df.filter(df["CustomerId"].isNotNull())\
    .filter(df["Description"].isNotNull())
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

Next, we work on the InvoiceDate column. It is currently a string, so we convert it to a date, extract the year, month, and day, and derive the day of the week of each invoice.

In [10]:
# Get rid of the hours and minutes of InvoiceDate and only keep the dates
df = df.withColumn("DateInvoice", split(col("InvoiceDate"), " ").getItem(0)).drop("InvoiceDate")
df.show()

+---------+---------+--------------------+--------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|DateInvoice|
+---------+---------+--------------------+--------+---------+----------+--------------+-----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|  12/1/2010|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|  12/1/2010|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|  12/1/2010|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|  12/1/2010|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|  12/1/2010|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|     7.65|     17850|United Kingdom|  12/1/2010|
|   536365|    21730|GLASS STAR FROSTE...|       6|     4.25|     17850|United Kingdom|  12/1/2010|


In [11]:
# Split the dates in the DateInvoice column and assign corresponding values to three new columns: Month, Day, and Year
df = df.withColumn("Month", split(col("DateInvoice"), "/").getItem(0))\
               .withColumn("Day", split(col("DateInvoice"), "/").getItem(1))\
               .withColumn("Year", split(col("DateInvoice"), "/").getItem(2))
df.show()

+---------+---------+--------------------+--------+---------+----------+--------------+-----------+-----+---+----+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|DateInvoice|Month|Day|Year|
+---------+---------+--------------------+--------+---------+----------+--------------+-----------+-----+---+----+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|  12/1/2010|   12|  1|2010|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|  12/1/2010|   12|  1|2010|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|  12/1/2010|   12|  1|2010|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|  12/1/2010|   12|  1|2010|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|  12/1/2010|   12|  1|2010|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|     7.65|     17850|United K

In [12]:
# Concatenate the Year, Month, and Day columns with "-" and cast the result to date type.
# Name this column InvoiceDate and drop the string-typed DateInvoice column
df = df.withColumn("InvoiceDate",concat_ws("-",col("Year"),col("Month"),col("Day")).cast("date"))\
        .drop("DateInvoice")
df.show()

+---------+---------+--------------------+--------+---------+----------+--------------+-----+---+----+-----------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|Month|Day|Year|InvoiceDate|
+---------+---------+--------------------+--------+---------+----------+--------------+-----+---+----+-----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|   12|  1|2010| 2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|   12|  1|2010| 2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|     7.65|     17850|United K

In [13]:
# Create a column DayofWeek to see the day of the week of the corresponding invoice date
df = df.withColumn("DayofWeek", date_format(col("InvoiceDate"), "E")).cache()  # cache df to make further analysis faster
df.show()

+---------+---------+--------------------+--------+---------+----------+--------------+-----+---+----+-----------+---------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|Month|Day|Year|InvoiceDate|DayofWeek|
+---------+---------+--------------------+--------+---------+----------+--------------+-----+---+----+-----------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|   12|  1|2010| 2010-12-01|      Wed|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|      Wed|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|   12|  1|2010| 2010-12-01|      Wed|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|      Wed|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|   12|  1|2010| 2010-12-01|      Wed|


In [14]:
df.printSchema() #verify we did the transformation successfully

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- DayofWeek: string (nullable = true)
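As a sanity check outside Spark, the conversion in cells 10 to 13 can be reproduced for a single value in plain Python. This is a minimal sketch. It assumes every InvoiceDate string follows the month/day/year hour:minute layout seen in the sample rows. The helper `parse_invoice_date` is hypothetical and not part of the notebook.

```python
from datetime import date, datetime
from typing import Tuple


def parse_invoice_date(raw: str) -> Tuple[date, int, int, int, str]:
    """Hypothetical helper mirroring cells 10-13 for one InvoiceDate string.

    Assumes the "M/D/YYYY H:MM" layout seen in the data, e.g. "12/1/2010 8:26".
    """
    # strptime accepts non-zero-padded month, day and hour values
    parsed = datetime.strptime(raw, "%m/%d/%Y %H:%M")
    # Keep only the calendar date, like split(...).getItem(0) in cell 10
    invoice_date = parsed.date()
    # "%a" gives the abbreviated weekday name ("Wed") in the default C locale,
    # the same short form that date_format(..., "E") produced above
    weekday = invoice_date.strftime("%a")
    return invoice_date, invoice_date.year, invoice_date.month, invoice_date.day, weekday


print(parse_invoice_date("12/1/2010 8:26"))
```

Within Spark, `to_date(col("InvoiceDate"), "M/d/yyyy H:mm")` from `pyspark.sql.functions` should produce the same date column in a single step. We have not re-run the notebook with that variant, so treat it as an alternative to verify rather than as a drop-in replacement.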



# E. Basic profiling of column groups and overview questions
## Timing related section

In [15]:
print ("Summary of columns Year, Month, Day, DayofWeek")
df.select("Year", "Month", "Day", "DayofWeek").summary().show()

Summary of columns Year, Month, Day, DayofWeek
+-------+-------------------+-----------------+------------------+---------+
|summary|               Year|            Month|               Day|DayofWeek|
+-------+-------------------+-----------------+------------------+---------+
|  count|             406830|           406830|            406830|   406830|
|   mean| 2010.9340019172628|7.605958262664995|15.036113364304502|     null|
| stddev|0.24827905132967706|3.418945197059456| 8.653724410199423|     null|
|    min|               2010|                1|                 1|      Fri|
|    25%|             2011.0|              5.0|               7.0|     null|
|    50%|             2011.0|              8.0|              15.0|     null|
|    75%|             2011.0|             11.0|              22.0|     null|
|    max|               2011|                9|                 9|      Wed|
+-------+-------------------+-----------------+------------------+---------+
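At this point Year, Month and Day are still string columns. As a result, `summary()` reports their min and max by string comparison rather than numerically, which is why the max for both Month and Day is 9. The plain-Python sketch below reproduces the effect on the same value ranges. Casting the columns to integers first (for example with `.cast("int")`) should give numeric min and max values instead.

```python
# Month and Day values as strings, the way the Month and Day columns store them
months = [str(m) for m in range(1, 13)]
days = [str(d) for d in range(1, 32)]

# Strings compare character by character, so "9" sorts after "12" and "31"
print(min(months), max(months))  # 1 9
print(min(days), max(days))      # 1 9

# Converting to integers restores numeric ordering
print(min(map(int, months)), max(map(int, months)))  # 1 12
print(min(map(int, days)), max(map(int, days)))      # 1 31
```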



In [16]:
print("Checking for nulls on columns Year, Month, DayofMonth and DayOfWeek:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Year", "Month", "Day", "DayofWeek"]]).show()

Checking for nulls on columns Year, Month, DayofMonth and DayOfWeek:
+----+-----+---+---------+
|Year|Month|Day|DayofWeek|
+----+-----+---+---------+
|   0|    0|  0|        0|
+----+-----+---+---------+



In [17]:
print("Checking amount of distinct values in columns Year, Month, DayofMonth and DayOfWeek:")
df.select([countDistinct(c).alias(c) for c in ["Year", "Month", "Day", "DayofWeek"]]).show()

Checking amount of distinct values in columns Year, Month, DayofMonth and DayOfWeek:
+----+-----+---+---------+
|Year|Month|Day|DayofWeek|
+----+-----+---+---------+
|   2|   12| 31|        6|
+----+-----+---+---------+



We know the data covers December 2010 to December 2011, so it is not surprising that Year has 2 distinct values (2010 and 2011), Month has 12, and Day has 31. However, DayofWeek has only 6 distinct values, so let's investigate further and answer the following questions:

#### 1. What are the most and least frequent occurrences for DayofWeek column?
#### 2. What are the most and least frequent occurrences for Month column?
#### 3. What are the most frequent occurrences for Day column?
#### 4. What are the most frequent occurrences for InvoiceDate column?

In [18]:
print ("Most and least frequent occurrences for DayofWeek column:")
df.groupBy("DayofWeek").count().orderBy(col("count").desc()).show()

print ("Most and least frequent occurrences for Month column:")
df.groupBy("Month").count().orderBy(col("count").desc()).show()

print ("Most frequent occurrences for Day column:")
df.groupBy("Day").count().orderBy(col("count").desc()).show()

print ("Most frequent occurrences for InvoiceDate column:")
df.groupBy("InvoiceDate").count().orderBy(col("count").desc()).show()

Most and least frequent occurrences for DayofWeek column:
+---------+-----+
|DayofWeek|count|
+---------+-----+
|      Thu|82374|
|      Wed|70599|
|      Tue|68110|
|      Mon|66382|
|      Sun|63237|
|      Fri|56128|
+---------+-----+

Most and least frequent occurrences for Month column:
+-----+-----+
|Month|count|
+-----+-----+
|   11|65598|
|   10|50695|
|   12|44512|
|    9|40822|
|    5|28908|
|    6|27836|
|    3|27822|
|    8|27662|
|    7|27502|
|    4|23198|
|    1|21912|
|    2|20363|
+-----+-----+

Most frequent occurrences for Day column:
+---+-----+
|Day|count|
+---+-----+
|  6|18915|
|  5|16723|
|  8|16254|
|  7|16011|
|  4|15165|
| 17|15139|
| 20|14940|
| 14|14549|
| 23|14545|
| 10|14473|
| 13|14401|
|  1|13937|
| 28|13778|
| 11|13707|
| 21|13509|
| 18|13234|
|  9|13226|
| 16|12910|
| 27|12708|
| 22|12583|
+---+-----+
only showing top 20 rows

Most frequent occurrences for InvoiceDate column:
+-----------+-----+
|InvoiceDate|count|
+-----------+-----+
| 2011-11-06| 34

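When we group by DayofWeek, we see that no invoices were recorded on a Saturday. This suggests the store does not process orders on Saturdays.
We also see that the most frequent month is November and the most frequent day of the month is the 6th. November 6th, 2011 has more invoice lines than any other date. Each row is one invoice line (one product on one invoice), so these counts measure line items rather than distinct invoices.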
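Each row of the DataFrame is one invoice line, so `groupBy("InvoiceDate").count()` counts line items rather than distinct invoices. The sketch below contrasts the two counts in plain Python on a few hypothetical rows. The invoice numbers are made up for illustration. In Spark, the distinct-invoice version would look something like `df.groupBy("InvoiceDate").agg(countDistinct("InvoiceNo"))`.

```python
from collections import Counter, defaultdict

# Hypothetical (InvoiceDate, InvoiceNo) pairs; each pair is one invoice line
rows = [
    ("2011-11-06", "600001"),
    ("2011-11-06", "600001"),
    ("2011-11-06", "600002"),
    ("2011-11-07", "600003"),
]

# Like groupBy("InvoiceDate").count(): counts invoice lines per date
lines_per_day = Counter(day for day, _invoice in rows)

# Like groupBy("InvoiceDate").agg(countDistinct("InvoiceNo")): counts invoices per date
invoices_by_day = defaultdict(set)
for day, invoice in rows:
    invoices_by_day[day].add(invoice)
distinct_invoices_per_day = {day: len(invoices) for day, invoices in invoices_by_day.items()}

print(dict(lines_per_day))        # {'2011-11-06': 3, '2011-11-07': 1}
print(distinct_invoices_per_day)  # {'2011-11-06': 2, '2011-11-07': 1}
```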

## Item and geographical location related section
#### 5. What are the top 10 countries with the most customers?
Every customer has a unique CustomerID, so the number of customers is count(distinct CustomerID). We group by Country and order by the customer count in descending order.

In [19]:
spark.sql("drop table if exists df1")
df.write.mode("overwrite").saveAsTable("df1")

In [20]:
spark.sql("""select Country,
                count(distinct CustomerID) as countCustomer
            from df1
            group by Country
            order by countCustomer desc
            limit 10
            """).show()

+--------------+-------------+
|       Country|countCustomer|
+--------------+-------------+
|United Kingdom|         3950|
|       Germany|           95|
|        France|           87|
|         Spain|           31|
|       Belgium|           25|
|   Switzerland|           21|
|      Portugal|           19|
|         Italy|           15|
|       Finland|           12|
|       Austria|           11|
+--------------+-------------+



#### 6. What are the top 10 most sold items?
We use sum(Quantity) to calculate the total quantity sold and group by StockCode to get the total per item.
We find that 84077, "WORLD WAR 2 GLIDERS ASSTD DESIGNS", is the most sold item by quantity.

In [21]:
spark.sql("""select StockCode,
                sum(Quantity) as sumQuantity
            from df1
            group by StockCode
            order by sumQuantity desc
            limit 10
            """).show()

+---------+-----------+
|StockCode|sumQuantity|
+---------+-----------+
|    84077|      53215|
|    22197|      48712|
|   85099B|      45066|
|    84879|      35314|
|   85123A|      34204|
|    21212|      33409|
|    23084|      27094|
|    22492|      25880|
|    22616|      25321|
|    21977|      24163|
+---------+-----------+



In [22]:
spark.sql("""select StockCode, Description
            from df1
            where StockCode = '84077'""").show(1, False)

+---------+---------------------------------+
|StockCode|Description                      |
+---------+---------------------------------+
|84077    |WORLD WAR 2 GLIDERS ASSTD DESIGNS|
+---------+---------------------------------+
only showing top 1 row



#### 7. What are the top 10 countries by quantity of items sold?
In the Quantity column, returned items have negative values. Therefore, sum(Quantity) gives the total quantity sold net of returned items. We then group by country and order by the summed quantity in descending order.

In [23]:
spark.sql("""select country,
              sum(quantity) as sumQuantity
          from df1
          group by country
          order by sumQuantity desc
          limit 10""").show()

+--------------+-----------+
|       country|sumQuantity|
+--------------+-----------+
|United Kingdom|    4008533|
|   Netherlands|     200128|
|          EIRE|     136329|
|       Germany|     117448|
|        France|     109849|
|     Australia|      83653|
|        Sweden|      35637|
|   Switzerland|      29778|
|         Spain|      26824|
|         Japan|      25218|
+--------------+-----------+
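Cancellations carry negative quantities, so `sum(Quantity)` reports net units per country. The plain-Python sketch below uses hypothetical rows to show how net and gross totals differ. In the SQL query, adding `where Quantity > 0` would be one way to get the gross figure.

```python
from collections import defaultdict

# Hypothetical (Country, InvoiceNo, Quantity) rows; the "C" invoice is a cancellation
rows = [
    ("Netherlands", "500001", 48),
    ("Netherlands", "C500002", -12),
    ("EIRE", "500003", 24),
]

net_quantity = defaultdict(int)    # like sum(Quantity): returns reduce the total
gross_quantity = defaultdict(int)  # like sum(Quantity) ... where Quantity > 0
for country, _invoice, qty in rows:
    net_quantity[country] += qty
    if qty > 0:
        gross_quantity[country] += qty

print(dict(net_quantity))    # {'Netherlands': 36, 'EIRE': 24}
print(dict(gross_quantity))  # {'Netherlands': 48, 'EIRE': 24}
```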



#### 8. What are the sales figures for each country?
By using sum(UnitPrice * Quantity), we can calculate total sales in pounds (£). We then group by country and order by total sales in descending order. The UK accounts for by far the largest sales figure.

In [24]:
spark.sql("""select country,
                sum(UnitPrice * Quantity) as sumSales
            from df1
            group by country
            order by sumSales desc""").show()

+---------------+------------------+
|        country|          sumSales|
+---------------+------------------+
| United Kingdom|  6767873.39400001|
|    Netherlands| 284661.5399999992|
|           EIRE|250285.21999999924|
|        Germany|221698.21000000037|
|         France|196730.84000000043|
|      Australia|137077.26999999987|
|    Switzerland| 55739.40000000008|
|          Spain| 54774.58000000016|
|        Belgium|40910.960000000014|
|         Sweden| 36595.90999999998|
|          Japan|35340.619999999995|
|         Norway| 35163.46000000001|
|       Portugal|29059.809999999954|
|        Finland|22326.739999999994|
|Channel Islands| 20086.28999999999|
|        Denmark| 18768.13999999999|
|          Italy|16890.509999999995|
|         Cyprus|12946.289999999994|
|        Austria|10154.319999999996|
|      Singapore| 9120.390000000001|
+---------------+------------------+
only showing top 20 rows
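The long tails on the sumSales values (for example 6767873.39400001) come from accumulating floating-point products. The plain-Python sketch below uses hypothetical price and quantity pairs to show two common remedies. One is rounding the final total to two decimal places. The other is summing exact decimals. In Spark SQL, wrapping the aggregate as `round(sum(UnitPrice * Quantity), 2)` should be the simplest equivalent, although we have not re-run the query with it.

```python
from decimal import Decimal

# Hypothetical (UnitPrice, Quantity) pairs for one country
lines = [(2.55, 6), (3.39, 6), (2.75, 8), (0.1, 3)]

# Summing float products can accumulate binary rounding error
float_total = sum(price * qty for price, qty in lines)

# Option 1: round the final total to two decimal places
rounded_total = round(float_total, 2)

# Option 2: build Decimals from the string form of each price so the arithmetic is exact
exact_total = sum(Decimal(str(price)) * qty for price, qty in lines)

print(float_total, rounded_total, exact_total)
```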



#### 9. What are the top 10 countries with the most return orders?
We use count(distinct InvoiceNo) to count orders. The InvoiceNo of a returned order begins with "C" rather than a digit, so the wildcard pattern like 'C%' keeps only the returned orders. We then group by country and see that the UK has the most return orders.

In [25]:
spark.sql("""select country,
                count(distinct InvoiceNo) as sumReturn
            from df1
            where InvoiceNo like 'C%'
            group by country
            order by sumReturn desc
            limit 10""").show()

+--------------+---------+
|       country|sumReturn|
+--------------+---------+
|United Kingdom|     3208|
|       Germany|      146|
|        France|       69|
|          EIRE|       59|
|       Belgium|       21|
|   Switzerland|       20|
|         Italy|       17|
|         Spain|       15|
|      Portugal|       13|
|     Australia|       12|
+--------------+---------+
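The same filter-then-count-distinct logic can be sketched in plain Python on hypothetical rows. Like Spark SQL's `LIKE 'C%'`, `str.startswith("C")` is case-sensitive. It therefore relies on cancellations using an upper-case "C", as they do in this data. The attribute description quoted at the top mentions a lower-case 'c'.

```python
from collections import defaultdict

# Hypothetical (Country, InvoiceNo) rows; one row per invoice line
rows = [
    ("United Kingdom", "C500010"),
    ("United Kingdom", "C500010"),  # second line of the same cancelled invoice
    ("United Kingdom", "500011"),
    ("Germany", "C500012"),
]

# where InvoiceNo like 'C%' ... count(distinct InvoiceNo) ... group by country
returns = defaultdict(set)
for country, invoice in rows:
    if invoice.startswith("C"):
        returns[country].add(invoice)

# order by sumReturn desc
return_counts = sorted(
    ((country, len(invoices)) for country, invoices in returns.items()),
    key=lambda pair: pair[1],
    reverse=True,
)
print(return_counts)  # [('United Kingdom', 1), ('Germany', 1)]
```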



# F. More business questions

## 10. What are the top key words?
First, we prepare the data we need, and then we define the schema of the DataFrame. Finally, we create word_count_df, which lists the words sorted by count in descending order.

Data preparation section:
Step 1: Select the descriptions and convert them to lower case. That way, when we count them later, the same word with different capitalization is not counted multiple times

Step 2: Convert the result of step 1 into a resilient distributed dataset (RDD) and use flatMap. flatMap returns a new RDD by first applying a function to all elements of the RDD and then flattening the results. The function we apply here splits each description on spaces, which extracts the individual words

Step 3: Use the map transformation, which applies a function to every element of the previous RDD (the individual words) and returns a new RDD. The function we apply here creates a tuple that pairs each word with the integer 1, which is later used for counting

Step 4: Use reduceByKey to sum the integer values for each unique key (word)

Step 5: We sort by the count values in descending order
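The five steps above can be mirrored in plain Python, which makes it easier to see what each RDD transformation does. This is a minimal sketch on a few hypothetical descriptions, not the notebook's code. One description has a double space added for illustration.

```python
from collections import Counter

# Hypothetical sample descriptions; the notebook reads these from the Description column
descriptions = [
    "WHITE HANGING HEART T-LIGHT HOLDER",
    "WHITE METAL LANTERN",
    "RED WOOLLY HOTTIE WHITE HEART.",
    "SET 7 BABUSHKA  NESTING BOXES",  # double space added for illustration
]

# Steps 1-2: lower-case each description and split on a single space (flatMap)
words = [word for d in descriptions for word in d.lower().split(" ")]

# Steps 3-4: pair each word with 1 and sum the ones per word (map + reduceByKey)
counts = Counter()
for word in words:
    counts[word] += 1

# Step 5: sort by count in descending order (sortBy(lambda x: x[1], False))
ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(ranked[:3])
```

Splitting on a single space produces an empty-string token whenever a description has leading, trailing, or consecutive spaces. This explains the `''` entry that cell 27 filters out. Calling `str.split()` with no argument inside the lambda would avoid creating those tokens. Punctuation is also kept, so `heart.` and `heart` are counted as different words.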


In [26]:
word_count = spark.sql("select lower(description) as words from df1")\
    .rdd.flatMap(lambda row: row["words"].split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .sortBy(lambda x:x[1], False)

Data frame creation section:
1. Create schema: for the two columns in our table, "Word" will be of StringType and "Count" will be of IntegerType
2. Create DataFrame: first create the DataFrame, passing in "word_count" as data and "schema" as schema. Afterwards, we filter out the row where the word is '', since it has the highest count but is not a word

In [27]:
# Create schema
schema = StructType([
    StructField("Word",StringType(),True),
    StructField("Count",IntegerType(),True)
])

# Create DF
word_count_df = spark.createDataFrame(word_count, schema) # Create DF
word_count_df = word_count_df.filter(word_count_df["Word"] != '') # Filter DF
word_count_df.show(100)

+----------+-----+
|      Word|Count|
+----------+-----+
|       set|41623|
|        of|41479|
|       bag|38452|
|       red|32590|
|     heart|29443|
| retrospot|27029|
|   vintage|26043|
|    design|24007|
|      pink|20521|
| christmas|19334|
|       box|18359|
|      cake|16630|
|     white|16360|
|     metal|15897|
|     jumbo|15737|
|     lunch|15211|
|         3|15040|
|      blue|14047|
|   hanging|13220|
|    holder|12975|
|      sign|12722|
|      pack|12144|
|   t-light|11531|
|     paper|10845|
|     small|10592|
|    wooden|10183|
|         6| 9782|
|     cases| 9655|
|      card| 9621|
|     glass| 8997|
|        12| 8958|
|       tea| 8957|
|  polkadot| 8937|
|decoration| 8792|
|  spaceboy| 8713|
|    bottle| 8703|
|        in| 8563|
|       and| 8018|
|       hot| 8005|
|      home| 8001|
|    pantry| 7760|
|     large| 7691|
|       tin| 7640|
|     water| 7625|
|   regency| 7195|
|      with| 7145|
|   ceramic| 7109|
|         4| 6886|
|   doormat| 6847|
|   paisley|

# Regarding monthly data

## 11. What are the total sales by month in British Pounds?
To get the monthly sales in British Pounds, we need the invoice date, quantity, and unit price.
First, we prepare the monthly sales data in British Pounds:

Step 1: Create a new column "InvoiceMonth" in the original DataFrame by concatenating "Year" and "Month" with "-" and casting the result to date type. The first day of every month then represents the entire month

Step 2: Select the "InvoiceMonth", "Quantity", and "UnitPrice" columns and convert them into an RDD

Step 3: Use map to turn the 3 columns into 2 by multiplying "Quantity" by "UnitPrice" to get the sales in pounds

Step 4: Use reduceByKey to sum the sales values for each month

Step 5: Sort the result by month

Second, we create the schema for the two-column table: "Month" of date type and "Total_Sales" of double type.
Third, we create the DataFrame.

In [28]:
# Prepare monthly sales in British Pounds data
sales_m = df.withColumn("InvoiceMonth",concat_ws("-",col("Year"),col("Month")).cast("date"))\
    .select("InvoiceMonth", "Quantity", "UnitPrice").rdd\
    .map(lambda row:(row[0],row[1]*row[2]))\
    .reduceByKey(lambda x, y: x + y)\
    .sortByKey()

# Create schema
schema_m = StructType([
    StructField("Month",DateType(),True),
    StructField("Total_Sales",DoubleType(),True)
])

# Create DF
monthly_sales_pounds = spark.createDataFrame(sales_m, schema_m) # Create DF
monthly_sales_pounds.show()

+----------+------------------+
|     Month|       Total_Sales|
+----------+------------------+
|2010-12-01|  554604.020000018|
|2011-01-01|475074.38000001636|
|2011-02-01| 436546.1500000147|
|2011-03-01| 579964.6100000151|
|2011-04-01| 426047.8510000125|
|2011-05-01|  648251.080000003|
|2011-06-01| 608013.1600000106|
|2011-07-01|  574238.481000012|
|2011-08-01| 616368.0000000028|
|2011-09-01| 931440.3719999959|
|2011-10-01| 974603.5899999909|
|2011-11-01|1132407.7399999578|
|2011-12-01| 342524.3800000034|
+----------+------------------+
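Steps 1 to 5 can be sketched in plain Python on a few hypothetical rows, which makes the key/value shape of each RDD stage explicit. Each month is represented by its first day. This mirrors the output above, where casting a string such as "2010-12" to a date yields 2010-12-01.

```python
from collections import defaultdict
from datetime import date

# Hypothetical (Year, Month, Quantity, UnitPrice) rows; Year and Month are strings as in df
rows = [
    ("2010", "12", 6, 2.55),
    ("2010", "12", 8, 2.75),
    ("2011", "1", 12, 1.25),
]

# Steps 1-3: key each row by the first day of its month and map it to Quantity * UnitPrice
pairs = [(date(int(year), int(month), 1), qty * price) for year, month, qty, price in rows]

# Step 4: sum the sales per month (reduceByKey)
totals = defaultdict(float)
for month, sales in pairs:
    totals[month] += sales

# Step 5: sort by month (sortByKey)
monthly = sorted(totals.items())
print(monthly)
```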



## 12. What are the total sales by month in quantities?
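This is similar to the previous question, except that the monthly quantity data does not need the "UnitPrice" column. Each selected row is already a (month, quantity) pair, so reduceByKey can be applied directly without a map step. Then we create the schema and the DataFrame.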

In [29]:
sales_m_q = df.withColumn("InvoiceMonth",concat_ws("-",col("Year"),col("Month")).cast("date"))\
    .select("InvoiceMonth", "Quantity").rdd\
    .reduceByKey(lambda x, y: x + y)\
    .sortByKey()

# Create schema
schema_m_q = StructType([
    StructField("Month",DateType(),True),
    StructField("Total_Quantity",IntegerType(),True)
])

# Create DF
monthly_sales_quan = spark.createDataFrame(sales_m_q, schema_m_q) # Create DF
monthly_sales_quan.show()

+----------+--------------+
|     Month|Total_Quantity|
+----------+--------------+
|2010-12-01|        296362|
|2011-01-01|        269379|
|2011-02-01|        262833|
|2011-03-01|        344012|
|2011-04-01|        278585|
|2011-05-01|        367852|
|2011-06-01|        356922|
|2011-07-01|        363418|
|2011-08-01|        386612|
|2011-09-01|        537496|
|2011-10-01|        569666|
|2011-11-01|        669915|
|2011-12-01|        203837|
+----------+--------------+



Join the two data frames to get a more comprehensive view of the monthly sales situation.

In [30]:
monthly_sales = monthly_sales_pounds.join(monthly_sales_quan, "Month").sort("Month")
monthly_sales.show()

+----------+------------------+--------------+
|     Month|       Total_Sales|Total_Quantity|
+----------+------------------+--------------+
|2010-12-01|  554604.020000018|        296362|
|2011-01-01|475074.38000001636|        269379|
|2011-02-01| 436546.1500000147|        262833|
|2011-03-01| 579964.6100000151|        344012|
|2011-04-01| 426047.8510000125|        278585|
|2011-05-01|  648251.080000003|        367852|
|2011-06-01| 608013.1600000106|        356922|
|2011-07-01|  574238.481000012|        363418|
|2011-08-01| 616368.0000000028|        386612|
|2011-09-01| 931440.3719999959|        537496|
|2011-10-01| 974603.5899999909|        569666|
|2011-11-01|1132407.7399999578|        669915|
|2011-12-01| 342524.3800000034|        203837|
+----------+------------------+--------------+



In [31]:
monthly_sales.toPandas().to_csv('monthly_sales.csv') #save to csv

# Regarding daily data

## 13. What are the total sales by day in British Pounds?
This is similar to question 11, except that here we do not need to concatenate Year and Month. The InvoiceDate column was already converted to date type at the beginning of the notebook, and its unit is days rather than months as in question 11.
The rest of the steps are the same as in question 11.

In [32]:
sales_d = df.select("InvoiceDate", "Quantity", "UnitPrice").rdd\
    .map(lambda row: (row[0], row[1]*row[2]))\
    .reduceByKey(lambda x, y: x + y)\
    .sortByKey()

# Create schema
schema_d = StructType([
    StructField("Date",DateType(),True),
    StructField("Total_Sales",DoubleType(),True)
])

# Create DF
daily_sales_pounds = spark.createDataFrame(sales_d, schema_d) # Create DF
daily_sales_pounds.show()

+----------+------------------+
|      Date|       Total_Sales|
+----------+------------------+
|2010-12-01| 46051.26000000007|
|2010-12-02|  45775.4299999999|
|2010-12-03|22598.460000000086|
|2010-12-05|31380.600000000162|
|2010-12-06|30465.080000000165|
|2010-12-07| 53125.99000000011|
|2010-12-08|38048.680000000095|
|2010-12-09|37177.850000000035|
|2010-12-10| 32005.35000000008|
|2010-12-12| 17217.62000000005|
|2010-12-13|27429.430000000066|
|2010-12-14| 26913.41000000016|
|2010-12-15| 29310.79000000012|
|2010-12-16| 48011.52999999994|
|2010-12-17|18162.420000000035|
|2010-12-19|  7399.78999999999|
|2010-12-20| 17574.58000000003|
|2010-12-21|15750.460000000005|
|2010-12-22| 4821.119999999998|
|2010-12-23| 5384.169999999993|
+----------+------------------+
only showing top 20 rows



## 14. What are the total sales by day in quantities?
This is similar to question 12, except for here we change everything from monthly to daily.

In [33]:
sales_d_q = df.select("InvoiceDate", "Quantity").rdd\
    .reduceByKey(lambda x, y: x + y)\
    .sortByKey()

# Create schema
schema_d_q = StructType([
    StructField("Date",DateType(),True),
    StructField("Total_Quantity",IntegerType(),True)
])

# Create DF
daily_sales_quan = spark.createDataFrame(sales_d_q, schema_d_q) # Create DF
daily_sales_quan.show()

+----------+--------------+
|      Date|Total_Quantity|
+----------+--------------+
|2010-12-01|         24032|
|2010-12-02|         20855|
|2010-12-03|         11548|
|2010-12-05|         16394|
|2010-12-06|         16095|
|2010-12-07|         19351|
|2010-12-08|         21275|
|2010-12-09|         16904|
|2010-12-10|         15388|
|2010-12-12|         10561|
|2010-12-13|         15234|
|2010-12-14|         17108|
|2010-12-15|         18169|
|2010-12-16|         29482|
|2010-12-17|         10517|
|2010-12-19|          3735|
|2010-12-20|         12617|
|2010-12-21|         10888|
|2010-12-22|          3053|
|2010-12-23|          3156|
+----------+--------------+
only showing top 20 rows
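As a rough illustration of what reduceByKey does in In [33], here is a minimal, Spark-free sketch in plain Python. It assumes each RDD element behaves like a (date, quantity) pair (a PySpark Row is a tuple subclass, so a two-column select can be fed to reduceByKey directly). The rows are made-up toy values, and reduce_by_key is a hypothetical helper, not part of the Spark API.

```python
from collections import defaultdict
from datetime import date
from functools import reduce

# Made-up toy rows standing in for df.select("InvoiceDate", "Quantity").rdd
rows = [
    (date(2010, 12, 2), 12),
    (date(2010, 12, 1), 6),
    (date(2010, 12, 1), -2),  # a negative quantity (e.g. a return line) lowers the day's total
    (date(2010, 12, 2), 4),
]


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Same shape as .reduceByKey(lambda x, y: x + y).sortByKey()
daily_totals = sorted(reduce_by_key(rows, lambda x, y: x + y).items())
for day, total in daily_totals:
    print(day, total)
```

Spark performs the same fold, but it first combines values inside each partition and then merges the partial results across partitions, which is why the function passed to reduceByKey should be associative and commutative (as addition is).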



Join the two data frames on Date to get a more comprehensive view of daily sales.

In [34]:
daily_sales = daily_sales_pounds.join(daily_sales_quan, "Date").sort("Date")
daily_sales.show()

+----------+------------------+--------------+
|      Date|       Total_Sales|Total_Quantity|
+----------+------------------+--------------+
|2010-12-01| 46051.26000000007|         24032|
|2010-12-02|  45775.4299999999|         20855|
|2010-12-03|22598.460000000086|         11548|
|2010-12-05|31380.600000000162|         16394|
|2010-12-06|30465.080000000165|         16095|
|2010-12-07| 53125.99000000011|         19351|
|2010-12-08|38048.680000000095|         21275|
|2010-12-09|37177.850000000035|         16904|
|2010-12-10| 32005.35000000008|         15388|
|2010-12-12| 17217.62000000005|         10561|
|2010-12-13|27429.430000000066|         15234|
|2010-12-14| 26913.41000000016|         17108|
|2010-12-15| 29310.79000000012|         18169|
|2010-12-16| 48011.52999999994|         29482|
|2010-12-17|18162.420000000035|         10517|
|2010-12-19|  7399.78999999999|          3735|
|2010-12-20| 17574.58000000003|         12617|
|2010-12-21|15750.460000000005|         10888|
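|2010-12-22| 4821.119999999998|          3053|
|2010-12-23| 5384.169999999993|          3156|
+----------+------------------+--------------+
only showing top 20 rows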

In [35]:
daily_sales.toPandas().to_csv('daily_sales.csv') #save to csv
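The join in In [34] uses DataFrame.join(other, "Date"), which defaults to an inner join, so a date survives only if it appears in both daily_sales_pounds and daily_sales_quan. Both frames come from the same df here, so the dates should line up, but the plain-Python sketch below (made-up totals; inner_join is a hypothetical helper) shows what would happen if they did not:

```python
from datetime import date

# Made-up daily totals keyed by date, standing in for the two DataFrames
sales_by_day = {date(2010, 12, 1): 100.5, date(2010, 12, 2): 80.0}
quantity_by_day = {date(2010, 12, 1): 40, date(2010, 12, 3): 25}


def inner_join(left, right):
    """Keep only keys present on both sides, sorted by key, like join(other, "Date").sort("Date")."""
    return sorted((key, left[key], right[key]) for key in left.keys() & right.keys())


daily = inner_join(sales_by_day, quantity_by_day)
print(daily)  # only 2010-12-01 is on both sides
```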

# Return Situation
## 15. Which country has the most return invoices?
Step 1: create returndf using the code from question 9.
Step 2: create buydf with similar logic, but with the condition not like 'C%', so that we keep only InvoiceNo values
    that do not start with "C", i.e. invoices that were not returns.
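The two filters in Steps 1 and 2 can be sketched in plain Python as below, using made-up invoice lines. The sketch mirrors count(distinct InvoiceNo) grouped by country, so an invoice with several line items is counted once. count_distinct_invoices is a hypothetical helper; note that Spark SQL's LIKE 'C%' is case-sensitive, and so is startswith("C") here.

```python
from collections import defaultdict

# Made-up (InvoiceNo, Country) lines; one invoice can span several lines
lines = [
    ("536365", "United Kingdom"),
    ("536365", "United Kingdom"),
    ("C536379", "United Kingdom"),
    ("536370", "France"),
    ("C536383", "France"),
    ("C536391", "France"),
]


def count_distinct_invoices(rows, returns):
    """Hypothetical helper: count(distinct InvoiceNo) per country for returns ('C%') or purchases."""
    invoices = defaultdict(set)
    for invoice_no, country in rows:
        if invoice_no.startswith("C") == returns:
            invoices[country].add(invoice_no)
    return {country: len(numbers) for country, numbers in invoices.items()}


return_counts = count_distinct_invoices(lines, returns=True)  # like returndf
buy_counts = count_distinct_invoices(lines, returns=False)  # like buydf
print(return_counts)  # {'United Kingdom': 1, 'France': 2}
print(buy_counts)  # {'United Kingdom': 1, 'France': 1}
```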

In [36]:
print("Return DF")
returndf = spark.sql("""select country as Country,
                count(distinct InvoiceNo) as sumReturn
            from df1
            where InvoiceNo like 'C%'
            group by Country""")
returndf.show()

print("Buy DF")
buydf = spark.sql("""select country as country_b,
              count(distinct InvoiceNo) as sumBuy
          from df1
          where InvoiceNo not like 'C%'
          group by country_b""")
buydf.show()


Return DF
+------------------+---------+
|           Country|sumReturn|
+------------------+---------+
|            Sweden|       10|
|         Singapore|        3|
|           Germany|      146|
|            France|       69|
|            Greece|        1|
|European Community|        1|
|           Belgium|       21|
|           Finland|        7|
|             Malta|        5|
|             Italy|       17|
|              EIRE|       59|
|            Norway|        4|
|             Spain|       15|
|           Denmark|        3|
|            Israel|        1|
|   Channel Islands|        7|
|               USA|        2|
|            Cyprus|        4|
|      Saudi Arabia|        1|
|       Switzerland|       20|
+------------------+---------+
only showing top 20 rows

Buy DF
+------------------+------+
|         country_b|sumBuy|
+------------------+------+
|            Sweden|    36|
|         Singapore|     7|
|           Germany|   457|
|               RSA|     1|
|            Fran

Step 3: Join returndf and buydf. Since we care about returns, we use a left_outer join so that every country with
    at least one return invoice is kept, even if it has no purchase invoices. Because the joined return_ratio_df
    has two country columns, we select only one of them and sort the countries by number of return invoices in
    descending order.
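A plain-Python sketch of the left_outer join in Step 3, with made-up counts ("Country X" is hypothetical and has returns but no purchases; left_outer_join is a hypothetical helper). Every country on the return side is kept, a missing purchase count becomes None (the sketch's stand-in for SQL NULL), and a country with purchases only is dropped:

```python
# Made-up per-country invoice counts; "Country X" has returns but no purchases
return_counts = {"United Kingdom": 5, "France": 2, "Country X": 1}
buy_counts = {"United Kingdom": 20, "France": 8, "Spain": 4}


def left_outer_join(left, right):
    """Keep every key of left; a missing right-hand value becomes None (SQL NULL)."""
    return {key: (right.get(key), value) for key, value in left.items()}


# (sumBuy, sumReturn) per country, printed by sumReturn in descending order
joined = left_outer_join(return_counts, buy_counts)
for country, (sum_buy, sum_return) in sorted(joined.items(), key=lambda kv: kv[1][1], reverse=True):
    print(country, sum_buy, sum_return)
```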

In [37]:
return_ratio_df = returndf.join(buydf, returndf["Country"] == buydf["country_b"], "left_outer")
return_ratio_df = return_ratio_df.select(return_ratio_df["Country"],
                                         return_ratio_df["sumBuy"],return_ratio_df["sumReturn"])
return_ratio_df.sort(F.col("sumReturn").desc()).show()

+---------------+------+---------+
|        Country|sumBuy|sumReturn|
+---------------+------+---------+
| United Kingdom| 16649|     3208|
|        Germany|   457|      146|
|         France|   389|       69|
|           EIRE|   260|       59|
|        Belgium|    98|       21|
|    Switzerland|    51|       20|
|          Italy|    38|       17|
|          Spain|    90|       15|
|       Portugal|    57|       13|
|      Australia|    57|       12|
|         Sweden|    36|       10|
|          Japan|    19|        9|
|Channel Islands|    26|        7|
|        Finland|    41|        7|
|    Netherlands|    95|        6|
|          Malta|     5|        5|
|         Poland|    19|        5|
|         Cyprus|    16|        4|
|         Norway|    36|        4|
| Czech Republic|     2|        3|
+---------------+------+---------+
only showing top 20 rows



## 16. Which country has the highest and the lowest return ratios?
First, we add a new column "ReturnRatio" by dividing the number of return invoices (sumReturn) by the number of purchase invoices (sumBuy).
To find the highest return ratio, we sort in descending order. Here we see an outlier: the Czech Republic, where only 2 purchase invoices but 3 return invoices were recorded between December 2010 and December 2011. It is quite possible that someone there purchased an item a couple of months before December 2010, so the purchase was not recorded in our data frame but the return was.
To find the lowest return ratio, we sort in ascending order.
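The ratio and the two sorts can be sketched in plain Python as follows. The first three rows reuse counts from the tables above. "Country X" is a made-up row with a NULL sumBuy, which is what the left_outer join produces for a country with returns but no purchases. return_ratio is a hypothetical helper. One thing worth checking in the Spark version: by default Spark sorts NULLs first in ascending order, so F.col("ReturnRatio").asc_nulls_last() may be preferable when looking for the lowest ratio.

```python
# (Country, sumBuy, sumReturn); the first three rows use counts from the tables above,
# "Country X" is made up and stands for a NULL sumBuy from the left_outer join
rows = [
    ("Czech Republic", 2, 3),
    ("Japan", 19, 9),
    ("Netherlands", 95, 6),
    ("Country X", None, 1),
]


def return_ratio(sum_buy, sum_return):
    """sumReturn / sumBuy, or None when sumBuy is None (NULL) or 0."""
    if not sum_buy:
        return None
    return sum_return / sum_buy


ratios = [(country, return_ratio(buy, ret)) for country, buy, ret in rows]
known = [(country, ratio) for country, ratio in ratios if ratio is not None]

descending = sorted(known, key=lambda cr: cr[1], reverse=True)
print("highest:", descending[0])  # ('Czech Republic', 1.5)
print("lowest:", descending[-1])
```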

In [38]:
print("Return ratios ranked in descending order")
return_ratio_df = return_ratio_df.withColumn("ReturnRatio", return_ratio_df.sumReturn/return_ratio_df.sumBuy)\
    .sort(F.col("ReturnRatio").desc())
return_ratio_df.show()

print("Return ratios ranked in ascending order")
return_ratio_df.sort(F.col("ReturnRatio")).show()

Return ratios ranked in descending order
+------------------+------+---------+-------------------+
|           Country|sumBuy|sumReturn|        ReturnRatio|
+------------------+------+---------+-------------------+
|    Czech Republic|     2|        3|                1.5|
|      Saudi Arabia|     1|        1|                1.0|
|             Malta|     5|        5|                1.0|
|             Japan|    19|        9|0.47368421052631576|
|             Italy|    38|       17| 0.4473684210526316|
|         Singapore|     7|        3|0.42857142857142855|
|               USA|     5|        2|                0.4|
|       Switzerland|    51|       20|0.39215686274509803|
|           Germany|   457|      146|0.31947483588621445|
|            Sweden|    36|       10| 0.2777777777777778|
|   Channel Islands|    26|        7| 0.2692307692307692|
|            Poland|    19|        5| 0.2631578947368421|
|European Community|     4|        1|               0.25|
|            Cyprus|    16|    

In [39]:
return_ratio_df.toPandas().to_csv('returnratio.csv') #save to csv

## 17. What is the return ratio situation like for countries with more than 10 purchase invoices?
We filter on the "sumBuy" column, keeping rows where it is greater than 10. Since return_ratio_df is already sorted by return ratio in descending order, the output keeps that order: Japan has the highest return ratio among countries with more than 10 purchase invoices, while the Netherlands has the lowest.
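A plain-Python sketch of the filter in In [40]. The counts come from the tables above and "Country X" is a made-up row with a NULL sumBuy. Filtering keeps the relative order of the surviving rows, which is why the result is still ranked by return ratio. A NULL sumBuy row is dropped as well, because in Spark NULL > 10 evaluates to NULL rather than true.

```python
# (Country, sumBuy, sumReturn, ReturnRatio), already sorted by ReturnRatio descending;
# counts come from the tables above, "Country X" is a made-up NULL-sumBuy row
rows = [
    ("Czech Republic", 2, 3, 3 / 2),
    ("Japan", 19, 9, 9 / 19),
    ("Italy", 38, 17, 17 / 38),
    ("Netherlands", 95, 6, 6 / 95),
    ("Country X", None, 1, None),
]

# Like filter(sumBuy > 10): rows with a NULL sumBuy never pass the comparison
over_10 = [row for row in rows if row[1] is not None and row[1] > 10]
for country, sum_buy, sum_return, ratio in over_10:
    print(country, sum_buy, sum_return, round(ratio, 3))
```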

In [40]:
return_ratio_df10 = return_ratio_df.filter(return_ratio_df["sumBuy"] > 10)
return_ratio_df10.show()

+---------------+------+---------+-------------------+
|        Country|sumBuy|sumReturn|        ReturnRatio|
+---------------+------+---------+-------------------+
|          Japan|    19|        9|0.47368421052631576|
|          Italy|    38|       17| 0.4473684210526316|
|    Switzerland|    51|       20|0.39215686274509803|
|        Germany|   457|      146|0.31947483588621445|
|         Sweden|    36|       10| 0.2777777777777778|
|Channel Islands|    26|        7| 0.2692307692307692|
|         Poland|    19|        5| 0.2631578947368421|
|         Cyprus|    16|        4|               0.25|
|       Portugal|    57|       13|0.22807017543859648|
|           EIRE|   260|       59|0.22692307692307692|
|        Belgium|    98|       21|0.21428571428571427|
|      Australia|    57|       12|0.21052631578947367|
| United Kingdom| 16649|     3208|0.19268424530001801|
|         France|   389|       69|0.17737789203084833|
|        Finland|    41|        7|0.17073170731707318|
|         

In [41]:
return_ratio_df10.toPandas().to_csv('returnratio10.csv') #save to csv

## 18. What is the relationship between average unit price and the quantity sold?
Since the unit price of an item changes over time, we summarize it per item with the average of its distinct unit prices (avg(distinct UnitPrice)), and we use the sum of Quantity as the quantity sold. We group by StockCode and then sort by the summed quantity in descending or ascending order.
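Note that avg(distinct UnitPrice) counts each distinct price level once, regardless of how many units were sold at it, so it can differ from a plain avg(UnitPrice) or from a quantity-weighted average price. The plain-Python sketch below compares the three on made-up sale lines for a single StockCode; which measure fits best depends on the question being asked.

```python
from statistics import mean

# Made-up (UnitPrice, Quantity) sale lines for a single StockCode
sales = [(0.29, 100), (0.29, 200), (0.24, 600), (0.42, 10)]

distinct_price_avg = mean({price for price, _ in sales})  # avg(distinct UnitPrice)
plain_avg = mean(price for price, _ in sales)  # avg(UnitPrice)
total_quantity = sum(qty for _, qty in sales)  # sum(Quantity)
weighted_avg = sum(price * qty for price, qty in sales) / total_quantity  # revenue / units

print(f"avg(distinct): {distinct_price_avg:.4f}")
print(f"avg:           {plain_avg:.4f}")
print(f"weighted:      {weighted_avg:.4f}")
print(f"sumQuan:       {total_quantity}")
```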

In [42]:
print("Sort 'sumQuan' in descending order")
unitp_salesdf = spark.sql("""select StockCode,
                                avg(distinct UnitPrice) as avgUnitPrice,
                                sum(Quantity) as sumQuan
                            from df1
                            group by StockCode""")
unitp_salesdf = unitp_salesdf.sort(F.col("sumQuan").desc())
unitp_salesdf.show()

print("Sort 'sumQuan' in ascending order")
unitp_salesdf.sort(F.col("sumQuan")).show(50)

Sort 'sumQuan' in descending order
+---------+-------------------+-------+
|StockCode|       avgUnitPrice|sumQuan|
+---------+-------------------+-------+
|    84077|              0.284|  53215|
|    22197| 1.0666666666666667|  48712|
|   85099B|  2.098888888888889|  45066|
|    84879| 1.5699999999999998|  35314|
|   85123A|              3.355|  34204|
|    21212|              0.655|  33409|
|    23084|             2.4175|  27094|
|    22492| 0.9433333333333334|  25880|
|    22616|0.33875000000000005|  25321|
|    21977|              0.655|  24163|
|    17003|            7.48125|  22960|
|    22178|              1.545|  21984|
|    15036|              0.696|  21132|
|    21915|             1.4375|  20912|
|    22386| 2.1557142857142852|  19709|
|    23203| 1.8439999999999999|  18926|
|    84991|              0.655|  17539|
|    20725|               2.41|  17345|
|    22469| 1.7385714285714282|  16640|
|   85099F| 2.1557142857142857|  16557|
+---------+-------------------+-------+
only showing top 20 rows

Here we see that, in general, higher prices are associated with lower quantities sold. Interestingly, many items have a negative sum of quantity sold, which could mean that they were not bought within our analysis period (Dec 2010 - Dec 2011) but were returned during it.
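To put a number on the price/quantity relationship, one option is a correlation coefficient over all items with a positive net quantity. In PySpark, something like unitp_salesdf.filter(F.col("sumQuan") > 0).stat.corr("avgUnitPrice", "sumQuan") computes the Pearson correlation. The plain-Python sketch below shows the same calculation on made-up rows; pearson is a hypothetical helper. Since quantities can be very skewed, a rank-based measure might be a more robust choice.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (spread_x * spread_y)


# Made-up (StockCode, avgUnitPrice, sumQuan) rows; "X3" has a negative net quantity
rows = [("X1", 0.5, 900), ("X2", 2.0, 300), ("X3", 4.0, -12), ("X4", 6.0, 50)]

# Drop items whose net quantity is not positive (e.g. returned but not bought in this period)
kept = [(price, quan) for _, price, quan in rows if quan > 0]
r = pearson([price for price, _ in kept], [quan for _, quan in kept])
print(f"Pearson r over {len(kept)} items: {r:.3f}")
```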

In [43]:
unitp_salesdf.toPandas().to_csv('unitpricesales.csv') #save to csv