In [1]:
import pandas as pd
import pyspark

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from scipy.stats import skew,norm,zscore
from scipy.signal import periodogram



In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark import SparkContext

In [4]:
# Create the SparkSession used by every Spark read/transform below
# (getOrCreate reuses an already-active session if there is one).
spark = SparkSession.builder.appName("app1").getOrCreate()

In [5]:
# Display the version of the running Spark session.
spark.version

'3.3.0'

In [6]:
import matplotlib.pyplot as plt
import matplotlib
# Start from matplotlib's default style, then apply the notebook-wide
# figure size, label sizes and text colour in one rcParams update.
plt.style.use('default')
matplotlib.rcParams.update({
    "figure.figsize": (20, 15),
    'axes.labelsize': 14,
    'xtick.labelsize': 12,
    'ytick.labelsize': 12,
    'text.color': 'k',
})

# Importing Data

In [7]:
# Load the raw competition files.
# No reader options are passed to spark.read.csv here; the "Fixing headers"
# cells below rename the default _c0, _c1, ... columns from the first row and
# then filter that header row out of the data.
# oil.csv is the only file read with pandas instead of Spark.
orig_holidays_events = spark.read.csv('./store-sales-time-series-forecasting/holidays_events.csv')
orig_oil = pd.read_csv('./store-sales-time-series-forecasting/oil.csv')
orig_stores = spark.read.csv('./store-sales-time-series-forecasting/stores.csv')
orig_transactions = spark.read.csv('./store-sales-time-series-forecasting/transactions.csv')
orig_train = spark.read.csv('./store-sales-time-series-forecasting/train.csv')
orig_test = spark.read.csv('./store-sales-time-series-forecasting/test.csv')
sample_submission = spark.read.csv('./store-sales-time-series-forecasting/sample_submission.csv')

# Fixing headers

In [8]:
def header_df(df):
    """Rename the positional columns ``_c0``, ``_c1``, ... after the first row.

    The values of ``df.first()`` are used, in order, as the new names for
    ``_c0``, ``_c1``, and so on. The first row itself is left in the data;
    callers remove it afterwards.

    Parameters
    ----------
    df : DataFrame
        Frame whose first row holds the intended column names.

    Returns
    -------
    DataFrame
        The frame with its columns renamed.
    """
    header_values = df.first()
    renamed = df
    for position, new_name in enumerate(header_values):
        renamed = renamed.withColumnRenamed('_c' + str(position), new_name)
    return renamed

In [9]:
# Name the holidays/events columns from the first (header) row.
orig_holidays_events=header_df(orig_holidays_events)

In [10]:
# Remove the header line that was read as data: keep only rows in which none
# of the checked columns still holds its own column name.
orig_holidays_events = orig_holidays_events.filter(
    (F.col('date') != 'date')
    & (F.col('type') != 'type')
    & (F.col('locale_name') != 'locale_name')
    & (F.col('description') != 'description')
    & (F.col('transferred') != 'transferred')
)

In [11]:
# Name the stores columns from the first (header) row.
orig_stores=header_df(orig_stores)

In [12]:
# Remove the header line that was read as data from the stores frame.
orig_stores = orig_stores.filter(
    (F.col('store_nbr') != 'store_nbr')
    & (F.col('city') != 'city')
    & (F.col('state') != 'state')
    & (F.col('type') != 'type')
    & (F.col('cluster') != 'cluster')
)

In [13]:
# Name the transactions columns from the first (header) row.
orig_transactions=header_df(orig_transactions)

In [14]:
# Remove the header line that was read as data from the transactions frame.
orig_transactions = orig_transactions.filter(
    (F.col('store_nbr') != 'store_nbr')
    & (F.col('date') != 'date')
    & (F.col('transactions') != 'transactions')
)

In [15]:
# Name the train columns from the first (header) row.
orig_train=header_df(orig_train)

In [16]:
# Remove the header line that was read as data from the train frame.
orig_train = orig_train.filter(
    (F.col('id') != 'id')
    & (F.col('date') != 'date')
    & (F.col('store_nbr') != 'store_nbr')
    & (F.col('family') != 'family')
    & (F.col('sales') != 'sales')
    & (F.col('onpromotion') != 'onpromotion')
)

In [17]:
# Name the test columns from the first (header) row.
orig_test=header_df(orig_test)

