### Create Spark session

In [0]:
# Import required libraries
from delta.tables import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Build (or reuse) the notebook's SparkSession with the Delta Lake SQL
# extension and the Delta catalog registered.
delta_session_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

session_builder = SparkSession.builder.appName("MasterCardAcquisition")
for setting_key, setting_value in delta_session_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()

### Read data, define the schema, and write to Bronze as Delta

In [0]:
# Seed rows for the acquired-companies table, in schema order:
# (company_id, company_name, country, acquisition_date, acquisition_value_usd, business_type)
companies_data = [
    (1, "PayTech Solutions", "US", "2023-01-15", 50000000, "payment_processing"),
    (2, "CryptoFlow", "UK", "2023-02-20", 75000000, "cryptocurrency"),
    (3, "LendFast", "Canada", "2023-03-10", 30000000, "lending"),
    (4, "WalletSecure", "Germany", "2023-04-05", 45000000, "digital_wallet"),
    (5, "InsurTech Pro", "France", "2023-05-12", 60000000, "insurance_tech"),
]

# Every column is nullable; acquisition_date is stored as a plain string here.
companies_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("company_id", IntegerType()),
        ("company_name", StringType()),
        ("country", StringType()),
        ("acquisition_date", StringType()),
        ("acquisition_value_usd", LongType()),
        ("business_type", StringType()),
    ]
])

companies_df = spark.createDataFrame(companies_data, schema=companies_schema)

# Persist the rows as a path-based Delta table, replacing any previous contents
(
    companies_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/bronze/acquired_companies")
)

print("✅ Delta table created successfully!")
companies_df.show()

### Load from Bronze and create a SQL table

In [0]:
# Read the bronze Delta table back from its storage path
acquired_companies = spark.read.format("delta").load("/mnt/delta/bronze/acquired_companies")

# Register the data as the table `mastercard.acquired_companies` so it can be
# queried with SQL.
# - The schema is created first; nothing else in the notebook creates it.
# - mode("overwrite") keeps the cell re-runnable: the default save mode raises
#   an error when the table already exists.
# - overwriteSchema resets the table to the bronze column set on a re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS mastercard")
(
    acquired_companies.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mastercard.acquired_companies")
)

# Companies acquired for more than 50M USD, most expensive first
spark.sql("""
SELECT company_name, country, acquisition_value_usd 
FROM mastercard.acquired_companies 
WHERE acquisition_value_usd > 50000000
ORDER BY acquisition_value_usd DESC
""").show()

### Add more data to create table versions for time travel

In [0]:
# Simulate data changes over time. Each write below commits a new version of
# the Delta table; the printed version numbers are read from the table history
# instead of being hard-coded, so they stay correct if the cell is re-run.
companies_table = DeltaTable.forName(spark, "mastercard.acquired_companies")


def latest_table_version():
    """Return the version number of the most recent commit on the companies table."""
    return companies_table.history(1).first()["version"]


print(" Initial version of data:")
spark.sql("SELECT * FROM mastercard.acquired_companies").show()

# Change 1: append more companies
new_companies = [
    (6, "BlockChain Innovations", "Singapore", "2023-06-18", 80000000, "blockchain"),
    (7, "AI Financial", "Japan", "2023-07-22", 95000000, "ai_fintech"),
    (8, "RegTech Solutions", "Australia", "2023-08-15", 40000000, "regulatory_tech")
]

new_df = spark.createDataFrame(new_companies, companies_schema)
new_df.write.format("delta").mode("append").saveAsTable("mastercard.acquired_companies")

print(f"\n After adding new companies (Version {latest_table_version()}):")
spark.sql("SELECT COUNT(*) as total_companies FROM mastercard.acquired_companies").show()

# Change 2: raise acquisition values by 10% for cryptocurrency companies
# (market corrections)
spark.sql("""
UPDATE mastercard.acquired_companies 
SET acquisition_value_usd = acquisition_value_usd * 1.1 
WHERE business_type = 'cryptocurrency'
""")

