In [1]:
import os
import pyodbc
from datetime import datetime as dt, timedelta
import pandas as pd
import numpy as np
import spacy

### Feature Store Utils

In [6]:
def upsert_sum_multi_key(df1, df2, key_columns, sum_column):
    """
    Upsert ``df2`` into ``df1`` by summing ``sum_column`` on rows whose keys match.

    Rows are matched on all columns in ``key_columns`` with an outer join, so keys
    present in only one of the frames are kept; the missing side counts as 0.

    Parameters:
    - df1 (pd.DataFrame): The first DataFrame (original data).
    - df2 (pd.DataFrame): The second DataFrame (data to be upserted).
    - key_columns (list): Column names on which to match rows (any number of columns).
    - sum_column (str): The numeric column, present in both frames, to be summed.

    Returns:
    - pd.DataFrame: A new DataFrame with upserted rows. ``df1`` and ``df2`` are not modified.
    """
    left_col = sum_column + '_left'
    right_col = sum_column + '_right'

    # Outer join keeps rows that exist in only one of the two frames.
    merged_df = pd.merge(df1, df2, on=key_columns, how="outer", suffixes=('_left', '_right'))

    # Fill and sum in one assignment. `merged_df[col].fillna(0, inplace=True)` is a
    # chained in-place call: under pandas Copy-on-Write it modifies a temporary and
    # never reaches merged_df, so every non-overlapping row would sum to NaN.
    merged_df[sum_column] = merged_df[left_col].fillna(0) + merged_df[right_col].fillna(0)

    # Drop the intermediate suffixed columns used for summing.
    return merged_df.drop(columns=[left_col, right_col])

In [10]:
class FeatureStore:
    """
    Pull bet-selection rows from SQL Server and aggregate them to
    Customer-Event-Market granularity, counting the distinct selections
    made on each combination.

    Pipeline (see ``execute_pipeline``):
    1. ``extract_from_database``      - read ``[SBDW].[BR].[BetSelectionData]``.
    2. ``perform_groupby_operations`` - count distinct ``BetSelectionKey`` per group.
    3. ``final_preprocessing``        - rename the count column and stamp ``run_datetime``.
    4. ``write_to_lake``              - overwrite ``./data_lake/feature_store.csv``.
    """

    def __init__(self,
                 run_datetime=None
                ):
        """
        Parameters:
        - run_datetime (datetime, optional): Value written to the ``run_datetime``
          column of every output row. Defaults to the moment this instance is created.
        """
        # A `dt.now()` default in the signature is evaluated only once, when the class
        # is defined. Every later instance in the same kernel would then silently
        # reuse that stale timestamp, so resolve the default per call instead.
        self.run_datetime = dt.now() if run_datetime is None else run_datetime
        self.groupby_cols = ['CustomerKey', 'EventKey', 'EventDeadline', 'MarketKey',
                             'MarketName', 'EventName', 'LeagueName', 'SportName', 'ZoneName'
                            ]
        self.selections_col = 'BetSelectionKey'
        self.total_selections_col = "total_selections"
        self.run_datetime_col = "run_datetime"
        self.event_time_colname = "EventDeadline"
        self.output_df_path = './data_lake/feature_store.csv'
        # Raw string: "\S" is not a valid escape sequence (SyntaxWarning on Python 3.12+).
        # The resulting connection string is byte-identical.
        self.connection = pyodbc.connect('Driver={SQL Server};'
                                         r'Server=BMA-SQL14-SB09\SB09;'
                                         'Database=SBDW;'
                                         'Trusted_Connection=yes;'
                                         )
        # Raw extract, filled by extract_from_database().
        self.feature_store_df = None
        # Aggregated output, filled by perform_groupby_operations().
        self.feature_store_df_grouped = None

        # NOLOCK reads uncommitted rows; presumably acceptable for this batch job - confirm.
        self.database_query = """
                              SELECT * FROM [SBDW].[BR].[BetSelectionData] WITH(NOLOCK)
                              """

    def extract_from_database(self):
        """Load the full result of ``self.database_query`` into ``self.feature_store_df``."""
        print("Extracting Data From SQL Server Database")
        self.feature_store_df = pd.read_sql(self.database_query, self.connection)
        print("Finished Extracting Data From SQL Server Database")

    def perform_groupby_operations(self):
        """
        Count distinct ``BetSelectionKey`` values per ``groupby_cols`` combination.

        Rows with a missing value in any grouping column are dropped (pandas
        ``groupby`` default ``dropna=True``). Confirm this is intended.
        """
        print("Perform groupby operations on dataset: Bring data at the desired granularity level")
        self.feature_store_df_grouped = (self.feature_store_df
                                         .groupby(self.groupby_cols)[self.selections_col]
                                         .nunique()
                                         .reset_index()
                                         )
        print("Finished performing groupby operations on dataset")

    def final_preprocessing(self):
        """Rename the distinct-count column to ``total_selections`` and add the ``run_datetime`` column."""
        print("Executing Final Preprocessing: Renaming Columns and creating new columns")
        self.feature_store_df_grouped = self.feature_store_df_grouped.rename(
            columns={self.selections_col: self.total_selections_col}
        )
        self.feature_store_df_grouped[self.run_datetime_col] = self.run_datetime
        print("Finished Final Preprocessing Step")

    def write_to_lake(self):
        """Overwrite ``self.output_df_path`` with the aggregated frame, creating its directory if needed."""
        print("Started Writing Data to Lake")
        os.makedirs(os.path.dirname(self.output_df_path) or ".", exist_ok=True)
        self.feature_store_df_grouped.to_csv(self.output_df_path, index=False, mode="w+")
        print("Finished Writing Data to Lake")

    def execute_pipeline(self):
        """Run extract -> aggregate -> post-process -> write, in that order."""
        self.extract_from_database()
        self.perform_groupby_operations()
        self.final_preprocessing()
        self.write_to_lake()

