#ZDR / ZCR documents for SAP (returns, mekubaim)

The process is run once per month on the first day of the month for past month to create files with Mekubaim transactions and Discount (Hechzer) transactions (IDOC type). After they are created and saved to Blob (dbfs:\mnt\prod\process\datastage\mefizim), the pipeline "ZC_ZD_transactions_end_of_month" copies them to backup (\\10.213.3.69\out\Globe\backup) and to LAH (\\10.213.3.27\out\PR\ILELAHUB01\csiordrs01)


### Business process
For each purchase, no matter what customer type it is, distributor is supposed to get a discount (return). Also customers from customer groups L91, L92 get extra compensation "Mekubaim".  The payment of mekubaim and discounts (returns) is monthly with the reference to the performance of previous month. The % depends on the customer type, cluster (customer group), type of product (Product Hierarchy). The amount of discount for returned goods decreases the total amount of payment ==> it enters to SAP as a negative amount.
<br>
### Inbound data 
* `ds_database.hi_sales` -- sales from Pocketlink with relation to distributor numbers
* `ds_database.total_monthly` -- table with aggregated data for commission & discounts calculation (see notebook 800 - mefiz commission) 
### Details of process
<br> 
The process is split in two parts : Returns (Discounts) and Mekubaim. Each part comprises the following steps: <br>
1. Extract data from relevant table (`ds_database.hi_sales` for Mekubaim and `ds_database.total_monthly` for Returns) <br>
2. Process the data to identify the type of transaction (Zd03, Zd12, Zd45) <br>
3. Prepare data to fit the IDOC format: <br>
a. prepare header for each distributor (lines 1-17)<br>
b. prepare body for each distributor for each Material number: three lines with number, billing type, amount <br>
4. Generate dataset with the structure from p.3 <br>
5. Convert it to string and save to file <br>

In [0]:
import time
from datetime import datetime, timedelta
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, concat_ws
from pyspark.sql.types import StructField,  StringType, StructType
from pyspark.sql.functions import lit, col, abs, round, lpad, rpad, sum, round,concat, max, when, right,trim

In [0]:
report_date = datetime.now()
rec_month = f'{report_date.month:02d}'
rec_year = report_date.year
output_location = "dbfs:/mnt/prod/process/datastage/mefizim/"
print("Extract data for the month:", rec_month, "/", rec_year)

Extract data for the month: 01 / 2025


In [0]:
distr_chnl_list = ["06", "08"]
trans_type = ["ZCR", "ZDR"]

## Supporting functions

*ZCR* transaction type includes all the lines with value >0 <br>
*ZDR* transaction type includes lines with value<0

In [0]:
# function to transform the dataframe to the type of transaction ZDR / ZCR
def transform_df_to_type(df, type, step):
  if type == "ZCR" and step =="mekubaim":
    df_1 = df.filter(df.zd12>0).withColumn("type", lit("ZD12")).withColumnRenamed("zd12", "value")
    df_2 = df.filter(df.zd03>0).withColumn("type", lit("ZD03")).withColumnRenamed("zd03", "value")
    df_final = df_1.drop("zd03").union(df_2.drop("zd12"))
    df_final = df_final.orderBy("Distribution_Channel", "CODE_MEFIZ", "Item_Num")
  elif type == "ZDR"and step =="mekubaim":
    df_1 = df.filter(df.zd03<0).withColumn("type", lit("ZD03")).withColumn("value", lit(abs(col("zd03"))))
    df_2 = df.filter(df.zd12<0).withColumn("type", lit("ZD12")).withColumn("value", lit(abs(col("zd12"))))
    df_final = df_1.drop("zd12","zd03").union(df_2.drop("zd12", "zd03")) 
    df_final = df_final.orderBy("Distribution_Channel", "CODE_MEFIZ", "Item_Num")
    
  elif type == "ZCR" and step =="hechzer": 
    df_final = df.filter(df.value>0).withColumn("type", lit("ZD45")).orderBy("CODE_MEFIZ", "Item_Num")
  else:
    df_final = df.filter(df.value<0).withColumn("type", lit("ZD45")).withColumn("value", lit(abs(col("value")))).orderBy("CODE_MEFIZ", "Item_Num")
  
  return df_final

