In [1]:
!pip install pyarrow



In [2]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
# import pyspark.pandas
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
import seaborn as sns

import torch
import torch.nn as nn
from sparktorch import serialize_torch_obj, SparkTorch

In [3]:
# Create (or reuse) the notebook's Spark session.
# To run against the standalone cluster instead, add
#     .master("spark://node-master:7077")
# to the builder chain below.
# NOTE(review): later task logs report "executor driver" (local mode), where
# spark.executor.memory presumably has no effect — confirm, and consider
# spark.driver.memory instead.
spark = (
    SparkSession.builder
    .appName("Vu dep trai")
    .config("spark.executor.memory", "10g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/04 18:36:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the four raw Walmart CSVs. Every file has a header row and Spark infers
# the column types. The directory is a single constant so the notebook can be
# pointed at another copy of the data in one place.
DATA_DIR = "data/ba-walmart"


def _read_walmart_csv(name):
    """Read ``<DATA_DIR>/<name>.csv`` with a header row and an inferred schema."""
    return spark.read.csv(f"{DATA_DIR}/{name}.csv", header=True, inferSchema=True)


df_stores_raw = _read_walmart_csv("stores")
df_feature_raw = _read_walmart_csv("features")
df_train_raw = _read_walmart_csv("train")
df_test_raw = _read_walmart_csv("test")

In [5]:
# The MarkDown1-5 feature columns are not used by this notebook.
df_feature = df_feature_raw.drop("MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5")


def _enrich_with_store_info(frame):
    """Attach weekly store features and store metadata to a train/test frame.

    ``frame`` is left-joined to ``df_feature`` on (Store, Date, IsHoliday) and then to
    ``df_stores_raw`` on Store, so every row of ``frame`` is kept even without a match.
    CPI and Unemployment are then cast to float.
    """
    joined = (
        frame
        .join(df_feature, how="left", on=["Store", "Date", "IsHoliday"])
        .join(df_stores_raw, how="left", on=["Store"])
    )
    # NOTE(review): presumably inferSchema read CPI/Unemployment as strings
    # (e.g. "NA" placeholders in the CSV) — confirm; the cast makes them numeric.
    return (
        joined
        .withColumn("CPI", joined["CPI"].cast(FloatType()))
        .withColumn("Unemployment", joined["Unemployment"].cast(FloatType()))
    )


# Train and test receive exactly the same enrichment.
df = _enrich_with_store_info(df_train_raw)
df_test = _enrich_with_store_info(df_test_raw)


In [6]:
def _add_calendar_features(frame):
    """Derive Year / Month / Week from ``Date`` and cast ``IsHoliday`` to an integer column."""
    with_dates = (
        frame
        .withColumn("Year", year("Date"))
        .withColumn("Month", month("Date"))
        .withColumn("Week", weekofyear("Date"))
    )
    return with_dates.withColumn("IsHoliday", with_dates["IsHoliday"].cast(IntegerType()))


# Apply the identical transformation to train and test so their schemas stay aligned.
df = _add_calendar_features(df)
df_test = _add_calendar_features(df_test)

In [7]:
# Keep training rows whose weekly sales are strictly positive and strictly
# below the upper bound; everything else is discarded.
SALES_UPPER_BOUND = 450000
sales = df["Weekly_Sales"]
df_clean = df.filter((sales > 0) & (sales < SALES_UPPER_BOUND))

In [8]:
# Encode the store Type labels as integer codes 0..k-1 in sorted label order.
# The codes come from the categories present in the cleaned training data, and
# the same mapping is reused for the test frame.
types = sorted(df_clean.select("Type").distinct().collect())
mapping = {}
for code, row in enumerate(types):
    mapping[row.Type] = str(code)


def _encode_type(frame):
    """Swap Type labels for their string codes, then cast the column to int."""
    replaced = frame.replace(mapping, subset=["Type"])
    return replaced.withColumn("Type", replaced["Type"].cast(IntegerType()))


df_clean = _encode_type(df_clean)
df_test = _encode_type(df_test)

In [9]:
# Regression label.
target = 'Weekly_Sales'
# Columns dropped before modelling, based on the EDA.
# After this drop the model inputs are Store, IsHoliday, Type, Size, Week, Dept and Year.
drop_col = ['Date', 'Temperature', 'Fuel_Price', 'CPI', 'Unemployment', 'Month']
# Categorical id columns that are one-hot encoded.
onehot_col = ['Store', 'Type']

In [10]:
# Disabled experiment: would store the mean Weekly_Sales as a target scale.
# The notebook instead min-max scales every column (including the target) in the next cell.
# target_scale = df_clean.agg({"Weekly_Sales": "mean"}).collect()[0][0]

In [11]:
# Remove the EDA-excluded columns and any row with a missing value, then move to
# pandas for one-hot encoding and min-max scaling.
df_clean = df_clean.drop(*drop_col)
df_clean = df_clean.na.drop()
df_clean_pd = df_clean.toPandas()

# Remember the raw target range so scaled predictions can be mapped back to sales units.
min_target = df_clean_pd[target].min()
max_target = df_clean_pd[target].max()

# One-hot encode the categorical id columns in a single call. The dummies are appended
# after the remaining columns, Store_* before Type_*.
# dtype=float keeps the indicators numeric. Newer pandas returns bool dummies by
# default, and bool columns can fail in the min-max subtraction below.
df_clean_pd = pd.get_dummies(df_clean_pd, columns=onehot_col, prefix=onehot_col, dtype=float)

# Min-max scale every column (target included) to [0, 1].
# A constant column has a zero range. Dividing by it would put NaN in every row, and the
# dropna() below would then discard the whole dataset. Such columns are divided by 1
# instead, which maps them to 0.
# NOTE(review): min/max are computed before the train/validation split, so validation rows
# influence the scaling — consider fitting the scaler on the training split only.
col_min = df_clean_pd.min()
col_range = (df_clean_pd.max() - col_min).replace(0, 1)
df_clean_pd = (df_clean_pd - col_min) / col_range
df_clean_pd = df_clean_pd.dropna()
df_clean_pd = df_clean_pd.reset_index(drop=True)
df_clean_pd.head()

                                                                                

Unnamed: 0,IsHoliday,Dept,Weekly_Sales,Size,Year,Week,Store_1,Store_2,Store_3,Store_4,...,Store_39,Store_40,Store_41,Store_42,Store_43,Store_44,Store_45,Type_0,Type_1,Type_2
0,0.0,0.0,0.05902,0.630267,0.0,0.078431,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
1,1.0,0.0,0.109019,0.630267,0.0,0.098039,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
2,0.0,0.0,0.098496,0.630267,0.0,0.117647,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
3,0.0,0.0,0.045947,0.630267,0.0,0.137255,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
4,0.0,0.0,0.051687,0.630267,0.0,0.156863,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0


In [12]:
# Hand the one-hot encoded, min-max scaled pandas frame back to Spark for feature assembly.
# NOTE(review): pyarrow is installed in the first cell, but Arrow-based conversion
# (spark.sql.execution.arrow.pyspark.enabled) is never enabled — consider it to speed this up.
df_clean = spark.createDataFrame(data=df_clean_pd)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [13]:
# Pack every non-target column into one vector column named "features", which the
# SparkTorch estimator reads as its inputCol. No scaling happens here; the columns
# were already min-max scaled in pandas.
all_col = [name for name in df_clean.columns if name != target]
mm_assembler = VectorAssembler(outputCol="features", inputCols=all_col)
mm_pipeline = pyspark.ml.Pipeline(stages=[mm_assembler]).fit(dataset=df_clean)
df_clean = mm_pipeline.transform(df_clean)

In [14]:
# Peek at a few scaled target values.
df_clean.select("Weekly_Sales").show(n=5, truncate=False)

23/02/04 18:38:35 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/04 18:38:35 WARN TaskSetManager: Stage 16 contains a task of very large size (16633 KiB). The maximum recommended task size is 1000 KiB.


[Stage 16:>                                                         (0 + 1) / 1]

23/02/04 18:38:39 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 16 (TID 24): Attempting to kill Python Worker
+-------------------+
|Weekly_Sales       |
+-------------------+
|0.05901994249481135|
|0.10901918001495786|
|0.0984961529339467 |
|0.04594658606039068|
|0.05168734897215822|
+-------------------+
only showing top 5 rows



                                                                                

In [15]:
# Randomly assign ~80% of the rows to training and ~20% to validation.
# This is a random row split, not "the first 80%" in time, so validation weeks are
# interleaved with training weeks.
# The fixed seed makes the split repeatable for the same input data and partitioning.
SPLIT_SEED = 1234
df_train, df_valid = df_clean.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [22]:
# Fully connected regressor: input -> 128 -> 256 -> 512 -> 256 -> 128 -> 1, with a ReLU
# after each hidden layer. The input width is taken from the assembled feature columns
# (all_col) instead of a hard-coded 53, so adding or removing a one-hot column no longer
# breaks the first layer. The module order matches the original nn.Sequential.
torch.manual_seed(1234)  # reproducible weight initialisation

hidden_sizes = [128, 256, 512, 256, 128]
layers = []
in_features = len(all_col)
for out_features in hidden_sizes:
    layers.extend([nn.Linear(in_features, out_features), nn.ReLU()])
    in_features = out_features
layers.append(nn.Linear(in_features, 1))
net = nn.Sequential(*layers)

# Bundle the model with an MSE loss and the Adam optimizer (lr=1e-4) for SparkTorch.
torch_obj = serialize_torch_obj(
    model=net,
    criterion=nn.MSELoss(),
    optimizer=torch.optim.Adam,
    lr=0.0001
)

# Spark ML estimator: reads the "features" vector and the scaled target column and
# writes its output to "prediction".
spark_model = SparkTorch(
    inputCol='features',
    labelCol=target,
    predictionCol="prediction",
    torchObj=torch_obj,
    miniBatch=1000,
    iters=50,
    verbose=1
)

In [23]:
# Train the SparkTorch estimator on the training split through a one-stage Pipeline.
# NOTE(review): the recorded run of this cell failed inside torch.distributed
# init_process_group('gloo', ...) with "Connection refused" (see the output below).
# This looks like an environment/networking problem in SparkTorch's distributed setup — TODO investigate.
training_stages = [spark_model]
p = pyspark.ml.Pipeline(stages=training_stages)
model = p.fit(df_train)

23/02/04 18:52:08 WARN TaskSetManager: Stage 20 contains a task of very large size (16633 KiB). The maximum recommended task size is 1000 KiB.


[Stage 20:>                                                       (0 + 12) / 12]

23/02/04 18:52:13 ERROR Executor: Exception in task 0.0 in stage 20.0 (TID 72)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/opt/bitnami/spark/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/bitnami/python/lib/python3.8/site-packages/sparktorch/distributed.py", line 254, in <lambda>
    rdd, lambda i, x: handle_model(
  File "/opt/bitnami/python/lib/python3.8/site-packages/sparktorch/distributed.py", line 105, in handle_model
    dist.init_process_group('gloo', rank=index + 1, world_size=world_size)
  File "/opt/bitnami/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 503, in init_process_group
    _update_default_

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(20, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/opt/bitnami/spark/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/bitnami/python/lib/python3.8/site-packages/sparktorch/distributed.py", line 254, in <lambda>
    rdd, lambda i, x: handle_model(
  File "/opt/bitnami/python/lib/python3.8/site-packages/sparktorch/distributed.py", line 105, in handle_model
    dist.init_process_group('gloo', rank=index + 1, world_size=world_size)
  File "/opt/bitnami/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 503, in init_process_group
    _update_default_pg(_new_process_group_helper(
  File "/opt/bitnami/python/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 588, in _new_process_group_helper
    pg = ProcessGroupGloo(
RuntimeError: [/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:769] connect [172.17.0.2]:24477: Connection refused

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


23/02/04 18:52:13 WARN TaskSetManager: Lost task 10.0 in stage 20.0 (TID 67) (c84f15796326 executor driver): TaskKilled (Task ResultTask(20, 0) from barrier stage ResultStage 20 (fit at /tmp/ipykernel_7295/2981807082.py:2) failed.)


[Stage 20:>                                                        (0 + 5) / 12]

23/02/04 18:52:16 WARN PythonRunner: Incomplete task 9.0 in stage 20 (TID 69) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 4.0 in stage 20 (TID 65) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 1.0 in stage 20 (TID 68) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 8.0 in stage 20 (TID 61) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 4.0 in stage 20 (TID 65) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN TaskSetManager: Lost task 4.0 in stage 20.0 (TID 65) (c84f15796326 executor driver): TaskKilled (Task ResultTask(20, 0) from barrier stage ResultStage 20 (fit at /tmp/ipykernel_7295/2981807082.py:2) failed.)
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 9.0 in stage 20 (TID 69) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN Ta

[Stage 20:>                                                        (0 + 2) / 12]

23/02/04 18:52:16 WARN PythonRunner: Incomplete task 7.0 in stage 20 (TID 66) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN TaskSetManager: Lost task 7.0 in stage 20.0 (TID 66) (c84f15796326 executor driver): TaskKilled (Task ResultTask(20, 0) from barrier stage ResultStage 20 (fit at /tmp/ipykernel_7295/2981807082.py:2) failed.)
23/02/04 18:52:16 WARN PythonRunner: Incomplete task 1.0 in stage 20 (TID 68) interrupted: Attempting to kill Python Worker
23/02/04 18:52:16 WARN TaskSetManager: Lost task 1.0 in stage 20.0 (TID 68) (c84f15796326 executor driver): TaskKilled (Task ResultTask(20, 0) from barrier stage ResultStage 20 (fit at /tmp/ipykernel_7295/2981807082.py:2) failed.)
23/02/04 18:55:57 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-11ad814e-5f4f-4e7d-90aa-344e34f9504a. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-11ad814e-5f4f-4e7d-90aa-344e34f9504a
	at org.apache.spark.

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 37720)
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.r

In [None]:
# Score the validation split. Then map both the prediction and the label back from the
# [0, 1] min-max scale to sales units, using the target range saved before scaling.
target_range = max_target - min_target
pred = model.transform(df_valid).select("prediction", target)
for scaled_col in ["prediction", target]:
    pred = pred.withColumn(scaled_col, pred[scaled_col] * target_range + min_target)
pred.select("prediction", target).show()

[Stage 34:>               (0 + 12) / 12][Stage 36:>                 (0 + 0) / 1]

23/02/04 18:23:06 WARN TaskSetManager: Stage 36 contains a task of very large size (16633 KiB). The maximum recommended task size is 1000 KiB.


[Stage 36:>                                                         (0 + 1) / 1]

+------------------+------------------+
|        prediction|      Weekly_Sales|
+------------------+------------------+
| 29245.37892680168|3440.6899999999996|
|29337.512592778203|           3874.45|
| 29684.89068370342|           3917.19|
|27879.058874142167|           4061.89|
| 29620.98361145258|           4136.05|
| 27807.19763525009|           4150.96|
| 28062.58364939928|           4159.25|
| 28525.33491374254|           4204.65|
| 29422.45038094282|           4205.62|
| 29048.07909554958|           4238.56|
|29009.868889975547|           4324.95|
|28943.353430008887|           4377.74|
|  27976.0600621891|           4417.17|
|28853.091887905597|           4444.66|
|28407.019831380843|           4444.92|
| 28893.86643122196|           4484.47|
| 29150.75958374977|           4544.62|
|29044.636904764175|           4645.99|
|28644.257256453035|           4660.39|
| 28166.19233346939|           4756.69|
+------------------+------------------+
only showing top 20 rows



                                                                                

In [None]:
# Collect the de-scaled validation predictions and summarise the error in sales units.
# The signed mean error alone lets over- and under-predictions cancel out, so MAE and
# RMSE are reported next to it.
pred_pd = pred.toPandas()
pred_pd["diff"] = pred_pd["prediction"] - pred_pd[target]
pd.Series({
    "mean_diff": pred_pd["diff"].mean(),
    "mae": pred_pd["diff"].abs().mean(),
    "rmse": np.sqrt((pred_pd["diff"] ** 2).mean()),
})

23/02/04 18:26:02 WARN TaskSetManager: Stage 37 contains a task of very large size (16633 KiB). The maximum recommended task size is 1000 KiB.


ERROR:root:KeyboardInterrupt while sending command.               (0 + 12) / 12]
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/bitnami/python/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/bitnami/python/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 