In [18]:
# Remove the header line that was read as data from the test frame.
orig_test = orig_test.filter(
    (F.col('id') != 'id')
    & (F.col('date') != 'date')
    & (F.col('store_nbr') != 'store_nbr')
    & (F.col('family') != 'family')
    & (F.col('onpromotion') != 'onpromotion')
)

In [19]:
# Name the sample-submission columns from the first (header) row.
sample_submission=header_df(sample_submission)

In [20]:
# Remove the header line that was read as data from the sample submission.
sample_submission = sample_submission.filter(
    (F.col('id') != 'id')
    & (F.col('sales') != 'sales')
)

# Adding features to stores data for stores added in last year

In [21]:
# Start the working frame from the stores table and add 'uniquestore':
# 0 when the store's city is one of the cities listed below, 1 otherwise.
listed_cities = ['Quito', 'Guayaquil', 'Santo Domingo', 'Cuenca', 'Manta', 'Machala', 'Latacunga', 'Ambato']
final_df = orig_stores.alias('final_df').withColumn(
    'uniquestore',
    F.when(F.col('city').isin(listed_cities), 0).otherwise(1),
)


In [22]:
# Add 'newstore': 1 for the store numbers listed below, 0 for every other store.
listed_store_numbers = ['19', '20', '21', '28', '35', '41', '51', '52']
final_df = final_df.withColumn(
    'newstore',
    F.when(F.col('store_nbr').isin(listed_store_numbers), 1).otherwise(0),
)

#  Joining stores and [train,test]

In [24]:
# Give the test set an all-null 'sales' column and put its columns in the
# order listed here, ready to be stacked under the train set by position.
stacked_column_order = ['id', 'date', 'store_nbr', 'family', 'sales', 'onpromotion']
orig_test = (
    orig_test
    .withColumn('sales', F.lit(None))
    .select(*stacked_column_order)
)

In [25]:
# Stack train and test, attach the per-store attributes, and rename the
# stores table's 'type' column to 'store' so the name is free for later use.
final_df = (
    orig_train
    .union(orig_test)
    .join(final_df, on=['store_nbr'], how='left')
    .withColumnRenamed('type', 'store')
)

# Events

## Non transferred events  : Correcting New year which is marked as transferred in 2017

In [26]:
# Force transferred='False' for every event dated 2017-01-01; all other rows
# keep their existing value.
is_2017_new_year = F.col('date') == '2017-01-01'
orig_holidays_events = orig_holidays_events.withColumn(
    'transferred',
    F.when(is_2017_new_year, 'False').otherwise(F.col('transferred')),
)

## Removing Duplicates 

In [27]:
# Drop rows that are exact duplicates across all columns.
orig_holidays_events=orig_holidays_events.dropDuplicates()

## Adding event type 

In [28]:
# For rows whose type is 'Event', replace the type with the [0:7] slice of the
# description so that different events get different type labels; other rows
# keep their type.
description_slice = F.col('description')[0:7]
orig_holidays_events = orig_holidays_events.withColumn(
    'type',
    F.when(F.col('type') == 'Event', description_slice).otherwise(F.col('type')),
)

##  Merging orig_holidays_events and final_df 

In [29]:
# Split the events by locale so each group can be joined on its own key.
locale = F.col('locale')
nat_df = orig_holidays_events.where(locale == 'National')
loc_df = orig_holidays_events.where(locale == 'Local')
reg_df = orig_holidays_events.where(locale == 'Regional')

### Merging national holidays, key = 'date' 

In [30]:
# Rows of final_df that fall on a national event date, with the event columns attached.
final_df_nat = final_df.join(
    nat_df,
    on='date',
    how='inner',
)

### Merging local holidays with key = ['date','city'] with locale name = 'city'

In [31]:
# Copy locale_name into a 'city' column so local events can be joined on ['date', 'city'].
loc_df = loc_df.withColumn('city', loc_df['locale_name'])

In [32]:
# Rows of final_df whose date and city match a local event.
final_df_loc = final_df.join(
    loc_df,
    on=['date', 'city'],
    how='inner',
)

###  Merging regional holidays with key = ['date','state'] with locale name = 'state'

In [33]:
# Copy locale_name into a 'state' column, then keep the rows of final_df
# whose date and state match a regional event.
reg_df = reg_df.withColumn('state', reg_df['locale_name'])
final_df_reg = final_df.join(
    reg_df,
    on=['date', 'state'],
    how='inner',
)

