# Set up packages and dataframes

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import CountVectorizer, IDF

import os
import numpy as np
from nltk.stem.snowball import SnowballStemmer
import re
import regex
from itertools import product
import pandas as pd

spark = SparkSession.builder.getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [2]:
DATA_FOLDER = 'data/'

transactions = spark.read.options(    
        header=True,  
        inferSchema=True
    ).csv(
        os.path.join(DATA_FOLDER, 'sales_train.csv'), 
    )

items = spark.read.options(    
        header=True,  
        inferSchema=True
    ).csv(
        os.path.join(DATA_FOLDER, 'items.csv'), 
    )

item_categories = spark.read.options(    
        header=True,  
        inferSchema=True
    ).csv(
        os.path.join(DATA_FOLDER, 'item_categories.csv'), 
    )

shops = spark.read.options(    
        header=True,  
        inferSchema=True
    ).csv(
        os.path.join(DATA_FOLDER, 'shops.csv'), 
    )

test = spark.read.options(    
        header=True,  
        inferSchema=True
    ).csv(
        os.path.join(DATA_FOLDER, 'test.csv'), 
    )

# EDA

## Look at dataframes

Print the top 10 rows and the total number of rows.

In [3]:
print('Total number of rows: {}'.format(transactions.count()))
transactions.show(10)

print('Total number of rows: {}'.format(items.count()))
items.show(10)

print('Total number of rows: {}'.format(item_categories.count()))
item_categories.show(10)

print('Total number of rows: {}'.format(shops.count()))
shops.show(10)

print('Total number of rows: {}'.format(test.count()))
test.show(10)

Total number of rows: 2935849
+----------+--------------+-------+-------+----------+------------+
|      date|date_block_num|shop_id|item_id|item_price|item_cnt_day|
+----------+--------------+-------+-------+----------+------------+
|02.01.2013|             0|     59|  22154|     999.0|         1.0|
|03.01.2013|             0|     25|   2552|     899.0|         1.0|
|05.01.2013|             0|     25|   2552|     899.0|        -1.0|
|06.01.2013|             0|     25|   2554|   1709.05|         1.0|
|15.01.2013|             0|     25|   2555|    1099.0|         1.0|
|10.01.2013|             0|     25|   2564|     349.0|         1.0|
|02.01.2013|             0|     25|   2565|     549.0|         1.0|
|04.01.2013|             0|     25|   2572|     239.0|         1.0|
|11.01.2013|             0|     25|   2572|     299.0|         1.0|
|03.01.2013|             0|     25|   2573|     299.0|         3.0|
+----------+--------------+-------+-------+----------+------------+
only showing top 1

We see that in the training dataset, there are 2935849 transactions, 22170 items, 84 item categories and 60 shops.

# Feature Extraction

## Create the base training dataframe

For every month we create a grid from all shops/items combinations from that month.

In [4]:
transactions.createOrReplaceTempView('transactions')

index_cols = ['shop_id', 'item_id', 'date_block_num']

grid = []
date_block_nums = transactions.select('date_block_num').distinct().toPandas()['date_block_num'].unique()
for block_num in date_block_nums:
    cur_shops = spark.sql("""SELECT DISTINCT shop_id 
                                FROM transactions
                                WHERE date_block_num = {block_num}""".format(block_num = block_num)
                            ).toPandas()['shop_id'].unique()
    cur_items = spark.sql("""SELECT DISTINCT item_id 
                                FROM transactions
                                WHERE date_block_num = {block_num}""".format(block_num = block_num)
                            ).toPandas()['item_id'].unique()
    grid.append(np.array(list(product(*[cur_shops, cur_items, [block_num]])),dtype='int32'))

# Turn the grid into a dataframe
grid = pd.DataFrame(np.vstack(grid), columns = index_cols, dtype=np.int32)
train = spark.createDataFrame(grid)

del grid

## Aggregate sales and revenue by month, item id and shop id.

Add item revenue as a feature to the transactions dataframe.

In [5]:
transactions = transactions.withColumn('item_revenue', transactions.item_cnt_day * transactions.item_price)

transactions.createOrReplaceTempView('transactions')

