In [1]:
# Display the SparkSession; evaluating it in the first cell also starts the Spark application.
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
398,application_1524108459514_0043,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fb372ee1860>

In [2]:
from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.utils import AnalysisException

import pandas as pd

# Inclusive range of pipeline run ids whose outputs are scanned below; widen it to get more data.
# The cells below count rows per date for the raw (WEB_TLOG2, ATLOG) and RDM (f_rk_*) tables
# across these runs. start_id may not go below MIN_RUN_ID.
MIN_RUN_ID = 100763
start_id = 100810
end_id = 100817
assert MIN_RUN_ID <= start_id <= end_id, "expected %d <= start_id <= end_id" % MIN_RUN_ID

In [3]:
# Lower-case column names of the raw WEB_TLOG2 extract, in file order.
columns = ['bu_code', 'business_date', 'business_date_key', 'campaign_offer_id', 'card_id', 'transaction_comments',
           'contact_bu_code', 'contact_id', 'currency_code', 'invoice_number', 'item_amount', 'item_cost', 
           'item_quantity_value', 'item_redeemed_amount', 'item_spread_discount_amount', 
           'item_staff_discount_2_amount', 'item_staff_discount_amount','item_unit_price_amount',
           'item_vat_amount', 'member_id', 'member_sale_flag', 'member_point_id', 
           'member_voucher_id', 'no_show_card_flag', 'order_number', 
           'payment_amount', 'payment_method_name', 'payment_type_name', 'point_sub_type_name', 'point_type_name',
           'point_value', 'product_id', 'promotion_discount_amount', 'promotion_id', 'promotion_number',
           'sale_amount', 'sale_load_date', 'sale_load_key', 'sale_quantity_value', 'sale_redeemed_amount',
           'sale_staff_discount_2_amount', 'sale_staff_discount_amount', 'store_id', 'transaction_channel_name', 
           'transaction_date', 'transaction_date_key', 
           'transaction_id', 'transaction_type_name', 'voucher_number', 'last_updated_date']

# Upper-case field names of the WEB_TLOG2 CSV schema, in the same order as `columns`.
schema_field_names = [
    "BU_CODE", "BUSINESS_DATE", "BUSINESS_DATE_KEY", "CAMPAIGN_OFFER_ID", "CARD_ID",
    "TRANSACTION_COMMENTS", "CONTACT_BU_CODE", "CONTACT_ID", "CURRENCY_CODE", "INVOICE_NUMBER",
    "ITEM_AMOUNT", "ITEM_COST", "ITEM_QUANTITY_VALUE", "ITEM_REDEEMED_AMOUNT",
    "ITEM_SPREAD_DISCOUNT_AMOUNT", "ITEM_STAFF_DISCOUNT_2_AMOUNT", "ITEM_STAFF_DISCOUNT_AMOUNT",
    "ITEM_UNIT_PRICE_AMOUNT", "ITEM_VAT_AMOUNT", "MEMBER_ID", "MEMBER_SALE_FLAG",
    "MEMBER_POINT_ID", "MEMBER_VOUCHER_ID", "NO_SHOW_CARD_FLAG", "ORDER_NUMBER",
    "PAYMENT_AMOUNT", "PAYMENT_METHOD_NAME", "PAYMENT_TYPE_NAME", "POINT_SUB_TYPE_NAME",
    "POINT_TYPE_NAME", "POINT_VALUE", "PRODUCT_ID", "PROMOTION_DISCOUNT_AMOUNT",
    "PROMOTION_ID", "PROMOTION_NUMBER", "SALE_AMOUNT", "SALE_LOAD_DATE",
    # NOTE(review): `columns` calls this field 'sale_load_key'; confirm which name is correct.
    "SALE_LOAD_DATE_KEY",
    "SALE_QUANTITY_VALUE", "SALE_REDEEMED_AMOUNT", "SALE_STAFF_DISCOUNT_2_AMOUNT",
    "SALE_STAFF_DISCOUNT_AMOUNT", "STORE_ID", "TRANSACTION_CHANNEL_NAME", "TRANSACTION_DATE",
    "TRANSACTION_DATE_KEY", "TRANSACTION_ID", "TRANSACTION_TYPE_NAME", "VOUCHER_NUMBER",
    "LAST_UPDATED_DATE",
]

