# Daily Data Utility Example
### Designed to standardize warehouse data within an item hierarchy for ease of use with store analysis and reporting

### In this example: 
#### The item hierarchy consists of a Top Level Item Code, Mid Level Item Code, and UPC. 
#### The store||warehouse relationship is managed using the Mid Level Item Code.
##### Furthermore, store||warehouse data stores daily history and is also effective dated 
#### All store metrics and reporting roll up to the Top Level Item Code.
#### The associated process supported by this utility uses data from the top level
### So to avoid confusion, noise, and lag on the process proper:
#### We standardize the mid level item code data in this program, primarily on max() but also using the most common for cost

In [None]:
OM_QUERY ="""
with 
  maxdate as 
    (select max(run_dt) from schema.Ord_Mult)
select distinct
    all_dt_item.Top_Level_Item,
    all_dt_item.whse_nbr,
    all_dt_item.Ord_Mult
from
  (select distinct
    TLI.Top_Level_Item
    ,OM.Mid_Level_Item
    ,OM.whse_nbr
    ,OM.eff_date as OM_EFF_DT
    ,OM.Ord_Mult
  from schema.Ord_Mult OM
  join schema.dim_mid_level_item TLI
    on OM.Mid_Level_Item = TLI.mid_level_item
  where 1=1 
    and OM.run_dt = (select * from maxdate)
    and OM.eff_date =
      (select max(eff_date) 
      from schema.Ord_Mult a
      where OM.Mid_Level_Item = a.Mid_Level_Item 
        and OM.whse_nbr = a.whse_nbr)
  ) all_dt_item
join 
  (select distinct
    TLI.Top_Level_Item
    ,a.whse_nbr
    ,max(a.eff_date) as max_OM_EFF_DT
  from schema.Ord_Mult a
  join schema.dim_mid_level_item TLI
    on a.Mid_Level_Item = TLI.mid_level_item
  where 1=1 
    and a.run_dt = (select * from maxdate)
    and a.eff_date = 
      (select max(eff_date) 
      from schema.Ord_Mult c
      where a.Mid_Level_Item = c.Mid_Level_Item 
        and a.whse_nbr = c.whse_nbr)
    group by
      TLI.Top_Level_Item
      ,a.whse_nbr 
  ) eff_dt_item
on all_dt_item.Top_Level_Item = eff_dt_item.Top_Level_Item
  and all_dt_item.whse_nbr = eff_dt_item.whse_nbr
  and all_dt_item.OM_EFF_DT = eff_dt_item.max_OM_EFF_DT
"""
#Duplicates due to timing, set up errors, etc. are standardized on max in this example
DOOP_Query="""
select distinct 
  a.Top_Level_Item, 
  a.whse_nbr, 
  max(a.Ord_Mult) as Ord_Mult
from OM a
join 
  (select distinct Top_Level_Item, whse_nbr
  from OM
  group by Top_Level_Item, whse_nbr
  having count(*)>1) b
on a.Top_Level_Item = b.Top_Level_Item 
  and a.whse_nbr = b.whse_nbr
group by a.Top_Level_Item, a.whse_nbr
order by a.Top_Level_Item, a.whse_nbr"""
DOOP_Rev = """
select distinct Top_Level_Item as TLITEMS from
  (select distinct Top_Level_Item, whse_nbr
  from OM
  group by Top_Level_Item, whse_nbr
  having count(*)>1)"""
Delete_Query="""
select a.*
from OM a
anti join DOOP b
on a.Top_Level_Item = b.Top_Level_Item 
  and a.whse_nbr = b.whse_nbr"""
