In [1]:
import warnings
warnings.filterwarnings('ignore')
from fpgrowth_py import fpgrowth

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.gridspec as gr

In [2]:
# Load the customer-level table. The next cell reads its CustomerID, cluster,
# recency, purchase-interval and transaction-value columns.
df = pd.read_csv("main.csv")
df

In [3]:
# Keep only the columns needed for RFM scoring. The explicit .copy() makes x1 an
# independent frame, so the column assignments in the following cells write to
# x1 itself instead of to a slice of df (the pattern pandas flags with
# SettingWithCopyWarning).
x1 = df[["CustomerID","cluster","Days_Since_Last_Purchase","Average_Days_Between_Purchases","Average_Transaction_Value"]].copy()
x1

In [4]:
# Score each metric into terciles where 1 is always the best tier.
# Recency and purchase interval are measured in days, so the smallest values score 1.
x1['R_score'] = pd.qcut(x1['Days_Since_Last_Purchase'], q=3, labels=[1, 2, 3])  # most recent buyers -> 1
x1['F_score'] = pd.qcut(x1['Average_Days_Between_Purchases'], q=3, labels=[1, 2, 3])  # shortest gap between purchases -> 1
# Transaction value runs the other way (bigger is better), so its labels are
# reversed. With ascending labels the code '111', which the label map below
# calls 'Champions', would describe the lowest spenders.
x1['M_score'] = pd.qcut(x1['Average_Transaction_Value'], q=3, labels=[3, 2, 1])  # highest spenders -> 1
x1

In [5]:
# Build the three-character RFM code (e.g. '123') by writing the R, F and M
# scores as text and joining them in that order.
x1['RFM'] = (
    x1['R_score'].astype(str)
    + x1['F_score'].astype(str)
    + x1['M_score'].astype(str)
)
x1

In [6]:
def assign_cluster(score):
    """Map a three-character RFM code to its marketing segment.

    Args:
        score: RFM code such as '111'.

    Returns:
        'High Value', 'Nurture' or 'Risk' for the codes listed below,
        and 'Other' for any code that is not listed.
    """
    codes_by_segment = {
        'High Value': ('111', '112', '113', '121', '122', '123'),
        'Nurture': ('133', '213', '222', '223', '232', '233'),
        'Risk': ('131', '132', '231', '311', '312', '313',
                 '321', '322', '323', '331', '332', '333'),
    }

    # Segments are checked in the order High Value, Nurture, Risk.
    for segment, codes in codes_by_segment.items():
        if score in codes:
            return segment
    return 'Other'

In [7]:
# Label every customer with the segment that assign_cluster returns for their RFM code.
x1['Segment'] = x1['RFM'].apply(assign_cluster)
x1

In [8]:
# Customer -> segment lookup used by the merges below. Taking a copy means the
# later CustomerID dtype conversions assign into xci itself rather than into a
# slice of x1.
xci = x1[["CustomerID","Segment"]].copy()

xci

In [9]:
xci["Segment"].value_counts()

In [10]:
def map_rfm_labels(rfm_score):
    """Translate an RFM code into its descriptive sub-segment label.

    Args:
        rfm_score: RFM code as a string (e.g., '111').

    Returns:
        The label assigned to that code (e.g., 'Champions'), or
        'Uncategorized' when the code is not in the table.
    """
    # Each label is listed once together with every code that maps to it.
    codes_by_label = (
        ('Champions', ('111',)),
        ('Loyalists', ('112',)),
        ('High Potential', ('113',)),
        ('Big Spender', ('121',)),
        ('Regular Spenders', ('122',)),
        ('Emerging Loyalists', ('123',)),
        ('Needs a Spark', ('133',)),
        ('Upscale Focus', ('213',)),
        ('Consistent Spender', ('222',)),
        ('Potential Upscale', ('223',)),
        ('Win-Back Target', ('232',)),
        ('Casual Shopper', ('233',)),
        ('Wake-Up Call', ('131',)),
        ('Slipping Away', ('132',)),
        ('Dormant Upscale', ('231',)),
        ('One-offs', ('311', '312', '313')),
        ('Sporadic', ('321', '322', '323')),
        ('Lost Cause', ('331', '332', '333')),
    )
    label_by_code = {
        code: label
        for label, codes in codes_by_label
        for code in codes
    }

    return label_by_code.get(rfm_score, 'Uncategorized')

