<a href="https://colab.research.google.com/github/fabrizioaymone/h-and-m-challenge/blob/main/LGBMSpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import os, sys, shutil
drive.mount("/content/drive")
sys.path.append("/content/drive/MyDrive/env")
os.makedirs('/root/.kaggle')
shutil.copyfile('/content/drive/MyDrive/kaggle.json', '/root/.kaggle/kaggle.json')
if not os.path.exists("/content/datasets"):
  os.symlink("/content/drive/MyDrive/datasets/H&M","/content/datasets")
!chmod 777 -R /content/drive/MyDrive/env/pyspark

Mounted at /content/drive


In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LGBMSpark").config('MaxPartitionBytes', '5000000').getOrCreate()
spark.conf.set('spark.sql.adaptive.enabled', True)
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', True)

In [3]:
spark

In [None]:
customers = spark.read.csv("/content/datasets/customers.csv", header=True)
articles = spark.read.csv("/content/datasets/articles.csv", header=True)
transactions = spark.read.csv("/content/datasets/transactions_train.csv", header=True)

DATA CLEANING

In [None]:
from pyspark.sql import functions as F 

customers.select([(F.count(F.when(F.col(c).isNull() | F.isnan(c), c))/customers.count()).alias(c) for c in customers.columns]).show()

+-----------+------------------+------------------+-------------------+----------------------+--------------------+-----------+
|customer_id|                FN|            Active| club_member_status|fashion_news_frequency|                 age|postal_code|
+-----------+------------------+------------------+-------------------+----------------------+--------------------+-----------+
|        0.0|0.6523783145526902|0.6615081852505138|0.00441843175556495|  0.011668537442236768|0.011560664149623172|        0.0|
+-----------+------------------+------------------+-------------------+----------------------+--------------------+-----------+



In [None]:
customers = customers.drop('FN', 'Active')

In [None]:
customers.groupBy('club_member_status').count().sort('count').show()

+------------------+-------+
|club_member_status|  count|
+------------------+-------+
|         LEFT CLUB|    467|
|              null|   6062|
|        PRE-CREATE|  92960|
|            ACTIVE|1272491|
+------------------+-------+



In [None]:
customers = customers.fillna('LEFT CLUB', ['club_member_status'])

In [None]:
customers.groupBy('fashion_news_frequency').count().sort('count').show()

+----------------------+------+
|fashion_news_frequency| count|
+----------------------+------+
|                  None|     2|
|               Monthly|   842|
|                  null| 16009|
|             Regularly|477416|
|                  NONE|877711|
+----------------------+------+



In [None]:
customers = customers.fillna('NONE', ['fashion_news_frequency'])\
            .withColumn('fashion_news_frequency', F.when(F.col('fashion_news_frequency')=='None', 'NONE').otherwise(F.col('fashion_news_frequency')))

In [None]:
customers.select(F.count(F.when(customers.age.isNull(), 1))).show()

+-----------------------------------------+
|count(CASE WHEN (age IS NULL) THEN 1 END)|
+-----------------------------------------+
|                                    15861|
+-----------------------------------------+



In [None]:
customers = customers.fillna('25', ['age'])

In [None]:
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- club_member_status: string (nullable = false)
 |-- fashion_news_frequency: string (nullable = false)
 |-- age: string (nullable = false)
 |-- postal_code: string (nullable = true)



In [None]:
customers = customers.withColumn('age', F.col('age').astype('Integer'))

In [None]:
articles.printSchema()

root
 |-- article_id: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- prod_name: string (nullable = true)
 |-- product_type_no: string (nullable = true)
 |-- product_type_name: string (nullable = true)
 |-- product_group_name: string (nullable = true)
 |-- graphical_appearance_no: string (nullable = true)
 |-- graphical_appearance_name: string (nullable = true)
 |-- colour_group_code: string (nullable = true)
 |-- colour_group_name: string (nullable = true)
 |-- perceived_colour_value_id: string (nullable = true)
 |-- perceived_colour_value_name: string (nullable = true)
 |-- perceived_colour_master_id: string (nullable = true)
 |-- perceived_colour_master_name: string (nullable = true)
 |-- department_no: string (nullable = true)
 |-- department_name: string (nullable = true)
 |-- index_code: string (nullable = true)
 |-- index_name: string (nullable = true)
 |-- index_group_no: string (nullable = true)
 |-- index_group_name: string (nullable = true)
 |-- sec

