In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and print the full path of every file found there.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## EDA of the Data with PySpark SQL Library 
## Recommendations and Machine Learning using Pyspark ML

<center><h3>The data is fairly big (3.50 GB), so it needs faster computation by processing it in memory</h3></center>

In [2]:
# Installing PySpark
!pip install pyspark

In [3]:
# Some other Dependencies
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import cv2
import os 
import re
import json
from os import listdir
from os.path import isfile, join


In [4]:
# Reading in csv to do some EDA / Training with them
# article_id is an identifier, not a quantity: read it as text so pandas does not
# parse it as an integer and silently drop leading zeros.
# NOTE(review): assumes the ids in the raw files are zero-padded - confirm against the CSV.
articles_df = pd.read_csv(
    '../input/h-and-m-personalized-fashion-recommendations/articles.csv',
    dtype={'article_id': str},
)
customers_df = pd.read_csv('../input/h-and-m-personalized-fashion-recommendations/customers.csv')
transactions_df = pd.read_csv(
    '../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv',
    dtype={'article_id': str},
)


In [5]:
# Checking the Dataset in the Tabular Form
# A bare last expression is rendered as a rich table by the notebook; print() would
# fall back to wrapped plain text.
articles_df.head()


In [6]:
# First rows of the customers table, shown with the notebook's rich table display.
customers_df.head()


In [7]:
# First ten transactions, shown with the notebook's rich table display.
transactions_df.head(10)

In [8]:
# Creating a Spark Session
import pyspark
from pyspark.sql import SparkSession
# builder.getOrCreate() already returns the SparkSession, so use it directly.
# The previous code passed that session to SparkSession(...), whose first
# argument is a SparkContext, producing a session wrapped around another session.
spark = (
    SparkSession.builder
    .appName('Recommender-Systesm')
    # Cap each file-read partition at ~10 MB.
    .config("spark.sql.files.maxPartitionBytes", 10000000)
    .getOrCreate()
)
# `sc` used to be bound to this same session object; keep the name as an alias
# so any cell that still refers to it keeps working.
sc = spark

In [9]:
# Load the three competition CSVs as Spark DataFrames.
# header=True makes Spark take the column names from the first row of each file.
transactions = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv',
    header=True,
)
articles = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/articles.csv',
    header=True,
)
customers = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/customers.csv',
    header=True,
)

# Print the schema of each frame (articles, customers, transactions, in that order).
for frame in (articles, customers, transactions):
    frame.printSchema()

In [10]:
# Peek at the first five transaction rows.
transactions.show(n=5)

In [11]:
# Peek at the first five article rows.
articles.show(n=5)

In [12]:
# Peek at the first five customer rows.
customers.show(n=5)

In [13]:
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

## Create them as Spark Tables

Registers this DataFrame as a temporary table using the given name.

The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

These tables will be useful, as they let us run SQL-like queries on the data with ease.

In [14]:
# Register each DataFrame under its own name so it can be queried through spark.sql.
for view_name, frame in (
    ('transactions', transactions),
    ('articles', articles),
    ('customers', customers),
):
    frame.createOrReplaceTempView(view_name)

## Starting off with a Simple SQL Query of Selecting everything from the Table



In [15]:
# Preview query: up to 100 rows of the transactions view, of which 10 are displayed.
query_1 = spark.sql("""
select * 
from transactions
limit 100
""")

query_1.show(n=10)

In [16]:
# Preview query: up to 100 rows of the customers view, of which 10 are displayed.
query_2 = spark.sql("""
select * 
from customers
limit 100
""")

query_2.show(n=10)

In [17]:
# Preview query for the third registered view. This cell previously repeated the
# customers preview, so the articles view was never shown.
query_3 = spark.sql('''
select *
from articles
limit 100
''')

query_3.show(10)

## Aggregating to get some statistics 

