# AKIFA KHAN 29419 + ARMEEN GATTA 27260

## ETL PIPELINE FOR DATA WAREHOUSE

### IMPORTS

In [1]:
import pyodbc
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, ForeignKey
from sqlalchemy import create_engine, text
import requests
import pandas as pd
from powerbiclient.authentication import DeviceCodeLoginAuthentication
from powerbiclient import Report, models
from powerbiclient.authentication import DeviceCodeLoginAuthentication

### CONNECTION WITH DATABASE

In [2]:
# ODBC connection string for the source OLTP database (ATM_DB) on the local
# SQL Server Express instance, using a trusted (integrated) connection.
connection_string = "".join((
    "Driver={SQL Server};",
    "Server=DESKTOP-T8AJKL7\\SQLEXPRESS;",
    "Database=ATM_DB;",
    "Trusted_Connection=yes;",
))

def ingest_data(connection_string, query):
    """Run *query* against an ODBC data source and return the rows as a DataFrame.

    Args:
        connection_string: pyodbc connection string for the source database.
        query: SQL text to execute; it must return a result set.

    Returns:
        A ``pandas.DataFrame`` whose columns are the result-set column names, or
        ``None`` when the query returns no rows or a ``pyodbc.Error`` is raised
        (the error is printed, not re-raised).
    """
    connection = None
    try:
        connection = pyodbc.connect(connection_string)
        cursor = connection.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [column[0] for column in cursor.description]
    except pyodbc.Error as e:
        print("Error: ", e)
        return None
    finally:
        # Always release the connection; previously it was never closed.
        if connection is not None:
            connection.close()

    if not rows:
        print("Data format is incorrect. Please check the query and data fetching process.")
        return None
    # pyodbc Row objects are converted to plain tuples for the DataFrame constructor.
    return pd.DataFrame([tuple(row) for row in rows], columns=columns)
# Denormalised extraction query feeding the staging area.
# NOTE(review): Declined_Tran_Log and INCOMING_TRANSACTION are both joined only on
# ATM_ID, so every declined row of an ATM is paired with every incoming transaction
# of that ATM (DISTINCT only removes exact duplicates) — confirm this fan-out is intended.
query = """
SELECT DISTINCT
    A.ATM_ID,
    A.ATM_NAME, A.LOCATION, A.CURRENT_MODE, A.Branch_Id, A.CITY, A.POSTAL_CODE, A.REGION, 
    AT.NAME AS ATM_TYPE_NAME, AT.DESCRIPTION AS ATM_TYPE_DESCRIPTION,
    D.Amount as DECLINED_AMOUNT, D.Transaction_Code, D.Transmission_Date_Time AS DECLINED_DATE, D.Declined_Tran_Log_Id,
    L.Notes_Dispensed_C1, L.Notes_Dispensed_C2, L.Notes_Dispensed_C3, L.Notes_Dispensed_C4,
    LT.Last_Tran_Status_Desc, 
    I.Amount AS TRANSACTION_AMOUNT, I.IS_TRAN_COMPLETED, I.Incoming_Tran_Id, I.Transmission_Date_Time, I.CurrencyCode

FROM 
    ATM A
JOIN 
    ATM_TYPE AT ON A.ATM_TYPE_ID = AT.ATM_TYPE_ID
JOIN 
    Declined_Tran_Log D ON A.ATM_ID = D.ATM_ID
JOIN 
    LAST_TRAN_STATUS L ON A.ATM_ID = L.ATM_ID
JOIN 
    LAST_TRAN_STATUS_TYPE LT ON L.Last_Tran_Status_Type_Id = LT.Last_Tran_Status_Type_Id
JOIN 
    INCOMING_TRANSACTION I ON A.ATM_ID = I.ATM_ID
    """

# Extract into the staging DataFrame and report what came back.
staging_area = ingest_data(connection_string, query)
if staging_area is None:
    print("No data retrieved or data format issue for Query")
else:
    print("Columns for Query:", staging_area.columns)


Columns for Query: Index(['ATM_ID', 'ATM_NAME', 'LOCATION', 'CURRENT_MODE', 'Branch_Id', 'CITY',
       'POSTAL_CODE', 'REGION', 'ATM_TYPE_NAME', 'ATM_TYPE_DESCRIPTION',
       'DECLINED_AMOUNT', 'Transaction_Code', 'DECLINED_DATE',
       'Declined_Tran_Log_Id', 'Notes_Dispensed_C1', 'Notes_Dispensed_C2',
       'Notes_Dispensed_C3', 'Notes_Dispensed_C4', 'Last_Tran_Status_Desc',
       'TRANSACTION_AMOUNT', 'IS_TRAN_COMPLETED', 'Incoming_Tran_Id',
       'Transmission_Date_Time', 'CurrencyCode'],
      dtype='object')


In [151]:
connection_string = (
    "Driver={SQL Server};"
    "Server=DESKTOP-T8AJKL7\\SQLEXPRESS;"
    "Database=ATM_DB;"
    "Trusted_Connection=yes;"
)
# List every base table in ATM_DB (from INFORMATION_SCHEMA) to check the source schema.
# `connection` is bound before `try` so the `finally` clause cannot raise NameError
# when pyodbc.connect() itself fails.
connection = None
try:
    connection = pyodbc.connect(connection_string)
    print("Connected to the database")
    cursor = connection.cursor()
    cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE';")
    tables = cursor.fetchall()
    for table in tables:
        print(table.TABLE_NAME)

except pyodbc.Error as e:
    print(f"Error: {e}")

finally:
    if connection is not None:
        connection.close()
        print("Connection closed")


Connected to the database
sysdiagrams
ATM
ATM_DEVICE
ATM_PROTOCOL
ATM_TYPE
BNA
CASH_HANDLER
CONT_RECEIPT
CURRENCY
Declined_Tran_Log
INCOMING_TRANSACTION
LAST_TRAN_STATUS
LAST_TRAN_STATUS_TYPE
PRE_PROCESSING
TRAN_SET
TRANSACTION_CODE
Connection closed


### STAGING AREA

