In [0]:
import dlt
from pyspark.sql.functions import date_format, current_timestamp,col,cast,when,sum,round,trim

In [0]:
# Pipeline parameters read from the Spark configuration keys "Container" and
# "StorageAccount"; they are combined into the abfss:// source URI further down.
container = spark.conf.get("Container")
storageAccount = spark.conf.get("StorageAccount")

In [0]:
# Root abfss:// URI built from the configured container and storage account;
# each bronze loader appends its own source folder to this prefix.
location = f'abfss://{container}@{storageAccount}.dfs.core.windows.net'

### BRONZE

In [0]:


# Bronze table for price states, loaded from the price_state/ source folder.
@dlt.table(
    name="bronze_price_state",
    schema="""
    state_id STRING NOT NULL PRIMARY KEY,
    state_label STRING,
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def state():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/price_state/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for price sub-states, loaded from the price_substate/ source folder.
@dlt.table(
    name="bronze_price_substate",
    schema="""
    substate_id STRING NOT NULL PRIMARY KEY,
    substate_label STRING,
    state_id STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_state_id FOREIGN KEY (state_id) REFERENCES gsynergy.default.bronze_price_state(state_id)
    """
)
def substate():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/price_substate/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for holidays, loaded from the hldy/ source folder.
@dlt.table(
    name="bronze_hldy",
    schema="""
    hldy_id STRING NOT NULL PRIMARY KEY,
    hldy_label STRING,
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def hldy():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/hldy/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for inventory status codes, loaded from the invstatus/ source folder.
# NOTE(review): the function is called `hldy` although it builds bronze_invstatus;
# the table name is taken from `name=` below, so the function name is only a label.
@dlt.table(
    name="bronze_invstatus",
    schema="""
    code_id STRING NOT NULL PRIMARY KEY,
    code_label STRING,
    bckt_id STRING,
    bckt_label STRING,
    ownrshp_id STRING,
    ownrshp_label STRING, 
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def hldy():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/invstatus/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for the fiscal calendar, loaded from the clnd/ source folder.
@dlt.table(
    name="bronze_clnd",
    schema="""
    fscldt_id STRING NOT NULL PRIMARY KEY,
    fscldt_label STRING,
    fsclwk_id STRING,
    fsclwk_label STRING,
    fsclmth_id STRING,
    fsclmth_label STRING,
    fsclqrtr_id STRING,
    fsclqrtr_label STRING,
    fsclyr_id STRING,
    fsclyr_label STRING,
    ssn_id STRING,
    ssn_label STRING,
    ly_fscldt_id STRING,
    lly_fscldt_id STRING,
    fscldow STRING,
    fscldom STRING,
    fscldoq STRING,
    fscldoy STRING,
    fsclwoy STRING,
    fsclmoy STRING, 
    fsclqoy STRING,
    date STRING,
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def clnd():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/clnd/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for inventory locations, loaded from the invloc/ source folder.
@dlt.table(
    name="bronze_invloc",
    schema="""
    loc STRING NOT NULL PRIMARY KEY,
    loc_label STRING,
    loctype STRING,
    loctype_label STRING, 
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def invloc():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/invloc/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for regions, loaded from the region/ source folder.
@dlt.table(
    name="bronze_region",
    schema="""
    rgn INTEGER NOT NULL PRIMARY KEY,
    rgn_label STRING, 
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def region():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/region/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for districts, loaded from the district/ source folder.
@dlt.table(
    name="bronze_district",
    schema="""
    dstr INTEGER NOT NULL PRIMARY KEY,
    dstr_label STRING, 
    rgn INTEGER ,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_rgn_id FOREIGN KEY (rgn) REFERENCES gsynergy.default.bronze_region(rgn)
    """
)
def district():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/district/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for streets, loaded from the street/ source folder.
@dlt.table(
    name="bronze_street",
    schema="""
    str INTEGER NOT NULL PRIMARY KEY,
    str_label STRING, 
    dstr INTEGER ,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_dstr_id FOREIGN KEY (dstr) REFERENCES gsynergy.default.bronze_district(dstr)
    """
)
def street():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/street/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for departments, loaded from the dept/ source folder.
@dlt.table(
    name="bronze_dept",
    schema="""
    dept_id INTEGER NOT NULL PRIMARY KEY,
    dept_label STRING,
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def dept():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/dept/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for categories, loaded from the catg/ source folder.
@dlt.table(
    name="bronze_cat",
    schema="""
    cat_id INTEGER NOT NULL PRIMARY KEY,
    cat_label STRING,
    dept_id INTEGER,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_dept_id FOREIGN KEY (dept_id) REFERENCES gsynergy.default.bronze_dept(dept_id)
    """
)
def cat():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/catg/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for sub-categories, loaded from the subCatg/ source folder.
@dlt.table(
    name="bronze_subcat",
    schema="""
    subcat_id INTEGER NOT NULL PRIMARY KEY,
    subcat_label STRING,
    cat_id INTEGER,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_cat_id FOREIGN KEY (cat_id) REFERENCES gsynergy.default.bronze_cat(cat_id)
    """
)
def subcat():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/subCatg/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for styles, loaded from the style/ source folder.
@dlt.table(
    name="bronze_styl",
    schema="""
    styl_id STRING NOT NULL PRIMARY KEY,
    styl_label STRING,
    subcat_id INTEGER,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_subcat_id FOREIGN KEY (subcat_id) REFERENCES gsynergy.default.bronze_subcat(subcat_id)
    """
)
def styl():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/style/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for style/colour combinations, loaded from the stylclr/ source folder.
@dlt.table(
    name="bronze_stylclr",
    schema="""
    stylclr_id STRING NOT NULL PRIMARY KEY,
    stylclr_label STRING,
    styl_id STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_styl_id FOREIGN KEY (styl_id) REFERENCES gsynergy.default.bronze_styl(styl_id)
    """
)
def stylclr():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/stylclr/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for products (SKUs), loaded from the product/ source folder.
@dlt.table(
    name="bronze_product",
    schema="""
    sku_id STRING NOT NULL PRIMARY KEY,
    sku_label STRING,
    stylclr_id STRING,
    issvc BOOLEAN,
    isasmbly BOOLEAN,
    isnfs BOOLEAN,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_stylclr_id FOREIGN KEY (stylclr_id) REFERENCES gsynergy.default.bronze_stylclr(stylclr_id)
    """
)
def product():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/product/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for channels, loaded from the chnl/ source folder.
@dlt.table(
    name="bronze_chnl",
    schema="""
    chnl_id STRING NOT NULL PRIMARY KEY,
    chnl_label STRING,
    _rescued_data STRING,
    file_processed_date STRING
    """
)
def chnl():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/chnl/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for sub-channels, loaded from the subchnl/ source folder.
@dlt.table(
    name="bronze_subchnl",
    schema="""
    subchnl_id STRING NOT NULL PRIMARY KEY,
    subchnl_label STRING,
    chnl_id STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_chnl_id FOREIGN KEY (chnl_id) REFERENCES gsynergy.default.bronze_chnl(chnl_id)
    """
)
def subchnl():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/subchnl/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for sites, loaded from the site/ source folder.
@dlt.table(
    name="bronze_site",
    schema="""
    site_id STRING NOT NULL PRIMARY KEY,
    site_label STRING,
    subchnl_id STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_subchnl_id FOREIGN KEY (subchnl_id) REFERENCES gsynergy.default.bronze_subchnl(subchnl_id)
    """
)
def site():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/site/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for sales transactions, loaded from the transactions/ source folder.
# NOTE(review): order_id alone is declared as the primary key although rows also
# carry a line_id - confirm whether the key should be (order_id, line_id).
@dlt.table(
    name="bronze_transactions",
    schema="""
    order_id STRING NOT NULL PRIMARY KEY,
    line_id STRING,
    type STRING,
    dt STRING,
    pos_site_id STRING NOT NULL,
    sku_id STRING NOT NULL,
    fscldt_id STRING NOT NULL,
    price_substate_id STRING,
    sales_units STRING,
    sales_dollars STRING,
    discount_dollars STRING,
    original_order_id STRING,
    original_line_id STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_pos_site_id FOREIGN KEY (pos_site_id) REFERENCES gsynergy.default.bronze_site(site_id),
    CONSTRAINT fk_sku_id1 FOREIGN KEY (sku_id) REFERENCES gsynergy.default.bronze_product(sku_id),
    CONSTRAINT fk_fscldt_id2 FOREIGN KEY (fscldt_id) REFERENCES gsynergy.default.bronze_clnd(fscldt_id),
    CONSTRAINT fk_price_substate_id FOREIGN KEY (price_substate_id) REFERENCES gsynergy.default.bronze_price_substate(substate_id)
    """
)
def transactions():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/transactions/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

In [0]:
# Bronze table for average unit costs, loaded from the averagecosts/ source folder.
@dlt.table(
    name="bronze_averagecosts",
    schema="""
    fscldt_id STRING NOT NULL,
    sku_id STRING NOT NULL,
    average_unit_standardcost STRING,
    average_unit_landedcost STRING,
    _rescued_data STRING,
    file_processed_date STRING,
    CONSTRAINT fk_fscldt_id1 FOREIGN KEY (fscldt_id) REFERENCES gsynergy.default.bronze_clnd(fscldt_id),
    CONSTRAINT fk_sku_id2 FOREIGN KEY (sku_id) REFERENCES gsynergy.default.bronze_product(sku_id)
    """
)
def averagecosts():
    # Stream parquet files through the cloudFiles source.
    source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(f"{location}/averagecosts/")
    )
    # Stamp every row with the current timestamp rendered as a string.
    return source.withColumn(
        'file_processed_date',
        date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss'),
    )

### SILVER

In [0]:
# Expectation for silver_hldy: both the id and the label must be present.
valid_hldy = {
    "valid_hldy_code": "hldy_id IS NOT NULL AND hldy_label IS NOT NULL",
}


@dlt.table(name='silver_hldy')
@dlt.expect_all_or_drop(valid_hldy)
def hldy():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_hldy')

In [0]:
# Expectation for silver_invstatus: code, bucket and ownership ids must be present.
valid_status = {
    "valid_invstatus": "code_id IS NOT NULL AND bckt_id IS NOT NULL AND ownrshp_id IS NOT NULL",
}


@dlt.table(name='silver_invstatus')
@dlt.expect_all_or_drop(valid_status)
def invstatus():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_invstatus')

In [0]:
# Expectation for silver_invloc: location and location type must be present.
valid_loc = {
    "valid_invloc": "loc IS NOT NULL AND loctype IS NOT NULL",
}


@dlt.table(name='silver_invloc')
@dlt.expect_all_or_drop(valid_loc)
def invloc():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_invloc')

In [0]:
# Silver calendar: bronze calendar with the key renamed and the ingest columns removed.
@dlt.table(name='silver_clnd')
def clnd():
    bronze_calendar = spark.readStream.table('LIVE.bronze_clnd')
    # Rename the key to clnd_fscldt_id so it differs from the fscldt_id column of other tables.
    renamed = bronze_calendar.withColumnRenamed('fscldt_id', 'clnd_fscldt_id')
    return renamed.drop('_rescued_data').drop('file_processed_date')

In [0]:
# Expectation for silver_street: street and district present, street code positive.
valid_str = {
    "strcode": "str IS NOT NULL AND dstr IS NOT NULL AND str > 0 ",
}


@dlt.table(name='silver_street')
@dlt.expect_all_or_drop(valid_str)
def street():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_street')

In [0]:
# Expectation for silver_region: region code and label must be present.
valid_rgn = {
    "rgncode": "rgn IS NOT NULL AND rgn_label IS NOT NULL",
}


@dlt.table(name='silver_region')
@dlt.expect_all_or_drop(valid_rgn)
def region():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_region')

In [0]:
# Expectation for silver_district: district and parent region must be present.
valid_dstr = {
    "dstrcode": "dstr IS NOT NULL AND rgn IS NOT NULL",
}


@dlt.table(name='silver_district')
@dlt.expect_all_or_drop(valid_dstr)
def district():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_district')

In [0]:
# Expectations for silver_averagecosts: non-negative costs and both keys present.
valid_units = {
    "standardcost": "average_unit_standardcost >= 0",
    "landedcost": "average_unit_landedcost >=0",
    "valid_id": "fscldt_id IS NOT NULL AND sku_id IS NOT NULL",
}


@dlt.table(name='silver_averagecosts')
@dlt.expect_all_or_drop(valid_units)
def averagecosts():
    costs = spark.readStream.table('LIVE.bronze_averagecosts')
    # Both cost columns are STRING in bronze; cast them to double, in the original order.
    for cost_column in ('average_unit_standardcost', 'average_unit_landedcost'):
        costs = costs.withColumn(cost_column, col(cost_column).cast('double'))
    return costs

In [0]:
# Expectations for silver_transactions: non-negative measures, a usable type and
# all four foreign keys present.
valid_trans = {
    "units": "sales_units >= 0",
    "sales": "sales_dollars >=0",
    "discount": "discount_dollars >=0",
    "type": "type is not NULL AND type!=''",
    "valid_id": "pos_site_id IS NOT NULL AND sku_id IS NOT NULL AND fscldt_id IS NOT NULL AND price_substate_id IS NOT NULL",
}


@dlt.table(name='silver_transactions')
@dlt.expect_all_or_drop(valid_trans)
def transactions():
    # A transaction type counts as missing when it is NULL or the empty string.
    # The NULL case must use isNull(): `col('type') == None` builds the SQL
    # comparison `type = NULL`, which is never true, so NULL types were never
    # replaced and the rows were dropped by the "type" expectation instead of
    # being kept with the 'NONTYPE' placeholder.
    type_is_missing = col('type').isNull() | (col('type') == '')

    return (
        spark.readStream.table('LIVE.bronze_transactions')
        # Bronze stores every measure and id as STRING; give them numeric types.
        .withColumn('discount_dollars', col('discount_dollars').cast('double'))
        .withColumn('sales_dollars', col('sales_dollars').cast('double'))
        .withColumn('order_id', col('order_id').cast('long'))
        .withColumn('line_id', col('line_id').cast('integer'))
        .withColumn('original_order_id', col('original_order_id').cast('long'))
        .withColumn('original_line_id', col('original_line_id').cast('integer'))
        .withColumn('sales_units', col('sales_units').cast('integer'))
        .withColumn('type', when(type_is_missing, 'NONTYPE').otherwise(col('type')))
    )

In [0]:
# Expectation for silver_price_state: the state id must be present.
valid_price_state = {
    "state": "state_id IS NOT NULL",
}


@dlt.table(name='silver_price_state')
@dlt.expect_all_or_drop(valid_price_state)
def price_state():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_price_state')
	


In [0]:
	
# Expectation for silver_price_substate: sub-state and parent state ids must be present.
valid_price_substate = {
    "substate": "substate_id IS NOT NULL AND state_id IS NOT NULL",
}


@dlt.table(name='silver_price_substate')
@dlt.expect_all_or_drop(valid_price_substate)
def price_substate():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_price_substate')

In [0]:
# Expectations for silver_site: site id and parent sub-channel id must be present.
valid_site = {
    "site": "site_id IS NOT NULL",
    "subchnl": "subchnl_id IS NOT NULL",
}


@dlt.table(name='silver_site')
@dlt.expect_all_or_drop(valid_site)
def site():
    # Stream the bronze table unchanged; the expectations above are applied to the result.
    return spark.readStream.table('LIVE.bronze_site')

In [0]:
# Expectations for silver_subchnl: channel id and sub-channel id must be present.
valid_subchnl = {
    "chnl": "chnl_id IS NOT NULL",
    "subchnl": "subchnl_id IS NOT NULL",
}


@dlt.table(name='silver_subchnl')
@dlt.expect_all_or_drop(valid_subchnl)
def subchnl():
    # Stream the bronze table unchanged; the expectations above are applied to the result.
    return spark.readStream.table('LIVE.bronze_subchnl')

In [0]:
# Expectation for silver_chnl: the channel id must be present.
valid_chnl = {
    "chnl": "chnl_id IS NOT NULL",
}


@dlt.table(name='silver_chnl')
@dlt.expect_all_or_drop(valid_chnl)
def chnl():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_chnl')

In [0]:
# Expectation for silver_dept: the department id must be present.
valid_dept = {
    "dept": "dept_id IS NOT NULL",
}


@dlt.table(name='silver_dept')
@dlt.expect_all_or_drop(valid_dept)
def dept():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_dept')

In [0]:
# Expectation for silver_cat: category id and parent department id must be present.
valid_cat = {
    "cat": "cat_id IS NOT NULL AND dept_id IS NOT NULL",
}


@dlt.table(name='silver_cat')
@dlt.expect_all_or_drop(valid_cat)
def cat():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_cat')
	

In [0]:
# Expectation for silver_subcat: parent category id and sub-category id must be present.
valid_subcat = {
    "subcat": "cat_id IS NOT NULL AND subcat_id IS NOT NULL",
}


@dlt.table(name='silver_subcat')
@dlt.expect_all_or_drop(valid_subcat)
def subcat():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_subcat')

In [0]:
# Expectation for silver_styl: style id and parent sub-category id must be present.
valid_styl = {
    "styl": "styl_id IS NOT NULL AND subcat_id IS NOT NULL",
}


@dlt.table(name='silver_styl')
@dlt.expect_all_or_drop(valid_styl)
def styl():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_styl')

In [0]:
# Expectation for silver_stylclr: parent style id and style/colour id must be present.
valid_stylclr = {
    "stylclr": "styl_id IS NOT NULL AND stylclr_id IS NOT NULL",
}


@dlt.table(name='silver_stylclr')
@dlt.expect_all_or_drop(valid_stylclr)
def stylclr():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_stylclr')

In [0]:
# Expectation for silver_product: SKU id and parent style/colour id must be present.
valid_prod = {
    "prod": "sku_id IS NOT NULL AND stylclr_id IS NOT NULL",
}


@dlt.table(name='silver_product')
@dlt.expect_all_or_drop(valid_prod)
def prod():
    # Stream the bronze table unchanged; the expectation above is applied to the result.
    return spark.readStream.table('LIVE.bronze_product')

### GOLD

In [0]:
# Gold POS-site dimension: channel -> sub-channel -> site, flattened into one table.
@dlt.table(name='gold_dim_possite')
def site():
    channels = spark.read.table('LIVE.silver_chnl')
    subchannels = spark.read.table('LIVE.silver_subchnl')
    sites = spark.read.table('LIVE.silver_site')

    # Left joins start from the channel level, so channels without children are kept.
    channel_to_subchannel = channels.chnl_id == subchannels.chnl_id
    subchannel_to_site = subchannels.subchnl_id == sites.subchnl_id
    flattened = (
        channels
        .join(subchannels, channel_to_subchannel, 'left')
        .join(sites, subchannel_to_site, 'left')
    )
    return flattened.select(
        channels['chnl_id'],
        subchannels['subchnl_id'],
        sites['site_id'],
        sites['site_label'],
    )

In [0]:
# Gold price-state dimension: state -> sub-state, flattened into one table.
@dlt.table(name='gold_dim_state')
def state():
    states = spark.read.table('LIVE.silver_price_state')
    substates = spark.read.table('LIVE.silver_price_substate')

    # Left join from the state level, so states without sub-states are kept.
    flattened = states.join(substates, states.state_id == substates.state_id, 'left')
    return flattened.select(
        states['state_id'],
        substates['substate_id'],
        substates['substate_label'],
    )

In [0]:
# Gold product dimension: department -> category -> sub-category -> style ->
# style/colour -> SKU, flattened into one table.
@dlt.table(name='gold_dim_product')
def product():
    skus = spark.read.table('LIVE.silver_product')
    styles = spark.read.table('LIVE.silver_styl')
    style_colors = spark.read.table('LIVE.silver_stylclr')
    subcategories = spark.read.table('LIVE.silver_subcat')
    categories = spark.read.table('LIVE.silver_cat')
    departments = spark.read.table('LIVE.silver_dept')

    # Walk the hierarchy top-down with left joins, one level per join.
    hierarchy = (
        departments
        .join(categories, departments.dept_id == categories.dept_id, 'left')
        .join(subcategories, categories.cat_id == subcategories.cat_id, 'left')
        .join(styles, subcategories.subcat_id == styles.subcat_id, 'left')
        .join(style_colors, styles.styl_id == style_colors.styl_id, 'left')
        .join(skus, style_colors.stylclr_id == skus.stylclr_id, 'left')
    )
    # sku_id is exposed as prod_sku_id, the name the fact table joins on.
    return hierarchy.select(
        skus['sku_id'].alias('prod_sku_id'),
        skus['sku_label'],
        style_colors['stylclr_id'],
        styles['styl_id'],
        subcategories['subcat_id'],
        categories['cat_id'],
        departments['dept_id'],
        skus['issvc'],
        skus['isasmbly'],
        skus['isnfs'],
    )

In [0]:
# Gold fact table: silver transactions enriched with the calendar, site, state and
# product dimensions.
@dlt.table(name='gold_fact_transactions')
def transactions():
    products = spark.read.table('LIVE.gold_dim_product')
    calendar = spark.read.table('LIVE.gold_dim_clnd')
    sites = spark.read.table('LIVE.gold_dim_possite')
    states = spark.read.table('LIVE.gold_dim_state')
    sales = spark.read.table('LIVE.silver_transactions')

    # Keys are trimmed on both sides of every join condition. The calendar join is
    # inner (transactions without a matching fiscal date are excluded); the other
    # dimensions are left joins.
    return (
        sales
        .join(calendar, trim(sales.fscldt_id) == trim(calendar.clnd_fscldt_id), 'inner')
        .join(sites, trim(sites.site_id) == trim(sales.pos_site_id), 'left')
        .join(states, trim(states.substate_id) == trim(sales.price_substate_id), 'left')
        .join(products, trim(products.prod_sku_id) == trim(sales.sku_id), 'left')
    )

In [0]:
# Gold calendar dimension: a batch read of the silver calendar, with no further changes.
@dlt.table(name='gold_dim_clnd')
def clnd():
    return spark.read.table('LIVE.silver_clnd')

In [0]:
# Weekly sales aggregate per fiscal week, site, price sub-state, SKU and transaction type.
# `sum` and `round` here are the pyspark.sql.functions versions imported at the top of
# the notebook, not the Python builtins.
@dlt.table(name='gold_mview_weekly_sales')
def weekly_sales():
    calendar = spark.read.table('LIVE.gold_dim_clnd')
    fact = spark.read.table('LIVE.gold_fact_transactions')

    # Attach the calendar row for each transaction's fiscal date (trimmed keys).
    with_calendar = fact.join(
        calendar,
        trim(fact.fscldt_id) == trim(calendar.clnd_fscldt_id),
        'inner',
    )
    weekly_totals = with_calendar.groupBy(
        calendar['fsclwk_id'],
        fact['pos_site_id'],
        fact['price_substate_id'],
        fact['sku_id'],
        fact['type'],
    ).agg(
        sum('sales_units').alias('total_sales_units'),
        sum('sales_dollars').alias('total_sales_dollars'),
        sum('discount_dollars').alias('total_discount_dollars'),
    )
    # Reorder the columns and round the dollar totals to 2 decimal places.
    report = weekly_totals.select(
        calendar['fsclwk_id'],
        fact['type'],
        fact['sku_id'],
        fact['pos_site_id'],
        fact['price_substate_id'],
        'total_sales_units',
        round('total_sales_dollars', 2).alias('total_sales_dollars'),
        round('total_discount_dollars', 2).alias('total_discount_dollars'),
    )
    return report.orderBy('fsclwk_id')