In [20]:
# Some statistics from the Transaction Tables
# The CSV was loaded without schema inference, so price is a string column.
# Cast it explicitly: max()/min() on strings compare lexicographically, which
# does not match numeric order (e.g. a value written in scientific notation).
query_4= spark.sql('''
select
  customer_id
  , collect_list(article_id) as article_id_list
  , avg(cast(price as double)) as avg_price
  , max(cast(price as double)) as max_price
  , min(cast(price as double)) as min_price
from transactions
group by customer_id
limit 100
''')

query_4.show(10)

## Getting the Number of Customers below the Age of 25

In [21]:
# Count the customers whose age is below 25.
query_5 = spark.sql("""
select count(customer_id)
from 
customers 
where (customers.age<25) 
""")
query_5.show(n=10)

## Count of Customers that are Active Club Members

In [22]:
# Count the customers whose club_member_status is 'ACTIVE'.
query_6 = spark.sql("""
select count(customer_id)
from customers
where customers.club_member_status=='ACTIVE'
""")
query_6.show()

## Count of Customers that Follow the Fashion News Regularly

In [23]:
# Count the customers whose fashion_news_frequency is 'Regularly'.
query_7 = spark.sql("""
select count(customer_id)
from customers
where customers.fashion_news_frequency=='Regularly'
""")
query_7.show()

## Key Customers - Customers that are Active, Follow Fashion News Regularly and are Active Club Members

In [24]:
# Ids of customers that satisfy all three conditions at once:
# Active flag set, fashion news received regularly, active club membership.
query_8 = spark.sql("""
select customer_id 
from customers 
where
customers.Active=1.0 AND
customers.fashion_news_frequency=='Regularly' AND
customers.club_member_status=='ACTIVE'

""")
query_8.show()

# Queries on Articles Table 

In [25]:
# Show the first five rows of the pandas articles frame as a reminder of its columns.
articles_df.head(n=5)

## Get a List of Product Names

In [26]:
# Distinct product names in the articles view (first 50 displayed).
query_9 = spark.sql("""
select DISTINCT prod_name
from articles 

""")
query_9.show(n=50)

## Get a List of Type of Products

In [27]:
# Distinct product type names in the articles view (first 100 displayed).
query_10 = spark.sql("""
select DISTINCT product_type_name
from articles
""")
query_10.show(n=100)

## Get a List of Index Names

In [28]:
# Distinct index names in the articles view (first 20 displayed).
query_11 = spark.sql("""
select DISTINCT index_name
from articles
""")
query_11.show(n=20)
# Here we can see these are the Broad 5 Categories

## Get a List of Index group Names

In [29]:
# Distinct index group names in the articles view (first 20 displayed).
query_12 = spark.sql("""
select DISTINCT index_group_name
from articles
""")
query_12.show(n=20)
# Here we can see these are the Broad 5 Categories

## Find Which Index Group has the Maximum Number of Items

In [30]:
# Number of articles per index group, largest group first.
# The group name is selected alongside the count; without it the output is a
# bare list of numbers that cannot be attributed to any group.
query_13=spark.sql('''
select index_group_name, count(article_id) as n_articles
from
articles
GROUP BY index_group_name
ORDER BY n_articles DESC
''')
query_13.show()
# This shows which Index Group Name has what Number of Items Present in it

## Sort Which Index has the Most Number of Items

In [31]:
# Number of articles per index, sorted so the index with the most items comes
# first (the grouping alone returns rows in no particular order).
query_14=spark.sql('''
select index_name, count(article_id) as n_articles
from
articles
GROUP BY index_name
ORDER BY n_articles DESC
''')
query_14.show(10)

## Get the Number of Each Products Present

In [32]:
# Number of article rows per product name, most frequent product name first.
query_15 = spark.sql("""
select count(prod_name),prod_name
from 
articles
GROUP BY prod_name
ORDER BY count(prod_name)
DESC
""")
query_15.show()

## Query to Find out which colour Products are present most in the Inventory

In [33]:
# Number of article rows per colour group, most frequent colour group first.
query_16 = spark.sql("""
select count(colour_group_name), colour_group_name
from
articles
GROUP BY colour_group_name
ORDER BY count(colour_group_name)
DESC
""")
query_16.show(n=20)
# From this we can know which colours are present in what quantity in the Inventory