# Fields declared non-nullable; every other field is nullable.
non_nullable_fields = {"BU_CODE", "CAMPAIGN_OFFER_ID", "PROMOTION_ID", "TRANSACTION_ID",
                       "TRANSACTION_TYPE_NAME"}

assert len(schema_field_names) == len(columns), "schema field list and `columns` are out of sync"

# Every field is read as a string.
schema = StructType([
    StructField(name, StringType(), nullable=name not in non_nullable_fields)
    for name in schema_field_names
])

# Convert a compact date string into "yyyy-mm-dd" so it can be cast to DATE.
def parse(date):
    """Return `date` (compact "yyyymmdd") reformatted as "yyyy-mm-dd".

    "N/A" is returned unchanged. None (a NULL reaching the UDF, e.g. from the left
    join against D_RK_DATE) is returned as None instead of raising TypeError.
    """
    if date is None or date == 'N/A':
        return date
    return "{}-{}-{}".format(date[:4], date[4:6], date[6:])

# Spark UDF wrapper around parse(), producing a string column.
udf_parse = udf(parse, returnType=StringType())

# Azure Blob Storage container and account holding the pipeline run outputs read below.
container, account = "liyu-output", "aswstagingsastorage"

# Table name -> the column whose date each row is counted under.
tables = {
    # Raw tables
    "WEB_TLOG2":  "transaction_date",
    "ATLOG_0x01": "date",
    "ATLOG_0x03": "date",
    "ATLOG_0x04": "date",
    "ATLOG_0x05": "date",
    # RDM tables
    "f_rk_transaction":              "utc_date_key",
    "f_rk_transaction_detail":       "utc_date_key",
    "f_rk_transaction_promo_detail": "utc_date_key",
    "f_rk_tender":                   "utc_date_key",
}


In [4]:
# Need this dataframe in order to create the join by utc_date_key: D_RK_DATE (read from the
# end_id run) maps RDM utc_date_key values to the compact dates that parse() expects.
try:
    date_df = spark.read.parquet("wasbs://%s@%s.blob.core.windows.net/phase2_output/%s/D_RK_DATE.parquet/" 
                                  % (container, account, end_id)).select("date_key", "compact_date")
except AnalysisException:
    print("Did not read parquet")
    raise


def _run_table_path(table_name, runid):
    """Return the parquet location of `table_name` in the output of pipeline run `runid`.

    WEB_TLOG2 and ATLOG_* are phase-1 outputs (each ATLOG record type is a sub-directory
    of ATLOG.parquet); every other table is a phase-2 output stored under its upper-cased name.
    """
    base = "wasbs://%s@%s.blob.core.windows.net" % (container, account)
    if table_name == "WEB_TLOG2":
        return "%s/phase1_output/%s/WEB_TLOG2.parquet/" % (base, runid)
    if table_name.startswith('ATLOG'):
        return "%s/phase1_output/%s/ATLOG.parquet/%s/" % (base, runid, table_name)
    return "%s/phase2_output/%s/%s.parquet/" % (base, runid, table_name.upper())


