# 1

## 

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
import re
import datetime
from ast import literal_eval
sc = SparkContext("local", "Bebop")
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession
spark

In [2]:
df = spark.read.csv("data.csv",header=True)
df.show()

+---------+-----------+-------------------+---------+--------------------+
|InvoiceNo|Customer_ID|               Date|   Planet|            Purchase|
+---------+-----------+-------------------+---------+--------------------+
|   536365|    17850.0|        Dec 01 2010|   Abydos|[{'ItemNo': u'851...|
|   536366|    17850.0|              40513|   Abydos|[{'ItemNo': 22633...|
|   536367|    13047.0|         12/01/2010|   Abydos|"[{'ItemNo': 8487...|
|   536368|    13047.0|        Dec 01 2010|   Abydos|[{'ItemNo': 22960...|
|   536369|    13047.0|         12/01/2010|   Abydos|[{'ItemNo': 21756...|
|   536370|    12583.0|              40513|Altair IV|[{'ItemNo': 22728...|
|   536371|    13748.0|2010-12-01 09:00:00|   Abydos|[{'ItemNo': 22086...|
|   536372|    17850.0|              40513|   Abydos|[{'ItemNo': 22632...|
|   536373|    17850.0|        Dec 01 2010|   Abydos|[{'ItemNo': u'851...|
|   536374|    15100.0|              40513|   Abydos|[{'ItemNo': 21258...|
|   536375|    17850.0|  

matching date format

In [3]:
# convert 01/04/2011 format to 2011-04-01 00:00:00 format
convert_monthdateformat = F.udf(lambda s: datetime.datetime.strptime(s,"%m/%d/%Y").strftime("%Y-%m-%d %H:%M:%S"))
df_date_convert_monthdateformat = df.filter(F.col("Date").rlike(r"\d{2}/\d{2}/\d{4}"))
df_date_convert_monthdateformat = df_date_convert_monthdateformat.withColumn("Date",convert_monthdateformat(df_date_convert_monthdateformat["Date"]))
# convert Jan 04 2011 format to 2011-04-01 00:00:00 format
convert_monthletterformat = F.udf(lambda s: datetime.datetime.strptime(s,"%b %d %Y").strftime("%Y-%m-%d %H:%M:%S"))
df_date_convert_monthletterformat = df.filter(F.col("Date").rlike(r"[A-Za-z]{3}\s\d{2}\s\d{4}"))
df_date_convert_monthletterformat = df_date_convert_monthletterformat.withColumn("Date",convert_monthletterformat(df_date_convert_monthletterformat["Date"]))
# convert 5 digit code format (40886) to 2011-04-01 00:00:00 format
convert_digitformat = F.udf(lambda s: (datetime.date(1899,12,30)+datetime.timedelta(days=int(s))).strftime("%Y-%m-%d %H:%M:%S"))
df_date_convert_digitformat = df.filter(F.col("Date").rlike(r"\d{5}"))
df_date_convert_digitformat = df_date_convert_digitformat.withColumn("Date",convert_digitformat(df_date_convert_digitformat["Date"]))
# capture 2011-04-01 00:00:00 format dates
df_actualformat = df.filter(F.col("Date").rlike(r"\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}"))
# combine all date formats
Bebop_df = df_date_convert_monthdateformat.union(df_date_convert_monthletterformat).union(df_date_convert_digitformat).union(df_actualformat)

In [4]:
Bebop_df

DataFrame[InvoiceNo: string, Customer_ID: string, Date: string, Planet: string, Purchase: string]

In [5]:
# Changing data types InvoiceNo: INT, Customer_ID: INT, Date: Date, Planet: string, Purchase, string
Bebop_df = Bebop_df.withColumn("InvoiceNo",Bebop_df["InvoiceNo"].cast(IntegerType()))
Bebop_df = Bebop_df.withColumn("Customer_ID",Bebop_df["Customer_ID"].cast(IntegerType()))
# Bebop_df = Bebop_df.withColumn("Date",Bebop_df["Date"].cast(DateType()))

In [6]:
Bebop_df

DataFrame[InvoiceNo: int, Customer_ID: int, Date: string, Planet: string, Purchase: string]

In [7]:
Bebop_df.createOrReplaceTempView("Bebop")

In [8]:
spark.sql("create table vn0hxf8.bebop as select * from Bebop")

DataFrame[]

In [9]:
# Creating invoices table
spark.sql("CREATE TABLE vn0hxf8.invoices AS SELECT InvoiceNo,Customer_ID,Date,Planet FROM vn0hxf8.bebop")

DataFrame[]

In [10]:
spark.sql('''CREATE TABLE vn0hxf8.orders AS
SELECT invoiceno,Customer_ID, ItemNo,CAST(UnitPrice AS DOUBLE) AS UnitPrice, CAST(TRIM(Quantity) AS INT) AS Quantity FROM(
SELECT invoiceno,Customer_ID, CASE WHEN ItemNo LIKE "%'%" THEN SUBSTRING(ItemNo,4,LENGTH(ItemNo)-4) ELSE ItemNo END AS ItemNo, UnitPrice, Quantity FROM(
SELECT invoiceno,Customer_ID,
split(split(results,",")[0],':')[1] AS  ItemNo,
regexp_replace(split(split(results,",")[1],':')[1], "[\\]|}]", "") AS UnitPrice,
regexp_replace(split(split(results,",")[3],':')[1], "[\\]|}]", "") AS Quantity
FROM (SELECT invoiceno,Customer_ID, split(translate(purchase, '"\\[|]|\""',''), "}, ") AS r
FROM vn0hxf8.bebop) t1 LATERAL VIEW explode(r) rr AS results ) data_cleaning) convert_data_types''')

DataFrame[]

In [11]:
spark.sql('''CREATE TABLE vn0hxf8.product AS
SELECT invoiceno, ItemNo,CAST(UnitPrice AS DOUBLE) AS UnitPrice, Description FROM(
SELECT invoiceno, CASE WHEN ItemNo LIKE "%'%" THEN SUBSTRING(ItemNo,4,LENGTH(ItemNo)-4) ELSE ItemNo END AS ItemNo, UnitPrice, 
CASE WHEN Description LIKE "%'%" THEN SUBSTRING(Description,4,LENGTH(Description)-4) ELSE Description END AS Description FROM(
SELECT invoiceno,
       split(split(results,",")[0],':')[1] AS  ItemNo,
       regexp_replace(split(split(results,",")[1],':')[1], "[\\]|}]", "") AS UnitPrice,
       regexp_replace(split(split(results,",")[2],':')[1], "[\\]|}]", "") AS Description
    FROM
       (SELECT invoiceno,
             split(translate(purchase, '"\\[|]|\""',''), "}, ") AS r
       FROM vn0hxf8.bebop) t1 LATERAL VIEW explode(r) rr AS results) data_cleaning) convert_data_types''')

DataFrame[]

In [12]:
#invoices table
spark.sql("desc vn0hxf8.invoices").show()
# sample data
spark.sql("select * from vn0hxf8.invoices").show(5,False)

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  InvoiceNo|      int|   null|
|Customer_ID|      int|   null|
|       Date|   string|   null|
|     Planet|   string|   null|
+-----------+---------+-------+

+---------+-----------+-------------------+------+
|InvoiceNo|Customer_ID|Date               |Planet|
+---------+-----------+-------------------+------+
|536367   |13047      |2010-12-01 00:00:00|Abydos|
|536369   |13047      |2010-12-01 00:00:00|Abydos|
|536388   |16250      |2010-12-01 00:00:00|Abydos|
|536400   |13448      |2010-12-01 00:00:00|Abydos|
|536405   |14045      |2010-12-01 00:00:00|Abydos|
+---------+-----------+-------------------+------+
only showing top 5 rows



In [13]:
# orders table
spark.sql("desc vn0hxf8.orders").show()
# sample data
spark.sql("select * from vn0hxf8.orders").show(5)

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  invoiceno|      int|   null|
|Customer_ID|      int|   null|
|     ItemNo|   string|   null|
|  UnitPrice|   double|   null|
|   Quantity|      int|   null|
+-----------+---------+-------+

+---------+-----------+------+---------+--------+
|invoiceno|Customer_ID|ItemNo|UnitPrice|Quantity|
+---------+-----------+------+---------+--------+
|   536371|      13748| 22086|     2.55|      80|
|   536378|      14688| 22386|     1.95|      10|
|   536378|      14688|85099C|     1.95|      10|
|   536378|      14688| 21033|     2.95|      10|
|   536378|      14688| 20723|     0.85|      10|
+---------+-----------+------+---------+--------+
only showing top 5 rows



In [14]:
# product table
spark.sql("desc vn0hxf8.product").show()
# sample data
spark.sql("select * from vn0hxf8.product").show(5)

+-----------+---------+-------+
|   col_name|data_type|comment|
+-----------+---------+-------+
|  invoiceno|      int|   null|
|     ItemNo|   string|   null|
|  UnitPrice|   double|   null|
|Description|   string|   null|
+-----------+---------+-------+

+---------+------+---------+--------------------+
|invoiceno|ItemNo|UnitPrice|         Description|
+---------+------+---------+--------------------+
|   536365|85123A|     2.55|    ADORIAN CROSSBOW|
|   536365| 71053|     3.39| WHITE METAL LANTERN|
|   536365|84406B|     2.75|CREAM CUPID HEART...|
|   536365|84029G|     3.39|KNITTED UNION FLA...|
|   536365|84029E|     3.39|RED WOOLLY HOTTIE...|
+---------+------+---------+--------------------+
only showing top 5 rows



 Data Model
![title](DM.png)

### 1.

In [15]:
#UNIQUE CUSTOMERS
spark.sql('''SELECT COUNT(*) AS unique_customers FROM (SELECT Customer_ID FROM vn0hxf8.invoices GROUP BY Customer_ID)
             unique_customers''').show()

+----------------+
|unique_customers|
+----------------+
|            4373|
+----------------+



### 2
SELECT Customer_ID, SUM(revenue_per_invoice) AS total_revenue FROM(
ORDER BY total_revenue desc

In [16]:
# second biggest customer
spark.sql('''SELECT Customer_ID AS second_biggest_customer FROM (
SELECT Customer_ID, SUM(revenue_per_invoice) AS total_revenue FROM(
SELECT Customer_ID, (Quantity*UnitPrice) AS revenue_per_invoice FROM vn0hxf8.orders WHERE Customer_ID IS NOT NULL
AND Quantity IS NOT NULL) subquery GROUP BY Customer_ID ORDER BY total_revenue desc LIMIT 2)
subquery1 ORDER BY total_revenue LIMIT 1 ''').show()

+-----------------------+
|second_biggest_customer|
+-----------------------+
|                  14646|
+-----------------------+



### 3

In [17]:
# number of customers visited multiple planets
spark.sql('''SELECT COUNT(*) AS total_multi_planet_customers FROM (
SELECT Customer_ID, COLLECT_SET(Planet) AS Planets_list FROM vn0hxf8.invoices WHERE Customer_ID IS NOT NULL
GROUP BY Customer_ID) multi_planet_customers WHERE SIZE(Planets_list) > 1''').show()

+----------------------------+
|total_multi_planet_customers|
+----------------------------+
|                           8|
+----------------------------+



### 4

In [18]:
# most price changes product
spark.sql(''' SELECT ItemNo AS most_price_changes_product FROM (
          SELECT ItemNo,COLLECT_SET(UnitPrice) AS changes_list FROM vn0hxf8.orders GROUP BY ItemNo
          ) price_changes ORDER BY SIZE(changes_list) DESC LIMIT 1''').show()

+--------------------------+
|most_price_changes_product|
+--------------------------+
|                         M|
+--------------------------+



### 5

In [19]:
# most attracted item
spark.sql('''SELECT ItemNo as most_attracted_item FROM(
SELECT ItemNo, COLLECT_SET(Customer_ID) AS customers_list FROM vn0hxf8.orders WHERE Customer_ID IS NOT NULL
GROUP BY ItemNo ) subquery ORDER BY SIZE(customers_list) DESC LIMIT 1
''').show()

+-------------------+
|most_attracted_item|
+-------------------+
|              22423|
+-------------------+



### 6

In [20]:
# customer - shortest average length of time between purchases
spark.sql(''' SELECT Customer_ID FROM (
SELECT Customer_ID, AVG(time_diff) AS average_purchase_time FROM (
SELECT Customer_ID,CAST(UNIX_TIMESTAMP(Date) - UNIX_TIMESTAMP(previous_date) AS DOUBLE) as time_diff FROM(
SELECT Customer_ID,Date,LAG(Date,1) over(Partition BY Customer_ID order by Date) as previous_date FROM vn0hxf8.invoices
WHERE Customer_ID IS NOT NULL order by Customer_ID) timediff) avg_time WHERE time_diff IS NOT NULL GROUP BY Customer_ID
)shorest_avg_time_customer ORDER BY average_purchase_time LIMIT 1''').show()


+-----------+
|Customer_ID|
+-----------+
|      12403|
+-----------+



### 7

In [21]:
# Total revenue of top 3 items
spark.sql(''' SELECT SUM(total_revenue) AS combined_revenue FROM(
SELECT InvoiceNo, ItemNo, SUM(revenue) AS total_revenue FROM(
SELECT InvoiceNo, ItemNo, UnitPrice*Quantity as revenue, COUNT(ItemNo) OVER (PARTITION BY InvoiceNo) as items_per_invoice
FROM vn0hxf8.orders WHERE InvoiceNo IS NOT NULL) subquery WHERE items_per_invoice >=3 GROUP BY InvoiceNo, ItemNo
ORDER BY total_revenue DESC LIMIT 3)subquery1''').show()

+----------------+
|combined_revenue|
+----------------+
|        20223.52|
+----------------+



## 3

In [22]:
def mystery(array):
    mylist = []
    for i in range(len(array)):
        for j in range(len(array)):
            elem1 = array[i]
            elem2 = array[j]
            if elem1 == elem2 and i!=j:
                if elem2 not in mylist:
                    mylist.append(elem2)
    return mylist

### 1
mystery function returns DUPLICATE elements in input array.

### 2
Run time complexity of mystery function is O(N * N)
Space Complexity O(N)
-- Where N is length of array.

### 3


In [23]:
# Efficient solution
def duplicates(array):
        duplicate_list = []
        unique_list = []
        
        for elem in array:
            if elem in unique_list:
                duplicate_list.append(elem)
            else:
                unique_list.append(elem)
        return duplicate_list

### 4
Run time complexity of mystery function is O(N)
Space Complexity O(2*N)
-- Where N is length of array.

### 5
mystery function is a two dimensional array and each element is compared based on its indices resulting O(N*N) time complexity with space complexiy of O(N) for storing results in mylist.
duplicates function iterates input array once and stores duplicates if the element is not present in unique list. This function is optimized resulting O(N) time complexity with space complexity of O(2*N).