# Data ingestion to DB
## Step 2 
___The below code does the ingestion of data to a SQL DB. 
For every 2 minutes the data is ingested to DB. Total 4 batches of nearly 4K records are ingested to the DB
here is logging done which clearly calls out the ingestion runs___

In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
import logging
from datetime import datetime
import os
from tenacity import retry, stop_after_attempt, wait_exponential

# Create logs directory
os.makedirs('logs', exist_ok=True)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'logs/bank_db_ingestion_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)

# Read and split data 
df = pd.read_csv('/Users/rohithkoripelli/M.Tech/SEM2/DMML/Assignment/Churn Prediction/BankChurnFinal.csv')
logging.info(f"Read {len(df)} records from source file")

splits = []
set1, set2, set3, set4 = np.array_split(df, 4)
for dataset in [set1, set2, set3, set4]:
    split1, split2 = np.array_split(dataset, 2)
    splits.extend([split1, split2])
logging.info(f"Created {len(splits)} splits")

# Database connection with retry parameters
conn_str = (
    'DRIVER={ODBC Driver 18 for SQL Server};'
    'SERVER=customerchurn.database.windows.net,1433;'
    'DATABASE=customerchurn;'
    'UID=sqladmin;'
    'PWD=churn@123;'
    'Encrypt=yes;'
    'TrustServerCertificate=no;'
    'Connection Timeout=600;'
    'Command Timeout=60;'
    'ConnectRetryCount=3;'
    'ConnectRetryInterval=10'
)

engine = create_engine(
    f'mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}',
    pool_pre_ping=True,
    pool_recycle=3600,
    pool_timeout=60,
    pool_size=5,
    max_overflow=10
)

# Update create table SQL to match the actual table structure
create_table_sql = """
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[BankChurnData]') AND type in (N'U'))
BEGIN
    CREATE TABLE BankChurnData (
        CustomerId BIGINT,              -- Primary key field
        Surname VARCHAR(100),
        CreditScore INT,
        Geography VARCHAR(50),
        Gender VARCHAR(10),
        Age INT,
        Tenure INT,
        Balance DECIMAL(12,2),
        NumOfProducts INT,
        HasCrCard BIT,
        IsActiveMember BIT,
        EstimatedSalary DECIMAL(12,2),
        Exited BIT,
        ingestion_timestamp DATETIME
    )
END
"""

# First drop and recreate table to ensure correct schema
try:
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS BankChurnData"))
        conn.execute(text(create_table_sql))
        conn.commit()
        logging.info("Table schema recreated without PRIMARY KEY constraint")
except Exception as e:
    logging.error(f"Table creation failed: {str(e)}")
    raise

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True
)
def insert_batch(engine, batch, batch_num):
    with engine.connect() as conn:
        before_count = conn.execute(text("SELECT COUNT(*) FROM BankChurnData")).scalar()
        batch.to_sql('BankChurnData', engine, if_exists='append', index=False, chunksize=50)
        after_count = conn.execute(text("SELECT COUNT(*) FROM BankChurnData")).scalar()
        return after_count - before_count

# Insert first 4 batches
for i in range(4):
    try:
        batch = splits[i]
        batch['ingestion_timestamp'] = datetime.now()
        
        # Convert data types
        batch['CustomerId'] = batch['CustomerId'].astype('int64')
        batch['CreditScore'] = batch['CreditScore'].astype('int32')
        batch['Age'] = batch['Age'].astype('int32')
        batch['Tenure'] = batch['Tenure'].astype('int32')
        batch['Balance'] = pd.to_numeric(batch['Balance'], errors='coerce')
        batch['NumOfProducts'] = batch['NumOfProducts'].astype('int32')
        batch['HasCrCard'] = batch['HasCrCard'].astype('int32')
        batch['IsActiveMember'] = batch['IsActiveMember'].astype('int32')
        batch['EstimatedSalary'] = pd.to_numeric(batch['EstimatedSalary'], errors='coerce')
        batch['Exited'] = batch['Exited'].astype('int32')
        
        start_time = datetime.now()
        records_inserted = insert_batch(engine, batch, i+1)
        duration = (datetime.now() - start_time).total_seconds()
        
        logging.info(f"Batch {i+1}: Inserted {records_inserted} records in {duration:.2f} seconds")
            
    except Exception as e:
        logging.error(f"Batch {i+1} failed: {str(e)}")

logging.info("DB insertion completed")

