In [0]:
from pyspark.sql.functions import current_timestamp, lit, year, month, dayofmonth, round, rand, col, from_unixtime
from datetime import datetime
from pyspark.sql.types import StringType, TimestampType, BooleanType, StructType, StructField, DoubleType, LongType
from pyspark.sql import SparkSession

import datetime as dt


class hp:
    catalogue = "workspace"
    database = "default"
    display_target = "development"

    def measuretime(self,sp: SparkSession,statement: str) -> dt.time:
        start = datetime.now()
        df=sp.sql(statement)
        df.display()
        end = datetime.now()
        return end-start
    
    def runtest(self,sp: SparkSession,testname: str,statement: str):
        print(testname)
        dur=self.measuretime(sp,statement)
        print(f"{testname} took {dur.seconds} seconds, or {dur}")
        return

    def setup(self, catalogue = "workspace", database = "default", display_target = "development"):
        s = None
        try:
            s = dbutils.widgets.get('catalogue_bronze_name')
        except:
            print('No catalogue_bronze_name parameter was found')
            pass
        if s is not None:
            self.catalogue = s
        s = None
        try:
            s = dbutils.widgets.get('database_bronze_name')
        except:
            print('No database_bronze_name parameter was found')
            pass
        if s is not None:
            self.database = s
        s = None
        try:
            s = dbutils.widgets.get('display_target')
        except:
            print('No display_target parameter was found')
            pass
        if s is not None:
            self.display_target = s
        s = None


        if catalogue is not None:
            self.catalogue = catalogue
        if database is not None:
            self.database = database
        if display_target is not None:
            self.display_target = display_target


        spark.sql(f'USE CATALOG {self.catalogue}')
        spark.sql(f'USE DATABASE {self.database}')


        #spark.sql("DROP VARIABLE IF EXISTS catalogue_bronze_name")
        #spark.sql("DROP VARIABLE IF EXISTS database_bronze_name")
        #spark.sql("DROP VARIABLE IF EXISTS display_target")
        #spark.sql("DECLARE VARIABLE catalogue_bronze_name STRING")
        #spark.sql("DECLARE VARIABLE database_bronze_name STRING")
        #spark.sql("DECLARE VARIABLE display_target STRING")
        
        spark.sql("drop temporary variable if exists catalogue_bronze_name;")
        spark.sql("declare variable catalogue_bronze_name string;")
        spark.sql(f"set variable catalogue_bronze_name='{self.catalogue}';")          

        spark.sql("drop temporary variable if exists database_bronze_name;")
        spark.sql("declare variable database_bronze_name string;")
        spark.sql(f"set variable database_bronze_name='{self.database}';")          

        spark.sql("drop temporary variable if exists display_target;")
        spark.sql("declare variable display_target string;")
        spark.sql(f"set variable display_target='{self.display_target}';")          


    def __init__(self, catalogue = None, database = None, display_target = None):
        #self.setup(catalogue = catalogue, database = database, display_target = display_target)

        pass

    def add_standard_columns(self, df,createdBy=None,modifiedBy=None):
        df = df.withColumn('timestamp', current_timestamp())

        if modifiedBy is not None:
            df = df.withColumn('modifiedOn', current_timestamp().cast(TimestampType()))
            df = df.withColumn('modifiedBy', lit(modifiedBy).cast(StringType()))
        else:
            df = df.withColumn('modifiedOn', lit(None).cast(TimestampType()))
            df = df.withColumn('modifiedBy', lit(None).cast(StringType()))
        if createdBy is not None:
            df = df.withColumn('createdOn', current_timestamp())
            df = df.withColumn('createdBy', lit(createdBy).cast (StringType()))
        else:
            df = df.withColumn('createdOn', lit(None).cast(TimestampType()))
            df = df.withColumn('createdBy', lit(None).cast (StringType()))
        
        df = df.withColumn('isCurrent', lit(True).cast(BooleanType()))

        return df
    
    def transactions_pt(self, df):
        df = (
          df
            .withColumn('year', year(df['time']))
            .withColumn('month', month(df['time']))
            .withColumn('customer_partition', df['customer_id']%10)
        )

        return df


