In [None]:
!pip install surprise

In [None]:
import warnings
import datetime
import pandas as pd
from datetime import date
import psycopg2
from surprise import Dataset, Reader, SVD
from multiprocessing import Pool

def load_items_data():
    """Load every row of ``cdp.items_data`` from Redshift.

    Returns:
        pandas.DataFrame with all columns of ``cdp.items_data``.
    """
    # SECURITY(review): credentials are hard-coded (and duplicated in the other
    # loaders in this file); they should come from environment/config instead.
    con = psycopg2.connect(dbname='dev', host='mantix-cluster.cgzkthavydhk.us-east-2.redshift.amazonaws.com',
                           port='5439', user='mantix', password='Mantix123!')
    try:
        query_items = """(SELECT *
                     FROM cdp.items_data)"""
        return pd.read_sql(query_items, con)
    finally:
        # Close the connection even when the query fails, so it is not leaked.
        con.close()

def load_orders_data():
    """Load the completed-ish orders of one tenant/store from Redshift.

    Selects rows of ``cdp.orders_data`` for tenant ``TNB00001`` and store
    ``STOM000000001`` whose status is not one of the excluded values
    (Abandoned, Errored, Pending, PendingReview, Cancelled, 'null').

    Returns:
        pandas.DataFrame with all columns of ``cdp.orders_data``.
    """
    # SECURITY(review): credentials are hard-coded (and duplicated in the other
    # loaders in this file); they should come from environment/config instead.
    con = psycopg2.connect(dbname='dev', host='mantix-cluster.cgzkthavydhk.us-east-2.redshift.amazonaws.com',
                           port='5439', user='mantix', password='Mantix123!')
    try:
        query_orders = """(SELECT *
                      FROM cdp.orders_data
                      WHERE tenant_id = 'TNB00001'
                      AND   store_id = 'STOM000000001'
                      AND   status NOT IN ('Abandoned','Errored','Pending','PendingReview','Cancelled','null'))"""
        return pd.read_sql(query_orders, con)
    finally:
        # Close the connection even when the query fails, so it is not leaked.
        con.close()