print(f"\n After updating crypto company values (Version {latest_table_version()}):")
spark.sql("SELECT * FROM mastercard.acquired_companies WHERE business_type = 'cryptocurrency'").show()

### Schema Evolution

In [0]:
# Incoming rows that carry three extra columns (ceo_name, credit_score,
# public_company) compared with the original companies schema.
extended_companies_data = [
    (9, "NeoBank Global", "Brazil", "2023-09-10", 120000000, "digital_banking", "João Silva", 850, True),
    (10, "PaySecure", "India", "2023-10-05", 65000000, "payment_security", "Priya Sharma", 720, False),
    # Same company_id as an existing row, now with the extra columns filled in
    (2, "CryptoFlow", "UK", "2023-02-20", 82500000, "cryptocurrency", "James Wilson", 950, True),
]

# Original six columns followed by the three new ones; all nullable.
extended_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("company_id", IntegerType()),
        ("company_name", StringType()),
        ("country", StringType()),
        ("acquisition_date", StringType()),
        ("acquisition_value_usd", LongType()),
        ("business_type", StringType()),
        ("ceo_name", StringType()),          # New column
        ("credit_score", IntegerType()),     # New column
        ("public_company", BooleanType()),   # New column
    ]
])

extended_df = spark.createDataFrame(extended_companies_data, schema=extended_schema)

print("🔄 New data with extended schema:")
extended_df.show()

### Merge (Upsert) Operation
#### Compare the two datasets: update rows that already exist, insert the rest

In [0]:
# Handle to the target Delta table
delta_table = DeltaTable.forName(spark, "mastercard.acquired_companies")

# The source rows carry columns (ceo_name, credit_score, public_company) that
# the target table does not have yet. Assigning them in a merge only works when
# automatic schema evolution is enabled; otherwise the merge fails because the
# target columns cannot be resolved. The setting is switched on just for this
# merge and restored afterwards so later writes keep strict schema enforcement.
# NOTE(review): evolving the schema through explicitly named columns (rather
# than updateAll/insertAll) needs a reasonably recent Delta Lake release —
# confirm against the runtime in use.
AUTO_MERGE_CONF = "spark.databricks.delta.schema.autoMerge.enabled"
previous_auto_merge = spark.conf.get(AUTO_MERGE_CONF, "false")
spark.conf.set(AUTO_MERGE_CONF, "true")
try:
    # Upsert keyed on company_id: matched rows get the new value and the extra
    # columns, unmatched source rows are inserted in full.
    delta_table.alias("target") \
        .merge(
            extended_df.alias("source"),
            "target.company_id = source.company_id"
        ) \
        .whenMatchedUpdate(set = {
            "acquisition_value_usd": "source.acquisition_value_usd",
            "ceo_name": "source.ceo_name",
            "credit_score": "source.credit_score",
            "public_company": "source.public_company"
        }) \
        .whenNotMatchedInsert(values = {
            "company_id": "source.company_id",
            "company_name": "source.company_name",
            "country": "source.country",
            "acquisition_date": "source.acquisition_date",
            "acquisition_value_usd": "source.acquisition_value_usd",
            "business_type": "source.business_type",
            "ceo_name": "source.ceo_name",
            "credit_score": "source.credit_score",
            "public_company": "source.public_company"
        }) \
        .execute()
finally:
    spark.conf.set(AUTO_MERGE_CONF, previous_auto_merge)

print("✅ Merge operation completed!")
spark.sql("SELECT * FROM mastercard.acquired_companies ORDER BY company_id").show()

# Check schema evolution: the three new columns should now be listed
print("\n📋 Current table schema:")
spark.sql("DESCRIBE mastercard.acquired_companies").show()

### Medallion Architecture (Bronze/Silver/Gold)
#### Bronze Layer (Raw Data)