# Queries on the Transaction Table + Joins to Perform Complex Queries

### Printing the Transaction Table to familiarize  with the Features

In [34]:
# Show the first five rows of the pandas transactions frame as a reminder of its columns.
transactions_df.head(n=5)

## Getting the Count of Transactions through each Sales Channel Id

In [35]:
# Number of transactions per sales channel, busiest channel first.
query_17 = spark.sql("""
select count(sales_channel_id), sales_channel_id
from transactions
GROUP BY sales_channel_id
ORDER BY count(sales_channel_id)
DESC
""")
query_17.show()
# There are only two sales channels and these are the counts corresponding to the sales via each of em

## Get the List of Customers who have done most Number of Purchases

In [36]:
# Number of transaction rows per customer, heaviest buyers first.
query_18 = spark.sql("""
select customer_id , count(customer_id)
from transactions 
GROUP BY customer_id
ORDER BY count(customer_id) DESC

""")
query_18.show()
# These are the customers who have purchased the Maximum amount of Items

## Get the List of Customers who have done Maximum Amount worth of Purchases

In [37]:
# Total price paid per customer, biggest spenders first.
query_20 = spark.sql("""
select customer_id, sum(price)
from 
transactions
GROUP BY transactions.customer_id
ORDER BY (SUM(price)) 
DESC

""")
query_20.show()
# These are the customers that have done the most Amount worth of purchasing from the Store 

## What are the Items that are Most sold by H&M

In [48]:
# Most sold items: count the transaction rows per article and list the best
# sellers first. The articles join is only there to attach a readable product
# name; a plain `select *` over the join just echoes raw rows and does not
# rank anything.
query_21=spark.sql('''
select
  t.article_id
  , ar.prod_name
  , count(*) as times_sold
from
transactions as t
inner join articles as ar on ar.article_id = t.article_id
group by t.article_id, ar.prod_name
order by times_sold desc
''')
query_21.show(9)

In [49]:
# Join each transaction to its customer and article (left joins, so transactions
# without a match are kept) and keep 100 rows as a small sample.
# Both sides of each join carry the key column, so `select *` yields
# customer_id and article_id twice in the result.
merged = spark.sql("""
select
  * 
from transactions as tt
left join customers as cs on cs.customer_id = tt.customer_id
left join articles as ar on ar.article_id = tt.article_id
limit 100
""")

# Row count first, then the column names of the joined sample.
print(merged.count())
print(merged.columns)

# Bring the sample to the driver as a pandas DataFrame for inspection.
df_merged = merged.toPandas()

print(df_merged.shape)
df_merged.head(n=5)


## Building an ALS Recommender with Pyspark

Collaborative filtering: Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preference or taste information from many users. For example, if person A likes items 1, 2 and 3 and person B likes items 2, 3 and 4, then they have similar interests, so A should like item 4 and B should like item 1.

Alternating least squares (ALS) matrix factorization: The idea is to take a large (potentially huge) matrix and factor it into a smaller representation of the original matrix through alternating least squares. We end up with two or more lower-dimensional matrices whose product approximates the original one. ALS comes built into Apache Spark.

In [66]:
# Importing some dependencies 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.sql import SQLContext 
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf,col,when
from pyspark.sql.functions import to_timestamp,date_format
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [52]:
# Read the three CSVs into fresh Spark DataFrames for the recommender section.
# This rebinds transactions / articles / customers; the temp views registered
# earlier are not re-registered here.
transactions = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv',
    header=True,
)
articles = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/articles.csv',
    header=True,
)
customers = spark.read.csv(
    '../input/h-and-m-personalized-fashion-recommendations/customers.csv',
    header=True,
)

In [54]:
# Earliest and latest transaction date. min/max here are the Spark SQL
# aggregate functions imported above, not the Python builtins.
date_bounds = transactions.agg(min("t_dat"), max("t_dat")).first()
min_date, max_date = date_bounds
min_date, max_date

In [57]:
# Preparing the dataset for the Recommender System
from pyspark.sql.functions import col, lit, to_date

