# In [1]:
import pandas as pd
import calc_elasticity
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import random
import math



"""Custom Environment that follows gym interface."""
class CustomEnv(gym.Env):
    """Price-setting environment that follows the gymnasium interface.

    On every step the agent (shop index 0) picks one price from a discrete
    grid and competes with three simulated shops (indices 1-3) for a freshly
    sampled batch of customers that have not bought yet. The reward is the
    agent's unit margin multiplied by the number of units it sold.

    Customers follow one of four tactics (column ``Tactic``):

    * ``'FL'`` - buys only from the shop stored in ``Seller_Prefernce``.
    * ``'BH'`` - buys from the cheapest shop.
    * ``'CT'`` - draws two distinct shops at random and buys from the cheaper.
    * ``'AS'`` - draws one shop at random.

    In every case a purchase only happens when the chosen shop's price does
    not exceed the customer's willingness to pay (``WTP``).

    NOTE(review): customer generation and shop draws use the global
    ``np.random`` / ``random`` generators, so the ``seed`` passed to
    ``reset`` does not make the simulation reproducible.
    """

    metadata = {"render_modes": ["human"], "render_fps": 30}

    # Number of discrete prices the agent can choose from.
    N_PRICE_LEVELS = 30
    # An episode terminates on the EPISODE_LENGTH-th call to step().
    EPISODE_LENGTH = 24

    def __init__(self, df, number_of_customers):
        """Build the environment from historical data.

        Args:
            df: DataFrame with at least the columns ``price_per_unit`` and
                ``products_quantity``; it is also handed to
                ``calc_elasticity.PED``.
            number_of_customers: Size of the simulated customer population.
        """
        super().__init__()

        # Unit cost: 80% of the mean price over the last 10 rows.
        self.cost = round(df['price_per_unit'].tail(10).mean()*0.8, 2)

        # Price grid from cost up to (but excluding) 1.2*cost. linspace
        # guarantees exactly N_PRICE_LEVELS points; arange with a float step
        # may return one element more or less because of rounding.
        self.actions = np.linspace(
            self.cost, self.cost*1.2, self.N_PRICE_LEVELS, endpoint=False
        ).round(2)
        self.action_space = spaces.Discrete(len(self.actions))

        self.observations = ['Shop1', 'Shop2', 'Shop3']
        # Competitor prices never exceed 1.38*cost (1.15*cost average shifted
        # by at most +20%), so the bound is only widened beyond the original
        # 50 when the cost level requires it.
        obs_high = max(50.0, float(self.cost)*1.4)
        self.observation_space = spaces.Box(
            low=0, high=obs_high, shape=(3,), dtype=np.float32
        )

        # The signed elasticity is reused on every step, so compute it once.
        self._ped = calc_elasticity.PED(df)
        self.elasticity = abs(self._ped)
        self.market_avg_price = self.cost*1.2
        self.number_of_customers = number_of_customers
        self.customers = self.create_customers()
        self.reward = 0
        self.duration = 0
        self.df = df

    def step(self, action):
        """Play one pricing round.

        Args:
            action: Index into ``self.actions`` selecting the agent's price.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)`` where
            ``observation`` holds the three competitor prices, ``reward`` is
            the agent's profit for the round, ``truncated`` is always
            ``False`` and ``info`` carries the cost, all four prices
            (``p0``-``p3``) and all four sales counts (``s0``-``s3``).
        """
        p0 = self.actions[action]
        self.competitor1_price = round(self.cost*1.1, 2)
        p1 = self.competitor1_price
        self.competitor2_price = round(self.cost*1.15, 2)
        p2 = self.competitor2_price

        # Shop 3 opens at a fixed markup and afterwards tracks the average
        # of the other three prices.
        if self.duration == 0:
            reference_price = self.cost*1.12
        else:
            reference_price = (p0+p1+p2)/3
        self.competitor3_price = self._competitor3_price(reference_price)
        p3 = self.competitor3_price

        observation = self._get_obs()

        # Sample the customers who shop in this round among those who have
        # not bought yet; never ask for more rows than are available.
        unshopped = self.customers[self.customers['Shoped'] == 0]
        weekly_demand = int(self.number_of_customers/(12*4)*1.1)
        weekly_customers = unshopped.sample(
            n=min(weekly_demand, len(unshopped))
        ).reset_index()

        sales, customers_results = self.market_simulation(
            [p0, p1, p2, p3], weekly_customers
        )
        self.customers = self.merge_customers_df(
            self.customers, customers_results
        )

        self.reward = sales[0]*(p0-self.cost)

        # ">=" keeps reporting termination if the caller steps past the end.
        terminated = bool(self.duration >= self.EPISODE_LENGTH-1)
        self.duration += 1

        info = {'cost': self.cost, 'p0': p0, 'p1': p1, 'p2': p2, 'p3': p3,
                's0': sales[0], 's1': sales[1], 's2': sales[2], 's3': sales[3]}

        # Cast to the dtype declared by observation_space.
        obs = np.array(list(observation.values()), dtype=np.float32)
        return obs, self.reward, terminated, False, info

    def reset(self, seed=40, options=None):
        """Start a new episode with a fresh customer population.

        Returns:
            Tuple ``(observation, info)``; the initial observation is the
            unit cost repeated three times and ``info`` is empty.
        """
        super().reset(seed=seed)
        info = {}
        self.duration = 0
        self.reward = 0
        self.customers = self.create_customers()
        self.competitor1_price = round(self.cost*1.1, 2)
        self.competitor2_price = round(self.cost*1.15, 2)
        # Same opening price that step() assigns on the first round, so that
        # _get_obs() is usable right after reset().
        self.competitor3_price = self._competitor3_price(self.cost*1.12)
        return np.array([self.cost]*3, dtype=np.float32), info

    def render(self):
        """Rendering is not implemented."""
        pass

    def close(self):
        """Nothing to release."""
        pass

    def _get_obs(self):
        """Return the current competitor prices keyed by shop label."""
        return {"Shop1": self.competitor1_price,
                "Shop2": self.competitor2_price,
                "Shop3": self.competitor3_price}

    def _competitor3_price(self, reference_price):
        """Shift ``reference_price`` by the percentage chosen by ``newton``."""
        increment = self.newton(
            reference_price,
            self.cost,
            self.df.products_quantity.mean(),
            self._ped,
        )
        return round(reference_price*(1+increment/100), 2)

    def find_rev(self, data, increment):
        """Return the profit obtained after a price change of ``increment`` %.

        Args:
            data: Mapping with the keys ``Avg_Price_per_unit``,
                ``Cost_per_unit``, ``Average_units_sold`` and
                ``Incremental_acquisition``.
            increment: Price change in percent.
        """
        price = data['Avg_Price_per_unit']*(1+increment/100)
        multiplier = 1-(data['Incremental_acquisition']*increment*10)
        # Sold volume cannot be negative.
        volumes = max(data['Average_units_sold']*multiplier, 0)
        return price*volumes - volumes*data['Cost_per_unit']

    def newton(self, avg_price, cost, sales_forecast, elasticity):
        """Grid-search the price increment (in %) with the highest profit.

        Every integer increment from -20 to 20 is evaluated with
        ``find_rev``. The first increment with the highest strictly positive
        profit wins; 0 is returned when no increment is profitable.
        """
        scenario = {'Avg_Price_per_unit': avg_price,
                    'Cost_per_unit': cost,
                    'Average_units_sold': sales_forecast,
                    'Incremental_acquisition': -elasticity/10}
        best_profit = 0
        best_increment = 0
        for candidate in np.arange(-20, 21, 1, dtype=int):
            profit = self.find_rev(scenario, candidate)
            if profit > best_profit:
                best_profit = profit
                best_increment = candidate
        return best_increment

    def create_customers(self):
        """Generate the customer population, one segment at a time.

        Returns:
            DataFrame with the columns ``WTP``, ``Tactic``,
            ``Seller_Prefernce``, ``Lifetime`` and ``Shoped`` (initialised
            to 0) and a fresh 0..n-1 index.
        """
        segment_shares = [0.25, 0.2, 0.15, 0.1, 0.1, 0.1, 0.1]
        tactics = ['BH', 'AS', 'CT', 'FL', 'FL', 'FL', 'FL']
        # One preferred shop per segment (only meaningful for 'FL').
        seller_preferences = [0, 0, 0, 0, 1, 2, 3]
        lifetimes = [10, 0, 0, 0, 0, 0, 0]

        segments = []
        for share, tactic, seller, lifetime in zip(
                segment_shares, tactics, seller_preferences, lifetimes):
            wtp = self.simulate_customers(share)
            segments.append(pd.DataFrame(
                {'WTP': wtp,
                 'Tactic': tactic,
                 'Seller_Prefernce': seller,
                 'Lifetime': lifetime,
                 'Shoped': 0},
                index=range(len(wtp)),
            ))

        # A single concat avoids repeatedly appending to an empty frame.
        return pd.concat(segments, ignore_index=True)

    def f(self, *args):
        """Return the elasticity-dependent uplift added to the mean WTP factor."""
        return 0.2 * (1 - math.exp(-0.5*self.elasticity))

    def simulate_customers(self, segment):
        """Draw willingness-to-pay values for one customer segment.

        Args:
            segment: Share of the population that belongs to the segment.

        Returns:
            Array of ``int(segment*number_of_customers)`` normally
            distributed values scaled by the market average price.
        """
        sigma = 0.1
        mean_factor = 0.9+self.f()
        size = int(segment*self.number_of_customers)
        wtp_factors = np.random.normal(mean_factor, sigma, size)
        # The segment whose share is exactly 0.25 gets a 10% lower WTP.
        if segment == 0.25:
            return wtp_factors*self.market_avg_price*0.9
        return wtp_factors*self.market_avg_price

    def market_simulation(self, list_shops, customers):
        """Let every customer try to buy according to their tactic.

        Args:
            list_shops: Shop prices; position 0 is the agent.
            customers: Customers shopping in this round. ``Shoped`` is set
                to 1 in place for everyone who buys.

        Returns:
            Tuple ``(sales, customers)`` with the units sold per shop and the
            updated customer frame.
        """
        n_shops = len(list_shops)
        sales = [0]*n_shops
        # The cheapest shop is the same for every customer in the round.
        cheapest_shop = pd.Series(list_shops).idxmin() if n_shops else None

        for index, customer in customers.iterrows():
            tactic = customer.Tactic
            if tactic == 'FL':
                chosen = customer.Seller_Prefernce
            elif tactic == 'BH':
                chosen = cheapest_shop
            elif tactic == 'CT':
                shops = list(range(n_shops))
                first = shops.pop(random.randrange(len(shops)))
                second = shops.pop(random.randrange(len(shops)))
                # Ties go to the shop drawn first.
                if list_shops[first] > list_shops[second]:
                    chosen = second
                else:
                    chosen = first
            elif tactic == 'AS':
                chosen = random.randrange(n_shops)
            else:
                continue

            if list_shops[chosen] <= customer.WTP:
                sales[chosen] += 1
                customers.at[index, 'Shoped'] = 1

        return sales, customers

    def merge_customers_df(self, df_original, df_shoped):
        """Flag in ``df_original`` the customers that bought in ``df_shoped``.

        The first column of ``df_shoped`` must hold the row position of each
        customer in ``df_original`` (as produced by ``reset_index()`` in
        ``step``). ``df_original`` is modified in place and returned.
        """
        bought = df_shoped['Shoped'] == 1
        if bought.any():
            positions = df_shoped.iloc[:, 0][bought].astype(int).to_numpy()
            shoped_col = df_original.columns.get_loc('Shoped')
            df_original.iloc[positions, shoped_col] = 1
        return df_original

# Notebook output: ModuleNotFoundError: ignored