merge_df = None
for table_value, group_col in tables.items():
    # Union this table's date column across every run id in [start_id, end_id].
    df = None
    for runid in range(start_id, end_id + 1):
        try:
            input_df = spark.read.parquet(_run_table_path(table_value, runid)).select(group_col)
            df = input_df if df is None else df.union(input_df)
        except AnalysisException:
            # means the runid does not have the table
            print(runid, table_value)

    if df is None:
        # No run in the range produced this table; skip it instead of failing on df.withColumn.
        print("No data for %s in runs %d-%d; skipping" % (table_value, start_id, end_id))
        continue

    # Normalise the date column to "yyyy-mm-dd" before casting it to DATE.
    if table_value.startswith('ATLOG'):
        df = df.withColumn(group_col, udf_parse(group_col))
    elif table_value.startswith('f_rk'):
        # RDM tables carry a utc_date_key; resolve it to a compact date through D_RK_DATE.
        df = df.join(date_df, df.utc_date_key == date_df.date_key, how='left')
        df = df.withColumn(group_col, udf_parse('compact_date'))

    # One row per date holding this table's row count, in a column named after the table.
    df = (df.selectExpr('cast(%s as date) as date_time' % group_col)
            .groupby('date_time').count()
            .selectExpr('date_time', 'count as %s' % table_value))

    # Outer join keeps dates that appear in only some of the tables.
    merge_df = df if merge_df is None else merge_df.join(df, on=['date_time'], how='outer')


100813 WEB_TLOG2
100817 ATLOG_0x05
100817 ATLOG_0x04
100817 ATLOG_0x03
100817 ATLOG_0x01

In [25]:
# Number of distinct stores per compact date, from the registered f_rk_transaction,
# d_rk_date and d_rk_store tables.
unique_stores = spark.sql("""
select d.compact_date, count(distinct s.store_id) as store_count from f_rk_transaction as t
left join d_rk_date as d
on t.utc_date_key=d.date_key
left join d_rk_store as s
on t.store_key=s.store_key
group by d.compact_date
order by d.compact_date DESC
limit 1000
""")

# Drop rows containing nulls, then turn the compact date into a DATE column. The explicit
# alias keeps the join key name stable rather than relying on Spark's implicit naming of a cast.
unique_stores = unique_stores.na.drop().withColumn(
    'parsed_date', udf_parse('compact_date')).selectExpr('cast(parsed_date as date) as parsed_date',
                                                         'store_count')

# Attach the store count to the per-date table counts; fillna(0) turns missing counts into 0.
transaction_df = merge_df.join(unique_stores, [merge_df.date_time == unique_stores.parsed_date], 'left')
transaction_df = transaction_df.orderBy('date_time', ascending=False).fillna(0)

# Columns are ordered so each ATLOG record type sits next to an RDM table
# (presumably the one built from it - verify).
transaction_df = transaction_df.selectExpr('date_time', "WEB_TLOG2", "ATLOG_0x01 as x01",
                                           "f_rk_transaction_detail as transaction_detail",
                                           "ATLOG_0x03 as x03", "f_rk_transaction_promo_detail as promo_detail",
                                           "ATLOG_0x04 as x04", "f_rk_tender as tender",
                                           "ATLOG_0x05 as x05", "f_rk_transaction as transaction",
                                           "store_count")



In [26]:
# Per-date counts for run ids start_id through end_id inclusive, newest date first (up to 100 rows).
transaction_df.show(100)

+----------+---------+------+------------------+-------+------------+------+------+------+-----------+-----------+
| date_time|WEB_TLOG2|   x01|transaction_detail|    x03|promo_detail|   x04|tender|   x05|transaction|store_count|
+----------+---------+------+------------------+-------+------------+------+------+------+-----------+-----------+
|2018-04-09|      555|    33|                 0|     71|           0|    19|     0|    10|          0|          0|
|2018-04-08|     1596|292321|            248917| 555514|      304000|194295|191255|108871|     107174|          0|
|2018-04-07|     7834|938334|            785322|2346321|     1383711|623638|621221|280209|     279211|          0|
|2018-04-06|     1569|466995|            198464| 883463|      496277|310821|313177|177005|      85707|          0|
|2018-04-05|     1555|585628|            265934|1181955|      671519|361287|361336|206526|     108767|          0|
|2018-04-04|     1365|576986|            495353|1117383|      629911|357709|3580