# Data ingestion in to File share
## Step 2 
___The below code does the ingestion of data to a Azure file share. 
Total 4 batches of nearly 4K records are ingested to the Azure file share.
There is logging done which clearly calls out the ingestion runs___

In [None]:
import pandas as pd
import numpy as np
from azure.storage.fileshare import ShareServiceClient
import logging
from datetime import datetime
import os

# Create logs directory
os.makedirs('logs', exist_ok=True)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'logs/fileshare_ingestion_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)

def cleanup_file_share(connection_string, share_name):
    """
    Remove all files from the specified file share before fresh ingestion.
    """
    service_client = ShareServiceClient.from_connection_string(conn_str=connection_string)
    share_client = service_client.get_share_client(share_name)
    for item in share_client.list_directories_and_files():
        file_client = share_client.get_file_client(item.name)
        file_client.delete_file()
    logging.info(f"Cleaned up old files in file share: {share_name}")

# Read and split data 
df = pd.read_csv('/Users/rohithkoripelli/M.Tech/SEM2/DMML/Assignment/Churn Prediction/BankChurnFinal.csv')
logging.info(f"Read {len(df)} records from source file")

splits = []
set1, set2, set3, set4 = np.array_split(df, 4)
for dataset in [set1, set2, set3, set4]:
    split1, split2 = np.array_split(dataset, 2)
    splits.extend([split1, split2])
logging.info(f"Created {len(splits)} splits")

connection_string = "DefaultEndpointsProtocol=https;AccountName=customerchurnsource;AccountKey=qxJurCvpimZ+qWV/AP3GNiMGrH7zFDYWF3B5SJ1/pxd5ppyXwiK2l4CeB7q3vKn5Wr5pZx4mJDE7+ASt0CDNcg==;EndpointSuffix=core.windows.net"
share_name = "customerchurnsource"

# Clean up old files first
cleanup_file_share(connection_string, share_name)
# Track successful uploads
successful_batches = set()

# Process last 4 batches (indices 4-7)
for i in range(4, 8):
    batch_num = i + 1
    
    # Skip if already successful
    if batch_num in successful_batches:
        logging.info(f"Batch {batch_num} already processed successfully, skipping")
        continue
        
    try:
        batch = splits[i]
        batch_size = len(batch)
        logging.info(f"Processing batch {batch_num} with {batch_size} records")
        
        # Add timestamp
        batch['ingestion_timestamp'] = datetime.now()
        
        # Create unique filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f'batch_{batch_num}_{timestamp}.csv'
        temp_path = f'temp_{filename}'
        
        # Save to temp file
        batch.to_csv(temp_path, index=False)
        
        # Upload to File Share
        share_service_client = ShareServiceClient.from_connection_string(connection_string)
        share_client = share_service_client.get_share_client(share_name)
        file_client = share_client.get_file_client(filename)
        
        start_time = datetime.now()
        with open(temp_path, 'rb') as file_data:
            file_client.upload_file(file_data)
        
        duration = (datetime.now() - start_time).total_seconds()
        
        # Cleanup temp file
        os.remove(temp_path)
        
        # Mark as successful
        successful_batches.add(batch_num)
        logging.info(f"Batch {batch_num}: Uploaded {batch_size} records in {duration:.2f} seconds")
        
    except Exception as e:
        logging.error(f"Batch {batch_num} failed: {str(e)}")
        if os.path.exists(temp_path):
            os.remove(temp_path)

logging.info("File Share ingestion completed")
logging.info(f"Successfully processed batches: {sorted(list(successful_batches))}")

# Store ingested data in to Azure data lake
## Step 3

__In below code, Raw data from DB and file share is transfered to a Azure data lake blob container. Maintaing the right folder structure, type & timestamp.___

In [None]:
from azure.storage.blob import BlobServiceClient
from azure.storage.fileshare import ShareServiceClient
from sqlalchemy import create_engine, text
import pandas as pd
import logging
from datetime import datetime
from urllib.parse import quote_plus
import io
import time
import os

# Create logs directory
os.makedirs('logs', exist_ok=True)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'logs/blob_etl_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)

def fetch_from_db():
    conn_str = (
        'DRIVER={ODBC Driver 18 for SQL Server};'
        'SERVER=customerchurn.database.windows.net,1433;'
        'DATABASE=customerchurn;UID=sqladmin;PWD=churn@123;'
        'Encrypt=yes;TrustServerCertificate=no;'
    )
    try:
        engine = create_engine(f'mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}')
        df = pd.read_sql("SELECT * FROM BankChurnData", engine)
        logging.info(f"Fetched {len(df)} records from DB")
        return df
    except Exception as e:
        logging.error(f"DB fetch failed: {str(e)}")
        return None

