In [0]:

import os

# AWS credentials are read from the environment instead of being hardcoded in
# the notebook. On Databricks, populate these from a secret scope (cluster env
# var `{{secrets/<scope>/<key>}}`) or replace the lookups with
# dbutils.secrets.get(scope=..., key=...).
# NOTE: any key pair that was ever committed in this cell must be treated as
# leaked and deactivated/rotated in IAM.
def _require_env(name):
    """Return environment variable `name`, failing fast with a clear message if it is unset/empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; configure it "
            f"(e.g. from a Databricks secret) before running this notebook."
        )
    return value


access_key = _require_env("AWS_ACCESS_KEY_ID")
secret_key = _require_env("AWS_SECRET_ACCESS_KEY")

# Regional endpoint of the bucket (eu-north-1 unless AWS_S3_ENDPOINT overrides it).
s3_endpoint = os.environ.get("AWS_S3_ENDPOINT", "s3.eu-north-1.amazonaws.com")

# Cluster-level Hadoop configuration, so dbutils.fs and Spark readers both see it.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.endpoint", s3_endpoint)

In [0]:
# COMMAND ----------
# Import necessary libraries
from pyspark.sql.functions import current_timestamp, lit, col, to_date, coalesce, when
from pyspark.sql.types import *
import re
import random
from datetime import datetime

# Spark SQL setting that controls how to_date() interprets date patterns.
_TIME_PARSER_POLICY_KEY = "spark.sql.legacy.timeParserPolicy"

# Landing-file name prefix for each target table; tables not listed here fall
# back to the first three lowercase letters of the table name.
_TABLE_PREFIX_MAP = {
    "Employee": "emp",
    "Department": "dept",
    "Project": "project",
    "Training": "training",
    "Payroll": "payroll",
}

# Landing filenames are expected to carry an MMDDYYYY date, e.g. emp05132025.csv.
_FILENAME_DATE_PATTERN = re.compile(r"(\d{8})")


def _configure_s3_session(access_key, secret_key, endpoint):
    """Set S3A credentials/endpoint on the Spark session, skipping values that are None."""
    settings = {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.endpoint": endpoint,
    }
    for key, value in settings.items():
        if value is not None:
            spark.conf.set(key, value)


def _list_sorted_source_files(source_path, file_format, file_prefix):
    """Return matching landing files sorted oldest-first by filename date, or None if the path cannot be listed.

    Each entry is a dict with "path", "name" and "sort_key" (yyyyMMdd string).
    """
    try:
        files = dbutils.fs.ls(source_path)
    except Exception as e:
        print(f"Error accessing S3 path. Check keys and bucket name.\nError: {e}")
        return None

    extension = f".{file_format.lower()}"
    data_files = []
    for fi in files:
        name_lower = fi.name.lower()
        if not (name_lower.endswith(extension) and name_lower.startswith(file_prefix)):
            continue
        match = _FILENAME_DATE_PATTERN.search(fi.name)
        if not match:
            print(f"Skipping file without an MMDDYYYY date in filename: {fi.name}")
            continue
        try:
            # Convert MMDDYYYY to yyyyMMdd so plain string sorting is chronological.
            sort_key = datetime.strptime(match.group(1), "%m%d%Y").strftime("%Y%m%d")
        except ValueError:
            print(f"Skipping file with invalid date in filename: {fi.name}")
            continue
        data_files.append({"path": fi.path, "name": fi.name, "sort_key": sort_key})

    return sorted(data_files, key=lambda entry: entry["sort_key"])


def _align_to_target_schema(df, target_schema):
    """Select df's columns in target-table order (case-insensitive match), cast to the target types.

    Positional insertInto() requires exact column order, so this prevents column
    swapping. Target columns absent from df are filled with typed NULLs. Date
    targets try MM/dd/yyyy, M/d/yyyy and yyyy-MM-dd before a plain cast.
    """
    df_cols_map = {c.lower(): c for c in df.columns}
    final_select_cols = []

    for field in target_schema:
        source_col_name = df_cols_map.get(field.name.lower())
        if source_col_name is None:
            print(f"Warning: Column '{field.name}' missing in source file. Inserting NULL.")
            final_select_cols.append(lit(None).cast(field.dataType).alias(field.name))
            continue

        source_col = col(source_col_name)
        if isinstance(field.dataType, DateType):
            col_expr = coalesce(
                to_date(source_col, "MM/dd/yyyy"),
                to_date(source_col, "M/d/yyyy"),
                to_date(source_col, "yyyy-MM-dd"),
                source_col.cast(DateType()),
            )
        else:
            col_expr = source_col.cast(field.dataType)
        final_select_cols.append(col_expr.alias(field.name))

    return df.select(*final_select_cols)


def _write_audit_row(file_info, load_key, completion_status, rows_processed):
    """Append one row describing a processed file to rawz.audit_log."""
    audit_schema = StructType([
        StructField("file_name", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("load_key", StringType(), True),
        StructField("completion_status", StringType(), True),
        StructField("file_path", StringType(), True),
        StructField("rows_processed", IntegerType(), True),
    ])
    audit_df = spark.createDataFrame(
        [(file_info["name"], datetime.now(), load_key, completion_status, file_info["path"], rows_processed)],
        schema=audit_schema,
    )
    audit_df.write.format("delta").mode("append").saveAsTable("rawz.audit_log")


def _ingest_one_file(file_info, target_table_name, target_schema, file_format, archive_path, infer_schema):
    """Load one landing file into the target table, audit it, and move it to the archive.

    Returns True on success. On any error it prints the failure, tries to record
    a "Failed" audit row (unless a "Success" row was already written), and
    returns False so the caller can stop the sequential run.
    NOTE(review): if the insert succeeds but the archive move fails, the file
    stays in the landing folder and would be re-ingested on the next run.
    """
    print("--------------------------------------------------")
    print(f"Processing file: {file_info['name']}")

    now = datetime.now()
    # Load key format: "LCK" + yyyyMMddHHmmss + 4 random digits.
    load_key = f"LCK{now.strftime('%Y%m%d%H%M%S')}{random.randint(1000, 9999)}"
    row_count = 0
    audited = False

    try:
        df = (
            spark.read.format(file_format)
            .option("header", "true")
            # Every column is cast to the target type afterwards, so reading as
            # strings avoids an extra inference pass and keeps leading zeros
            # (phone numbers, codes) intact.
            .option("inferSchema", "true" if infer_schema else "false")
            .load(file_info["path"])
            .withColumn("ingestTimestamp", current_timestamp())
            .withColumn("loadKey", lit(load_key))
        )
        df_ordered = _align_to_target_schema(df, target_schema)

        df_ordered.write.mode("append").insertInto(target_table_name)
        row_count = df_ordered.count()
        print(f"Successfully inserted {row_count} rows into {target_table_name}")

        _write_audit_row(file_info, load_key, "Success", row_count)
        audited = True
        print("Audit log updated.")

        destination_path = archive_path + file_info["name"]
        dbutils.fs.mv(file_info["path"], destination_path)
        print(f"Moved file to Archive: {destination_path}")
        return True
    except Exception as e:
        print(f"FAILED to process file {file_info['name']}: {str(e)}")
        if not audited:
            try:
                _write_audit_row(file_info, load_key, "Failed", row_count)
            except Exception as audit_error:
                print(f"Could not write failure audit row: {audit_error}")
        return False


def ingest_sequentially_s3(table_name, source_bucket, source_folder, archive_folder, file_format,
                           access_key, secret_key, endpoint="s3.amazonaws.com", infer_schema=False):
    """Ingest landing files for one table from S3 into rawz.<table_name>, oldest file first.

    Files are selected by table prefix (see _TABLE_PREFIX_MAP) and extension,
    ordered by the MMDDYYYY date in their name, aligned column-by-column to the
    target table schema (with tolerant date parsing), appended via insertInto,
    logged to rawz.audit_log and moved to the archive folder. Processing stops
    at the first failing file.

    Args:
        table_name: Target table name inside the `rawz` database.
        source_bucket: S3 bucket holding both landing and archive folders.
        source_folder: Landing folder (no leading/trailing slash).
        archive_folder: Archive folder (no leading/trailing slash).
        file_format: Spark reader format and file extension, e.g. "csv".
        access_key, secret_key: AWS credentials set on the Spark session;
            pass None to keep whatever is already configured.
        endpoint: S3A endpoint set on the session; None leaves it unchanged.
        infer_schema: Whether to let Spark infer column types before the
            target-schema cast (off by default; see _ingest_one_file).

    spark.sql.legacy.timeParserPolicy is set to LEGACY while files are processed
    (lenient MM/dd/yyyy parsing) and restored to its previous value afterwards.
    """
    _configure_s3_session(access_key, secret_key, endpoint)

    source_path = f"s3a://{source_bucket}/{source_folder}/"
    archive_path = f"s3a://{source_bucket}/{archive_folder}/"
    print(f"Checking for files in: {source_path}")

    file_prefix = _TABLE_PREFIX_MAP.get(table_name, table_name.lower()[:3])
    print(f"Looking for files starting with: '{file_prefix}'")

    sorted_files = _list_sorted_source_files(source_path, file_format, file_prefix)
    if sorted_files is None:
        return
    if not sorted_files:
        print(f"No new '{file_prefix}' files found to ingest.")
        return

    target_table_name = f"rawz.{table_name}"
    try:
        target_schema = spark.table(target_table_name).schema
    except Exception as e:
        print(f"FAILED to read schema of target table {target_table_name}: {e}")
        return

    previous_policy = spark.conf.get(_TIME_PARSER_POLICY_KEY, None)
    spark.conf.set(_TIME_PARSER_POLICY_KEY, "LEGACY")
    try:
        for file_info in sorted_files:
            if not _ingest_one_file(file_info, target_table_name, target_schema,
                                    file_format, archive_path, infer_schema):
                print("Stopping sequential ingestion due to error.")
                break
    finally:
        # Do not leak the LEGACY parser policy into the rest of the session.
        if previous_policy is None:
            spark.conf.unset(_TIME_PARSER_POLICY_KEY)
        else:
            spark.conf.set(_TIME_PARSER_POLICY_KEY, previous_policy)

In [0]:
# # COMMAND ----------
# # Import necessary libraries
# from pyspark.sql.functions import current_timestamp, lit, col
# from pyspark.sql.types import *
# import re
# import random
# from datetime import datetime

# def ingest_sequentially_s3(table_name, source_bucket, source_folder, archive_folder, file_format, access_key, secret_key):
#     """
#     Ingests files sequentially from S3. 
#     CRITICAL FIX: Rearranges CSV columns to match Target Table schema exactly to prevent column swapping.
#     """
    