chp = hp()

In [0]:
%sql

USE CATALOG ops;
CREATE DATABASE IF NOT EXISTS test2;
USE DATABASE test2;
--CREATE VOLUME IF NOT EXISTS test1_v;

In [0]:
from pyspark.sql.functions import *

df = (
    spark
        .range(0,10000000000,1)#,32)
        .select(
            'id',
            round(rand()*1000,2).alias('amount'),
            (col('id')%10).alias('country_id'),
            (col('id')%100).alias('store_id'),
            round(rand()*100000000,0).alias('customer_id'),
            from_unixtime(lit(1701692381+col('id'))).alias('time')
        )
)

df = chp.add_standard_columns(df,'etl','mml2')

df = chp.transactions_pt(df)

(df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/ops/test1/test1_v/transactions")
 )


In [0]:
from pyspark.sql.functions import *

df = (
    spark
        .range(0,10000000000,1)#,32)
        .select(
            'id',
            round(rand()*1000,2).alias('amount'),
            (col('id')%10).alias('country_id'),
            (col('id')%100).alias('store_id'),
            round(rand()*100000000,0).alias('customer_id'),
            from_unixtime(lit(1701692381+col('id'))).alias('time')
        )
)

df = chp.add_standard_columns(df,'etl','mml2')

df = chp.transactions_pt(df)

(df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy(['year','month','customer_partition'])
    .save("/Volumes/ops/test1/test1_v/transactions_2")
 )


In [0]:
from pyspark.sql.functions import *

df = (
    spark
        .range(0,10000000000,1)#,32)
        .select(
            'id',
            round(rand()*1000,2).alias('amount'),
            (col('id')%10).alias('country_id'),
            (col('id')%100).alias('store_id'),
            round(rand()*100000000,0).alias('customer_id'),
            from_unixtime(lit(1701692381+col('id'))).alias('time')
        )
)

df = chp.add_standard_columns(df,'etl','mml2')

df = chp.transactions_pt(df)

(df
    .write
    .format("delta")
    .mode("overwrite")
    .clusterBy(['year','store_id','customer_partition'])
    .save("/Volumes/ops/test1/test1_v/transactions_3")
 )


In [0]:
from pyspark.sql.functions import *

df = (
    spark
        .range(0,10000000000,1)#,32)
        .select(
            'id',
            round(rand()*1000,2).alias('amount'),
            (col('id')%10).alias('country_id'),
            (col('id')%100).alias('store_id'),
            round(rand()*100000000,0).alias('customer_id'),
            from_unixtime(lit(1701692381+col('id'))).alias('time')
        )
)

df = chp.add_standard_columns(df,'etl','mml2')

df = chp.transactions_pt(df)

(df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy(['year'])
    .clusterBy(['store_id','customer_partition'])
    .save("/Volumes/ops/test1/test1_v/transactions_4")
 )


In [0]:
%sql
select count(*) from transactions;



In [0]:
store_df= (spark
            .range(0,99)
            .select(
                'id',
                round(rand()*100,0).alias('employees'),
                (col('id')%10).alias('country_id'),
                expr('uuid()').alias('name')
            ))
store_df = chp.add_standard_columns(store_df,'etl','mml2')

(store_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/ops/test1/test1_v/stores")
 )
df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/stores")




In [0]:
from pyspark.sql.functions import col, round, rand, expr, when

store_df = (
    spark
        .range(0, 100000000)
        .select(
            'id',
            round(col('id')%100000,0).alias('customer_master_id'),
            round(rand() * 100, 0).alias('customer_band'),
            (col('id') % 10).alias('country_id'),
            expr('uuid()').alias('name'),
            when(round(col('id')%100000,0) == col('id'), True)
                .otherwise(False)
                .alias('IsCurrent')
        )
)
store_df = chp.add_standard_columns(store_df,'etl','mml2')
(store_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/ops/test1/test1_v/customers")
 )
df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/customers")