In [None]:
articles = articles\
            .selectExpr('cast (article_id as int) article_id', 'cast (product_type_no as int) product_type_no', 'cast (graphical_appearance_no as int) graphical_appearance_no',
                'cast (colour_group_code as int) colour_group_code ','cast (perceived_colour_value_id as int) perceived_colour_value_id',
                'cast (department_no as int) department_no',  'cast (index_group_no as int) index_group_no',
                'cast (section_no as int) section_no', 'cast (garment_group_no as int) garment_group_no')

In [None]:
transactions.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [None]:
transactions  = transactions.drop('price', 'sales_channel_id')\
                .withColumn('t_dat', F.col('t_dat').astype('timestamp'))

#Feature engineering

In [None]:
transactions.select(F.max(transactions.t_dat)).show()

+-------------------+
|         max(t_dat)|
+-------------------+
|2020-09-22 00:00:00|
+-------------------+



In [None]:
transactions = transactions.withColumn('week', (F.datediff(F.to_date(F.lit('2020-09-22')), F.col('t_dat'))/7).cast('int'))\
                .withColumn('y', F.lit(1))\
                .filter(F.col('week')<=25)

In [None]:
transactions.show()

+-------------------+--------------------+----------+----+---+
|              t_dat|         customer_id|article_id|week|  y|
+-------------------+--------------------+----------+----+---+
|2020-03-25 00:00:00|000e17cb73012035b...|0739363001|  25|  1|
|2020-03-25 00:00:00|00271f14209e94d09...|0448509014|  25|  1|
|2020-03-25 00:00:00|00271f14209e94d09...|0825782001|  25|  1|
|2020-03-25 00:00:00|002affa1fc4d40114...|0224606019|  25|  1|
|2020-03-25 00:00:00|0032d1f1d796a9470...|0594987002|  25|  1|
|2020-03-25 00:00:00|0032d1f1d796a9470...|0719655001|  25|  1|
|2020-03-25 00:00:00|0032d1f1d796a9470...|0603145001|  25|  1|
|2020-03-25 00:00:00|0032d1f1d796a9470...|0598515022|  25|  1|
|2020-03-25 00:00:00|00421861a907eb361...|0808650001|  25|  1|
|2020-03-25 00:00:00|00421861a907eb361...|0664074056|  25|  1|
|2020-03-25 00:00:00|00421861a907eb361...|0870468001|  25|  1|
|2020-03-25 00:00:00|00471e8bcab33fac6...|0843872003|  25|  1|
|2020-03-25 00:00:00|00471e8bcab33fac6...|0843873004|  

In [None]:
from pyspark.sql import Window

articles_rank = transactions.groupBy(['article_id', 'week']).count()

top12 = articles_rank.withColumn('rank', F.rank().over(Window.partitionBy('week').orderBy(F.col('count').desc())))\
        .filter(F.col('rank')<=12)
#top12.show()

In [None]:
import numpy as np

top12_arr = np.array(top12.filter(F.col('week')==1).orderBy('rank').select('article_id').collect())

In [None]:
top12_0 = top12.filter(F.col('week')==1).crossJoin(customers.select('customer_id'))

In [None]:
top12 = top12.withColumn('week', F.col('week'))\
        .join(transactions.select('customer_id', 'week').dropDuplicates(), 'week')\
        .drop('t_dat')\
        .withColumn('week', F.col('week')-1)

In [None]:
transactions = transactions.withColumn('week_ref', F.dense_rank().over(Window.partitionBy('customer_id').orderBy(F.col('week').asc())))

#transactions.filter(F.col('customer_id')==transactions.take(1)[0].customer_id).show()


In [None]:
week0 = customers.select('customer_id')\
          .withColumn('week', F.lit('0'))
#week0.show()

In [None]:
lb_purchase = transactions.drop('y', 'article_id','t_dat', 'week_ref')\
              .union(week0)\
              .dropDuplicates()\
              .withColumn('week_ref', F.dense_rank().over(Window.partitionBy('customer_id').orderBy(F.col('week').asc())))\
              .withColumnRenamed('week', 'new_week')