def generate_recommendations(loc, items_data=None, orders_data=None, top_n=10):
    """Recommend popular, not-yet-bought products to customers of one location.

    Builds (customer, product, total quantity) triples from the orders joined
    to the items fulfilled at ``loc``, fits a Surprise ``SVD`` model on them,
    and for each customer ranks the location's most-purchased products (the
    top ~5% by total quantity, at least one) that the customer has not bought.

    Args:
        loc: Value of ``fulfillment_location_code`` to restrict items to.
        items_data: Optional pre-loaded items DataFrame; fetched with
            ``load_items_data()`` when omitted (the original behaviour).
        orders_data: Optional pre-loaded orders DataFrame; fetched with
            ``load_orders_data()`` when omitted.
        top_n: Maximum number of products recommended per customer.

    Returns:
        List of ``(customer_account_id, "prod_a,prod_b,...")`` tuples, one per
        customer with at least one purchase at ``loc``; empty if there is none.
    """
    if items_data is None:
        items_data = load_items_data()
    if orders_data is None:
        orders_data = load_orders_data()

    ord_col = ['order_number', 'customer_account_id']
    item_col = ['order_number', 'fulfillment_location_code', 'product_code', 'quantity']

    orders = orders_data[ord_col].copy()
    items = items_data.loc[items_data.fulfillment_location_code == loc, item_col].copy()
    # Normalise the join key to a canonical int->str form in both frames;
    # otherwise a dtype/format mismatch in order_number (e.g. integer vs. text)
    # makes the merge fail or match nothing.
    orders['order_number'] = orders['order_number'].astype(int).astype(str)
    items['order_number'] = items['order_number'].astype(int).astype(str)

    # Left join keeps every order; orders with no items at this location get
    # NaN product_code/quantity, which groupby (NaN keys) and dropna discard.
    df = pd.merge(orders, items, on=['order_number'], how='left')
    df = (
        df.groupby(['customer_account_id', 'product_code'])['quantity']
        .sum()
        .reset_index()
        .dropna()
    )

    if df.empty:
        # Nothing to train on at this location.
        print(f'Location: {loc} | Total customer: 0 | Total Prod: 0')
        return []

    # Candidate products: the most-purchased ~5% at this location. Clamp the
    # cutoff so at least one product survives; with <= 10 products the 5%
    # share rounds to zero and every customer would get an empty list.
    filter_df = (
        df.groupby(['product_code'])['quantity'].sum()
        .reset_index()
        .sort_values(by='quantity')
    )
    n_products = filter_df.shape[0]
    index = min(round(n_products - (n_products * 0.05)), n_products - 1)
    req_prods = set(filter_df['product_code'].tolist()[index:])

    # NOTE(review): summed quantities can exceed the declared 1-5 scale, and
    # Surprise's predict() clips estimates to the rating scale by default, so
    # many candidates may tie at the maximum. Confirm this is intended.
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['customer_account_id', 'product_code', 'quantity']], reader)

    # Train model
    svd = SVD(n_epochs=10)
    svd.fit(data.build_full_trainset())

    customer_list = df.customer_account_id.unique()
    prod_list = [prod for prod in df.product_code.unique() if prod in req_prods]
    print(f'Location: {loc} | Total customer: {len(customer_list)} | Total Prod: {len(prod_list)}')

    # Products each customer already bought, computed once rather than by
    # re-filtering df for every customer.
    purchased = {
        cust: set(prods)
        for cust, prods in df.groupby('customer_account_id')['product_code']
    }

    recommendations = []
    for cust_id in customer_list:
        owned = purchased.get(cust_id, set())
        preds = [svd.predict(uid=cust_id, iid=prod) for prod in prod_list if prod not in owned]
        # Stable sort: ties keep prod_list order, so output is deterministic.
        preds.sort(key=lambda pred: pred.est, reverse=True)
        # str() so non-string product codes can still be joined.
        top_rec = [str(pred.iid) for pred in preds[:top_n]]
        recommendations.append((cust_id, ','.join(top_rec)))

    return recommendations

def start_recommendation(processes=7):
    """Generate recommendations for every fulfilment location in parallel.

    Loads items and orders, prints the first order rows and the number of
    locations, then maps ``generate_recommendations`` over all distinct
    ``fulfillment_location_code`` values in a process pool.

    Args:
        processes: Number of worker processes in the multiprocessing pool.

    Returns:
        DataFrame with columns ``customer_account_id``, ``product_code``
        (comma-separated recommended products), ``create_dms`` (today's
        date), ``tenant_id`` and ``store_id``, indexed 0..n-1.
    """
    # Load items and orders data
    items_data = load_items_data()
    orders_data = load_orders_data()

    print(orders_data.head())

    locations = items_data.fulfillment_location_code.unique()
    print(f"Total Location: {len(locations)}")

    columns = ['customer_account_id', 'rec_prod']

    # Generate recommendations in parallel; each call receives only the
    # location code.
    with Pool(processes=processes) as pool:
        results = pool.map(generate_recommendations, locations)

    # DataFrame.append was removed in pandas 2.0: build the per-location
    # frames and concatenate once. ignore_index gives unique index labels,
    # since repeated labels break label-based row lookups downstream.
    frames = [pd.DataFrame(result, columns=columns) for result in results if result]
    if frames:
        recommend_res = pd.concat(frames, ignore_index=True)
    else:
        recommend_res = pd.DataFrame(columns=columns)

    recommend_res['create_dms'] = date.today()
    recommend_res['tenant_id'] = 'TNB00001'
    # Previously the placeholder literal 'store_id'; use the store that
    # load_orders_data filters on.
    recommend_res['store_id'] = 'STOM000000001'
    return recommend_res.rename(columns={'rec_prod': 'product_code'})