In [3]:
def clean_data(staging_area):
    """Fill missing values in the staging DataFrame.

    Numeric columns are linearly interpolated down each column. Any remaining
    gaps (leading NaNs, text/object columns) are then forward-filled and
    finally back-filled.

    Args:
        staging_area: DataFrame to clean; it is not modified.

    Returns:
        A new DataFrame with missing values filled where possible.
    """
    cleaned_data = staging_area.copy()
    # Interpolate numeric columns only. pandas does not interpolate object columns
    # anyway, and it warns about them (FutureWarning). It also raises TypeError
    # when every column is object dtype.
    numeric_cols = cleaned_data.select_dtypes(include='number').columns
    if len(numeric_cols) > 0:
        cleaned_data[numeric_cols] = cleaned_data[numeric_cols].interpolate(method='linear', axis=0)
    return cleaned_data.ffill().bfill()

# Fill missing values in the freshly extracted staging data before any type conversion.
staging_area = clean_data(staging_area)

  cleaned_data = staging_area.interpolate(method='linear', axis=0)
  cleaned_data = staging_area.interpolate().ffill().bfill()


In [4]:
import pandas as pd

def convert_column(df, column_name, to_type, date_format=None):
    """Convert one column of *df* in place and return *df*.

    ``to_type`` selects the conversion:
      * ``'int64'``    -- strict ``astype('int64')`` (raises on NaN / non-numeric),
      * ``'numeric'``  -- ``pd.to_numeric``; unparseable values become NaN,
      * ``'datetime'`` -- ``pd.to_datetime`` with *date_format*; bad values become NaT.

    Raises:
        ValueError: if *to_type* is not one of the names above.
    """
    if to_type not in ('int64', 'numeric', 'datetime'):
        raise ValueError("Unsupported type. Use 'int64', 'numeric', or 'datetime'.")

    converters = {
        'int64': lambda values: values.astype('int64'),
        'numeric': lambda values: pd.to_numeric(values, errors='coerce'),
        'datetime': lambda values: pd.to_datetime(values, format=date_format, errors='coerce'),
    }
    df[column_name] = converters[to_type](df[column_name])
    return df

# Cast the raw extracted columns to the types the later transformations expect.
for column, target_type in (
    ('DECLINED_AMOUNT', 'int64'),
    ('Transaction_Code', 'int64'),
    ('TRANSACTION_AMOUNT', 'numeric'),
    ('Notes_Dispensed_C1', 'numeric'),
    ('Notes_Dispensed_C2', 'numeric'),
    ('Notes_Dispensed_C3', 'numeric'),
    ('Notes_Dispensed_C4', 'numeric'),
    ('DECLINED_DATE', 'datetime'),
):
    staging_area = convert_column(staging_area, column, target_type)


In [5]:
def clean_data_null_drop(staging_area):
    """Return a copy of *staging_area* without any row that still contains a missing value."""
    return staging_area.dropna()

# Drop rows that still have missing values after filling and type coercion.
staging_area = clean_data_null_drop(staging_area)

In [6]:
def add_new_column_based_on_condition(df, new_column_name, condition_column_name, condition_value):
    """Add a 0/1 indicator column: 1 where *condition_column_name* equals *condition_value*.

    *df* is modified in place and also returned.
    """
    matches = df[condition_column_name].eq(condition_value)
    df[new_column_name] = matches.astype(int)
    return df

# 1 when the incoming transaction is marked completed (IS_TRAN_COMPLETED == 'Y'), else 0.
staging_area = add_new_column_based_on_condition(staging_area, 'No_of_Completed_Transactions', 'IS_TRAN_COMPLETED', 'Y')


In [7]:
def mark_unique_transactions(df, column_name, new_column_name='No_of_Declined_Transactions'):
    """Flag rows whose *column_name* value occurs exactly once in *df*.

    Args:
        df: DataFrame to annotate; modified in place and returned.
        column_name: column whose values are counted.
        new_column_name: name of the 0/1 flag column to write. The default keeps
            the previously hard-coded ``'No_of_Declined_Transactions'``.

    Returns:
        *df* with *new_column_name* set to 1 for values seen exactly once and 0
        otherwise. Missing values are excluded by ``value_counts`` and so get 0.
    """
    # Occurrences of each distinct value.
    value_counts = df[column_name].value_counts()

    # Look up each row's count; True only for values that appear once.
    unique_mask = df[column_name].map(value_counts) == 1

    df[new_column_name] = unique_mask.astype(int)
    return df

# Sets No_of_Declined_Transactions to 1 for rows whose Declined_Tran_Log_Id occurs exactly once, else 0.
staging_area = mark_unique_transactions(staging_area, 'Declined_Tran_Log_Id')


In [8]:
def create_new_attributes(staging_area, new_att, operation, column1, column2, column3=None, column4=None):
    """Derive column *new_att* from existing columns with a row-wise arithmetic operation.

    Args:
        staging_area: DataFrame to extend; modified in place and returned.
        new_att: name of the column to create.
        operation: ``'add'`` (sum of all given columns, two to four of them),
            ``'subtract'`` (column1 - column2), ``'multiply'`` (column1 * column2)
            or ``'divide'`` (column1 / column2).
        column1, column2: the operand columns.
        column3, column4: optional extra operands, used only by ``'add'``.

    Returns:
        *staging_area* with the new column added.

    Raises:
        ValueError: on an unknown operation or when *column2* is missing.
    """
    if operation == 'add':
        if column2 is None:
            raise ValueError("For addition, at least two columns are required.")
        # Previously all four columns were mandatory, although the error message
        # said "at least two". Any two to four columns are now summed left to right.
        operands = [col for col in (column1, column2, column3, column4) if col is not None]
        total = staging_area[operands[0]]
        for col in operands[1:]:
            total = total + staging_area[col]
        staging_area[new_att] = total
    elif operation == 'subtract':
        if column2 is None:
            raise ValueError("For subtraction, two columns are required.")
        staging_area[new_att] = staging_area[column1] - staging_area[column2]
    elif operation == 'multiply':
        # This operation was advertised in the error message but not implemented.
        if column2 is None:
            raise ValueError("For multiplication, two columns are required.")
        staging_area[new_att] = staging_area[column1] * staging_area[column2]
    elif operation == 'divide':
        if column2 is None:
            raise ValueError("For division, two columns are required.")
        # pandas yields inf/NaN (not an exception) on division by zero.
        staging_area[new_att] = staging_area[column1] / staging_area[column2]
    else:
        raise ValueError("Unsupported operation. Use 'add', 'subtract', 'multiply', or 'divide'.")

    return staging_area

