In [9]:
from pyspark.ml import Transformer
from pyspark.ml import Pipeline
import pyspark.sql.functions as F 
import pyspark.sql.types as T 
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame

In [2]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/17 05:12:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
train_raw: SparkDataFrame = spark.read.csv("../raw/train.csv", header=True)
test_raw: SparkDataFrame = spark.read.csv("../raw/test.csv", header=True)
store_raw: SparkDataFrame = spark.read.csv("../raw/store.csv", header=True)

In [5]:
train_raw.withColumn("Year",F.year(F.col("Date"))).toPandas()

                                                                                

Unnamed: 0,Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday,Year
0,1,5,2015-07-31,5263,555,1,1,0,1,2015
1,2,5,2015-07-31,6064,625,1,1,0,1,2015
2,3,5,2015-07-31,8314,821,1,1,0,1,2015
3,4,5,2015-07-31,13995,1498,1,1,0,1,2015
4,5,5,2015-07-31,4822,559,1,1,0,1,2015
...,...,...,...,...,...,...,...,...,...,...
1017204,1111,2,2013-01-01,0,0,0,0,a,1,2013
1017205,1112,2,2013-01-01,0,0,0,0,a,1,2013
1017206,1113,2,2013-01-01,0,0,0,0,a,1,2013
1017207,1114,2,2013-01-01,0,0,0,0,a,1,2013


In [6]:
store_raw.limit(10).toPandas()

Unnamed: 0,Store,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval
0,1,c,a,1270,9,2008,0,,,
1,2,a,a,570,11,2007,1,13.0,2010.0,"Jan,Apr,Jul,Oct"
2,3,a,a,14130,12,2006,1,14.0,2011.0,"Jan,Apr,Jul,Oct"
3,4,c,c,620,9,2009,0,,,
4,5,a,a,29910,4,2015,0,,,
5,6,a,a,310,12,2013,0,,,
6,7,a,c,24000,4,2013,0,,,
7,8,a,a,7520,10,2014,0,,,
8,9,a,c,2030,8,2000,0,,,
9,10,a,a,3160,9,2009,0,,,


In [8]:
store_raw.withColumn("CompetitionOpenSinceMonths",F.make_date(F.col("CompetitionOpenSinceYear"),F.col("CompetitionOpenSinceMonth"),F.lit(1))).toPandas()

Unnamed: 0,Store,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval,CompetitionOpenSinceMonths
0,1,c,a,1270,9,2008,0,,,,2008-09-01
1,2,a,a,570,11,2007,1,13,2010,"Jan,Apr,Jul,Oct",2007-11-01
2,3,a,a,14130,12,2006,1,14,2011,"Jan,Apr,Jul,Oct",2006-12-01
3,4,c,c,620,9,2009,0,,,,2009-09-01
4,5,a,a,29910,4,2015,0,,,,2015-04-01
...,...,...,...,...,...,...,...,...,...,...,...
1110,1111,a,a,1900,6,2014,1,31,2013,"Jan,Apr,Jul,Oct",2014-06-01
1111,1112,c,c,1880,4,2006,0,,,,2006-04-01
1112,1113,a,c,9260,,,0,,,,
1113,1114,a,c,870,,,0,,,,


In [21]:
class CompetitionOpenForMonths(Transformer):
    def __init__(self) -> None:
        super().__init__()

    def _transform(self, dataset: SparkDataFrame) -> SparkDataFrame:
        return dataset.withColumn(
            "CompetitionOpenForMonths",
            F.months_between(
                F.col("Date"),
                F.make_date(
                    F.col("CompetitionOpenSinceYear"),
                    F.col("CompetitionOpenSinceMonth"),
                    F.lit(1),
                ),
            ),
        )


In [22]:
p = Pipeline(stages=[CompetitionOpenForMonths()])

In [23]:
train_joined = train_raw.join(store_raw, on="Store")
mdl = p.fit(train_joined)

In [24]:
mdl.transform(train_joined).toPandas()

                                                                                

Unnamed: 0,Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval,CompetitionOpenForMonths
0,1,5,2015-07-31,5263,555,1,1,0,1,c,a,1270,9,2008,0,,,,82.967742
1,2,5,2015-07-31,6064,625,1,1,0,1,a,a,570,11,2007,1,13,2010,"Jan,Apr,Jul,Oct",92.967742
2,3,5,2015-07-31,8314,821,1,1,0,1,a,a,14130,12,2006,1,14,2011,"Jan,Apr,Jul,Oct",103.967742
3,4,5,2015-07-31,13995,1498,1,1,0,1,c,c,620,9,2009,0,,,,70.967742
4,5,5,2015-07-31,4822,559,1,1,0,1,a,a,29910,4,2015,0,,,,3.967742
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1017204,1111,2,2013-01-01,0,0,0,0,a,1,a,a,1900,6,2014,1,31,2013,"Jan,Apr,Jul,Oct",-17.000000
1017205,1112,2,2013-01-01,0,0,0,0,a,1,c,c,1880,4,2006,0,,,,81.000000
1017206,1113,2,2013-01-01,0,0,0,0,a,1,a,c,9260,,,0,,,,
1017207,1114,2,2013-01-01,0,0,0,0,a,1,a,c,870,,,0,,,,


23/05/17 16:13:05 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3210968 ms exceeds timeout 120000 ms
23/05/17 16:13:06 WARN SparkContext: Killing executors is not supported by current scheduler.
23/05/17 16:13:06 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.