In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions

In [2]:
spark = SparkSession \
    .builder \
    .appName("Market Analytics") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Load dataset

In [3]:
df = spark.read.format("csv").option("header", "true").load("dataset/tokopedia.csv")
df.show()

+--------------------+---------+---------+---------+--------------------+--------------------+
|          breadcrumb|item_sold|      pid|    price|               title|                 url|
+--------------------+---------+---------+---------+--------------------+--------------------+
|Dapur ;Bekal ;Cup...|        2|389668528|   70.000|Mayonnaise & Ketc...|https://www.tokop...|
|Rumah Tangga ;Keb...|        0|390053120|  160.000|BOLDe Super BROOM...|https://www.tokop...|
|Perawatan Hewan ;...|        0|390055891|1.100.000|Two Tone Brown (S...|https://www.tokop...|
|Fashion Muslim ;P...|        0|389610348|  325.000|MUKENA ALDIVA har...|https://www.tokop...|
|     Produk Lainnya |        1|389666117|  179.900|terpal kolam kota...|https://www.tokop...|
|Fashion Pria ;Aks...|        0|389609406|   58.000|TOPI TRUCKER CONS...|https://www.tokop...|
|Makanan & Minuman...|        0|389613659|  195.000|ARTE CHOCOLATE ( ...|https://www.tokop...|
|Perawatan Hewan ;...|        2|389671338|   10.00

In [4]:
df.dtypes

[('breadcrumb', 'string'),
 ('item_sold', 'string'),
 ('pid', 'string'),
 ('price', 'string'),
 ('title', 'string'),
 ('url', 'string')]

In [6]:
# some data have null in item_sold, pid, price, url
df = df.where(df['item_sold'].isNotNull())\
    .where(df['pid'].isNotNull())\
    .where(df['price'].isNotNull())\
    .where(df['title'].isNotNull())\
    .where(df['url'].isNotNull())

# remove wrong value
df = df.rdd.filter(lambda x: "http" not in x['item_sold'])\
    .filter(lambda x: "http" in x['url']).toDF()

In [None]:
# extract more data


## Total Transaction (QTY)

In [112]:
from operator import add
df_clean.select('item_sold').rdd.map(lambda x: int(x['item_sold'])).reduce(add)

12298304

## Total Transaction (Rupiah)

In [113]:
def price(x):
    return int(x.replace(".", ""))

df_clean.select('price').rdd.map(lambda x: price(x['price'])).reduce(add)

862586607816

## Total Seller

In [114]:
def seller(x):
    return x.split("/")[3]

df_clean.select('url').rdd.map(lambda x: seller(x['url'])).distinct().count()

90144

## Total Product

In [115]:
df_clean.rdd.count()

1281288

## 10 Best Selling

In [118]:
# name, url, market_name, market_url, 
df_clean.rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 75.0 failed 1 times, most recent failure: Lost task 0.0 in stage 75.0 (TID 179, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:260)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:50)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply(TaskResult.scala:48)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply(TaskResult.scala:48)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:48)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:511)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor102.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
	at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:260)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:50)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply(TaskResult.scala:48)
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply(TaskResult.scala:48)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:48)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:511)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51518)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/socketserver.py", line 313, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/anaconda3/lib/python3.7/socketserver.py", line 344, in process_request
    self.finish_request(request, client_address)
  File "/anaconda3/lib/python3.7/socketserver.py", line 357, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/anaconda3/lib/python3.7/socketserver.py", line 717, in __init__
    self.handle()
  File "/anaconda3/lib/python3.7/site-packages/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/anaconda3/lib/python3.7/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/anaconda3/lib/python3.7/site-packages/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(sel