# Derived measures for the fact table.
# NOTE(review): Approved_Transaction_Amount is DECLINED_AMOUNT - TRANSACTION_AMOUNT and
# Ratio_of_Completed_Transaction is declined / completed. Confirm that the operand
# order matches the intended meaning of these names.
staging_area = create_new_attributes(
    staging_area,
    new_att='Total_Notes_Dispensed',
    operation='add',
    column1='Notes_Dispensed_C1',
    column2='Notes_Dispensed_C2',
    column3='Notes_Dispensed_C3',
    column4='Notes_Dispensed_C4',
)
staging_area = create_new_attributes(
    staging_area,
    new_att='Approved_Transaction_Amount',
    operation='subtract',
    column1='DECLINED_AMOUNT',
    column2='TRANSACTION_AMOUNT',
)
staging_area = create_new_attributes(
    staging_area,
    new_att='Ratio_of_Completed_Transaction',
    operation='divide',
    column1='No_of_Declined_Transactions',
    column2='No_of_Completed_Transactions',
)

In [9]:
def generate_primary_keys(prefix, row_index):
    """Build a string surrogate key: *prefix* followed by the 1-based form of *row_index*.

    Example: ``generate_primary_keys('1', 0) == '11'``.
    """
    ordinal = row_index + 1
    return prefix + str(ordinal)

# Surrogate keys: a one-digit table prefix followed by the 1-based row label.
# For example, row label 0 gives DateID '11' and FactID '51'.
# NOTE(review): these keys are strings, while schema_definition declares them Integer.
for key_column, key_prefix in (
    ('DateID', '1'),
    ('TransactionID', '2'),
    ('LocationID', '3'),
    ('AtmID', '4'),
    ('FactID', '5'),
):
    staging_area[key_column] = staging_area.index.map(lambda x, p=key_prefix: generate_primary_keys(p, x))


In [10]:
def extract_date_components(df, prefix, date_column):
    """Parse *date_column* as datetimes and add calendar/time-part columns.

    *date_column* is overwritten with its ``pd.to_datetime`` conversion. The
    columns ``<prefix>Day``, ``Quarter``, ``Month``, ``Week`` (ISO week number),
    ``Year``, ``Hour`` and ``Minute`` are then added, in that order. *df* is
    modified in place and returned.
    """
    df[date_column] = pd.to_datetime(df[date_column])
    part_getters = (
        ('Day', lambda stamps: stamps.dt.day),
        ('Quarter', lambda stamps: stamps.dt.quarter),
        ('Month', lambda stamps: stamps.dt.month),
        ('Week', lambda stamps: stamps.dt.isocalendar().week),
        ('Year', lambda stamps: stamps.dt.year),
        ('Hour', lambda stamps: stamps.dt.hour),
        ('Minute', lambda stamps: stamps.dt.minute),
    )
    for suffix, getter in part_getters:
        df[prefix + suffix] = getter(df[date_column])
    return df

# Split DECLINED_DATE into DD_Day ... DD_Minute columns for the DimDate dimension.
staging_area = extract_date_components(staging_area, 'DD_', 'DECLINED_DATE')

In [11]:
# Make the derived measures numeric before they are loaded.
staging_area = convert_column(staging_area, 'Approved_Transaction_Amount', 'numeric')
staging_area = convert_column(staging_area, 'Ratio_of_Completed_Transaction', 'numeric')
staging_area['Ratio_of_Completed_Transaction'] = staging_area['Ratio_of_Completed_Transaction'].astype(float)
# Number of +inf ratios (a declined flag of 1 divided by a completed flag of 0).
# Kept for inspection only; no later statement in the visible source reads it.
inf_count = staging_area['Ratio_of_Completed_Transaction'].value_counts().get(float('inf'), 0)
# Treat +/-inf ratios as missing, then drop every row whose ratio is missing.
# This also removes NaN ratios, e.g. those produced by 0/0.
staging_area['Ratio_of_Completed_Transaction'] = staging_area['Ratio_of_Completed_Transaction'].replace([np.inf, -np.inf], np.nan)
staging_area = staging_area.dropna(subset=['Ratio_of_Completed_Transaction'])

In [12]:
# Display the transformed staging DataFrame (last expression of the cell).
staging_area

Unnamed: 0,ATM_ID,ATM_NAME,LOCATION,CURRENT_MODE,Branch_Id,CITY,POSTAL_CODE,REGION,ATM_TYPE_NAME,ATM_TYPE_DESCRIPTION,...,LocationID,AtmID,FactID,DD_Day,DD_Quarter,DD_Month,DD_Week,DD_Year,DD_Hour,DD_Minute
9882,19,Branch CCDM 2005,HBL Al Ain Branch,8,2005,Al Ain,123,UAE,NCR SelfServ 22,NCR AVANZA,...,39883,49883,59883,1,3,7,27,2009,21,13
9883,19,Branch CCDM 2005,HBL Al Ain Branch,8,2005,Al Ain,123,UAE,NCR SelfServ 22,NCR AVANZA,...,39884,49884,59884,1,3,7,27,2009,21,13
9884,19,Branch CCDM 2005,HBL Al Ain Branch,8,2005,Al Ain,123,UAE,NCR SelfServ 22,NCR AVANZA,...,39885,49885,59885,1,3,7,27,2009,21,13
9885,19,Branch CCDM 2005,HBL Al Ain Branch,8,2005,Al Ain,123,UAE,NCR SelfServ 22,NCR AVANZA,...,39886,49886,59886,4,3,7,27,2009,11,25
9886,19,Branch CCDM 2005,HBL Al Ain Branch,8,2005,Al Ain,123,UAE,NCR SelfServ 22,NCR AVANZA,...,39887,49887,59887,4,3,7,27,2009,11,25
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
219169,87,Branch CCDM 9511,HBL Deira Branch 2nd CCDM,8,2011,Dubai,7400,021,NCR Avanza,NCR Avanza,...,3219170,4219170,5219170,15,4,12,51,2020,18,56
219176,153,INIA MALE,"AIRPORT, HULHULE ISLAND, MALDIVES",8,2050,MALE MALDIVES,7400,021,NCR SelfServ 22,NCR AVANZA,...,3219177,4219177,5219177,15,1,1,3,2020,15,16
219177,156,Kalmunai Branch,"Main Street, Kalmunai",8,2098,Colombo,7400,740,NCR Avanza,NCR Avanza,...,3219178,4219178,5219178,14,1,1,3,2020,11,11
219178,156,Kalmunai Branch,"Main Street, Kalmunai",8,2098,Colombo,7400,740,NCR Avanza,NCR Avanza,...,3219179,4219179,5219179,14,1,1,3,2020,11,12