In [0]:
# function that generates header lines - from 1 to 17
def type_header(ttype, step, rec_month, rec_year):
  year = str(rec_year)[2:]  
  if step == "mekubaim":
    header_stmt=f'''
  with stab_columns as (
  SELECT
    *
  FROM
  VALUES
    ('E2EDK01005', '000001', '001'),
    ('E2EDK14', '000008', '006'),
    ('E2EDK14', '000009', '007'),
    ('E2EDK14', '000010', '008'),
    ('E2EDK14', '000011', '012'),
    ('', '000012', ''),
    ('E2EDKA1003', '000013', 'AG '),
    ('E2EDKA1003', '000014', 'WE '),
    ('E2EDK02', '000015', '001'),
    ('E2EDK02', '000016', '017'),
    ('E2EDK02', '000017', '011') AS data(part1, part3, part5))
    select distinct code_mefiz, 
      distribution_channel, 
      cast(part3 as int) as sort_order, 
      case when part3 = "000012" then "" else RPAD(part1, 10, " ") end as part1, 
      case when part3 = '000012' then '' else LPAD("1329000000000055271", 39, " ") end as part2, 
      case when part3 = '000012' then '' else part3 end as part3,  
      case when part3 = '000012' then '' else "00000001" end as part4, 
      case when part3 = '000012' then '' else part5 end as part5,
    case 
    when part3 = "000001" then concat(RPAD(" ILSILS", 79, " "), RPAD(" Price Difference IA  {rec_year}-{rec_month}", 142, " "), RPAD("PR", 158, " "))
    when part3 = "000008" then RPAD("00", 35, " ")
    when part3 = "000009" then RPAD(Lpad(Distribution_channel, 2, "0"), 35, " ")
    when part3 = "000010" then RPAD("IL20", 35, " ")
    when part3 = "000011" then RPAD("{ttype}", 39, " ")
    when part3 = "000013" then RPAD(code_mefiz, 1016, " ")
    when part3 = "000014" then RPAD(code_mefiz, 1016, " ")
    when part3 = "000015" then RPAD(left(payer_desc,17),75, " ")
    when part3 = "000016" then RPAD("{rec_month}/{year}Price Diff",75, " ")
    when part3 = "000017" then RPAD(code_mefiz,75, " ")
    end as part6
    from ds_database.total_monthly
    join (select distinct payer_desc, payer from bo_interfaces.g_clients) g_clients on g_clients.payer = total_monthly.code_mefiz
    cross join stab_columns
    order by code_mefiz, sort_order'''
  else:
    header_stmt=f'''with stab_columns as (
  SELECT
    *
  FROM
  VALUES    
  ('E2EDK01005', '000001', '001'),
  ('E2EDK14', '000008', '006'),
  ('E2EDK14', '000009', '007'),
  ('E2EDK14', '000010', '008'),
  ('E2EDK14', '000011', '012'),
  ('E2EDK03', '000012', '016'),
  ('E2EDKA1003', '000013', 'AG '),
  ('E2EDKA1003', '000014', 'WE '),
  ('E2EDK02', '000015', '001'),
  ('E2EDK02', '000016', '017'),
  ('E2EDK02', '000017', '011') 
 AS data(part1, part3, part5))
    select distinct code_mefiz, 
      distribution_channel, 
      cast(part3 as int) as sort_order, 
      RPAD(part1, 10, " ") as part1, 
      LPAD("1329000000000055271", 39, " ") as part2, 
      part3,  
      "00000001" as part4, 
      part5,
    case 
    when part3 = "000001" then concat(RPAD(" ILSILS", 79, " "), RPAD(" No Return Agreement  {rec_year}-{rec_month}", 142, " "), RPAD("PR", 167, " "))
  when part3 = "000008" then RPAD("00", 35, " ")
  when part3 = "000009" then RPAD(Lpad(Distribution_channel, 2, "0"), 35, " ")
  when part3 = "000010" then RPAD("IL20", 35, " ")
    when part3 = "000011" then RPAD("{ttype}", 39, " ")
  when part3 = "000012" then DATE_FORMAT(current_date, 'yyyyMMdd000000')
    when part3 = "000013" then RPAD(code_mefiz, 1016, " ")
    when part3 = "000014" then RPAD(code_mefiz, 1016, " ")
    when part3 = "000015" then RPAD(left(payer_desc,17),75, " ")
    when part3 = "000016" then RPAD("{rec_month}/{year}No Return",75, " ")
  when part3 = "000017" then RPAD(code_mefiz,75, " ")

    end as part6
    from ds_database.total_monthly
    join (select distinct payer_desc, payer from bo_interfaces.g_clients) g_clients on g_clients.payer = total_monthly.code_mefiz
    cross join stab_columns
    order by code_mefiz, sort_order'''
  return spark.sql(header_stmt)

