In [0]:
#Importing All the libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
from datetime import datetime, timedelta
import pyspark.sql.functions as f
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from effodata import ACDS, golden_rules, Sifter, Equality, Joiner
import kayday as kd
from toolbox.config import segmentation
from kpi_metrics import KPI, AliasMetric, CustomMetric, AliasGroupby, available_metrics, get_metrics
import seg
from seg.utils import DateType
from flowcate.files import FilePath
import toolbox.config as con
import pyspark.sql.functions as f
import pyspark.sql.types as t
import datetime as dt


In [0]:
# Getting Eligibility Table

eligibility_path = FilePath('abfss://landingzone@sa8451entlakegrnprd.dfs.core.windows.net/mart/comms/prd/fact/eligibility_fact')

# Resolve the latest file exactly once so that the path printed below is guaranteed
# to be the same file that was read (and the lookup is not repeated).
latest_eligibility_file = eligibility_path.find_latest_file()
latest_eligibility_df = spark.read.parquet(latest_eligibility_file)

# Printed so the output records which eligibility file this run is based on.
print(latest_eligibility_file)


### Getting all the Segmentations

In [0]:
def get_all_segmentation_name():
    '''
    Return every segmentation name exposed by the toolbox config.

    Output: the value of ``con.segmentations.all_segmentations``, unchanged.
    '''
    return con.segmentations.all_segmentations

In [0]:
def get_all_columns(segmentation_names):
    '''
    Stack the household-level rows of every requested segmentation into one DataFrame.

    For each name the segmentation config is looked up, the last entry of its
    ``files`` list (treated as the latest file) is read as a Delta table, the
    segmentation's metadata is attached as literal columns, and only rows whose
    SEGMENT is listed in the segmentation's ``propensities`` are kept.

    Input: Segmentation names (iterable of str)
    Output: tuple ``(df, leftover_segment)``
        df               - union of all segmentations that loaded successfully, with
                           columns EHHN, SEGMENT, SEGMENTATION, FRONTEND_NAME,
                           PROPENSITY, SEGMENT_TYPE, PERCENTILE_SEGMENT.
        leftover_segment - one "<segmentation name>: <error message>" string per
                           segmentation that could not be loaded.
    '''

    my_schema = t.StructType([
        t.StructField("EHHN", t.StringType(), True),
        t.StructField("SEGMENT", t.StringType(), True),
        t.StructField("SEGMENTATION", t.StringType(), True),
        t.StructField("FRONTEND_NAME", t.StringType(), True),
        t.StructField("PROPENSITY", t.StringType(), True),
        t.StructField("SEGMENT_TYPE", t.StringType(), True),
        t.StructField("PERCENTILE_SEGMENT", t.StringType(), True)
    ])
    # Single source of truth for the output column order; union() matches columns
    # by position, so every per-segmentation frame is selected in this exact order.
    output_columns = [field.name for field in my_schema.fields]

    df = spark.createDataFrame([], schema=my_schema)
    leftover_segment = []
    for segmentations in segmentation_names:
        try:
            segment = con.segmentation(segmentations)
            frontend_name = segment.frontend_name
            propensities = segment.propensities
            # NOTE(review): joined without a separator, so e.g. ['H', 'M'] becomes 'HM'.
            final_propensity = "".join(propensities)
            type_of_segment = segment.segment_type
            percentile_of_segment = segment.type
            latest_file = segment.files[-1]
            reading_file = segment.directory + latest_file
            segment_file = spark.read.format("delta").load(reading_file)

            segment_file = (
                segment_file.withColumn("SEGMENTATION", f.lit(segmentations))
                .withColumn('FRONTEND_NAME', f.lit(frontend_name))
                .withColumn("PROPENSITY", f.lit(final_propensity))
                .withColumn("SEGMENT_TYPE", f.lit(type_of_segment))
                .withColumn("PERCENTILE_SEGMENT", f.lit(percentile_of_segment))
            )

            segment_file = segment_file.filter(f.col("SEGMENT").isin(propensities))
            segment_file = segment_file.select(*output_columns)
            df = df.union(segment_file)

        except Exception as e:
            # Best-effort by design: a segmentation that cannot be loaded is skipped
            # rather than aborting the run. The name is recorded with the error so
            # the failure can be traced back to a specific segmentation.
            leftover_segment.append(f"{segmentations}: {e}")

    return df, leftover_segment