In [5]:
# Raw table: WEB_TLOG2 extracts (gzipped CSV) for 2018-04-05 and 2018-04-06.
raw_tlog2_day_path = ("wasbs://liyu@aswprodsastorage.blob.core.windows.net"
                      "/ASW/WTCTW/raw/RTLOG/WEB_TLOG2/2018/04/%s/*csv.gz")
tlog2_05_path = raw_tlog2_day_path % "05"
tlog2_06_path = raw_tlog2_day_path % "06"

web_tlog2_05 = spark.read.csv(tlog2_05_path, schema=schema)
web_tlog2_06 = spark.read.csv(tlog2_06_path, schema=schema)

# RDM tables: phase-2 outputs of runs 100812 and 100813, stored as <run id>/<TABLE>.parquet.
rdm_table_path = "wasbs://liyu-output@aswstagingsastorage.blob.core.windows.net/phase2_output/%s/%s.parquet/"
transaction_812 = rdm_table_path % ("100812", "F_RK_TRANSACTION")
transaction_813 = rdm_table_path % ("100813", "F_RK_TRANSACTION")
promo_detail_812 = rdm_table_path % ("100812", "F_RK_TRANSACTION_PROMO_DETAIL")
promo_detail_813 = rdm_table_path % ("100813", "F_RK_TRANSACTION_PROMO_DETAIL")
transaction_detail_812 = rdm_table_path % ("100812", "F_RK_TRANSACTION_DETAIL")
transaction_detail_813 = rdm_table_path % ("100813", "F_RK_TRANSACTION_DETAIL")
tender_812 = rdm_table_path % ("100812", "F_RK_TENDER")
tender_813 = rdm_table_path % ("100813", "F_RK_TENDER")

# Tables that were rerun from p2_out_retry9 from liyu-output (runs 100813 and 100815),
# stored under lower-case table names.
retry_table_path = "wasbs://devliyu@devincrloadingeastus.blob.core.windows.net/k_chan/%s/%s.parquet/"
retry_transaction_813 = retry_table_path % ("100813", "f_rk_transaction")
retry_transaction_815 = retry_table_path % ("100815", "f_rk_transaction")
retry_promo_detail_813 = retry_table_path % ("100813", "f_rk_transaction_promo_detail")
retry_promo_detail_815 = retry_table_path % ("100815", "f_rk_transaction_promo_detail")
retry_transaction_detail_813 = retry_table_path % ("100813", "f_rk_transaction_detail")
retry_transaction_detail_815 = retry_table_path % ("100815", "f_rk_transaction_detail")
retry_tender_813 = retry_table_path % ("100813", "f_rk_tender")
retry_tender_815 = retry_table_path % ("100815", "f_rk_tender")


def remove_str(id_val):
    """Strip a leading "WTCTW_" prefix from an RDM id so it can be joined to the raw id.

    Ids without the prefix are returned unchanged. None (a NULL reaching the UDF) is
    returned as None instead of raising TypeError.
    """
    if id_val is None:
        return None
    prefix = "WTCTW_"
    if id_val.startswith(prefix):
        return id_val[len(prefix):]
    return id_val

def remove_salt_str(id_val):
    """Strip a 3-character salt followed by "_WTCTW_" from an RDM id.

    Example: "A1B_WTCTW_987" -> "987". Ids without that pattern are returned unchanged.
    None (a NULL reaching the UDF) is returned as None instead of raising TypeError.
    """
    if id_val is None:
        return None
    # Characters 3..9 must be exactly "_WTCTW_" (the first 3 characters are the salt).
    if id_val[3:10] == "_WTCTW_":
        return id_val[10:]
    return id_val