In [11]:
# Attach the descriptive sub-segment label that map_rfm_labels returns for each RFM code.
x1['Subsegment'] = x1['RFM'].apply(map_rfm_labels)
x1

In [12]:
x1.columns

In [18]:
# Load the transaction-level data. The next cell keeps its CustomerID,
# InvoiceNo and StockCode columns.
cdata = pd.read_csv("express.csv")
cdata



In [19]:
# Narrow the transactions to the three columns used for basket analysis.
# The name cdata is rebound, so the full frame is no longer reachable after this cell.
cdata =cdata[["CustomerID","InvoiceNo","StockCode"]]
cdata

 

In [20]:
# One row per (InvoiceNo, CustomerID) pair holding the list of distinct stock
# codes on that invoice. list(set(...)) removes repeats but does not keep the
# original order of the codes.
# NOTE(review): `basket` is not referenced again in the cells shown; the same
# aggregation is wrapped by aggregate_transactions() further down.
basket = cdata.groupby(["InvoiceNo","CustomerID"]).agg({
    "StockCode": lambda s : list(set(s))
})

basket

In [17]:
# Keep a single transaction row per CustomerID (one row per customer).
# NOTE(review): `unique` is not referenced again in the cells shown.
unique = cdata.drop_duplicates(subset="CustomerID")
unique

In [23]:
# Cast the join key to object dtype ahead of the merge with cdata, then show the dtypes.
# NOTE(review): astype(object) changes the dtype only, not the values, so IDs
# that are numbers stay numbers. A later cell casts both sides with
# astype(str) before the merge whose result is actually used.
xci["CustomerID"] = xci["CustomerID"].astype(object)
xci.dtypes

In [28]:
# Left join: keep every transaction row and attach Segment where CustomerID matches.
# NOTE(review): `trial` is reassigned by a later merge on the filtered frame n1.
trial = pd.merge(cdata,xci,on="CustomerID",how="left")
trial

In [30]:
trial.sample(5)

In [32]:
# Drop transactions whose customer is the placeholder "Unknown"; they cannot be
# matched to a segment. The copy lets the later CustomerID cast assign into n1
# itself rather than into a slice of cdata.
n1 = cdata.loc[cdata["CustomerID"]!="Unknown"].copy()
n1

In [33]:

# Repeat the left join using only transactions with a known customer.
trial = pd.merge(n1, xci, on="CustomerID", how="left")

In [34]:
trial

In [44]:
# Convert the join key to strings on both sides so the merge compares like with like.
# NOTE(review): if xci's IDs are floats, str() yields e.g. '12346.0', which
# would not match '12346' in n1 — confirm the CustomerID dtype in main.csv.
n1['CustomerID'] = n1['CustomerID'].astype(str)
xci['CustomerID'] = xci['CustomerID'].astype(str)

In [45]:
# Attach each customer's Segment to their transactions. A left join keeps
# transactions without a matching customer, with Segment left missing.
merged_df = pd.merge(n1, xci, on='CustomerID', how='left')
merged_df

In [47]:
# Discard every row that has a missing value in any column, which includes
# transactions that found no segment in the left join.
merged_df = merged_df.dropna()
merged_df

In [48]:
merged_df.Segment.value_counts()

In [53]:
# Split the labelled transactions into one frame per segment; rows whose
# segment is none of the three named ones end up in other_df.
high_value_df = merged_df.loc[merged_df['Segment'] == 'High Value']
risk_df = merged_df.loc[merged_df['Segment'] == 'Risk']
nurture_df = merged_df.loc[merged_df['Segment'] == 'Nurture']
other_df = merged_df.loc[~merged_df['Segment'].isin(['High Value', 'Risk', 'Nurture'])]

high_value_df

In [153]:
# Aggregate transactions
def aggregate_transactions(df):
    """Collapse transaction lines into one basket per (InvoiceNo, CustomerID) pair.

    Args:
        df: DataFrame with 'InvoiceNo', 'CustomerID' and 'StockCode' columns.

    Returns:
        DataFrame indexed by (InvoiceNo, CustomerID) with a single 'StockCode'
        column holding the list of distinct stock codes in that basket. The
        order of codes inside a list is not guaranteed.
    """
    def distinct_codes(codes):
        # Going through a set removes repeated codes within one basket.
        return list(set(codes))

    per_invoice = df.groupby(["InvoiceNo", "CustomerID"])
    return per_invoice.agg({"StockCode": distinct_codes})