Aggregate sales.

In [6]:
# Groupby data to get shop-item-month aggregates
gb = transactions.select('item_cnt_day', *index_cols).groupby(index_cols) \
        .sum('item_cnt_day').withColumnRenamed('sum(item_cnt_day)', 'target')
# Join it to the grid
train = train.join(gb, how='left', on=index_cols).fillna(0)

# Same as above but with shop-month aggregates
gb = transactions.select('item_cnt_day', *index_cols).groupby('shop_id', 'date_block_num') \
        .sum('item_cnt_day').withColumnRenamed('sum(item_cnt_day)', 'target_shop')
# Join it to the grid
train = train.join(gb, how='left', on=['shop_id', 'date_block_num']).fillna(0)

# Same as above but with item-month aggregates
gb = transactions.select('item_cnt_day', *index_cols).groupby('item_id', 'date_block_num') \
        .sum('item_cnt_day').withColumnRenamed('sum(item_cnt_day)', 'target_item')
# Join it to the grid
train = train.join(gb, how='left', on=['item_id', 'date_block_num']).fillna(0)

Aggregate revenue.

In [7]:
# Groupby data to get shop-item-month aggregates
gb = transactions.select('item_revenue', *index_cols).groupby(index_cols) \
        .sum('item_revenue').withColumnRenamed('sum(item_revenue)', 'revenue')
# Join it to the grid
train = train.join(gb, how='left', on=index_cols).fillna(0)

# Same as above but with shop-month aggregates
gb = transactions.select('item_revenue', *index_cols).groupby('shop_id', 'date_block_num') \
        .sum('item_revenue').withColumnRenamed('sum(item_revenue)', 'revenue_shop')
# Join it to the grid
train = train.join(gb, how='left', on=['shop_id', 'date_block_num']).fillna(0)

# Same as above but with item-month aggregates
gb = transactions.select('item_revenue', *index_cols).groupby('item_id', 'date_block_num') \
        .sum('item_revenue').withColumnRenamed('sum(item_revenue)', 'revenue_item')
# Join it to the grid
train = train.join(gb, how='left', on=['item_id', 'date_block_num']).fillna(0)

## Normalize numerical features

Min-max scale target_shops and target_items.

In [9]:
# target_shops
stats = train.select(F.max(train.target_shop).alias('max'),
                  F.min(train.target_shop).alias('min')).collect()

mx = stats[0]['max']
mn = stats[0]['min']

train = train.withColumn('normalized_target_shop', (train.target_shop - mn) / (mx - mn))

# target_items
stats = train.select(F.max(train.target_item).alias('max'),
                  F.min(train.target_item).alias('min')).collect()

mx = stats[0]['max']
mn = stats[0]['min']

train = train.withColumn('normalized_target_item', (train.target_item - mn) / (mx - mn))

Do the same for revenue, revenue_shops and revenue_items.

In [10]:
# revenue
stats = train.select(F.max(train.revenue).alias('max'),
                  F.min(train.revenue).alias('min')).collect()

mx = stats[0]['max']
mn = stats[0]['min']

train = train.withColumn('normalized_revenue', (train.revenue - mn) / (mx - mn))

# revenue_shops
stats = train.select(F.max(train.revenue_shop).alias('max'),
                  F.min(train.revenue_shop).alias('min')).collect()

mx = stats[0]['max']
mn = stats[0]['min']

train = train.withColumn('normalized_revenue_shop', (train.revenue_shop - mn) / (mx - mn))

# revenue_items
stats = train.select(F.max(train.revenue_item).alias('max'),
                  F.min(train.revenue_item).alias('min')).collect()

mx = stats[0]['max']
mn = stats[0]['min']

train = train.withColumn('normalized_revenue_item', (train.revenue_item - mn) / (mx - mn))

## Add values from previous months as features

Create new features using lags from [1, 2, 3, 4, 5, 12] months ago.

In [11]:
cols_to_rename = [col for col in train.columns if col not in index_cols]

shift_range = [1, 2, 3, 4, 5, 12]