# For promo_detail
def append_promo_remove_str(ids):
    """Build the raw-format promotion id "<detail id without WTCTW_>_<promo_id>".

    `ids` is the two-element array built as array(promo_id, transaction_detail_id).
    Returns None when either element is None (e.g. from the left join on d_adm_promo);
    an inner join can never match a NULL key, and this avoids a TypeError in the UDF.
    """
    promo_id, detail_id = ids[0], ids[1]
    if promo_id is None or detail_id is None:
        return None
    promo_id = str(promo_id)
    if detail_id.startswith("WTCTW_"):
        return detail_id[6:] + '_' + promo_id
    # NOTE(review): unlike remove_str, a detail id without the "WTCTW_" prefix is dropped
    # entirely rather than kept; presumably such ids can never match a raw id. Confirm intent.
    return '_' + promo_id

# Spark UDF wrappers around the id-normalising helpers, each producing a string column.
udf_remove_str = udf(remove_str, StringType())
udf_remove_salt_str = udf(remove_salt_str, StringType())
udf_append_promo_remove_str = udf(append_promo_remove_str, StringType())

# Register both raw WEB_TLOG2 days so spark.sql() can query them by these names.
web_tlog2_05.createOrReplaceTempView("web_tlog2_05")
web_tlog2_06.createOrReplaceTempView("web_tlog2_06")

# RDM table -> [run 100812 path, run 100813 path].
# NOTE: this rebinds `tables` from the earlier configuration cell; re-running the per-date
# count cell after this one would iterate these paths instead.
tables = {
    "f_rk_transaction":        [transaction_812, transaction_813],
    "f_rk_promo_detail":       [promo_detail_812, promo_detail_813],
    "f_rk_transaction_detail": [transaction_detail_812, transaction_detail_813],
    "f_rk_tender":             [tender_812, tender_813],
}

# Store ids the RDM per-store counts are filtered to (presumably the stores present in WEB_TLOG2).
web_tlog2_store_ids = "987 970 953 952 988 962".split()
# NOTE(review): not referenced in the cells shown here.
missing_data = None