### Adding the no holiday info 

In [34]:
# Stack the national, local and regional holiday rows.
# unionByName is required here: each frame comes out of a join with a
# different key list ('date' / ['date','city'] / ['date','state']), and a join
# puts its key columns first, so the three frames hold the same columns in
# different orders. A positional union() would put city/state values into the
# store_nbr, id, ... columns of the local and regional rows.
data_holiday = (
    final_df_nat
    .unionByName(final_df_loc)
    .unionByName(final_df_reg)
    .orderBy('date')
)

In [35]:
# Left-join the holiday rows back onto every train/test row.
# The match is on all of final_df's columns, using null-safe equality:
# test rows carry a null 'sales', and a plain name-based equi-join never
# matches null to null, so those rows could never pick up their holiday.
# The select keeps the original output layout: final_df's columns first,
# followed by the holiday-only columns.
base_cols = final_df.columns
holiday_cols = [c for c in data_holiday.columns if c not in base_cols]
data_holiday_n = (
    final_df.alias('base')
    .join(
        data_holiday.alias('hol'),
        on=[F.col(f'base.{c}').eqNullSafe(F.col(f'hol.{c}')) for c in base_cols],
        how='left',
    )
    .select(
        [F.col(f'base.{c}') for c in base_cols]
        + [F.col(f'hol.{c}') for c in holiday_cols]
    )
)

In [36]:
# The holiday 'type' column becomes 'event_type'.
data_holiday_n = data_holiday_n.withColumnRenamed(
    'type',
    'event_type',
)


### Marking no holiday event as Normal Day

In [37]:
# Rows that matched no holiday have a null event_type; label them 'Normal Day'.
data_holiday_n = data_holiday_n.withColumn(
    'event_type',
    F.coalesce(F.col('event_type'), F.lit('Normal Day')),
)

## Adding New Year holiday 

In [38]:
# firstday: 1 on rows whose holiday description is 'Primer dia del ano', else 0.
is_first_day_of_year = F.col('description') == 'Primer dia del ano'
data_holiday_ny = data_holiday_n.withColumn(
    'firstday',
    F.when(is_first_day_of_year, 1).otherwise(0),
)

## Adding Event indicator 

In [39]:
# isevent: 1 for any row whose event_type is not 'Normal Day', else 0.
is_not_normal_day = F.col('event_type') != 'Normal Day'
data_holiday_ny_event = data_holiday_ny.withColumn(
    'isevent',
    F.when(is_not_normal_day, 1).otherwise(0),
)

## Adding Easter 

In [40]:
# Set isevent to 1 on the listed Easter dates; other rows keep their value.
easter_dates = ['2017-04-16', '2016-03-27', '2015-04-05', '2014-04-20', '2013-03-31']
data_holiday_ny_event_easter = data_holiday_ny_event.withColumn(
    'isevent',
    F.when(F.col('date').isin(easter_dates), 1).otherwise(F.col('isevent')),
)

In [41]:
# Label the listed Easter dates as 'Holiday' in event_type.
# This must build on data_holiday_ny_event_easter, not data_holiday_ny_event:
# restarting from the earlier frame would throw away the isevent=1 flag that
# the previous cell set for these same dates.
easter_dates = ['2017-04-16', '2016-03-27', '2015-04-05', '2014-04-20', '2013-03-31']
data_holiday_ny_event_easter = data_holiday_ny_event_easter.withColumn(
    'event_type',
    F.when(F.col('date').isin(easter_dates), 'Holiday').otherwise(F.col('event_type')),
)

## Adding closing Days 

### closing days are days with sum of sales in store = 0

In [42]:
# Total sales per (date, store_nbr); used below to flag store closing days.
sums = (
    data_holiday_ny_event_easter
    .select('date', 'store_nbr', 'sales')
    .groupby(['date', 'store_nbr'])
    .agg(F.sum('sales').alias('sum_sales'))
)

In [43]:
from pyspark.sql.types import DoubleType

# Convert 'sales' to a double column.
sales_as_double = F.col('sales').cast(DoubleType())
data_holiday_ny_event_easter = data_holiday_ny_event_easter.withColumn('sales', sales_as_double)

In [44]:
# Attach the per-(date, store) sales total to every row.
data_holiday_ny_event_easter_sum = data_holiday_ny_event_easter.join(
    sums,
    on=['date', 'store_nbr'],
    how='left',
)