In [0]:
# Collect every configured segmentation name and load the rows for each of them.
segmentation_name = get_all_segmentation_name()
final_df, leftover_seg = get_all_columns(segmentation_name)

# One-column frame of the requested names, used to check which ones made it into final_df.
segmentation_name_df = spark.createDataFrame(
    [(name,) for name in segmentation_name],
    ["SEGMENTATION"],
)

In [0]:
# Coverage check: list the requested segmentation names that have no rows in final_df.
# An empty result means every segmentation was loaded; any name shown here is missing.
display(
    segmentation_name_df
    .join(final_df, on='SEGMENTATION', how='left_anti')
    .select('segmentation')
    .distinct()
)

In [0]:
# Show the combined segmentation rows ordered by household id.
display(final_df.orderBy('EHHN'))

In [0]:
## Frank's Edits
### need to rename df, this is too generic for troubleshooting later and might overwrite things that were named df before;

# Heather's old logic (2+ years old)
# select divis,
# kpm_two,
# EM_ELIG,
# ecommerce_flag,
# count(distinct ehhn)
# from (select a.*, case when tdc_eligible_flag = 'Y' or sse_eligible_flag = 'Y' or ffd_eligible_flag = 'Y' or email_eligible_flag ='Y' or cba_eligible_flag ='Y' or
# mobile_eligible_flag='Y' or has_digital_account_flag='Y' or pinterest_eligible_flag ='Y' then 'Y' else 'N' end as KPM_ELIG,
# case when sse_eligible_flag = 'Y' or ffd_eligible_flag = 'Y' then 'Y' else 'N' end as EM_ELIG,
# case when preferred_store_division is null or preferred_store_division = 'DEFAULT_HEIRARCHY' then last_store_shop_divison else preferred_store_division end as divis,
# case when facebook_flag ='Y' or ecommerce_flag='Y' or sse_eligible_flag = 'Y' or ffd_eligible_flag = 'Y' then 'Y' else 'N' end as kpm_two
# from  MKT_CM.ELIGIBILITY_FACT a
# where date_id = to_date(20220206,'YYYYMMDD')
# and last_shop_date >= to_date(20200210,'YYYYMMDD'))
 
# group by divis,
# kpm_two,
# EM_ELIG,
# ecommerce_flag;
# Heather's logic:

## put these in the story:

# Heather's Comments: 1) we need to multiply the result we get by .98 (x.98) because of the LT holdout
# we don't usually (we never?) target a shopper who has not been in a Kroger store in the past year - if the last shopped date is outside of the current year, we deem the customer non-active/non-eligible (Frank cannot get to this)

# Attach the eligibility facts to every (household, segmentation) row. A left join keeps
# households that have no eligibility record; their flag columns are null and therefore
# evaluate to "not eligible" in every flag derived below.
eligibility_analytical_dataset = final_df.join(latest_eligibility_df, on="ehhn", how='left')
### eligibility flags (onsite, offsite, overall)

### overall logic:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Channel flag columns behind each derived flag. A household is eligible for a group
# when at least one of the group's columns equals 'Y'.
ONSITE_FLAG_COLUMNS = ['TDC_ELIGIBLE_FLAG', 'SSE_ELIGIBLE_FLAG', 'NATIVE_ELIGIBLE_FLAG', 'EMAIL_ELIGIBLE_FLAG',
                       'FFD_ELIGIBLE_FLAG', 'SS_ELIGIBLE_FLAG', 'PUSH_FLAG']
OFFSITE_FLAG_COLUMNS = ['FACEBOOK_FLAG', 'PANDORA_FLAG', 'PINTEREST_ELIGIBLE_FLAG', 'PREROLL_VIDEO_ELIGIBLE_FLAG',
                        'CBA_ELIGIBLE_FLAG', 'ROKU_FLAG', 'CHICORY_FLAG']