week0.unpersist()

#lb_purchase.show() 

DataFrame[customer_id: string, week: string]

In [None]:
transactions_lw = transactions.join(lb_purchase, on=['customer_id', 'week_ref'], how='inner')\
                  .drop('week')\
                  .withColumnRenamed('new_week', 'week')\
                  .drop('week_ref', 't_dat', 'y')

lb_purchase.unpersist()

transactions = transactions.drop('week_ref', 't_dat')

#transactions_lw.filter(F.col('customer_id')==transactions_lw.take(1)[0].customer_id).show()

#ASSEMBLING FINAL TRAIN

In [None]:
top12.show()
transactions_lw.show()
transactions.show()

+----+----------+-----+----+--------------------+
|week|article_id|count|rank|         customer_id|
+----+----------+-----+----+--------------------+
|   0|0909370001| 1283|   1|00125440be6cd148c...|
|   0|0909370001| 1283|   1|04d2413f2b9fb52e4...|
|   0|0909370001| 1283|   1|066f40f3e7c5da52a...|
|   0|0909370001| 1283|   1|07d887f7ab97358cc...|
|   0|0909370001| 1283|   1|134ae4ea61e2f2f67...|
|   0|0909370001| 1283|   1|146c194a3b0e61bfd...|
|   0|0909370001| 1283|   1|181946657503995de...|
|   0|0909370001| 1283|   1|1dbb69025a7a5d33b...|
|   0|0909370001| 1283|   1|1e685f6a9e07d635c...|
|   0|0909370001| 1283|   1|1fe99816833e5f925...|
|   0|0909370001| 1283|   1|24509459b7fc78f7d...|
|   0|0909370001| 1283|   1|2cc286ec937f0b954...|
|   0|0909370001| 1283|   1|341edbd7f022062e6...|
|   0|0909370001| 1283|   1|37579f709fae1d910...|
|   0|0909370001| 1283|   1|39a03679843674850...|
|   0|0909370001| 1283|   1|3b077b6c7ae6bccc4...|
|   0|0909370001| 1283|   1|3d72481f66fe77270...|


In [None]:
train = transactions.join(F.broadcast(transactions_lw), on = ['customer_id', 'article_id', 'week'], how='outer')\

transactions.unpersist()
transactions_lw.unpersist()

DataFrame[customer_id: string, article_id: string, week: string]

In [None]:
train = train.join(top12, on = ['customer_id', 'article_id', 'week'], how='outer')\

top12.unpersist()

DataFrame[week: int, article_id: string, count: bigint, rank: int, customer_id: string]

In [None]:
top12_0 = top12_0.select('customer_id', 'article_id', 'week', 'count', 'rank')
train = train.select('customer_id', 'article_id', 'week', 'count', 'rank', 'y')

In [None]:
test = train.filter(F.col('week')==0).drop('y')\
       .union(top12_0)

top12_0.unpersists()

In [None]:
import numpy as np
train = train.filter(F.col('week')!=0)\
        .dropDuplicates()\
        .fillna({'rank':999, 'count':0, 'y':0})\
        .join(customers, 'customer_id')\
        .join(articles, 'article_id')\
        .orderBy(['week', 'customer_id'])

train = train.filter(F.col('week')<=6)

qids = np.array(train.groupBy(['week', 'customer_id']).count().select('count').collect())
np.save('/content/datasets/qids.numpy', qids)

In [None]:
test = test.fillna({'rank':999, 'count':0})\
        .dropDuplicates()\
        .join(customers, 'customer_id')\
        .join(articles, 'article_id')       


In [None]:
train.repartition(1).write.parquet('/content/datasets/train.parquet')
test.repartition(1).write.parquet('/content/datasets/test.parquet')

In [None]:
import gc

train.unpersist()
test.unpersist()
gc.collect()

674

#MODEL TRAINING

In [None]:
import pandas as pd

train = pd.read_parquet('/content/datasets/train.parquet/')
test = pd.read_parquet('/content/datasets/test.parquet/')

In [None]:
import numpy as np

