In [1]:
import pandas as pd
import os
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import json
import pickle
import numpy as np
import random, string, time

In [2]:
RetailOffersShoppedEvent = '''{"namespace": "com.sabre.synxis.sip.retail.events",
  "type": "record",
  "name": "RetailOffersShoppedEvent",
  "doc": "Event related to shopping retail offers.",
  "fields": [
    {"name": "event_id", "type": "string",
      "doc": "A unique event identifier, set by the client before the given event is published."},
    {"name": "event_type",  "type": "string",
      "doc": "The name of the given event type used by subscribers to initially interpret their interest in a given event."},
    {"name": "version_id", "type": "string",
      "doc":  "The version of the given event structure."},
    {"name": "event_namespace", "type": "string",
      "doc": "The high level domain the event is associated with (e.g. retail, hotel, vacation) for classification purposes."},
    {"name": "source_srn", "type": ["string", "null"],
      "doc": "The Synxis Resource Name representing the resource that generated the given event."},
    {"name": "event_timestamp", "type": "string",
      "doc": "The UTC timestamp the given event was created by the client."},
    {"name": "vendor_id",  "type": "int",
      "doc": "Identifier of the vendor this event is related to."},
    {"name": "client_session_id",  "type": ["string", "null"],
      "doc": "Client side session identifier used to track a user's behavior in a given application (like a web session)."},
    {"name": "returned_product_ids", "type": [{"type": "array", "items": "string"}, "null"],
      "doc": "List of product ids returned from the client shop request."},
    {"name": "store_id",  "type": "int",
      "doc": "Identifier of the store this event is related to."},
    {"name": "transaction_context_code", "type": ["string", "null"],
      "doc": "Values are STND_SHOP, ADDED_TO_CART."},
    {"name": "cust_cat_id",  "type": "string",
      "doc": "Customer category id which was derived from the available contextual data and custcat_ver algorithm"},
    {"name": "cust_cat_algo_ver",  "type": "string",
      "doc": "Version of customer category derivation algorithm used"},
    {"name": "recommender_type",  "type": "string",
      "doc": "Type of the recommendation system have used to generate the recommendation list"},
    {"name": "recommender_ver",  "type": "string",
      "doc": "Version of recommendation system is used "}
  ]
}'''

In [3]:
RetailProductPurchasedEvent = '''{"namespace": "com.sabre.synxis.sip.retail.events",
  "type": "record",
  "name": "RetailProductPurchasedEvent",
  "doc": "Event related to retail product being purchased.",
  "fields": [
    {"name": "event_id", "type": "string",
      "doc": "A unique event identifier, set by the client before the given event is published."},
    {"name": "event_type",  "type": "string",
      "doc": "The name of the given event type used by subscribers to initially interpret their interest in a given event."},
    {"name": "version_id", "type": "string",
      "doc":  "The version of the given event structure."},
    {"name": "event_namespace", "type": "string",
      "doc": "The high level domain the event is associated with (e.g. retail, hotel, vacation) for classification purposes."},
   {"name": "source_srn", "type": ["string", "null"],
      "doc": "The Synxis Resource Name representing the resource that generated the given event."},
    {"name": "event_timestamp", "type": "string",
      "doc": "The UTC timestamp the given event was created by the client."},
    {"name": "vendor_id",  "type": "int",
      "doc": "Identifier of the vendor this event is related to."},
    {"name": "store_id",  "type": "int",
      "doc": "Identifier of the store this event is related to."},
    {"name": "client_session_id",  "type": ["string", "null"],
      "doc": "Client side session identifier used to track a user's behavior in a given application (like a web session)."},
    {"name": "purchased_product_ids",  "type": [{"type": "array", "items": "string"}, "null"],
      "doc": "Identifier of the given products those were purchased."},
    {"name": "cust_cat_id",  "type": "string",
      "doc": "Customer category id which was derived from the available contextual data and custcat_ver algorithm"},
    {"name": "cust_cat_algo_ver",  "type": "string",
      "doc": "Version of customer category derivation algorithm used"}
  ]
}'''