In [0]:
# function that generates 3 lines for each material in specific format to fit SAP format
def material_lines(dataset, code):
  data = [("E2EDP01006","1329000000000055271","000000013  "),
  ("E2EDP05002","1329000000000055271","00001802   "),  
  ("E2EDP19001","1329000000000055271", "00002102002" )]
  df = spark.createDataFrame(data, ["part1", "part2", "part4"])
  df_final = dataset.filter(col("code_mefiz") == code).crossJoin(df).orderBy("code_mefiz","Item_Num", "part1")\
      .withColumn("part5", when(col("part1")=="E2EDP01006", rpad(lit("   001 10"),460, " ")).otherwise(when(col("part1")=="E2EDP05002", rpad(lit(col("type")),110, " ")).otherwise(rpad(lit(col("Item_Num")), 423, " "))))\
        .withColumn("part6", when(col("part1") =="E2EDP05002", rpad(rpad(col("value"), 7, " "),127," ")).otherwise(""))
  window_spec = Window.partitionBy("code_mefiz").orderBy("code_mefiz", "Item_Num", "type", "part1")
  df_final = df_final.withColumn("row_number", row_number().over(window_spec))\
          .withColumn("part3", lpad(col("row_number")+17, 6, "0"))
  df_final = df_final.select("part1", lpad("part2", 39, " ").alias("part2"), "part3", "part4", "part5", "part6", "row_number")
  return df_final

##Mekubaim

If one payer has two points of sale, they will be both aggregated under payer account.

Specific case: distributor 4082544, sales employee Yaniv Gerbi - in data stage it is replaced by sales employee 10467039 (Shmuel Gerbi), not really clear why. As a result, this distributor is not a part of the file with mekubaim

In [0]:
mekubaim_stmt =f'''
select 
Distribution_Channel, 
CODE_MEFIZ,
Item_Num,
round(sum(HAN_MISHARIT),2) as zd03,
round(sum(HAN_MIVZA),2) as zd12
 from ds_database.hi_sales
 join (select distinct payer_desc, payer from bo_interfaces.g_clients) g_clients on g_clients.payer = hi_sales.code_mefiz
where REC_MONTH = {rec_month}
and rec_year = {rec_year}
and code_mefiz <> 4082544 ---specific case, see above
group by Distribution_Channel, CODE_MEFIZ, Item_Num
order by Distribution_Channel, CODE_MEFIZ, cast(Item_Num as int)
'''
#print(mekubaim_stmt)
mekubaim = spark.sql(mekubaim_stmt)
display(mekubaim)

Distribution_Channel,CODE_MEFIZ,Item_Num,zd03,zd12
6,2819344,6910899,3452.38,6421.45
6,2819344,6910949,824.04,985.32
6,2819344,6921298,0.0,18591.3
6,2819344,6921321,0.0,44.99
6,2819344,6921322,0.0,55.76
6,2819344,6921370,0.0,120.12
6,2819344,6921383,0.0,904.4
6,2819344,6921385,0.0,136.53
6,2819344,6921387,0.0,815.44
6,2819344,6921393,0.0,95.94


In [0]:
total_time_start = time.time()
column = StructType([StructField("merged", StringType(), True)])

