##Set up: Data Acquisition - Raw Transaction Layer



In [None]:
#importing relevant Python libraries
import requests
import json
import polars as pl
from google.colab import userdata
import time
import os

In [None]:
#setting up folders
#Create the directory that holds the raw per-wallet transaction dumps.
#exist_ok=True makes the cell safe to re-run and avoids a separate
#exists-check followed by a create (which can race).
os.makedirs("/content/sample_data/Output/", exist_ok=True)


In [None]:
#List of wallet addresses associated with DPRK IT worker (ITW) activity
with open("/content/sample_data/wallet_addr.json", "r") as f:
  wallet_dataset = json.load(f)
print(wallet_dataset)

#Etherscan API key, read from the Colab secret 'etherscankey'
api_key = userdata.get('etherscankey')

#Etherscan V2 multichain endpoint; chainid=1 selects Ethereum mainnet
ETHERSCAN_API_URL = "https://api.etherscan.io/v2/api"
#Pause applied after EVERY wallet (including wallets with no transactions or a
#failed lookup) to throttle requests against the API rate limit
REQUEST_DELAY_SECONDS = 0.25

#For each wallet assessed to be associated with DPRK IT worker activity,
#the most recent transactions (first page of up to 1000, newest first) are
#retrieved via the Etherscan API.
for wallet in wallet_dataset:
  print(f"Getting transactions for {wallet}")
  query_params = {
    "chainid": 1,
    "module": "account",
    "action": "txlist",
    "address": wallet,
    "startblock": 0,
    "endblock": 99999999,
    "page": 1,
    "offset": 1000,
    "sort": "desc",
    "apikey": api_key,
  }
  try:
    #A timeout stops one stalled request from hanging the whole loop, and
    #HTTP-level failures (4xx/5xx) raise instead of being parsed as data.
    http_response = requests.get(ETHERSCAN_API_URL, params=query_params, timeout=30)
    http_response.raise_for_status()
    response = http_response.json()

    if response.get('message') == 'No transactions found':
      print(f"No transactions found for {wallet}")
      continue

    #Etherscan reports API errors (rate limit, invalid key, bad address) with
    #status "0" and an error STRING in `result`. Writing that string to disk
    #would create a bogus transactions file, so report and skip instead.
    if response.get('status') != '1':
      print(f"Etherscan error for {wallet}: {response.get('message')} - {response.get('result')}")
      continue

    with open(f"/content/sample_data/Output/{wallet}_transactions.json", "w") as f:
      json.dump(response['result'], f)
  finally:
    #Runs on every path, including `continue` and exceptions
    time.sleep(REQUEST_DELAY_SECONDS)


## Data Storage & Query Layer (DuckDB as Analytical Engine)

In [None]:
import duckdb as db

#A raw, unmodified data layer is preserved to ensure reproducibility and separation
#from downstream analytical assumptions. union_by_name aligns columns by name across
#the per-wallet files, filling missing fields with NULL.
raw_transactions_df = db.sql('''
              SELECT *
              FROM read_json('/content/sample_data/Output/*.json', union_by_name = true);''').df()
display(raw_transactions_df)

#Wallets of interest, lower-cased to match the normalised from/to addresses.
#Assumes wallet_addr.json is a JSON array of address strings, which read_json exposes
#as a single `json` column. DISTINCT guards against the same wallet being listed twice
#(including case variants that collapse after lower()), which would otherwise
#duplicate every matching row in the inbound/outbound joins.
wallet_address_df = db.sql('''
SELECT DISTINCT lower(json) as wallet_address
FROM read_json('/content/sample_data/wallet_addr.json');''').df()



##Data Normalisation & Deduplication

In [None]:
#Raw transaction payloads include several fields (e.g., gas metrics, method identifiers)
#that are not analytically relevant to behavioural analysis. They are excluded to reduce noise.
#Failed transactions (isError = '1') are excluded as well: they were mined but reverted,
#so their `value` was never transferred and would inflate value and count metrics.
#SELECT DISTINCT collapses transfers between two wallets of interest, which appear
#once in each wallet's file.