In [154]:
def get_rules(df, min_sup_ratio=0.01, min_conf=0.8):
    """Mine association rules from the transactions in ``df`` with FP-Growth.

    Args:
        df: Transaction lines with 'InvoiceNo', 'CustomerID' and 'StockCode' columns.
        min_sup_ratio: Value passed to fpgrowth as ``minSupRatio``. Defaults to 0.01.
        min_conf: Value passed to fpgrowth as ``minConf``. Defaults to 0.8.

    Returns:
        DataFrame with columns 'basket', 'next_product' and 'proba', one row
        per rule, sorted by 'proba' in descending order. Empty when no rule
        was produced.
    """
    hbasket = aggregate_transactions(df)
    mined = fpgrowth(hbasket['StockCode'].values, minSupRatio=min_sup_ratio, minConf=min_conf)
    # fpgrowth_py returns None rather than a (itemsets, rules) tuple when no
    # item reaches the support threshold, so guard before unpacking.
    rules = mined[1] if mined is not None else []
    print('Number of rules generated : ', len(rules))

    association = pd.DataFrame(rules, columns=['basket', 'next_product', 'proba'])
    return association.sort_values(by='proba', ascending=False)
    

In [79]:
# Association rules mined from the transactions that fall outside the three named segments.
lb = get_rules(other_df)
lb

In [155]:
# Get frequent item sets
def get_most_frequent_itemsets(df, min_items=3):
    """Return the frequent itemsets of ``df`` that contain at least ``min_items`` items.

    Args:
        df: Transaction lines with 'InvoiceNo', 'CustomerID' and 'StockCode' columns.
        min_items: Smallest itemset size to keep. Defaults to 3, which matches
            the previous hard-coded ``len > 2`` filter.

    Returns:
        DataFrame with one 'Frequent Itemset' column; each cell is a sorted
        list of stock codes. Rows keep the index label they had before
        filtering. Empty when nothing qualifies.
    """
    # Group by InvoiceNo and CustomerID and aggregate unique StockCodes into lists
    hbasket = aggregate_transactions(df)

    # Run FP-Growth. Only the itemsets are used here; minConf affects the
    # rules only but is a required argument.
    mined = fpgrowth(hbasket['StockCode'].values, minSupRatio=0.01, minConf=0.7)
    # fpgrowth_py returns None rather than a tuple when no item reaches the
    # support threshold, so guard before unpacking.
    freq_item_sets = mined[0] if mined is not None else []

    # Sort items within each itemset so equal sets get an identical representation
    frequent_itemsets_df = pd.DataFrame(
        {'Frequent Itemset': [sorted(itemset) for itemset in freq_item_sets]}
    )

    # Remove duplicates. Lists are unhashable, so drop_duplicates() cannot be
    # used on this column directly; compare the itemsets as tuples instead.
    as_tuples = frequent_itemsets_df['Frequent Itemset'].apply(tuple)
    frequent_itemsets_df = frequent_itemsets_df[~as_tuples.duplicated()]

    # Keep only itemsets with at least `min_items` items
    itemset_sizes = frequent_itemsets_df['Frequent Itemset'].apply(len)
    return frequent_itemsets_df[itemset_sizes >= min_items]


In [98]:
# Frequent itemsets found in the transactions of the 'Nurture' segment.
xg = get_most_frequent_itemsets(nurture_df)
xg

In [157]:
xg["Frequent Itemset"][107]

## Promotional Strategies

In [149]:
# Get Buy One, Get One Discounted bundles
def get_bogd_bundles(df, min_sup_ratio=0.01, min_conf=0.9):
    """Mine the high-confidence rules used as "buy one, get one discounted" bundles.

    Args:
        df: Transaction lines with 'InvoiceNo', 'CustomerID' and 'StockCode' columns.
        min_sup_ratio: Value passed to fpgrowth as ``minSupRatio``. Defaults to 0.01.
        min_conf: Value passed to fpgrowth as ``minConf``. Defaults to 0.9.

    Returns:
        DataFrame with columns 'basket', 'next_product' and 'proba', one row
        per rule, sorted by 'proba' in descending order. Empty when no rule
        was produced.
    """
    hbasket = aggregate_transactions(df)
    mined = fpgrowth(hbasket['StockCode'].values, minSupRatio=min_sup_ratio, minConf=min_conf)
    # fpgrowth_py returns None rather than a (itemsets, rules) tuple when no
    # item reaches the support threshold, so guard before unpacking.
    rules = mined[1] if mined is not None else []
    print('Number of rules generated : ', len(rules))

    association = pd.DataFrame(rules, columns=['basket', 'next_product', 'proba'])
    return association.sort_values(by='proba', ascending=False)
    

