In [0]:
#%run ../utils/nginx_commons

In [0]:
class NGINXDataTransform():
  """
  transform, clean, and process data in preparation for nginx propensity modeling
  write data to feature store to track features
  read training, scoring, and evaluation data from feature store
  """
  
  def __init__(self
               , train_dates=["2020-01-01","2021-01-01"]
               , score_dates=["2022-01-01"]
               , dv="next_qtr_nginx_purchase"
               , env="dev"
               , primary_keys = ["account_id","calendar_date"]
               , database_prefix="mlops_nginx_example_"
               , raw_training_table_name="nginx_raw_training_data"
               , raw_validation_table_name="nginx_raw_validation_data"
               , preprocessed_training_table_name="nginx_preprocessed_training_data"
               , preprocessed_validation_table_name="nginx_preprocessed_validation_data"
               , raw_scoring_table_name="nginx_raw_scoring_data"
               , preprocessed_scoring_table_name="nginx_preprocessed_scoring_data"
               , training_fs_update=True
               , scoring_fs_update=True
               , limit=5000
              ):
    self.env = env
    self.limit = limit
    self.dv = dv
    self.train_dates = train_dates
    self.score_dates = score_dates
    self.primary_keys = primary_keys
    self.training_fs_update = training_fs_update
    self.scoring_fs_update = scoring_fs_update
    
    # define db name & table names
    self.database_prefix = database_prefix
    self.raw_training_table_name = raw_training_table_name
    self.raw_validation_table_name = raw_validation_table_name
    self.preprocessed_training_table_name = preprocessed_training_table_name
    self.preprocessed_validation_table_name = preprocessed_validation_table_name
    self.raw_scoring_table_name = raw_scoring_table_name
    self.preprocessed_scoring_table_name = preprocessed_scoring_table_name
    
    if(training_fs_update | scoring_fs_update):
      # vars related to ede, used to write raw_training/socring data to ede, only for dev/qa env 
      if env == 'dev' or env == 'qa':
        self.ede_schema = f"{self.env}_DATA_SCIENCE"
        self.ede_pipeline = EDEPipeline(ede_schema = self.ede_schema)
        
      self.effective_date = str(date.today())  # initialize today as another primary key
      self.update_feature_store(self.train_dates, self.score_dates)  # update the feature store
    
  def load_dv(self, dates):
    """
    Read the dependent variable column (self.dv) from EDE for the given calendar dates.

    Returns a dataframe with calendar_date, account_id and the dv column cast to boolean.
    """
    # python list repr -> SQL list, e.g. ['2020-01-01'] -> ('2020-01-01')
    date_filter = str(dates).replace('[','(').replace(']',')')
    query = f"select calendar_date, account_id, {self.dv} from EXP_MKTG.DATA_SCIENCE.NGINX_TARGET WHERE CALENDAR_DATE IN {date_filter}"

    # read data from EDE and safe name
    target_df = pysh.safe_name(dbh.read_ede_data(query))

    # int to boolean dtype conversion
    return cast_col_type(target_df, cols=[self.dv], new_col_type="boolean")
  
  def load_raw_daily_snapshot_data(self, dates):
    """
    Build the raw modeling dataframe for a set of calendar dates.

    Reads the calendar dataset, mintigo, marketing feature and product lifetime / last years
    purchase tables from EDE (all column names safe-named), left-joins them on the primary keys,
    keeps the fixed list of predictors below, and finally left-joins the dependent variable.
    """
    keys = self.primary_keys
    # python list repr -> SQL list, e.g. ['2020-01-01'] -> ('2020-01-01')
    date_filter = str(dates).replace('[','(').replace(']',')')

    ## to save time while testing, specify features
    raw_predictors = [
      'pct_leads_persona_level_total_past_3mo_individual_contributor',
      'pct_leads_persona_function_past_3mo_network_admin_netops',
      'pct_leads_persona_function_past_3mo_app_dev_devops',
      'pct_leads_persona_function_past_12mo_network_admin_netops',
      'pct_leads_persona_level_total_past_12mo_individual_contributor',
      'pct_leads_persona_function_past_12mo_app_dev_devops',
      'pct_leads_persona_function_past_12mo_information_security',
      'pct_leads_persona_level_total_past_12mo_sr__manager_manager',
      'pct_leads_persona_function_past_12mo_ciso_cio',
      'pct_leads_persona_level_total_past_6mo_sr__director_director',
      'leads_persona_total_past_3mo',
      'leads_persona_total_past_12mo',
      'num_person',
      'last_6_month_gtm_campaign_nginx',
      'last_6_month_gtm_campaign_secure_digital_exp',
      'last_3_month_gtm_campaign_enable_modern_app',
      'last_6_month_gtm_campaign_legacy',
      'last_3months_digital_engagements_count',
      'last_3months_web_visits',
      'web_visits_per_person',
      'last_6_month_gtm_campaign_service_provider',
      'is_unix_linux_user',
      'database_positions',
      'has_intent_technology__data_center',
      'is_data_warehouse_user',
      'has_secured_connection',
      'has_security_certification',
      'has_intent_technology__data_center_and_managed_hosting',
      'has_intent_technology__security_consulting',
      'has_intent_technology__application_security',
      'has_intent_technology__security',
      'has_intent_technology__it_management',
      'has_bigdata_solution',
      'is_nosql_db_related_hiring',
      'has_intent_technology__archiving',
      'has_intent_technology__threat_hunting',
      'has_mobile_website',
      'has_blog',
      'is_javascript_user',
      'has_intent_technology__software_defined_data_center',
      'has_intent_marketing__marketo',
      'has_sql_db',
      'has_education_titles',
      'has_intent_technology__mobile_security',
      'has_intent_technology__wireless_communications',
      'total_csp_subtype_code',
      'total_3_performance_degraded_severity_code',
      'total_closed_sub_status',
      'total_3_medium_priority_code',
      'total_closed_status',
      'total_sr_resolved',
      'total_esrp_area',
      'total_app_conn_proxy_area',
      'total_ihealth_subtype_code',
      'total_app_conn_svc_ctr_area',
      'total_sdm_svc_provider_hot_ind',
      'total_open_status',
      'total_security_area',
      'total_spend_brand_big_ip',
      'last_5years_total_spend_forecast_group_traffic_management',
      'total_spend_product_line_service',
      'lifetime_months_complete_sales_cycle',
      'total_spend_product_family_security',
      'last_6months_std_booking_amount',
      'total_renewal_qty_purchased',
      'fiscal_qtr_code',
      'q1_closed_opp_percentage',
      'q2_closed_opp_percentage',
      'q3_closed_opp_percentage',
      'q4_closed_opp_percentage',
    ]

    # calendar dataset daily snapshot data for the requested dates
    snapshot_query = f"select * from EXP_MKTG.DATA_SCIENCE.CALENDAR_DATASET WHERE CALENDAR_DATE IN {date_filter}"
    snapshot_df = pysh.safe_name(dbh.read_ede_data(snapshot_query))

    # old marketing features: only their column names are needed, to drop them from the snapshot
    legacy_marketing_query = f"select * from exp_mktg.data_science.marketing_target WHERE CALENDAR_DATE IN {date_filter}"
    legacy_marketing_df = type_to_type(pysh.safe_name(dbh.read_ede_data(legacy_marketing_query)), 'decimal', 'double')
    legacy_marketing_cols = [name for name in legacy_marketing_df.columns if name not in keys]

    # drop date/timestamp cols, cols related to DVs ('next'), geography cols, and old marketing features
    snapshot_drop_cols = [
      name
      for name, dtype in snapshot_df.dtypes
      if dtype in ('date','timestamp') or 'next' in name or 'country' in name or 'state' in name
    ]
    snapshot_df = snapshot_df.drop(*(snapshot_drop_cols + legacy_marketing_cols))

    # mintigo dataset; its account key is renamed to match the primary keys
    mintigo_query = f"select * from EXP_MKTG.DATA_SCIENCE.MINTIGO_TARGET WHERE CALENDAR_DATE IN {date_filter}"
    mintigo_df = pysh.safe_name(dbh.read_ede_data(mintigo_query)).withColumnRenamed("salesforce_account_id","account_id")
    mintigo_drop_cols = [
      name
      for name, dtype in mintigo_df.dtypes
      if dtype in ('string', 'date') and name not in keys
    ]
    mintigo_df = mintigo_df.drop(*mintigo_drop_cols)

    # marketing feature data
    marketing_query = f"select * from exp_mktg.data_science.marketing_feature_account_target WHERE CALENDAR_DATE IN {date_filter}"
    marketing_df = pysh.safe_name(dbh.read_ede_data(marketing_query))

    # last X years spend and quantity data
    purchase_query = f"select * from EXP_MKTG.DATA_SCIENCE.PRODUCT_LIFETIME_AND_LAST_YEARS_TARGET WHERE CALENDAR_DATE IN {date_filter}"
    purchase_df = pysh.safe_name(dbh.read_ede_data(purchase_query))

    # dependent variable
    dv_df = self.load_dv(dates)

    # feature columns contributed by each source; snapshot columns that also exist in the
    # marketing feature table are taken from the marketing table instead
    marketing_features = marketing_df.columns
    snapshot_features = [name for name in snapshot_df.columns if name not in marketing_features]
    mintigo_features = mintigo_df.columns
    purchase_features = purchase_df.columns

    # left-join every source onto the snapshot, in the same order as listed
    joined_df = snapshot_df.select(keys + snapshot_features)
    right_sides = (
      (mintigo_df, mintigo_features),
      (marketing_df, marketing_features),
      (purchase_df, purchase_features),
    )
    for right_df, right_features in right_sides:
      joined_df = joined_df.join(other=right_df.select(keys + right_features), on=keys, how="left")

    # remove after testing: restrict to the fixed predictor list
    joined_df = joined_df.select(keys + raw_predictors)

    return joined_df.join(other=dv_df, on=keys, how="left")
  
  
  def handle_decimal_types(self, df):
    """Return df with decimal columns converted to double (delegates to type_to_type)."""
    converted_df = type_to_type(df, 'decimal', 'double')
    return converted_df

    
  
  def get_accounts_to_remove(self):
    """
    Return a single-column dataframe (account_id) of accounts to exclude from modeling.

    An account is excluded when its lower-cased name contains "(marketplace)", "nginx" or "f5",
    or when its account_org_name_c is one of the known test organisations.
    NOTE(review): no purchase-recency rule (e.g. "no purchase in last 5 years") is applied here.
    """
    accounts_df = pysh.safe_name(dbh.read_ede_data("select * from PRD_ENT_RAW.SALESFORCE.ACCOUNT"))

    lowered_name = F.lower(F.col("name"))
    is_excluded = (
      lowered_name.contains("(marketplace)")
      | lowered_name.contains("nginx")
      | lowered_name.contains("f5")
      | F.col("account_org_name_c").isin("Sales Ops Test","M2S Test Account")
    )

    # expose the salesforce id under the primary-key name used by the other datasets
    return accounts_df.filter(is_excluded).select(F.col("id").alias("account_id"))
  

    
  
  def preprocess_data(self, df):
    """
    Preprocess a raw dataframe: decimal -> double, null replacement, one hot encoding
    of categoricals, and removal of the remaining string columns (primary keys are kept).
    """
    # convert decimal types to double
    df = self.handle_decimal_types(df)

    # every non-key column is treated as a feature
    feature_cols = [name for name in df.columns if name not in self.primary_keys]

    # columns whose nulls are replaced with the median
    median_fill_candidates = [
      'avg_days_between_product_opps', 'avg_days_between_renewal_opps', 'avg_days_between_opps',
      'avg_days_complete_product_sales_cycle', 'avg_days_complete_renewal_sales_cycle',
      'avg_days_complete_sales_cycle', 'avg_time_between_successes',
      'avg_days_resolve_sr', 'avg_days_close_sr', 'avg_days_til_first_response',
      'avg_sr_status_chg', 'avg_times_in_queue',
      'days_since_last_opened_sr', 'days_since_last_resolved_sr', 'days_since_last_closed_sr',
      'months_since_last_opened_sr', 'months_since_last_resolved_sr', 'months_since_last_closed_sr',
    ]

    # columns whose nulls are replaced with the max
    max_fill_candidates = [
      'days_since_last_product_purchase', 'months_since_last_product_purchase',
      'days_since_last_renewal_purchase', 'months_since_last_renewal_purchase',
      'days_since_last_purchase', 'months_since_last_purchase',
      'days_since_first_product_purchase', 'months_since_first_product_purchase',
      'days_since_first_renewal_purchase', 'months_since_first_renewal_purchase',
      'days_since_first_purchase', 'months_since_first_purchase',
      'days_since_last_offline_success', 'days_since_last_online_success',
      'days_since_last_security_success', 'days_since_last_eng',
      'avg_time_between_successes',
      'days_since_first_swp_purchase', 'days_since_latest_swp_purchase',
      'months_since_first_swp_purchase', 'months_since_latest_swp_purchase',
      'days_since_first_awf_silverline_purchase', 'days_since_latest_awf_silverline_purchase',
      'months_since_first_awf_silverline_purchase', 'months_since_latest_awf_silverline_purchase',
      'months_since_first_vpr_purchase', 'months_since_latest_vpr_purchase',
    ]

    # categorical columns that get one hot encoded
    categorical_candidates = [
      'account_type', 'theater', 'industry_grouping', 'industry',
      'revenue_segment', 'fiscal_period_desc', 'fiscal_qtr_code', 'org_industry',
    ]

    # restrict each treatment to the columns actually present in df
    median_fill_cols = [name for name in feature_cols if name in median_fill_candidates]
    max_fill_cols = [name for name in feature_cols if name in max_fill_candidates]
    categorical_cols = [name for name in feature_cols if name in categorical_candidates]
    # everything else (plus the dv) gets zero replacement
    zero_fill_cols = [self.dv] + list(
      set(feature_cols) - set(max_fill_candidates) - set(median_fill_candidates) - set(categorical_candidates)
    )

    # categoricals: null -> 'None', one hot encode, then cast matching columns to boolean
    result_df = null_replace(df, cols=categorical_cols, replace_type='custom',custom='None')
    result_df = one_hot_encoding(result_df, cols=categorical_cols)
    # a column is cast once per categorical name it contains (substring match)
    bool_cast_cols = []
    for name in result_df.columns:
      for categorical in categorical_cols:
        if categorical in name:
          bool_cast_cols.append(name)
    result_df = cast_col_type(result_df, cols=bool_cast_cols, new_col_type="boolean")

    # median, max, and zero null replace, in that order
    for fill_cols, fill_type in ((median_fill_cols, 'median'), (max_fill_cols, 'max'), (zero_fill_cols, 'zero')):
      result_df = null_replace(result_df, cols=fill_cols, replace_type=fill_type)

    # safe name
    result_df = pysh.safe_name(result_df)

    # remove string features or original categorical features
    string_cols = [
      name for name, dtype in result_df.dtypes
      if dtype == 'string' and name not in self.primary_keys
    ]
    return result_df.drop(*string_cols)
    
  def train_validation_split(self, df, exclude_dates=('2022-07-01',)):
    """
    Split dataframe into training and validation frames.

    df: dataframe with "account_id" and "calendar_date" columns
    exclude_dates: calendar dates ignored when choosing the train/validation dates
      (rows on these dates end up in neither frame); defaults to ('2022-07-01',)

    Validation frame uses the last remaining calendar date and a randomized sample of
    approximately 20% of the accounts (fixed seed); the training frame uses the other
    dates and the accounts that were not sampled.

    Return 2 data frames: (training_df, validation_df)
    Raises IndexError when no calendar date is left after the exclusion.
    """

    # split accounts into train & validation sets
    validation_accts = (df
                        .select("account_id")
                        .dropDuplicates()
                        .sample(withReplacement=False, fraction=0.2, seed=123)
                       )

    # available calendar dates, ascending
    calendar_dates = sorted(
      row['calendar_date']
      for row in df.select("calendar_date").dropDuplicates().collect()
      if row['calendar_date'] not in exclude_dates
    )
    if not calendar_dates:
      raise IndexError("no calendar dates available for the train/validation split")

    validation_date = calendar_dates.pop()  # use latest date as validation date
    training_dates = calendar_dates         # use remaining dates as train dates

    # training: earlier dates, accounts NOT sampled for validation
    training_df = (df
                   .filter(F.col("calendar_date").isin(training_dates))
                   .join(other=validation_accts, on=["account_id"], how="left_anti")
                  )
    # validation: latest date, sampled accounts only
    validation_df = (df
                     .filter(F.col("calendar_date") == validation_date)
                     .join(other=validation_accts, on=["account_id"], how="inner")
                    )

    return (training_df, validation_df)
  
  def update_training_and_validation_fs_table(self, dates, table_name=('raw_train','preprocessed_train', 'raw_validation', 'preprocessed_validation')):
    """
    update raw & pre processed training and validation feature store table with features and label

    dates: calendar dates to load
    table_name: sequence of four feature store table names, in the order
      (raw training, preprocessed training, raw validation, preprocessed validation)

    In dev/qa the raw training data is also written to EDE, using the last
    dot-separated part of the raw training table name as the EDE table name.
    """
    # get accounts to remove
    remove_accts_df = self.get_accounts_to_remove()

    # load raw data, stamp it with the effective date and drop excluded accounts
    raw_df = (self.load_raw_daily_snapshot_data(dates)
              .withColumn("effective_date", F.lit(self.effective_date))
              .join(other=remove_accts_df
                    , on=["account_id"]
                    , how="left_anti"
                   )
             )

    # preprocess raw data; effective_date is re-added after preprocessing
    preprocessed_df = (self.preprocess_data(raw_df)
                       .withColumn("effective_date", F.lit(self.effective_date))
                      )

    # train & validation split on preprocessed data
    preprocessed_training_df, preprocessed_validation_df = self.train_validation_split(df=preprocessed_df)

    # match train & validation split populations on raw data
    training_keys_df = preprocessed_training_df.select(self.primary_keys).dropDuplicates()
    validation_keys_df = preprocessed_validation_df.select(self.primary_keys).dropDuplicates()
    raw_training_df = raw_df.join(other=training_keys_df, on=self.primary_keys, how="inner")
    raw_validation_df = raw_df.join(other=validation_keys_df, on=self.primary_keys, how="inner")

    # write raw and processed training and validation data to fs
    self.write_data_to_fs(raw_training_df, table_name[0])
    # write raw data to ede; the EDE pipeline is only created for dev/qa in __init__
    if self.env in ('dev', 'qa'):
      self.ede_pipeline.write_to_ede(df=raw_training_df
                                     , mode="overwrite"
                                     # [-1] also works for names without a database prefix
                                     , ede_table_name=table_name[0].split(".")[-1]
                                    )
    self.write_data_to_fs(preprocessed_training_df, table_name[1])
    self.write_data_to_fs(raw_validation_df, table_name[2])
    self.write_data_to_fs(preprocessed_validation_df, table_name[3])
    
  
  def update_scoring_fs_table(self, dates, table_name=('raw_score','preprocessed_score')):
    """
    update scoring feature store table with features only
    this is an example: in reality, it should be different from training fs

    dates: calendar dates to load
    table_name: sequence of two feature store table names, in the order
      (raw scoring, preprocessed scoring)

    In dev/qa the raw scoring data is also written to EDE, using the last
    dot-separated part of the raw scoring table name as the EDE table name.
    """
    # get accounts to remove
    remove_accts_df = self.get_accounts_to_remove()

    # load raw data, drop excluded accounts and stamp it with the effective date
    scoring_data = (self.load_raw_daily_snapshot_data(dates)
                    .join(other=remove_accts_df
                          , on=["account_id"]
                          , how="left_anti"
                         )
                    .withColumn("effective_date", F.lit(self.effective_date))
                   )

    # preprocess raw data (done before the dv is dropped from the raw frame)
    preprocessed_scoring_data = self.preprocess_data(scoring_data)

    # drop dv from raw and processed data; effective_date is re-added after preprocessing
    scoring_data = scoring_data.drop(self.dv)
    preprocessed_scoring_data = (preprocessed_scoring_data
                                 .withColumn("effective_date", F.lit(self.effective_date))
                                 .drop(self.dv)
                                )

    # write raw and processed data to fs
    self.write_data_to_fs(scoring_data, table_name[0])
    # write raw data to ede; the EDE pipeline is only created for dev/qa in __init__
    if self.env in ('dev', 'qa'):
      self.ede_pipeline.write_to_ede(df=scoring_data
                                     , mode="overwrite"
                                     # [-1] also works for names without a database prefix
                                     , ede_table_name=table_name[0].split(".")[-1]
                                    )
    self.write_data_to_fs(preprocessed_scoring_data, table_name[1])
    
  def write_data_to_fs(self, df, table_name, description=''):
    """
    write df to feature store

    df: dataframe to store
    table_name: feature store table name ("<database>.<table>")
    description: description used when the table is created; 'nginx data' when empty

    When the table can be read, df is merged into it; otherwise the table is created
    with primary keys (calendar_date, account_id, effective_date).
    """
    # create database if not existing
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.database_prefix+self.env}")

    fs = FeatureStoreClient()
    try:
      print("Reading in table " + table_name)
      df_fs = fs.read_table(table_name)
    except Exception:
      # Only a failed read is treated as "table does not exist". Keeping the merge
      # outside this try means a failing merge surfaces its own error instead of
      # being followed by a create attempt on a table that already exists.
      print("creating table")
      fs.create_table(
        name=table_name,
        primary_keys=['calendar_date', 'account_id','effective_date'],
        df=df,
        description=description or 'nginx data')
      return

    # when the table exists, unify col type and merge
    df_merge = unify_col_type(df_fs, df)
    fs.write_table(
      name=table_name,
      df=df_merge,
      mode="merge")
      
      
  def update_feature_store(self, train_dates, score_date):
    """
    update the training and scoring feature score table
    """


    # update training data
    if self.training_fs_update:
      self.update_training_and_validation_fs_table(table_name = (f"{self.database_prefix + self.env}.{self.raw_training_table_name}"
                                                                 , f"{self.database_prefix + self.env}.{self.preprocessed_training_table_name}"
                                                                 , f"{self.database_prefix + self.env}.{self.raw_validation_table_name}"
                                                                 , f"{self.database_prefix + self.env}.{self.preprocessed_validation_table_name}"
                                                                )
                                                   , dates=train_dates
                                                  )
    
    if self.scoring_fs_update:
      self.update_scoring_fs_table(table_name = (f"{self.database_prefix + self.env}.{self.raw_scoring_table_name}"
                                                 , f"{self.database_prefix+self.env}.{self.preprocessed_scoring_table_name}"
                                                )
                                   , dates=score_date
                                  )
    


  
  def get_training_data(self, predictors, is_raw=False, primary_keys=['calendar_date','account_id']):
    """
    read training data from fs

    predictors: names of the feature columns to return
    is_raw: truthy -> read the raw training table, otherwise the preprocessed one
    primary_keys: key columns included in both returned dataframes

    Only rows with the latest effective_date in the table are used.
    returns X and y pyspark dataframes (keys + predictors, keys + dv)
    """

    ### Add call to preprocessing

    fs = FeatureStoreClient()
    # a plain truthiness test always binds a table, even for non-bool flags such as None
    table = self.raw_training_table_name if is_raw else self.preprocessed_training_table_name
    table_name = f"{self.database_prefix + self.env}.{table}"

    latest_effective_date = (spark.sql(f"select max(effective_date) as latest_effective_date from {table_name}")
                             .collect()[0]["latest_effective_date"])

    # extract the training data on the latest_effective_date
    training_data = fs.read_table(table_name).filter(F.col("effective_date") == latest_effective_date)

    # unpacking accepts any sequence type for the column names (list, tuple, ...)
    feature_cols = [*primary_keys, *predictors]
    label_cols = [*primary_keys, self.dv]
    return training_data.select(feature_cols), training_data.select(label_cols)  # return X, y as dataframe
  
  def get_validation_data(self, predictors, is_raw=False, primary_keys=['calendar_date','account_id']):
    """
    read validation data from fs

    predictors: names of the feature columns to return
    is_raw: truthy -> read the raw validation table, otherwise the preprocessed one
    primary_keys: key columns included in both returned dataframes

    Only rows with the latest effective_date in the table are used.
    return X, y pyspark dataframes (keys + predictors, keys + dv)
    """

    ### Add call to preprocessing

    fs = FeatureStoreClient()
    # a plain truthiness test always binds a table, even for non-bool flags such as None
    table = self.raw_validation_table_name if is_raw else self.preprocessed_validation_table_name
    table_name = f"{self.database_prefix + self.env}.{table}"

    latest_effective_date = (spark.sql(f"select max(effective_date) as latest_effective_date from {table_name}")
                             .collect()[0]["latest_effective_date"])

    # extract the validation data on the latest_effective_date
    validation_data = fs.read_table(table_name).filter(F.col("effective_date") == latest_effective_date)

    # unpacking accepts any sequence type for the column names (list, tuple, ...)
    feature_cols = [*primary_keys, *predictors]
    label_cols = [*primary_keys, self.dv]
    return validation_data.select(feature_cols), validation_data.select(label_cols)  # return X, y as dataframe
  
  
  def get_scoring_data(self, predictors, date, is_raw=False, primary_keys=['calendar_date','account_id']):
    """
    read scoring data from fs

    predictors: names of the feature columns to return; names missing from the
      table are printed and skipped
    date: calendar_date to score
    is_raw: truthy -> read the raw scoring table, otherwise the preprocessed one
    primary_keys: key columns included in the returned dataframe

    Only rows for `date` with the latest effective_date available for that date are returned.
    """

    fs = FeatureStoreClient()
    # a plain truthiness test always binds a table, even for non-bool flags such as None
    table = self.raw_scoring_table_name if is_raw else self.preprocessed_scoring_table_name
    table_name = f"{self.database_prefix + self.env}.{table}"

    latest_effective_date = (spark.sql(f"select max(effective_date) as latest_effective_date from {table_name} where calendar_date = '{date}'")
                             .collect()[0]["latest_effective_date"])

    # extract the scoring data for the date on the latest_effective_date
    scoring_data = (fs
                    .read_table(table_name)
                    .filter(F.col("effective_date") == latest_effective_date)
                    .filter(F.col("calendar_date")==date)
                   )

    # check existence of predictors in score df (set lookup instead of repeated list scans)
    available_columns = set(scoring_data.columns)
    unexisting_columns = [col for col in predictors if col not in available_columns]
    print("Cols missing from score data: ",unexisting_columns)
    existing_predictors = [col for col in predictors if col in available_columns]
    return scoring_data.select([*primary_keys, *existing_predictors])
  
  
  def get_evaluation_data(self, is_raw=False, primary_keys=['calendar_date','account_id']):
    """
    read evaluation data from fs, including all features to evaluate different models

    is_raw: truthy -> read the raw training/validation tables, otherwise the preprocessed ones
    primary_keys: accepted for signature parity with the other getters; not used here

    Each table is filtered to its own latest effective_date.
    returns (training_evaluation_data, validation_evaluation_data)
    """
    fs = FeatureStoreClient()
    database = self.database_prefix + self.env
    # a plain truthiness test always binds both tables, even for non-bool flags such as None
    if is_raw:
      validation_table_name = f"{database}.{self.raw_validation_table_name}"
      training_table_name = f"{database}.{self.raw_training_table_name}"
    else:
      validation_table_name = f"{database}.{self.preprocessed_validation_table_name}"
      training_table_name = f"{database}.{self.preprocessed_training_table_name}"

    validation_latest_effective_date = (spark.sql(f"select max(effective_date) as latest_effective_date from {validation_table_name}")
                                        .collect()[0]["latest_effective_date"])
    training_latest_effective_date = (spark.sql(f"select max(effective_date) as latest_effective_date from {training_table_name}")
                                      .collect()[0]["latest_effective_date"])

    # extract the training and evaluation data on the latest_effective_date
    validation_evaluation_data = fs.read_table(validation_table_name).filter(F.col("effective_date") == validation_latest_effective_date)
    training_evaluation_data = fs.read_table(training_table_name).filter(F.col("effective_date") == training_latest_effective_date)

    return training_evaluation_data, validation_evaluation_data
    
  
  
  
  