In [0]:
countries = [
    (0,"Italy"),
    (1,"France"),
    (2,"Spain"),
    (3,"Germany"),
    (4,"UK"),
    (5,"USA"),
    (6,"Canada"),
    (7,"Mexico"),
    (8,"Brazil"),
    (9,"Argentina"),
    (10,"China"),
    (11,"Japan"),
    (12,"Korea"),
    (13,"India"),
    (14,"Australia"),
    (15,"New Zealand")
]

columns = ["id","name"]

countries_df = spark.createDataFrame(data=countries,schema=columns  )
countries_df = chp.add_standard_columns(countries_df,'etl','mml2')

(countries_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/ops/test1/test1_v/countries")
 )


In [0]:
#df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/transactions")

#df.createOrReplaceTempView("transactions")

# spark.conf.set(
#     "spark.databricks.delta.retentionDurationCheck.enabled",
#     "false"
# )

spark.sql("VACUUM delta.`/Volumes/ops/test1/test1_v/transactions` RETAIN 168 HOURS")
spark.sql("OPTIMIZE delta.`/Volumes/ops/test1/test1_v/transactions`")

#df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/countries")

#df.createOrReplaceTempView("countries")

# spark.conf.set(
#     "spark.databricks.delta.retentionDurationCheck.enabled",
#     "false"
# )

spark.sql("VACUUM delta.`/Volumes/ops/test1/test1_v/countries` RETAIN 168 HOURS")
spark.sql("OPTIMIZE delta.`/Volumes/ops/test1/test1_v/countries`")

#df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/customers")

#df.createOrReplaceTempView("customers")

# spark.conf.set(
#     "spark.databricks.delta.retentionDurationCheck.enabled",
#     "false"
# )

spark.sql("VACUUM delta.`/Volumes/ops/test1/test1_v/customers` RETAIN 168 HOURS")
spark.sql("OPTIMIZE delta.`/Volumes/ops/test1/test1_v/customers`")

#df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/stores")

#df.createOrReplaceTempView("stores")

# spark.conf.set(
#     "spark.databricks.delta.retentionDurationCheck.enabled",
#     "false"
# )

spark.sql("VACUUM delta.`/Volumes/ops/test1/test1_v/stores` RETAIN 168 HOURS")
spark.sql("OPTIMIZE delta.`/Volumes/ops/test1/test1_v/stores`")

In [0]:
tablea='transactions_1'

In [0]:
tablea = "transactions_2"
testname = "test1.1"
statement=f"""
    select /*+ BROADCAST(stores), BROADCAST(customers), BROADCAST(countries) */
count(*) from (
    select 
        transactions.id,
        amount,
        countries.name as country_name,
        employees,
        stores.name as store_name,
        max(cc.name) as max_customer_name,
        max(cc.customer_band) as max_customer_band
    from ops.test1.{tablea} as transactions
    inner join ops.test1.stores
        on transactions.store_id = stores.id
    inner join ops.test1.countries
        on transactions.country_id = countries.id
    inner join ops.test1.customers
        on transactions.customer_id = customers.id
    inner join ops.test1.customers cc
        on customers.customer_master_id = cc.id
    group by 
        transactions.id,
        amount,
        countries.name,
        employees,
        stores.name
        ) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------
testname = "test1.2"
statement=f"""
    select count(*) from (
SELECT /*+ BROADCAST(stores), BROADCAST(customers), BROADCAST(countries) */
       cc.id,
       sum(amount) as amount,
       countries.name AS country_name,
       employees,
       stores.name AS store_name,
       MAX(c1.name) AS max_customer_name,
       MAX(c1.customer_band) AS max_customer_band
FROM ops.test1.{tablea} AS transactions
inner JOIN ops.test1.stores
    ON transactions.store_id = stores.id
inner JOIN ops.test1.countries
    ON transactions.country_id = countries.id
inner JOIN ops.test1.customers c1
    ON transactions.customer_id = c1.id
inner JOIN ops.test1.customers cc
    ON c1.customer_master_id = cc.id
WHERE
      transactions.store_id between 10 and 20
  AND stores.employees > 10
  AND stores.employees <= 35
  AND c1.customer_band BETWEEN 15 AND 65
GROUP BY cc.id,
         countries.name,
         employees,
         stores.name
) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------
testname = "test1.3"
statement=f"""
select count(*) from (
SELECT /*+ BROADCAST(stores), BROADCAST(customers), BROADCAST(countries) */
       transactions.id,
       amount,
       countries.name AS country_name,
       employees,
       stores.name AS store_name,
       MAX(cc.name) AS max_customer_name,
       MAX(cc.customer_band) AS max_customer_band
FROM ops.test1.{tablea} AS transactions
inner JOIN ops.test1.stores
    ON transactions.store_id = stores.id
inner JOIN ops.test1.countries
    ON transactions.country_id = countries.id
inner JOIN ops.test1.customers c1
    ON transactions.customer_id = c1.id
inner JOIN ops.test1.customers cc
    ON c1.customer_master_id = cc.id
WHERE
      stores.id between 10 and 20
  AND stores.employees > 10
  AND stores.employees <= 35
  AND c1.customer_band BETWEEN 15 AND 65
GROUP BY transactions.id,
         amount,
         countries.name,
         employees,
         stores.name
) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------
testname = "test2.1"
statement=f"""
    select 
