In [0]:
from datetime import datetime
import pyodbc
from collections import defaultdict
from pyspark.sql.types import *
from pyspark.sql.functions import col
from loguru import logger

In [0]:
# Declare every notebook parameter as a free-text widget with an empty default,
# so each one can be supplied as a job/notebook parameter.
for _widget_name in (
    "pipeline_name",
    "pipeline_run_id",
    "Mode",
    "process_name",
    "Table_Names",
    "landing_path",
    "curated_path",
):
    dbutils.widgets.text(_widget_name, "")
del _widget_name

In [0]:
# Read the notebook parameters. Every value is whitespace-trimmed; the two path
# parameters additionally lose any trailing "/" so f"{path}/{table}" joins
# cleanly. rstrip (not strip) keeps a leading "/" so absolute paths such as
# "/mnt/..." stay absolute.
pipeline_name = dbutils.widgets.get("pipeline_name").strip()
pipeline_run_id = dbutils.widgets.get("pipeline_run_id").strip()
mode = dbutils.widgets.get("Mode").strip()
process_name = dbutils.widgets.get("process_name").strip()
Table_Names = dbutils.widgets.get("Table_Names").strip()
landing_path = dbutils.widgets.get("landing_path").strip().rstrip("/")
curated_path = dbutils.widgets.get("curated_path").strip().rstrip("/")

