# I-Extraction

In [3]:
import requests
import pandas as pd
from datetime import datetime
import json


# Define URLs for your API endpoints
transactions_url = 'http://127.0.0.1:5000/api/transactions'
customers_url = 'http://127.0.0.1:5000/api/customers'
external_data_url = 'http://127.0.0.1:5000/api/externalData'

# Function to retrieve data from URL and convert it to Python objects
def get_data_as_object(url):
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print(f"Failed to retrieve data from {url}")
        return None
    
# Retrieve data from URLs
transactions_data = get_data_as_object(transactions_url)
customers_data = get_data_as_object(customers_url)
extern_data = get_data_as_object(external_data_url)

# II-Transformations

### 1-transaction_data

In [4]:
print(json.dumps(transactions_data, indent=2))

[
  {
    "amount": 5447.145768221903,
    "currency": "GBP",
    "customer_id": "C087",
    "date_time": "2020-11-22T16:17:22",
    "location": "City5",
    "merchant_details": "Merchant11",
    "transaction_id": "T00000",
    "transaction_type": "withdrawal"
  },
  {
    "amount": 5913.606798438381,
    "currency": "USD",
    "customer_id": "C083",
    "date_time": "2020-12-26T06:35:48",
    "location": "City9",
    "merchant_details": "Merchant26",
    "transaction_id": "T00001",
    "transaction_type": "withdrawal"
  },
  {
    "amount": 167.31724536057274,
    "currency": "EUR",
    "customer_id": "C070",
    "date_time": "2020-05-15T18:11:56",
    "location": "City8",
    "merchant_details": "Merchant7",
    "transaction_id": "T00002",
    "transaction_type": "purchase"
  },
  {
    "amount": 7465.906993335358,
    "currency": "USD",
    "customer_id": "C068",
    "date_time": "2022-09-20T01:47:58",
    "location": "City1",
    "merchant_details": "Merchant7",
    "transaction_id

#### currency

In [5]:
# Exchange rates for currencies to USD
exchange_rates = {
    "USD": 1,
    "EUR": 1.09,
    "GBP": 1.27
}

# Function to convert amount to USD based on currency
def convert_to_usd(amount, currency):
    if currency in exchange_rates:
        return amount * exchange_rates[currency]
    else:
        return None  # Return None for unknown currencies

# Update transactions_data to include the 'amountUSD' column
for transaction in transactions_data:
    amount_usd = convert_to_usd(transaction['amount'], transaction['currency'])
    transaction['amountUSD'] = amount_usd

#### date_time

In [6]:
from datetime import datetime

# Assuming transactions_data is your list of transactions

for transaction in transactions_data:
    # Parse the date_time string into a datetime object
    date_time_obj = datetime.strptime(transaction["date_time"], "%Y-%m-%dT%H:%M:%S")
    
    # Assign date and time
    transaction["date"] = date_time_obj.date().strftime("%Y-%m-%d")
    transaction["time"] = date_time_obj.time().strftime("%H:%M:%S")
    
    # Extract year, month, and day
    transaction["year"] = date_time_obj.year
    transaction["month"] = date_time_obj.month
    transaction["day"] = date_time_obj.day

In [7]:
# Loop through the keys in the first dictionary to determine their types
for key, value in transactions_data[0].items():
    print(f"Column '{key}' is of type: {type(value).__name__}")

Column 'amount' is of type: float
Column 'currency' is of type: str
Column 'customer_id' is of type: str
Column 'date_time' is of type: str
Column 'location' is of type: str
Column 'merchant_details' is of type: str
Column 'transaction_id' is of type: str
Column 'transaction_type' is of type: str
Column 'amountUSD' is of type: float
Column 'date' is of type: str
Column 'time' is of type: str
Column 'year' is of type: int
Column 'month' is of type: int
Column 'day' is of type: int


In [8]:
print(json.dumps(transactions_data, indent=2))

[
  {
    "amount": 5447.145768221903,
    "currency": "GBP",
    "customer_id": "C087",
    "date_time": "2020-11-22T16:17:22",
    "location": "City5",
    "merchant_details": "Merchant11",
    "transaction_id": "T00000",
    "transaction_type": "withdrawal",
    "amountUSD": 6917.875125641817,
    "date": "2020-11-22",
    "time": "16:17:22",
    "year": 2020,
    "month": 11,
    "day": 22
  },
  {
    "amount": 5913.606798438381,
    "currency": "USD",
    "customer_id": "C083",
    "date_time": "2020-12-26T06:35:48",
    "location": "City9",
    "merchant_details": "Merchant26",
    "transaction_id": "T00001",
    "transaction_type": "withdrawal",
    "amountUSD": 5913.606798438381,
    "date": "2020-12-26",
    "time": "06:35:48",
    "year": 2020,
    "month": 12,
    "day": 26
  },
  {
    "amount": 167.31724536057274,
    "currency": "EUR",
    "customer_id": "C070",
    "date_time": "2020-05-15T18:11:56",
    "location": "City8",
    "merchant_details": "Merchant7",
    "tra

### 2-customers_data

In [9]:
print(json.dumps(customers_data, indent=2))

[
  {
    "account_history": [
      "T00021",
      "T00069",
      "T00170",
      "T00364",
      "T00455",
      "T00517",
      "T00523",
      "T00654",
      "T00773",
      "T00836",
      "T00864",
      "T00917",
      "T00932"
    ],
    "behavioral_patterns": {
      "avg_transaction_value": 164.99035827502018
    },
    "customer_id": "C000",
    "demographics": {
      "age": 59,
      "location": "City2"
    }
  },
  {
    "account_history": [
      "T00028",
      "T00176",
      "T00599",
      "T00769",
      "T00949"
    ],
    "behavioral_patterns": {
      "avg_transaction_value": 184.2330088484842
    },
    "customer_id": "C001",
    "demographics": {
      "age": 23,
      "location": "City10"
    }
  },
  {
    "account_history": [
      "T00324",
      "T00365",
      "T00380",
      "T00399",
      "T00439",
      "T00469",
      "T00536",
      "T00632",
      "T00667",
      "T00675",
      "T00806",
      "T00946"
    ],
    "behavioral_patterns": {
      

In [10]:
for customer in customers_data:
    customer["account_history"] = ", ".join(customer["account_history"])
    customer["avg_transaction_value"] = customer["behavioral_patterns"]["avg_transaction_value"]
    del customer["behavioral_patterns"]
    customer.update(customer["demographics"])
    del customer["demographics"]

print(json.dumps(customers_data, indent=2))

[
  {
    "account_history": "T00021, T00069, T00170, T00364, T00455, T00517, T00523, T00654, T00773, T00836, T00864, T00917, T00932",
    "customer_id": "C000",
    "avg_transaction_value": 164.99035827502018,
    "age": 59,
    "location": "City2"
  },
  {
    "account_history": "T00028, T00176, T00599, T00769, T00949",
    "customer_id": "C001",
    "avg_transaction_value": 184.2330088484842,
    "age": 23,
    "location": "City10"
  },
  {
    "account_history": "T00324, T00365, T00380, T00399, T00439, T00469, T00536, T00632, T00667, T00675, T00806, T00946",
    "customer_id": "C002",
    "avg_transaction_value": 136.41147567562575,
    "age": 26,
    "location": "City2"
  },
  {
    "account_history": "T00027, T00104, T00121, T00413, T00520, T00590, T00602, T00621, T00640, T00681, T00692, T00704, T00739, T00919",
    "customer_id": "C003",
    "avg_transaction_value": 150.51679621467935,
    "age": 51,
    "location": "City4"
  },
  {
    "account_history": "T00094, T00141, T00163

In [11]:
# Loop through the keys in the first dictionary to determine their types
for key, value in customers_data[0].items():
    print(f"Column '{key}' is of type: {type(value).__name__}")

Column 'account_history' is of type: str
Column 'customer_id' is of type: str
Column 'avg_transaction_value' is of type: float
Column 'age' is of type: int
Column 'location' is of type: str


### 3-external_data

In [12]:
# Accessing the 'blacklist_info' key and joining the list elements into a string
extern_data['blacklist_info'] = ', '.join(extern_data['blacklist_info'])

# Printing the updated JSON
print(json.dumps(extern_data, indent=2))

{
  "blacklist_info": "Merchant28, Merchant22, Merchant25, Merchant26, Merchant24, Merchant23, Merchant29, Merchant27, Merchant30, Merchant21",
  "credit_scores": {
    "C000": 796,
    "C001": 462,
    "C002": 398,
    "C003": 447,
    "C004": 789,
    "C005": 421,
    "C006": 824,
    "C007": 450,
    "C008": 712,
    "C009": 821,
    "C010": 537,
    "C011": 336,
    "C012": 595,
    "C013": 598,
    "C014": 790,
    "C015": 433,
    "C016": 394,
    "C017": 393,
    "C018": 366,
    "C019": 701,
    "C020": 738,
    "C021": 584,
    "C022": 626,
    "C023": 815,
    "C024": 599,
    "C025": 672,
    "C026": 790,
    "C027": 724,
    "C028": 649,
    "C029": 569,
    "C030": 300,
    "C031": 696,
    "C032": 566,
    "C033": 745,
    "C034": 653,
    "C035": 367,
    "C036": 426,
    "C037": 505,
    "C038": 495,
    "C039": 322,
    "C040": 348,
    "C041": 843,
    "C042": 467,
    "C043": 793,
    "C044": 576,
    "C045": 581,
    "C046": 304,
    "C047": 498,
    "C048": 845,
  

In [13]:
# Example: Convert dictionaries to a list of dictionaries
external_data = []
for customer_id, credit_score in extern_data['credit_scores'].items():
    customer_info = {
        'customer_id': customer_id,
        'credit_score': credit_score,
        'fraud_report_count': extern_data['fraud_reports'].get(customer_id, 0)
        # Add more columns as needed
    }
    external_data.append(customer_info)

# Display the prepared data
print(json.dumps(external_data, indent=2))

[
  {
    "customer_id": "C000",
    "credit_score": 796,
    "fraud_report_count": 5
  },
  {
    "customer_id": "C001",
    "credit_score": 462,
    "fraud_report_count": 1
  },
  {
    "customer_id": "C002",
    "credit_score": 398,
    "fraud_report_count": 4
  },
  {
    "customer_id": "C003",
    "credit_score": 447,
    "fraud_report_count": 1
  },
  {
    "customer_id": "C004",
    "credit_score": 789,
    "fraud_report_count": 2
  },
  {
    "customer_id": "C005",
    "credit_score": 421,
    "fraud_report_count": 0
  },
  {
    "customer_id": "C006",
    "credit_score": 824,
    "fraud_report_count": 1
  },
  {
    "customer_id": "C007",
    "credit_score": 450,
    "fraud_report_count": 4
  },
  {
    "customer_id": "C008",
    "credit_score": 712,
    "fraud_report_count": 1
  },
  {
    "customer_id": "C009",
    "credit_score": 821,
    "fraud_report_count": 4
  },
  {
    "customer_id": "C010",
    "credit_score": 537,
    "fraud_report_count": 5
  },
  {
    "customer_i

## II-Chargement

### 1-Creation of Database and Tables

In [14]:
from pyhive import hive

# Establish a connection to Hive
conn = hive.Connection(host='localhost', port=10000)

# Create a database
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    cursor.execute(f"USE {database_name}")

# Define the table schemas
transaction_table_schema = """
CREATE TABLE IF NOT EXISTS transactions (
    amount FLOAT,
    currency STRING,
    amountUSD FLOAT,
    customer_id STRING,
    transaction_date STRING,
    time STRING,
    location STRING,
    merchant_details STRING,
    transaction_id STRING,
    transaction_type STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS ORC
"""

customer_table_schema = """
CREATE TABLE IF NOT EXISTS customers (
    account_history STRING,
    customer_id STRING,
    avg_transaction_value FLOAT,
    age INT,
    location STRING
)
STORED AS ORC
"""

external_table_schema = """
CREATE TABLE IF NOT EXISTS external_data (
    customer_id STRING,
    credit_score INT,
    fraud_report_count INT
)
STORED AS ORC
"""

blacklist_info_schema = """
CREATE TABLE IF NOT EXISTS blacklist_info (
    blacklist STRING
)
STORED AS ORC
"""

# Execute the table creation queries
with conn.cursor() as cursor:
    cursor.execute(transaction_table_schema)
    cursor.execute(customer_table_schema)
    cursor.execute(external_table_schema)
    cursor.execute(blacklist_info_schema)

TTransportException: TSocket read 0 bytes

### 2-Insertion data into Tables 

#### blacklist_info Table

In [38]:
# Split the string into individual values
blacklist_info = extern_data['blacklist_info'].split(', ')
database_name = 'financial_data'
# Insert each value as a separate row into the table, avoiding duplicates
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

    for merchant in blacklist_info:
        # Check if the merchant already exists in the table
        check_query = f"SELECT * FROM blacklist_info WHERE blacklist = '{merchant}'"
        cursor.execute(check_query)
        existing_entry = cursor.fetchone()

        if not existing_entry:
            # Insert the merchant if it doesn't exist in the table
            insert_query = f"INSERT INTO blacklist_info VALUES ('{merchant}')"
            cursor.execute(insert_query)

#### external_data Table

In [144]:
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

    # Insert the external_data into the table
    for data in external_data:
        insert_query = f"INSERT INTO external_data VALUES ('{data['customer_id']}', {data['credit_score']}, {data['fraud_report_count']})"
        cursor.execute(insert_query)

#### customers Table

In [145]:
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

    # Insert the customers_data into the table
    for data in customers_data:
        insert_query = f"INSERT INTO customers VALUES ('{data['account_history']}', '{data['customer_id']}', {data['avg_transaction_value']}, {data['age']}, '{data['location']}')"
        cursor.execute(insert_query)

#### transactions Table

In [16]:
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

    # Insert each transaction from transactions_data into the transactions table
    for data in transactions_data:
        insert_query = f"INSERT INTO transactions PARTITION (year={data['year']}, month={data['month']}, day={data['day']}) VALUES ({data['amount']}, '{data['currency']}', '{data['amountUSD']}', '{data['customer_id']}', '{data['date']}', '{data['time']}', '{data['location']}', '{data['merchant_details']}', '{data['transaction_id']}', '{data['transaction_type']}')"
        cursor.execute(insert_query)

## III-Fraud Detection System Based on Rules

#### Unusually_High_Transaction_Amounts

In [45]:
from pyhive import hive
import csv
import os

# Establish a connection to Hive
conn = hive.Connection(host='localhost', port=10000)

# Create a database
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

# Query to identify unusually high transaction amounts
Unusually_High_Transaction_Amounts = """
SELECT t.*
FROM transactions t
JOIN (
    SELECT percentile_approx(amountUSD, 0.95) AS percentile_95
    FROM transactions
) p ON t.amountUSD > p.percentile_95
"""
# Set properties to allow Cartesian products
with conn.cursor() as cursor:
    cursor.execute("set hive.strict.checks.cartesian.product=false")
    cursor.execute("set hive.mapred.mode=nonstrict")
    
# Execute the query and fetch the results
with conn.cursor() as cursor:
    cursor.execute(Unusually_High_Transaction_Amounts)
    results = cursor.fetchall()

# Specify the file path for the CSV
file_path = 'C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/Unusually_High_Transaction_Amounts.csv'

# Check if the file exists
file_exists = os.path.isfile(file_path)

# Open the CSV file in append mode if it exists, otherwise create a new file
with open(file_path, 'a', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    
    # Write header only if the file doesn't exist
    if not file_exists:
        csvwriter.writerow(['amount', 'currency', 'amountUSD', 'customer_id', 'transaction_date', 'time', 'location', 'merchant_details', 'transaction_id', 'transaction_type', 'year', 'month', 'day'])
    
    # Write rows
    csvwriter.writerows(results)

print(f"Data has been appended to {file_path}")

Data has been saved to C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/Unusually_High_Transaction_Amounts.csv


#### High Frequency of Transactions Within a Short Time Frame

In [46]:
from pyhive import hive

# Establish a connection to Hive
conn = hive.Connection(host='localhost', port=10000)

# Create a database
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

# Query to identify High Frequency of Transactions Within a Short Time Frame
High_Frequency_of_Transactions_within_a_Short_Time_Frame = """
SELECT customer_id, COUNT(*) AS transaction_count
FROM transactions
WHERE transaction_date BETWEEN '2022-12-02' AND '2022-12-29'
GROUP BY customer_id
HAVING COUNT(*) > 1
"""

# Execute the query and fetch the results
with conn.cursor() as cursor:
    cursor.execute(High_Frequency_of_Transactions_within_a_Short_Time_Frame)
    results = cursor.fetchall()

# Specify the file path for the CSV
file_path = 'C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/High_Frequency_of_Transactions_within_a_Short_Time_Frame.csv'

# Check if the file exists
file_exists = os.path.isfile(file_path)

# Write the data to the CSV file
with open(file_path, 'a', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    
    # Write header only if the file doesn't exist
    if not file_exists:
        csvwriter.writerow(['customer_id', 'transaction_count'])
    
    # Write rows
    csvwriter.writerows(results)

print(f"Data has been appended to {file_path}")

Data has been saved to C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/High_Frequency_of_Transactions_within_a_Short_Time_Frame.csv


#### Transactions from Unusual Locations

In [47]:
from pyhive import hive
import csv

# Establish a connection to Hive
conn = hive.Connection(host='localhost', port=10000)

# Create a database
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

# Query to identify Transactions from Unusual Locations
Transactions_from_Unusual_Locations = """
SELECT t.*
FROM transactions t
LEFT JOIN customers c ON t.location = c.location
WHERE c.customer_id IS NULL
"""

# Execute the query and fetch the results
with conn.cursor() as cursor:
    cursor.execute(Transactions_from_Unusual_Locations)
    results = cursor.fetchall()

# Specify the file path for the CSV
file_path = 'C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/Transactions_from_Unusual_Locations.csv'

# Check if the file exists
file_exists = os.path.isfile(file_path)

# Write the data to the CSV file
with open(file_path, 'a', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    
    # Write header only if the file doesn't exist
    if not file_exists:
        csvwriter.writerow(['amount', 'currency', 'amountUSD', 'customer_id', 'transaction_date', 'time', 'location', 'merchant_details', 'transaction_id', 'transaction_type', 'year', 'month', 'day'])
    
    # Write rows
    csvwriter.writerows(results)

print(f"Data has been appended to {file_path}")

Data has been saved to C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/Transactions_from_Unusual_Locations.csv


#### Transactions Involving Blacklisted Customers

In [2]:
from pyhive import hive

# Establish a connection to Hive
conn = hive.Connection(host='localhost', port=10000)

# Create a database
database_name = 'financial_data'
with conn.cursor() as cursor:
    cursor.execute(f"USE {database_name}")

# Query to identify Transactions Involving Blacklisted Customers
Transactions_Involving_Blacklisted_Customers = """
SELECT t.*
FROM transactions t
JOIN blacklist_info b ON t.merchant_details = b.blacklist
"""

# Execute the query and fetch the results
with conn.cursor() as cursor:
    cursor.execute(Transactions_Involving_Blacklisted_Customers)
    results = cursor.fetchall()

# Specify the file path for the CSV
file_path = 'C:/Users/Youcode/Desktop/8 Months/sprint 4/fifth week_Détection de Fraude dans les Transactions Financières/Transactions_Detection_archive/Transactions_Involving_Blacklisted_Customers.csv'

# Check if the file exists
file_exists = os.path.isfile(file_path)

# Write the data to the CSV file
with open(file_path, 'a', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    
    # Write rows
    if not file_exists:  # Write header if the file is newly created
        csvwriter.writerow(['amount', 'currency', 'amountUSD', 'customer_id', 'transaction_date', 'time', 'location', 'merchant_details', 'transaction_id', 'transaction_type', 'year', 'month', 'day'])
    
    csvwriter.writerows(results)

print(f"Data has been appended to {file_path}")

TTransportException: TSocket read 0 bytes