In [4]:
with open('./Simulation/avroSchemas/RetailOffersShoppedEvent.avsc', 'wb') as outfile:
    pickle.dump(RetailOffersShoppedEvent, outfile )

with open('./Simulation/avroSchemas/RetailProductPurchasedEvent.avsc', 'wb') as outfile:
    pickle.dump(RetailProductPurchasedEvent, outfile )


In [5]:
ShoppingStaggingTable = pd.DataFrame(columns=['Session_Id', 'Event_Id','Vendor_Id', 'Store_Id', 'Version_id', 'CustCat_Id', 'Custcat_Ver',
                                                  'Event_Timestamp', 'Product_Id', 'Position'])

PurchasedStaggingTable = pd.DataFrame(columns=['Session_Id', 'Event_Id','Vendor_Id', 'Store_Id', 'Version_id','Event_Timestamp',
                                                   'Product_Id', 'Processed'])


In [6]:
userPositionBias = pd.DataFrame(columns=['Position', 'wAlpha', 'wBeta'])
ProductStatistics = pd.DataFrame(columns=['Vendor_Id','Store_Id', 'CustCat_Id', 'Custcat_Ver','Product_Id', 'Alpha', 'Beta', 'Hit', 'Miss'])

volatile_deltaProductStatistics = pd.DataFrame(columns=['Vendor_Id', 'CustCat_Id','Custcat_Ver', 'Product_Id','Alpha', 'Beta',
                                                        'Hit', 'Miss'])

In [7]:
 Productdata = {0: [1001,1001, 100101, 1.0, 110, 1., 1., 0, 0], 
               1: [1001,1001, 100101, 1.0, 111, 1., 1., 0, 0], 
               2: [1001,1001, 100101, 1.0, 112, 1., 1., 0, 0], 
               3: [1001,1001, 100101, 1.0, 113, 1., 1., 0, 0], 
               4: [1001,1001, 100101, 1.0, 114, 1., 1., 0, 0], 
               5: [1001,1001, 100101, 1.0, 115, 1., 1., 0, 0],
               6: [1001,1001, 100101, 1.0, 116, 1., 1., 0, 0],
               7: [1001,1001, 100101, 1.0, 117, 1., 1., 0, 0], 
               8: [1001,1001, 100101, 1.0, 118, 1., 1., 0, 0],
               9: [1001,1001, 100101, 1.0, 119, 1., 1., 0, 0],
               10: [1001,1001, 100102, 1.0, 110, 1., 1., 0, 0], 
               11: [1001,1001, 100102, 1.0, 111, 1., 1., 0, 0], 
               12: [1001,1001, 100102, 1.0, 112, 1., 1., 0, 0], 
               13: [1001,1001, 100102, 1.0, 113, 1., 1., 0, 0], 
               14: [1001,1001, 100102, 1.0, 114, 1., 1., 0, 0], 
               15: [1001,1001, 100102, 1.0, 115, 1., 1., 0, 0],
               16: [1001,1001, 100102, 1.0, 116, 1., 1., 0, 0],
               17: [1001,1001, 100102, 1.0, 117, 1., 1., 0, 0], 
               18: [1001,1001, 100102, 1.0, 118, 1., 1., 0, 0],
               19: [1001,1001, 100102, 1.0, 119, 1., 1., 0, 0],
               20: [1002,1002, 100201, 1.0, 120, 1., 1., 0, 0], 
               21: [1002,1002, 100201, 1.0, 121, 1., 1., 0, 0], 
               22: [1002,1002, 100201, 1.0, 122, 1., 1., 0, 0], 
               23: [1002,1002, 100201, 1.0, 123, 1., 1., 0, 0], 
               24: [1002,1002, 100201, 1.0, 124, 1., 1., 0, 0], 
               25: [1002,1002, 100201, 1.0, 125, 1., 1., 0, 0],
               26: [1002,1002, 100201, 1.0, 126, 1., 1., 0, 0],
               27: [1002,1002, 100201, 1.0, 127, 1., 1., 0, 0], 
               28: [1002,1002, 100201, 1.0, 128, 1., 1., 0, 0],
               29: [1002,1002, 100201, 1.0, 129, 1., 1., 0, 0],
               30: [1002,1002, 100202, 1.0, 120, 1., 1., 0, 0], 
               31: [1002,1002, 100202, 1.0, 121, 1., 1., 0, 0], 
               32: [1002,1002, 100202, 1.0, 122, 1., 1., 0, 0], 
               33: [1002,1002, 100202, 1.0, 123, 1., 1., 0, 0], 
               34: [1002,1002, 100202, 1.0, 124, 1., 1., 0, 0], 
               35: [1002,1002, 100202, 1.0, 125, 1., 1., 0, 0],
               36: [1002,1002, 100202, 1.0, 126, 1., 1., 0, 0],
               37: [1002,1002, 100202, 1.0, 127, 1., 1., 0, 0], 
               38: [1002,1002, 100202, 1.0, 128, 1., 1., 0, 0],
               39: [1002,1002, 100202, 1.0, 129, 1., 1., 0, 0]
               }