In [0]:
class PipelineLogger:
    """Run bookkeeping plus the per-table steps of the landing -> curated pipeline.

    Step methods, one per ``process_name`` handled by the notebook's main cell:

    * ``log_start_time`` / ``log_end_time`` write the run's start/end time to
      ``meta_data.pipeline_logs``.
    * ``run_dqm_validation`` casts/renames the landing parquet columns using
      ``meta_data.column_meta`` and writes the result as Delta under the
      curated path.
    * ``archive_path`` moves the landing directory under ``ARCHIVE_ROOT``.

    The step methods log failures (with traceback) instead of raising them.
    """

    # Connection / location settings; override on a subclass or instance if needed.
    SERVER = 'etltask.database.windows.net'
    DATABASE = 'tasketldb'
    SECRET_SCOPE = 'Azr-adf-scope1'
    ARCHIVE_ROOT = 'dbfs:/mnt/Etltask/archive'

    def __init__(self, pipeline_name, pipeline_run_id):
        """Remember which pipeline run the log rows belong to."""
        self.pipeline_name = pipeline_name
        self.pipeline_run_id = pipeline_run_id

    def _connect(self):
        """Open a new pyodbc connection to the metadata database.

        Credentials come from the ``SECRET_SCOPE`` Databricks secret scope.
        The caller must close the returned connection.
        """
        username = dbutils.secrets.get(scope=self.SECRET_SCOPE, key='USERNAME')
        password = dbutils.secrets.get(scope=self.SECRET_SCOPE, key='PASSWORD')
        return pyodbc.connect(
            f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={self.SERVER};DATABASE={self.DATABASE};UID={username};PWD={password}'
        )

    def _execute(self, sql, params=(), *, fetch=False):
        """Run one parameterized statement on a fresh connection.

        Args:
            sql: statement text using ``?`` placeholders.
            params: values bound to the placeholders.
            fetch: when True, return ``cursor.fetchall()`` and do not commit.

        Returns:
            The fetched rows if ``fetch`` is True, else the cursor's ``rowcount``
            after committing.

        Raises:
            Whatever ``_connect`` or the driver raises. The cursor and connection
            are closed on every path, including failures.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, params)
                if fetch:
                    return cursor.fetchall()
                conn.commit()
                return cursor.rowcount
            finally:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def _timestamp():
        """Current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def log_start_time(self):
        """Insert this run's row into meta_data.pipeline_logs with the current start_time.

        Failures are logged, not raised.
        """
        try:
            self._execute(
                """
                INSERT INTO meta_data.pipeline_logs (pipeline_run_id, pipeline_name, start_time)
                VALUES (?, ?, ?)
                """,
                (self.pipeline_run_id, self.pipeline_name, self._timestamp()),
            )
            logger.info(f"Logged start time for run {self.pipeline_run_id}")
        except Exception as e:
            logger.exception(f"Failed to log start time: {e}")

    def log_end_time(self):
        """Set end_time on this run's meta_data.pipeline_logs row(s).

        Logs a warning when no row matched (e.g. the start step never ran).
        Failures are logged, not raised.
        """
        try:
            updated = self._execute(
                """
                UPDATE meta_data.pipeline_logs
                SET end_time = ?
                WHERE pipeline_run_id = ? AND pipeline_name = ?
                """,
                (self._timestamp(), self.pipeline_run_id, self.pipeline_name),
            )
            if updated == 0:
                logger.warning(
                    f"No pipeline_logs row for run {self.pipeline_run_id} of "
                    f"{self.pipeline_name}; end_time not recorded"
                )
            else:
                logger.info(f"Logged end time for run {self.pipeline_run_id}")
        except Exception as e:
            logger.exception(f"Failed to log end time: {e}")

    def load_and_transform_table(self, Table_Names, landing_path, column_meta_by_table):
        """Read the landing parquet data and apply one table's column mappings.

        Args:
            Table_Names: table name used as the key into ``column_meta_by_table``.
            landing_path: path of the parquet data to read.
            column_meta_by_table: mapping of table name -> list of dicts with keys
                ``source_column_name``, ``destination_column_name`` and
                ``destination_column_data_type`` (lower-case type name).

        Returns:
            A Spark DataFrame in which each mapped source column is cast to its
            destination type and renamed, with exact duplicate rows removed.
            Mapped columns missing from the data are skipped with a warning;
            unmapped columns pass through unchanged.
        """
        logger.info(f"Processing table: {Table_Names}")
        df = spark.read.parquet(landing_path)
        mappings = column_meta_by_table.get(Table_Names, [])

        # Lower-case SQL type name -> Spark type; unlisted types are cast to string.
        sql_to_spark_type = {
            "int": IntegerType(),
            "string": StringType(),
            "float": FloatType(),
            "double": DoubleType(),
            "date": DateType(),
            "timestamp": TimestampType(),
            "varchar(500)": StringType()
        }

        for col_map in mappings:
            src = col_map["source_column_name"]
            dst = col_map["destination_column_name"]
            dtype = sql_to_spark_type.get(col_map["destination_column_data_type"], StringType())

            if src not in df.columns:
                logger.warning(f"Column not found in the DataFrame: {src}")
                continue

            df = df.withColumn(src, col(src).cast(dtype))
            if src != dst:
                df = df.withColumnRenamed(src, dst)

        # Drop rows that are identical across all columns.
        return df.dropDuplicates()

    def _fetch_column_meta(self, table_name):
        """Load column mappings from meta_data.column_meta for ``table_name``.

        Returns a defaultdict(list) keyed by ``source_table_name``. The SQL filter
        only narrows the scan; callers still look up the exact ``table_name`` key,
        since SQL Server string comparison may be case-insensitive (collation).
        """
        rows = self._execute(
            """
            SELECT source_table_name, source_column_name, destination_column_name, destination_column_data_type
            FROM meta_data.column_meta
            WHERE source_table_name = ?
            """,
            (table_name,),
            fetch=True,
        )

        column_meta_by_table = defaultdict(list)
        for row in rows:
            column_meta_by_table[row.source_table_name].append({
                "source_column_name": row.source_column_name,
                "destination_column_name": row.destination_column_name,
                # A NULL type becomes "" and thus takes the string-cast fallback.
                "destination_column_data_type": (row.destination_column_data_type or "").lower(),
            })
        return column_meta_by_table

    def run_dqm_validation(self, Table_Names, landing_path, curated_path):
        """Load one table from landing, apply its column metadata, write it to curated.

        The result is written as Delta to ``{curated_path}/{Table_Names}`` in
        overwrite mode with ``mergeSchema`` enabled. Empty arguments are refused
        (an empty table name would otherwise overwrite the curated root).
        Errors are logged with traceback and not re-raised.
        """
        if not Table_Names or not landing_path or not curated_path:
            logger.error(
                "DQM Failed: Table_Names, landing_path and curated_path must all be non-empty "
                f"(got {Table_Names!r}, {landing_path!r}, {curated_path!r})"
            )
            return
        # NOTE(review): failures are swallowed, so the notebook run still ends
        # successfully; if an orchestrator runs the "archive" step next it may move
        # landing data that never reached curated. Consider re-raising.
        try:
            column_meta_by_table = self._fetch_column_meta(Table_Names)
            transformed_df = self.load_and_transform_table(Table_Names, landing_path, column_meta_by_table)

            output_path = f"{curated_path}/{Table_Names}"
            transformed_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(output_path)

            logger.info("Success")
        except Exception as e:
            logger.exception(f"DQM Failed: {str(e)}")

    def archive_path(self, landing_path, Table_Names):
        """Move the landing directory to ``{ARCHIVE_ROOT}/{Table_Names}`` recursively.

        Empty arguments are refused (an empty table name would move the data into
        the archive root). Errors are logged, not raised.
        """
        if not landing_path or not Table_Names:
            logger.error(
                f"Refusing to archive: landing_path={landing_path!r}, Table_Names={Table_Names!r}"
            )
            return
        dst_dir = f"{self.ARCHIVE_ROOT}/{Table_Names}"
        try:
            logger.info(f"Moving directory {landing_path} to {dst_dir}")
            dbutils.fs.mv(landing_path, dst_dir, recurse=True)
            logger.info(f"Successfully moved {landing_path} to {dst_dir}")
        except Exception as e:
            logger.exception(f"Failed to move directory {Table_Names}: {str(e)}")

In [0]:
if __name__ == "__main__":
    pipeline = PipelineLogger(pipeline_name, pipeline_run_id)

    # Each notebook run performs exactly one step, selected by process_name.
    steps = {
        "start_time": pipeline.log_start_time,
        "l2c": lambda: pipeline.run_dqm_validation(Table_Names, landing_path, curated_path),
        "end_time": pipeline.log_end_time,
        "archive": lambda: pipeline.archive_path(landing_path, Table_Names),
    }

    try:
        step = steps.get(process_name)
        if step is None:
            logger.error(
                f"Invalid process_name {process_name!r}. Use one of: {', '.join(steps)}."
            )
        else:
            step()
    except Exception as ex:
        logger.exception(f"Pipeline failed with error: {ex}")