# The recommender is trained on a single day of transactions.
TARGET_DATE = '2020-09-22'

# Parse t_dat once and compare whole dates. This keeps the same rows as splitting
# the value into year / month / day columns and filtering each one, without the
# timestamp round trip, the no-op string cast or the repeated rebinding of `hm`.
# Rows whose t_dat does not parse as yyyy-MM-dd become null and are dropped,
# as before. (The old unpersist() call was removed: the frame is never cached.)
hm_day = transactions.filter(
    to_date(col('t_dat'), 'yyyy-MM-dd') == to_date(lit(TARGET_DATE))
)

# One row per (customer, article) pair; `count` is how many times that customer
# bought that article on the target day and is used as the rating later on.
hm = hm_day.groupby('customer_id', 'article_id').count()
hm.show(5)

In [58]:
# Observed interactions: hm holds one row per (customer, article) pair.
numerator = hm.count()

# Size of the full customer x article grid.
num_users = hm.select("customer_id").distinct().count()
num_articles = hm.select("article_id").distinct().count()
denominator = num_users * num_articles

# Sparsity is the share of the grid with no observed interaction, as a percentage.
filled_fraction = numerator / denominator
sparsity = (1.0 - filled_fraction) * 100
print("Sparsity: ", "%.2f" % sparsity + "%.")

In [64]:
# Number of distinct articles per customer in hm, largest first.
per_customer = hm.groupBy("customer_id").count()
userId_count = per_customer.sort('count', ascending=False)
userId_count.show()

In [65]:
# Number of distinct customers per article in hm, largest first.
per_article = hm.groupBy("article_id").count()
articleId_count = per_article.sort('count', ascending=False)
articleId_count.show()

In [67]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# ALS needs numeric user/item ids, so every column except the rating ('count')
# is mapped to a numeric "<column>_index" column.
# The columns are taken in DataFrame order: building the list from a set made
# the stage order (and so the output column order) vary from run to run.
id_columns = [column for column in hm.columns if column != 'count']
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in id_columns]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show()

In [68]:
# Preparing the Dataset for Training and Test
# 80/20 split; the fixed seed keeps the split (and every metric computed from it)
# reproducible across kernel restarts.
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)

In [69]:
# ALS matrix factorisation on the indexed ids, using the purchase count as the rating.
# coldStartStrategy="drop" removes predictions for users/items unseen in training
# so the evaluation metric is not NaN; seed fixes the random factor initialisation
# so repeated runs give the same model.
# NOTE(review): 'count' is implicit feedback; implicitPrefs=True may be a better
# fit, but it would change what the RMSE below means - left as is.
als = ALS(
    maxIter=5,
    regParam=0.09,
    rank=25,
    userCol="customer_id_index",
    itemCol="article_id_index",
    ratingCol="count",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)
model = als.fit(training)

In [70]:
# Score the held-out pairs and report the root-mean-square error between the
# predicted rating and the observed purchase count.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
# The metric was previously computed but never shown.
print(f"Test RMSE: {rmse:.4f}")

In [71]:
# Top-10 customers recommended for every article.
# Keep the DataFrame itself: .show() returns None, so chaining it onto the
# assignment left user_recs bound to None.
user_recs = model.recommendForAllItems(10)
user_recs.show(10)

In [72]:
# Top-10 articles recommended for every customer.
# Keep the DataFrame itself: .show() returns None, so chaining it onto the
# assignment left item_recs bound to None.
item_recs = model.recommendForAllUsers(10)
item_recs.show(10)

In [73]:
%%time
# Build the top-10 article recommendations for every customer and mark them for caching.
# cache() is lazy, so the count() below is what triggers the computation that %%time measures.
userRecsDf = model.recommendForAllUsers(10).cache()
userRecsDf.count()

In [74]:
# Per customer index, list only the recommended article indices (ratings omitted),
# printed without truncating the arrays.
recommended_articles = userRecsDf.select("customer_id_index", "recommendations.article_id_index")
recommended_articles.show(n=10, truncate=False)

## Bring in the Name of the Article Id to recommend the Article Detail