In [16]:
import numpy as np
import pandas as pd
import time
import random
import json
from pandas import json_normalize

In [17]:
df1 = pd.read_csv("account_status_changes_data.csv")
df2 = pd.read_csv("aggregated_time_series_data.csv")
df3 = pd.read_csv("alert_cases_data.csv", encoding='latin-1')
df4 = pd.read_csv("block_lists_data_UID_fixed.csv")
df5 = pd.read_csv("external_data_feeds_data.csv")
df6 = pd.read_csv("merchant_risk_data.csv")
df7 = pd.read_csv("ml_model_logs_data.csv")
df8 = pd.read_csv("realtime_features_data.csv")       
df9 = pd.read_csv("transactions_data.csv")
df10 = pd.read_csv("User_Profiles_100k.csv")

# 1 Top Fraud Types & Loss Reduction Target

In [3]:
df_merged = df9.merge(df3, on="TransactionID", how="left") \
               .merge(df7, on="TransactionID", how="left")


In [4]:
df_merged.isnull().sum()

TransactionID                 0
UserID                        0
Transaction_Amount            0
Transaction_Timestamp         0
CaseID                        0
AnalystID                     0
Case_Status                   0
Resolution_Code           29960
Open_Timestamp                0
AnalystNotes                  0
LogID                         0
Model_Version                 0
Prediction_Score              0
Decision_Engine_Result        0
Final_Outcome                 0
FeatureVector                 0
dtype: int64

In [5]:
df_merged['Resolution_Code'] = df_merged['Resolution_Code'].fillna('INSUFFICIENT_DATA')

In [6]:
df_merged.shape

(100000, 16)

In [7]:
business_goal_df = df_merged[[
    "TransactionID",
    "UserID",
    "Transaction_Amount",
    "Transaction_Timestamp",
    "CaseID",
    "AnalystID",
    "Case_Status",
    "Resolution_Code",
    "Final_Outcome",         
    "Prediction_Score"       
]]

In [8]:
business_goal_df['Final_Outcome'].unique()

array(['CLEARED', 'FRAUD'], dtype=object)

In [9]:
# Overall Fraud vs Cleared 
overall_summary = business_goal_df.groupby('Final_Outcome').agg(Transaction_Count=('TransactionID', 'count'),Total_Loss=('Transaction_Amount', 'sum')
).reset_index()
overall_summary

Unnamed: 0,Final_Outcome,Transaction_Count,Total_Loss
0,CLEARED,95557,38685586.21
1,FRAUD,4443,1814004.41


In [10]:
# FN_FRAUD ‚Äì Confirmed frauds (high priority)
# FN_NOT_FRAUD  (model ne galat flag kiya)
# INSUFFICIENT_DATA ‚Äì Jisme decision km the
# W_CLOSED ‚Äì Manually closed cases

# Fraud Resolution Breakdow 
fraud_breakdown = business_goal_df[business_goal_df['Final_Outcome'] == 'FRAUD'] \
    .groupby('Resolution_Code').agg(
        Transaction_Count=('TransactionID', 'count'),
        Total_Loss=('Transaction_Amount', 'sum')
    ).reset_index()
fraud_breakdown

Unnamed: 0,Resolution_Code,Transaction_Count,Total_Loss
0,FN_FRAUD,461,190093.93
1,FN_NOT_FRAUD,2336,918650.89
2,INSUFFICIENT_DATA,1481,636610.51
3,W_CLOSED,165,68649.08


# 2 Maximum Acceptable Latency for Fraud Decision

In [11]:
#  sirf first 5 transactions ka latency measure
for idx, row in df9.head(5).iterrows():
    txn_id = row["TransactionID"]
    
    start_time = time.time()  
    time.sleep(0.1)  # simulate 100 ms processing time
    
    end_time = time.time()  
    
    latency_ms = (end_time - start_time) * 1000
    print(f"TransactionID: {txn_id}, Latency: {latency_ms:.2f} ms")
    

TransactionID: TID_000001, Latency: 99.74 ms
TransactionID: TID_000002, Latency: 100.12 ms
TransactionID: TID_000003, Latency: 112.09 ms
TransactionID: TID_000004, Latency: 109.83 ms
TransactionID: TID_000005, Latency: 108.01 ms