In [None]:
def load_db(df):
    """Replace ``cdp.user_recommendations`` with the rows of *df*.

    Drops the table if it exists, recreates it, and inserts *df* in batches
    of 500 rows. Every value is passed through ``str()`` before insertion,
    matching the all-VARCHAR table definition. If the table cannot be
    created, no inserts are attempted.

    Args:
        df: DataFrame providing ``customer_account_id``, ``product_code``,
            ``tenant_id``, ``store_id`` and ``create_dms`` columns
            (``product_code`` is written to ``recommendation_product``).
    """
    table = 'cdp.user_recommendations'
    batch_size = 500
    df_columns = ['customer_account_id', 'product_code', 'tenant_id', 'store_id', 'create_dms']

    def drop_table(conn, table):
        """Drop *table* if it exists. Return True on success, False on error."""
        try:
            with conn.cursor() as cur:
                cur.execute(f"""DROP TABLE IF EXISTS {table}""")
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return False
        print(f'Drop table {table} successfully')
        return True

    def create_user_recommendation_table(conn, table):
        """Create the recommendations table. Return True on success."""
        query = f"""CREATE TABLE {table} (customer_account_id VARCHAR(1000),
                            recommendation_product VARCHAR(5000),
                            tenant_id VARCHAR(1000),
                            store_id VARCHAR(1000),
                            create_dms VARCHAR(1000)
                        )"""
        try:
            # drop_table already rolls back on failure, so no manual
            # "ROLLBACK" statement is needed before creating.
            with conn.cursor() as cur:
                cur.execute(query)
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return False
        print(f'Successfully created the table {table}')
        return True

    def batch_insert(conn, rows, table):
        """Insert *rows* (tuples of 5 strings) in one transaction; best effort."""
        statement = f"""INSERT into {table} (customer_account_id,recommendation_product ,tenant_id ,store_id,create_dms) values(%s,%s,%s,%s,%s);"""
        try:
            with conn.cursor() as cursor:
                cursor.executemany(statement, rows)
            conn.commit()
            print("Batch INSERT successful.")
        except (Exception, psycopg2.DatabaseError) as error:
            # A failed batch is rolled back and reported; later batches continue.
            conn.rollback()
            print("Batch INSERT failed, Error:", error)

    def insert_dataframe(final_df, conn, table):
        """Insert *final_df* in batches of ``batch_size`` rows."""
        batch = []
        # Read the row values from final_df itself (not the outer df) and by
        # position, so repeated index labels cannot turn a value into Series text.
        rows = final_df[df_columns].itertuples(index=False, name=None)
        for label, row in zip(final_df.index, rows):
            batch.append(tuple(str(value) for value in row))
            if len(batch) == batch_size:
                batch_insert(conn, batch, table)
                batch = []
                print("Current index: {}, time: {}".format(label, datetime.datetime.now()))
        # Insert any remaining rows
        if batch:
            batch_insert(conn, batch, table)
        print('Successfully inserted dataframe into the table')

    # SECURITY(review): credentials are hard-coded; move them to config.
    con = psycopg2.connect(dbname='dev', host='mantix-cluster.cgzkthavydhk.us-east-2.redshift.amazonaws.com',
                           port='5439', user='mantix', password='Mantix123!')
    try:
        drop_table(con, table)
        if not create_user_recommendation_table(con, table):
            print(f'Aborting load: could not create table {table}')
            return
        start_time = datetime.datetime.now()
        insert_dataframe(df, con, table)
        print("Start: {}, Finish: {}".format(start_time, datetime.datetime.now()))
    finally:
        # Always release the connection, including on early return or error.
        con.close()

if __name__ == '__main__':
    # Stage 1: build recommendations for every fulfilment location and
    # report how long it took.
    print('~ Start recommendation')
    stage_started = datetime.datetime.now()
    recommend_res = start_recommendation()
    print("Start: {}, Finish: {}".format(stage_started, datetime.datetime.now()))

    # Stage 2: persist the results. The load_db call is currently disabled,
    # so this stage only prints its banner and timing.
    print('~ Start to insert recommendation result into database')
    stage_started = datetime.datetime.now()
    # load_db(recommend_res)
    print("Start: {}, Finish: {}".format(stage_started, datetime.datetime.now()))