In [150]:
# Buy One, Get One Discounted
def bogd(product_bundles, order):
    """List the products an order unlocks under the bundle rules.

    Args:
        product_bundles: DataFrame with a 'basket' column (items that must all
            be in the order) and a 'next_product' column (items offered when
            the basket matches).
        order: Collection of item codes in the customer's order.

    Returns:
        Flat list of every item from 'next_product' of each matching rule, in
        rule order. An empty list (after printing a notice) when no rule matches.
    """
    ordered_items = set(order)

    def basket_is_covered(basket):
        # A rule applies when every item of its basket is part of the order.
        return set(basket).issubset(ordered_items)

    eligible_rules = product_bundles[product_bundles['basket'].apply(basket_is_covered)]

    if eligible_rules.empty:
        print("Order Not Eligible For Discount")
        return []

    # Flatten the consequents of all matching rules into one list.
    recommended_item_codes = []
    for consequent in eligible_rules['next_product'].tolist():
        recommended_item_codes.extend(consequent)
    return recommended_item_codes

In [151]:
# Bundle rules mined from the transactions of the 'High Value' segment.
gx = get_bogd_bundles(high_value_df)
gx

In [145]:
# Use the basket of one mined rule as the order, so at least that rule matches.
# gx keeps its original index after sort_values, so [2] selects the rule whose
# index label is 2, not the third row of the sorted frame.
order = gx["basket"][2]  # Assuming you want to check this specific itemset
bogd(gx, order)

In [148]:
# Check a hand-written order (a set of stock codes) against the same bundle rules.
order = {'21080','45373', '21086','67262','63773'}
bogd(gx, order)

In [None]:
# Fixed amount discount
def apply_fixed_discount(total_amount, fixed_discount):
    """
    Subtract a fixed amount from the purchase total, never going below zero.

    Parameters:
    - total_amount (float): Purchase total before the discount.
    - fixed_discount (float): Amount to take off.

    Returns:
    - discounted_amount (float): Purchase total after the discount; 0 when the
      discount exceeds the total.
    """
    discounted_amount = total_amount - fixed_discount
    # Floor the result at zero so a large discount cannot produce a negative total.
    if 0 > discounted_amount:
        return 0
    return discounted_amount


In [174]:
# Bundled discounts
def apply_bundle_discount(bundle, discount_df):
    """
    Sum the discounted prices of the items in a product bundle.

    Parameters:
    - bundle (list): Product IDs in the bundle. An ID that appears twice is
      counted twice.
    - discount_df (DataFrame): Table with 'ProductID', 'Price' and 'Discount'
      columns, where 'Discount' is a fraction (0.1 means 10% off).

    Returns:
    - final_price (float): Total of price * (1 - discount) over the items found
      in discount_df. Items missing from the table have no known price, so
      they add nothing to the total; a notice is printed for each of them.
    """
    final_price = 0.0

    for item in bundle:
        # A single lookup fetches both the price and the discount for the item.
        matches = discount_df.loc[discount_df['ProductID'] == item, ['Price', 'Discount']]

        if matches.empty:
            # The price lives in the same table, so there is no "original
            # price" to fall back on; say what actually happens.
            print(f"Item {item} not found in discount table; excluded from bundle total.")
            continue

        # If a product is listed more than once, the first listing is used.
        first_match = matches.iloc[0]
        final_price += float(first_match['Price'] * (1 - first_match['Discount']))

    return final_price


In [172]:
# Dummy discount data: a made-up price and discount for ten product IDs.
# The three lists are aligned by position (one entry per product).
discount_data = {
    'ProductID': ['20725', '20727', '22383', '20728', '85099B', '23209', '23203','23170', '23171', '23172'],
    'Price': [5.0, 8.0, 10.0, 7.0, 12.0, 9.0, 6.0,15.0, 20.0, 25.0],
    'Discount': [0.1, 0.2, 0.15, 0.1, 0.25, 0.2, 0.15,0.1, 0.2, 0.15]  # Sample discounts as fractions: apply_bundle_discount multiplies the price by (1 - Discount)
}

# Create the discount DataFrame
discount_df = pd.DataFrame(discount_data)
discount_df