clean_dataset_df = db.sql('''
SELECT DISTINCT
hash AS tx_hash,
CAST(value AS DOUBLE)/1e18 AS value_eth, -- wei -> ETH
to_timestamp (CAST(timeStamp AS BIGINT)) AS ts, -- Unix seconds -> timestamp
lower("from") AS from_address,
lower("to") AS to_address
FROM raw_transactions_df
WHERE COALESCE(CAST(isError AS VARCHAR), '0') <> '1';''').df()


#Transactions whose hash occurs exactly twice in the raw layer, i.e. transfers between
#two wallets of interest that were fetched once from each side. This frame isolates
#those inter-wallet transfers for inspection; the deduplication that prevents inflated
#transaction counts and value metrics is the SELECT DISTINCT above.

clean_dataset_df_1 = db.sql('''
WITH txs AS (
SELECT
COUNT(*) OVER (PARTITION BY hash) AS tx_count,
hash AS tx_hash,
CAST(value AS DOUBLE)/1e18 AS value_eth,
to_timestamp (CAST(timeStamp AS BIGINT)) AS ts,
lower("from") AS from_address,
lower("to") AS to_address
FROM raw_transactions_df
WHERE COALESCE(CAST(isError AS VARCHAR), '0') <> '1'
)
SELECT DISTINCT *
FROM txs
WHERE tx_count = 2;''').df()


#Timestamp normalisation: first and last transaction timestamps, computed in one pass
_ts_bounds_df = db.sql('''
SELECT MIN(ts) AS min_ts, MAX(ts) AS max_ts FROM clean_dataset_df;''').df()
min_timestamp = _ts_bounds_df['min_ts'].iloc[0]
max_timestamp = _ts_bounds_df['max_ts'].iloc[0]



##Transaction Flow Classification (Inbound vs Outbound)

In [None]:
#Transactions are classified as inbound or outbound relative to each wallet of interest.
#A transfer between two wallets of interest appears in both frames (inbound for the
#receiver, outbound for the sender).

#Inbound flows are treated as on-chain revenue indicators
inbound_flows_df = db.sql('''
SELECT
t1.tx_hash,
t1.ts,
t1.to_address AS wallet_address, -- The wallet receiving the funds
t1.from_address AS sender_address, -- The sender of the funds
t1.value_eth
FROM clean_dataset_df as t1
INNER JOIN wallet_address_df as t2
ON t1.to_address = t2.wallet_address;''').df()

#Sanity check: occurrences of each tx_hash in the inbound flows (every value should be 1).
#The count must come from the GROUP BY aggregate itself; a windowed COUNT(*) evaluated
#after GROUP BY tx_hash sees one row per group and would always report 1.
sanity_check_inbound_df = db.sql('''
SELECT
COUNT(*) AS transaction_count, -- Count occurrences of each tx_hash
tx_hash
FROM inbound_flows_df
GROUP BY tx_hash;''').df()
_duplicate_inbound = int((sanity_check_inbound_df['transaction_count'] > 1).sum())
if _duplicate_inbound:
  print(f"WARNING: {_duplicate_inbound} tx_hash values appear more than once in inbound_flows_df")

#Outbound flows are analysed as potential laundering, redistribution, or cash-out events
outbound_flows_df = db.sql('''
SELECT
t1.tx_hash,
t1.ts,
t1.from_address AS wallet_address, -- The wallet sending the funds
t1.to_address AS recipient_address, -- The recipient of the funds
t1.value_eth
FROM clean_dataset_df as t1
INNER JOIN wallet_address_df as t2
ON t1.from_address = t2.wallet_address;''').df()

#Sanity check: occurrences of each tx_hash in the outbound flows (every value should be 1)
sanity_check_outbound_df = db.sql('''
SELECT
COUNT(*) AS transaction_count, -- Count occurrences of each tx_hash
tx_hash
FROM outbound_flows_df
GROUP BY tx_hash;''').df()
_duplicate_outbound = int((sanity_check_outbound_df['transaction_count'] > 1).sum())
if _duplicate_outbound:
  print(f"WARNING: {_duplicate_outbound} tx_hash values appear more than once in outbound_flows_df")