### CREATION OF STAR SCHEMA TABLES

In [174]:

# Target data mart (ATM_SS), reached through SQLAlchemy + pyodbc.
connection_string = "mssql+pyodbc:///?odbc_connect=DRIVER={ODBC Driver 17 for SQL Server};SERVER=DESKTOP-T8AJKL7\\SQLEXPRESS;DATABASE=ATM_SS;Trusted_Connection=yes;"

engine = create_engine(connection_string)

# Table name -> {column name: type spec}. The spec strings are parsed by
# create_star_schema_tables.
schema_definition = {
    'DimAtm': {
        'AtmID': 'Integer, primary_key=True, autoincrement=True, unique=True',
        'ATM_NAME': 'String(255)',
        'CURRENT_MODE': 'String(255)',
        'Branch_Id': 'Integer',
        'ATM_TYPE_NAME': 'String(255)',
        'ATM_TYPE_DESCRIPTION': 'String(255)'
    },
    'DimLocation': {
        'LocationID': 'Integer, primary_key=True, autoincrement=True, unique=True',
        'LOCATION': 'String(255)',
        'CITY': 'String(255)',
        'REGION': 'String(255)',
        'POSTAL_CODE': 'Integer'
    },
    'DimTransaction': {
        'TransactionID': 'Integer, primary_key=True, autoincrement=True, unique=True',
        'Transaction_Code': 'String(255)',
        'Last_Tran_Status_Desc': 'String(255)'
    },
    'DimDate': {
        'DateID': 'Integer, primary_key=True, autoincrement=True, unique=True',
        'DD_Day': 'Integer',
        'DD_Quarter': 'Integer',
        # NOTE(review): declared as a string, but extract_date_components stores the
        # integer month here — confirm the intended type.
        'DD_Month': 'String(255)',
        'DD_Week': 'Integer',
        'DD_Year': 'Integer',
        'DD_Hour': 'Integer',
        'DD_Minute': 'Integer'
    },
    'Fact_ATM': {
        'FactID': 'Integer, primary_key=True, autoincrement=True, unique=True',
        # The foreign-key columns use the names of the loaded fact data
        # (AtmID, DateID, LocationID, TransactionID) and of the reporting queries
        # (fa.AtmID, fa.DateID, ...). They were previously ATM_ID, Date_ID,
        # Location_ID and Transaction_ID, which nothing else uses.
        'AtmID': 'Integer, ForeignKey(DimAtm.AtmID)',
        'DateID': 'Integer, ForeignKey(DimDate.DateID)',
        'LocationID': 'Integer, ForeignKey(DimLocation.LocationID)',
        'TransactionID': 'Integer, ForeignKey(DimTransaction.TransactionID)',
        'DECLINED_AMOUNT': 'Integer',
        'No_of_Completed_Transactions': 'Integer',
        'No_of_Declined_Transactions': 'Integer',
        'Total_Notes_Dispensed': 'Integer',
        'Ratio_of_Completed_Transaction': 'Float',
        'Approved_Transaction_Amount': 'Integer'
    }
}

def create_star_schema_tables(engine, schema_definition):
    """Create the tables described by *schema_definition* in the database behind *engine*.

    *schema_definition* maps table name -> {column name: type spec}. Supported specs:
      * ``'String(N)'``                               -> String(N)
      * ``'Integer, ForeignKey(Table.Column)'``       -> Integer foreign key
      * ``'Integer, primary_key=True, ...'``          -> autoincrement Integer primary key
      * ``'Integer'``                                 -> plain Integer
      * ``'Float'``                                   -> Float

    Raises:
        ValueError: for a spec that matches none of the forms above, or a
            malformed ``ForeignKey(...)`` target.
    """
    metadata = MetaData()
    for table_name, columns in schema_definition.items():
        columns_list = []
        for col_name, col_type in columns.items():
            if col_type.startswith('String'):
                max_length = int(col_type.split('(')[1].split(')')[0])
                column = Column(col_name, String(max_length))
            elif 'ForeignKey' in col_type:
                # Must be tested before the generic Integer branch: FK specs also
                # start with 'Integer', so under the old ordering no FK was ever created.
                fk_target = col_type.split('ForeignKey(')[1].split(')')[0]
                referenced_table, referenced_column = fk_target.split('.')
                column = Column(col_name, Integer, ForeignKey(f'{referenced_table}.{referenced_column}'))
            elif col_type.startswith('Integer'):
                if 'primary_key=True' in col_type:
                    column = Column(col_name, Integer, primary_key=True, autoincrement=True, unique=True)
                else:
                    column = Column(col_name, Integer)
            elif col_type.startswith('Float'):
                column = Column(col_name, Float)
            else:
                # Previously such columns were silently dropped from the table.
                raise ValueError(f"Unsupported column type spec for {table_name}.{col_name}: {col_type!r}")
            columns_list.append(column)
        Table(table_name, metadata, *columns_list)
    # create_all orders tables by FK dependency and skips tables that already exist.
    metadata.create_all(engine)