#     # 1. Configure S3 Access
#     spark.conf.set("fs.s3a.access.key", access_key)
#     spark.conf.set("fs.s3a.secret.key", secret_key)
#     spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    
#     source_path = f"s3a://{source_bucket}/{source_folder}/"
#     archive_path = f"s3a://{source_bucket}/{archive_folder}/"
    
#     print(f"Checking for files in: {source_path}")

#     # 2. Define Prefix Map (Ensures we only pick the right files)
#     table_prefix_map = {
#         "Employee": "emp",
#         "Department": "dept",
#         "Project": "project",
#         "Training": "training",
#         "Payroll": "payroll"
#     }
#     file_prefix = table_prefix_map.get(table_name, table_name.lower()[:3])
#     print(f"Looking for files starting with: '{file_prefix}'")

#     # 3. List and Filter Files
#     try:
#         files = dbutils.fs.ls(source_path)
#     except Exception as e:
#         print(f"Error accessing S3 path. Check keys and bucket name.\nError: {e}")
#         return

#     data_files = []
#     date_pattern = re.compile(r"(\d{8})") 

#     for fi in files:
#         if fi.name.lower().endswith(f".{file_format}") and fi.name.lower().startswith(file_prefix):
#             match = date_pattern.search(fi.name)
#             if match:
#                 date_str = match.group(1) 
#                 try:
#                     sort_key = datetime.strptime(date_str, "%m%d%Y").strftime("%Y%m%d")
#                     data_files.append({"path": fi.path, "name": fi.name, "sort_key": sort_key})
#                 except ValueError:
#                     print(f"Skipping file with invalid date: {fi.name}")