Insert_Query="""
select * from OM
union 
select * from DOOP 
order by Top_Level_Item, whse_nbr
"""
view_name = 'OM'
DATA = spark.sql(OM_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'DOOP'
DATA = spark.sql(DOOP_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(DOOP_Rev)
print('DOOP OM TLIs:')
DATA.show()
view_name = 'OM'
DATA = spark.sql(Delete_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(Insert_Query)
DATA.createOrReplaceTempView(view_name)
sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_OM""")
sqlContext.sql("""create table work_schema.TLI_WHSE_OM as select * from OM""")

DOOP TOP LEVEL ITEMS:
+-----------+
|    TLITEMS|
+-----------+
|11111123456|
|11111123456|
|11111123456|
+-----------+



In [None]:
CASEPACK_QUERY ="""
with 
  maxdate as
    (select max(run_dt) from schema.casepack),
  maxdate2 as
    (select max(run_dt) from schema.vendor_dim)
select distinct
  all_dt_item.Top_Level_Item,
  all_dt_item.whse_nbr,
  all_dt_item.CASEPACK_QTY
  all_dt_item.INPK_QTY
from
  (select distinct
    TLI.Top_Level_Item
    ,casepack.Mid_Level_Item
    ,vend.vend_num
    ,casepack.whse_nbr
    ,casepack.EFF_FRM||casepack.DATE_ID as CASEPACK_EFF
    ,vend.DATE_ID||lpad(vend.TIME_STP,6,0) as vend_eff
    ,CASEPACK_QTY
    ,INPK_QTY
  from schema.casepack casepack
  join schema.vendor_dim vend
    on casepack.vend_num = vend.vend_num
    and casepack.Mid_Level_Item = vend.Mid_Level_Item
    and casepack.whse_nbr = vend.whse_nbr
  join schema.dim_mid_level_item TLI
    on casepack.Mid_Level_Item = TLI.mid_level_item
  where casepack.run_dt = (select * from maxdate)`
    and casepack.EFF_FRM||casepack.DATE_ID = 
      (select max(a.EFF_FRM||a.DATE_ID) 
      from schema.casepack a
      where d.run_dt = (select * from maxdate)
        and casepack.Mid_Level_Item = a.Mid_Level_Item 
        and casepack.vend_num = a.vend_num
        and casepack.whse_nbr = a.whse_nbr)
    and vend.DATE_ID||lpad(vend.TIME_STP,9,0) =
      (select max(b.DATE_ID||lpad(b.TIME_STP,9,0)) 
      from schema.vendor_dim b
      where b.run_dt = (select * from maxdate2)
        and vend.Mid_Level_Item = b.Mid_Level_Item 
        and vend.vend_num = b.vend_num
        and vend.whse_nbr = b.whse_nbr)
  ) all_dt_item
join 
  (select distinct
    TLI.Top_Level_Item
    ,casepack.whse_nbr
    ,max(casepack.EFF_FRM||casepack.DATE_ID) as max_case_eff
    ,max(vend.DATE_ID||lpad(vend.TIME_STP,9,0)) as max_vendor_eff
  from schema.casepack casepack
  join schema.vendor_dim vend
    on casepack.vend_num = vend.vend_num
    and casepack.Mid_Level_Item = vend.Mid_Level_Item
    and casepack.whse_nbr = vend.whse_nbr
  join schema.dim_mid_level_item TLI
    on casepack.Mid_Level_Item = TLI.mid_level_item
  where casepack.run_dt = (select * from maxdate)
    and casepack.EFF_FRM||casepack.DATE_ID = 
      (select max(a.EFF_FRM||a.DATE_ID) 
      from schema.casepack a
      where d.run_dt = (select * from maxdate)
        and casepack.Mid_Level_Item = a.Mid_Level_Item 
        and casepack.vend_num = a.vend_num
        and casepack.whse_nbr = a.whse_nbr)
    and vend.DATE_ID||lpad(vend.TIME_STP,9,0) =
      (select max(b.DATE_ID||lpad(b.TIME_STP,9,0)) 
      from schema.vendor_dim b
      where b.run_dt = (select * from maxdate2)
        and vend.Mid_Level_Item = b.Mid_Level_Item 
        and vend.vend_num = b.vend_num
        and vend.whse_nbr = b.whse_nbr)
    group by
      TLI.Top_Level_Item
      ,casepack.whse_nbr 
  ) eff_dt_item
on all_dt_item.Top_Level_Item = eff_dt_item.Top_Level_Item
  and all_dt_item.whse_nbr = eff_dt_item.whse_nbr
  and all_dt_item.CASEPACK_EFF = eff_dt_item.max_case_eff
  and all_dt_item.vend_eff = eff_dt_item.max_vendor_eff
order by all_dt_item.Top_Level_Item, all_dt_item.whse_nbr
"""
#Duplicates due to timing, set up errors, etc. are standardized on max
DOOP_Query="""
select distinct
  cs.Top_Level_Item,
  cs.whse_nbr,
  cs.CASEPACK_QTY,
  cs.INPK_QTY
from casepack cs
join
  (select distinct 
    a.Top_Level_Item,
    a.whse_nbr,
    max(a.CASEPACK_QTY+a.INPK_QTY) as pacmax
  from casepack a
  join 
    (select distinct Top_Level_Item, whse_nbr
    from casepack
    group by Top_Level_Item, whse_nbr 
    having count(*) > 1) b 
  on a.Top_Level_Item = b.Top_Level_Item
    and a.whse_nbr = b.whse_nbr
  group by 
    a.Top_Level_Item,
    a.whse_nbr) dup
on cs.Top_Level_Item = dup.Top_Level_Item
  and cs.whse_nbr = dup.whse_nbr
  and (cs.CASEPACK_QTY+cs.INPK_QTY) = dup.pacmax"""
DOOP_Rev = """
select distinct Top_Level_Item as TLITEMS from DOOP"""
Delete_Query="""
select a.*
from casepack a
anti join DOOP b
on a.Top_Level_Item = b.Top_Level_Item 
and a.whse_nbr = b.whse_nbr"""
Insert_Query="""
select * from casepack
union 
select * from DOOP 
order by Top_Level_Item, whse_nbr
"""
view_name = 'casepack'
DATA = spark.sql(CASEPACK_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'DOOP'
DATA = spark.sql(DOOP_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(DOOP_Rev)
print('DOOP TOP LEVEL ITEMS:')
DATA.show()
view_name = 'casepack'
DATA = spark.sql(Delete_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(Insert_Query)
DATA.createOrReplaceTempView(view_name)
sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_CASE""")
sqlContext.sql("""create table work_schema.TLI_WHSE_CASE as select * from casepack""")


DOOP TOP LEVEL ITEMS:
+-----------+
|    TLITEMS|
+-----------+
|11111123456|
|11111123456|
|11111123456|
+-----------+



In [None]:
WHSCOST_QUERY ="""
select distinct
  all_dt_item.whse_nbr,
  all_dt_item.Top_Level_Item,
  all_dt_item.Mid_Level_Item,
  all_dt_item.WHSE_COST
from 
  (select distinct
    cst.whse_nbr,
    cst.Mid_Level_Item,
    TLI.Top_Level_Item,
    cst.WHS_COST,
    cst.EFF_FRM||cst.DATE_ID as EFF_DATE
  from schema.cost_dim cst
  join schema.vendor_dim vend
    on cst.vend_num = vend.vend_num
    and cst.Mid_Level_Item = vend.Mid_Level_Item
    and cst.whse_nbr = vend.whse_nbr
  join schema.dim_mid_level_item TLI 
    on cst.Mid_Level_Item = TLI.MID_LEVEL_ITEM
  where per_id = (select max(per_id) from schema.cost_dim)
    and cst.EFF_FRM||cst.DATE_ID = 
      (select max(a.EFF_FRM||a.DATE_ID) 
      from schema.cost_dim a
      where a.per_id = (select max(per_id) from schema.cost_dim)
        and cst.Mid_Level_Item = a.Mid_Level_Item 
        and cst.whse_nbr = a.whse_nbr
        and cst.vend_num = a.vend_num)
    and vend.DATE_ID||lpad(vend.TIME_STP,9,0) =
      (select max(b.DATE_ID||lpad(b.TIME_STP,9,0)) 
      from schema.vendor_dim b
      where b.run_dt = (select * from maxdate2)
        and vend.Mid_Level_Item = b.Mid_Level_Item 
        and vend.vend_num = b.vend_num
        and vend.whse_nbr = b.whse_nbr)
  ) all_dt_item
join 
  (select distinct
    cst.whse_nbr,
    TLI.Top_Level_Item,
    max(cst.EFF_FRM||cst.DATE_ID) as MAX_EFF_DATE
  from schema.cost_dim cst
  join schema.vendor_dim vend
    on cst.vend_num = vend.vend_num
    and cst.Mid_Level_Item = vend.Mid_Level_Item
    and cst.whse_nbr = vend.whse_nbr
  join schema.dim_mid_level_item TLI 
    on cst.Mid_Level_Item = TLI.MID_LEVEL_ITEM
  where per_id = (select max(per_id) from schema.cost_dim)
    and cst.EFF_FRM||cst.DATE_ID = 
      (select max(a.EFF_FRM||a.DATE_ID) 
      from schema.cost_dim a
      where a.per_id = (select max(per_id) from schema.cost_dim)
        and cst.Mid_Level_Item = a.Mid_Level_Item 
        and cst.whse_nbr = a.whse_nbr
        and cst.vend_num = a.vend_num)
    and vend.DATE_ID||lpad(vend.TIME_STP,9,0) =
      (select max(b.DATE_ID||lpad(b.TIME_STP,9,0)) 
      from schema.vendor_dim b
      where b.run_dt = (select * from maxdate2)
        and vend.Mid_Level_Item = b.Mid_Level_Item 
        and vend.vend_num = b.vend_num
        and vend.whse_nbr = b.whse_nbr)
  group by 
    cst.whse_nbr, 
    TLI.Top_Level_Item
  ) eff_dt_item
on all_dt_item.Top_Level_Item = eff_dt_item.Top_Level_Item
  and all_dt_item.whse_nbr = eff_dt_item.whse_nbr
  and all_dt_item.EFF_DATE = eff_dt_item.MAX_EFF_DATE
order by all_dt_item.Top_Level_Item
"""
MLI_STR_QUERY="""
SELECT DISTINCT
STORE.MID_LEVEL_ITEM,
STORE.Top_Level_Item
from
  (SELECT DISTINCT
    MLI.MID_LEVEL_ITEM
    , MLI.Top_Level_Item
    , count(distinct TBLA.STORE_NBR) as store_count
  FROM
    schema.STR_TBL_A TBL_A
  LEFT JOIN schema.STR_TBL_B TBL_B
    ON TBL_A.STR_DIM1 = TBL_B.STR_DIM1 
    AND TBL_A.STR_DIM2 = TBL_B.STR_DIM2
  LEFT JOIN schema.PLANNED_UPC_TBL UPC  
    ON UPC.STR_DIM2 = TBL_B.STR_DIM2 
    AND UPC.STR_DIM1 = TBL_B.STR_DIM1
  LEFT JOIN schema.MID_LEVEL_UPC_MAP MLI
    ON MLI.MID_LEVEL_ITEM=UPC.MLI
    AND MLI.UPC_NBR=UPC.UPC
  LEFT JOIN schema.DIM_LOC LOC
    ON TBL_A.STORE_NBR = LOC.STORE_NBR
  JOIN
    (Select distinct Top_Level_Item
        from COST
        group by 
          Top_Level_Item,
          whse_nbr
        having count(distinct WHSE_COST) > 1) ITEM
    ON MLI.Top_Level_Item = ITEM.Top_Level_Item
  WHERE 1=1
    AND (TBL_A.FROM_DATE <= current_date() OR TBL_A.FROM_DATE IS NULL) 
    AND (TBL_A.TO_DATE >= current_date() OR TBL_A.TO_DATE IS NULL) 
  GROUP BY
    MLI.MID_LEVEL_ITEM
  , MLI.Top_Level_Item) STORE
join
  (SELECT DISTINCT
    sc.Top_Level_Item,
    max(sc.store_count) as store_count
  from
    (SELECT DISTINCT
      MLI.MID_LEVEL_ITEM
      , MLI.Top_Level_Item
      , count(distinct lp.STORE_NBR) as store_count
    FROM schema.STR_TBL_A TBL_A
    LEFT JOIN schema.STR_TBL_B TBL_B
      ON TBL_A.STR_DIM1 = TBL_B.STR_DIM1 
      AND TBL_A.STR_DIM2 = TBL_B.STR_DIM2
    LEFT JOIN schema.PLANNED_UPC_TBL UPC  
      ON UPC.STR_DIM2 = TBL_B.STR_DIM2 
      AND UPC.STR_DIM1 = TBL_B.STR_DIM1
    LEFT JOIN schema.MID_LEVEL_UPC_MAP MLI
      ON MLI.MID_LEVEL_ITEM=UPC.MLI
      AND MLI.UPC_NBR=UPC.UPC
    LEFT JOIN schema.DIM_LOC LOC
      ON TBL_A.STORE_NBR = LOC.STORE_NBR
    JOIN
      (Select distinct 
            Top_Level_Item
          from COST
          group by 
            Top_Level_Item,
            whse_nbr
          having count(distinct WHSE_COST) > 1) ITEM
  ON MLI.Top_Level_Item = ITEM.Top_Level_Item
    WHERE 1=1 
      AND (TBL_A.FROM_DATE <= current_date() OR TBL_A.FROM_DATE IS NULL) 
      AND (TBL_A.TO_DATE >= current_date() OR TBL_A.TO_DATE IS NULL) 
    GROUP BY
      MLI.MID_LEVEL_ITEM
      ,MLI.Top_Level_Item) sc
  GROUP BY sc.Top_Level_Item) maxsc
on schema.Top_Level_Item = maxsc.Top_Level_Item
and schema.store_count = maxsc.store_count
"""
STR_LIST_QUERY = """
Select distinct a.*
from COST a
JOIN STR_CNT_CHECK b
on a.Top_Level_Item = b.Top_Level_Item
and a.mid_level_item = b.mid_level_item
"""
STR_DELETE_QUERY = """
Select distinct a.*
from COST a
ANTI JOIN STR_CNT_CHECK b
on a.Top_Level_Item = b.Top_Level_Item
"""
STR_INSERT_QUERY="""
select distinct Top_Level_Item, whse_nbr,WHSE_COST from COST
union 
select distinct Top_Level_Item, whse_nbr,WHSE_COST from STR_LIST 
"""
STR_REV = """
select distinct Top_Level_Item as TLITEMS from STR_CNT_CHECK
"""
DOOP_Query = """
  select distinct
  a.Top_Level_Item,
  a.whse_nbr,
  max(a.WHSE_COST) as WHSE_COST
  from COST a
  join 
    (Select distinct 
      Top_Level_Item, 
      whse_nbr
    from COST
    group by 
      Top_Level_Item,
      whse_nbr
    having count(*) > 1) b
  on a.Top_Level_Item = b.Top_Level_Item
    and a.whse_nbr = b.whse_nbr
  group by
  a.Top_Level_Item
  ,a.whse_nbr
  """
Delete_Query="""
select a.*
from COST a
anti join DOOP b
on a.Top_Level_Item = b.Top_Level_Item 
and a.whse_nbr = b.whse_nbr"""
Insert_Query="""
select * from COST
union 
select * from DOOP 
order by Top_Level_Item, whse_nbr
"""
DOOP_Rev = """
select distinct Top_Level_Item as TLITEMS from DOOP"""

view_name = 'COST'
DATA = spark.sql(WHSCOST_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'STR_CNT_CHECK'
DATA = spark.sql(MLI_STR_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'STR_LIST'
DATA = spark.sql(STR_LIST_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'COST'
DATA = spark.sql(STR_DELETE_QUERY)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(STR_INSERT_QUERY)
DATA.createOrReplaceTempView(view_name)
view_name = 'DOOP'
DATA = spark.sql(DOOP_Query)
DATA.createOrReplaceTempView(view_name)
view_name = 'COST'
DATA = spark.sql(Delete_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(Insert_Query)
DATA.createOrReplaceTempView(view_name)
DATA = spark.sql(STR_REV)
print('Most Common MLI cost TLIs:')
DATA.show()
DATA = spark.sql(DOOP_Rev)
print('MAX COST TLIs:')
DATA.show()
sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_CST""")
sqlContext.sql("""create table work_schema.TLI_WHSE_CST as select * from COST""")


MOST COMMON MID LEVEL :
+-----------+
|    TLITEMS|
+-----------+
|11111123456|
|11111123456|
|11111123456|
+-----------+

DOOP TOP LEVEL ITEMS:
+-----------+
|    TLITEMS|
+-----------+
|11111123456|
|11111123456|
|11111123456|
+-----------+



In [None]:

Fin_Join = """
select distinct 
  spine.Top_Level_Item,
  spine.whse_nbr,
  OM.Ord_Mult,
  casepack.CASEPACK_QTY
  casepack.INPK_QTY,
  COST.WHSE_COST
from 
(select distinct Top_Level_Item, whse_nbr from
  (select distinct Top_Level_Item, whse_nbr from work_schema.TLI_WHSE_OM
  union
  select distinct Top_Level_Item, whse_nbr from work_schema.TLI_WHSE_CASE
  union
  select distinct Top_Level_Item, whse_nbr from work_schema.TLI_WHSE_CST)) spine
left join work_schema.TLI_WHSE_OM OM 
  on spn.Top_Level_Item = OM.Top_Level_Item 
  and spn.whse_nbr = OM.whse_nbr
left join work_schema.TLI_WHSE_CASE casepack  
  on spn.Top_Level_Item = casepack.Top_Level_Item 
  and spn.whse_nbr = casepack.whse_nbr
left join work_schema.TLI_WHSE_CST) COST   
  on spn.Top_Level_Item = COST.Top_Level_Item 
  and spn.whse_nbr = COST.whse_nbr
  """
view_name = 'finjoin'
DATA = spark.sql(Fin_Join)
DATA.createOrReplaceTempView(view_name)
try:
    spark.sql('select * from finjoin')
    sqlContext.sql("""drop table if exists work_schema.TLI_DC_DATA""")
    sqlContext.sql("""create table work_schema.TLI_DC_DATA as select * from finjoin""")
    sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_OM""")
    sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_CASE""")
    sqlContext.sql("""drop table if exists work_schema.TLI_WHSE_CST""")
    display("Success: OM, casepack, and cost data joined into work_schema.TLI_DC_DATA and then dropped")
except:
    dbutils.notebook.exit("Run Error: Data not updated") 


'Success: OM, casepack, and cost data joined into work_schema.TLI_DC_DATA and then dropped'