In [0]:
# Bronze: raw transactions exactly as received, in schema order:
# (transaction_id, company_id, transaction_date, amount, currency, transaction_type, category)
raw_transactions = [
    (1, 1, "2023-12-01", 150.75, "USD", "purchase", "retail"),
    (2, 1, "2023-12-01", 2500.00, "USD", "transfer", "p2p"),
    (3, 2, "2023-12-01", 0.05, "BTC", "crypto_buy", "exchange"),
    (4, 3, "2023-12-01", 5000.00, "CAD", "loan_disbursement", "lending"),
    (5, 4, "2023-12-01", 25.30, "EUR", "wallet_load", "digital_wallet"),
]

# All columns nullable; the date stays a string until the silver layer parses it.
bronze_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("transaction_id", LongType()),
        ("company_id", IntegerType()),
        ("transaction_date", StringType()),
        ("amount", DoubleType()),
        ("currency", StringType()),
        ("transaction_type", StringType()),
        ("category", StringType()),
    ]
])

bronze_df = spark.createDataFrame(raw_transactions, schema=bronze_schema)

# Written twice: once to the bronze storage path, once as a named table
(
    bronze_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/bronze/transactions")
)
(
    bronze_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("mastercard.bronze_transactions")
)

print("🥉 Bronze layer created - Raw transaction data")
bronze_df.show()

#### Silver Layer - Cleaning data

In [0]:
# Silver: cleaned and standardized transactions.
# - transaction_date is parsed into a DATE
# - currency is upper-cased, transaction_type and category are lower-cased
# - amount_usd applies fixed demo conversion rates; the CASE matches on the
#   upper-cased currency code so that 'usd' / 'Eur' are converted the same way
#   as the normalized value stored in the currency column
# - rows with a non-positive amount or a missing date are dropped
# NOTE(review): currencies without a listed rate fall through to ELSE and are
# carried over 1:1 as if they were USD — confirm that this is intended.
silver_df = spark.sql("""
SELECT 
    transaction_id,
    company_id,
    to_date(transaction_date, 'yyyy-MM-dd') as transaction_date,
    amount,
    upper(currency) as currency,
    lower(transaction_type) as transaction_type,
    lower(category) as category,
    CASE upper(currency)
        WHEN 'USD' THEN amount
        WHEN 'EUR' THEN amount * 1.10  -- Simplified conversion
        WHEN 'CAD' THEN amount * 0.75
        WHEN 'BTC' THEN amount * 42000  -- Simplified BTC rate
        ELSE amount
    END as amount_usd,
    current_timestamp() as processed_at
FROM mastercard.bronze_transactions
WHERE amount > 0 AND transaction_date IS NOT NULL
""")

# Written twice: once to the silver storage path, once as a named table
silver_df.write.format("delta").mode("overwrite").save("/mnt/delta/silver/transactions")
silver_df.write.format("delta").mode("overwrite").saveAsTable("mastercard.silver_transactions")

print("🥈 Silver layer created - Cleaned and standardized data")
silver_df.show()

#### Gold Layer

In [0]:
# Gold: one row per (company_name, business_type, country) with transaction
# count, total and average USD volume, and the latest transaction date, built
# by joining the silver transactions to the acquired-companies table.
gold_metrics_query = """
SELECT 
    c.company_name,
    c.business_type,
    c.country,
    COUNT(t.transaction_id) as total_transactions,
    SUM(t.amount_usd) as total_volume_usd,
    AVG(t.amount_usd) as avg_transaction_usd,
    MAX(t.transaction_date) as last_transaction_date,
    current_timestamp() as report_generated_at
FROM mastercard.silver_transactions t
JOIN mastercard.acquired_companies c ON t.company_id = c.company_id
GROUP BY c.company_name, c.business_type, c.country
"""
gold_df = spark.sql(gold_metrics_query)

# Written twice: once to the gold storage path, once as a named table
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/gold/company_metrics")
)
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("mastercard.gold_company_metrics")
)