#     sorted_files = sorted(data_files, key=lambda x: x['sort_key'])

#     if not sorted_files:
#         print(f"No new '{file_prefix}' files found to ingest.")
#         return

#     # 4. Process Files
#     for file_info in sorted_files:
#         print(f"--------------------------------------------------")
#         print(f"Processing file: {file_info['name']}")
        
#         try:
#             # A. Generate Metadata
#             now = datetime.now()
#             load_key = f"LCK{now.strftime('%Y%m%d')}{now.strftime('%H%M%S')}{random.randint(1000, 9999)}"

#             # B. Read Data
#             df = spark.read.format(file_format) \
#                 .option("header", "true") \
#                 .option("inferSchema", "true") \
#                 .load(file_info['path'])
            
#             # C. Add Metadata Columns
#             df = df.withColumn("ingestTimestamp", current_timestamp()) \
#                    .withColumn("loadKey", lit(load_key))

#             # D. CRITICAL FIX: STRICT SCHEMA ALIGNMENT
#             # Get the Target Table Schema (Correct Order)
#             target_table_name = f"rawz.{table_name}"
#             target_schema = spark.table(target_table_name).schema
            
#             # Create a Case-Insensitive Map of current DF columns
#             # e.g. {'emp_id': 'Emp_ID', 'ph_num': 'Ph_Num'}
#             df_cols_map = {c.lower(): c for c in df.columns}
            