#NOTE:This separation forms the analytical basis for subsequent behavioural and temporal assessments

##Visualisation of Transactional Flows

In [None]:
#Aggregate inbound and outbound flows to create a stacked bar chart
import pandas as pd

#Aggregate inbound flows: calculate the total ETH value received by each wallet
agg_inbound_flows_df = db.sql('''
SELECT
wallet_address,
SUM(value_eth) AS value_eth,
'Inbound' AS flow_type
FROM inbound_flows_df
GROUP BY wallet_address;''').df()

#Aggregate outbound flows: calculate the total ETH value sent from each wallet
agg_outbound_flows_df = db.sql('''
SELECT
wallet_address,
SUM(value_eth) AS value_eth,
'Outbound' AS flow_type
FROM outbound_flows_df
GROUP BY wallet_address
ORDER BY value_eth DESC;''').df()

#Combine inbound and outbound aggregated data into a single DataFrame for plotting
#(one row per wallet per flow direction)
stacked_bar_data = pd.concat([agg_inbound_flows_df, agg_outbound_flows_df], ignore_index=True)

#Filter for wallet/flow rows with significant ETH value (> 1 ETH) for clearer visuals
stacked_bar_data_1 = db.sql('''
SELECT *,
FROM stacked_bar_data
WHERE value_eth > 1;''').df()


#Filter for wallet/flow rows with small ETH value (<= 1 ETH); together with the
#filter above this partitions stacked_bar_data, so exactly-1-ETH rows are not lost
stacked_bar_data_2 = db.sql('''
SELECT *,
FROM stacked_bar_data
WHERE value_eth <= 1;''').df()


#Count the flow directions recorded for each wallet (2 = both inbound and outbound).
#The filter restricting to such wallets is currently disabled, so all rows are kept.
stacked_bar_data_3 = db.sql('''
WITH sbd AS (
SELECT *,
COUNT(wallet_address) OVER (PARTITION BY wallet_address) AS count_wallet_address,
FROM stacked_bar_data
)
SELECT *
FROM sbd
--WHERE count_wallet_address > 1
;''').df()


#Total ETH per flow direction and its share of the overall ETH moved
stacked_bar_data_4 = db.sql('''
WITH sbd4 AS (
SELECT  flow_type, SUM(value_eth) AS total_value_eth
FROM stacked_bar_data_3
GROUP BY flow_type
)
SELECT *, (total_value_eth/SUM(total_value_eth) OVER())*100 AS percentage_value
FROM sbd4
;''').df()


#visualisation
import plotly.graph_objects as go

#Slices come from the computed shares rather than hard-coded percentages, so the
#chart stays in sync with the data when the input wallets/transactions change.
labels = stacked_bar_data_4['flow_type'].tolist()
values = stacked_bar_data_4['percentage_value'].tolist()
#Pull the Inbound slice out for emphasis; GROUP BY row order is not guaranteed,
#so the pull offsets are matched to labels by name.
pull = [0.2 if label == 'Inbound' else 0 for label in labels]

fig = go.Figure(data=[go.Pie(labels=labels, values=values, pull=pull)])
fig.update_layout(title_text='Percentage of Inbound vs Outbound ETH Value per Wallet')


##Temporal Behaviour Analysis (Payment Intervals) & Visualisation

In [None]:
#Outbound transactions are ordered chronologically per wallet, and inter-payment
#time deltas are computed to quantify gaps between successive transfers.
#date_sub counts COMPLETE elapsed minutes/days. date_diff would instead count calendar
#boundaries crossed, so two transfers minutes apart either side of midnight would be
#reported as 1 day apart and misclassified as '>=1 Days'.
#The first transfer of each wallet has no predecessor (NULL gap) and lands in 'No Days'.