### Profiles Utils

In [11]:
def find_favorites(dataframe,
                   major_grouping_col,
                   favorite_grouping_col,
                   ranking_col,
                   ranking_method = "first"
                  ):
    """
    For each value of ``major_grouping_col``, pick the ``favorite_grouping_col``
    value with the largest summed ``ranking_col``.

    Parameters:
    - dataframe (pd.DataFrame): Input rows.
    - major_grouping_col (str): Entity to find favorites for (e.g. a customer key).
    - favorite_grouping_col (str): Attribute whose favorite is wanted (e.g. sport name).
    - ranking_col (str): Numeric column summed per (entity, attribute) pair.
    - ranking_method (str): Passed to pandas ``rank``. With the default ``"first"``,
      exactly one row per entity is returned, and ties go to the row that comes
      first in the grouped (key-sorted) order. Other methods may return several
      rows or none: ranks are truncated to int before being compared with 1.

    Returns:
    - pd.DataFrame: Columns ``[major_grouping_col, favorite_grouping_col]``, keeping
      the index of the aggregated frame.
    """
    totals = (dataframe
              .groupby([major_grouping_col, favorite_grouping_col])[ranking_col]
              .sum()
              .reset_index())

    # Rank 1 == largest total within each entity.
    rank_within_entity = (totals
                          .groupby(major_grouping_col)[ranking_col]
                          .rank(method=ranking_method, ascending=False)
                          .astype(int))

    is_favorite = rank_within_entity == 1
    return totals.loc[is_favorite, [major_grouping_col, favorite_grouping_col]]