#             final_select_cols = []
            
#             # Iterate through Target Table columns in ORDER
#             for field in target_schema:
#                 target_col_name = field.name
#                 target_col_lower = target_col_name.lower()
                
#                 # Check if this target column exists in our Source DF
#                 if target_col_lower in df_cols_map:
#                     # Found it! Select the source column, cast it to target type, and rename it correctly
#                     source_col_name = df_cols_map[target_col_lower]
#                     final_select_cols.append(col(source_col_name).cast(field.dataType).alias(target_col_name))
#                 else:
#                     # Column missing in Source (e.g. data missing in CSV). Insert NULL safely.
#                     # This prevents crashes if a column is missing, but keeps order intact.
#                     print(f"Warning: Column '{target_col_name}' missing in CSV. Inserting NULL.")
#                     final_select_cols.append(lit(None).cast(field.dataType).alias(target_col_name))
            
#             # Reorder the DataFrame to match Target Table exactly
#             df_ordered = df.select(*final_select_cols)

#             # E. Write to Table (Now safe to use insertInto)
#             df_ordered.write.mode("append").insertInto(target_table_name)
            
#             row_count = df_ordered.count()
#             print(f"Successfully inserted {row_count} rows into {target_table_name}")

#             # F. Update Audit Log
#             audit_schema = StructType([
#                 StructField("file_name", StringType(), True),
#                 StructField("timestamp", TimestampType(), True),
#                 StructField("load_key", StringType(), True),
#                 StructField("completion_status", StringType(), True),
#                 StructField("file_path", StringType(), True),
#                 StructField("rows_processed", IntegerType(), True)
#             ])
            
#             audit_df = spark.createDataFrame([
#                 (file_info['name'], datetime.now(), load_key, "Success", file_info['path'], row_count)
#             ], schema=audit_schema)
            