In [8]:
ProductStatistics = pd.DataFrame.from_dict(Productdata, orient='index',columns=['Vendor_Id', 'Store_Id','CustCat_Id', 'Custcat_Ver','Product_Id', 'Alpha', 'Beta', 'Hit', 'Miss'])

In [9]:
ProductStatistics['CustCat_Id'] = ProductStatistics['CustCat_Id'].apply(lambda x: str(x))
ProductStatistics['Custcat_Ver'] = ProductStatistics['Custcat_Ver'].apply(lambda x: str(x))
ProductStatistics['Product_Id'] = ProductStatistics['Product_Id'].apply(lambda x: str(x))

In [10]:
ProductStatistics

Unnamed: 0,Vendor_Id,Store_Id,CustCat_Id,Custcat_Ver,Product_Id,Alpha,Beta,Hit,Miss
0,1001,1001,100101,1.0,110,1.0,1.0,0,0
1,1001,1001,100101,1.0,111,1.0,1.0,0,0
2,1001,1001,100101,1.0,112,1.0,1.0,0,0
3,1001,1001,100101,1.0,113,1.0,1.0,0,0
4,1001,1001,100101,1.0,114,1.0,1.0,0,0
5,1001,1001,100101,1.0,115,1.0,1.0,0,0
6,1001,1001,100101,1.0,116,1.0,1.0,0,0
7,1001,1001,100101,1.0,117,1.0,1.0,0,0
8,1001,1001,100101,1.0,118,1.0,1.0,0,0
9,1001,1001,100101,1.0,119,1.0,1.0,0,0


In [11]:
wAlphaBeta = {0: [1, 0.80, 1.0], 
               1: [2, 0.85, 0.95], 
               2: [3, 0.90, 0.90], 
               3: [4, 0.95, 0.85], 
               4: [5, 1.0, 0.80]}
userPositionBias = pd.DataFrame.from_dict(wAlphaBeta, orient='index',columns=['Position', 'wAlpha', 'wBeta'])

In [12]:
userPositionBias

Unnamed: 0,Position,wAlpha,wBeta
0,1,0.8,1.0
1,2,0.85,0.95
2,3,0.9,0.9
3,4,0.95,0.85
4,5,1.0,0.8


In [13]:
def getEventID():
    x = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
    return x


In [14]:
example = '''{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}'''
with open('example.avsc', 'wb') as outfile:
    pickle.dump( example, outfile )
    
    
schema = avro.schema.Parse(pickle.load(open( "example.avsc", "rb" )))

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print (user)
reader.close()