# Overall eligibility is derived from the same columns as the onsite and offsite flags.
# The previous hand-written list repeated PUSH_FLAG and left out SS_ELIGIBLE_FLAG and
# CBA_ELIGIBLE_FLAG, so a household could be onsite/offsite eligible yet not "overall" eligible.
column_names = ONSITE_FLAG_COLUMNS + OFFSITE_FLAG_COLUMNS


# Function to check if any column contains 'Y'
# (no longer used for the overall flag; kept, together with the UDF below, so that any
# later cell still referencing these names keeps working)
def any_column_contains_Y(*args):
    return 'Y' in args

# Register UDF
contains_Y_udf = udf(any_column_contains_Y, BooleanType())


def _any_flag_is_y(flag_columns):
    """Return a Column that is true when at least one of `flag_columns` equals 'Y'.

    The result is null (not false) when no column is 'Y' and at least one is null,
    so callers wrap it in `when(...).otherwise(...)` to get a definite value.
    """
    return reduce(lambda left, right: left | right, [f.col(name) == 'Y' for name in flag_columns])


# Native column expressions are used for all three flags instead of a Python UDF, so the
# rows do not have to be shipped to Python workers. `otherwise` also covers the null case.
eligibility_with_on_off_overall_flag = (
    eligibility_analytical_dataset
    .withColumn('onsite_flag', f.when(_any_flag_is_y(ONSITE_FLAG_COLUMNS), 1).otherwise(0).cast('integer'))
    .withColumn('offsite_flag', f.when(_any_flag_is_y(OFFSITE_FLAG_COLUMNS), 1).otherwise(0).cast('integer'))
    .withColumn("Overall Eligibility", f.when(_any_flag_is_y(column_names), f.lit(True)).otherwise(f.lit(False)))
)
                  

In [0]:
# Preview a handful of rows to sanity-check the derived flag columns.
display(eligibility_with_on_off_overall_flag.limit(5))

In [0]:
# Per-segmentation summary: row count, descriptive attributes (taken with max) and the
# number of rows flagged as onsite / offsite eligible.
display(
    eligibility_with_on_off_overall_flag
    .groupBy('SEGMENTATION')
    .agg(
        F.count('EHHN').alias("TOTAL_HH"),
        F.max("frontend_name").alias("FRONTEND_NAME"),
        *[
            F.max(attribute).alias(attribute)
            for attribute in ("SEGMENT", "PROPENSITY", "SEGMENT_TYPE", "PERCENTILE_SEGMENT")
        ],
        F.sum('onsite_flag').alias('ONSITE_COUNT'),
        F.sum('offsite_flag').alias('OFFSITE_COUNT'),
    )
)

In [0]:
# Extras for checking the onsite and offsite flags

# getting_count = (final_df.join(elig, on="ehhn", how='left')
#                   .withColumn('onsite_flag', f.when(((f.col('TDC_ELIGIBLE_FLAG') == 'Y') | (f.col('SSE_ELIGIBLE_FLAG') == 'Y') | (f.col('NATIVE_ELIGIBLE_FLAG') == 'Y') |(f.col('EMAIL_ELIGIBLE_FLAG') == 'Y') | (f.col('PUSH_FLAG') == 'Y')), '1').otherwise('0').cast('integer'))
#                   .withColumn('offsite_flag', f.when(((f.col('FACEBOOK_FLAG') == 'Y') | (f.col('PANDORA_FLAG') == 'Y') | (f.col('PINTEREST_ELIGIBLE_FLAG') =='Y') | (f.col('PREROLL_VIDEO_ELIGIBLE_FLAG') == 'Y') | (f.col('ROKU_FLAG') == 'Y') | (f.col('CHICORY_FLAG') == 'Y')), '1').otherwise('0').cast('integer'))
                  
#                   )

# ehhn_invalid = (getting_count
#         .filter((f.col('onsite_flag') == '0') & (f.col('offsite_flag') == '0'))
# )
# times = []
# for i in file:
#   names = i[0:-1]
#   times.append(names)
# times
