## Commenting and Documentation of All Four Successful DAGs
This notebook introduces the four DAGs that have already run successfully and explains their code. Two DAGs dump tables from the POS database to S3, and the other two merge the dumped data into Iceberg tables.

The two DAGs for the POS database are:
1. pos_multiple_dump 
2. pos_large_INSTALLMENT_SCHE_dump

The two DAGs for merging Iceberg tables are:

1. ALL_POS_S3_to_Iceberg_dag
2. pos_large_installment_sche_ICEBERG


## Brief Introduction to the Two DAGs for the POS Database

pos_multiple_dump: This DAG dumps multiple small tables from the POS database to S3 as Parquet files, several tables in parallel.

pos_large_INSTALLMENT_SCHE_dump: This DAG dumps one large table to S3 as Parquet files, in chunks of one million rows.

## Brief Introduction to the Two DAGs for the Iceberg Merge

ALL_POS_S3_to_Iceberg_dag: This DAG merges the dumped Parquet files of multiple small POS tables into Iceberg tables in S3.

pos_large_installment_sche_ICEBERG: This DAG merges the Parquet chunks of one large table into an Iceberg table in S3.

## Necessary Packages for the Code of the Four DAGs

The DAGs use datetime, json, os, sys, re, random, string, and pandas from Python; Airflow's DAG, Variable, S3Hook, BashOperator, PythonOperator, DummyOperator, TriggerDagRunOperator, OracleOperator, OracleHook, TriggerRule, and days_ago; the Kubernetes client models, imported as k8s; and open. The open used for writing to S3 accepts a transport_params argument, which matches smart_open's open rather than Python's built-in open.

## Explanation for the code of pos_multiple_dump

In [None]:
#Code:


default_args = {
    'owner': 'pos'
}

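default_args holds settings that Airflow applies to every task of a DAG created with default_args=default_args. The owner is set to pos because these DAGs work on the POS database.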

In [None]:
#Code:

dag = DAG(
        dag_id='pos_multiple_dump',
        start_date=days_ago(0),
        schedule_interval=None,
        catchup=False,
      )


This code declares the DAG that dumps multiple small tables of the POS database into S3. Its dag_id (the DAG name) is pos_multiple_dump. schedule_interval=None means the DAG has no schedule and runs only when it is triggered, and catchup=False stops Airflow from creating runs for past intervals. The start_date marks the start of the DAG's first data interval, not when its tasks start running; days_ago(0) resolves to midnight (UTC) of the current day.

In [None]:
#Code:

def get_column_data_type(original_schema_dict, column_name):
    return original_schema_dict[column_name]["DATA_TYPE"]
    
    
This function returns the Oracle data type (DATA_TYPE) of a single column, looked up by column name in original_schema_dict.

In [None]:
#Code:

def get_column_data_scale(original_schema_dict, column_name):
    return original_schema_dict[column_name]["DATA_SCALE"]
    
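This function returns the DATA_SCALE of a single column from original_schema_dict; for Oracle NUMBER columns this is the number of digits to the right of the decimal point. It does not change any data, and it is not called by the other functions shown in this notebook.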
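Since convert_data_types_to_original turns every NUMBER column into float64, DATA_SCALE is currently unused. A minimal sketch of how it could be used, assuming a hypothetical helper choose_number_dtype and a hand-written schema in the same shape as original_schema_dict: a NUMBER column with scale 0 holds only whole numbers, so it could map to pandas' nullable integer dtype "Int64" instead (columns with a precision above 18 digits may not fit in 64 bits). The two lookup functions are repeated so the block runs on its own.

```python
import math


def get_column_data_type(original_schema_dict, column_name):
    return original_schema_dict[column_name]["DATA_TYPE"]


def get_column_data_scale(original_schema_dict, column_name):
    return original_schema_dict[column_name]["DATA_SCALE"]


def choose_number_dtype(original_schema_dict, column_name):
    """Hypothetical helper: pick a pandas dtype name for an Oracle NUMBER column."""
    if get_column_data_type(original_schema_dict, column_name) != "NUMBER":
        return None  # Only NUMBER columns are handled in this sketch.
    scale = get_column_data_scale(original_schema_dict, column_name)
    # A missing DATA_SCALE may come back from pandas as None or NaN.
    if scale is None or (isinstance(scale, float) and math.isnan(scale)):
        return "float64"
    # Scale 0 means no digits after the decimal point.
    return "Int64" if scale == 0 else "float64"


# Hypothetical schema in the shape built by calculate_original_schema_dict.
schema = {
    "ID": {"DATA_TYPE": "NUMBER", "DATA_PRECISION": 10, "DATA_SCALE": 0},
    "AMOUNT": {"DATA_TYPE": "NUMBER", "DATA_PRECISION": 12, "DATA_SCALE": 2},
    "RATE": {"DATA_TYPE": "NUMBER", "DATA_PRECISION": None, "DATA_SCALE": None},
    "NAME": {"DATA_TYPE": "VARCHAR2", "DATA_PRECISION": None, "DATA_SCALE": None},
}

for col in schema:
    print(col, choose_number_dtype(schema, col))
```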

In [None]:
def remove_special_characters_from_all_column_names(df):
    print("")  # Print a blank line as a separator.
    print("Removing Special Characters From All Column Names ....")  # Print a progress message.
    for col in df.columns:
        print(f"{col} --> {df[col].dtype}")  # Print each column's name and data type.
        df = df.rename(columns={col: re.sub("[!@#$&*^%~<>?+=]", "", col)})
    
    return df



This function prints the name and data type of every column and removes the characters ! @ # $ & * ^ % ~ < > ? + = from every column name in the DataFrame.
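The loop calls df.rename once per column, and two names that differ only by special characters (for example a hypothetical AMOUNT$ and AMOUNT) would silently become duplicate columns. A minimal sketch of an alternative using only the standard library: build the whole rename mapping first, fail on collisions, and then apply it with a single df.rename(columns=mapping) call. The function name build_column_rename_map and the column names are illustrative.

```python
import re

# Same character class as remove_special_characters_from_all_column_names.
SPECIAL_CHARS = re.compile(r"[!@#$&*^%~<>?+=]")


def build_column_rename_map(columns):
    """Map each original column name to its cleaned name, refusing collisions."""
    mapping = {}
    seen = {}
    for col in columns:
        cleaned = SPECIAL_CHARS.sub("", col)
        if cleaned in seen:
            raise ValueError(f"{col!r} and {seen[cleaned]!r} both become {cleaned!r}")
        seen[cleaned] = col
        mapping[col] = cleaned
    return mapping


mapping = build_column_rename_map(["LOAN#NO", "AMOUNT$", "STATUS"])
print(mapping)
# With pandas, one call then applies it: df = df.rename(columns=mapping)
```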

In [None]:
def calculate_original_schema_dict(conn, schema_name, table_name):
    # Dictionary mapping each column name to its Oracle type metadata.
    original_schema_dict = {}
    try:
        # Query Oracle's data dictionary for the name, data type, scale, and precision of every column in the table.
        schema_sql = f"SELECT COLUMN_NAME, DATA_TYPE, DATA_SCALE, DATA_PRECISION, COLUMN_ID FROM sys.all_tab_columns where OWNER = '{schema_name.upper()}' AND TABLE_NAME = '{table_name.upper()}' ORDER BY COLUMN_ID"
        print(schema_sql)
        # Run the query on the open Oracle connection and load the result into a DataFrame.
        ndf = pd.read_sql(schema_sql, conn)
        # Store the metadata of each column under its name.
        for idx, row in ndf.iterrows():
            print(row['COLUMN_NAME'], row['DATA_TYPE'], "DL:", row['DATA_SCALE'], " , DP:", row['DATA_PRECISION'])
            val = {
                "DATA_TYPE": row["DATA_TYPE"],
                "DATA_PRECISION": row["DATA_PRECISION"],
                "DATA_SCALE": row["DATA_SCALE"]
            }
            original_schema_dict[row["COLUMN_NAME"]] = val
    except Exception as e:
        print("[ERROR] while fetching table meta data....")
        print(str(e))
    
    return original_schema_dict
    
   
 

This function builds a dictionary named original_schema_dict that describes the columns of a table. It queries sys.all_tab_columns, Oracle's catalog of table columns, for the name, data type, scale, and precision of each column (the owner and table name are uppercased, because Oracle stores unquoted identifiers in uppercase), and reads the result into a DataFrame over the given Oracle connection. It then loops over the rows; for each row it stores the data type, precision, and scale in a small dictionary named val and saves val under the column name. If the query fails, the error is printed and the returned dictionary is empty, which the caller treats as a failure.
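The query above inserts the schema and table names with an f-string. Oracle also accepts bind variables such as :owner and :table_name, which avoid quoting problems. A minimal sketch, assuming the values would be passed separately (for example pd.read_sql(SCHEMA_SQL, conn, params={...}), which pandas should forward to the driver); build_original_schema_dict is a hypothetical helper that turns metadata rows into the same dictionary the function builds, and the rows are made up.

```python
# Hypothetical bind-variable version of the metadata query.
SCHEMA_SQL = (
    "SELECT COLUMN_NAME, DATA_TYPE, DATA_SCALE, DATA_PRECISION, COLUMN_ID "
    "FROM sys.all_tab_columns "
    "WHERE OWNER = :owner AND TABLE_NAME = :table_name "
    "ORDER BY COLUMN_ID"
)


def build_original_schema_dict(rows):
    """Turn metadata rows (dicts with the columns above) into {COLUMN_NAME: {...}}."""
    return {
        row["COLUMN_NAME"]: {
            "DATA_TYPE": row["DATA_TYPE"],
            "DATA_PRECISION": row["DATA_PRECISION"],
            "DATA_SCALE": row["DATA_SCALE"],
        }
        for row in rows
    }


# Hypothetical rows, e.g. what ndf.to_dict("records") might return.
rows = [
    {"COLUMN_NAME": "ID", "DATA_TYPE": "NUMBER", "DATA_SCALE": 0, "DATA_PRECISION": 10, "COLUMN_ID": 1},
    {"COLUMN_NAME": "MODIFIED", "DATA_TYPE": "DATE", "DATA_SCALE": None, "DATA_PRECISION": None, "COLUMN_ID": 2},
]
print(build_original_schema_dict(rows))
```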

In [None]:
def convert_data_types_to_original(original_schema_dict, df):
    print("")
    print("Converting Data Types ....")
    try:
        for col in df.columns:
            data_type = get_column_data_type(original_schema_dict, col)
            if data_type == "CHAR":
                data_type = "str"
            elif data_type == "ROWID":
                data_type = "str"
            elif data_type == "VARCHAR":
                data_type = "str"
            elif data_type == "VARCHAR2":
                data_type = "str"
            elif data_type == "DATE":
                data_type = "datetime64[ns]"
            elif data_type == "LONG":
                df[col] = df[col].fillna(0)
                data_type = "int64"
            elif data_type == "FLOAT":
                df[col] = df[col].fillna(0)
                data_type = "float64"
            elif data_type == "NUMBER":
                df[col] = df[col].fillna(0)
                df[col] = pd.to_numeric(df[col])
                data_type = "float64"
            else:
                data_type = df[col].dtype
            
            if data_type == "object":
                data_type = "str"

            if data_type != df[col].dtype:
                print(f"{col} ({df[col].dtype}) --> {data_type}")
                if data_type == "datetime64[ns]":
                    df[col] = pd.to_datetime(df[col])
                else:
                    df = df.astype({col: data_type})
        
        print("")
        print("Conversion Completed")
        return df

    except Exception as e:
        print(str(e))
        print("[ERROR] Error occurred while schema conversion.")
        print(f"[ERROR] {col} ({df[col].dtype}) --> {data_type}")
        raise Exception(str(e))


This function converts the columns of the DataFrame to types that match the original Oracle schema. For each column, it looks up the Oracle type with get_column_data_type, described earlier, and maps it to a pandas type: CHAR, ROWID, VARCHAR, and VARCHAR2 become str, DATE becomes datetime64[ns], LONG becomes int64, and FLOAT and NUMBER become float64. For LONG, FLOAT, and NUMBER columns, missing (NULL) values are first replaced with 0, so NULLs are not preserved in the Parquet output; likewise, astype(str) turns missing text values into the strings "None" or "nan". Any other type keeps its current pandas dtype, except that object columns are stored as str. A column is only converted when its target type differs from its current one, and if a conversion fails, the function prints the column involved and raises the error again.
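The if/elif chain can also be written as a lookup table, which keeps the type rules in one place. A minimal sketch, assuming the hypothetical names ORACLE_TO_PANDAS and target_dtype; it only computes the target dtype (and whether NULLs are replaced with 0) and does not touch a DataFrame, keeping the behaviour of the original chain.

```python
# Oracle type -> (pandas dtype name, replace NULLs with 0 first?),
# mirroring the if/elif chain in convert_data_types_to_original.
ORACLE_TO_PANDAS = {
    "CHAR": ("str", False),
    "ROWID": ("str", False),
    "VARCHAR": ("str", False),
    "VARCHAR2": ("str", False),
    "DATE": ("datetime64[ns]", False),
    "LONG": ("int64", True),
    "FLOAT": ("float64", True),
    "NUMBER": ("float64", True),
}


def target_dtype(oracle_type, current_dtype):
    """Return (dtype name, fill_nulls_with_zero) for one column."""
    # Unknown Oracle types keep the column's current dtype.
    dtype, fill_zero = ORACLE_TO_PANDAS.get(oracle_type, (current_dtype, False))
    # Like the original, a remaining "object" dtype is stored as str.
    if dtype == "object":
        dtype = "str"
    return dtype, fill_zero


print(target_dtype("VARCHAR2", "object"))
print(target_dtype("NUMBER", "object"))
print(target_dtype("CLOB", "object"))
```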

In [None]:
def OracleToParquetToS3(schema_name, table_name, target_bucket, file_key):


        SQL= f"SELECT * FROM {schema_name}.{table_name}"
        oracle_conn = OracleHook(oracle_conn_id='con-ora-pos').get_conn()

        print("")
        print("Executing SELECT Query...")
        df = pd.read_sql(SQL, oracle_conn)

        # Calculate Original Schema Dictionary
        print("")
        print("Fetching Table Meta Data...")
        original_schema_dict = calculate_original_schema_dict(oracle_conn, schema_name, table_name)
        if len(original_schema_dict.keys()) == 0:
            raise Exception("Could not calculate original schema")
        
        oracle_conn.close()  

        # Renaming columns without special characters
        df = remove_special_characters_from_all_column_names(df)

        # Convert Column Data Types To Original
        df = convert_data_types_to_original(original_schema_dict, df)

        s3 = S3Hook(aws_conn_id='con-s3')
        # Dump as Parquet directly to S3 (open here is smart_open's, which accepts transport_params):
        with open(f"s3://{target_bucket}/{file_key}", 'wb', transport_params={'client': s3.get_conn()}) as out_file:
            df.to_parquet(out_file, engine='pyarrow', index=False)
            

This function reads an Oracle table, converts it to Parquet, and writes it to S3. It builds a SELECT query for the whole table, opens a connection with OracleHook (connection ID con-ora-pos), and loads the result into a DataFrame. It then fetches the table metadata with calculate_original_schema_dict, described above, and raises an exception if no schema could be calculated. Otherwise, it closes the connection, removes special characters from the column names, converts the data types to the original ones, and writes the DataFrame as a Parquet file directly to s3://{target_bucket}/{file_key} through the S3 connection con-s3.
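The arguments passed to OracleToParquetToS3 come from the pos_multiple_dump_var Airflow Variable, which is read in the next cell. A minimal sketch of that parsing step without Airflow; the JSON below is hypothetical (only its keys table_names, s3_dump_base_path, and parallel_task_count come from the DAG code), as are the table names and path, and build_dump_args is a hypothetical helper.

```python
import json

# Hypothetical contents of the pos_multiple_dump_var Variable.
raw_variable = """
{
  "table_names": ["POS.CUSTOMER", "POS.BRANCH"],
  "s3_dump_base_path": "raw/pos",
  "parallel_task_count": 2
}
"""


def build_dump_args(global_var, target_bucket):
    """Return the op_kwargs each dump task would receive, one dict per table."""
    args = []
    for val in global_var["table_names"]:
        # "SCHEMA.TABLE" -> ("schema", "table"), lowercased as in the DAG.
        schema_name, table = val.lower().split(".")
        args.append({
            "schema_name": schema_name,
            "table_name": table,
            "target_bucket": target_bucket,
            "file_key": f"{global_var['s3_dump_base_path']}/{schema_name}/{table}.parquet",
        })
    return args


for kwargs in build_dump_args(json.loads(raw_variable), "bigdata-dev-cmfcknil"):
    print(kwargs["file_key"])
```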

In [None]:
# Step 3 - Declare dummy start and stop tasks

start_task = DummyOperator(task_id='start', dag=dag)

start_dumping = DummyOperator(task_id='start_dumping', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

checkpoint_count = 1


def GetCheckPointTask(name):
    return DummyOperator(task_id=name, dag=dag)


# Step 4 - Read the list of elements from the airflow variable
global_var = Variable.get("pos_multiple_dump_var", deserialize_json=True)
table_names = global_var['table_names']
s3_dump_base_path = global_var['s3_dump_base_path']
parallel_task_count = global_var['parallel_task_count']

dump_tasks = []
i = 0

start_task >> start_dumping

parent_task = start_dumping

for val in table_names:
    
    val = val.lower()
    res = val.split(".")
    schema_name = res[0]
    table = res[1]

    task = PythonOperator(
        task_id=f"dump__{schema_name}.{table}",
        #trigger_rule=TriggerRule.ALL_DONE,
        python_callable=OracleToParquetToS3, 
        op_kwargs={
              "schema_name": schema_name,
              "table_name": table,
              "target_bucket": "bigdata-dev-cmfcknil",  # Bucket name is hard-coded in this DAG
              "file_key":f"{s3_dump_base_path}/{schema_name}/{table}.parquet"
        },
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    #node_selector={
                     #   "node-group": "master"
                    #},
                    containers=[
                        k8s.V1Container(
                            name="base",
                        ),
                    ],
                )
            ),
        },
    )

    i = i + 1
    dump_tasks.append(task)



# SEQUENTIAL TASKS
if len(dump_tasks) > 0:
    j = 0
    for task in dump_tasks:
        if j < parallel_task_count:
            parent_task.set_downstream(dump_tasks[j])
        else:
            dump_tasks[j-parallel_task_count].set_downstream(dump_tasks[j]) 

        j = j + 1

    x = parallel_task_count
    while x > 0:
        dump_tasks[j-parallel_task_count].set_downstream(end_task)
        x = x - 1
        j = j + 1
    
    #merge_tasks[j-1].set_downstream(end_task)
else:
    parent_task >> end_task

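This code builds and wires the tasks of the DAG. It first declares three DummyOperator tasks that do no work and only mark points in the graph: start, start_dumping, and end. It also defines GetCheckPointTask, a helper that returns such a task under a given name (it is not called in the code shown). Next, it reads the Airflow Variable pos_multiple_dump_var, which provides the list of tables (table_names), the S3 base path (s3_dump_base_path), and the number of dumps allowed to run at once (parallel_task_count), and sets start_task >> start_dumping. For every entry in table_names, it lowercases the name, splits it on the dot into schema and table, and creates a PythonOperator that runs OracleToParquetToS3 in a Kubernetes pod built from default_template_2.yaml and writes to {s3_dump_base_path}/{schema}/{table}.parquet. Each task is appended to the list dump_tasks.

The dependencies are then set so that the dumps run in parallel_task_count chains. If dump_tasks is not empty, the first parallel_task_count tasks (j < parallel_task_count) depend directly on start_dumping, and every later task j depends on task j - parallel_task_count, so at most parallel_task_count dumps run at the same time. The while loop then connects the last parallel_task_count tasks to end. If dump_tasks is empty, start_dumping is connected directly to end. One caveat: when there are fewer tables than parallel_task_count, the index j - parallel_task_count in the while loop becomes negative, so Python's negative indexing sets some dependencies twice, or raises an IndexError once parallel_task_count is more than twice the number of tables.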
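The wiring rule can be checked without Airflow by listing the edges it produces. A minimal sketch, assuming tasks are represented by plain string names (lane_edges is a hypothetical helper, not part of the DAG); with five tasks and parallel_task_count = 2 it yields two chains, t0, t2, t4 and t1, t3, both starting at start_dumping and ending at end. It connects the last tasks to end with a slice, which avoids the negative indexing of the while loop when there are fewer tasks than chains.

```python
def lane_edges(task_names, parallel_task_count, start="start_dumping", end="end"):
    """Return the (upstream, downstream) pairs produced by the lane-style wiring."""
    if parallel_task_count < 1:
        raise ValueError("parallel_task_count must be at least 1")
    if not task_names:
        return [(start, end)]
    edges = []
    for j, name in enumerate(task_names):
        if j < parallel_task_count:
            # The first parallel_task_count tasks start right after the start task.
            edges.append((start, name))
        else:
            # Every later task waits for the task parallel_task_count positions earlier.
            edges.append((task_names[j - parallel_task_count], name))
    # The last parallel_task_count tasks (or all tasks, if there are fewer) lead to end.
    for name in task_names[-parallel_task_count:]:
        edges.append((name, end))
    return edges


for edge in lane_edges(["t0", "t1", "t2", "t3", "t4"], 2):
    print(edge)
```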

## Explanation for the code of pos_large_INSTALLMENT_SCHE_dump

In [None]:
#This part is from new code (begin)


dag = DAG(
        dag_id='pos_large_INSTALLMENT_SCHE_dump',
        start_date=datetime.datetime(2021, 12, 1),
        default_args=default_args,
        schedule_interval="@once",
        catchup=False,
      )


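This code declares the DAG that dumps one large table of the POS database into S3 in chunks. Its dag_id is pos_large_INSTALLMENT_SCHE_dump. Unlike pos_multiple_dump, it uses default_args, a fixed start_date of 1 December 2021, and schedule_interval="@once", so Airflow schedules a single run; catchup=False prevents runs for past intervals. The start_date marks the start of the DAG's first data interval, not when its tasks start running, and a naive datetime like this one is interpreted in Airflow's default timezone.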

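The helper functions (get_column_data_type, get_column_data_scale, remove_special_characters_from_all_column_names, calculate_original_schema_dict, and convert_data_types_to_original) are the same as in pos_multiple_dump, so the code continues here with Step 3.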

In [None]:


# Step 3 - Declare dummy start and stop tasks

start_task = DummyOperator(task_id='start', dag=dag)

start_dumping = DummyOperator(task_id='start_dumping', dag=dag)

end_task = DummyOperator(task_id='end', dag=dag)


wait_to_dag_gets_updated = BashOperator(
    task_id="wait_to_dag_gets_updated",
    dag=dag,
    bash_command="sleep 3m",
    executor_config={
        "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                #node_selector={
                 #       "node-group": "master"
                  #  },
                containers=[
                    k8s.V1Container(
                        name="base",
                        image_pull_policy="Always",
                    ),
                ],
            )
        ),
    },
)

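First, DummyOperator tasks mark the start of the DAG, the start of the dumping, and the end. A BashOperator task, wait_to_dag_gets_updated, then sleeps for three minutes. The number of chunk tasks depends on a row count stored in an Airflow Variable, which the DAG file reads when it is parsed; the pause gives the scheduler time to re-parse the DAG file after the row count has been updated, so that the right number of chunk tasks exists before dumping starts.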


In [None]:
# Step 4 - Read the list of elements from the airflow variable
variable_name="pos_large_INSTALLMENT_SCHE_dump_var"
global_var = Variable.get(variable_name, deserialize_json=True)
row_count = global_var['ROW_COUNT']['row_count']  # Row count written by find_table_row_count
target_bucket_name = global_var['target_s3_bucket_name']
s3_dump_base_path = global_var['s3_dump_base_path']
dump_tasks = []
total_fetched = 0
chunk_size = 1000000    # 1 million

val = global_var['table_name']
#val = val.lower()
res = val.split(".")
schema_name = res[0]
table = res[1]
#schema_name="POS" #According to this schema_name, a folder will get created
#table="ACC_LEDGER"

def OracleToParquetToS3(schema_name, table_name, target_bucket, file_key, index, offset):
    SQL= 'SELECT * FROM ' + schema_name+"."+table_name + f" OFFSET {offset} ROWS FETCH NEXT {chunk_size} ROWS ONLY"
    oracle_conn = OracleHook(oracle_conn_id='con-ora-pos').get_conn()

    print("")
    print("Executing SELECT Query...")
    df = pd.read_sql(SQL, oracle_conn)

    ## Calculate Original Schema Dictionary
    print("")
    print("Fetching Table Meta Data...")
    original_schema_dict = calculate_original_schema_dict(oracle_conn, schema_name, table_name)
    if len(original_schema_dict.keys()) == 0:
        raise Exception("Could not calculate original schema")

    oracle_conn.close()  

    ## Renaming columns without special characters
    df = remove_special_characters_from_all_column_names(df)

    ## Convert Column Data Types To Original
    df = convert_data_types_to_original(original_schema_dict, df)
    
    s3 = S3Hook(aws_conn_id='con-s3')
    ## Dump as parquet directly to S3:
    with open(f"s3://{target_bucket}/{file_key}__{index}.parquet", 'wb', transport_params={'client': s3.get_conn()}) as out_file:
        df.to_parquet(out_file, engine='pyarrow', index=False)

i = 1
while total_fetched < row_count:
    task = PythonOperator(
        task_id=f"dump__chunk_{i}",
        python_callable=OracleToParquetToS3, 
        op_kwargs={
              "schema_name": schema_name,
              "table_name": table,
              "target_bucket": target_bucket_name,
              "file_key":f"{s3_dump_base_path}/{schema_name}/{table}",
              "index": i,
              "offset": total_fetched
        },
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                   # node_selector={
                    #    "node-group": "master"
                   # },
                    containers=[
                        k8s.V1Container(
                            name="base",
                        ),
                    ],
                )
            ),
        },
    )

    dump_tasks.append(task)
    i = i + 1
    total_fetched = total_fetched + chunk_size


# SEQUENTIAL TASKS
if len(dump_tasks) > 0:
    j = 0
    for task in dump_tasks:
        if j == 0:
            start_dumping.set_downstream(dump_tasks[j])
        else:
            dump_tasks[j-1].set_downstream(dump_tasks[j]) 

        j = j + 1

    dump_tasks[j-1].set_downstream(end_task)
else:
    start_dumping >> end_task



def find_row_count_function():
    count = 0
    try:
        count_query= 'SELECT COUNT(*) FROM ' + val  # val is "SCHEMA.TABLE" from the Variable
        oracle_hook = OracleHook(oracle_conn_id='con-ora-pos')
        value = oracle_hook.get_first(sql = count_query)

        count = value[0]
    except Exception as e:
        print(str(e))
        count = 0
    
    print(f"Total Row Count: {count}")
    config = Variable.get(variable_name, deserialize_json=True)  # Re-read the whole Variable so its other keys are kept
    config['ROW_COUNT']['row_count'] = count  # Store the new row count
    Variable.set(variable_name, json.dumps(config))  # Save the Variable back as JSON



find_row_count = PythonOperator(
    task_id="find_table_row_count",
    python_callable=find_row_count_function,
    dag=dag,
    executor_config={
        "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                   #node_selector={
                    #    "node-group": "master"
                    #},
                containers=[
                    k8s.V1Container(
                        name="base",
                        image_pull_policy="Always",
                    ),
                ],
            )
        ),
    },
)


start_task >> find_row_count >> wait_to_dag_gets_updated >> start_dumping



Step 4 reads the Airflow Variable pos_large_INSTALLMENT_SCHE_dump_var, which holds the row count of the table (under ROW_COUNT.row_count), the target bucket, the S3 base path, and the table name. The table name is split on the dot into schema and table. OracleToParquetToS3 is redefined here with two extra parameters: it reads only chunk_size (one million) rows starting at offset, using Oracle's OFFSET ... ROWS FETCH NEXT ... ROWS ONLY clause, and writes them to {file_key}__{index}.parquet. The while loop creates one PythonOperator per chunk (dump__chunk_1, dump__chunk_2, ...) and advances the offset by chunk_size until it reaches the row count, so there are ceil(row_count / chunk_size) chunk tasks. The chunks are wired strictly one after another: the first depends on start_dumping, each later chunk j depends on chunk j - 1, and the last one is connected to end; if there are no chunks, start_dumping is connected directly to end.

The function find_row_count_function runs SELECT COUNT(*) on the table in Oracle (falling back to 0 on error) and writes the result back into the same Variable. The task find_table_row_count runs it, and the final line sets the order start >> find_table_row_count >> wait_to_dag_gets_updated >> start_dumping, so the count is refreshed and the DAG has time to be re-parsed before the chunks are dumped. Note that the chunk query has no ORDER BY; Oracle does not guarantee a stable row order in that case, so consecutive chunks are not guaranteed to be disjoint.
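A minimal sketch of the chunk plan with an ORDER BY added so that OFFSET/FETCH pages are stable. The table name POS.POS_INSTALLMENT_SCHE and the ordering column ID are assumptions (ID is the key used later in pos_large_installment_sche_ICEBERG), and chunk_queries is a hypothetical helper; it yields the same number of chunks and offsets as the while loop in the DAG.

```python
import math


def chunk_queries(table, row_count, chunk_size, order_by):
    """Yield (index, offset, sql) for each chunk, with indices starting at 1 like the DAG."""
    n_chunks = math.ceil(row_count / chunk_size)
    for index in range(1, n_chunks + 1):
        offset = (index - 1) * chunk_size
        # Ordering on a unique key keeps pages from overlapping or skipping rows.
        sql = (f"SELECT * FROM {table} ORDER BY {order_by} "
               f"OFFSET {offset} ROWS FETCH NEXT {chunk_size} ROWS ONLY")
        yield index, offset, sql


for index, offset, sql in chunk_queries("POS.POS_INSTALLMENT_SCHE", 2_500_000, 1_000_000, "ID"):
    print(index, offset)
```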


## Explanation for the code of ALL_POS_S3_to_Iceberg_dag

In [None]:
dag = DAG(
        dag_id='ALL_POS_S3_to_Iceberg_dag',
        start_date=datetime.datetime(2021, 12, 1),
        schedule_interval="@once",
        catchup=False,
      )


This code declares the DAG that merges multiple small tables, dumped to S3 as Parquet, into Iceberg format. Its dag_id is ALL_POS_S3_to_Iceberg_dag. It has a fixed start_date of 1 December 2021 and schedule_interval="@once", so Airflow schedules a single run, and catchup=False prevents runs for past intervals. The start_date marks the start of the DAG's first data interval, not when its tasks start running, and a naive datetime like this one is interpreted in Airflow's default timezone.

In [None]:
# Step 3 - Declare dummy start and stop tasks
start_task = DummyOperator(task_id='start', dag=dag)
start_merge_job = DummyOperator(task_id='start_merge_job', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

DummyOperator tasks mark the start of the DAG (start), the start of the merge jobs (start_merge_job), and the end (end).

In [None]:
checkpoint_count = 1

checkpoint_count is initialized to 1; it is not used further in the code shown.

In [None]:

import random
import string


def GenerateRandomString():
    return ''.join(random.choices(string.ascii_lowercase, k=16))

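This function returns a random string of 16 characters drawn from the lowercase ASCII letters (string.ascii_lowercase); it does not convert anything to lowercase. It is not called in the code shown.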

In [None]:
def GetCheckPointTask(name):
    return DummyOperator(task_id=name, dag=dag)


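This function returns a DummyOperator task with the given task_id, which can serve as a checkpoint in the graph. It is not called in the code shown.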

In [None]:
# Step 4 - Read the list of elements from the airflow variable
global_var = Variable.get("all_pos_s3_to_iceberg", deserialize_json=True)
inputs = global_var['inputs']
parallel_task_count = global_var['parallel_task_count']
min_executors = global_var['min_executors']
max_executors = global_var['max_executors']
executor_min_cpu = global_var['executor_min_cpu']
executor_max_cpu = global_var['executor_max_cpu']
executor_memory = global_var['executor_memory']
s3_datasource_base_path = global_var['s3_datasource_base_path']

merge_tasks = []
i = 0

start_task >> start_merge_job

parent_task = start_merge_job

for input in inputs:
    #if i == 0:
        #merge_tasks_group[dump_task_group_index] = []
    
    #val = val.lower()
    #res = val.split(".")
    #schema_name = res[0]
    #table = res[1]

    conf = {}
    conf["min_executors"] = min_executors
    conf["max_executors"] = max_executors
    conf["executor_min_cpu"] = executor_min_cpu
    conf["executor_max_cpu"] = executor_max_cpu
    conf["executor_memory"] = executor_memory
    conf["s3_datasource_path"] = f"{s3_datasource_base_path}/{input['src']['db'].lower()}/{input['src']['schema'].lower()}/{input['src']['table'].lower()}.parquet"
    conf["target_database"] = input['dst']['db'].lower()
    conf["target_schema"] = input['dst']['schema'].lower()
    conf["target_table"] = input['dst']['table'].lower()
    conf["table_keys"] = input['table_keys'].lower()
    conf["update_date_column"] = input['update_date_column'].lower() if 'update_date_column' in input else ''
    conf["partition_column"] = input['partition_column'].lower() if 'partition_column' in input else ''
    conf["partition_column_transformation"] = input['partition_column_transformation'].lower() if 'partition_column_transformation' in input else ''


    task = TriggerDagRunOperator(
        task_id=f"merge__{input['src']['db'].lower()}.{input['src']['schema'].lower()}.{input['src']['table'].lower()}",
        trigger_dag_id="spark_submit_merge_parquet_to_iceberg",
        conf=conf,
        dag=dag,
        wait_for_completion=True,
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                        ),
                    ],
                )
            ),
        },
    )

    merge_tasks.append(task)
 

This step reads the Airflow Variable all_pos_s3_to_iceberg, which holds the list of tables to merge (inputs), the number of merges allowed to run at once (parallel_task_count), the Spark resources (minimum and maximum executors, minimum and maximum executor CPU, executor memory), and the S3 base path of the Parquet files. It then creates an empty list, merge_tasks, sets start_task >> start_merge_job, and uses start_merge_job as the parent task for the wiring in the next cell. For each input, it builds a configuration dictionary, conf, containing the Spark resources; the S3 path of the source Parquet file, {s3_datasource_base_path}/{db}/{schema}/{table}.parquet; the target database, schema, and table; the table keys; and, when present, the update date column, the partition column, and the partition column transformation (an empty string otherwise). All of these names are lowercased. Finally, it creates a TriggerDagRunOperator that triggers the DAG spark_submit_merge_parquet_to_iceberg with this conf and, because wait_for_completion=True, waits until that run finishes; as its name indicates, the triggered DAG submits the Spark job that merges the Parquet file into Iceberg. Each task is appended to merge_tasks.
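The per-table configuration can be tested in isolation. A minimal sketch that reproduces the conf construction as a plain function; build_merge_conf is a hypothetical name, and the sample settings and entry only illustrate the expected shape of the Variable values (src and dst with db, schema, and table, plus table_keys and optional columns), with made-up values.

```python
def build_merge_conf(entry, settings):
    """Build the conf passed to spark_submit_merge_parquet_to_iceberg for one input."""
    src, dst = entry["src"], entry["dst"]
    # Spark resource settings are copied unchanged.
    conf = {key: settings[key] for key in
            ("min_executors", "max_executors", "executor_min_cpu",
             "executor_max_cpu", "executor_memory")}
    conf["s3_datasource_path"] = (
        f"{settings['s3_datasource_base_path']}/{src['db'].lower()}/"
        f"{src['schema'].lower()}/{src['table'].lower()}.parquet"
    )
    conf["target_database"] = dst["db"].lower()
    conf["target_schema"] = dst["schema"].lower()
    conf["target_table"] = dst["table"].lower()
    conf["table_keys"] = entry["table_keys"].lower()
    # Optional keys default to an empty string, as in the DAG.
    for key in ("update_date_column", "partition_column", "partition_column_transformation"):
        conf[key] = entry[key].lower() if key in entry else ""
    return conf


# Hypothetical values in the shape read from the all_pos_s3_to_iceberg Variable.
settings = {"min_executors": "2", "max_executors": "6", "executor_min_cpu": "1",
            "executor_max_cpu": "2", "executor_memory": "12G",
            "s3_datasource_base_path": "my-bucket/raw"}
entry = {"src": {"db": "POSDB", "schema": "POS", "table": "CUSTOMER"},
         "dst": {"db": "pos", "schema": "pos", "table": "customer"},
         "table_keys": "ID", "update_date_column": "MODIFIED"}

print(build_merge_conf(entry, settings)["s3_datasource_path"])
```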

In [None]:
# SEQUENTIAL TASKS
if len(merge_tasks) > 0:
    j = 0
    for task in merge_tasks:
        if j < parallel_task_count:
            parent_task.set_downstream(merge_tasks[j])
        else:
            merge_tasks[j-parallel_task_count].set_downstream(merge_tasks[j]) 

        j = j + 1

    x = parallel_task_count
    while x > 0:
        merge_tasks[j-parallel_task_count].set_downstream(end_task)
        x = x - 1
        j = j + 1
    
    #merge_tasks[j-1].set_downstream(end_task)
else:
    parent_task >> end_task

The merge tasks are then wired in the same way as the dump tasks in pos_multiple_dump. If merge_tasks is not empty, the loop walks over the list with a counter j: the first parallel_task_count tasks (j < parallel_task_count) depend directly on start_merge_job (parent_task), and every later task j depends on merge_tasks[j - parallel_task_count]. After the loop, x starts at parallel_task_count and counts down to 1; on each pass, merge_tasks[j - parallel_task_count] is connected to end and j is incremented, so the last parallel_task_count tasks all lead to end. If merge_tasks is empty, parent_task is connected directly to end.
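Because task j waits only for task j - parallel_task_count, the merge tasks form parallel_task_count independent chains, and since each trigger task waits for its triggered run to finish, at most that many merge runs are active at the same time. A small sketch, assuming string task names and a hypothetical helper lanes, that groups the tasks by chain: task j lands in chain j mod parallel_task_count.

```python
def lanes(task_names, parallel_task_count):
    """Group tasks into the chains produced by the wiring: task j joins chain j % p."""
    if parallel_task_count < 1:
        raise ValueError("parallel_task_count must be at least 1")
    # There are never more chains than tasks.
    result = [[] for _ in range(min(parallel_task_count, len(task_names)))]
    for j, name in enumerate(task_names):
        result[j % parallel_task_count].append(name)
    return result


print(lanes([f"m{i}" for i in range(7)], 3))
```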

## Explanation for the code of pos_large_installment_sche_ICEBERG

In [None]:
dag = DAG(
        dag_id='pos_large_installment_sche_ICEBERG', #NEED TO CHANGE HERE
        start_date=datetime.datetime(2021, 12, 1),
        schedule_interval="@once",
        default_args = default_args,
        catchup=False,
      )

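This code declares the DAG that merges the chunks of one large table, dumped to S3 as Parquet, into Iceberg format. Its dag_id is pos_large_installment_sche_ICEBERG. It uses default_args, a fixed start_date of 1 December 2021, and schedule_interval="@once", so Airflow schedules a single run, and catchup=False prevents runs for past intervals. The start_date marks the start of the DAG's first data interval, not when its tasks start running, and a naive datetime like this one is interpreted in Airflow's default timezone.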

In [None]:
# Step 3 - Declare dummy start and stop tasks
start_task = DummyOperator(task_id='start', dag=dag)
start_insert_job = DummyOperator(task_id='start_insert_job', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

DummyOperator tasks mark the start of the DAG (start), the start of the merge jobs (start_insert_job), and the end (end).

In [None]:
# Step 4 - Declare the merge settings (hard-coded here instead of read from an Airflow Variable)

parallel_task_count = 1
min_executors = "2"
max_executors = "6"
executor_min_cpu = "1"
executor_max_cpu = "2"
executor_memory = "12G"
s3_datasource_base_path = "bigdata-dev-cmfcknil/raw/idl/posdb/POS_INSTALLMENT_SCHE/POS"# AND HERE (Object storage location of the file)
part_start_index = 1
part_end_index = 37 # NEED TO CHANGE THIS ACCORDING TO NUMBER OF CHUNKS

tasks = []

i = 0
j = part_start_index

start_task >> start_insert_job

parent_task = start_insert_job

while j <= part_end_index:
    
    #val = val.lower()
    #res = val.split(".")
    #schema_name = res[0]
    #table = res[1]

    conf = {}
    conf["min_executors"] = min_executors
    conf["max_executors"] = max_executors
    conf["executor_min_cpu"] = executor_min_cpu
    conf["executor_max_cpu"] = executor_max_cpu
    conf["executor_memory"] = executor_memory
    conf["s3_datasource_path"] = f"{s3_datasource_base_path}/POS_INSTALLMENT_SCHE__{j}.parquet" #ALSO HERE, Take the table name from S3 object storage
    conf["target_database"] = "pos" # Creates this folder under warehouse if not exists
    conf["target_schema"] = "pos" # Appends this schema name with the tablename
    conf["target_table"] = "pos_installment_sche" #AND HERE, DESTINATION (keep the name in small letter)
    conf["table_keys"] = "ID"
    conf["update_date_column"] = "MODIFIED" #Taken from control table
    conf["partition_column"] = "INSTALLMENT_DUE_DATE" #Taken from control table
    conf["partition_column_transformation"] = ""


    task = TriggerDagRunOperator(
        task_id=f"merge__part_{j}",
        trigger_dag_id="spark_submit_merge_parquet_to_iceberg",
        conf=conf,
        dag=dag,
        wait_for_completion=True,
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "kubernetes/pod_templates/default_template_2.yaml"),
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources = k8s.V1ResourceRequirements(
                                requests= {
                                    "cpu": "200m",
                                    "memory": "1G"
                                },
                                limits = {
                                    "cpu": "1",
                                    "memory": "1G"
                                }
                            ),
                        ),
                    ],
                )
            ),
        },
    )

    i = i + 1
    j = j + 1
    tasks.append(task)



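First, parallel_task_count is set to 1, so the parts are merged one after another. The Spark resources (min_executors, max_executors, executor_min_cpu, executor_max_cpu, executor_memory), the S3 folder that holds the chunk files (s3_datasource_base_path), and the range of part numbers (part_start_index to part_end_index) are hard-coded in this DAG. The while loop then goes over each part number j from part_start_index to part_end_index. For each part, it builds a conf that points at POS_INSTALLMENT_SCHE__{j}.parquet, with target database pos, target schema pos, target table pos_installment_sche, table key ID, update date column MODIFIED, and partition column INSTALLMENT_DUE_DATE, and creates a TriggerDagRunOperator (merge__part_{j}) that triggers spark_submit_merge_parquet_to_iceberg and waits for it to complete. The pod running each trigger task requests 200m CPU and 1G of memory and is limited to 1 CPU and 1G. Each task is appended to the list tasks.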
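part_end_index has to match the number of chunk files written by pos_large_INSTALLMENT_SCHE_dump, which is ceil(row_count / chunk_size) with chunk_size = 1,000,000. A minimal sketch of deriving it instead of hard-coding it; part_paths is a hypothetical helper, and the row count below is made up (any count from 36,000,001 to 37,000,000 gives the 37 parts used here).

```python
import math


def part_paths(s3_datasource_base_path, file_prefix, row_count, chunk_size=1_000_000):
    """List the chunk files the dump DAG writes, numbered from 1 like its dump__chunk_{i} tasks."""
    part_end_index = math.ceil(row_count / chunk_size)
    return [f"{s3_datasource_base_path}/{file_prefix}__{j}.parquet"
            for j in range(1, part_end_index + 1)]


paths = part_paths("bigdata-dev-cmfcknil/raw/idl/posdb/POS_INSTALLMENT_SCHE/POS",
                   "POS_INSTALLMENT_SCHE", 36_200_000)  # hypothetical row count
print(len(paths), paths[0])
```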

In [None]:
# SEQUENTIAL TASKS
if len(tasks) > 0:
    j = 0
    for task in tasks:
        if j < parallel_task_count:
            parent_task.set_downstream(tasks[j])
        else:
            tasks[j-parallel_task_count].set_downstream(tasks[j]) 

        j = j + 1

    x = parallel_task_count
    while x > 0:
        tasks[j-parallel_task_count].set_downstream(end_task)
        x = x - 1
        j = j + 1
    
    #convert_tasks[j-1].set_downstream(end_task)
else:
    parent_task >> end_task

Then the code checks whether tasks is empty. If it is not, it loops over the list with a counter j: tasks with j < parallel_task_count depend directly on start_insert_job (parent_task), and every later task depends on tasks[j - parallel_task_count]. With parallel_task_count = 1, part 1 follows start_insert_job and each later part follows the previous one, forming a single chain. The while loop then connects the last parallel_task_count tasks (here, the last part) to end. If tasks is empty, parent_task is connected directly to end.