# Create the star-schema tables in ATM_SS.
# Tables that already exist are skipped by create_all.
create_star_schema_tables(engine, schema_definition)

### MAPPING TO DIMENSION TABLES

In [13]:
from sqlalchemy import create_engine

def map_to_dimtable(staging_area, engine, dimension_mappings):
    """Write column subsets of *staging_area* to SQL tables, replacing each table.

    Args:
        staging_area: source DataFrame.
        engine: SQLAlchemy engine for the target (data mart) database.
        dimension_mappings: dict of table name -> list of staging columns to write,
            processed in insertion order.

    Each table is dropped and recreated by ``to_sql(if_exists='replace')``, so
    constraints defined on a pre-existing table are not kept.
    """
    # engine.begin() wraps all table writes in one transaction: it commits on success
    # and rolls back on any error. A failure part-way through therefore cannot leave
    # some tables refreshed and others stale.
    with engine.begin() as connection:
        for table_name, columns in dimension_mappings.items():
            dimension_data = staging_area[columns].copy()
            dimension_data.to_sql(table_name, connection, if_exists='replace', index=False)

# Target data mart (ATM_SS) on the same SQL Server instance.
connection_string = "mssql+pyodbc:///?odbc_connect=DRIVER={ODBC Driver 17 for SQL Server};SERVER=DESKTOP-T8AJKL7\\SQLEXPRESS;DATABASE=ATM_SS;Trusted_Connection=yes;"
engine = create_engine(connection_string)

# Table name -> staging columns to copy. Tables are written in this order.
# NOTE(review): Fact_ATM is listed first, presumably so it is dropped before the
# dimension tables its declared foreign keys point at — confirm.
dimension_mappings = dict(
    Fact_ATM=['FactID', 'AtmID', 'DateID', 'LocationID', 'TransactionID', 'DECLINED_AMOUNT', 'No_of_Completed_Transactions', 'No_of_Declined_Transactions', 'Total_Notes_Dispensed', 'Ratio_of_Completed_Transaction', 'Approved_Transaction_Amount'],
    DimAtm=['AtmID', 'ATM_NAME', 'CURRENT_MODE', 'Branch_Id', 'ATM_TYPE_NAME', 'ATM_TYPE_DESCRIPTION'],
    DimLocation=['LocationID', 'LOCATION', 'CITY', 'REGION', 'POSTAL_CODE'],
    DimTransaction=['TransactionID', 'Transaction_Code', 'Last_Tran_Status_Desc'],
    DimDate=['DateID', 'DD_Day', 'DD_Quarter', 'DD_Month', 'DD_Week', 'DD_Year', 'DD_Hour', 'DD_Minute'],
)

map_to_dimtable(staging_area, engine, dimension_mappings)


### MAPPING TO FACT TABLE

In [None]:
import pandas as pd
from sqlalchemy import create_engine

def map_to_facttable(staging_area, engine, fact_mappings, dimension_mappings):
    """Write fact tables built from *staging_area*, replacing each table.

    For every fact table in *fact_mappings*:

    1. The listed staging columns are copied.
    2. The surrogate-key columns (AtmID, LocationID, TransactionID, DateID) are
       cast to ``str``.
    3. Each fact column that also appears in *dimension_mappings* is left-merged
       with that same column read back from the dimension table.
    4. The result replaces the SQL table.

    Args:
        staging_area: source DataFrame.
        engine: SQLAlchemy engine for the data mart.
        fact_mappings: dict of fact table name -> staging columns to write.
        dimension_mappings: dict of dimension table name -> its columns.
    """
    # Cast to str so these keys compare equal to the keys read back from SQL.
    key_columns = ('AtmID', 'LocationID', 'TransactionID', 'DateID')

    # One transaction for the whole load: commit on success, roll back on error.
    with engine.begin() as connection:
        for table_name, columns in fact_mappings.items():
            fact_data = staging_area[columns].copy()

            for col in key_columns:
                if col in fact_data.columns:
                    fact_data[col] = fact_data[col].astype(str)

            # NOTE(review): the right-hand side of each merge holds only the key
            # column, so no dimension attributes are added. The left merge just
            # repeats a fact row when its key occurs more than once in the
            # dimension table. Confirm whether a lookup or validation was intended.
            for dim_table, dim_columns in dimension_mappings.items():
                for dim_col in dim_columns:
                    if dim_col in fact_data.columns:
                        dim_data = pd.read_sql(f"SELECT {dim_col} FROM {dim_table}", connection)
                        dim_data[dim_col] = dim_data[dim_col].astype(str)
                        fact_data = fact_data.merge(dim_data, on=dim_col, how='left')

            fact_data.to_sql(table_name, connection, if_exists='replace', index=False)

# Target data mart engine (ATM_SS).
connection_string = "mssql+pyodbc:///?odbc_connect=DRIVER={ODBC Driver 17 for SQL Server};SERVER=DESKTOP-T8AJKL7\\SQLEXPRESS;DATABASE=ATM_SS;Trusted_Connection=yes;"
engine = create_engine(connection_string)

# Only the key column of each dimension is needed by map_to_facttable.
dimension_mappings = dict(
    DimAtm=['AtmID'],
    DimLocation=['LocationID'],
    DimTransaction=['TransactionID'],
    DimDate=['DateID'],
)

# Fact table name -> staging columns (surrogate keys plus measures).
fact_mappings = dict(
    Fact_ATM=['FactID', 'AtmID', 'DateID', 'LocationID', 'TransactionID', 'DECLINED_AMOUNT', 'No_of_Completed_Transactions', 'No_of_Declined_Transactions', 'Total_Notes_Dispensed', 'Ratio_of_Completed_Transaction', 'Approved_Transaction_Amount'],
)

# Write Fact_ATM from the in-memory staging_area DataFrame.
map_to_facttable(staging_area, engine, fact_mappings, dimension_mappings)


### SQL QUERIES