print("🥇 Gold layer created - Business intelligence metrics")
gold_df.show()

#### Partitioning and Z-Order

In [0]:
# Create large dataset for partitioning demonstration
import random
from datetime import datetime, timedelta

# Generate sample transaction data across multiple months
def generate_transactions(num_records=10000, seed=None):
    """Generate random sample transactions dated within calendar year 2023.

    Args:
        num_records: Number of transaction tuples to generate.
        seed: Optional seed for reproducible output. When omitted, the
            module-level ``random`` generator is used.

    Returns:
        A list of tuples ``(transaction_id, company_id, transaction_date,
        amount, currency, transaction_type, category)`` where
        ``transaction_id`` counts up from 1, ``company_id`` is in 1..10,
        ``transaction_date`` is a ``YYYY-MM-DD`` string and ``amount`` is
        rounded to two decimals.
    """
    # `from pyspark.sql.functions import *` at the top of the notebook replaces
    # the built-in round() with Spark's column function, which rejects a plain
    # float; use the built-in explicitly.
    import builtins

    rng = random if seed is None else random.Random(seed)
    start_date = datetime(2023, 1, 1)
    # randint is inclusive on both ends, so the largest offset must be the last
    # day of the start year; one more would spill into the following January.
    max_day_offset = (datetime(start_date.year + 1, 1, 1) - start_date).days - 1

    transactions = []
    for transaction_id in range(1, num_records + 1):
        transaction_date = start_date + timedelta(days=rng.randint(0, max_day_offset))
        company_id = rng.randint(1, 10)
        amount = builtins.round(rng.uniform(1, 10000), 2)

        transactions.append((
            transaction_id,
            company_id,
            transaction_date.strftime('%Y-%m-%d'),
            amount,
            rng.choice(['USD', 'EUR', 'GBP', 'CAD']),
            rng.choice(['purchase', 'transfer', 'withdrawal', 'deposit']),
            rng.choice(['retail', 'online', 'atm', 'p2p'])
        ))

    return transactions

# Build a 10,000-row synthetic dataset using the bronze transaction schema
large_transactions = generate_transactions(10000)
large_df = spark.createDataFrame(large_transactions, schema=bronze_schema)

# Derive the partition columns (year, month) from the transaction date
partitioned_df = large_df.select(
    "*",
    year(col("transaction_date")).alias("year"),
    month(col("transaction_date")).alias("month"),
)

# Write the data partitioned by year/month, once to the storage path and once
# as a named table
(
    partitioned_df.write
    .format("delta")
    .partitionBy("year", "month")
    .mode("overwrite")
    .save("/mnt/delta/silver/transactions_partitioned")
)

(
    partitioned_df.write
    .format("delta")
    .partitionBy("year", "month")
    .mode("overwrite")
    .saveAsTable("mastercard.transactions_partitioned")
)

print("📂 Partitioned table created by year and month")
print(f"Total records: {partitioned_df.count()}")

# Filter on the partition columns so only the matching partition is needed
print("\n⚡ Querying partitioned data (should be faster):")
december_by_company_query = """
SELECT company_id, COUNT(*) as transaction_count, SUM(amount) as total_amount
FROM mastercard.transactions_partitioned 
WHERE year = 2023 AND month = 12
GROUP BY company_id
ORDER BY total_amount DESC
"""
spark.sql(december_by_company_query).show()

In [0]:
# Run OPTIMIZE on the partitioned table with Z-ordering on company_id and amount
zorder_statement = """
OPTIMIZE mastercard.transactions_partitioned
ZORDER BY company_id, amount
"""
spark.sql(zorder_statement)

print("🔄 Z-ordering applied on company_id and amount columns")

# Show the table's DESCRIBE DETAIL output after the optimize run
print("\n📊 Table details after optimization:")
table_details = spark.sql("DESCRIBE DETAIL mastercard.transactions_partitioned")
table_details.show()