for month_shift in shift_range:
    train_shift = train.select(*(index_cols + cols_to_rename))
    
    train_shift.withColumn('date_block_num', train_shift.date_block_num + 1)
    
    for col in cols_to_rename:
        train_shift = train_shift.withColumnRenamed(col, '{}_lag_{}'.format(col, month_shift))
        
    train = train.join(train_shift, on=index_cols, how='left').fillna(0)

## Extract text-based features

### Stem the text

Define a stemmer that can handle both Russian and English text using nltk's Snowball Stemmer.

In [None]:
en_stemmer = SnowballStemmer('english')
ru_stemmer = SnowballStemmer('russian')

cyr_regex = regex.compile('\p{Cyrillic}+', regex.UNICODE)
lat_regex = regex.compile('\p{Latin}+', regex.UNICODE)

In [None]:
def clean_text(text):
    """ Removes punctuation from string, unwanted unicode characters, and numbers. Returns in lowercase.
    
    Args:
        text (str): The text to clean.
    
    Returns:
        The cleaned text after filtered by the regex expression and made lowercase.
    
    For more information on the unicode categories used in the regex expression see here:
    https://www.regular-expressions.info/unicode.html#category
    
    >>> clean_text("!$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ Can't, - Trademark™ ...「（Punctuation）」42.32 ?")
    cant trademark punctuation
    
    """
    # remove URLs
    text = re.sub(r"http\S+", "", text)
    # remove apostrophes 
    text = text.replace("'", "")
    
    # Define regex unicode Categories and strip from string
    remove = regex.compile('[\p{C}|\p{M}|\p{P}|\p{S}|\p{Z}|\p{N}]+', regex.UNICODE)
    text = remove.sub(" ", text).strip()
    
    # make lowercase
    text = text.lower()
    
    return text

def stemmer(text):
    """Identify the words written in Cyrillic and Latin characters in a string,
    and apply a Russian or English stemmer, respectively.
    
    Args:
        text(str): The string whose Cyrillic and Latin text will be stemmed.
    
    Returns:
        A stemmed version of the text.
    """
    if text is None:
        return []
    
    text = clean_text(text)

    words = re.split('\s', text)
    stemmed_word_list = []
    for word in words:
        ru = regex.search(cyr_regex, word)
        en = regex.search(lat_regex, word)
        if ru:
            stemmed_word = ru_stemmer.stem(word)
        elif en:
            stemmed_word = en_stemmer.stem(word)
        else:
            stemmed_word = word
        stemmed_word_list.append(stemmed_word)
    
    return stemmed_word_list

Demonstrate function on sample text from the dataset.

In [None]:
text = '(Кино) - Blu-Ray'

stemmer(text)

Apply stemmer to columns containing text.

In [None]:
udf_stemmer = F.udf(stemmer, ArrayType(StringType(), True))

In [None]:
items = items.withColumn('stemmed_item_name', udf_stemmer(items.item_name))
item_categories = item_categories.withColumn('stemmed_item_category_name', udf_stemmer(item_categories.item_category_name))
shops = shops.withColumn('stemmed_shop_name', udf_stemmer(shops.shop_name))

### Vectorize using TF-IDF

In [None]:
cv = CountVectorizer()
idf = IDF()

In [None]:
cv.setInputCol('stemmed_item_name')
cv.setOutputCol('bow_item_name')
cvmodel = cv.fit(items)
items = cvmodel.transform(items)

idf.setInputCol('bow_item_name')
idf.setOutputCol('tfidf_item_name')
idfmodel = idf.fit(items)
items = idfmodel.transform(items)

In [None]:
cv.setInputCol('stemmed_item_category_name')
cv.setOutputCol('bow_item_category_name')
cvmodel = cv.fit(item_categories)
item_categories = cvmodel.transform(item_categories)

idf.setInputCol('bow_item_category_name')
idf.setOutputCol('tfidf_item_category_name')
idfmodel = idf.fit(item_categories)
item_categories = idfmodel.transform(item_categories)

In [None]:
cv.setInputCol('stemmed_shop_name')
cv.setOutputCol('bow_shop_name')
cvmodel = cv.fit(shops)
shops = cvmodel.transform(shops)