# For each RDM table and run, normalise the RDM ids back to the raw WEB_TLOG2 id format,
# inner-join them to the raw rows of the matching transaction type, and compare per-store
# row counts between the raw extract and the RDM table.
for table, paths in tables.items():
    for path in paths:
        # Run 100812 paths are compared with the 2018-04-05 raw view, all others with 2018-04-06.
        if "100812" in path:
            raw_table, run_id, transaction_path = "web_tlog2_05", '100812', transaction_812
        else:
            raw_table, run_id, transaction_path = "web_tlog2_06", '100813', transaction_813

        rdm_data = spark.read.parquet(path)

        # Each table shares similar logic. Most consist of inner joining on the parsed transaction_id.
        if table == "f_rk_transaction":
            raw_data = spark.sql('''
            select transaction_id, store_id
            from %s
            where transaction_type_name == 'Sale'
            ''' % raw_table)

            parse_rdm = rdm_data.selectExpr('cast(transaction_id as string) as rdm_id').withColumn(
                'parsed_id', udf_remove_str('rdm_id'))

            ingested_data = parse_rdm.join(raw_data, [raw_data.transaction_id == parse_rdm.parsed_id], 'inner')

        elif table == "f_rk_promo_detail":
            raw_data = spark.sql('''
            select transaction_id, store_id
            from %s
            where transaction_type_name == 'Promotion'
            ''' % raw_table)

            # Look up each detail row's promo_id to build the parsed transaction_id.
            rdm_data.createOrReplaceTempView('promo_detail')
            joined_rdm_data = spark.sql('''
            select d_adm_promo.promo_id, promo_detail.transaction_detail_id from promo_detail
                left join d_adm_promo on
                promo_detail.promo_key == d_adm_promo.promo_key
            ''').na.fill(0)

            parse_rdm = joined_rdm_data.withColumn('parsed_id', udf_append_promo_remove_str(
                    array('promo_id', 'transaction_detail_id')))

            ingested_data = parse_rdm.join(raw_data, [raw_data.transaction_id == parse_rdm.parsed_id], 'inner')

        elif table == "f_rk_transaction_detail":
            raw_data = spark.sql('''
            select transaction_id, store_id
            from %s
            where transaction_type_name == 'Item'
            ''' % raw_table)

            parse_rdm = rdm_data.selectExpr('cast(transaction_detail_id as string) as rdm_id').withColumn(
                'parsed_id', udf_remove_salt_str('rdm_id'))

            ingested_data = parse_rdm.join(raw_data, [raw_data.transaction_id == parse_rdm.parsed_id], 'inner')

        else:
            raw_data = spark.sql('''
            select order_number, store_id
            from %s
            where transaction_type_name == 'Tender'
            ''' % raw_table)

            # Link tender rows to f_rk_transaction via transaction_key to obtain the RDM
            # TRANSACTION_ID, which is then matched against the raw order_number.
            f_rk_transaction_data = spark.read.parquet(transaction_path)
            tender_data = rdm_data.selectExpr('transaction_key as tender_key', 'tender_method_name', 'tender_detail_name')
            tender_data = f_rk_transaction_data.join(tender_data, [tender_data.tender_key ==
                                                                   f_rk_transaction_data.TRANSACTION_KEY], 'inner')
            parse_rdm = tender_data.withColumn('parsed_id', udf_remove_str('TRANSACTION_ID'))

            ingested_data = parse_rdm.join(raw_data, [parse_rdm.parsed_id == raw_data.order_number], 'inner')

        # Per-store row counts: raw rows carry store_id directly, RDM rows resolve store_key
        # through d_rk_store.
        raw_store_count = raw_data.groupBy('store_id').count()
        rdm_data.createOrReplaceTempView('rdm_data')
        rdm_store_count = spark.sql('''select store_id as rdm_store_ids, count(store_id) as rdm_count
        from d_rk_store
        inner join rdm_data
        on d_rk_store.store_key == rdm_data.store_key
        group by d_rk_store.store_id
        ''')

        rdm_store_count = rdm_store_count.filter(rdm_store_count.rdm_store_ids.isin(web_tlog2_store_ids))

        print(table + ' ' + run_id)
        print("Unique Values: " + str(ingested_data.select('parsed_id').distinct().count()))
        print('Row Count of RDM Data Merged: ' + str(ingested_data.count()))
        print('Row Count of Raw Data: ' + str(raw_data.count()))
        print('==============')
        print('Raw and RDM Store_id Count')
        # Outer join keeps stores seen on only one side. For RDM-only stores the raw store_id
        # is null, so fall back to the RDM store id for the displayed key.
        raw_store_count.join(rdm_store_count, [rdm_store_count.rdm_store_ids == raw_store_count.store_id],
                             'outer').selectExpr('coalesce(store_id, cast(rdm_store_ids as string)) as store_id',
                                                 'count as raw_count', 'rdm_count').show()
        print('==============')
        print('==============')

f_rk_promo_detail 100812
Unique Values: 3636
Row Count of RDM Data Merged: 3636
Row Count of Raw Data: 3636
Raw and RDM Store_id Count
+--------+---------+---------+
|store_id|raw_count|rdm_count|
+--------+---------+---------+
|     987|     1482|     1482|
|     970|      528|      528|
|     953|      515|      515|
|     988|     1111|     1111|
+--------+---------+---------+

f_rk_promo_detail 100813
Unique Values: 0
Row Count of RDM Data Merged: 0
Row Count of Raw Data: 3306
Raw and RDM Store_id Count
+--------+---------+---------+
|store_id|raw_count|rdm_count|
+--------+---------+---------+
|     987|        2|     null|
|     970|      226|     null|
|     953|      398|     null|
|     952|      477|     null|
|     988|     2203|     null|
+--------+---------+---------+

f_rk_transaction_detail 100812
Unique Values: 1847
Row Count of RDM Data Merged: 1847
Row Count of Raw Data: 1847
Raw and RDM Store_id Count
+--------+---------+---------+
|store_id|raw_count|rdm_count|
+---

As it stands, all of the web_tlog2_05 data (run 100812) was ingested into RDM, whereas none of the web_tlog2_06 data (run 100813) was found in RDM.