for chnl in distr_chnl_list:
  print("*************************************")
  print("Distribution channel", chnl)

  full_df = spark.createDataFrame([], column)
  # first line of the template
  first_line = spark.createDataFrame([("EDI_DC40", "  1329000000000055271", "620 0322  ","ORDERS05                      /GLB/OGTORDERS05_01           ORDERS                               LS  ILELAHUB01                                                                                                     ","LS                                                                                                       ")], ["part1", "part2", "part3", "part4", "part5"])
  first_line = first_line.withColumn("part6", rpad(lit(datetime.strftime(datetime.now(), "%Y%m%d%H%M%s")[:-8]), 146, " "))
  #list of distributors by channel
  mefiz_list = mekubaim.filter(col("Distribution_channel")==chnl).select("code_mefiz").distinct().orderBy("code_mefiz").rdd.flatMap(lambda x: x).collect()
  print("List of distributors in Dist chnl ", chnl, mefiz_list)
  for code in mefiz_list:
    time_start = time.time()

    print("*****************************")
    print("Distributor:", code)
    material = mekubaim.filter(col("CODE_MEFIZ")==code)  
    
    header_df_zdr = type_header("ZDR", "mekubaim", rec_month, rec_year)
    header_df_zcr = type_header("ZCR", "mekubaim", rec_month, rec_year)
    
    for ttype in trans_type:      # ZDR, ZCR
      material_df = transform_df_to_type(material, ttype, "mekubaim") # creation of specific dataset for each type of transaction
      print(ttype,"number of lines:", material_df.count())
      if material_df.count()>0:        
        main = material_lines(material_df, code)
        if ttype=="ZDR":
          header = header_df_zdr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "distribution_channel", "sort_order")
        else:
          header = header_df_zcr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "distribution_channel", "sort_order")

        end_number = main.select(max("row_number")).collect()[0][0]
        last_line = spark.createDataFrame([("E2EDS01   ", "                    1329000000000055271", "00000001", "                           ", "")], ["part1", "part2",  "part4", "part5", "part6"]).withColumn("part3", lpad(lit(end_number)+20,6,"0"))

        final_df = first_line.union(header).union(main.drop("row_number")).union(last_line.select("part1", "part2", "part3", "part4", "part5", "part6"))
        final_df = final_df.withColumn("merged", (concat_ws("", *final_df.columns)))
        total_value = main.select(sum(trim("part6").astype('float'))).collect()[0][0]
        print(f"total value of {ttype} document:  {total_value}")
        full_df=full_df.union(final_df.select("merged"))
        time_end = time.time()
        print(code, "processing time:", int(time_end-time_start), 's')
        
      else:
        print(f"No lines for {ttype} for distributor {code}")
  full_df_string = full_df.select("merged").collect()
  if chnl == '06':
    file_06_name = datetime.strftime(datetime.now(), "%Y%m%d_%H%M%s")[:-8]
    count_06 = full_df.count()
    csv_string_06 = '\n'.join(row['merged'] for row in full_df_string)   
  else:
    file_08_name = datetime.strftime(datetime.now(), "%Y%m%d_%H%M%s")[:-8]
    count_08 = full_df.count()
    csv_string_08 = '\n'.join(row['merged'] for row in full_df_string)   
total_time_end = time.time()
print("the job took ", int((total_time_end - total_time_start)/60), "mins")      



*************************************
Distribution channel 06
List of distributors in Dist chnl  06 ['2819344', '2820225', '2820226', '2820248', '2820270', '2820283', '2820290', '2820292', '2820347', '2820355', '2820357', '2820384', '2820389', '2820415', '3016129', '3553396', '3871381', '3910846', '3946007', '4048372', '4068540', '5070229', '5268076', '5442980', '5450846', '6172353', '6901630']
*****************************
Distributor: 2819344
ZCR number of lines: 76
total value of ZCR document:  136104.24145507812
ZCR processing time: 13 s
ZDR number of lines: 0
No lines for ZDR for distributor 2819344
*****************************
Distributor: 2820225
ZCR number of lines: 159
total value of ZCR document:  110909.6696126461
ZCR processing time: 9 s
ZDR number of lines: 10
total value of ZDR document:  139.82999831438065
ZDR processing time: 18 s
*****************************
Distributor: 2820226
ZCR number of lines: 189
total value of ZCR document:  96995.4493150711
ZCR processing ti

In [0]:
display(csv_string_06)

In [0]:
# dist channel = 6
print('file name:', file_06_name)
print('total length of dataset:', count_06)
dbutils.fs.put(output_location+file_06_name, csv_string_06)

In [0]:
display(csv_string_08)

In [0]:
# dist channel = 8
print('file name:', file_08_name)
print('total length of dataset:', count_08)
dbutils.fs.put(output_location+file_08_name, csv_string_08)

##Returns

In [0]:
hechzer_stmt = f'''
select 
CODE_MEFIZ,
Item_Num,
round(sum(Discount),2) as value
 from ds_database.total_monthly
 join (select distinct payer_desc, payer from bo_interfaces.g_clients) g_clients on g_clients.payer = total_monthly.code_mefiz
where rec_year = {rec_year}
and rec_month = {rec_month}
---and code_mefiz = 2819344
group by code_mefiz, Item_Num
having round(sum(Discount),0) <> 0
order by code_mefiz, cast(Item_Num as int)'''
hechzer = spark.sql(hechzer_stmt)
display(hechzer)

In [0]:
total_time_start = time.time()
column = StructType([StructField("merged", StringType(), True)])
full_df = spark.createDataFrame([], column)
# list of distributors : all distributors with discount>0
mefiz_list = hechzer.select("code_mefiz").distinct().orderBy("code_mefiz").rdd.flatMap(lambda x: x).collect()
print("List of distributors", mefiz_list)