idf.setInputCol('bow_shop_name')
idf.setOutputCol('tfidf_shop_name')
idfmodel = idf.fit(shops)
shops = idfmodel.transform(shops)

### Join TFIDF-encoded item/item category/shop names onto the training dataframe

In [None]:
train.createOrReplaceTempView('train')
items.createOrReplaceTempView('items')

train = spark.sql(('SELECT train.*, items.item_category_id, items.tfidf_item_name '
                  ' FROM train '
                  ' LEFT JOIN items '
                  '  ON train.item_id = items.item_id '))

In [None]:
train.createOrReplaceTempView('train')
item_categories.createOrReplaceTempView('item_categories')

train = spark.sql(('SELECT train.*, item_categories.tfidf_item_category_name '
                  ' FROM train '
                  ' LEFT JOIN item_categories '
                  '  ON train.item_category_id = item_categories.item_category_id '))

In [None]:
train.createOrReplaceTempView('train')
shops.createOrReplaceTempView('shops')

train = spark.sql(('SELECT train.*, shops.tfidf_shop_name '
                  ' FROM train '
                  ' LEFT JOIN shops '
                  '  ON train.shop_id = shops.shop_id '))

## Display the resulting dataframe

In [12]:
train.limit(10).toPandas()

Unnamed: 0,shop_id,item_id,date_block_num,target,target_shop,target_item,revenue,revenue_shop,revenue_item,normalized_target_shop,...,target_shop_lag_12,target_item_lag_12,revenue_lag_12,revenue_shop_lag_12,revenue_item_lag_12,normalized_target_shop_lag_12,normalized_target_item_lag_12,normalized_revenue_lag_12,normalized_revenue_shop_lag_12,normalized_revenue_item_lag_12
0,0,491,1,1.0,6127.0,33.0,361.0,3670958.0,19320.6,0.375513,...,6127.0,33.0,361.0,3670958.0,19320.6,0.375513,0.004372,0.005422,0.234091,0.001511
1,0,1119,0,1.0,5578.0,6.0,273.0,2966412.0,2268.0,0.341871,...,5578.0,6.0,273.0,2966412.0,2268.0,0.341871,0.002226,0.005408,0.189261,0.001145
2,0,1223,1,0.0,6127.0,2.0,0.0,3670958.0,698.0,0.375513,...,6127.0,2.0,0.0,3670958.0,698.0,0.375513,0.001908,0.005365,0.234091,0.001111
3,0,1409,1,1.0,6127.0,10.0,1088.0,3670958.0,13678.0,0.375513,...,6127.0,10.0,1088.0,3670958.0,13678.0,0.375513,0.002544,0.005537,0.234091,0.00139
4,0,2441,1,4.0,6127.0,98.0,3580.0,3670958.0,78260.27,0.375513,...,6127.0,98.0,3580.0,3670958.0,78260.27,0.375513,0.00954,0.005933,0.234091,0.002779
5,0,2822,1,1.0,6127.0,12.0,246.0,3670958.0,4635.0,0.375513,...,6127.0,12.0,246.0,3670958.0,4635.0,0.375513,0.002703,0.005404,0.234091,0.001196
6,0,3007,0,4.0,5578.0,122.0,4512.0,2966412.0,210436.0,0.341871,...,5578.0,122.0,4512.0,2966412.0,210436.0,0.341871,0.011448,0.006081,0.189261,0.00562
7,0,3528,0,1.0,5578.0,22.0,964.0,2966412.0,36454.0,0.341871,...,5578.0,22.0,964.0,2966412.0,36454.0,0.341871,0.003498,0.005518,0.189261,0.00188
8,0,4328,0,2.0,5578.0,11.0,154.0,2966412.0,1495.0,0.341871,...,5578.0,11.0,154.0,2966412.0,1495.0,0.341871,0.002623,0.005389,0.189261,0.001128
9,0,4572,0,0.0,5578.0,4.0,0.0,2966412.0,1196.0,0.341871,...,5578.0,4.0,0.0,2966412.0,1196.0,0.341871,0.002067,0.005365,0.189261,0.001122
