### https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3

In [1]:
spark

In [2]:
sc

In [13]:
df = spark.read.csv("hdfs://devenv/user/spark/recommendation_system/data/ratings.csv",header=True,
                   schema="COMMODITY_DESC string, household_key string, QUANTITY Integer,\
                   BASKET_ID string, rating float, rating_label Integer") 

In [11]:
df.show(10,truncate=False)

+------------------------------+-------------+--------+---------+------+------------+
|COMMODITY_DESC                |household_key|QUANTITY|BASKET_ID|rating|rating_label|
+------------------------------+-------------+--------+---------+------+------------+
|YOGURT                        |2500         |118     |100      |1.18  |5           |
|WATER - CARBONATED/FLVRD DRINK|2500         |8       |100      |0.08  |5           |
|WAREHOUSE SNACKS              |2500         |31      |100      |0.31  |5           |
|VITAMINS                      |2500         |6       |100      |0.06  |5           |
|VEGETABLES SALAD              |2500         |1       |100      |0.01  |3           |
|VEGETABLES - SHELF STABLE     |2500         |75      |100      |0.75  |5           |
|VEGETABLES - ALL OTHERS       |2500         |7       |100      |0.07  |5           |
|VALUE ADDED FRUIT             |2500         |2       |100      |0.02  |4           |
|VALENTINE                     |2500         |3       

In [14]:
df_new = df.select(df['COMMODITY_DESC'],df['household_key'],df['rating_label'])

In [15]:
df_new.show()

+--------------------+-------------+------------+
|      COMMODITY_DESC|household_key|rating_label|
+--------------------+-------------+------------+
|              YOGURT|         2500|           5|
|WATER - CARBONATE...|         2500|           5|
|    WAREHOUSE SNACKS|         2500|           5|
|            VITAMINS|         2500|           5|
|    VEGETABLES SALAD|         2500|           3|
|VEGETABLES - SHEL...|         2500|           5|
|VEGETABLES - ALL ...|         2500|           5|
|   VALUE ADDED FRUIT|         2500|           4|
|           VALENTINE|         2500|           5|
|      TROPICAL FRUIT|         2500|           5|
|            TOMATOES|         2500|           4|
|             TICKETS|         2500|           4|
|                TEAS|         2500|           5|
|     SYRUPS/TOPPINGS|         2500|           3|
|    SUGARS/SWEETNERS|         2500|           5|
|         STONE FRUIT|         2500|           5|
|STATIONERY & SCHO...|         2500|           5|


### Importing important modules

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

### Converting String to index 
- Before making an ALS model it needs to be clear that ALS only accepts integer value as parameters. Hence we need to convert asin and reviewerID column in index form.
- StringIndexer 
 - MLlib method
 - A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

In [17]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# encoding
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(df_new.columns)-set(['rating_label']))]

# encoding 簡單一點的寫法
#indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['COMMODITY_DESC','household_key']]


# make pipeline
pipeline = Pipeline(stages=indexer)

# fit and transform
transformed = pipeline.fit(df_new).transform(df_new)

transformed.show()

+--------------------+-------------+------------+--------------------+-------------------+
|      COMMODITY_DESC|household_key|rating_label|COMMODITY_DESC_index|household_key_index|
+--------------------+-------------+------------+--------------------+-------------------+
|              YOGURT|         2500|           5|                51.0|              234.0|
|WATER - CARBONATE...|         2500|           5|                35.0|              234.0|
|    WAREHOUSE SNACKS|         2500|           5|                81.0|              234.0|
|            VITAMINS|         2500|           5|               162.0|              234.0|
|    VEGETABLES SALAD|         2500|           3|                65.0|              234.0|
|VEGETABLES - SHEL...|         2500|           5|                16.0|              234.0|
|VEGETABLES - ALL ...|         2500|           5|                36.0|              234.0|
|   VALUE ADDED FRUIT|         2500|           4|               104.0|              234.0|

### Creating training and test data

In [18]:
(training,test)=transformed.randomSplit([0.8, 0.2])

### Creating ALS model and fitting data
- rating_label type must be numeric

### https://www.twblogs.net/a/5c2917d6bd9eee01606d2f58
### https://codertw.com/%E7%A8%8B%E5%BC%8F%E8%AA%9E%E8%A8%80/563826/
### https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3

In [19]:
als=ALS(maxIter=5, # 算法迭代次數 maximum number of iterations
        regParam=0.09, #正則項權重
        rank=25, #模型中隱藏因子數目
        userCol="household_key_index", #column name for user ids. Ids must be (or can be coerced into) integers.
        itemCol="COMMODITY_DESC_index", #column name for item ids. Ids must be (or can be coerced into) integers.
        ratingCol="rating_label", #column name for ratings
        coldStartStrategy="drop", #drop any rows in the DataFrame of predictions that contain NaN values.
        nonnegative=True) #商品推薦分數是否是非負的


model=als.fit(training)

### Generate predictions and evaluate rmse

In [20]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating_label",predictionCol="prediction")

predictions=model.transform(test)

rmse=evaluator.evaluate(predictions) # Evaluates the output with optional parameters.
print("RMSE="+str(rmse))

RMSE=0.8317624175044451


In [21]:
predictions.toPandas()

Unnamed: 0,COMMODITY_DESC,household_key,rating_label,COMMODITY_DESC_index,household_key_index,prediction
0,PIES,1337,4,148.0,31.0,3.703735
1,PIES,2201,5,148.0,2488.0,3.973958
2,PIES,1358,4,148.0,1223.0,3.836623
3,PIES,1622,5,148.0,588.0,4.395650
4,PIES,1535,1,148.0,296.0,3.229756
...,...,...,...,...,...,...
56446,SOAP - LIQUID & BAR,1263,5,89.0,521.0,4.292670
56447,SOAP - LIQUID & BAR,2328,5,89.0,36.0,4.514470
56448,SOAP - LIQUID & BAR,1249,5,89.0,1993.0,4.562069
56449,SOAP - LIQUID & BAR,951,5,89.0,2057.0,4.581108