In [261]:
# Reporting queries against the ATM_SS star schema.
connection_string = "mssql+pyodbc:///?odbc_connect=DRIVER={ODBC Driver 17 for SQL Server};SERVER=DESKTOP-T8AJKL7\\SQLEXPRESS;DATABASE=ATM_SS;Trusted_Connection=yes;"
query1 = """
SELECT TOP 2 Last_Tran_Status_Desc, COUNT(*) AS Decline_Count
FROM Fact_ATM AS fa
JOIN DimTransaction AS dt ON fa.TransactionID = dt.TransactionID
JOIN DimDate AS dd ON fa.DateID = dd.DateID
JOIN DimLocation AS dl ON fa.LocationID = dl.LocationID
WHERE dd.DD_Year = 2022 AND dl.LOCATION = 'Sohar Branch'
GROUP BY Last_Tran_Status_Desc
ORDER BY Decline_Count DESC;"""

query2 = """
SELECT SUM(Total_Notes_Dispensed) AS Sum_of_Notes_Dispensed
FROM Fact_ATM AS fa
JOIN DimDate AS dd ON fa.DateID = dd.DateID
JOIN DimLocation AS dl ON fa.LocationID = dl.LocationID
WHERE dd.DD_Year = 2021 AND dd.DD_Month = '7' AND dl.CITY = 'Colombo';"""

query3 = """
SELECT TOP 5 dl.LOCATION, COUNT(*) AS Transaction_Count_wrt_Location
FROM Fact_ATM AS fa
JOIN DimLocation AS dl ON fa.LocationID = dl.LocationID
GROUP BY dl.LOCATION
ORDER BY Transaction_Count_wrt_Location DESC"""

# NULLIF turns a zero denominator into NULL. SQL Server would otherwise abort the
# whole query with "Divide by zero error encountered".
query4 = """
SELECT TOP 10 da.ATM_NAME, (CAST(No_of_Declined_Transactions AS FLOAT) / NULLIF(No_of_Completed_Transactions + No_of_Declined_Transactions, 0)) AS Decline_Rate
FROM Fact_ATM AS fa
JOIN DimAtm AS da ON fa.AtmID = da.AtmID
ORDER BY Decline_Rate DESC"""

query5 = """
SELECT da.ATM_NAME, SUM(fa.Approved_Transaction_Amount) AS Total_Amount_Dispensed
FROM Fact_ATM AS fa
JOIN DimAtm AS da ON fa.ATMID = da.AtmID
JOIN DimDate AS dd ON fa.DateID = dd.DateID
WHERE dd.DD_Year = 2019 AND dd.DD_Month = '3'
GROUP BY da.ATM_NAME;"""

# Same NULLIF guard as query4; AVG ignores the resulting NULLs.
query6 = """
SELECT da.ATM_TYPE_NAME, AVG(CAST(fa.No_of_Declined_Transactions AS FLOAT) / NULLIF(fa.No_of_Completed_Transactions + fa.No_of_Declined_Transactions, 0)) AS Avg_Decline_Rate
FROM Fact_ATM AS fa
JOIN DimAtm AS da ON fa.ATMID = da.AtmID
GROUP BY da.ATM_TYPE_NAME;"""

query7 = """
SELECT TOP 5 dd.DD_Hour, COUNT(*) AS Transaction_Count
FROM Fact_ATM AS fa
JOIN DimDate AS dd ON fa.DateID = dd.DateID
JOIN DimLocation AS dl ON fa.LocationID = dl.LocationID
WHERE dd.DD_Year = 2023 AND dl.CITY = 'Al Ain'
GROUP BY dd.DD_Hour
ORDER BY Transaction_Count DESC"""


# Create SQLAlchemy engine
engine = create_engine(connection_string)



In [249]:
# Run query1 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query1))
    data = result.fetchall()
    df = pd.DataFrame(data, columns=result.keys())

print(" QUERY1: Top Decline Status for ATM transaction decline in 2022 in Sohar Branch")
df

 QUERY1: Top Decline Status for ATM transaction decline in 2022 in Sohar Branch


Unnamed: 0,Last_Tran_Status_Desc,Decline_Count
0,Good termination sent ...,1237


In [251]:
# Run query2 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query2))
    data = result.fetchall()
    df = pd.DataFrame(data, columns=result.keys())

print("QUERY2: Total number of notes dispensed in July 2021 in Colombo")
df

QUERY2: Total number of notes dispensed in July 2021 in Colombo


Unnamed: 0,Sum_of_Notes_Dispensed
0,34


In [260]:
# Run query3 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query3))
    data = result.fetchall()
    df = pd.DataFrame(data, columns=result.keys())

print("QUERY3: Location with the maximum number of withdrawals")
df

QUERY3: Location with the maximum number of withdrawals


Unnamed: 0,LOCATION,Transaction_Count_wrt_Location
0,"New West Zone SuperMarket , Satwa - Dubai",25026
1,Sohar Branch,20044
2,HBL Riffa Manama,17584
3,HBL Al Ain Branch,10230
4,Salalah Branch,6394


In [262]:
# Run query4 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query4))
    data = result.fetchall()
    df = pd.DataFrame(data, columns=result.keys())

print("QUERY4: ATM with the maximum decline rate for transactions")
df

QUERY4: ATM with the maximum decline rate for transactions


Unnamed: 0,ATM_NAME,Decline_Rate
0,Branch CCDM 2101,0.5
1,Branch CCDM 9502,0.5
2,MALE BRANCH MALDIVES,0.5
3,CCDM Muttrah Branch,0.5
4,Chittagong Branch,0.5
5,Sohar Branch,0.5
6,2009,0.5
7,2009,0.5
8,CCDM Walja Branch,0.5
9,CCDM Muttrah Branch,0.5


In [256]:
# Run query5 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query5))
    data = result.fetchall()

df = pd.DataFrame(data, columns=result.keys())
# The label must match query5's filter (DD_Year = 2019, DD_Month = '3').
# It previously said March 2024.
print("QUERY5:Total amount of money dispensed by each ATM in March 2019")
df

QUERY5:Total amount of money dispensed by each ATM in March 2024