count(*) from (
    select 
        transactions.id,
        amount,
        countries.name as country_name,
        employees,
        stores.name as store_name,
        max(cc.name) as max_customer_name,
        max(cc.customer_band) as max_customer_band
    from ops.test1.{tablea} as transactions
    inner join ops.test1.stores
        on transactions.store_id = stores.id
    inner join ops.test1.countries
        on transactions.country_id = countries.id
    inner join ops.test1.customers
        on transactions.customer_id = customers.id
    inner join ops.test1.customers cc
        on customers.customer_master_id = cc.id
    group by 
        transactions.id,
        amount,
        countries.name,
        employees,
        stores.name
        ) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------
testname = "test2.2"
statement=f"""
    select count(*) from (
SELECT 
       cc.id,
       sum(amount) as amount,
       countries.name AS country_name,
       employees,
       stores.name AS store_name,
       MAX(c1.name) AS max_customer_name,
       MAX(c1.customer_band) AS max_customer_band
FROM ops.test1.{tablea} AS transactions
inner JOIN ops.test1.stores
    ON transactions.store_id = stores.id
inner JOIN ops.test1.countries
    ON transactions.country_id = countries.id
inner JOIN ops.test1.customers c1
    ON transactions.customer_id = c1.id
inner JOIN ops.test1.customers cc
    ON c1.customer_master_id = cc.id
WHERE
      transactions.store_id between 10 and 20
  AND stores.employees > 10
  AND stores.employees <= 35
  AND c1.customer_band BETWEEN 15 AND 65
GROUP BY cc.id,
         countries.name,
         employees,
         stores.name
) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------
testname = "test2.3"
statement=f"""
select count(*) from (
SELECT 
       transactions.id,
       amount,
       countries.name AS country_name,
       employees,
       stores.name AS store_name,
       MAX(cc.name) AS max_customer_name,
       MAX(cc.customer_band) AS max_customer_band
FROM ops.test1.{tablea} AS transactions
inner JOIN ops.test1.stores
    ON transactions.store_id = stores.id
inner JOIN ops.test1.countries
    ON transactions.country_id = countries.id
inner JOIN ops.test1.customers c1
    ON transactions.customer_id = c1.id
inner JOIN ops.test1.customers cc
    ON c1.customer_master_id = cc.id
WHERE
      stores.id between 10 and 20
  AND stores.employees > 10
  AND stores.employees <= 35
  AND c1.customer_band BETWEEN 15 AND 65
GROUP BY transactions.id,
         amount,
         countries.name,
         employees,
         stores.name
) a
    """
chp.runtest(spark,testname,statement)
#------------------------------------------------------


In [0]:
%sql
drop table ops.test1.countries;
drop table ops.test1.customers;
drop table ops.test1.stores;
drop table ops.test1.transactions_1;

In [0]:
%sql
create table if not exists ops.test1.countries(
  id int not null,
  name string not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  primary key (id)
);

create table if not exists ops.test1.customers(
  id int not null,
  customer_master_id int not null,
  customer_band int not null,
  country_id int not null,
  name string not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  primary key (id)
);

create table if not exists ops.test1.stores(  
  id int not null,
  employees int not null,
  country_id int not null,
  name string not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  primary key (id)
);

create table if not exists ops.test1.transactions_1(
  id bigint not null,
  amount double not null,
  country_id int not null,
  store_id int not null,
  customer_id int not null,
  `time` timestamp not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  `year` smallint not null,
  `month` tinyint not null,
  `customer_partition` smallint not null,
  primary key (id)
);


ALTER TABLE ops.test1.transactions_1
ADD CONSTRAINT fk_ops_test1_transactions_1_countries FOREIGN KEY (country_id) REFERENCES ops.test1.countries(id);
ALTER TABLE ops.test1.transactions_1
ADD CONSTRAINT fk_ops_test1_transactions_1_customers FOREIGN KEY (customer_id) REFERENCES ops.test1.customers(id);
ALTER TABLE ops.test1.transactions_1
ADD CONSTRAINT fk_ops_test1_transactions_1_stores FOREIGN KEY (store_id) REFERENCES ops.test1.stores(id);

/*
insert into ops.test1.stores(id, employees, country_id, name, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`)
select id, employees, country_id, name, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`
from stores;
insert into ops.test1.countries(id, name, `timestamp`, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`)
select id, name, `timestamp`, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`
from countries;
insert into ops.test1.customers(id, customer_master_id, customer_brand, country_id, name, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`)
select id, customer_master_id, customer_brand, country_id, name, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`
from customers;
insert into ops.test1.transactions_1(id, country_id, store_id, customer_id, `time`, `timestamp`, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`, `year`, `month`, `customer_partition`)
select id, country_id, store_id, customer_id, `time`, `timestamp`, `modifiedOn`, `modifiedBy`, `createdOn`, `createdBy`, `isCurrent`, `year`, `month`, `customer_partition`
from transactions;
*/




In [0]:
%sql
drop table if exists ops.test1.transactions_2;
drop table if exists ops.test1.transactions_3;
drop table if exists ops.test1.transactions_4;


In [0]:
%sql
create table if not exists ops.test1.transactions_2(
  id bigint not null,
  amount double not null,
  country_id int not null,
  store_id int not null,
  customer_id int not null,
  `time` timestamp not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  `year` smallint not null,
  `month` tinyint not null,
  `customer_partition` smallint not null,
   constraint pk_ops_test1_transactions_2 primary key(id)
) using delta tblproperties(
  'delta.feature.allowColumnDefaults' = 'supported',
  'delta.columnMapping.mode' = 'name'
 ) partitioned by (year,month,customer_partition);


ALTER TABLE ops.test1.transactions_2
ADD CONSTRAINT fk_ops_test1_transactions_2_countries FOREIGN KEY (country_id) REFERENCES ops.test1.countries(id);
ALTER TABLE ops.test1.transactions_2
ADD CONSTRAINT fk_ops_test1_transactions_2_customers FOREIGN KEY (customer_id) REFERENCES ops.test1.customers(id);
ALTER TABLE ops.test1.transactions_2
ADD CONSTRAINT fk_ops_test1_transactions_2_stores FOREIGN KEY (store_id) REFERENCES ops.test1.stores(id);



create table if not exists ops.test1.transactions_3(
  id bigint not null,
  amount double not null,
  country_id int not null,
  store_id int not null,
  customer_id int not null,
  `time` timestamp not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  `year` smallint not null,
  `month` tinyint not null,
  `customer_partition` smallint not null,
  constraint pk_ops_test1_transactions_3 primary key(id)
) using delta tblproperties(
  'delta.feature.allowColumnDefaults' = 'supported',
  'delta.columnMapping.mode' = 'name'
 ) cluster by (year,store_id,customer_partition);


ALTER TABLE ops.test1.transactions_3
ADD CONSTRAINT fk_ops_test1_transactions_3_countries FOREIGN KEY (country_id) REFERENCES ops.test1.countries(id);
ALTER TABLE ops.test1.transactions_3
ADD CONSTRAINT fk_ops_test1_transactions_3_customers FOREIGN KEY (customer_id) REFERENCES ops.test1.customers(id);
ALTER TABLE ops.test1.transactions_3
ADD CONSTRAINT fk_ops_test1_transactions_3_stores FOREIGN KEY (store_id) REFERENCES ops.test1.stores(id);

create table if not exists ops.test1.transactions_4(
  id bigint not null,
  amount double not null,
  country_id int not null,
  store_id int not null,
  customer_id int not null,
  `time` timestamp not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  `year` smallint not null,
  `month` tinyint not null,
  `customer_partition` smallint not null,
  constraint pk_ops_test1_transactions_4 primary key(id)
) using delta tblproperties(
  'delta.feature.allowColumnDefaults' = 'supported',
  'delta.columnMapping.mode' = 'name'
 ) partitioned by (year)
 cluster by (store_id,customer_partition);



ALTER TABLE ops.test1.transactions_4
ADD CONSTRAINT fk_ops_test1_transactions_4_countries FOREIGN KEY (country_id) REFERENCES ops.test1.countries(id);
ALTER TABLE ops.test1.transactions_4
ADD CONSTRAINT fk_ops_test1_transactions_4_customers FOREIGN KEY (customer_id) REFERENCES ops.test1.customers(id);
ALTER TABLE ops.test1.transactions_4
ADD CONSTRAINT fk_ops_test1_transactions_4_stores FOREIGN KEY (store_id) REFERENCES ops.test1.stores(id);

In [0]:
from pyspark.sql.functions import *

df = (
    spark
        .range(0,10000000000,1)#,32)
        .select(
            'id',
            round(rand()*1000,2).alias('amount'),
            (col('id')%10).alias('country_id'),
            (col('id')%100).alias('store_id'),
            round(rand()*100000000,0).alias('customer_id'),
            from_unixtime(lit(1701692381+col('id'))).alias('time')
        )
)

df = chp.add_standard_columns(df,'etl','mml2')

df = chp.transactions_pt(df)

(df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy(['year','month','customer_partition'])
    .save("/Volumes/ops/test1/test1_v/transactions_2")
 )


In [0]:
%sql

create table if not exists ops.test1.transactions(
  id bigint not null,
  amount double not null,
  country_id int not null,
  store_id int not null,
  customer_id int not null,
  `time` timestamp not null,
  `timestamp` timestamp not null,
  `modifiedOn` timestamp,
  `modifiedBy` string,
  `createdOn` timestamp not null,
  `createdBy` string not null,
  `isCurrent` boolean not null,
  `year` smallint not null,
  `month` tinyint not null,
  `customer_partition` smallint not null,
  primary key (id)
)
LOCATION '/Volumes/ops/test1/test1_v/transactions';


ALTER TABLE ops.test1.transactions
ADD CONSTRAINT fk_ops_test1_transactions_1_countries FOREIGN KEY (country_id) REFERENCES ops.test1.countries(id);
ALTER TABLE ops.test1.transactions
ADD CONSTRAINT fk_ops_test1_transactions_1_customers FOREIGN KEY (customer_id) REFERENCES ops.test1.customers(id);
ALTER TABLE ops.test1.transactions
ADD CONSTRAINT fk_ops_test1_transactions_1_stores FOREIGN KEY (store_id) REFERENCES ops.test1.stores(id);

In [0]:
from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "ops.test1.transactions_2")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/transactions_2")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


target_table = DeltaTable.forName(spark, "ops.test1.transactions_3")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/transactions_3")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


target_table = DeltaTable.forName(spark, "ops.test1.transactions_4")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/transactions__4")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)



In [0]:
from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "ops.test1.stores")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/stores")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

target_table = DeltaTable.forName(spark, "ops.test1.customers")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/customers")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

target_table = DeltaTable.forName(spark, "ops.test1.countries")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/countries")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

target_table = DeltaTable.forName(spark, "ops.test1.transactions_1")
source_df = spark.read.format("delta").load("/Volumes/ops/test1/test1_v/transactions")

(
    target_table.alias("t")
    .merge(
        source=source_df.alias("s"),
        condition="t.id = s.id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)






In [0]:
%sql
select * from ops.test1.stores