In [175]:
# Example product bundle: the itemset stored under index label 107 of xg.
# NOTE(review): this relies on label 107 being present in xg, which depends
# on the mined data.
bundle = xg["Frequent Itemset"][107]

# Calculate final price of the bundle after applying discounts
final_price = apply_bundle_discount(bundle, discount_df)
print("Final price of the bundle after applying discounts:", final_price)

In [7]:
# Get tiered discount
def calculate_tiered_discount(total_price, tiers):
    """Return the price after applying the discount of the highest tier reached.

    Args:
        total_price: Purchase amount before the discount.
        tiers: Iterable of dicts with 'min_amount' (threshold the total must
            reach) and 'discount' (percentage, e.g. 5 for 5%). The tiers may
            be listed in any order.

    Returns:
        total_price reduced by the percentage of the tier with the largest
        'min_amount' that total_price reaches. The price is returned unchanged
        when no tier is reached.
    """
    reached = [tier for tier in tiers if total_price >= tier['min_amount']]

    discount = 0
    if reached:
        # Pick the tier by threshold value rather than by list position, so an
        # unsorted tier list cannot yield a lower tier's discount. sorted() is
        # stable, so tiers sharing a threshold keep their listed order and the
        # last one listed wins.
        discount = sorted(reached, key=lambda tier: tier['min_amount'])[-1]['discount']

    return total_price - (total_price * (discount / 100))




In [10]:
# Example tiers. A tier applies once total_price >= min_amount, so each tier
# covers everything from its threshold up to just below the next threshold.
tiers = [
    {'min_amount': 0, 'discount': 0},   # Tier 1: $0 up to (not including) $101 (0% discount)
    {'min_amount': 101, 'discount': 5}, # Tier 2: $101 up to (not including) $201 (5% discount)
    {'min_amount': 201, 'discount': 10} # Tier 3: $201 and above (10% discount)
]

# Example usage
# NOTE(review): the value printed is the price after the discount, not the
# amount taken off, although the label reads "Discounted Amount".
total_price = 400
discounted_amount = calculate_tiered_discount(total_price, tiers)
print(f"Discounted Amount: ${discounted_amount}")

In [11]:
# Loyalty points
def calculate_loyalty_points(total_price, points_per_dollar):
    """Return the points earned: the amount spent times the points awarded per dollar."""
    earned_points = total_price * points_per_dollar
    return earned_points

In [12]:
# Example usage: a $150 purchase at 2 points per dollar.
# Note that this rebinds total_price, which the tiered-discount example also uses.
total_price = 150
points_per_dollar = 2  # Assume 2 points per dollar spent
loyalty_points = calculate_loyalty_points(total_price, points_per_dollar)
print(f"Loyalty Points Earned: {loyalty_points}")


## Peak Selection

In [15]:
def _pct_change(previous, current):
    """Percentage change from ``previous`` to ``current``; signed infinity (or 0.0) when ``previous`` is 0."""
    if previous == 0:
        if current == previous:
            return 0.0
        return float('inf') if current > previous else float('-inf')
    return (current - previous) / previous * 100