Unnamed: 0,ATM_NAME,Total_Amount_Dispensed
0,0002-Central Branch CCDM,820
1,2006,53800
2,2009,368070
3,2104,1151440
4,Al Khuwair Branch,-40
5,Branch CCDM 2005,868000
6,Branch CCDM 2101,847900
7,Branch CCDM 2107,12600
8,Branch CCDM 9502,785600
9,Branch CCDM 9512,-499837800


In [257]:
# Run query6 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query6))
    data = result.fetchall()
    df = pd.DataFrame(data, columns=result.keys())

print("QUERY6: Retrieve the average transaction decline rate for each type of ATM")
df

QUERY6: Retrieve the average transaction decline rate for each type of ATM


Unnamed: 0,ATM_TYPE_NAME,Avg_Decline_Rate
0,NCR SelfServ 6626,0.424442
1,NCR Avanza,0.169003
2,Personas 86,0.2235
3,NCR SelfServ 22,0.109761
4,Personas 70,0.0


In [258]:
# Run query7 and show its result set as a DataFrame.
with engine.connect() as connection:
    result = connection.execute(text(query7))
    data = result.fetchall()

df = pd.DataFrame(data, columns=result.keys())
# The label must match query7's filter (dl.CITY = 'Al Ain', DD_Year = 2023).
# It previously said Karachi.
print("QUERY7: Identify the peak hours of ATM transactions in Al Ain in 2023")
df

QUERY7: Identify the peak hours of ATM transactions in Karachi in 2023


Unnamed: 0,DD_Hour,Transaction_Count
0,13,50
1,11,42
2,21,42
3,22,42
4,10,38


### FACT TABLE SNAPSHOT

In [271]:
# Read the Data_Snapshot object from the ATM_SS data mart through pyodbc.
# NOTE(review): Data_Snapshot is not created anywhere in this notebook.
connection_string = "".join((
    "Driver={SQL Server};",
    "Server=DESKTOP-T8AJKL7\\SQLEXPRESS;",
    "Database=ATM_SS;",
    "Trusted_Connection=yes;",
))
query_Fact = """
SELECT * from Data_Snapshot
"""
Fact_Snapshot = ingest_data(connection_string, query_Fact)


Columns for Query: Index(['FactID', 'ATM_NAME', 'CURRENT_MODE', 'Branch_Id', 'ATM_TYPE_NAME',
       'ATM_TYPE_DESCRIPTION', 'DD_Day', 'DD_Quarter', 'DD_Month', 'DD_Week',
       'DD_Year', 'DD_Hour', 'DD_Minute', 'LOCATION', 'CITY', 'REGION',
       'POSTAL_CODE', 'Transaction_Code', 'Last_Tran_Status_Desc',
       'DECLINED_AMOUNT', 'No_of_Completed_Transactions',
       'No_of_Declined_Transactions', 'Total_Notes_Dispensed',
       'Ratio_of_Completed_Transaction', 'Approved_Transaction_Amount'],
      dtype='object')


In [272]:
# Display the snapshot DataFrame (None if ingest_data found no rows or failed).
Fact_Snapshot

Unnamed: 0,FactID,ATM_NAME,CURRENT_MODE,Branch_Id,ATM_TYPE_NAME,ATM_TYPE_DESCRIPTION,DD_Day,DD_Quarter,DD_Month,DD_Week,...,REGION,POSTAL_CODE,Transaction_Code,Last_Tran_Status_Desc,DECLINED_AMOUNT,No_of_Completed_Transactions,No_of_Declined_Transactions,Total_Notes_Dispensed,Ratio_of_Completed_Transaction,Approved_Transaction_Amount
0,5100001,2009,8,2006,Personas 86,,28,3,9,39,...,021,7400,1,Good termination sent ...,500,1,1,0,1.0,90
1,5100002,CCDM Muttrah Branch,8,2020,NCR SelfServ 6626,TTW Cash Dispensing ATM,3,3,8,31,...,001,114 Mutrah,1,Good termination sent ...,130,1,1,0,1.0,-70
2,5100003,Sohar Branch,8,2037,NCR Avanza,NCR Avanza,23,1,3,12,...,MN,311 Sohar,1,Good termination sent ...,50,1,1,0,1.0,45
3,5100004,2104,8,2104,NCR Avanza,NCR Avanza,28,4,11,48,...,02,7400,1,Suspected ...,30,1,0,0,0.0,20
4,5100006,CCDM Al Khuwair_2,8,2037,NCR SelfServ 6626,TTW Cash Dispensing ATM,12,3,7,28,...,001,112 Ruwi,1,Suspected ...,152,1,0,0,0.0,137
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
134543,599995,Offsite ATM 9004,8,2026,NCR SelfServ 22,NCR AVANZA,31,3,7,31,...,N A,N A,1,Suspected ...,500,1,0,0,0.0,-2400
134544,599996,0002-Central Branch CCDM,8,2083,NCR SelfServ 6626,TTW Cash Dispensing ATM,26,4,10,44,...,001,112 Ruwi,1,Good termination sent ...,0,1,0,0,0.0,-10
134545,599997,2104,8,2104,NCR Avanza,NCR Avanza,27,3,7,30,...,02,7400,1,Suspected ...,420,1,0,0,0.0,410
134546,599998,Sohar Branch,8,2037,NCR Avanza,NCR Avanza,30,4,10,44,...,MN,311 Sohar,1,Good termination sent ...,10,1,1,0,1.0,5


### MASTER/CONTROLLER FUNCTION

In [None]:
import pyodbc
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, ForeignKey
import numpy as np