In [45]:
# isclosed: 1 when the store sold nothing that day (sum_sales == 0), else 0.
# The fallback has to be 0; with otherwise(1) every row was flagged as closed,
# which in turn zero-filled every missing transaction count further down.
# Rows with a null sum_sales also take the 0 fallback.
# NOTE(review): no column named 'sums' is created in the cells shown, so this
# drop looks like a no-op and 'sum_sales' stays in the output. It is left
# unchanged to keep the output columns as they were -- confirm whether
# 'sum_sales' was the intended target.
data_holiday_ny_event_easter_closed = (
    data_holiday_ny_event_easter_sum
    .withColumn('isclosed', F.when(F.col('sum_sales') == 0, 1).otherwise(0))
    .drop('sums')
)

In [46]:
# Manual overrides, applied in this order: 2017-01-01 is always closed, and
# every date after 2017-08-15 is always marked as not closed.
data_holiday_ny_event_easter_closed = (
    data_holiday_ny_event_easter_closed
    .withColumn(
        'isclosed',
        F.when(F.col('date') == '2017-01-01', 1).otherwise(F.col('isclosed')),
    )
    .withColumn(
        'isclosed',
        F.when(F.col('date') > '2017-08-15', 0).otherwise(F.col('isclosed')),
    )
)

## Transactions

In [47]:
# Attach the transaction count for each (date, store_nbr); rows with no
# matching transactions record get a null.
data_holiday_ny_event_easter_closed_transactions = data_holiday_ny_event_easter_closed.join(
    orig_transactions,
    on=['date', 'store_nbr'],
    how='left',
)

### Closure days have 0 transactions 

In [48]:
from pyspark.sql.types import DoubleType, IntegerType, DateType

# Cast transactions to double, then set it to 0 wherever it is missing on a
# row flagged as closed.
missing_on_closed_day = F.col('transactions').isNull() & (F.col('isclosed') == 1)
data_holiday_ny_event_easter_closed_transactions = (
    data_holiday_ny_event_easter_closed_transactions
    .withColumn('transactions', F.col('transactions').cast(DoubleType()))
    .withColumn(
        'transactions',
        F.when(missing_on_closed_day, 0).otherwise(F.col('transactions')),
    )
)

In [49]:
# Per-store average transaction count, laid out one row per (store_nbr, date)
# so it can be joined back on ['date', 'store_nbr'].
# The average must be taken per store, not per (store, date): transactions are
# attached at (date, store) level, so a per-(store, date) average is null on
# exactly the days it is supposed to fill.
# One row per store/day is used so that each day counts once in the average.
store_day_transactions = (
    data_holiday_ny_event_easter_closed_transactions
    .select('store_nbr', 'date', 'transactions')
    .distinct()
)
store_avg_transactions = (
    store_day_transactions
    .groupby('store_nbr')
    .agg(F.avg(F.col('transactions')).alias('average_transactions'))
)
group_df = (
    store_day_transactions
    .select('store_nbr', 'date')
    .distinct()
    .join(store_avg_transactions, on='store_nbr', how='left')
)

In [50]:
# Attach the 'average_transactions' column from group_df to every row.
data_holiday_ny_event_easter_closed_transactions = data_holiday_ny_event_easter_closed_transactions.join(
    group_df,
    on=['date', 'store_nbr'],
    how='left',
)

### Replacing with the Average transaction number for the store in case of missing values

In [51]:
# Where transactions is still missing on a day the store was not closed,
# substitute average_transactions; all other rows keep their value.
missing_on_open_day = F.col('transactions').isNull() & (F.col('isclosed') == 0)
data_holiday_ny_event_easter_closed_transactions = (
    data_holiday_ny_event_easter_closed_transactions
    .withColumn(
        'transactions',
        F.when(missing_on_open_day, F.col('average_transactions')).otherwise(F.col('transactions')),
    )
)

In [52]:
# The helper average column is no longer needed once the fill is done.
data_holiday_ny_event_easter_closed_transactions = (
    data_holiday_ny_event_easter_closed_transactions
    .drop('average_transactions')
)

In [53]:
data_holiday_ny_event_easter_closed_transactions=data_holiday_ny_event_easter_closed_transactions\
.withColumn('transferred',F.when(F.col('transferred').isNull(),0).otherwise(F.col('transferred')))

# Promos : adding sum of the promotions on the store/day level