#train = train.orderBy(['week', 'customer_id'])
qids = train.groupby(['week', 'customer_id'])['article_id'].count().to_numpy()
#np.array(train.groupBy(['week', 'customer_id']).count().select('count').collect())
#np.save('/content/datasets/qids.numpy', qids)
qids

array([12, 12, 12, ...,  9,  2,  4])

In [None]:
train.count()

article_id                   8728387
customer_id                  8728387
week                         8728387
count                        8728387
rank                         8728387
y                            8728387
club_member_status           8728387
fashion_news_frequency       8728387
age                          8728387
postal_code                  8728387
product_type_no              8728387
graphical_appearance_no      8728387
colour_group_code            8728387
perceived_colour_value_id    8728387
department_no                8728387
index_group_no               8728387
section_no                   8728387
garment_group_no             8728387
dtype: int64

In [None]:
test.count()

article_id                   19789216
customer_id                  19789216
week                         19789216
count                        19789216
rank                         19789216
club_member_status           19789216
fashion_news_frequency       19789216
age                          19789216
postal_code                  19789216
product_type_no              19789216
graphical_appearance_no      19789216
colour_group_code            19789216
perceived_colour_value_id    19789216
department_no                19789216
index_group_no               19789216
section_no                   19789216
garment_group_no             19789216
dtype: int64

In [None]:
X_train = train.drop(['y', 'customer_id', 'week', 'postal_code'], axis=1).astype({'article_id':int, 'club_member_status': 'category', 'fashion_news_frequency': 'category'})
train = train.drop(['week', 'postal_code'], axis=1)
y_train = train[['y']]

In [None]:
X_test = test.drop(['customer_id', 'postal_code', 'week'], axis=1).astype({'article_id':int, 'club_member_status': 'category', 'fashion_news_frequency': 'category'})
test = test.drop(['week', 'postal_code'], axis=1)

In [None]:
import lightgbm as lgb

model = lgb.LGBMRanker(
    objective="lambdarank",
    metric="ndcg",
    boosting_type="dart",
    n_estimators=100,
    importance_type='gain',
    verbose=10,
    random_state = 17)

In [None]:
model.fit(X=X_train, y=y_train, group=qids)

LGBMRanker(boosting_type='dart', importance_type='gain', metric='ndcg',
           objective='lambdarank', random_state=17, verbose=10)

In [None]:
submission = pd.read_csv('/content/datasets/sample_submission.csv')

In [None]:
test['prediction'] = model.predict(X_test)

In [None]:
preds = test.sort_values('prediction', ascending=False).groupby('customer_id')['article_id'].unique().to_dict()

In [None]:
submission.head()

Unnamed: 0,customer_id,prediction
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,0706016001 0706016002 0372860001 0610776002 07...
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,0706016001 0706016002 0372860001 0610776002 07...
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,0706016001 0706016002 0372860001 0610776002 07...
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,0706016001 0706016002 0372860001 0610776002 07...
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,0706016001 0706016002 0372860001 0610776002 07...


In [None]:
top12_arr = list(top12_arr.flatten().astype(str))

In [None]:
pred = []
for c_id in submission.customer_id:
  p = preds.get(c_id, [])
  p = list(p) + list(top12_arr)
  idx = np.unique(p, return_index=True)[1]
  p = [p[i] for i in sorted(idx)]
  pred.append(p[:12])

In [None]:
pred = [' '.join(i) for i in pred]

In [None]:
submission.prediction = pred

In [None]:
submission.head()

Unnamed: 0,customer_id,prediction
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,0568601043 0865799006 0909370001 0918522001 09...
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,0826211002 0865799006 0909370001 0924243001 09...
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,0794321007 0865799006 0909370001 0918522001 09...
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,0909370001 0865799006 0924243001 0918522001 07...
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,0927530004 0896152002 0791587015 0730683050 08...


In [None]:
submission.to_csv('/content/submission.csv', index=False)

In [None]:
!kaggle competitions submit -c h-and-m-personalized-fashion-recommendations -f submission.csv -m "ok"

100% 258M/258M [00:05<00:00, 48.4MB/s]
Successfully submitted to H&M Personalized Fashion Recommendations