In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
import pylab as pl
import glob


In [2]:
import pyarrow
pyarrow.__version__

In [3]:
#copy
#the following code takes a list such as
#[1,1,2,6,8,5,5,7,8,8,1,1,4,5,5,0,0,0,1,1,4,4,5,1,3,3,4,5,4,1,1]
#with states labeled as successive integers starting with 0
#and returns a transition matrix, M,
#where M[i][j] is the probability of transitioning from i to j

def transition_matrix(transitions):
    n = 1+ max(transitions) #number of states

    M = [[0]*n for _ in range(n)]

    for (i,j) in zip(transitions,transitions[1:]):
        M[i][j] += 1

    #now convert to probabilities:
    for row in M:
        s = sum(row)
        if s > 0:
            row[:] = [f/s for f in row]
            #print(sum(row))
            #row[:] /= np.sum(row[:])
            #assert sum(row)==1
    return M





In [4]:
#copy
def fix_rowsums(matrix):
  """
  If the row sum is not identically unity, correct the diagonal element to enforce
  """
  matrix_size = matrix.shape[0]
  for i in range(matrix_size):
    diagonal = matrix[i, i]
    rowsum = matrix[i].sum()
    matrix[i, i] = diagonal + 1.0 - rowsum
    
  return matrix

In [5]:
#copy

def fix_negativerates(matrix):
  """
  If a matrix entity is below zero, set to zero and correct the diagonal element to enforce
  """

  matrix_size = matrix.shape[0]
  # For all rows
  for i in range(matrix_size):
      maxval_index = matrix[i].argmax()
      row_adjust = 0.0
      # Search all cols for negative entries
      for j in range(matrix_size):
          if matrix[i, j] < 0.0:
              row_adjust += matrix[i, j]
              matrix[i, j] = 0.0
      # Add the adjustment to the diagonal
      matrix[i, maxval_index] += row_adjust
  return matrix



In [6]:
#copy
def validate(matrix, accuracy=1e-3):
    """ Validate required properties of a transition matrix. The following are checked
    1. check squareness
    2. check that all values are probabilities (between 0 and 1)
    3. check that all rows sum to one
    :param accuracy: accuracy level to use for validation
    :type accuracy: float
    :returns: List of tuples with validation messages
    """
    validation_messages = []

    
    # checking squareness of matrix
    if matrix.shape[0] != matrix.shape[1]:
        validation_messages.append(("Matrix Dimensions Differ: ", matrix.shape))
    else:
        matrix_size = matrix.shape[0]
        # checking that values of matrix are within allowed range
        for i in range(matrix_size):
            for j in range(matrix_size):
                if matrix[i, j] < 0:
                    validation_messages.append(("Negative Probabilities: ", (i, j, matrix[i, j])))

                    raise ValueError("Negative Probabilities: ", (i, j, matrix[i, j]))
                if matrix[i, j] > 1:
                    validation_messages.append(("Probabilities Larger than 1: ", (i, j, matrix[i, j])))
                   
                    raise ValueError("Probabilities Larger than 1: ", (i, j, matrix[i, j]))
        # checking row sums of matrix
        for i in range(matrix_size):
            rowsum = matrix[i].sum()
            if abs(rowsum - 1.0) > accuracy:
                validation_messages.append(("Rowsum not equal to one: ", (i, rowsum)))
                raise ValueError("Rowsum not equal to one: ", (i, rowsum))
    print(validation_messages)

In [7]:
#copy
def transition_matrix_dict(from_dataframe):
  m_dict = {}
  for name, group in from_dataframe.groupby(['store', 'item']):
    t = group.sales.values
    m = transition_matrix(t)
    m = np.array(m)
    m = fix_rowsums(m)
    m = fix_negativerates(m)
    
    print(np.sort(np.unique(t)))
    validate(m)
    #for row in m: print(' '.join('{0:.2f}'.format(x) for x in row))
    #print(m)
    m_dict[name[0], name[1]] = m
  return m_dict

In [8]:
#copy
def markov_forecast(n, m_dict, store, item, sequence_length):
  # last number of previous sequence
  forecast = np.zeros((sequence_length,1))
  #res = np.argmax(m_dict[store, item][n,:])
  probabilities = m_dict[store, item][n,:] #0 - > 0, 1, 2, 3...22 0.4  0.11
  size = len(m_dict[store, item])
  res = np.random.choice(size, p=probabilities)#/sum(probabilities) # if size is 5 - > 0, 1, 2   5
  #res = probabilities.dot(np.arange(size).T)
  forecast[0, :] = res
  for i in range(1, sequence_length):
    probabilities = m_dict[store, item][res,:]
    #print(probabilities))
    #argmax_res = np.argmax(m_dict[store, item][res,:])
    #if res != argmax_res: # not in main diagonal
      #res = argmax_res
    #else:
    res = np.random.choice(size, p=probabilities, replace=True) #/sum(probabilities)
    #new_res = probabilities.dot(np.arange(size).T)
    forecast[i, :] = res
  return forecast 