In [54]:
# Sum of 'onpromotion' per (date, store_nbr).
total_promos = (
    data_holiday_ny_event_easter_closed_transactions
    .groupby(['date', 'store_nbr'])
    .agg(F.sum('onpromotion').alias('tot_store_day_onprom'))
)

In [55]:
# Attach the store/day promotion total to every row.
data_holiday_ny_event_easter_closed_transactions_promo = data_holiday_ny_event_easter_closed_transactions.join(
    total_promos,
    on=['date', 'store_nbr'],
    how='left',
)

In [56]:
# Blank out transactions for every date after 2017-08-15, undoing any fill
# applied to those rows above.
after_cutoff = F.col('date') > '2017-08-15'
data_holiday_ny_event_easter_closed_transactions_promo = (
    data_holiday_ny_event_easter_closed_transactions_promo
    .withColumn(
        'transactions',
        F.when(after_cutoff, None).otherwise(F.col('transactions')),
    )
)

In [57]:
# Persist the Spark result so the oil features can be added with pandas.
# mode('overwrite') lets the notebook be re-run: with the default save mode
# the write raises once 'data_without_oil.parquet' already exists.
(
    data_holiday_ny_event_easter_closed_transactions_promo
    .write
    .mode('overwrite')
    .parquet('data_without_oil.parquet')
)

## Adding oil info 

In [58]:
# Read the Spark output back into pandas and parse 'date' as datetime so it
# can be merged with the oil series.
data_without_oil = pd.read_parquet('data_without_oil.parquet', engine='pyarrow')
data_without_oil['date'] = pd.to_datetime(data_without_oil['date'])

In [59]:
def oil_func(orig_df, base_df=None):
    """Build daily oil-price features and left-merge them onto a base frame.

    Steps:
      1. Resample the oil series to one row per calendar day and interpolate
         the gaps (``limit_direction='backward'``).
      2. Add four lag columns (``lagoil_1_dcoilwtico`` .. ``lagoil_4_dcoilwtico``)
         and a 7-row rolling mean (``oil_week_avg``).
      3. Drop feature rows containing any NaN (the first rows, where the lags
         and rolling window are not yet defined).
      4. Left-merge the features onto ``base_df`` by ``date``.

    Parameters
    ----------
    orig_df : pandas.DataFrame
        Oil prices with a datetime ``date`` column and a ``dcoilwtico``
        column. It is not modified.
    base_df : pandas.DataFrame, optional
        Frame with a datetime ``date`` column to attach the features to.
        When omitted, the notebook-level ``data_without_oil`` is used, which
        keeps existing ``oil_func(orig_oil)`` calls working.

    Returns
    -------
    pandas.DataFrame
        ``base_df`` with the oil columns appended; dates that have no
        complete feature row get NaN in those columns.
    """
    if base_df is None:
        # Backward-compatible default: fall back to the notebook global.
        base_df = data_without_oil

    # Fill in missing calendar days and missing prices.
    oil = (
        orig_df
        .set_index('date')
        .resample("D")
        .mean()
        .interpolate(limit_direction='backward')
        .reset_index()
    )

    # Lagged prices and weekly rolling average.
    for lag in range(1, 5):
        oil[f'lagoil_{lag}_dcoilwtico'] = oil['dcoilwtico'].shift(lag)
    oil['oil_week_avg'] = oil['dcoilwtico'].rolling(7).mean()

    # Keep only the days for which every feature is defined.
    oil = oil.dropna()

    return base_df.merge(oil, on=['date'], how='left')

In [60]:
# Parse the oil dates, then build the oil features and merge them onto the
# prepared data.
orig_oil['date'] = pd.to_datetime(orig_oil['date'])
final_df = oil_func(orig_oil)

In [61]:
# Any oil feature still missing after the merge is filled with the smallest
# observed dcoilwtico value.
oil_feature_cols = [
    'dcoilwtico',
    'lagoil_1_dcoilwtico',
    'lagoil_2_dcoilwtico',
    'lagoil_3_dcoilwtico',
    'lagoil_4_dcoilwtico',
    'oil_week_avg',
]
lowest_oil_price = final_df.dcoilwtico.min()
final_df[oil_feature_cols] = final_df[oil_feature_cols].fillna(lowest_oil_price)

In [63]:
# Write the fully prepared dataset to disk, without the pandas index column.
final_df.to_csv('prepared_data.csv',index=False)