#             audit_df.write.format("delta").mode("append").saveAsTable("rawz.audit_log")
#             print("Audit log updated.")

#             # G. Move to Archive
#             destination_path = archive_path + file_info['name']
#             dbutils.fs.mv(file_info['path'], destination_path)
#             print(f"Moved file to Archive: {destination_path}")

#         except Exception as e:
#             print(f"FAILED to process file {file_info['name']}: {str(e)}")
#             print("Stopping sequential ingestion due to error.")
#             break

In [0]:
# import re
# import random
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import current_timestamp, lit, col
# from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
# from datetime import datetime

# import boto3
# from botocore.exceptions import ClientError
# import re
# from typing import List, Dict

# # --------------------------
# # S3 client factory
# # --------------------------
# def get_s3_client(access_key: str, secret_key: str, region: str = None):
#     """Return a boto3 S3 client using provided credentials."""
#     params = {"aws_access_key_id": access_key, "aws_secret_access_key": secret_key}
#     if region:
#         params["region_name"] = region
#     return boto3.client("s3", **params)

# # --------------------------
# # list objects under prefix
# # --------------------------
# def list_s3_objects(client, bucket: str, prefix: str) -> List[Dict]:
#     """Return list of S3 object dicts under the prefix. Returns [] if no objects."""
#     objs = []
#     paginator = client.get_paginator("list_objects_v2")
#     for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
#         for o in page.get("Contents", []):
#             objs.append(o)
#     return objs

# # --------------------------
# # filter by table prefix (emp*.csv)
# # --------------------------
# def filter_table_objects(objects: List[Dict], table_prefix: str) -> List[Dict]:
#     out = []
#     for o in objects:
#         key = o["Key"]
#         name = key.rstrip("/").split("/")[-1]
#         if name.lower().startswith(table_prefix.lower()) and name.lower().endswith(".csv"):
#             out.append(o)
#     return out

# # --------------------------
# # extract digits after prefix and sort (earliest first)
# # --------------------------
# def _extract_digits_from_name(name: str, table_prefix: str) -> int:
#     m = re.search(rf"{re.escape(table_prefix)}(\d+)", name, flags=re.IGNORECASE)
#     return int(m.group(1)) if m else 10**18

# def sort_objects_by_filename_date(objs: List[Dict], table_prefix: str) -> List[Dict]:
#     return sorted(objs, key=lambda o: _extract_digits_from_name(o["Key"].split("/")[-1], table_prefix))

# # --------------------------
# # download S3 object to DBFS via /dbfs/ mapping
# # --------------------------
# def download_s3_object_to_dbfs(client, bucket: str, key: str, dbfs_path: str) -> str:
#     """
#     Download S3 object to DBFS path.
#     - dbfs_path: example "/mini_project/archive/emp05132025.csv"
#     Returns: "dbfs:/mini_project/archive/emp05132025.csv"
#     """
#     # Local path on driver mapped to DBFS
#     if dbfs_path.startswith("/"):
#         local_path = "/dbfs" + dbfs_path
#     else:
#         local_path = "/dbfs/" + dbfs_path

#     # Ensure parent dir exists in DBFS
#     parent = "/".join(local_path.split("/")[:-1])
#     try:
#         dbutils.fs.mkdirs(parent.replace("/dbfs",""))
#     except Exception:
#         pass

#     # If file already exists, skip download
#     try:
#         with open(local_path, "rb"):
#             # file exists on driver -> return dbfs path
#             return "dbfs:" + dbfs_path
#     except FileNotFoundError:
#         pass

#     # Stream download from S3 to local path
#     try:
#         with open(local_path, "wb") as f:
#             client.download_fileobj(Bucket=bucket, Key=key, Fileobj=f)
#     except ClientError as e:
#         raise RuntimeError(f"Failed to download s3://{bucket}/{key}: {e}")

#     return "dbfs:" + dbfs_path