# 3 Fraud detect hone ke baad system kya action lega, aur wo banking/payment system me kaise apply

In [12]:
# Randme generate krega df 9 se df4 me
df4["UserID"] = df9["UserID"].sample(len(df4), replace=True).values

In [13]:
df_merged3 = (
    df9.merge(df3, on="TransactionID", how="left")
       .merge(df7, on="TransactionID", how="left")
       .merge(df4, on="UserID", how="left")
)


In [14]:
df_merged3.isnull().sum()

TransactionID                  0
UserID                         0
Transaction_Amount             0
Transaction_Timestamp          0
CaseID                         0
AnalystID                      0
Case_Status                    0
Resolution_Code           228502
Open_Timestamp                 0
AnalystNotes                   0
LogID                          0
Model_Version                  0
Prediction_Score               0
Decision_Engine_Result         0
Final_Outcome                  0
FeatureVector                  0
Blocked_ID                   505
Block_Type                   505
Reason_Code                  505
Expiration_Date              505
Source_System                505
dtype: int64

In [15]:
df_merged3.fillna({
    "Resolution_Code": "Unknown",
    "Blocked_ID": "NA",
    "Block_Type": "NA",
    "Reason_Code": "NA",
    "Expiration_Date": "9999-12-31",
    "Source_System": "NA"
}, inplace=True)


In [16]:
df_merged3.shape

(766848, 21)

In [17]:
# Select only required columns
df_action = df_merged3[[
    'TransactionID', 'UserID', 'Transaction_Amount',
    'Prediction_Score', 'Decision_Engine_Result',
    'Final_Outcome', 'Block_Type', 'Reason_Code',
    'Case_Status', 'Resolution_Code'
]]

# Define system actions based on model output
df_action["Action_Flag"] = df_action["Prediction_Score"].apply(
    lambda x: "Hard Block" if x > 0.9 else
              "Soft Challenge" if x > 0.7 else
              "Refer to Analyst" if x > 0.5 else
              "Allow"
)