In [12]:
class CustomerProfile:
    """
    Build a profile for each recently active customer: their favorite sport,
    league and zone over a recent window of events.

    Inputs:
    - the feature store CSV (``total_selections`` per customer/event/market row);
    - the customers with at least one bet whose ``CouponArrivedDate`` is on or
      after the start of yesterday (SQL Server clock).

    A "favorite" is the value with the highest summed ``total_selections`` (see
    ``find_favorites``). The result is written to ``./data_lake/customer_profile.csv``.
    """

    def __init__(self,
                 lookback_days=7
                ):
        """
        Parameters:
        - lookback_days (int | float): Width, in days, of the window of event
          deadlines (ending now) used to build the profiles. Defaults to 7.
        """
        self.feature_store_path = "./data_lake/feature_store.csv"
        self.major_grouping_col = "CustomerKey"
        self.ranking_col = "total_selections"
        self.sport_grouping_col = "SportName"
        self.league_grouping_col = "LeagueName"
        self.zone_grouping_col = "ZoneName"
        self.ranking_method = "first"
        self.event_time_colname = "EventDeadline"
        self.customer_profile_dataframe = None
        # Build the profile from events whose deadline falls in the last `lookback_days`
        # days. The previous comment said "a month" although the code used 7 days.
        # A single shared "now" keeps the two bounds exactly `lookback_days` apart.
        now = dt.now()
        self.upper_limit_for_datetime_of_events = now
        self.lower_limit_for_datetime_of_events = now - timedelta(days=lookback_days)
        self.output_df_path = './data_lake/customer_profile.csv'
        # Raw string: "\S" is not a valid escape sequence; the value is unchanged.
        self.connection = pyodbc.connect('Driver={SQL Server};'
                                         r'Server=BMA-SQL14-SB09\SB09;'
                                         'Database=SBDW;'
                                         'Trusted_Connection=yes;'
                                         )
        self.database_query_for_actives = """
                                          select distinct CustomerKey
                                          from DW.Bet AS b
                                          WHERE b.CouponArrivedDate >= CAST(DATEADD(d, -1, GetDate()) as date)
                                          """

    def _find_favorite(self, dataframe, favorite_grouping_col):
        """Per customer, return the ``favorite_grouping_col`` value with the most selections."""
        return find_favorites(dataframe=dataframe,
                              major_grouping_col=self.major_grouping_col,
                              favorite_grouping_col=favorite_grouping_col,
                              ranking_col=self.ranking_col,
                              ranking_method=self.ranking_method
                              )

    def _find_favorite_sport(self, dataframe):
        """Columns ``[CustomerKey, SportName]``: each customer's favorite sport."""
        return self._find_favorite(dataframe, self.sport_grouping_col)

    def _find_favorite_league(self, dataframe):
        """Columns ``[CustomerKey, LeagueName]``: each customer's favorite league."""
        return self._find_favorite(dataframe, self.league_grouping_col)

    def _find_favorite_zone(self, dataframe):
        """Columns ``[CustomerKey, ZoneName]``: each customer's favorite zone."""
        return self._find_favorite(dataframe, self.zone_grouping_col)

    def get_data_from_lake(self):
        """Read the feature store CSV written by ``FeatureStore``."""
        return pd.read_csv(self.feature_store_path)

    def get_actives(self):
        """Return the distinct ``CustomerKey`` values that bet since the start of yesterday."""
        return pd.read_sql(self.database_query_for_actives, self.connection)

    def join_actives_and_betting_activity_data(self, feature_store_df, df_actives):
        """Keep only the feature-store rows of active customers (inner join on ``CustomerKey``)."""
        return feature_store_df.merge(df_actives, on=self.major_grouping_col, how="inner")

    def get_past_info_and_preprocess(self, feature_store_df_actives):
        """
        Parse the event-time column to datetime and keep rows whose event time lies
        in ``[lower_limit, upper_limit]`` (both bounds inclusive).

        Returns a new frame. The caller's frame is not modified; the previous
        version overwrote its event-time column in place.
        """
        event_times = pd.to_datetime(feature_store_df_actives[self.event_time_colname])
        in_window = ((event_times <= self.upper_limit_for_datetime_of_events)
                     & (event_times >= self.lower_limit_for_datetime_of_events))
        return feature_store_df_actives.assign(**{self.event_time_colname: event_times}).loc[in_window]

    def create_customer_profile_dataframe(self, df_processed):
        """
        Combine favorite sport, league and zone into one row per customer.

        Inner joins keep only customers that have all three favorites.
        """
        favorite_sport_df = self._find_favorite_sport(df_processed)
        favorite_league_df = self._find_favorite_league(df_processed)
        favorite_zone_df = self._find_favorite_zone(df_processed)
        return (favorite_sport_df
                .merge(favorite_league_df, on=self.major_grouping_col, how="inner")
                .merge(favorite_zone_df, on=self.major_grouping_col, how="inner")
                )

    def write_to_lake(self):
        """Write ``self.customer_profile_dataframe`` to ``self.output_df_path``, creating its directory if needed."""
        os.makedirs(os.path.dirname(self.output_df_path) or ".", exist_ok=True)
        self.customer_profile_dataframe.to_csv(self.output_df_path, index=False)

    def execute_pipeline(self):
        """Load -> filter to actives -> filter to the event window -> profile -> write."""
        feature_store_df = self.get_data_from_lake()
        df_actives = self.get_actives()
        feature_store_df_actives = self.join_actives_and_betting_activity_data(feature_store_df=feature_store_df, df_actives=df_actives)
        df_processed = self.get_past_info_and_preprocess(feature_store_df_actives)
        self.customer_profile_dataframe = self.create_customer_profile_dataframe(df_processed)
        self.write_to_lake()

    