def find_promotion_periods(sales_forecast, threshold_increase_pct, threshold_decrease_pct):
    """Split a sales forecast into periods and classify them as lulls or peaks.

    Steps:
      1. Walk the forecast in order. A day joins the current period when its
         change from the previous day is an increase of at least
         ``threshold_increase_pct`` percent or a decrease of at least
         ``threshold_decrease_pct`` percent; otherwise it starts a new period.
      2. Merge a period with up to two following periods as long as each one
         starts within 2 days of the running end.
      3. A merged period is a peak when every day after its first day sells
         more than the first day, and a lull when every later day sells less.
         Single-day and mixed periods are neither.
      4. While fewer than two lulls exist, move peaks (oldest first) to the lulls.

    Args:
        sales_forecast: Mapping of day -> sales volume, iterated in insertion
            order. Days must support subtraction and comparison (e.g. ints).
        threshold_increase_pct: Minimum day-over-day increase, in percent.
        threshold_decrease_pct: Minimum day-over-day decrease, in percent.

    Returns:
        Tuple ``(lull_periods, peak_periods)``; each element is a list of
        ``(start_day, end_day)`` tuples.
    """
    # --- 1. Split the forecast into runs of days with a large enough change
    promotion_periods = []
    current_period = []

    for day, sales_volume in sales_forecast.items():
        if current_period:
            prev_sales = current_period[-1][1]
            # _pct_change guards the division so a zero-sales day cannot raise.
            increase_pct = _pct_change(prev_sales, sales_volume)
            decrease_pct = -increase_pct
            if increase_pct >= threshold_increase_pct or decrease_pct >= threshold_decrease_pct:
                current_period.append((day, sales_volume))
            else:
                promotion_periods.append(current_period)
                current_period = [(day, sales_volume)]
        else:
            current_period.append((day, sales_volume))

    # Add the last period if it exists
    if current_period:
        promotion_periods.append(current_period)

    # --- 2. Merge consecutive periods if the next one starts within 2 days
    merged_periods = []
    i = 0
    while i < len(promotion_periods):
        period_start = promotion_periods[i][0][0]
        period_end = promotion_periods[i][-1][0]
        # Track the last period actually absorbed. The loop variable cannot be
        # used for this: after a `break` it points at a period that was NOT
        # merged, and resuming behind it would drop that period.
        last_absorbed = i
        for j in range(i + 1, min(i + 3, len(promotion_periods))):
            next_period_start = promotion_periods[j][0][0]
            if next_period_start - period_end <= 2:
                period_end = promotion_periods[j][-1][0]
                last_absorbed = j
            else:
                break
        merged_periods.append((period_start, period_end))
        i = last_absorbed + 1

    # --- 3. Separate peak and lull periods
    peak_periods = []
    lull_periods = []
    for period in merged_periods:
        period_start, period_end = period
        baseline = sales_forecast[period_start]
        # Compare only the days AFTER the start with the start-day volume. If
        # the start day is included it is compared with itself, which makes
        # both strict tests fail for every period. Selecting by key also
        # tolerates days that are missing from the forecast.
        later_sales = [
            sales for day, sales in sales_forecast.items()
            if period_start < day <= period_end
        ]
        if not later_sales:
            continue  # a single-day period has no direction
        if all(sales > baseline for sales in later_sales):
            peak_periods.append(period)
        elif all(sales < baseline for sales in later_sales):
            lull_periods.append(period)

    # --- 4. Ensure there are at least 2 lull periods where possible
    # NOTE(review): this relabels peaks as lulls and can leave no peak at all;
    # confirm this is the intended fallback.
    while len(lull_periods) < 2:
        if peak_periods:
            lull_periods.append(peak_periods.pop(0))
        else:
            break

    # Return both lull and peak periods
    return lull_periods, peak_periods




In [21]:
# Example usage: a ten-day forecast keyed by day number.
sales_forecast = {1: 100, 2: 120, 3: 130, 4: 150, 5: 160, 6: 170, 7: 175, 8: 180, 9: 175, 10: 170}
threshold_increase_pct = 0  # Define threshold percentage increase
threshold_decrease_pct = -10  # Define threshold percentage decrease
# NOTE(review): a decrease threshold of -10 is met by any fall and also by any
# rise of up to 10%. Together with an increase threshold of 0, every day
# satisfies the test, so the whole forecast stays in a single period.

lull_periods, peak_periods = find_promotion_periods(sales_forecast, threshold_increase_pct, threshold_decrease_pct)
print("Lull Periods:", lull_periods)
print("Peak Periods:", peak_periods)

In [22]:
import pandas as pd

def find_promo_days(sales_data, peak_threshold=1.2, lull_threshold=0.8, num_promos=3):
    """Identifies the strongest peak and lull days relative to the average sales volume.

    A day is a peak when its volume is at least ``peak_threshold`` times the
    average, and a lull when it is at most ``lull_threshold`` times the
    average. Among those, the ``num_promos`` most extreme days are kept.

    NOTE(review): a later cell redefines ``find_promo_days`` with a different
    return type; after that cell runs, this version is shadowed.

    Args:
        sales_data (pd.DataFrame): DataFrame with at least 'Date' and 'Sales Volume' columns.
        peak_threshold (float, optional): Multiplier for avg. sales volume to define a peak. Defaults to 1.2 (20% above average).
        lull_threshold (float, optional): Multiplier for avg. sales volume to define a lull. Defaults to 0.8 (20% below average).
        num_promos (int, optional): The maximum number of days returned per list. Defaults to 3.

    Returns:
        tuple: A tuple containing two lists:
            - peak_days: Dates of the ``num_promos`` highest-volume peak days, in chronological order.
            - lull_days: Dates of the ``num_promos`` lowest-volume lull days, in chronological order.

    Raises:
        TypeError: If ``sales_data`` is not a pandas DataFrame.
    """

    if not isinstance(sales_data, pd.DataFrame):
        raise TypeError("sales_data must be a pandas DataFrame")

    # Calculate average sales volume
    volumes = sales_data['Sales Volume']
    avg_sales = volumes.mean()

    # Apply thresholds
    peak_rows = sales_data[volumes >= avg_sales * peak_threshold]
    lull_rows = sales_data[volumes <= avg_sales * lull_threshold]

    # Rank by volume so the *top* days are selected (taking the first rows
    # would return the earliest qualifying days instead), then restore
    # calendar order for readability.
    peak_days = peak_rows.nlargest(num_promos, 'Sales Volume').sort_values('Date')['Date'].tolist()
    lull_days = lull_rows.nsmallest(num_promos, 'Sales Volume').sort_values('Date')['Date'].tolist()

    return peak_days, lull_days