### Providing Recommendations

In [23]:
# Generate top 20 recommendations for each user

user_recs=model.recommendForAllUsers(20).show(10)

+-------------------+--------------------+
|household_key_index|     recommendations|
+-------------------+--------------------+
|               1580|[[6, 4.766709], [...|
|                471|[[3, 4.656679], [...|
|               1591|[[1, 4.702969], [...|
|               1342|[[7, 4.7689257], ...|
|               2122|[[107, 4.76182], ...|
|               2142|[[7, 4.781441], [...|
|                463|[[3, 4.6809464], ...|
|                833|[[1, 4.741335], [...|
|               1645|[[297, 4.673454],...|
|                496|[[292, 4.5957026]...|
+-------------------+--------------------+
only showing top 10 rows



### Converting back to string form

In [24]:
import pandas as pd

# Generate top 10 recommendations for each user
recs=model.recommendForAllUsers(10).toPandas()


nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['household_key_index'], value_name = "recommendation") \
            .drop("variable", axis = 1) \
            .dropna()
nrecs=nrecs.sort_values('household_key_index')

nrecs=pd.concat([nrecs['recommendation'].apply(pd.Series), nrecs['household_key_index']], axis = 1)
nrecs.columns = [
        
        'ProductID_index',
        'Rating',
        'UserID_index']

# raw data
md=transformed.select(transformed['household_key'],transformed['household_key_index'],transformed['COMMODITY_DESC'],transformed['COMMODITY_DESC_index'])
md=md.toPandas()


dict1 =dict(zip(md['household_key_index'],md['household_key']))
dict2=dict(zip(md['COMMODITY_DESC_index'],md['COMMODITY_DESC']))


nrecs['household_key']=nrecs['UserID_index'].map(dict1)
nrecs['ProductID']=nrecs['ProductID_index'].map(dict2)
nrecs=nrecs.sort_values('household_key')
nrecs.reset_index(drop=True, inplace=True)


new=nrecs[['household_key','ProductID','Rating']]
#new['recommendations'] = list(zip(new.ProductID, new.Rating))
new['recommendations'] = list(new.ProductID)


res=new[['household_key','recommendations']]  
res_new=res['recommendations'].groupby([res.household_key]).apply(list).reset_index()
print(res_new)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


     household_key                                    recommendations
0                1  [YOGURT, BAKED BREAD/BUNS/ROLLS, CHEESE, BAG S...
1               10  [VEGETABLES - SHELF STABLE, BAKED BREAD/BUNS/R...
2              100  [BAG SNACKS, BEEF, CHEESE, EGGS, FLUID MILK PR...
3             1000  [BAKED BREAD/BUNS/ROLLS, EGGS, FLUID MILK PROD...
4             1001  [BAG SNACKS, EGGS, VEGETABLES - SHELF STABLE, ...
...            ...                                                ...
2495           995  [BAKED BREAD/BUNS/ROLLS, SOFT DRINKS, EGGS, FL...
2496           996  [BAG SNACKS, BEEF, SOFT DRINKS, EGGS, FLUID MI...
2497           997  [FD WRAPS/BAGS/TRSH BG, PAPER TOWELS, SOUP, YO...
2498           998  [SOUP, EGGS, VEGETABLES - SHELF STABLE, CHEESE...
2499           999  [SOUP, BEEF, BAKED BREAD/BUNS/ROLLS, SOFT DRIN...

[2500 rows x 2 columns]


In [25]:
res_new.to_csv('./output_data/res_new.csv')

In [26]:
def show_recommendation(household_key):
    if household_key not in res_new['household_key']:
        return('Customer not found.')
    else:
        recommendation_list = res_new[res_new['household_key'] == str(household_key)]['recommendations'].tolist()
        return [recommendation_list[0][i] for i in range(0,10)]
    

In [27]:
show_recommendation(1011)

['CHEESE',
 'FLUID MILK PRODUCTS',
 'EGGS',
 'SOUP',
 'VEGETABLES - SHELF STABLE',
 'BAKED BREAD/BUNS/ROLLS',
 'BEEF',
 'SALD DRSNG/SNDWCH SPRD',
 'SOFT DRINKS',
 'BAG SNACKS']

In [None]:
import pymysql
import csv
import sys

host = '3.113.29.214'  # '3.113.29.214'
user = 'eric'  # 'eric'
passwd = '123456'  # '123456'
port = 3306
conninfo = {'host' : host ,'port' : port,'user' : user , 'passwd' : passwd, 'db' : 'recommendation_system','charset' : 'utf8mb4'}


def add_csv(path):
    try:
        conn = pymysql.connect(**conninfo)
        cursor = conn.cursor()
        csv_data = csv.reader(open(path))
        cursor.execute("""DROP TABLE IF EXISTS als""")
        cursor.execute("""CREATE TABLE als (household_key longtext, recommendations longtext)""")
        
        for row in csv_data:
            cursor.execute('INSERT INTO als(household_key, recommendations)'\
                           'VALUES("%s", "%s")',row)
        # close the connection to the database.
        conn.commit()
        print("Done")
    except:
        print('異常')
        print(sys.exc_info()[0])
        print(sys.exc_info()[1])
    finally:
        cursor.close()
        conn.close()
        print("db close")

In [None]:
add_csv('./output_data/res_new.csv')