for code in mefiz_list:     # generating data per distributor
  time_start = time.time()

  print("********************************")
  print("Distributor:", code)
  
  material = hechzer.filter(col("CODE_MEFIZ")==code)  

  header_df_zdr = type_header("ZDR", "hechzer", rec_month, rec_year)
  header_df_zcr = type_header("ZCR", "hechzer", rec_month, rec_year)
  
  for ttype in trans_type:      
    material_df = transform_df_to_type(material, ttype, "hechzer")
    print(ttype,"number of lines:", material_df.count())
    if material_df.count()>0:        
      main = material_lines(material_df, code)
      if ttype=="ZDR":
        header = header_df_zdr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "sort_order")
      else:
        header = header_df_zcr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "sort_order")  
      end_number = main.select(max("row_number")).collect()[0][0]
      last_line = spark.createDataFrame([("E2EDS01   ", "                    1329000000000055271", "00000001", "                           ", "")], ["part1", "part2",  "part4", "part5", "part6"]).withColumn("part3", lpad(lit(end_number)+20,6,"0"))

      final_df = first_line.union(header.drop("Distribution_channel")).union(main.drop("row_number")).union(last_line.select("part1", "part2", "part3", "part4", "part5", "part6"))
      final_df = final_df.withColumn("merged", (concat_ws("", *final_df.columns)))
      total_value = main.select(sum(trim("part6").astype('float'))).collect()[0][0]
      print(f"  Total value of {ttype}: {total_value}")
      full_df=full_df.union(final_df.select("merged"))
  time_end = time.time()
  print(" processing time:", int(time_end-time_start), "seconds")
total_time_end = time.time()
print("the job took ", int((total_time_end - total_time_start)/60), "mins")      



In [0]:
display(full_df)

In [0]:
dbutils.fs.put(output_location+datetime.now().strftime("%Y%m%d_%H%m%s")[:-8], full_df.to_koalas().to_csv(header=False), True)

In [0]:
dbutils.notebook.exit("Bye!")

# END

In [0]:
# test החזרות
rec_year  = 2024
rec_month = 12
total_time_start = time.time()
column = StructType([StructField("merged", StringType(), True)])
full_df = spark.createDataFrame([], column)
# list of distributors : all distributors with discount>0
mefiz_list = ['2820355']
print("List of distributors", mefiz_list)

for code in mefiz_list:     # generating data per distributor
  time_start = time.time()

  print("********************************")
  print("Distributor:", code)
  
  material = hechzer.filter(col("CODE_MEFIZ")==code)  

  header_df_zdr = type_header("ZDR", "hechzer", rec_month, rec_year)
  header_df_zcr = type_header("ZCR", "hechzer", rec_month, rec_year)
  
  for ttype in trans_type:      
    material_df = transform_df_to_type(material, ttype, "hechzer")
    print(ttype,"number of lines:", material_df.count())
    if material_df.count()>0:        
      main = material_lines(material_df, code)
      if ttype=="ZDR":
        header = header_df_zdr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "sort_order")
      else:
        header = header_df_zcr.filter(col("CODE_MEFIZ")==code).drop("code_mefiz", "sort_order")  
      end_number = main.select(max("row_number")).collect()[0][0]
      last_line = spark.createDataFrame([("E2EDS01   ", "                    1329000000000055271", "00000001", "                           ", "")], ["part1", "part2",  "part4", "part5", "part6"]).withColumn("part3", lpad(lit(end_number)+20,6,"0"))
      first_line = spark.createDataFrame([("EDI_DC40", "  1329000000000055271", "620 0322  ","ORDERS05                      /GLB/OGTORDERS05_01           ORDERS                               LS  ILELAHUB01                                                                                                     ","LS                                                                                                       ")], ["part1", "part2", "part3", "part4", "part5"])
      first_line = first_line.withColumn("part6", rpad(lit(datetime.strftime(datetime.now(), "%Y%m%d%H%M%s")[:-8]), 146, " "))
      final_df = first_line.union(header.drop("Distribution_channel")).union(main.drop("row_number")).union(last_line.select("part1", "part2", "part3", "part4", "part5", "part6"))
      final_df = final_df.withColumn("merged", (concat_ws("", *final_df.columns)))
      total_value = main.select(sum(trim("part6").astype('float'))).collect()[0][0]
      print(f"  Total value of {ttype}: {total_value}")
      full_df=full_df.union(final_df.select("merged"))
  time_end = time.time()
  print(" processing time:", int(time_end-time_start), "seconds")
total_time_end = time.time()
print("the job took ", int((total_time_end - total_time_start)/60), "mins")      



In [0]:
%sql
select * from ds_database.total_monthly
where rec_month =12
and rec_year = 2024
and code_mefiz = 2820355