In [0]:
# Show table history before vacuum
print("🗂️ Table history before vacuum:")
spark.sql("DESCRIBE HISTORY mastercard.acquired_companies").select("version", "timestamp", "operation").show()

# Disable the retention safety check only for the duration of this demo vacuum
# (normally the retention period is 7 days minimum). The previous value is
# restored afterwards so the rest of the session keeps the safety check.
RETENTION_CHECK_CONF = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(RETENTION_CHECK_CONF, "true")
spark.conf.set(RETENTION_CHECK_CONF, "false")
try:
    # Vacuum old versions (removes data files no longer referenced by the
    # current table version)
    spark.sql("VACUUM mastercard.acquired_companies RETAIN 0 HOURS")
finally:
    spark.conf.set(RETENTION_CHECK_CONF, previous_retention_check)
print("\n🧹 Vacuum completed - old data files removed")

# Note: In production, use appropriate retention period
# spark.sql("VACUUM mastercard.acquired_companies RETAIN 168 HOURS")  # 7 days

# Try time travel after vacuum (should fail for older versions)
print("\n⚠️ Testing time travel after vacuum:")
try:
    spark.sql("SELECT * FROM mastercard.acquired_companies VERSION AS OF 0").show()
except Exception as e:
    # The failure is the point of the demo; show only the start of the message
    print(f"Expected error: {str(e)[:100]}...")

In [0]:
def mastercard_acquisition_pipeline():
    """
    Run the end-of-notebook summary for the MasterCard acquisition scenario.

    Steps 1-3 (bronze, silver, gold) are placeholders that only print a
    progress message; the actual layer-building code lives in the earlier
    cells. The function then optimizes the silver and gold tables and shows a
    four-row summary report built from mastercard.acquired_companies.

    Uses the notebook-level ``spark`` session and returns nothing.
    """
    print("🚀 Starting MasterCard Acquisition Data Pipeline")
    
    # 1. Bronze: Ingest raw data
    print("\n🥉 Bronze Layer: Ingesting raw data...")
    # (Code from bronze layer above)
    
    # 2. Silver: Clean and validate
    print("\n🥈 Silver Layer: Cleaning and validating...")
    # (Code from silver layer above)
    
    # 3. Gold: Create business metrics
    print("\n🥇 Gold Layer: Generating business insights...")
    # (Code from gold layer above)
    
    # 4. Optimize for performance
    print("\n⚡ Optimizing tables for performance...")
    spark.sql("OPTIMIZE mastercard.silver_transactions")
    spark.sql("OPTIMIZE mastercard.gold_company_metrics ZORDER BY company_name")
    
    # 5. Generate final report
    # The "Top Business Type" branch sorts by business_type as a tie-breaker so
    # the reported value is deterministic when several types share the top count.
    print("\n📊 Final Acquisition Summary Report:")
    spark.sql("""
    SELECT 
        'Total Companies Acquired' as metric,
        CAST(COUNT(*) AS STRING) as value
    FROM mastercard.acquired_companies
    
    UNION ALL
    
    SELECT 
        'Total Acquisition Value (USD)' as metric,
        CONCAT('$', FORMAT_NUMBER(SUM(acquisition_value_usd), 0)) as value
    FROM mastercard.acquired_companies
    
    UNION ALL
    
    SELECT 
        'Average Company Value (USD)' as metric,
        CONCAT('$', FORMAT_NUMBER(AVG(acquisition_value_usd), 0)) as value
    FROM mastercard.acquired_companies
    
    UNION ALL
    
    SELECT 
        'Top Business Type' as metric,
        business_type as value
    FROM (
        SELECT business_type, COUNT(*) as count
        FROM mastercard.acquired_companies
        GROUP BY business_type
        ORDER BY count DESC, business_type ASC
        LIMIT 1
    )
    """).show(truncate=False)
    
    print("\n✅ Pipeline completed successfully!")

# Run the complete pipeline
mastercard_acquisition_pipeline()