In [9]:
data_path =   '/dbfs/FileStore/tables/'
parquet_path =   '/dbfs/ml/'

In [10]:
def preprocess():

  df_train = pd.read_csv(data_path + 'train.csv')
  df_test = pd.read_csv(data_path + 'test.csv')

 
  # sort
  df_train.sort_values(['store', 'item', 'date'], inplace=True)
  df_test.sort_values(['store', 'item', 'date'], inplace=True)
  
  return df_train, df_test

In [11]:
# LOAD DATA
df_train, df_test = preprocess()


In [12]:
df_train.head(5)

Unnamed: 0,date,store,item,sales
0,2013-01-01,1,1,13
1,2013-01-02,1,1,11
2,2013-01-03,1,1,14
3,2013-01-04,1,1,13
4,2013-01-05,1,1,10


In [13]:
# copy
df_markov = df_train[['store', 'item', 'sales']]

In [14]:
# copy
m_dict = transition_matrix_dict(df_markov)

In [15]:
pd.DataFrame(m_dict[(1, 4)])

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43
0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,0.0,0.0,0.0,0.0,0.0,1.110223e-16,0.142857,0.0,0.0,0.0,0.0,0.0,0.0,0.142857,0.142857,0.285714,0.142857,0.0,0.142857,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.125,0.25,0.0,0.125,0.125,0.0,0.0,0.125,0.125,0.0,0.125,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0625,0.0625,0.1875,0.0625,0.0625,0.0625,0.0625,0.0,0.0,0.0,0.125,0.0625,0.0625,0.0625,0.0625,0.0625,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,0.0,0.0,0.0,0.0,0.0,0.05882353,0.0,0.0,0.0,0.0,0.0,0.176471,0.058824,0.117647,0.117647,0.058824,0.117647,0.0,0.058824,0.176471,0.058824,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.029412,0.147059,0.088235,0.088235,0.088235,0.0,0.088235,0.058824,0.117647,0.029412,0.029412,0.058824,0.0,0.088235,0.029412,0.029412,0.0,0.0,0.029412,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [16]:
# check shapes
df_train.shape, df_test.shape

In [17]:
 # sort
df_train.sort_values(['store', 'item', 'date'], inplace=True)
df_test.sort_values(['store', 'item', 'date'], inplace=True)


In [18]:
# for spark
#df_train['date'] = df_train['date'].dt.strftime('%Y-%m-%d')
#df_test['date'] = df_test['date'].dt.strftime('%Y-%m-%d')

In [19]:
df_train['date'] = pd.to_datetime(df_train['date'])
df_test['date'] = pd.to_datetime(df_test['date'])

In [20]:

#reindex
df_train.reset_index(inplace=True, drop=True)
df_test.reset_index(inplace=True, drop=True)


In [21]:

sequence_length = 21
subseq_length = 3

In [22]:
def gen_labels(id_df, seq_length, label):

    data_matrix = id_df[label].values
    num_elements = data_matrix.shape[0]


    for start, stop in zip(range(seq_length, num_elements-seq_length), range(seq_length+seq_length, num_elements)):
        yield data_matrix[start:stop, :]

In [23]:
def gen_sequence(id_df, seq_length, seq_cols):

    data_matrix = id_df[seq_cols].values
    num_elements = data_matrix.shape[0]

    for start, stop in zip(range(0, num_elements-seq_length-seq_length), range(seq_length, num_elements-seq_length)):
        yield data_matrix[start:stop, :]



In [24]:
({'store', 'item'}.issubset(df_train.columns)) & ({'store', 'item'}.issubset(df_test.columns))

In [25]:
if ({'store', 'item'}.issubset(df_train.columns)) & ({'store', 'item'}.issubset(df_test.columns)):
  stores = set(np.union1d(df_train.store.unique(),df_test.store.unique()))
  items = set(np.union1d(df_train.item.unique(),df_test.item.unique()))

In [26]:
len(stores), len(items)

In [27]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [28]:
df_train['markov_seqs_mean'] = 0.0
df_test['markov_seqs_mean'] = 0.0
df_train['markov_seqs_std'] = 0.0
df_test['markov_seqs_std'] = 0.0

In [29]:
#df_train['target'] = df_train['target'].astype(float)
#df_test['target'] = df_test['target'].astype(float)

In [30]:
df_test.head(5)

Unnamed: 0,id,date,store,item,markov_seqs_mean,markov_seqs_std
0,0,2018-01-01,1,1,0.0,0.0
1,1,2018-01-02,1,1,0.0,0.0
2,2,2018-01-03,1,1,0.0,0.0
3,3,2018-01-04,1,1,0.0,0.0
4,4,2018-01-05,1,1,0.0,0.0