# # --------------------------
# # helper: safe display list of archive
# # --------------------------
# def list_dbfs_path(path: str):
#     try:
#         return dbutils.fs.ls(path)
#     except Exception:
#         return []


In [0]:
# # Ingestion_Util: S3 helpers (minimal, only for source discovery + archive move)
# import re
# from pyspark.sql import DataFrame

# def configure_s3_keys(access_key: str, secret_key: str, region: str = "s3.amazonaws.com"):
#     """
#     Configure Spark/Hadoop to use AWS keys on a standard (non-serverless) cluster.
#     DO NOT commit keys to repo. Supply keys via notebook variables or Databricks secrets.
#     """
#     # standard cluster: direct JVM config is allowed
#     spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
#     spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
#     spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", region)
#     # recommended defaults
#     spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
#     spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")


# def list_s3_files(bucket: str, prefix: str = "") -> list:
#     """
#     Return dbutils-style FileInfo list for objects in s3a://<bucket>/<prefix>
#     Example: list_s3_files("acen-employee-csv", "landing/emp/")
#     """
#     path = f"s3a://{bucket}/{prefix}".rstrip("/") + "/"
#     return dbutils.fs.ls(path)


# def filter_table_files(fileinfo_list: list, table_prefix: str) -> list:
#     """
#     Keep only files that belong to the given table. 
#     Example: table_prefix='emp' keeps emp05132025.csv
#     """
#     filtered = [f for f in fileinfo_list if f.name.lower().startswith(table_prefix.lower()) and f.name.lower().endswith(".csv")]
#     return filtered


# def sort_by_filename_date(fileinfo_list: list, table_prefix: str):
#     """
#     Sort by the date part after the table_prefix: empMMDDYYYY or empYYYYMMDD depending on pattern.
#     This implementation looks for any contiguous digits after prefix and sorts by int(digits).
#     """
#     def extract_digits(name):
#         m = re.search(rf"{re.escape(table_prefix)}(\d+)", name, flags=re.IGNORECASE)
#         return int(m.group(1)) if m else 0
#     return sorted(fileinfo_list, key=lambda f: extract_digits(f.name))


# def move_s3_to_dbfs_archive(s3_path: str, archive_dbfs_path: str):
#     """
#     Copy the file from S3 to DBFS archive and then delete the S3 source.
#     Using dbutils.fs.cp then dbutils.fs.rm for cross-filesystem move reliability.
#     Example: move_s3_to_dbfs_archive("s3a://bucket/landing/emp/emp05132025.csv", "/mini_project/archive/emp05132025.csv")
#     """
#     # copy
#     dbutils.fs.cp(s3_path, archive_dbfs_path)
#     # verify copy exists then delete the source
#     if len(dbutils.fs.ls(archive_dbfs_path)) >= 0:
#         dbutils.fs.rm(s3_path)


In [0]:
# def insert_audit_log(
#     spark: SparkSession,
#     db_name: str,
#     file_name: str,
#     load_key: str,
#     completion_status: str,
#     file_path: str,
#     rows_processed: int
# ):
#     audit_table = f"{db_name}.audit_log"

#     audit_df = spark.createDataFrame(
#         [(file_name, load_key, completion_status, file_path, rows_processed)],
#         ["file_name", "load_key", "completion_status", "file_path", "rows_processed"]
#     ).select(
#         lit(file_name).alias("file_name"),
#         current_timestamp().alias("timestamp"),
#         lit(load_key).alias("load_key"),
#         lit(completion_status).alias("completion_status"),
#         lit(file_path).alias("file_path"),
#         lit(rows_processed).cast("int").alias("rows_processed")
#     )

#     audit_df.write.format("delta").mode("append").saveAsTable(audit_table)
#     print(f"Audit log inserted into {audit_table}")

In [0]:
# def ingest_sequentially(
#     table_name: str,
#     source_folder: str,
#     pk_col: str = None,
#     db_name: str = "rawz",
#     file_format: str = "csv",
#     read_opts: dict = None
# ):
#     spark = SparkSession.builder.getOrCreate()
#     read_opts = read_opts or {"header": True, "inferSchema": True}