In [23]:
import pandas as pd
import numpy as np
import random

# 1. Create date range (let's assume a month)
dates = pd.date_range(start='2024-03-01', end='2024-03-31')

# 2. Generate some base sales data with fluctuations
# NOTE(review): no random seed is set in this cell, so each run produces
# different data.
base_sales = np.random.randint(50, 150, size=len(dates))

# 3. Add some random peaks and dips 
# The two index samples are drawn independently, so the same day can receive
# both a spike and a dip.
# NOTE(review): base_sales comes from randint, so it is presumably an integer
# array and the scaled values lose their fractional part when stored back —
# confirm if exact values matter.
for i in random.sample(range(len(dates)), k=5):  # Introduce 5 random spikes
    base_sales[i] *= random.uniform(1.5, 2)  
for i in random.sample(range(len(dates)), k=5):  # Introduce 5 random dips
    base_sales[i] *= random.uniform(0.5, 0.8)  

# 4. Create DataFrame
sales_data = pd.DataFrame({'Date': dates, 'Sales Volume': base_sales})

# 5. Test the function
peak_days, tull_days = find_promo_days(sales_data)
print("Peak Promotion Days:", peak_days)
print("Tull Promotion Days:", tull_days)


In [25]:
sales_data

In [26]:
import pandas as pd
import numpy as np
import random

def find_promo_days(sales_data, peak_threshold=1.2, lull_threshold=0.8, num_promos=3, proximity_days=3):
    """Groups peak days and lull days into promotional periods.

    Days are selected by comparing their sales volume with a multiple of the
    average volume; selected days that lie close together are then clustered
    into one period.

    Args:
        sales_data (pd.DataFrame): DataFrame containing 'Date' and 'Sales Volume' columns.
        peak_threshold (float, optional): A day is a peak when its volume is at least this multiple of the average. Defaults to 1.2.
        lull_threshold (float, optional): A day is a lull when its volume is at most this multiple of the average. Defaults to 0.8.
        num_promos (int, optional): Accepted for compatibility but not used by this implementation. Defaults to 3.
        proximity_days (int, optional): Largest gap in days for two dates to share a period. Defaults to 3.

    Returns:
        list: Periods as lists of dates; all peak periods come first, followed
        by all lull periods. The two kinds are not marked in the result.

    Raises:
        TypeError: If ``sales_data`` is not a pandas DataFrame.
    """
    if not isinstance(sales_data, pd.DataFrame):
        raise TypeError("sales_data must be a pandas DataFrame")

    volumes = sales_data['Sales Volume']
    mean_volume = volumes.mean()

    # Boolean masks marking the days beyond each threshold.
    is_peak = volumes >= mean_volume * peak_threshold
    is_lull = volumes <= mean_volume * lull_threshold

    peak_dates = sales_data.loc[is_peak, 'Date'].tolist()
    lull_dates = sales_data.loc[is_lull, 'Date'].tolist()

    # Cluster each kind separately, then return peaks followed by lulls.
    peak_periods = cluster_close_dates(peak_dates, proximity_days)
    lull_periods = cluster_close_dates(lull_dates, proximity_days)
    return peak_periods + lull_periods

