**Self-Intro**

In [None]:
Hello, I am Rakesh.

I have 7+ years of experience in IT, out of which 4+ years are focused specifically on Azure and 1 year on Microsoft Fabric, working across diverse domains like Banking, Retail, Finance, and Automotive.

In my most recent project with ALD Automotive, my responsibility is to design, develop, and optimize data pipelines using the Medallion Architecture on both Azure and Fabric platforms.
I work on ingesting data from various upstream source systems like SQL Server, SFTP, and ADLS Gen2 into the Bronze Layer.
In the Silver Layer, I handle data preprocessing, quality checks, standardization, and maintain historical data using SCD Type 2 patterns.

Once data is cleansed and validated, I move to the Gold Layer, where I aggregate and apply business logic — such as Cost Per Kilometer calculations, contract profitability scoring,
and excess mileage projections — storing curated data for reporting, analytics, and downstream consumption.
I build batch pipelines using Azure Data Factory, perform complex transformations using PySpark and Spark SQL, and have implemented a metadata-driven validation framework for data quality and governance.

One of my key achievements was being part of the migration of the TCO business line from Azure to Microsoft Fabric. This involved re-architecting pipelines to leverage OneLake for unified storage,
Data Pipelines for orchestration, and Fabric Notebooks for PySpark transformations, resulting in improved performance and simplified governance.

My core tech stack includes Azure Data Factory, Databricks, Synapse Analytics, ADLS Gen2, Delta Lake, PySpark, Spark SQL, and Microsoft Fabric (Lakehouse, Notebooks, Data Pipelines, and Power BI with Direct Lake).

I have strong experience in building scalable, production-grade data solutions in both Azure and Fabric.

**End To End Pipeline**

In [None]:
============================================================
ADF SQL Framework Flow (Full + Incremental + Backfill + History)
============================================================
""" Prerequisites -
01. On-prem connection -> on-premises data gateway / SHIR (Self-Hosted Integration Runtime)
02. Metadata tbl -> to define what to load and how to load
03. Control tbl -> for tracking when each ingestion ran and its status
"""

1) Lookup Activity (Config Table)
   Use: Get list of tables + load type (FULL/INCR/BACKFILL)

2) ForEach Activity (Iterate Tables)
   Use: Loop through each table one by one

3) If Condition Activity (Full Load Check)
   Use: Decide whether table needs initial history load
   - If FULL → Run full extract
   - Else → Go incremental path

4) Lookup Activity (Watermark Table)
   Use: Fetch last processed value for current table

5) If Condition Activity (Backfill Check)
   Use: Decide whether to reprocess last N days or only new delta

6) Copy Activity (SQL → ADLS Bronze)
   Use: Load data from SQL source into raw zone
   - FULL: All rows
   - INCR: Only rows > watermark
   - BACKFILL: Rows from last N days

7) Notebook / Stored Procedure Activity (Merge History)
   Use: Update the watermark value and fill the audit table

8) Web Activity (Alert)
   Use: Send an email alert through a Logic App
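The three load types in step 6 differ only in the filter applied to the source query. Below is a minimal Python sketch of that branching. The function name `build_extract_query` and the parameters `watermark_col`, `watermark_value` and `backfill_days` are hypothetical and not part of the original framework; in ADF the same logic would usually sit in a dynamic expression on the Copy activity source.

```python
from datetime import date, timedelta
from typing import Optional


def build_extract_query(
    table: str,
    load_type: str,
    watermark_col: Optional[str] = None,
    watermark_value: Optional[str] = None,
    backfill_days: int = 7,
    run_date: Optional[date] = None,
) -> str:
    """Return the source SELECT for one config row (illustrative only).

    Values are assumed to come from a trusted config/control table,
    so plain string formatting is used here for readability.
    """
    load_type = load_type.upper()
    if load_type == "FULL":
        # Initial history load: every row.
        return f"SELECT * FROM {table}"
    if watermark_col is None:
        raise ValueError(f"{load_type} load needs a watermark column")
    if load_type == "INCR":
        # Only rows newer than the last processed value.
        return f"SELECT * FROM {table} WHERE {watermark_col} > '{watermark_value}'"
    if load_type == "BACKFILL":
        # Reprocess the last N days regardless of the watermark.
        start = (run_date or date.today()) - timedelta(days=backfill_days)
        return f"SELECT * FROM {table} WHERE {watermark_col} >= '{start.isoformat()}'"
    raise ValueError(f"Unknown load type: {load_type}")


# Made-up table and column names, for illustration only.
print(build_extract_query("dbo.contracts", "INCR", "modified_at", "2026-01-28"))
```

Keeping the branching in one place means a new table only needs a new config row, not a new pipeline.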





**Schema Drift**

In [None]:
 Schema Drift:

"In our Azure Data Engineering platform for ALD Automotive TCO analytics,
schema drift was a common challenge because upstream systems like leasing,
maintenance, and fuel providers frequently introduced changes such as new
columns, missing fields, or data type modifications. These unexpected schema
changes often caused ingestion failures or inconsistencies in downstream
reporting.

To handle this, I implemented a controlled schema management approach.
In the Bronze layer, we allowed flexible raw ingestion so that files could
land successfully even if minor schema changes occurred. However, before
moving data into Silver and Gold layers, we enforced strict schema validation
using an approved schema registry and metadata checks.

If an unexpected schema change was detected, the pipeline flagged the issue,
logged it for review, and prevented corrupted data from impacting curated
tables.

This approach significantly reduced pipeline failures, improved data quality,
and ensured that TCO reporting remained consistent and reliable even as
source systems evolved."
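The validation step before Silver can be pictured as a comparison between the approved schema and the schema of the incoming file. The sketch below is a simplified illustration in plain Python. The function `detect_schema_drift`, the column names and the types are assumptions for the example, not the actual registry used in the project.

```python
from typing import Dict, List


def detect_schema_drift(
    approved: Dict[str, str], incoming: Dict[str, str]
) -> Dict[str, List[str]]:
    """Compare an incoming column->type mapping with the approved one."""
    return {
        "new_columns": sorted(set(incoming) - set(approved)),
        "missing_columns": sorted(set(approved) - set(incoming)),
        "type_changes": sorted(
            col
            for col in set(approved) & set(incoming)
            if approved[col] != incoming[col]
        ),
    }


# Hypothetical registry entry and incoming file schema.
approved = {"contract_id": "string", "mileage": "int", "cost": "double"}
incoming = {"contract_id": "string", "mileage": "string", "fuel_type": "string"}

drift = detect_schema_drift(approved, incoming)
has_drift = any(drift.values())  # True -> flag, log, and stop before Silver
print(drift, has_drift)
```

In a Spark notebook the two dictionaries could be built from `df.dtypes` and from the registry table, with the result written to the audit log.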


**Late Arrival Files**

In [None]:
"In our Azure Data Engineering platform for ALD Automotive TCO analytics,
late arriving files were a frequent challenge because upstream systems like
leasing, maintenance, and fuel providers did not always deliver data on time.
For example, our ingestion pipelines were scheduled at 2 AM, but some daily
vehicle cost files would arrive several hours later or even the next day.

To handle this, I implemented a watermark-based incremental ingestion
framework in Azure Data Factory. We maintained a control table in Azure SQL
to store the last successfully processed business date. Each pipeline run
first performed a Lookup to fetch the watermark, then dynamically processed
any unprocessed dates instead of only relying on the current day’s schedule.
This ensured late files were automatically picked up in the next run without
manual reruns.

Additionally, I added file validation using Get Metadata and If Condition,
so missing files were logged and alerts were triggered instead of breaking
downstream jobs.

This approach improved data completeness, reduced operational overhead, and
kept TCO reporting accurate and reliable."

A (Action):
"To solve this, I implemented a watermark-based incremental ingestion
framework in Azure Data Factory.
We maintained a control table in Azure SQL that stored the last successfully
processed business date for each pipeline. At the start of every run, the
pipeline performed a Lookup activity to fetch the watermark date, and then
dynamically identified which dates were still pending ingestion.

Instead of processing only the current day’s file, the pipeline was designed
to automatically pick up any unprocessed files whenever they arrived.

Additionally, I added file existence validation using Get Metadata and
If Condition activities. If the expected file was missing, the pipeline
did not fail abruptly. Instead, it logged the missing file event and
triggered alerts, preventing downstream jobs from running on incomplete data."

R (Result):
"As a result, late arriving files were handled automatically without manual
reruns, data completeness improved significantly, operational effort reduced,
and the business received accurate and consistent TCO reporting even when
source deliveries were delayed."
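The "dynamically identified which dates were still pending" step comes down to listing every business date between the stored watermark and the current run date. Here is a minimal sketch, assuming the watermark is a plain date and using a hypothetical helper name `pending_business_dates`:

```python
from datetime import date, timedelta
from typing import List


def pending_business_dates(watermark: date, run_date: date) -> List[date]:
    """Dates after the watermark, up to and including the run date."""
    days = (run_date - watermark).days
    return [watermark + timedelta(days=i) for i in range(1, days + 1)]


# Watermark says 27 Jan was the last successful date; the run is on 29 Jan.
todo = pending_business_dates(date(2026, 1, 27), date(2026, 1, 29))
print([d.isoformat() for d in todo])
```

Each returned date would then drive one iteration of the ingestion loop, and the watermark would move forward only after that date loads successfully.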

**SCD Type-2 Implementation using Delta MERGE**

In [None]:
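-- Single-statement SCD Type 2.
-- A MERGE cannot INSERT inside WHEN MATCHED, so each changed row is staged twice:
--   merge_key = customer_id -> matches and expires the current row
--   merge_key = NULL        -> never matches, so the new version is inserted
MERGE INTO dim_customer t
USING (
  SELECT s.customer_id AS merge_key, s.customer_id, s.city
  FROM source_customer s
  UNION ALL
  SELECT NULL AS merge_key, s.customer_id, s.city
  FROM source_customer s
  JOIN dim_customer d
    ON s.customer_id = d.customer_id   -- business key
  WHERE d.is_current = true
    AND d.city <> s.city               -- tracked attribute changed
) s
ON t.customer_id = s.merge_key
AND t.is_current = true

-- Existing customer whose attribute changed: close the current version
WHEN MATCHED AND t.city <> s.city
THEN UPDATE SET
  t.end_date = current_date(),
  t.is_current = false

-- New customers and the NULL-keyed copies of changed customers
WHEN NOT MATCHED
THEN INSERT (
  customer_id, city, start_date, end_date, is_current
)
VALUES (
  s.customer_id, s.city, current_date(), NULL, true
);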



**Row Level Security (RLS)**

In [None]:
-- Row Filter Function Using Group Membership

-- Step 1
CREATE FUNCTION main_catalog.company.group_rls(dept STRING)
RETURN
  CASE
    WHEN is_member('hr_team') AND dept = 'HR' THEN TRUE
    WHEN is_member('finance_team') AND dept = 'Finance' THEN TRUE
    ELSE FALSE
  END;
-- Step 2
ALTER TABLE main_catalog.company.employee
SET ROW FILTER main_catalog.company.group_rls ON (dept);


-- Row Filter Function Using Current User
-- Step 1
CREATE FUNCTION main_catalog.company.user_rls(dept STRING)
RETURN
  CASE
    WHEN current_user() = 'ravi@datacorp.com' AND dept = 'HR'
    THEN TRUE
    ELSE FALSE
  END;

-- Step 2
ALTER TABLE main_catalog.company.employee
SET ROW FILTER main_catalog.company.user_rls ON (dept);



**Column Level Security (CLS)**

In [None]:
-- Group level
-- Step 1
CREATE FUNCTION main_catalog.company.group_mask_salary(salary INT)
RETURN
  CASE
    WHEN is_member('finance_team')
    THEN salary
    ELSE NULL
  END;

-- Step 2
ALTER TABLE main_catalog.company.employee
ALTER COLUMN salary
SET MASK main_catalog.company.group_mask_salary;

-- User level
-- Step 1
CREATE FUNCTION main_catalog.company.user_mask_salary(salary INT)
RETURN
  CASE
    WHEN current_user() = 'john@datacorp.com'
    THEN salary
    ELSE NULL
  END;

-- Step 2
ALTER TABLE main_catalog.company.employee
ALTER COLUMN salary
SET MASK main_catalog.company.user_mask_salary;


In [None]:
-- SIMPLE AND CORRECT SCD TYPE-2 IN DELTA (2 STEPS)

-- ===============================
-- STEP 1: Expire old record
-- ===============================
MERGE INTO dim_customer t
USING source_customer s
ON t.customer_id = s.customer_id
AND t.is_current = true

WHEN MATCHED AND t.city <> s.city
THEN UPDATE SET
  t.end_date = current_date(),
  t.is_current = false;


-- ===============================
-- STEP 2: Insert new record
-- (new customer OR changed customer)
-- ===============================
INSERT INTO dim_customer
SELECT
  s.customer_id,
  s.city,
  current_date() AS start_date,
  NULL           AS end_date,
  true           AS is_current
FROM source_customer s
LEFT JOIN dim_customer t
  ON s.customer_id = t.customer_id
 AND t.is_current = true
WHERE t.customer_id IS NULL
   OR t.city <> s.city;

-- Done: History stored correctly
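To make the two steps easier to trace, the same expire-then-insert logic can be run on a small in-memory dimension. This is only a plain-Python illustration with made-up rows, not the Delta implementation. The function name `apply_scd2` is hypothetical.

```python
from datetime import date
from typing import Dict, List


def apply_scd2(dim: List[dict], source: List[dict], today: date) -> List[dict]:
    """Apply the two SCD Type 2 steps to an in-memory dimension (mutates dim)."""
    incoming: Dict[int, str] = {row["customer_id"]: row["city"] for row in source}
    current_keys = set()

    # Step 1: expire current rows whose tracked attribute changed.
    for row in dim:
        if not row["is_current"]:
            continue
        new_city = incoming.get(row["customer_id"])
        if new_city is not None and new_city != row["city"]:
            row["end_date"] = today
            row["is_current"] = False
        else:
            current_keys.add(row["customer_id"])

    # Step 2: insert a current row for new customers and changed customers.
    for customer_id, city in incoming.items():
        if customer_id not in current_keys:
            dim.append({
                "customer_id": customer_id, "city": city,
                "start_date": today, "end_date": None, "is_current": True,
            })
    return dim


# Made-up sample data: customer 1 moves city, customer 2 is new, 3 is unchanged.
dim_customer = [
    {"customer_id": 1, "city": "Pune", "start_date": date(2025, 1, 1),
     "end_date": None, "is_current": True},
    {"customer_id": 3, "city": "Chennai", "start_date": date(2025, 1, 1),
     "end_date": None, "is_current": True},
]
source_customer = [
    {"customer_id": 1, "city": "Mumbai"},
    {"customer_id": 2, "city": "Delhi"},
    {"customer_id": 3, "city": "Chennai"},
]
result = apply_scd2(dim_customer, source_customer, date(2026, 1, 29))
for row in result:
    print(row)
```

After the run, customer 1 has one expired row and one current row, customer 2 has a single current row, and customer 3 is untouched.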


# ADF - Azure Data Factory

**Tracking Failures and re-try**

In [None]:
"In our Azure Data Engineering platform, I tracked ingestion failures using
a centralized audit logging framework. Every ADF pipeline run wrote details
into an audit table, including the source name, file or table processed,
run status, rows loaded, and error message. This gave complete visibility
into which ingestion failed and why.

At the activity level, I used ADF’s On Failure paths to capture error
information and log failures immediately. For transient issues like network
timeouts or temporary source unavailability, I configured retry policies in
Copy activities, typically with 2–3 retries and defined retry intervals.

If failures persisted beyond the retry limit, the pipeline triggered alerts
through email or Teams notifications with run details, so support teams could
act quickly.

This approach ensured reliable ingestion, faster issue resolution, and
minimal data pipeline downtime."
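ADF handles retries through the activity's retry policy, but the behaviour described above (retry a few times, log every attempt, alert when retries run out) can be sketched in plain Python to make the flow concrete. All names below (`run_with_retry`, the audit row fields, `flaky_copy`) are illustrative assumptions, not ADF APIs.

```python
import time
from typing import Callable, Dict, List


def run_with_retry(
    task: Callable[[], int],
    source_name: str,
    audit_log: List[Dict],
    retries: int = 3,
    interval_seconds: float = 30.0,
) -> bool:
    """Run task, retrying on failure, and append one audit row per attempt."""
    for attempt in range(1, retries + 2):  # first try + `retries` retries
        try:
            rows = task()
            audit_log.append({"source": source_name, "attempt": attempt,
                              "status": "SUCCESS", "rows_loaded": rows, "error": None})
            return True
        except Exception as exc:  # broad on purpose: this is only a sketch
            audit_log.append({"source": source_name, "attempt": attempt,
                              "status": "FAILED", "rows_loaded": 0, "error": str(exc)})
            if attempt <= retries:
                time.sleep(interval_seconds)
    return False  # the caller would trigger the email/Teams alert here


# Simulated transient failure: fails twice, then loads 120 rows.
attempts = {"count": 0}


def flaky_copy() -> int:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated network timeout")
    return 120


audit: List[Dict] = []
ok = run_with_retry(flaky_copy, "vehicle_cost", audit, retries=3, interval_seconds=0)
print(ok, [row["status"] for row in audit])
```

Logging each attempt rather than only the final outcome keeps the audit table useful for spotting sources that succeed only after repeated retries.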

**How to pass parameters at runtime**

In [None]:
"In ADF, I pass runtime parameters by defining pipeline parameters like
table name or file name. The master pipeline reads these values from a
config table using a Lookup activity and loops through them with ForEach.
Then it passes the values to Copy or child pipelines, so the same pipeline
can load different tables or files without creating separate pipelines."
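The idea is the same as calling one function with different arguments. In the rough Python analogy below, `copy_table` stands in for the parameterized Copy activity or child pipeline, and `config_rows` stands in for the Lookup output. All names and values are made up for illustration.

```python
from typing import Dict, List


def copy_table(schema_name: str, table_name: str, target_folder: str) -> str:
    """Stand-in for a parameterized Copy activity or child pipeline."""
    return f"copy {schema_name}.{table_name} -> {target_folder}/{table_name}"


# Rows a Lookup activity might return from the config table (made-up values).
config_rows: List[Dict[str, str]] = [
    {"schema_name": "dbo", "table_name": "contracts", "target_folder": "bronze/leasing"},
    {"schema_name": "dbo", "table_name": "invoices", "target_folder": "bronze/maintenance"},
]

# ForEach: the same "pipeline" runs once per row with different parameter values.
results = [copy_table(**row) for row in config_rows]
for line in results:
    print(line)
```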

**End to end pipeline with files**

In [None]:
Below is a real-world enterprise design for SFTP/ADLS file ingestion
with multiple file formats, date-wise folders, and a config-driven framework
(ALD Automotive TCO style).

============================================================
1. Real-World SFTP Folder Structure (Date-wise Delivery)
============================================================

Upstream systems drop daily files like this:

SFTP Root: /ald_tco/

------------------------------------------------------------
/ald_tco/vehicle_cost/2026/01/29/
    vehicle_cost_20260129.csv

/ald_tco/fuel_transactions/2026/01/29/
    fuel_txn_20260129.json

/ald_tco/maintenance_invoice/2026/01/29/
    maint_inv_20260129.parquet
------------------------------------------------------------

3 Different File Formats:
- CSV  (vehicle cost)
- JSON (fuel transactions)
- Parquet (maintenance invoices)

============================================================
2. Config Table (Drives All File Pipelines)
============================================================

Table Name: file_ingestion_config

---------------------------------------------------------------------------------------------------
source_name     source_folder                   file_pattern          file_type   target_table
---------------------------------------------------------------------------------------------------
vehicle_cost    /ald_tco/vehicle_cost/          vehicle_cost_*.csv    CSV         silver_vehicle_cost
fuel_txn        /ald_tco/fuel_transactions/     fuel_txn_*.json       JSON        silver_fuel_txn
maint_invoice   /ald_tco/maintenance_invoice/   maint_inv_*.parquet   PARQUET     silver_maintenance
---------------------------------------------------------------------------------------------------

Additional Columns (Enterprise):

---------------------------------------------------------------------------------------------------
source_name     date_partitioned   expected_daily   validation_required
---------------------------------------------------------------------------------------------------
vehicle_cost    Y                  Y                Y
fuel_txn        Y                  Y                Y
maint_invoice   Y                  Y                N
---------------------------------------------------------------------------------------------------

============================================================
3. Processed File Log Table (Avoid Duplicates + Late Files)
============================================================

Table: processed_file_log

---------------------------------------------------------------------------------------------------
source_name      file_name                    processed_date     status
---------------------------------------------------------------------------------------------------
vehicle_cost     vehicle_cost_20260128.csv    2026-01-28         SUCCESS
fuel_txn         fuel_txn_20260128.json       2026-01-28         SUCCESS
---------------------------------------------------------------------------------------------------

============================================================
4. Parameterized ADF Pipeline Flow (Real World)
============================================================

Pipeline Name: PL_File_Master_Ingestion

Pipeline Parameter:
- p_source_name (optional, else process all)

------------------------------------------------------------
Activity Flow
------------------------------------------------------------

1) Lookup Activity (Read Config)
   Use: Get all file sources to ingest today

   Query:
   SELECT * FROM file_ingestion_config

        |
        v

2) ForEach Activity (Loop Each Source)
   Use: Iterate vehicle_cost, fuel_txn, maint_invoice

        |
        v

3) Set Variable (Build Today Folder Path)
   Use: Construct dynamic date-wise path

   Example Expression:
   @concat(
     item().source_folder,
     formatDateTime(utcNow(),'yyyy/MM/dd'),
     '/'
   )

   Output:
   /ald_tco/vehicle_cost/2026/01/29/

        |
        v

4) Get Metadata Activity (List Files)
   Use: List files in today’s folder

        |
        v

5) Lookup Activity (Processed File Log)
   Use: Fetch already processed files for this source

        |
        v

6) Filter Activity (New/Late Files Only)
   Use: Remove files already processed
        Keep only unprocessed arrivals

        |
        v

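7) ForEach Activity (Loop Each File)
   Note: ADF does not allow a ForEach nested directly inside another
   ForEach, so this inner loop runs in a child pipeline called through
   Execute Pipeline, with source_name and file_type passed in as
   parameters.

   Inside:

   7a) Copy Activity (SFTP → Bronze ADLS)
       Use: Land raw file into Bronze zone

       Target Path:
       /bronze/@{pipeline().parameters.source_name}/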

   7b) Notebook Activity (Format-Specific Load)
       Use: Read based on file_type

       If CSV → spark.read.csv()
       If JSON → spark.read.json()
       If PARQUET → spark.read.parquet()

       Write into Silver table:
       silver_vehicle_cost, silver_fuel_txn, etc.

   7c) Log File Processed
       Use: Insert into processed_file_log

        |
        v

8) If Condition (Missing File Alert)
   Use: If expected_daily = Y and no file found → send alert

        |
        v

[End]
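Steps 3 and 6 carry most of the logic in this flow, so here is a small Python equivalent of both: building the date-wise folder path and keeping only files that are not yet in `processed_file_log`. The function names are hypothetical. In ADF these would be the `@concat(...)` expression and a Filter activity condition.

```python
from datetime import date
from typing import Iterable, List, Set


def build_folder_path(source_folder: str, run_date: date) -> str:
    """Mirror of the ADF concat/formatDateTime expression in step 3."""
    return f"{source_folder}{run_date.strftime('%Y/%m/%d')}/"


def filter_unprocessed(listed: Iterable[str], processed: Set[str]) -> List[str]:
    """Keep files from Get Metadata that are not in processed_file_log."""
    return [name for name in listed if name not in processed]


folder = build_folder_path("/ald_tco/vehicle_cost/", date(2026, 1, 29))

# Get Metadata output (file names) and already-logged files, as sample values.
listed_files = ["vehicle_cost_20260128.csv", "vehicle_cost_20260129.csv"]
already_processed = {"vehicle_cost_20260128.csv"}
to_load = filter_unprocessed(listed_files, already_processed)

print(folder)
print(to_load)
```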

============================================================
5. How Late Arriving Files Are Handled
============================================================

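Example:
vehicle_cost_20260129.csv arrives late on 30th Jan.

Next pipeline run:
- Get Metadata lists file
- Filter checks processed_file_log
- File not processed → picked automatically

Note: this only works if the run lists the folder the late file lands in.
Step 3 builds today's folder only, so a file dropped into 2026/01/29/ on
the 30th is picked up only when the run also scans previous date folders
(for example a lookback of the last N days).

No manual rerun needed.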
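A minimal sketch of listing the last few date folders instead of only today's, so that a late file sitting in an older folder still reaches the Filter step. The lookback window and the helper name `lookback_folders` are assumptions added for illustration. The original flow does not specify them.

```python
from datetime import date, timedelta
from typing import List


def lookback_folders(
    source_folder: str, run_date: date, lookback_days: int = 2
) -> List[str]:
    """Date folders to list, newest first: the run date plus N previous days."""
    return [
        f"{source_folder}{(run_date - timedelta(days=offset)).strftime('%Y/%m/%d')}/"
        for offset in range(lookback_days + 1)
    ]


# Run on 30 Jan with a one-day lookback: the 29 Jan folder is scanned too.
folders = lookback_folders("/ald_tco/vehicle_cost/", date(2026, 1, 30), lookback_days=1)
print(folders)
```

Because processed files are filtered out afterwards, scanning a few extra folders does not cause duplicate loads.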

============================================================
One Line Summary
============================================================

Config-driven source list → Build date folder → List files →
Filter unprocessed → Copy to Bronze → Load to Silver → Log processed →
Alert if missing

============================================================

