### https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3

In [1]:
spark

In [2]:
sc

In [3]:
df = spark.read.csv("hdfs://devenv/kaggle_file/ratings_COMMODITY_DESC.csv",header=True,
                   schema="COMMODITY_DESC string, household_key string, QUANTITY Integer,\
                   BASKET_ID string, rating float, rating_label Integer") 

In [4]:
df.show(10,truncate=False)

+------------------------------+-------------+--------+---------+------+------------+
|COMMODITY_DESC                |household_key|QUANTITY|BASKET_ID|rating|rating_label|
+------------------------------+-------------+--------+---------+------+------------+
|YOGURT                        |2500         |118     |100      |1.18  |5           |
|WATER - CARBONATED/FLVRD DRINK|2500         |8       |100      |0.08  |1           |
|WAREHOUSE SNACKS              |2500         |31      |100      |0.31  |3           |
|VITAMINS                      |2500         |6       |100      |0.06  |1           |
|VEGETABLES SALAD              |2500         |1       |100      |0.01  |1           |
|VEGETABLES - SHELF STABLE     |2500         |75      |100      |0.75  |4           |
|VEGETABLES - ALL OTHERS       |2500         |7       |100      |0.07  |1           |
|VALUE ADDED FRUIT             |2500         |2       |100      |0.02  |1           |
|VALENTINE                     |2500         |3       

In [5]:
df_new = df.select(df['COMMODITY_DESC'],df['household_key'],df['rating_label'])

In [6]:
df_new.show()

+--------------------+-------------+------------+
|      COMMODITY_DESC|household_key|rating_label|
+--------------------+-------------+------------+
|              YOGURT|         2500|           5|
|WATER - CARBONATE...|         2500|           1|
|    WAREHOUSE SNACKS|         2500|           3|
|            VITAMINS|         2500|           1|
|    VEGETABLES SALAD|         2500|           1|
|VEGETABLES - SHEL...|         2500|           4|
|VEGETABLES - ALL ...|         2500|           1|
|   VALUE ADDED FRUIT|         2500|           1|
|           VALENTINE|         2500|           1|
|      TROPICAL FRUIT|         2500|           2|
|            TOMATOES|         2500|           1|
|             TICKETS|         2500|           1|
|                TEAS|         2500|           1|
|     SYRUPS/TOPPINGS|         2500|           1|
|    SUGARS/SWEETNERS|         2500|           1|
|         STONE FRUIT|         2500|           1|
|STATIONERY & SCHO...|         2500|           1|


### Importing important modules

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

### Converting String to index 
- Before making an ALS model it needs to be clear that ALS only accepts integer value as parameters. Hence we need to convert asin and reviewerID column in index form.
- StringIndexer 
 - MLlib method
 - A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

In [8]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# encoding
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(df_new.columns)-set(['rating_label']))]

# encoding 簡單一點的寫法
#indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['COMMODITY_DESC','household_key']]


# make pipeline
pipeline = Pipeline(stages=indexer)

# fit and transform
transformed = pipeline.fit(df_new).transform(df_new)

transformed.show()

+--------------------+-------------+------------+--------------------+-------------------+
|      COMMODITY_DESC|household_key|rating_label|COMMODITY_DESC_index|household_key_index|
+--------------------+-------------+------------+--------------------+-------------------+
|              YOGURT|         2500|           5|                52.0|              234.0|
|WATER - CARBONATE...|         2500|           1|                34.0|              234.0|
|    WAREHOUSE SNACKS|         2500|           3|                81.0|              234.0|
|            VITAMINS|         2500|           1|               162.0|              234.0|
|    VEGETABLES SALAD|         2500|           1|                65.0|              234.0|
|VEGETABLES - SHEL...|         2500|           4|                15.0|              234.0|
|VEGETABLES - ALL ...|         2500|           1|                35.0|              234.0|
|   VALUE ADDED FRUIT|         2500|           1|               104.0|              234.0|

### Creating training and test data

In [9]:
(training,test)=transformed.randomSplit([0.8, 0.2])

### Creating ALS model and fitting data
- rating_label type must be numeric

### https://www.twblogs.net/a/5c2917d6bd9eee01606d2f58
### https://codertw.com/%E7%A8%8B%E5%BC%8F%E8%AA%9E%E8%A8%80/563826/
### https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3

In [10]:
als=ALS(maxIter=5, # 算法迭代次數 maximum number of iterations
        regParam=0.09, #正則項權重
        rank=25, #模型中隱藏因子數目
        userCol="household_key_index", #column name for user ids. Ids must be (or can be coerced into) integers.
        itemCol="COMMODITY_DESC_index", #column name for item ids. Ids must be (or can be coerced into) integers.
        ratingCol="rating_label", #column name for ratings
        coldStartStrategy="drop", #drop any rows in the DataFrame of predictions that contain NaN values.
        nonnegative=True) #商品推薦分數是否是非負的


model=als.fit(training)

### Generate predictions and evaluate rmse

In [11]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating_label",predictionCol="prediction")

predictions=model.transform(test)

rmse=evaluator.evaluate(predictions) # Evaluates the output with optional parameters.
print("RMSE="+str(rmse))