#     spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
#     target_table = f"{db_name}.{table_name}"
#     archive_folder = f"{source_folder}/Archive"

#     # Step 1: List and Sort Files
#     try:
#         files = dbutils.fs.ls(source_folder)
#     except Exception as e:
#         print(f"Error accessing source folder: {e}")
#         return

#     table_prefix_map = {
#         "Employee": "emp",
#         "Department": "dept",
#         "Project": "project",
#         "Training": "training",
#         "Payroll;": "payroll"
#     }

#     prefix = table_prefix_map.get(table_name, table_name.lower()[:3])
#     csv_files = [f.path for f in files if f.name.lower().startswith(prefix) and f.name.lower().endswith(f".{file_format}")]

#     if not csv_files:
#         print(f"No {file_format} files found in {source_folder} matching prefix '{prefix}'")
#         return

#     def extract_date(path):
#         filename = path.split("/")[-1]
#         match = re.search(rf'{prefix}(\d{{8}})\.{file_format}', filename)
#         if not match:
#             raise ValueError(f"Filename {filename} does not match expected pattern {prefix}MMDDYYYY.csv")
#         return datetime.strptime(match.group(1), "%m%d%Y")

#     try:
#         csv_files.sort(key=extract_date)
#     except ValueError as e:
#         print(e)
#         return

#     file_to_process = csv_files[0]
#     print(f"Processing file: {file_to_process}")

#     # Step 2: Read File
#     try:
#         df_source = spark.read.format(file_format).options(**read_opts).load(file_to_process)
#     except Exception as e:
#         print(f"Error reading file: {e}")
#         return

#     # Step 3: Check if Table Exists
#     if not spark._jsparkSession.catalog().tableExists(db_name, table_name):
#         print(f"Target table {target_table} does not exist. Please create it using DDL before ingestion.")
#         return

#     # Step 4: Generate Load Key
#     now = datetime.now()
#     date_part = now.strftime("%Y%m%d")
#     time_part = now.strftime("%H%M%S")
#     random_part = f"{random.randint(1000, 9999)}"
#     load_key = f"LCK{date_part}{time_part}{random_part}"

#     # Step 5: Align Schema and Insert
#     try:
#         df_source = df_source.withColumn("ingestTimestamp", current_timestamp()) \
#                              .withColumn("loadKey", lit(load_key))

#         # Align with target table schema
#         target_schema = spark.table(target_table).schema
#         target_col_names = [f.name for f in target_schema]

#         for field in target_schema:
#             col_name = field.name
#             if col_name in df_source.columns:
#                 df_source = df_source.withColumn(col_name, df_source[col_name].cast(field.dataType))

#         df_source = df_source.select(*target_col_names)

#         # Write to delta
#         df_source.write.format("delta") \
#                        .mode("append") \
#                        .option("mergeSchema", "false") \
#                        .saveAsTable(target_table)

#         completion_status = "Success"
#         rows_processed = df_source.count()
#         print(f"Inserted {rows_processed} rows into {target_table}")
#     except Exception as e:
#         print(f"Error writing to Delta table: {e}")
#         completion_status = "Failed"
#         rows_processed = 0

#     # Step 6: Insert Audit Log
#     insert_audit_log(
#         spark=spark,
#         db_name=db_name,
#         file_name=file_to_process.split("/")[-1],
#         load_key=load_key,
#         completion_status=completion_status,
#         file_path=file_to_process,
#         rows_processed=rows_processed
#     )

#     # Step 7: Archive File
#     try:
#         dbutils.fs.mv(file_to_process, f"{archive_folder}/{file_to_process.split('/')[-1]}")
#         print(f"Moved {file_to_process} to {archive_folder}")
#     except Exception as e:
#         print(f"Error moving file to archive: {e}")