payment_intervals_df_1= db.sql('''
WITH ordered AS (
SELECT
wallet_address,
ts,
LAG(ts) OVER (PARTITION BY wallet_address ORDER BY ts) AS previous_ts,
value_eth
FROM outbound_flows_df
),
t1 AS (
SELECT
wallet_address,
ts,
previous_ts,
date_sub('minute', previous_ts, ts) AS time_since_last_transaction,
date_sub('day', previous_ts, ts) AS days_since_last,
value_eth
FROM ordered
)
SELECT *,
CASE
WHEN days_since_last >= 1 THEN '>=1 Days'
WHEN days_since_last <1 THEN '0 Days'
ELSE 'No Days'
END AS periodicity_group
FROM t1;''').df()


#Share (in %) of outbound transfers falling into each periodicity group
ratio_df = db.sql('''
WITH r1 AS (
SELECT COUNT(*) AS grouped_rows, periodicity_group
FROM payment_intervals_df_1
GROUP BY periodicity_group
)
SELECT *,
SUM (grouped_rows) OVER() AS total_rows,
ROUND(grouped_rows/SUM (grouped_rows) OVER(),4)*100 AS ratio
FROM r1;''').df()



#Visualisation - a donut chart of the periodicity groups, driven by ratio_df
#(not hard-coded values) so it reflects the current data

import plotly.graph_objects as go

labels = ratio_df['periodicity_group'].tolist()
values = ratio_df['ratio'].tolist()

fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=.3)])
fig.update_layout(title_text='Payment Intervals - time between sequential outbound transfers')


In [None]:
#Observed temporal patterns are assessed. Here regular intervals and
#consistent transaction sizing may indicate payroll-like disbursement,
#while highly variable intervals suggest ad-hoc liquidation or laundering.
#Per-wallet outbound activity: number of outbound transfers and total ETH sent,
#ranked by transfer count, then by ETH value.
outbound_count_per_wallet_df = db.sql('''
SELECT wallet_address, COUNT(*) AS tx_count, SUM(value_eth) AS value_eth
FROM outbound_flows_df
GROUP BY wallet_address
ORDER BY tx_count DESC, value_eth DESC;''').df()

#Additional diagnostics include per-wallet transaction counts and
#day-of-month frequency analysis to identify preferential transaction
#windows indicative of structured operational cycles.
#Here: only wallets with more than one outbound transfer are kept, since a
#single transfer has no interval pattern to assess.

outbound_count_per_wallet_df_1 = db.sql('''
SELECT wallet_address, tx_count, value_eth
FROM outbound_count_per_wallet_df
WHERE tx_count > 1
ORDER BY tx_count DESC, value_eth DESC;''').df()


##Enrichment of Data: Value-Based Behaviour Analysis

In [None]:
#To enable cross-period comparison and mitigate the effects of ETH price volatility,
#transaction values are converted from ETH to USD using daily ETH-USD pricing data from yfinance library
import yfinance as yf
import pandas as pd