def cleanup_blob_container(conn_str, container_name):
    """
    Delete and recreate the container with increased retry delay
    """
    try:
        blob_service_client = BlobServiceClient.from_connection_string(conn_str)
        max_retries = 5  # Increased from 3
        retry_delay = 15  # Increased from 5 seconds
        
        try:
            container_client = blob_service_client.get_container_client(container_name)
            if container_client.exists():
                container_client.delete_container()
                logging.info(f"Deleted existing container: {container_name}")
                
                # Longer initial wait after deletion
                time.sleep(30)  # Added 30 second initial wait
                
                for attempt in range(max_retries):
                    try:
                        container_client = blob_service_client.create_container(container_name)
                        logging.info(f"Created new container: {container_name} on attempt {attempt + 1}")
                        return container_client
                    except Exception as e:
                        if attempt == max_retries - 1:
                            raise
                        logging.info(f"Retrying container creation... Attempt {attempt + 1}")
                        time.sleep(retry_delay)
        except:
            container_client = blob_service_client.create_container(container_name)
            logging.info(f"Created new container: {container_name}")
            return container_client
            
    except Exception as e:
        logging.error(f"Error cleaning up blob container: {str(e)}", exc_info=True)
        raise

def fetch_from_fileshare():
    conn_str = "DefaultEndpointsProtocol=https;AccountName=customerchurnsource;AccountKey=qxJurCvpimZ+qWV/AP3GNiMGrH7zFDYWF3B5SJ1/pxd5ppyXwiK2l4CeB7q3vKn5Wr5pZx4mJDE7+ASt0CDNcg==;EndpointSuffix=core.windows.net"
    try:
        share_client = ShareServiceClient.from_connection_string(conn_str)
        share = share_client.get_share_client("customerchurnsource")
        
        dfs = []
        for item in share.list_directories_and_files():
            if item.name.endswith('.csv'):
                file_client = share.get_file_client(item.name)
                download_stream = file_client.download_file()
                content = download_stream.readall()
                df = pd.read_csv(pd.io.common.BytesIO(content))
                dfs.append(df)
                logging.info(f"Read file: {item.name}")
        
        if dfs:
            final_df = pd.concat(dfs)
            logging.info(f"Fetched {len(final_df)} total records from File Share")
            return final_df
        return None
    except Exception as e:
        logging.error(f"File Share fetch failed: {str(e)}")
        return None

def write_to_blob(df, source):
    try:
        conn_str = "DefaultEndpointsProtocol=https;AccountName=churndest;AccountKey=y8ZXlGbsdIX6Z7sP0/2VMxL9+7NKTLv4M6fvbhczK8T+fP/LJXfuvturAeghXm+89QAmosxQS8+/+AStywM6UQ==;EndpointSuffix=core.windows.net"
        blob_service_client = BlobServiceClient.from_connection_string(conn_str)
        
        container_name = "customerchurn"
        try:
            container_client = blob_service_client.create_container(container_name)
            logging.info(f"Created new container: {container_name}")
        except:
            container_client = blob_service_client.get_container_client(container_name)
            logging.info(f"Using existing container: {container_name}")

        now = datetime.now()
        path = f"source={source}/year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}"
        filename = f"data_{now.strftime('%Y%m%d_%H%M%S')}.csv"
        blob_path = f"{path}/{filename}"

        # Convert to CSV instead of parquet
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)
        csv_buffer.seek(0)

        blob_client = container_client.get_blob_client(blob_path)
        blob_client.upload_blob(csv_buffer.getvalue(), overwrite=True)
        
        logging.info(f"Successfully written {len(df)} records to {blob_path}")
        return True

    except Exception as e:
        logging.error(f"Blob storage write failed for {source}: {str(e)}")
        return False

if __name__ == "__main__":
    try:
        # Use the actual connection string
        conn_str = "DefaultEndpointsProtocol=https;AccountName=churndest;AccountKey=y8ZXlGbsdIX6Z7sP0/2VMxL9+7NKTLv4M6fvbhczK8T+fP/LJXfuvturAeghXm+89QAmosxQS8+/+AStywM6UQ==;EndpointSuffix=core.windows.net"
        container_name = "customerchurn"
        
        # Clean up and get container client
        container_client = cleanup_blob_container(conn_str, container_name)
        
        # Process DB data
        db_data = fetch_from_db()
        if db_data is not None:
            write_to_blob(db_data, "db")
        
        # Process File Share data
        fs_data = fetch_from_fileshare()
        if fs_data is not None:
            write_to_blob(fs_data, "fileshare")
            
    except Exception as e:
        logging.error(f"Main process failed: {str(e)}", exc_info=True)