RMSE=0.6552114646946966


In [12]:
predictions.toPandas()

Unnamed: 0,COMMODITY_DESC,household_key,rating_label,COMMODITY_DESC_index,household_key_index,prediction
0,PIES,712,1,148.0,148.0,1.010163
1,PIES,1074,1,148.0,471.0,1.103986
2,PIES,108,1,148.0,496.0,1.051257
3,PIES,1887,1,148.0,2142.0,0.872743
4,PIES,2186,1,148.0,392.0,0.986116
...,...,...,...,...,...,...
56206,SOAP - LIQUID & BAR,2386,1,89.0,1768.0,0.897220
56207,SOAP - LIQUID & BAR,987,1,89.0,755.0,0.964064
56208,SOAP - LIQUID & BAR,2329,1,89.0,656.0,0.964178
56209,SOAP - LIQUID & BAR,1263,1,89.0,521.0,1.036623


### Providing Recommendations

In [13]:
# Generate top 20 recommendations for each user

user_recs=model.recommendForAllUsers(20).show(10)

+-------------------+--------------------+
|household_key_index|     recommendations|
+-------------------+--------------------+
|               1580|[[1, 4.4278526], ...|
|                471|[[1, 3.5909238], ...|
|               1591|[[2, 2.901849], [...|
|               1342|[[1, 4.235496], [...|
|               2122|[[1, 2.9703712], ...|
|               2142|[[0, 2.4845362], ...|
|                463|[[1, 3.4036214], ...|
|                833|[[1, 2.844586], [...|
|               1645|[[1, 2.9448593], ...|
|                496|[[52, 3.2631984],...|
+-------------------+--------------------+
only showing top 10 rows



### Converting back to string form

In [14]:
import pandas as pd

# Generate top 10 recommendations for each user
recs=model.recommendForAllUsers(10).toPandas()


nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['household_key_index'], value_name = "recommendation") \
            .drop("variable", axis = 1) \
            .dropna()
nrecs=nrecs.sort_values('household_key_index')

nrecs=pd.concat([nrecs['recommendation'].apply(pd.Series), nrecs['household_key_index']], axis = 1)
nrecs.columns = [
        
        'ProductID_index',
        'Rating',
        'UserID_index']

# raw data
md=transformed.select(transformed['household_key'],transformed['household_key_index'],transformed['COMMODITY_DESC'],transformed['COMMODITY_DESC_index'])
md=md.toPandas()


dict1 =dict(zip(md['household_key_index'],md['household_key']))
dict2=dict(zip(md['COMMODITY_DESC_index'],md['COMMODITY_DESC']))


nrecs['household_key']=nrecs['UserID_index'].map(dict1)
nrecs['ProductID']=nrecs['ProductID_index'].map(dict2)
nrecs=nrecs.sort_values('household_key')
nrecs.reset_index(drop=True, inplace=True)


new=nrecs[['household_key','ProductID','Rating']]
#new['recommendations'] = list(zip(new.ProductID, new.Rating))
new['recommendations'] = list(new.ProductID)


res=new[['household_key','recommendations']]  
res_new=res['recommendations'].groupby([res.household_key]).apply(list).reset_index()
print(res_new)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


     household_key                                    recommendations
0                1  [YOGURT, CHEESE, FRZN MEAT/MEAT DINNERS, FLUID...
1              100  [SOUP, FROZEN PIZZA, VEGETABLES - SHELF STABLE...
2             1000  [BABY FOODS, YOGURT, CHEESE, FRZN MEAT/MEAT DI...
3             1001  [SOUP, CHEESE, BAKED BREAD/BUNS/ROLLS, BOTTLE ...
4             1002  [CHEESE, YOGURT, BAG SNACKS, SOUP, FROZEN PIZZ...
...            ...                                                ...
2385           995  [BAG SNACKS, SOFT DRINKS, FLUID MILK PRODUCTS,...
2386           996  [FLUID MILK PRODUCTS, BABY FOODS, SOUP, YOGURT...
2387           997  [FRZN MEAT/MEAT DINNERS, BAG SNACKS, SOFT DRIN...
2388           998  [FRZN MEAT/MEAT DINNERS, BAKED BREAD/BUNS/ROLL...
2389           999  [BABY FOODS, FROZEN PIZZA, FRZN MEAT/MEAT DINN...

[2390 rows x 2 columns]


In [15]:
res_new.to_csv('res_new.csv')

In [16]:
def show_recommendation(household_key):
    if household_key not in res_new['household_key']:
        return('Customer not found.')
    else:
        recommendation_list = res_new[res_new['household_key'] == str(household_key)]['recommendations'].tolist()
        return [recommendation_list[0][i] for i in range(0,10)]
    

In [19]:
show_recommendation(1011)

['BAG SNACKS',
 'CHEESE',
 'SOFT DRINKS',
 'BEEF',
 'BAKED BREAD/BUNS/ROLLS',
 'YOGURT',
 'FLUID MILK PRODUCTS',
 'VEGETABLES - SHELF STABLE',
 'SOUP',
 'FRZN MEAT/MEAT DINNERS']