{'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}
{'name': 'Ben', 'favorite_number': 7, 'favorite_color': 'red'}


In [15]:
def getCutomerCat(vendorId, InCart):
    if vendorId == 1001 and len(set(InCart).intersection(set(['111', '113', '115', '117', '119']))) > 0:
        return '100101'
    elif vendorId == 1002 and len(set(InCart).intersection(set(['121', '123', '125', '127', '129']))) > 0:
        return '100201'
    elif vendorId == 1001:
        return '100102'
    elif vendorId == 1002:
        return '100202'
    else:
        return '-1'

In [16]:
sellableProducts = {1001: ['110', '111', '112', '113','114', '115','116', '117','118', '119'], 
                   1002: ['120', '121', '122', '123','124', '125','126', '127','128', '129']}

In [17]:
def writeAvroFile(writer, log):
    writer.append(log)
    return


In [18]:
purchaseLog = []
shoppingLog = []
sessionID = 111111
for i in range(20):
    vendorStoreId = np.random.choice([1001, 1002], 1, p=[0.5, 0.5])[0]
    products = sellableProducts[vendorStoreId]
    cart = []
    
    for j in range(3):
        catID = getCutomerCat(vendorId=vendorStoreId, InCart=cart)
        curproducts = [ i for i in products if i not in cart]
        recommededProduct = list(np.random.choice(curproducts, 5, replace=False))
        
        event_id = getEventID()
        event_type = "RetailOffersShoppedEvent"
        version_id = "1.0"
        event_namespace = "com.sabre.synxis.sip.retail"
        source_srn = "srn:amazon:aws:us-west-2:dev:unit-test:1"
        event_timestamp = time.strftime("%x %X %p %Z")
        vendor_id = vendorStoreId
        client_session_id = str(sessionID)
        returned_product_ids = recommededProduct
        store_id = vendorStoreId
        transaction_context_code = "STND_SHOP"
        cust_cat_id = catID
        cust_cat_algo_ver = "1.0"
        recommender_type = "MABbyVendorStore"
        recommender_ver = "1.0"
        log={"event_id": event_id, "event_type": event_type,"version_id": version_id, "event_namespace": event_namespace,
                       "source_srn": source_srn, "event_timestamp": event_timestamp, "vendor_id": vendor_id,
                       "client_session_id": client_session_id, "returned_product_ids": returned_product_ids,"store_id": store_id,
                       "transaction_context_code": transaction_context_code,
                       "cust_cat_id": cust_cat_id,"cust_cat_algo_ver": cust_cat_algo_ver,
                       "recommender_type": recommender_type,"recommender_ver": recommender_ver}
        
        shoppingLog.append(log)
        
        for p in recommededProduct:
            choose = np.random.choice([0,1], 1, [0.95, 0.05])[0]
            if choose == 1:
                cart.append(p)
                break
    
    if len(cart)>0:
        catID = getCutomerCat(vendorId=vendorStoreId, InCart=cart)
        event_id = getEventID()
        event_type = "RetailProductPurchasedEvent"
        version_id = "1.0"
        event_namespace = "com.sabre.synxis.sip.retail"
        source_srn = "srn:amazon:aws:us-west-2:dev:unit-test:1"
        event_timestamp = time.strftime("%x %X %p %Z")
        vendor_id = vendorStoreId
        store_id = vendorStoreId
        client_session_id = str(sessionID)
        purchased_product_ids = cart
        cust_cat_id = catID
        cust_cat_algo_ver = "1.0"
        purchaseLog.append({"event_id": event_id, "event_type": event_type,"version_id": version_id, 
                      "event_namespace": event_namespace, "source_srn": source_srn, "event_timestamp": event_timestamp,
                      "vendor_id": vendor_id, "client_session_id": client_session_id, "purchased_product_ids": purchased_product_ids,
                      "store_id": store_id, "cust_cat_id": cust_cat_id,"cust_cat_algo_ver": cust_cat_algo_ver})
        
        
    sessionID+=1
         
    
        

In [19]:
test = '''{"namespace": "com.sabre.synxis.sip.retail.events",
  "type": "record",
  "name": "RetailOffersShoppedEvent",
  "doc": "Event related to shopping retail offers.",
  "fields": [
    {"name": "event_id", "type": "string",
      "doc": "A unique event identifier, set by the client before the given event is published."},
    {"name": "event_type",  "type": "string",
      "doc": "The name of the given event type used by subscribers to initially interpret their interest in a given event."},
    {"name": "version_id", "type": "string",
      "doc":  "The version of the given event structure."},
    {"name": "event_namespace", "type": "string",
      "doc": "The high level domain the event is associated with (e.g. retail, hotel, vacation) for classification purposes."},
    {"name": "source_srn", "type": ["string", "null"],
      "doc": "The Synxis Resource Name representing the resource that generated the given event."},
    {"name": "event_timestamp", "type": "string",
      "doc": "The UTC timestamp the given event was created by the client."},
    {"name": "vendor_id",  "type": "int",
      "doc": "Identifier of the vendor this event is related to."},
    {"name": "client_session_id",  "type": ["string", "null"],
      "doc": "Client side session identifier used to track a user's behavior in a given application (like a web session)."},
    {"name": "returned_product_ids", "type": [{"type": "array", "items": "string"}, "null"],
      "doc": "List of product ids returned from the client shop request."},
    {"name": "store_id",  "type": "int",
      "doc": "Identifier of the store this event is related to."},
    {"name": "transaction_context_code", "type": ["string", "null"],
      "doc": "Values are STND_SHOP, ADDED_TO_CART."},
    {"name": "cust_cat_id",  "type": "string",
      "doc": "Customer category id which was derived from the available contextual data and custcat_ver algorithm"},
    {"name": "cust_cat_algo_ver",  "type": "string",
      "doc": "Version of customer category derivation algorithm used"},
    {"name": "recommender_type",  "type": "string",
      "doc": "Type of the recommendation system have used to generate the recommendation list"},
    {"name": "recommender_ver",  "type": "string",
      "doc": "Version of recommendation system is used "}
  ]
}'''

In [20]:
with open('test.avsc', 'wb') as outfile:
    pickle.dump(test, outfile )

In [21]:
schema = avro.schema.Parse(pickle.load(open( 'test.avsc', "rb" )))
filename = 'test.avro'
writer = DataFileWriter(open(filename, "wb"), DatumWriter(), schema)
writer.append({'event_id': 'oJYfihgnYmIpjANq', 'event_type': 'RetailOffersShoppedEvent', 'version_id': '1.0', 'event_namespace': 'com.sabre.synxis.sip.retail', 'source_srn': 'srn:amazon:aws:us-west-2:dev:unit-test:1', 'event_timestamp': '01/16/19 06:28:34 AM UTC', 'vendor_id': 1001, 'client_session_id': '111111', 'returned_product_ids': ['119', '113', '118', '119', '117'], 'store_id': 1001, 'transaction_context_code': 'STND_SHOP', 'cust_cat_id': '100102', 'cust_cat_algo_ver': '1.0', 'recommender_type': 'MABbyVendorStore', 'recommender_ver': '1.0'})
writer.close()

In [22]:
eventDataToStaggingShopingMap = {'client_session_id' :'Session_Id', 'event_id':'Event_Id', 'vendor_id':'Vendor_Id', 'store_id': 'Store_Id',
 'version_id':'Version_id', 'cust_cat_id':'CustCat_Id', 'cust_cat_algo_ver':'Custcat_Ver','event_timestamp':'Event_Timestamp',
 'returned_product_ids':'Product_Id', 'Position':'Position'}

In [23]:
eventDataToStaggingPurchaseMap = {'client_session_id' :'Session_Id', 'event_id':'Event_Id', 'vendor_id':'Vendor_Id',
                                  'store_id': 'Store_Id', 'version_id':'Version_id', 'event_timestamp':'Event_Timestamp',
                                  'purchased_product_ids':'Product_Id'}

In [24]:
def getStaggingShoppingTable(ShoppingStaggingTable=ShoppingStaggingTable):
    # In AWS this would equivalent to acquiring the connection to the Stagging Table Database and
    # Return the Connection Object.
    return ShoppingStaggingTable

def getStaggingPurchasedTable(PurchasedStaggingTable=PurchasedStaggingTable):
    # In AWS this would equivalent to acquiring the connection to the Stagging Table Database and
    # Return the Connection Object.
    return PurchasedStaggingTable

def getUserPositionBiasTable(userPositionBias=userPositionBias):
    # In AWS this would equivalent to acquiring the connection to the userPositionBias Table Database and
    # Return the Connection Object.
    return userPositionBias
def getProductStatisticsTable(ProductStatistics=ProductStatistics):
    # In AWS this would equivalent to acquiring the connection to the userPositionBias Table Database and
    # Return the Connection Object.    
    return ProductStatistics


In [25]:
def processShoppingEvent(event=None, context=None):
    # read the shopping event schema
    # read the shopping event file
    # parse the shopping event file based on schema and put it in a list
    
    tmp = pd.DataFrame.from_records(shoppingLog)
    cols = [i for i in list(tmp.columns) if i!='returned_product_ids']
    tmp = tmp.returned_product_ids.apply(pd.Series).merge(tmp, right_index = True, left_index = True) \
    .drop(["returned_product_ids"], axis = 1).melt(id_vars = cols , value_name = "returned_product_ids").dropna()
    tmp['Position'] = tmp['variable'].apply(lambda x: x+1)
    sourceColumns = list(eventDataToStaggingShopingMap.keys())
    targetColumns = [eventDataToStaggingShopingMap.get(i) for i in sourceColumns]
    tmp = tmp[sourceColumns]
    tmp.columns = targetColumns
    # establish a connection to stagging table database
    # Insert all the data in shoppingEventStaggingTable
    ShoppingStaggingTable = getStaggingShoppingTable()
    tmp = tmp[list(ShoppingStaggingTable.columns)]
    ShoppingStaggingTable =  pd.concat([ShoppingStaggingTable, tmp], ignore_index=True)
    return ShoppingStaggingTable
    
    


def processPurchaseEvent(event=None, context=None):
    # read the purchased event schema
    # read the purchased event file
    # parse the purchased event file based on schema
    # put these in a dataframe
    tmp = pd.DataFrame.from_records(purchaseLog)
    cols = [i for i in list(tmp.columns) if i!='purchased_product_ids']
    tmp = tmp.purchased_product_ids.apply(pd.Series).merge(tmp, right_index = True, left_index = True) \
    .drop(["purchased_product_ids"], axis = 1).melt(id_vars = cols , value_name = "purchased_product_ids").dropna()
    sourceColumns = list(eventDataToStaggingPurchaseMap.keys())
    targetColumns = [eventDataToStaggingPurchaseMap.get(i) for i in sourceColumns]
    tmp = tmp[sourceColumns]
    tmp.columns = targetColumns
    tmp['Processed'] = 0
    # establish a connection to stagging table database
    # Insert all the data in purchaseEventStaggingTable
    PurchasedStaggingTable = getStaggingPurchasedTable()
    tmp = tmp[list(PurchasedStaggingTable.columns)]
    PurchasedStaggingTable =  pd.concat([PurchasedStaggingTable, tmp], ignore_index=True)
    return PurchasedStaggingTable


def prepareDataToUpdateProductStatistics(vendor_id=None, store_id = None):
    
    if not vendor_id is None and not store_id is None:
        workingPurchaseData = PurchasedStaggingTable[(PurchasedStaggingTable['Processed'] == 0) &
                                             (PurchasedStaggingTable['Vendor_Id'] == vendor_id) & 
                                            (PurchasedStaggingTable['Store_Id'] == store_id)]
    elif vendor_id is None and not store_id is None:
        workingPurchaseData = PurchasedStaggingTable[(PurchasedStaggingTable['Processed'] == 0) & 
                                            (PurchasedStaggingTable['Store_Id'] == store_id)]
    elif not vendor_id is None and store_id is None:
        workingPurchaseData = workingData = PurchasedStaggingTable[(PurchasedStaggingTable['Processed'] == 0) &
                                             (PurchasedStaggingTable['Vendor_Id'] == vendor_id)]
    else:
        workingPurchaseData = PurchasedStaggingTable[PurchasedStaggingTable['Processed'] == 0]

    workingPurchaseIds = workingPurchaseData.index
    PurchasedStaggingTable.at[workingPurchaseIds, 'Processed'] = -1 #Processing. To make sure no other instace takes up the same data
    workingPurchaseSesionIds = list(workingPurchaseData['Session_Id'])
    workingShoppingData = ShoppingStaggingTable[ShoppingStaggingTable['Session_Id'].isin(workingPurchaseSesionIds)]
    workingPurchaseData.drop(columns = ['Event_Id', 'Version_id','Event_Timestamp', 'Processed'], inplace = True)
    workingPurchaseData['Hit'] = 1
    workingData = workingShoppingData.merge(workingPurchaseData, how= 'left',
                                            left_on=['Session_Id','Vendor_Id', 'Store_Id', 'Product_Id'], 
                                            right_on = ['Session_Id','Vendor_Id', 'Store_Id', 'Product_Id'])
    del workingShoppingData, workingPurchaseData

    Hits = pd.DataFrame(workingData[workingData['Hit']==1])
    Misses = pd.DataFrame(workingData[workingData['Hit']!=1])

    del workingData
    
    Hits.sort_values(by = ['Vendor_Id', 'Store_Id', 'Session_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id', 'Position'], inplace = True)
    Misses.sort_values(by = ['Vendor_Id', 'Store_Id', 'Session_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id', 'Position'], inplace = True)
    Hits.drop_duplicates(subset=['Session_Id', 'Vendor_Id', 'Store_Id', 'Version_id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id' ], keep='last', inplace=True)
    Misses.drop_duplicates(subset=['Session_Id', 'Vendor_Id', 'Store_Id', 'Version_id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id' ], keep='first', inplace=True)
    Hits['Position'] = Hits['Position'].astype('int')
    Misses['Position'] = Misses['Position'].astype('int')
    Misses['Miss'] = 1
    Misses.drop(columns = 'Hit', inplace=True)
    Misses = Misses.merge(userPositionBias[['Position', 'wBeta']], left_on = 'Position', right_on = 'Position')
    Hits = Hits.merge(userPositionBias[['Position', 'wAlpha']], left_on = 'Position', right_on = 'Position')
    Misses = Misses[['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id', 'Miss', 'wBeta']]
    Hits = Hits[['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id', 'Hit', 'wAlpha']]
    Misses = Misses.groupby(by=['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id']).sum()
    Misses.reset_index(inplace=True)
    Hits = Hits.groupby(by=['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id']).sum()
    Hits.reset_index(inplace=True)
    deltaProductStatistics = Hits.merge(Misses, how='outer', on=['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id'])
    deltaProductStatistics = deltaProductStatistics.fillna(0)
    del Hits, Misses
    flag = updateProductStatistics(deltaProductStatistics)
    if flag:
        PurchasedStaggingTable.at[workingPurchaseIds, 'Processed'] = 1
    else:
        PurchasedStaggingTable.at[workingPurchaseIds, 'Processed'] = 0
    
    return True
def updateProductStatistics(deltaProductStatistics):
    # Connect with database
    # Take the output file of the prepareDataToUpdateProductStatistics
    # take the snapshot of the current values from product statistics by vendorStoreID, CustCatID, ProductID
    # Use bulk update in dataframe as mentioned at https://stackoverflow.com/questions/24768657/replace-column-values-based-on-another-dataframe-python-pandas-better-way
    # update the values in centralized database
    # close the connection
    try:
        pastProductStatistics = pd.DataFrame(ProductStatistics[(ProductStatistics['Vendor_Id'].isin(deltaProductStatistics['Vendor_Id'])) &
                         (ProductStatistics['Store_Id'].isin(deltaProductStatistics['Store_Id'])) &
                         (ProductStatistics['CustCat_Id'].isin(deltaProductStatistics['CustCat_Id'])) &
                         (ProductStatistics['Custcat_Ver'].isin(deltaProductStatistics['Custcat_Ver'])) &
                         (ProductStatistics['Product_Id'].isin(deltaProductStatistics['Product_Id']))])
        deltaProductStatistics.rename({'wAlpha':'Alpha', 'wBeta': 'Beta'}, axis='columns', inplace=True)
        deltaProductStatistics = deltaProductStatistics[list(pastProductStatistics.columns)]
        deltaProductStatistics = pd.concat([pastProductStatistics, deltaProductStatistics], ignore_index=True)
        deltaProductStatistics = deltaProductStatistics.groupby(by=['Vendor_Id', 'Store_Id', 'CustCat_Id', 'Custcat_Ver', 'Product_Id']).sum()
        deltaProductStatistics.reset_index(inplace=True)
        ProductStatistics.at[ProductStatistics[(ProductStatistics['Vendor_Id'].isin(deltaProductStatistics['Vendor_Id'])) &
                         (ProductStatistics['Store_Id'].isin(deltaProductStatistics['Store_Id'])) &
                         (ProductStatistics['CustCat_Id'].isin(deltaProductStatistics['CustCat_Id'])) &
                         (ProductStatistics['Custcat_Ver'].isin(deltaProductStatistics['Custcat_Ver'])) &
                         (ProductStatistics['Product_Id'].isin(deltaProductStatistics['Product_Id']))].index, ['Alpha','Beta', 'Hit', 'Miss']]  = deltaProductStatistics[['Alpha','Beta', 'Hit', 'Miss']].values
        
        return True
    except:
        return False


In [26]:
ShoppingStaggingTable

Unnamed: 0,Session_Id,Event_Id,Vendor_Id,Store_Id,Version_id,CustCat_Id,Custcat_Ver,Event_Timestamp,Product_Id,Position


In [27]:
PurchasedStaggingTable

Unnamed: 0,Session_Id,Event_Id,Vendor_Id,Store_Id,Version_id,Event_Timestamp,Product_Id,Processed


In [28]:
ShoppingStaggingTable = processShoppingEvent()

In [29]:
PurchasedStaggingTable = processPurchaseEvent()

In [30]:
for i in [1001, 1002]:
    flag = prepareDataToUpdateProductStatistics(vendor_id=i, store_id=i)
    if flag:
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            print(ProductStatistics)
    print('XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
    

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  errors=errors)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


    Vendor_Id  Store_Id CustCat_Id Custcat_Ver Product_Id  Alpha  Beta  Hit  \
0        1001      1001     100101         1.0        110   1.00  2.85  0.0   
1        1001      1001     100101         1.0        111   1.80  3.70  1.0   
2        1001      1001     100101         1.0        112   1.85  2.70  1.0   
3        1001      1001     100101         1.0        113   1.00  1.00  0.0   
4        1001      1001     100101         1.0        114   1.00  5.30  0.0   
5        1001      1001     100101         1.0        115   1.00  3.60  0.0   
6        1001      1001     100101         1.0        116   1.85  3.80  1.0   
7        1001      1001     100101         1.0        117   1.00  2.75  0.0   
8        1001      1001     100101         1.0        118   1.85  2.95  1.0   
9        1001      1001     100101         1.0        119   1.80  1.80  1.0   
10       1001      1001     100102         1.0        110   1.00  4.75  0.0   
11       1001      1001     100102         1.0      