In [None]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


# **Basics**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# get file url from the spark website
!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!ls

sample_data  spark-3.1.2-bin-hadoop2.7.tgz


In [None]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\work\\spark-2.4.8-bin-hadoop2.7'

# **Assignment**

Dataset is [here](https://www.kaggle.com/benroshan/ecommerce-data?select=Order+Details.csv).

**Questions:**
1. In which order, there was maximum loss and in which order, there was maximum profit?

2. Which category has been the most profitable and the least profitable (maybe have caused a loss as well)?

3. From which state, most orders have been placed? 

4. Which category of product has been sold the most?

5. For which all months, the sales target have been achieved? So, for each month, there's a sales target for each category. Find out for which category and which month, the sales target was achieved and not achieved.

6. Which customer has spent the most amount on the website?

## **Reading the Data**

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


spark = SparkSession.builder.master('local[1]').appName('E-commerce').getOrCreate()

In [4]:
from pyspark.sql.types import FloatType, StringType, StructType, StructField, IntegerType, DateType


order_deets_schema = StructType([
                                 StructField("Order ID", StringType(), True),
                                 StructField("Amount", FloatType(), True),
                                 StructField("Profit", FloatType(), True),
                                 StructField("Quantity", IntegerType(), True),
                                 StructField("Category", StringType(), True),
                                 StructField("Sub-category", StringType(), True)
])
orders_schema = StructType([
                            StructField("Order ID", StringType(), True),
                            StructField("Order Date", DateType(), True),
                            StructField("CustomerName", StringType(), True),
                            StructField("State", StringType(), True),
                            StructField("City", StringType(), True),
])
target_schema = StructType([
                            StructField("Month of Order Date", StringType(), True),
                            StructField("Category", StringType(), True),
                            StructField("Target", FloatType(), True),
])
order_deets_df = spark.read.option("dateFormat", "dd-MM-yyyy").csv("archive/order_details.csv", header = True, schema = order_deets_schema).dropna()
orders_df = spark.read.option("dateFormat", "dd-MM-yyyy").csv("archive/orders.csv", header = True, schema = orders_schema)
target_df = spark.read.option("dateFormat", "dd-MM-yyyy").csv("archive/target.csv", header = True, schema = target_schema)

In [67]:
orders_df.show()

+--------+----------+------------+-----------------+------------------+
|Order ID|Order Date|CustomerName|            State|              City|
+--------+----------+------------+-----------------+------------------+
| B-25601|2018-04-01|      Bharat|          Gujarat|         Ahmedabad|
| B-25602|2018-04-01|       Pearl|      Maharashtra|              Pune|
| B-25603|2018-04-03|       Jahan|   Madhya Pradesh|            Bhopal|
| B-25604|2018-04-03|      Divsha|        Rajasthan|            Jaipur|
| B-25605|2018-04-05|     Kasheen|      West Bengal|           Kolkata|
| B-25606|2018-04-06|       Hazel|        Karnataka|         Bangalore|
| B-25607|2018-04-06|    Sonakshi|Jammu and Kashmir|           Kashmir|
| B-25608|2018-04-08|     Aarushi|       Tamil Nadu|           Chennai|
| B-25609|2018-04-09|      Jitesh|    Uttar Pradesh|           Lucknow|
| B-25610|2018-04-09|      Yogesh|            Bihar|             Patna|
| B-25611|2018-04-11|       Anita|          Kerala |Thiruvananth

In [5]:
order_deets_df.printSchema()
orders_df.printSchema()
target_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Amount: float (nullable = true)
 |-- Profit: float (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-category: string (nullable = true)

root
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)

root
 |-- Month of Order Date: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Target: float (nullable = true)



## **1. Max Loss and Max Profit Order**

In [6]:
from pyspark.sql.functions import sum


order_grouped = order_deets_df.groupBy("Order ID").agg(sum("Profit").alias("Profit Per Order"))

**Max Profit and Min Profit Using Order By**

In [13]:
order_pl = order_grouped.orderBy("Profit Per Order", ascending = False)
order_pl.collect()

[Row(Order ID='B-25973', Profit Per Order=1970.0),
 Row(Order ID='B-25855', Profit Per Order=1432.0),
 Row(Order ID='B-25656', Profit Per Order=1021.0),
 Row(Order ID='B-26093', Profit Per Order=1020.0),
 Row(Order ID='B-25761', Profit Per Order=984.0),
 Row(Order ID='B-25602', Profit Per Order=975.0),
 Row(Order ID='B-25853', Profit Per Order=970.0),
 Row(Order ID='B-25923', Profit Per Order=966.0),
 Row(Order ID='B-26051', Profit Per Order=906.0),
 Row(Order ID='B-26073', Profit Per Order=889.0),
 Row(Order ID='B-25830', Profit Per Order=873.0),
 Row(Order ID='B-25858', Profit Per Order=868.0),
 Row(Order ID='B-25993', Profit Per Order=864.0),
 Row(Order ID='B-26099', Profit Per Order=859.0),
 Row(Order ID='B-25803', Profit Per Order=820.0),
 Row(Order ID='B-25862', Profit Per Order=745.0),
 Row(Order ID='B-25850', Profit Per Order=685.0),
 Row(Order ID='B-26055', Profit Per Order=683.0),
 Row(Order ID='B-25878', Profit Per Order=672.0),
 Row(Order ID='B-25955', Profit Per Order=669.

In [14]:
import pyspark.sql.functions as F
print(order_pl.first())

last=order_pl.orderBy(F.monotonically_increasing_id().desc())
print(last.first())

Row(Order ID='B-25973', Profit Per Order=1970.0)
Row(Order ID='B-25798', Profit Per Order=-1836.0)


**Max Profit and Min Profit using Max and Min Functions**

In [15]:
order_grouped.agg(f.max(f.col("Profit Per Order")).alias("Max Profit")).show()
order_grouped.agg(f.min(f.col("Profit Per Order")).alias("Min Profit")).show()

+----------+
|Max Profit|
+----------+
|    1970.0|
+----------+

+----------+
|Min Profit|
+----------+
|   -1836.0|
+----------+



## **2. Max Profit and Min Profit Category**

In [16]:
category_pl = order_deets_df.groupBy("Category").agg(sum("Profit").alias("Profit Per Category")).orderBy("Profit Per Category", ascending = False)

**Most Profitable Category**

In [17]:
category_pl.first()

Row(Category='Clothing', Profit Per Category=11163.0)

**Least Profitable Category**

In [18]:
category_pl.tail(1)

AttributeError: 'DataFrame' object has no attribute 'tail'

## **3. Most Orders By State**

In [19]:
state_order = orders_df.groupBy("State").count().orderBy("Count", ascending = False)

In [20]:
state_order.first()

Row(State='Madhya Pradesh', count=101)

## **4. Most Sold Categries**

In [21]:
category_count = order_deets_df.groupBy("Category").agg(sum("Quantity").alias("Products Sold Per Cateory")).orderBy("Products Sold Per Cateory", ascending = False)

In [22]:
category_count.first()

Row(Category='Clothing', Products Sold Per Cateory=3516)

## **5. Target Achieved**

In [65]:
months = ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
to_month = f.udf(lambda x: str(months[x]))
mod_year = f.udf(lambda x: str(x % 2000))


# Getting order timestamps
orders_na = orders_df.select("Order ID", "Order Date")\
.withColumn("Order Year", mod_year(f.year("Order Date")))\
.withColumn("Order Month", to_month(f.month("Order Date")))



TypeError: not all arguments converted during string formatting

In [64]:
orders_na.collect()

Py4JJavaError: An error occurred while calling o1021.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 72.0 failed 1 times, most recent failure: Lost task 0.0 in stage 72.0 (TID 5068, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\DMS240~1\AppData\Local\Temp/ipykernel_20128/753241018.py", line 3, in <lambda>
TypeError: unsupported operand type(s) for %: 'NoneType' and 'int'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:304)
	at org.apache.spark.sql.Dataset$$anonfun$49.apply(Dataset.scala:3262)
	at org.apache.spark.sql.Dataset$$anonfun$49.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
  File "C:\work\spark-2.4.8-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\DMS240~1\AppData\Local\Temp/ipykernel_20128/753241018.py", line 3, in <lambda>
TypeError: unsupported operand type(s) for %: 'NoneType' and 'int'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [61]:
order_my = order_deets_df.join(orders_na, on = "Order ID", how = "left")\
.select(f.concat_ws("-", f.col("Order Month"), f.col("Order Year")).alias("Date"), "Category", "Amount")

DataFrame[Order ID: string, Order Date: date, Order Year: string, Order Month: string]

In [24]:
order_my_grouped = order_my.groupBy(["Date", "Category"]).agg(sum("Amount").alias("Amount"))

In [25]:
get_achieved = f.udf(lambda x, y: "No" if x < y else "Yes")

temp_target = target_df.select(f.col("Month of Order Date").alias("Date"), "Category", "Target")

temp_target.join(order_my_grouped, on = ["Date", "Category"], how = "inner")\
.withColumn("Achieved", get_achieved(f.col("Amount"), f.col("Target"))).show(5)

+------+-----------+-------+-------+--------+
|  Date|   Category| Target| Amount|Achieved|
+------+-----------+-------+-------+--------+
|Feb-19|Electronics|16000.0|12593.0|      No|
|Nov-18|Electronics| 9000.0|16651.0|     Yes|
|Mar-19|  Furniture|11800.0|16659.0|     Yes|
|Sep-18|Electronics| 9000.0| 7207.0|      No|
|Aug-18|   Clothing|14000.0|11822.0|      No|
+------+-----------+-------+-------+--------+
only showing top 5 rows



## **6. Best Customer**

In [26]:
order_amount = order_deets_df.groupBy("Order ID").agg(sum("Amount").alias("Amount"))
best_customer = orders_df.join(order_amount, on = "Order ID", how = "inner")\
.groupBy("CustomerName").agg(sum("Amount").alias("Total Purchase Amount")).orderBy("Total Purchase Amount", ascending = False)

In [27]:
best_customer.first()

Row(CustomerName='Yaanvi', Total Purchase Amount=9177.0)