def cluster_close_dates(dates, proximity_days):
    """Groups closely spaced dates into promotional periods.

    Args:
        dates (list): Date-like objects whose difference exposes ``.days``
            (e.g. ``datetime.date`` or ``pd.Timestamp``). Any order.
        proximity_days (int): The maximum number of days between consecutive
            dates for them to belong to the same period.

    Returns:
        list: A list of lists in chronological order; each sub-list is one
        period and holds its dates in ascending order. Empty for empty input.
    """

    promo_periods = []   # Completed periods
    current_period = []  # Period currently being extended

    # Sort first: the gap test compares each date with the previous one, which
    # is only meaningful in chronological order. Without it a date out of
    # order yields a negative gap and is lumped into the wrong period.
    for date in sorted(dates):
        # Close the running period when the gap to its last date is too large
        if current_period and (date - current_period[-1]).days > proximity_days:
            promo_periods.append(current_period)
            current_period = []

        current_period.append(date)

    if current_period:  # Add the last remaining period, if any
        promo_periods.append(current_period)

    return promo_periods

# (Optional) Dummy data generation for testing purposes
import datetime

def generate_dummy_data(start_date='2024-03-01', end_date='2024-03-31'):
    """Generates dummy daily sales data with random fluctuations, peaks, and dips.

    Args:
        start_date: First day of the range (anything ``pd.date_range`` accepts).
        end_date: Last day of the range, inclusive.

    Returns:
        pd.DataFrame: One row per day with 'Date' and integer 'Sales Volume' columns.
    """
    dates = pd.date_range(start=start_date, end=end_date)
    base_sales = np.random.randint(50, 150, size=len(dates))

    # random.sample raises if asked for more items than exist, so cap the
    # number of spikes/dips for ranges shorter than five days.
    n_events = min(5, len(dates))
    for i in random.sample(range(len(dates)), k=n_events):
        # Introduce random peaks; int() makes the truncation explicit because
        # base_sales is an integer array.
        base_sales[i] = int(base_sales[i] * random.uniform(1.5, 2))
    for i in random.sample(range(len(dates)), k=n_events):
        # Introduce random dips
        base_sales[i] = int(base_sales[i] * random.uniform(0.5, 0.8))

    # Return the generated frame (previously the pandas module itself was returned).
    return pd.DataFrame({'Date': dates, 'Sales Volume': base_sales})


In [31]:
def generate_dummy_data(start_date='2024-03-01', end_date='2024-03-31', seed=None):
    """Generates dummy daily sales data with clustered peaks and lulls.

    Args:
        start_date: First day of the range (anything ``pd.date_range`` accepts).
        end_date: Last day of the range, inclusive.
        seed: Optional seed for the random generator. Passing the same seed
            reproduces the same data; ``None`` (default) gives fresh random data.

    Returns:
        pd.DataFrame: One row per day with 'Date' and integer 'Sales Volume'
        columns. Days at the fixed cluster positions are scaled up (peaks) or
        down (lulls); positions beyond the end of the range are ignored.
    """
    rng = np.random.default_rng(seed)
    dates = pd.date_range(start=start_date, end=end_date)
    n_days = len(dates)
    base_sales = rng.integers(50, 150, size=n_days)

    # Positions (0-based day offsets) of the clustered peaks and valleys
    peak_clusters = [[2, 3, 5], [10, 11], [20, 22, 23]]
    lull_clusters = [[8, 9], [16, 18], [27, 28, 29]]

    def scale_days(clusters, low, high):
        for cluster in clusters:
            for i in cluster:
                # Skip offsets outside the range so short date ranges
                # (fewer than 30 days) do not raise IndexError.
                if i < n_days:
                    # int() makes the truncation explicit: base_sales is an integer array.
                    base_sales[i] = int(base_sales[i] * rng.uniform(low, high))

    scale_days(peak_clusters, 1.5, 2)
    scale_days(lull_clusters, 0.5, 0.8)

    return pd.DataFrame({'Date': dates, 'Sales Volume': base_sales})

In [32]:
# Generate test data
sales_data = generate_dummy_data()

# Find promotional periods with a proximity of 2 days
promo_periods = find_promo_days(sales_data, proximity_days=2) 

# Print the first and last date of every period. The list holds the peak
# periods followed by the lull periods, so the output is not in one
# chronological sequence and does not say which kind each period is.
for period in promo_periods:
    start_date = period[0].strftime('%Y-%m-%d')
    end_date = period[-1].strftime('%Y-%m-%d')
    print(f"Promotional Period: {start_date} to {end_date}")


In [33]:
sales_data

In [37]:
# Line chart of the generated daily sales volume. A title and axis labels are
# set so the figure can be read without the surrounding cells.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(sales_data["Date"], sales_data["Sales Volume"])
ax.set(title="Daily Sales Volume", xlabel="Date", ylabel="Sales Volume");