In [13]:
class EventProfile:
    """
    Extract events whose deadline falls on tomorrow's calendar date (SQL Server
    clock) and write them to ``./data_lake/event_profile.csv``.

    The query keeps only Football, Basketball and Tennis events. It excludes
    leagues named ``Test%`` and the ``FIFA`` / ``NBA2K`` zones, and requires
    ``EventTypeKey = 2`` (the meaning of that key is not visible here - confirm).
    """

    def __init__(self,
                ):
        # Filled by extract_from_database().
        self.events_df = None
        # Raw string: "\S" is not a valid escape sequence; the value is unchanged.
        self.connection = pyodbc.connect('Driver={SQL Server};'
                                         r'Server=BMA-SQL14-SB09\SB09;'
                                         'Database=SBDW;'
                                         'Trusted_Connection=yes;'
                                         )

        # Query for tomorrow's events, so that only those are recommended.
        # The upper bound is exclusive: the previous `<=` also matched events at exactly
        # 00:00 on the day after tomorrow.
        self.database_query = """
                              select events.EventKey,  
                              events.EventName,
                              events.EventDeadline,
                              sport.CategoryName as SportName, 
                              league.SubcategoryName as LeagueName,
                              zone.SubCategoryGroupName as ZoneName
                              from DW.Event as events
                              inner join DW.SubCategory as league WITH(NOLOCK)
                              on events.SubCategoryKey = league.SubCategoryKey
                              inner join DW.Category as sport WITH(NOLOCK)
                              on sport.CategoryKey = league.CategoryKey
                              inner join DW.SubCategoryGroup as zone WITH(NOLOCK)
                              on league.SubCategoryGroupKey = zone.SubCategoryGroupKey
                              where EventDeadline >= CAST(DATEADD(d, 1, GetDate()) as date)
                              and EventDeadline < CAST(DATEADD(d, 2, GetDate()) as date)
                              and sport.CategoryName in ('Football', 'Basketball', 'Tennis')
                              and league.SubcategoryName not like 'Test%'
                              and zone.SubCategoryGroupName not in ('FIFA', 'NBA2K')
                              and EventTypeKey = 2;
                              """

        self.output_df_path = './data_lake/event_profile.csv'

    def extract_from_database(self):
        """Load the result of ``self.database_query`` into ``self.events_df``."""
        print("Extracting Data From SQL Server Database")
        self.events_df = pd.read_sql(self.database_query, self.connection)
        print("Finished Extracting Data From SQL Server Database")

    def preprocessing(self):
        """Placeholder: no transformation is applied to the extracted events yet."""
        pass

    def write_to_lake(self):
        """Overwrite ``self.output_df_path`` with ``self.events_df``, creating its directory if needed."""
        print("Started Writing Data to Lake")
        os.makedirs(os.path.dirname(self.output_df_path) or ".", exist_ok=True)
        self.events_df.to_csv(self.output_df_path, index=False, mode='w+')
        print("Finished Writing Data to Lake")

    def execute_pipeline(self):
        """Run extract -> preprocess -> write, in that order."""
        self.extract_from_database()
        self.preprocessing()
        self.write_to_lake()
    