def update_data_mart(connection_string_oltp, query, engine_data_mart, dimension_mappings, fact_mappings):
    """Run the full ETL: extract from the OLTP source, transform, and load the data mart.

    Performs the same steps as the individual staging cells of this notebook.

    Args:
        connection_string_oltp: pyodbc connection string of the source database.
        query: extraction SQL producing the raw staging columns.
        engine_data_mart: SQLAlchemy engine of the target data mart.
        dimension_mappings: dimension table name -> staging columns.
        fact_mappings: fact table name -> staging columns.

    Returns:
        The transformed staging DataFrame that was loaded. Returns ``None`` if
        extraction produced no data, in which case nothing is written.
    """
    # Step 1: extract from the OLTP system.
    staging_area = ingest_data(connection_string_oltp, query)
    if staging_area is None:
        return None

    # Step 2: fill gaps and type the raw columns. Transaction_Code is included, as
    # in the staging cells; it was previously skipped here.
    staging_area = clean_data(staging_area)
    for column, target_type in (
        ('DECLINED_AMOUNT', 'int64'),
        ('Transaction_Code', 'int64'),
        ('TRANSACTION_AMOUNT', 'numeric'),
        ('Notes_Dispensed_C1', 'numeric'),
        ('Notes_Dispensed_C2', 'numeric'),
        ('Notes_Dispensed_C3', 'numeric'),
        ('Notes_Dispensed_C4', 'numeric'),
        ('DECLINED_DATE', 'datetime'),
    ):
        staging_area = convert_column(staging_area, column, target_type)
    staging_area = clean_data_null_drop(staging_area)

    # Step 3: derive flags, measures, surrogate keys and date parts.
    staging_area = add_new_column_based_on_condition(staging_area, 'No_of_Completed_Transactions', 'IS_TRAN_COMPLETED', 'Y')
    staging_area = mark_unique_transactions(staging_area, 'Declined_Tran_Log_Id')
    staging_area = create_new_attributes(staging_area, 'Total_Notes_Dispensed', 'add', 'Notes_Dispensed_C1', 'Notes_Dispensed_C2', 'Notes_Dispensed_C3', 'Notes_Dispensed_C4')
    staging_area = create_new_attributes(staging_area, 'Approved_Transaction_Amount', 'subtract', 'DECLINED_AMOUNT', 'TRANSACTION_AMOUNT')
    staging_area = create_new_attributes(staging_area, 'Ratio_of_Completed_Transaction', 'divide', 'No_of_Declined_Transactions', 'No_of_Completed_Transactions')
    for key_column, key_prefix in (
        ('DateID', '1'),
        ('TransactionID', '2'),
        ('LocationID', '3'),
        ('AtmID', '4'),
        ('FactID', '5'),
    ):
        staging_area[key_column] = staging_area.index.map(lambda x, p=key_prefix: generate_primary_keys(p, x))
    staging_area = extract_date_components(staging_area, 'DD_', 'DECLINED_DATE')
    staging_area = convert_column(staging_area, 'Approved_Transaction_Amount', 'numeric')
    staging_area = convert_column(staging_area, 'Ratio_of_Completed_Transaction', 'numeric')
    # Division by zero gave +/-inf (x/0) or NaN (0/0). Treat both as missing and drop those rows.
    ratio = staging_area['Ratio_of_Completed_Transaction'].astype(float)
    staging_area['Ratio_of_Completed_Transaction'] = ratio.replace([np.inf, -np.inf], np.nan)
    staging_area = staging_area.dropna(subset=['Ratio_of_Completed_Transaction'])

    # Step 4: load the data mart.
    map_to_dimtable(staging_area, engine_data_mart, dimension_mappings)
    map_to_facttable(staging_area, engine_data_mart, fact_mappings, dimension_mappings)
    return staging_area

# Source OLTP database (pyodbc connection string).
connection_string_oltp = (
    "Driver={SQL Server};"
    "Server=DESKTOP-T8AJKL7\\SQLEXPRESS;"
    "Database=ATM_DB;"
    "Trusted_Connection=yes;"
)


# Target data mart engine (ATM_SS via SQLAlchemy + pyodbc).
data_mart_engine = create_engine(
    "mssql+pyodbc:///?odbc_connect=DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=DESKTOP-T8AJKL7\\SQLEXPRESS;"
    "DATABASE=ATM_SS;"
    "Trusted_Connection=yes;"
)


# Table -> staging column mappings for the dimensions and the fact table.
dimension_mappings = {
    'DimAtm': ['AtmID', 'ATM_NAME', 'CURRENT_MODE', 'Branch_Id', 'ATM_TYPE_NAME', 'ATM_TYPE_DESCRIPTION'],
    'DimLocation': ['LocationID', 'LOCATION', 'CITY', 'REGION', 'POSTAL_CODE'],
    'DimTransaction': ['TransactionID', 'Transaction_Code', 'Last_Tran_Status_Desc'],
    'DimDate': ['DateID', 'DD_Day', 'DD_Quarter', 'DD_Month', 'DD_Week', 'DD_Year', 'DD_Hour', 'DD_Minute']
}
# The fact columns must be staging column names. The surrogate keys are AtmID,
# DateID, LocationID and TransactionID. The previous ATM_ID, Date_ID, Location_ID
# and Transaction_ID entries made staging_area[columns] raise KeyError (Date_ID and
# the others do not exist), and ATM_ID is the source ATM id, not the DimAtm key.
fact_mappings = {
    'Fact_ATM': ['FactID', 'AtmID', 'DateID', 'LocationID', 'TransactionID', 'DECLINED_AMOUNT', 'No_of_Completed_Transactions', 'No_of_Declined_Transactions', 'Total_Notes_Dispensed', 'Ratio_of_Completed_Transaction', 'Approved_Transaction_Amount']
}
# Update the data mart
update_data_mart(connection_string_oltp, query, data_mart_engine, dimension_mappings, fact_mappings)


                            

### POWER BI DASHBOARD

In [16]:
from IPython.display import IFrame

# Power BI report URL
# View link of the Power BI report.
url = "https://app.powerbi.com/view?r=eyJrIjoiMTE2YWQ4ZDItOTg2MS00ODM2LTg5NTItODdlNjU5ZjEwZTg4IiwidCI6ImZlZTNiOTE2LTAxYzEtNDk4Ny1hNjQ2LWUxOTM0MzJiOWVhYSIsImMiOjl9"

# Render the report inline; as the cell's last expression the IFrame is displayed.
IFrame(src=url, height=600, width=1000)