# Data validation
## Step 4
___Raw data from Azure data lake is fetched for completing data validation___

In [28]:
from azure.storage.blob import BlobServiceClient
import pandas as pd
import logging
from datetime import datetime
import io

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def fetch_and_analyze():
    # Connect to blob
    conn_str = "DefaultEndpointsProtocol=https;AccountName=churndest;AccountKey=y8ZXlGbsdIX6Z7sP0/2VMxL9+7NKTLv4M6fvbhczK8T+fP/LJXfuvturAeghXm+89QAmosxQS8+/+AStywM6UQ==;EndpointSuffix=core.windows.net"
    blob_service_client = BlobServiceClient.from_connection_string(conn_str)
    container_client = blob_service_client.get_container_client("customerchurn")

    # Read all CSVs
    dfs = []
    for blob in container_client.list_blobs():
        if blob.name.endswith('.csv'):
            blob_client = container_client.get_blob_client(blob.name)
            content = blob_client.download_blob().readall()
            df = pd.read_csv(io.BytesIO(content))
            dfs.append(df)
            logging.info(f"Read: {blob.name}")

    # Combine and analyze
    if dfs:
        combined_df = pd.concat(dfs, ignore_index=True)
        
        print("\nData Shape:", combined_df.shape)
        print("\nMissing Values:")
        print(combined_df.isnull().sum())
        print("\nData Description:")
        print(combined_df.describe())
        # Print duplicate records
        print("\nDuplicate Records:", combined_df.duplicated().sum())
        # Validate range of columns
        print("\nCreditScore Range:", combined_df['CreditScore'].min(), combined_df['CreditScore'].max())
        print("Age Range:", combined_df['Age'].min(), combined_df['Age'].max())
        print("Tenure Range:", combined_df['Tenure'].min(), combined_df['Tenure'].max())
        print("Balance Range:", combined_df['Balance'].min(), combined_df['Balance'].max())
        print("NumOfProducts Range:", combined_df['NumOfProducts'].min(), combined_df['NumOfProducts'].max())
        
    else:
        print("No CSV files found")

if __name__ == "__main__":
    fetch_and_analyze()

2025-02-22 13:53:34,290 - INFO - Request URL: 'https://churndest.blob.core.windows.net/customerchurn?restype=REDACTED&comp=REDACTED'
Request method: 'GET'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'User-Agent': 'azsdk-python-storage-blob/12.24.1 Python/3.9.6 (macOS-15.1-arm64-arm-64bit)'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': '4e96e372-f0f6-11ef-aae8-6241a593cdd7'
    'Authorization': 'REDACTED'
No body was attached to the request
2025-02-22 13:53:35,646 - INFO - Response status: 200
Response headers:
    'Transfer-Encoding': 'chunked'
    'Content-Type': 'application/xml'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'c92100e9-301e-00a2-5d03-855a8d000000'
    'x-ms-client-request-id': '4e96e372-f0f6-11ef-aae8-6241a593cdd7'
    'x-ms-version': 'REDACTED'
    'Date': 'Sat, 22 Feb 2025 08:23:34 GMT'
2025-02-22 13:53:35,649 - INFO - Request URL: 'https://churndest.blob.core.windows.net/custo


Data Shape: (32842, 14)

Missing Values:
CustomerId               0
Surname                  0
CreditScore              0
Geography                0
Gender                   0
Age                      0
Tenure                   0
Balance                357
NumOfProducts            0
HasCrCard                0
IsActiveMember           0
EstimatedSalary        346
Exited                   0
ingestion_timestamp      0
dtype: int64

Data Description:
         CustomerId   CreditScore           Age        Tenure        Balance  \
count  3.284200e+04  32842.000000  32842.000000  32842.000000   32485.000000   
mean   1.551574e+07    645.094879     38.493271      4.957798   75984.918590   
std    2.255006e+06    134.471892     11.726820      2.933764   63022.562214   
min    0.000000e+00      0.000000      0.000000      0.000000       0.000000   
25%    1.457840e+07    566.000000     31.000000      2.000000       0.000000   
50%    1.566222e+07    647.000000     37.000000      5.000000   9396