In [10]:
class RecommendationEngine:
    """
    Recommend upcoming events to customers by comparing customer profiles
    (favorite sport / league / zone) with event profiles.

    Customers and events are first grouped by identical (sport, league, zone)
    triples, so each distinct pair of triples is scored only once. Only pairs
    with the same sport are scored:

        score = 0.4 * sim(league_c, league_e) + 0.6 * sim(zone_c, zone_e)

    where ``sim`` is spaCy ``Doc.similarity``. The top-N events per customer are
    written to ``./data_lake/customer_event_recommendations.csv``.
    """

    def __init__(self):
        self.nlp = spacy.load("en_core_web_md")
        self.customer_profiles_path = "./data_lake/customer_profile.csv"
        self.event_profiles_path = "./data_lake/event_profile.csv"
        self.league_name_colname = 'LeagueName'
        # Attribute name keeps its original spelling for backward compatibility.
        self.zone_name_colnmame = 'ZoneName'
        self.sport_name_colname = "SportName"
        self.customer_key_colname = 'CustomerKey'
        self.event_key_colname = 'EventKey'
        self.similarity_colname = "similarity"
        self.rank_colname = "rank"
        self.run_datetime_colname = "run_datetime"
        self.grouping_profile_cols = [self.sport_name_colname, self.league_name_colname, self.zone_name_colnmame]
        self.similarity_league_weight = 0.4
        self.similarity_zone_weight = 0.6
        self.top_n_recommendations_to_provide = 3
        self.run_datetime = dt.now()
        # Pipeline state, filled step by step by execute_pipeline().
        self.customer_profiles = None
        self.event_profiles = None
        self.customer_profile_elements = None
        self.event_profile_elements = None
        self.df_final_similarities = None
        self.ranking_dataframe = None
        self.output_df_path = './data_lake/customer_event_recommendations.csv'

    def get_event_and_customer_profiles(self):
        """Read the customer- and event-profile CSVs from the data lake."""
        self.customer_profiles = pd.read_csv(self.customer_profiles_path)
        self.event_profiles = pd.read_csv(self.event_profiles_path)

    def create_event_and_customer_profile_groups(self):
        """
        Collapse profiles sharing the same (sport, league, zone) triple into one row
        holding the array of their customer / event keys.
        """
        self.customer_profile_elements = (self.customer_profiles
                                          .groupby(self.grouping_profile_cols)
                                          .agg(customers_list=(self.customer_key_colname, 'unique'))
                                          .reset_index()
                                          )

        self.event_profile_elements = (self.event_profiles
                                       .groupby(self.grouping_profile_cols)
                                       .agg(events_list=(self.event_key_colname, 'unique'))
                                       .reset_index()
                                       )

    def calculate_similarities(self):
        """
        Score every (customer group, event group) pair sharing a sport.

        Stores a frame with columns ``[CustomerKey, EventKey, similarity]`` in
        ``self.df_final_similarities``. The key columns hold the *arrays* of keys of
        each group; ``final_preprocessing`` explodes them. When no pair matches, the
        frame is empty but still has those columns.
        """
        doc_cache = {}

        def to_doc(text):
            # Parse each distinct league/zone name once instead of four times per pair.
            if text not in doc_cache:
                doc_cache[text] = self.nlp(text)
            return doc_cache[text]

        output_cols = [self.customer_key_colname, self.event_key_colname, self.similarity_colname]
        # Collect plain records and build the frame once. Growing a DataFrame row by
        # row via the private `DataFrame._append` is quadratic and not a public API.
        records = []
        n_customer_profiles = len(self.customer_profile_elements)
        for index_customer, row_customer in self.customer_profile_elements.iterrows():
            print("Running loop: ", index_customer, "/", n_customer_profiles, "customer profiles completed")
            same_sport = (self.event_profile_elements[self.sport_name_colname]
                          == row_customer[self.sport_name_colname])
            events_to_check = self.event_profile_elements[same_sport]
            if events_to_check.empty:
                continue
            customer_league_doc = to_doc(row_customer[self.league_name_colname])
            customer_zone_doc = to_doc(row_customer[self.zone_name_colnmame])
            for _, row_event in events_to_check.iterrows():
                similarity_league = self.similarity_league_weight * customer_league_doc.similarity(
                    to_doc(row_event[self.league_name_colname]))
                similarity_zone = self.similarity_zone_weight * customer_zone_doc.similarity(
                    to_doc(row_event[self.zone_name_colnmame]))
                records.append({self.customer_key_colname: row_customer['customers_list'],
                                self.event_key_colname: row_event['events_list'],
                                self.similarity_colname: similarity_league + similarity_zone,
                                })

        self.df_final_similarities = (pd.DataFrame(records, columns=output_cols)
                                      .astype({self.similarity_colname: float}))

    def final_preprocessing(self):
        """
        Expand group-level scores to (customer, event) rows and keep each customer's
        top-N events by similarity. Ties go to the earlier row (``rank(method="first")``).
        """
        self.ranking_dataframe = self.df_final_similarities.explode(self.customer_key_colname).explode(self.event_key_colname)
        self.ranking_dataframe[self.customer_key_colname] = self.ranking_dataframe[self.customer_key_colname].astype("int")
        self.ranking_dataframe[self.rank_colname] = (self.ranking_dataframe
                                                     .groupby(self.customer_key_colname)[self.similarity_colname]
                                                     .rank(method="first", ascending=False)
                                                     .astype(int)
                                                     )

        self.ranking_dataframe = (self.ranking_dataframe[self.ranking_dataframe[self.rank_colname] <= self.top_n_recommendations_to_provide]
                                  .sort_values([self.customer_key_colname, self.rank_colname], ascending=[True, True])
                                  .reset_index(drop=True)
                                  )

        self.ranking_dataframe[self.run_datetime_colname] = self.run_datetime

    def write_to_lake(self):
        """Overwrite ``self.output_df_path`` with the recommendations, creating its directory if needed."""
        os.makedirs(os.path.dirname(self.output_df_path) or ".", exist_ok=True)
        self.ranking_dataframe.to_csv(self.output_df_path, index=False, mode='w+')

    def execute_pipeline(self):
        """Load profiles -> group -> score -> rank -> write."""
        self.get_event_and_customer_profiles()
        self.create_event_and_customer_profile_groups()
        self.calculate_similarities()
        self.final_preprocessing()
        self.write_to_lake()