# Final output summary
df_action_summary = df_action[[
    'TransactionID', 'UserID', 'Transaction_Amount',
    'Prediction_Score', 'Action_Flag',
    'Block_Type', 'Reason_Code', 'Case_Status'
]]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_action["Action_Flag"] = df_action["Prediction_Score"].apply(


In [18]:
df_action_summary.head(2)

Unnamed: 0,TransactionID,UserID,Transaction_Amount,Prediction_Score,Action_Flag,Block_Type,Reason_Code,Case_Status
0,TID_000001,UID_01855,262.53,0.1601,Allow,USER_ID,SPOOFING,CLOSED
1,TID_000001,UID_01855,262.53,0.1601,Allow,USER_ID,SPOOFING,CLOSED


# 4 Tumhara fraud detection system kin important rules aur laws (jaise AML, KYC, GDPR, PCI-DSS) ka palan karega?‚Äù

In [19]:
# Randme generate krega df 9 se df4 me
df4["UserID"] = df1["UserID"].sample(len(df4), replace=True).values

In [20]:
df_merged4 = (
    df9.merge(df1, on="UserID", how="left")
       .merge(df5, on="TransactionID", how="left")
       .merge(df4, on="UserID", how="left")
)

In [21]:
df_merged4.head(1)

Unnamed: 0,TransactionID,UserID,Transaction_Amount,Transaction_Timestamp,ChangeID,Change_Timestamp,Old_Status,New_Status,Change_Reason,Audit_User,FeedID,IP_Risk_Score,Email_Domain_Age,VPN_Flag,Blocked_ID,Block_Type,Reason_Code,Expiration_Date,Source_System
0,TID_000001,UID_01855,262.53,2025-10-15 08:22:47,CHID_004171,2024-12-07 16:21:55,PWD_RESET,CLOSED,FRAUD_BLOCK,Fraud_Analyst,FID_000001,3.5,505,True,CARD_003836,CARD_HASH,HARD_FRAUD,2/6/2028,External Feed


In [22]:
# df_merged4.isnull().sum()

In [23]:
df_merged4.fillna({
    "ChangeID": "NO_CHANGE",
    "Change_Timestamp": "N/A",
    "Old_Status": "Unknown",
    "New_Status": "No Update",
    "Change_Reason": "Not Provided",
    "Audit_User": "System",
    "FeedID": "NA",
    "IP_Risk_Score": 0,
    "Email_Domain_Age": 999,
    "VPN_Flag": False,
    "Blocked_ID": "None",
    "Block_Type": "Not Blocked",
    "Reason_Code": "None",
    "Expiration_Date": "N/A",
    "Source_System": "Internal"
}, inplace=True)


In [None]:
df_merged4["AML_Flag"] = df_merged4.apply(
    lambda x: "Potential AML" 
    if (x["Transaction_Amount"] > 100000 or "SANCTION" in str(x["Reason_Code"]).upper()) 
    else "Clear", 
    axis=1
)

df_merged4["KYC_Flag"] = df_merged4.apply(
    lambda x: "KYC Review Needed" 
    if str(x["New_Status"]).upper() == "UNVERIFIED" 
    else "KYC OK", 
    axis=1
)

df_merged4["PCI_Flag"] = df_merged4.apply(
    lambda x: "High Risk Device" 
    if (x.get("VPN_Flag") == True or x.get("IP_Risk_Score", 0) > 80) 
    else "Secure", 
    axis=1
)

#  Step 3: GDPR Audit Log Check
df_merged4["GDPR_Flag"] = df_merged4.apply(
    lambda x: "Audit Required" 
    if str(x.get("Audit_User")).lower() in ["system", "unknown", "na"] 
    else "Compliant", 
    axis=1
)

# Step 4: Final summarized output
df_regulatory_summary = df_merged4[[
    "TransactionID", "UserID", "Transaction_Amount", 
    "Reason_Code", "Block_Type", "Change_Reason", "Audit_User",
    "IP_Risk_Score", "VPN_Flag",
    "AML_Flag", "KYC_Flag", "PCI_Flag", "GDPR_Flag"
]]

In [None]:
df_regulatory_summary.head(2)

# 5. 10 tables me data kha se aa rha he or kitna value  txn pr second he ?   source system or txn pr second fill krna he 

In [None]:
tables = [
    'account_status_changes_data',
    'alert_cases_data',
    'block_lists_data',
    'external_data_feeds_data',
    'merchant_risk_data',
    'ml_model_logs_data',
    'realtime_features_data',
    'transactions_data',
    'User_Profiles_Sample_Data'
]

source_systems = [
    'Core Banking / Account Management',   # df1
    'CRM / Case Management',               # df3
    'Compliance / Sanction System',        # df4
    'External Feeds / Risk Feeds',         # df5
    'Merchant Database / Analytics',       # df6
    'ML Platform / Model Logs',            # df7
    'Streaming Logs / Real-time Platform', # df8
    'Core Banking / Payment System',       # df9
    'User Profile / KYC Database'          # df10
]

ingestion_volume = [
    100,  # df1
    10,   # df3
    5,    # df4
    20,   # df5
    30,   # df6
    15,   # df7
    200,  # df8
    500,  # df9
    10    # df10
]

metadata = pd.DataFrame({
    'Table_Name': tables,
    'Source_System': source_systems,
    'Ingestion_Volume_txn_per_sec': ingestion_volume
})

metadata

# 6. real-time features kaise nikloge aur kis tool (Kafka, Spark, Flink) se fast calculate karoge taaki delay na ho.

In [None]:
from kafka import KafkaProducer, KafkaConsumer
from datetime import datetime, timedelta
from collections import defaultdict


#  Simple in-memory feature store (Redis ke jagah temporary memory)
user_txn_history = defaultdict(list)

#  Kafka Producer ‚Äî df9 ke records bhejega
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

#  Tumhare df9 ke first 5 records bhejenge (testing ke liye)
for _, row in df9.head(5).iterrows():
    txn = {
        "TransactionID": str(row["TransactionID"]),
        "UserID": str(row["UserID"]),   # String me rakha, int() nahi lagaya
        "Transaction_Amount": float(row["Transaction_Amount"]),
        "Transaction_Timestamp": str(row["Transaction_Timestamp"]),
    }

    producer.send('transactions_stream', txn)
    print("Produced (real data):", txn)
    time.sleep(1)

# Kafka Consumer ‚Äî messages receive karke live features compute karega
consumer = KafkaConsumer(
    'transactions_stream',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for i, message in enumerate(consumer):
    txn = message.value
    user_id = txn["UserID"]
    amount = txn["Transaction_Amount"]

    # Timestamp ko safe tarike se parse karo
    try:
        timestamp = datetime.fromisoformat(str(txn["Transaction_Timestamp"]))
    except ValueError:
        timestamp = datetime.utcnow()  # agar format issue ho to abhi ka time le lo

    # üß© Har user ke recent transactions store karo (last 1 ghante ke)
    user_txn_history[user_id].append((timestamp, amount))
    cutoff = timestamp - timedelta(hours=1)
    user_txn_history[user_id] = [(t, a) for (t, a) in user_txn_history[user_id] if t > cutoff]

    #  Real-time feature calculation
    last_5m = [a for (t, a) in user_txn_history[user_id] if t > timestamp - timedelta(minutes=5)]
    Txn_Count_5m = len(last_5m)
    Avg_Amt_1h = (
        sum(a for (_, a) in user_txn_history[user_id]) / len(user_txn_history[user_id])
        if user_txn_history[user_id] else 0
    )
    Velocity_Score = round(Txn_Count_5m * Avg_Amt_1h, 2)

    print(f"User {user_id} ‚Üí Txn_Count_5m: {Txn_Count_5m}, Avg_Amt_1h: {Avg_Amt_1h:.2f}, Velocity_Score: {Velocity_Score}")

    if i >= 4:  # Sirf 5 records process karne ke baad stop
        break


# 7. JSON format me jo data hai uska structure kya hai aur uske andar ke values ko kaise extract karke numeric ML features banaoge

In [18]:
records = []

for x in df3["AnalystNotes"]:
    try:
        d = json.loads(x)
        records.append(d if isinstance(d, dict) else {})
    except:
        records.append({}) 

df3 = pd.concat([df3, pd.DataFrame(records)], axis=1)

In [19]:
df3 = df3.drop(columns=['AnalystNotes'])

In [20]:
df3.tail(1)

Unnamed: 0,CaseID,TransactionID,AnalystID,Case_Status,Resolution_Code,Open_Timestamp,Summary,Initial_Assessment
99999,CS_100000,TID_100000,ANL_010,PENDING_REVIEW,,2025-10-22 09:33:02,,Automatic trigger on velocity breach.


In [22]:
df3['Initial_Assessment'].unique()

array([nan, 'Automatic trigger on velocity breach.'], dtype=object)

In [21]:
df3['Summary'].unique()

array(['User verified transaction as legitimate.',
       'Case closed due to inactivity.', nan,
       'Confirmed fraud. Account compromised.'], dtype=object)

In [23]:
records = []

for x in df7["FeatureVector"]:
    try:
        d = json.loads(x)
        records.append(d if isinstance(d, dict) else {})
    except:
        records.append({}) 

df7 = pd.concat([df7, pd.DataFrame(records)], axis=1)

In [24]:
df7 = df7.drop(columns=['FeatureVector'])

In [25]:
df7.head(1)

Unnamed: 0,LogID,TransactionID,Model_Version,Prediction_Score,Decision_Engine_Result,Final_Outcome,Txn_Amt,Txn_Count_5m,Geo_Match,Risk_Score
0,LID_000001,TID_000001,v1.3,0.1601,REVIEW,CLEARED,308.48,1,True,37.7


In [26]:

records = []

for _, row in df10[["Risk_Score_History", "AddressHistory"]].iterrows():
    combined = {}

    for col in ["Risk_Score_History", "AddressHistory"]:
        val = row[col]
        try:
            data = json.loads(val.replace("'", '"')) if isinstance(val, str) else val
            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict):
                        combined.update(item)  # merge all dicts from list
        except:
            pass

    records.append(combined)

df10 = pd.concat([df10, pd.DataFrame(records)], axis=1)


In [27]:
df10 = df10.drop(columns=['Risk_Score_History', 'AddressHistory'])

In [28]:
df10.head(1)

Unnamed: 0,UserID,KYC_Level,Account_Open_Date,Current_Risk_Score,Total_Accounts,Is_VIP,Date,Score,AddressID,Street,City,State,ZipCode,MoveInDate
0,USR000001,Level 2 (Verified),26-12-22,768,3,False,2024-06-17,471,ADDR-USR000001-1,Street 552,London,NY,49219,2015-11-15


# 8. real-time features ko fast access karne ke liye Redis jaisa  tez database (Feature Store) use hoga, aur woh data kitni der mein update hoga aur kitni der tak valid rahega

In [None]:
import redis
print(redis.__version__)


In [None]:
# Connect to Redis (low-latency database)
r = redis.Redis(host='localhost', port=6379, db=0)

# TTL for features: 5 minutes
TTL_seconds = 5 * 60

# Define critical behavioral features
critical_features = ['Txn_Count_5m', 'Avg_Amt_1h', 'Foreign_Country_Count_1d', 'Velocity_Score']

# Push to Redis
for idx, row in df8.iterrows():
    user_id = row['UserID']
    feature_dict = row[critical_features].to_dict()
    r.set(user_id, str(feature_dict), ex=TTL_seconds)

print("Critical features stored in Redis with TTL.")


In [None]:
# print(r.keys())

In [None]:
import ast

user_id = 'UID_10743'   # ya koi bhi ek key jo r.keys() me dikhi ho
data = r.get(user_id)

if data:
    features = ast.literal_eval(data.decode('utf-8'))
    print(f"Features for {user_id}:")
    print(features)
else:
    print("User not found in Redis.")


# 9.

In [48]:
New_df5 = df5[['TransactionID', 'IP_Risk_Score', 'VPN_Flag']]
New_df7 = df7[['TransactionID', 'Final_Outcome', 'Geo_Match']]
New_df8 = df8[['UserID', 'Txn_Count_5m', 'Avg_Amt_1h', 'Foreign_Country_Count_1d', 'Velocity_Score']]
New_df9 = df9[['TransactionID', 'UserID', 'Transaction_Amount']]
New_df1 = df1[['UserID', 'Old_Status', 'New_Status', 'Change_Reason']]


In [52]:
All_DF = (
    New_df9
    .merge(New_df5, on="TransactionID", how="left")
    .merge(New_df7, on="TransactionID", how="left")
    .merge(New_df8, on="UserID", how="left")
    .merge(New_df1, on="UserID", how="left")
)


In [53]:
All_DF.head(2)

Unnamed: 0,TransactionID,UserID,Transaction_Amount,IP_Risk_Score,VPN_Flag,Final_Outcome,Geo_Match,Txn_Count_5m,Avg_Amt_1h,Foreign_Country_Count_1d,Velocity_Score,Old_Status,New_Status,Change_Reason
0,TID_000001,UID_01855,262.53,3.5,True,CLEARED,True,1.0,14.49,1.0,18.2,PWD_RESET,CLOSED,FRAUD_BLOCK
1,TID_000001,UID_01855,262.53,3.5,True,CLEARED,True,1.0,14.49,1.0,18.2,PWD_RESET,CLOSED,FRAUD_BLOCK


In [59]:
All_DF.isnull().sum()

TransactionID               0
UserID                      0
Transaction_Amount          0
IP_Risk_Score               0
VPN_Flag                    0
Final_Outcome               0
Geo_Match                   0
Txn_Count_5m                0
Avg_Amt_1h                  0
Foreign_Country_Count_1d    0
Velocity_Score              0
Old_Status                  0
New_Status                  0
Change_Reason               0
dtype: int64

In [58]:
# Numerical columns
All_DF['Txn_Count_5m'].fillna(All_DF['Txn_Count_5m'].median(), inplace=True)
All_DF['Avg_Amt_1h'].fillna(All_DF['Avg_Amt_1h'].median(), inplace=True)
All_DF['Foreign_Country_Count_1d'].fillna(All_DF['Foreign_Country_Count_1d'].median(), inplace=True)
All_DF['Velocity_Score'].fillna(All_DF['Velocity_Score'].median(), inplace=True)

# Categorical columns
All_DF['Old_Status'].fillna(All_DF['Old_Status'].mode()[0], inplace=True)
All_DF['New_Status'].fillna(All_DF['New_Status'].mode()[0], inplace=True)
All_DF['Change_Reason'].fillna(All_DF['Change_Reason'].mode()[0], inplace=True)


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  All_DF['Txn_Count_5m'].fillna(All_DF['Txn_Count_5m'].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  All_DF['Avg_Amt_1h'].fillna(All_DF['Avg_Amt_1h'].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work becaus

In [60]:
All_DF.to_csv("All_DF.csv", index=False)