In [31]:

df_train['label'] = df_train['sales']
#df_test['label'] = df_test['sales']

In [32]:
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
dfs_train = spark.createDataFrame(df_train)
dfs_test = spark.createDataFrame(df_test)


In [33]:
#from pyspark.sql.functions import col
#dfs_test = dfs_test_original.select([col(c).cast("float") if c not in {'date'} else col(c) for c in dfs_test_original.columns])
#dfs_train = dfs_test_original.select([col(c).cast("float") if c not in {'date'} else col(c) for c in dfs_train_original.columns])


In [34]:
group_columns = ['store', 'item'] 

In [35]:
scaler_y = StandardScaler().fit(df_train.sales.values.reshape(-1,1))

In [36]:
cols_unknown = ['sales'] 

In [37]:
label = ['label']
markov_mean = ['markov_seqs_mean']
markov_std = ['markov_seqs_std'] 
features_cols = cols_unknown

features_schema = dfs_train.select(features_cols + markov_mean + markov_std + label).schema

In [38]:
display(dfs_train.take(5))

date,store,item,sales,markov_seqs_mean,markov_seqs_std,label
2013-01-01T00:00:00.000+0000,1,1,13,0.0,0.0,13
2013-01-02T00:00:00.000+0000,1,1,11,0.0,0.0,11
2013-01-03T00:00:00.000+0000,1,1,14,0.0,0.0,14
2013-01-04T00:00:00.000+0000,1,1,13,0.0,0.0,13
2013-01-05T00:00:00.000+0000,1,1,10,0.0,0.0,10


In [39]:
scaler_X = StandardScaler().fit(df_train[features_cols].values)

In [40]:
@pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
def createF(pdf):
    f = []
    store, item = pdf['store'].unique()[0], pdf['item'].unique()[0]
    print(store, item)
    #group_key = pdf[group_columns].iloc[0]
    for sequence, labels in zip(gen_sequence(pdf, sequence_length, cols_unknown), gen_labels(pdf, sequence_length, label)):
      last_sale = sequence[-1:, 0][0].astype(int) #last element from 21
      print(last_sale)
  
      markov_seqs = np.zeros((sequence_length, 10))
      for ii in range(10):
        seq = markov_forecast(last_sale, m_dict, store, item, sequence_length)
        markov_seqs[:, ii] = seq.reshape(-1)

      markov_seqs_mean = np.mean(markov_seqs, axis=1).reshape(-1,1)
      markov_seqs_std = np.std(markov_seqs, axis=1).reshape(-1,1)/100
      #
      
      #sequence_features = np.concatenate([sequence1, sequence2], axis=1)
      sequence_features = scaler_X.transform(sequence)
      markov_seqs_mean = scaler_y.transform(markov_seqs_mean)
      labels = scaler_y.transform(labels)
      sequence = np.concatenate([sequence_features, markov_seqs_mean, markov_seqs_std, labels], axis=1)
      f.append(sequence)
      
    
    #raise Exception(np.array(f).shape)
    res = pd.DataFrame(np.array(f).reshape(-1, len(features_cols) + 2 + 1)) # 2 markov, 1 label
    return res



In [41]:
dfs_train_f = dfs_train.select(group_columns + features_cols + markov_mean + markov_std + label).groupby(group_columns).apply(createF)
#dfs_test_f = dfs_test.select(group_columns + features_cols + markov_mean + markov_std + label).groupby(group_columns).apply(createF)

In [42]:
len(dfs_train_f.dtypes)

In [43]:
display(dfs_train_f.take(5))

sales,markov_seqs_mean,markov_seqs_std,label
-1,-0.8558792249713443,0.0412795348811005,-1
-1,-0.6788028260894039,0.0762954782408499,0
0,-0.515614772217812,0.1024890238025516,-1
0,-0.6475540498161204,0.08522910301065,-1
-1,-0.3975638396298518,0.1299076595124398,-1


In [44]:
len(df_train), len(df_test), 


In [45]:
dfs_train_f.cache().count()

In [46]:
df_test.shape

In [47]:
51658341/21

In [48]:
'''dfs_test_f \
.repartition(10) \
.write \
.mode("overwrite") \
.parquet(parquet_path + 'test/')

# .option("parquet.block.size", 1024 * 1024) \'''

In [49]:
dfs_train_f \
.repartition(30) \
.write \
.mode("overwrite") \
.parquet(parquet_path)

# .option("parquet.block.size", 1024 * 1024) \
# .option("parquet.compression", "snappy") \

In [50]:
import os
[f for f in os.listdir(parquet_path)]

In [51]:
import pyarrow.parquet as pq

underscore_files = [f for f in os.listdir(get_local_path(output_path)) if f.startswith("_")]
underscore_files

In [52]:
pq.EXCLUDED_PARQUET_PATHS.update(underscore_files)