#Fetching historical ETH-USD pricing data using yfinance.
#The date range is derived from the first/last transaction timestamps computed during
#normalisation (min_timestamp / max_timestamp) instead of a hard-coded window, so that
#transactions outside a fixed window do not silently end up without a USD value.
#yfinance treats `end` as exclusive, hence the extra day.
eth_ticker = yf.Ticker("ETH-USD")
eth_price_start = pd.Timestamp(min_timestamp).strftime('%Y-%m-%d')
eth_price_end = (pd.Timestamp(max_timestamp) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
raw_eth_pricing_df = eth_ticker.history(start=eth_price_start, end=eth_price_end, interval="1d")
raw_eth_pricing_df.reset_index(inplace=True)
raw_eth_pricing_df = raw_eth_pricing_df.rename(columns={'Date': 'eth_date'})

#The daily 'high' price is used, i.e. the upper bound of that day's trading range,
#so USD values are an upper-end estimate
eth_daily_prices_df = db.sql('''
SELECT eth_date, High AS eth_price
FROM raw_eth_pricing_df;''').df()

#Pricing data is joined to transaction records based on transaction date.
#LEFT JOIN keeps transactions without a matching price (eth_price/ETH_USD NULL).
transactions_with_price = db.sql('''
SELECT
t1.ts,
t1.tx_hash,
t1.from_address,
t1.to_address,
t1.value_eth,
t2.eth_date,
t2.eth_price,
ROUND((value_eth * eth_price), 2) AS ETH_USD -- Calculate transaction value in USD
FROM clean_dataset_df as t1
LEFT JOIN eth_daily_prices_df as t2
ON (date(t1.ts) = date(t2.eth_date)); -- Join on date part of timestamp
''').df()

#Surface unpriced transactions: their NULL ETH_USD is ignored by SUM/AVG downstream
_unpriced_tx = int(transactions_with_price['eth_price'].isna().sum())
if _unpriced_tx:
  print(f"WARNING: {_unpriced_tx} transactions have no matching ETH-USD price (ETH_USD is NULL)")

#Outbound flows are specifically enriched with the per-transaction price and USD value
outbound_flows_with_price_df = db.sql('''
SELECT
t1.ts,
t1.tx_hash,
t1.wallet_address,
t1.recipient_address,
t1.value_eth,
t2.eth_price,
t2.ETH_USD
FROM outbound_flows_df as t1
LEFT JOIN transactions_with_price as t2
ON t1.tx_hash = t2.tx_hash;''').df()

##Monthly Clustering Analysis & visualisation

In [None]:
#This analysis assesses whether outbound transaction activity exhibits
#recurring concentration within specific windows of the calendar month,
#which may be indicative of salary disbursement cycles.

#Transaction timestamps are decomposed to extract the day-of-month and
#grouped into early (1-5), mid (6-24) and late-month (25-31) buckets to capture
#realistic payroll behaviour rather than exact calendar dates.
#Transfers are also banded by their USD value. Transactions without a USD price
#get their own 'Unpriced' band: a NULL would otherwise fall through every
#comparison into the ELSE branch and be reported as '>$5K'.
#The *_USD totals/averages are computed from ETH_USD (they previously used value_eth,
#i.e. ETH amounts, despite the USD naming and USD chart labels).

monthly_clustering_df = db.sql('''
SELECT
wallet_address,
YEAR(ts) AS year,
DAYOFMONTH(ts) AS day_of_month,
CASE
WHEN DAYOFMONTH(ts) BETWEEN 1 AND 5 THEN 'Early'
WHEN DAYOFMONTH(ts) BETWEEN 25 AND 31 THEN 'Late'
ELSE 'Mid-Month'
END AS clustering_period,
CASE
WHEN ETH_USD IS NULL THEN 'Unpriced'
WHEN ETH_USD < 2000 THEN '<$2K'
WHEN ETH_USD BETWEEN 2000 AND 5000 THEN '$2K-$5K'
ELSE '>$5K'
END AS value_band,
COUNT(*) AS tx_count,
SUM(ETH_USD) AS total_value_eth_USD,
AVG(ETH_USD) AS avg_value_eth_USD,
FROM outbound_flows_with_price_df
GROUP BY wallet_address, DAYOFMONTH(ts), Year(ts), clustering_period, value_band
ORDER BY year, tx_count, clustering_period, value_band DESC;''').df()


#This query aggregates all outbound transactions by day of the month, calculating total counts and average/summed USD values.
#This provides an overall view of transaction frequency and value distribution across the days of a month.
aggregated_monthly_clustering_df = db.sql('''
SELECT DAYOFMONTH(ts) AS Day_of_month, COUNT(*) AS tx_count, AVG(value_eth) AS avg_value_eth, AVG(ETH_USD) AS value_eth_usd, SUM(ETH_USD) AS total_value_eth_usd
FROM outbound_flows_with_price_df
GROUP BY DAYOFMONTH(ts)
ORDER BY Day_of_month;''').df()



#Per-year outbound transaction count, ETH and USD totals, each expressed as a
#percentage of the all-years total (relies on DuckDB reusing SELECT aliases such as
#total_tx_count, tvs and teu later in the same SELECT list)
aggregated_monthly_clustering_df_1 = db.sql('''
WITH yearly_counts AS (
SELECT COUNT(*) AS tx_count, YEAR(ts) AS year, SUM(value_eth) AS total_value_eth, SUM(ETH_USD) AS total_ETH_USD
FROM outbound_flows_with_price_df
GROUP BY YEAR(ts)
)
SELECT year, tx_count, SUM(tx_count) OVER() AS total_tx_count, ROUND((tx_count/total_tx_count)*100, 2) AS percentage_tx_count,
SUM(total_value_eth) OVER () AS tvs, SUM(total_ETH_USD) OVER () AS teu, (total_value_eth/tvs)* 100 AS percentrage_tvs,
(total_ETH_USD/teu)* 100 AS percentrage_teu
FROM yearly_counts
ORDER BY year
;''').df()


##visualisation1
import plotly.express as px

#Creating a stacked bar chart for transaction counts by clustering period and value band.
#This chart shows how many transactions fall into each monthly period (early, mid, late) and value range (<$2K, $2K-$5K, >$5K).
fig_tx_count = px.bar(monthly_clustering_df,
                      x="clustering_period",
                      y="tx_count",
                      color="value_band",
                      barmode="stack",
                      title="Monthly Outbound Transaction Count by Clustering Period and Value Band",
                      labels={"clustering_period": "Clustering Period",
                              "tx_count": "transaction Count",
                              "value_band": "Value Band"},
                      height=500)
#A notebook only auto-renders the LAST expression of a cell, and these figure
#variables are reassigned further down the cell, so render each one explicitly.
fig_tx_count.show()



#Creating a stacked bar chart for total ETH_USD value by clustering period and value band.
#This chart visualises the total monetary value transferred within each monthly period and value range.
fig_total_value = px.bar(monthly_clustering_df,
                      x="clustering_period",
                      y="total_value_eth_USD",
                      color="value_band",
                      barmode="stack",
                      title="Monthly Outbound Total Value in USD by Clustering Period and Value Band",
                      labels={"clustering_period": "Clustering Period",
                              "total_value_eth_USD": "Total ETH_USD Value",
                              "value_band": "Value Band"},
                      height=500)
fig_total_value.show()



##visualisation2
#(plotly.express is already imported as px above; the unused px.data.tips()
#sample dataset that used to be loaded here has been dropped.)

#Creating a stacked bar chart for transaction counts faceted by year.
#This allows for comparison of transaction frequency patterns across different years.
fig_tx_count = px.bar(monthly_clustering_df,
                      x="clustering_period",
                      y="tx_count",
                      color="value_band",
                      barmode="stack",
                      facet_col="year",
                      title="Yearly-Monthly Outbound Transaction Count by Clustering Period and Value Band",
                      labels={"clustering_period": "Clustering Period",
                              "tx_count": "transaction Count",
                              "value_band": "Value Band"},
                      height=500)
#Render explicitly: only the cell's final expression is auto-displayed
fig_tx_count.show()



#Creating a stacked bar chart for total ETH_USD value faceted by year.
#This allows for comparison of total value transferred patterns across different years.
fig_total_value = px.bar(monthly_clustering_df,
                      x="clustering_period",
                      y="total_value_eth_USD",
                      color="value_band",
                      barmode="stack",
                      facet_col="year",
                      title="Yearly-Monthly Outbound Total Value in USD by Clustering Period and Value Band",
                      labels={"clustering_period": "Clustering Period",
                              "total_value_eth_USD": "Total ETH_USD Value",
                              "value_band": "Value Band"},
                      height=500)
fig_total_value.show()


##visualisation3
#Outbound frequency distribution by day of month: one bar per calendar day
#(from aggregated_monthly_clustering_df), bar height = number of outbound transfers,
#with each bar labelled by its value ('.2s' number format).
fig = px.bar(aggregated_monthly_clustering_df, y='tx_count', x='Day_of_month', text_auto='.2s',
            title="Outbound Transaction Frequency by Day of Month")
#Horizontal labels drawn inside the bars; cliponaxis=False lets them extend past the plot area
fig.update_traces(textfont_size=12, textangle=0, textposition="inside", cliponaxis=False)
#Linear x ticks starting at 1 with step 1 so every day of the month gets its own tick.
#update_layout returns the figure, so as the cell's last expression it is rendered.
fig.update_layout(xaxis_title="Day of Month", yaxis_title="Number of Transactions", xaxis=dict(tickmode='linear', tick0=1, dtick=1))
