In [0]:
%restart_python

In [0]:
%pip install pyalex sentence-transformers pandas tqdm


In [0]:

# COMMAND ----------
# 1. Imports and configuration

import json
import os
import re
from itertools import chain
from pathlib import Path

import pandas as pd
from pyalex import Topics, Works, config as pyalex_config
from sentence_transformers import SentenceTransformer, util
from tqdm.auto import tqdm

# Contact e-mail handed to PyAlex. Set OPENALEX_EMAIL in the environment to
# override the fallback address without editing the notebook.
# TODO: change the fallback address
pyalex_config.email = os.environ.get("OPENALEX_EMAIL", "farzan.saif@gmail.com")

# DBFS locations: intermediate data and the CSV exports written for Power BI.
DATA_DIR = Path("/dbfs/FileStore/fincrime/data")
EXPORT_DIR = Path("/dbfs/FileStore/fincrime/export_for_powerbi")

for directory in (DATA_DIR, EXPORT_DIR):
    directory.mkdir(parents=True, exist_ok=True)

DATA_DIR, EXPORT_DIR



In [0]:

# COMMAND ----------
# 2. Load OpenAlex TOPICS via PyAlex (once)
# Walk every page returned by the topics paginator and flatten the pages into
# a single list of topic records.
topics_records = []
for topics_page in Topics().paginate(per_page=200):
    topics_records.extend(topics_page)

len(topics_records), topics_records[0]

In [0]:

# COMMAND ----------
# 3. Put topics into a pandas DataFrame and build text_for_embedding

topics_df = pd.DataFrame(topics_records)


def _keywords_to_text(keywords) -> str:
    """Flatten a topic's keyword list into one space-separated string."""
    if isinstance(keywords, list):
        return " ".join(str(keyword) for keyword in keywords)
    # Anything that is neither a list nor a string (None, NaN, ...) becomes empty
    # text; NaN in particular is truthy and would otherwise slip through.
    return keywords if isinstance(keywords, str) else ""


topics_df["keywords_str"] = topics_df["keywords"].apply(_keywords_to_text)

# A missing display_name or description would turn the whole concatenation into
# NaN, which is not text the embedding step can encode, so blank them out first.
topics_df["text_for_embedding"] = (
    topics_df["display_name"].fillna("") + " | " +
    topics_df["description"].fillna("") + " | " +
    topics_df["keywords_str"]
)

topics_path = DATA_DIR / "openalex_topics.csv"
topics_df.to_csv(topics_path, index=False)
topics_path


In [0]:
# COMMAND ----------
# 4. Compute topic embeddings with SentenceTransformer

# Sentence-embedding model; the same instance later encodes the user query.
model = SentenceTransformer("all-MiniLM-L6-v2")

# One text per topic row, in DataFrame order, so embedding i belongs to row i.
topic_texts = topics_df["text_for_embedding"].to_list()

topic_embeddings = model.encode(topic_texts, batch_size=128, convert_to_tensor=True)

len(topic_embeddings)

In [0]:

# COMMAND ----------
# 5. Helper: select topics for a given free-text query

def select_topics_for_query(
    query: str,
    topics_df: pd.DataFrame,
    topic_embeddings,
    top_k: int = 5,
    min_similarity: float = 0.4,
    dropoff_ratio: float = 0.75,
):
    """
    Given a text query, return a small DataFrame of the most relevant OpenAlex topics.

    The query is embedded with the notebook-level `model` and compared to
    `topic_embeddings` by cosine similarity. Row i of `topics_df` must
    correspond to embedding i.

    Parameters
    ----------
    query : free-text search phrase.
    topics_df : topic table aligned row-for-row with `topic_embeddings`.
    topic_embeddings : tensor of topic embeddings.
    top_k : maximum number of candidate topics to consider (clamped to the
        number of topics available).
    min_similarity : candidates scoring below this are discarded.
    dropoff_ratio : candidates scoring below `best_score * dropoff_ratio` are
        discarded.

    Returns
    -------
    pd.DataFrame
        The selected rows of `topics_df` (original index labels and dtypes)
        plus a float `similarity` column. If no candidate passes both
        thresholds, the single best-scoring topic is returned instead.

    Raises
    ------
    ValueError
        If `top_k` is smaller than 1 or there are no topics to search.
    """
    if top_k < 1:
        raise ValueError("top_k must be at least 1")

    query_emb = model.encode(query, convert_to_tensor=True)
    scores = util.cos_sim(query_emb, topic_embeddings)[0]

    n_topics = int(scores.shape[0])
    if n_topics == 0:
        raise ValueError("topic_embeddings is empty; there is nothing to select from")

    # topk raises when k is larger than the number of scores, so clamp it.
    top_scores, top_idx = scores.topk(k=min(top_k, n_topics))
    top_scores = top_scores.cpu().tolist()
    top_idx = top_idx.cpu().tolist()

    best = top_scores[0]

    kept_positions = []
    kept_scores = []
    for score, idx in zip(top_scores, top_idx):
        if score < min_similarity:
            continue
        if score < best * dropoff_ratio:
            continue
        kept_positions.append(idx)
        kept_scores.append(float(score))

    # Nothing cleared the thresholds: fall back to the single best match.
    if not kept_positions:
        kept_positions = [top_idx[0]]
        kept_scores = [float(top_scores[0])]

    # Positional selection keeps the source dtypes, unlike rebuilding the frame
    # from per-row Series.
    selected = topics_df.iloc[kept_positions].copy()
    selected["similarity"] = kept_scores
    return selected


In [0]:

# COMMAND ----------
# 6. Choose a user query and inspect selected topics

user_query = "crypto money laundering"  # TODO: adjust for different runs

selected_topics_df = select_topics_for_query(
    user_query,
    topics_df,
    topic_embeddings,
    top_k=5,
    min_similarity=0.4,
    dropoff_ratio=0.75,
)

print("User query:", user_query)

# Strip the OpenAlex URL prefix from each id; the short id is what the
# works filter is given later on.
selected_topics_df["topic_short_id"] = selected_topics_df["id"].str.replace(
    "https://openalex.org/", "", regex=False
)

selected_topics_df.loc[:, ["id", "topic_short_id", "display_name", "similarity"]]



In [0]:

# COMMAND ----------
# 8. Fetch works for selected topics via PyAlex and save to JSONL

raw_jsonl_path = DATA_DIR / "works_raw.jsonl"

# Drop any file left behind by a previous run before writing a fresh one.
if raw_jsonl_path.exists():
    raw_jsonl_path.unlink()

from_year = 2020
to_year = 2024
per_page = 200
max_pages_per_topic = 50

# The publication window is identical for every topic.
from_date = f"{from_year}-01-01"
to_date = f"{to_year}-12-31"

total = 0
with raw_jsonl_path.open("w", encoding="utf-8") as f:
    for topic_short_id, topic_name in zip(
        selected_topics_df["topic_short_id"], selected_topics_df["display_name"]
    ):
        print(f"\n=== Fetching works for topic {topic_short_id} – {topic_name} ===")

        works_query = Works().filter(
            topics={"id": [topic_short_id]},
            from_publication_date=from_date,
            to_publication_date=to_date,
        )
        pager = works_query.paginate(per_page=per_page)

        topic_record_count = 0

        for page_count, page in enumerate(pager, start=1):
            print(f"  → Page {page_count} ... ", end="")

            page_len = 0
            for rec in page:
                # Remember which selected topic produced this record.
                rec["source_topic_id"] = topic_short_id
                f.write(json.dumps(rec) + "\n")
                page_len += 1

            total += page_len
            topic_record_count += page_len
            print(f"{page_len} works")

            if page_count >= max_pages_per_topic:
                print(f"  (Reached max_pages_per_topic = {max_pages_per_topic}, stopping early)")
                break

        print(f"Completed topic {topic_short_id}: {topic_record_count} works written.")

print(f"\nTotal works written across all topics: {total}")
print(f"Saved to {raw_jsonl_path}")

In [0]:

# COMMAND ----------
# 9. Load raw works into pandas

# Each line of the JSONL file holds one work record.
raw_df = pd.read_json(raw_jsonl_path, lines=True)

raw_df.loc[:, ["id", "title", "publication_year", "cited_by_count", "source_topic_id"]].head()


In [0]:
def decode_abstract(abstract_index):
    """Rebuild abstract text from an inverted index.

    `abstract_index` maps each word to the positions at which it occurs. The
    words are emitted in ascending position order and joined with single
    spaces. Any input that is not a dict yields an empty string.
    """
    if not isinstance(abstract_index, dict):
        return ""

    placed_words = []
    for word, positions in abstract_index.items():
        for position in positions:
            placed_words.append((position, word))

    # Tuples sort by position first, then by word.
    placed_words.sort()
    return " ".join(entry[1] for entry in placed_words)

# Turn each work's inverted index into readable abstract text.
raw_df["abstract"] = raw_df["abstract_inverted_index"].map(decode_abstract)

# Core metadata columns kept in the flat works table.
works_cols = [
    "id", "doi", "title", "abstract",
    "publication_year", "publication_date",
    "cited_by_count", "type", "source_topic_id",
]

# Keep the first row seen for each work id.
works_flat_df = raw_df.loc[:, works_cols].drop_duplicates(subset="id")
works_flat_df.head()


In [0]:

# COMMAND ----------
# 11. Explode authorships and institutions

def _get_nested(record, *keys):
    """Follow `keys` through nested dicts; return None once a level is not a dict."""
    current = record
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


# 11.1 Explode authorships
# The fetch step writes a work once per selected topic it was returned for, so
# raw_df can hold the same work several times; keep one copy per id so its
# authors and institutions are not repeated.
auth_df = (
    raw_df[["id", "authorships"]]
    .drop_duplicates("id")
    .explode("authorships")
    .dropna(subset=["authorships"])
)

auth_df["author_id"] = auth_df["authorships"].apply(
    lambda a: _get_nested(a, "author", "id")
)
auth_df["author_name"] = auth_df["authorships"].apply(
    lambda a: _get_nested(a, "author", "display_name")
)
auth_df["author_position"] = auth_df["authorships"].apply(
    lambda a: _get_nested(a, "author_position")
)

# 11.2 Explode institutions within each authorship
# A missing or null institutions entry becomes an empty list; exploding it
# yields a null row that the dropna below removes.
auth_df["institutions"] = auth_df["authorships"].apply(
    lambda a: _get_nested(a, "institutions") or []
)
inst_df = auth_df.explode("institutions").dropna(subset=["institutions"])

inst_df["institution_id"] = inst_df["institutions"].apply(
    lambda i: _get_nested(i, "id")
)
inst_df["institution_name"] = inst_df["institutions"].apply(
    lambda i: _get_nested(i, "display_name")
)
inst_df["institution_country_code"] = inst_df["institutions"].apply(
    lambda i: _get_nested(i, "country_code")
)

inst_df.head()


In [0]:

# COMMAND ----------
# 12. Aggregations for analysis / Power BI

def _count_works(frame, by):
    """Number of distinct work ids per group, as a flat DataFrame."""
    return (
        frame
        .groupby(by)
        .agg(publication_count=("id", "nunique"))
        .reset_index()
    )


# 12.1 Publications per year
pubs_per_year_df = _count_works(
    works_flat_df.dropna(subset=["publication_year"]),
    "publication_year",
).sort_values("publication_year")

# 12.2 Publications by country
inst_with_year_df = inst_df.merge(
    works_flat_df[["id", "publication_year"]],
    on="id",
    how="left",
)

pubs_by_country_df = _count_works(
    inst_with_year_df.dropna(subset=["institution_country_code"]),
    "institution_country_code",
).sort_values("publication_count", ascending=False)

# 12.3 Publications by institution
pubs_by_institution_df = _count_works(
    inst_df.dropna(subset=["institution_id"]),
    ["institution_id", "institution_name"],
).sort_values("publication_count", ascending=False)

# 12.4 Top cited works
works_with_citations = works_flat_df.dropna(subset=["cited_by_count"])
top_cited_df = works_with_citations.sort_values("cited_by_count", ascending=False).head(100)

# COMMAND ----------
# 13. Export CSVs for Power BI (stored in DBFS)

csv_exports = {
    "publications_per_year.csv": pubs_per_year_df,
    "top_countries.csv": pubs_by_country_df,
    "top_institutions.csv": pubs_by_institution_df,
    "top_cited.csv": top_cited_df,
    "works_tidy.csv": works_flat_df,
}
for csv_name, frame in csv_exports.items():
    frame.to_csv(EXPORT_DIR / csv_name, index=False)

In [0]:
# === SECTOR MAPPING ===
# Sourced from p. 7 of https://www.austrac.gov.au/sites/default/files/2024-07/2024%20AUSTRAC%20Money%20Laundering%20NRA.pdf
# The sector list was condensed, and the individual keywords were generated, using an LLM.

SECTOR_KEYWORDS = {
    "banking": [
        "major banks", "domestic banks", "foreign bank branches", "mutual banks",
        "non-bank lenders", "financial institutions", "stockbroker", "wealth management",
        "securities dealer", "custodian", "superannuation", "managed investment scheme"
    ],
    "remittance": [
        "remittance", "money transfer", "registered remittance", "unregistered remittance"
    ],
    "digital currency": [
        "cryptocurrency", "bitcoin", "ethereum", "digital asset",
        "digital currency exchange", "DCE", "digital currency", "crypto"
    ],
    "gambling": [
        "gambling", "casino", "betting agency", "corporate bookmaker",
        "pubs and clubs", "on-course bookmaker", "offshore gambling", "junket tour"
    ],
    "real estate": [
        "real estate", "property", "real estate agent"
    ],
    "bullion": [
        "bullion", "precious metals", "gold", "bullion dealer"
    ],
    "professional services": [
        "accountant", "lawyer", "legal services", "legal structure",
        "trust", "company service provider", "offshore service provider"
    ],
    "cash-intensive business": [
        "cash", "cash smuggling", "declared cash", "undeclared cash",
        "cash-intensive", "transfer of value", "store of value"
    ],
    "luxury goods": [
        "luxury goods", "luxury vehicle", "luxury watercraft"
    ],
    "customs and logistics": [
        "customs broker", "freight", "shipping"
    ],
}


def classify_sectors(df: pd.DataFrame, text_cols=None):
    """Tag every row of `df` with the sectors whose keywords occur in its text.

    Parameters
    ----------
    df : frame holding the text columns.
    text_cols : columns whose text is searched; defaults to
        ``["title", "abstract"]``. Missing values count as empty text.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with a `sectors` column: a list of sector names in
        `SECTOR_KEYWORDS` order (empty when nothing matches). `df` itself is
        left untouched.

    Matching rules
    --------------
    * Ordinary keywords match case-insensitively at the start of a word, so
      plurals and derived forms still hit ("casino" -> "casinos") while
      mid-word hits do not ("trust" no longer fires on "distrust").
    * All-uppercase keywords are treated as acronyms and must appear as a whole
      word in exactly that case.
    """
    cols = ["title", "abstract"] if text_cols is None else list(text_cols)
    if not cols:
        raise ValueError("text_cols must name at least one column")

    def keyword_pattern(keyword):
        escaped = re.escape(keyword)
        if keyword.isupper():
            return re.compile(rf"\b{escaped}\b")
        return re.compile(rf"\b{escaped}", re.IGNORECASE)

    # Compile once per call rather than lower-casing the text for every keyword.
    patterns_by_sector = {
        sector: [keyword_pattern(keyword) for keyword in keywords]
        for sector, keywords in SECTOR_KEYWORDS.items()
    }

    def extract_sectors(text):
        # Dict order gives a stable, reproducible label order.
        return [
            sector
            for sector, patterns in patterns_by_sector.items()
            if any(pattern.search(text) for pattern in patterns)
        ]

    text_parts = [df[col].fillna("").astype(str) for col in cols]
    combined_text = text_parts[0]
    for part in text_parts[1:]:
        combined_text = combined_text + " " + part

    return df.assign(sectors=combined_text.apply(extract_sectors))

# === HIGH-RISK AND MONITORED COUNTRIES EXTRACTION ===



from typing import List
import re

# Source: FATF "Call for Action" (June 2025)
# https://www.fatf-gafi.org/content/fatf-gafi/en/publications/High-risk-and-other-monitored-jurisdictions/Call-for-action-june-2025.html
HIGH_RISK_COUNTRIES = [
    "North Korea", "Iran", "Myanmar"
]

# Source: FATF "Jurisdictions under Increased Monitoring" (June 2025)
# https://www.fatf-gafi.org/en/publications/High-risk-and-other-monitored-jurisdictions/increased-monitoring-june-2025.html
FATF_MONITORED_COUNTRIES = [
    "Algeria", "Angola", "Bolivia", "Bulgaria", "Burkina Faso", "Cameroon",
    "Côte d'Ivoire", "Democratic Republic of the Congo", "Haiti", "Kenya",
    "Lao PDR", "Lebanon", "Monaco", "Mozambique", "Namibia", "Nepal", "Nigeria",
    "South Africa", "South Sudan", "Syria", "Venezuela", "Vietnam",
    "Virgin Islands (UK)", "Yemen"
]

# === REGIONAL & STRATEGIC PARTNERS ===
# Source: AUSTRAC strategic focus regions and collaboration

REGIONAL_GROUPS = {
    "Pacific": ["Fiji", "Samoa", "Vanuatu", "Tonga", "Papua New Guinea"],
    "Southeast Asia": ["Thailand", "Malaysia", "Vietnam", "Indonesia", "Cambodia", "Philippines"],
    "AUSTRAC Partners (Five Eyes)": ["Australia", "New Zealand", "United Kingdom", "United States", "Canada"]
}


def extract_jurisdictions(text):
    """Return jurisdiction labels for every listed country named in `text`.

    Labels come back in a fixed order: "High-Risk: <country>" entries, then
    "FATF Monitored: <country>" entries, then one "Region: <group>" entry per
    regional group with at least one member mentioned. Matching is
    case-insensitive and on whole names only ("Iran" does not match
    "Iranian"). Non-string input yields an empty list.
    """
    if not isinstance(text, str):
        return []

    def mentions(name):
        # Lookarounds rather than \b: a name ending in a non-word character,
        # such as "Virgin Islands (UK)", can never satisfy a trailing \b when
        # it is followed by a space or the end of the text.
        pattern = rf"(?<!\w){re.escape(name)}(?!\w)"
        return re.search(pattern, text, re.IGNORECASE) is not None

    labels = []
    labels.extend(
        "High-Risk: " + country for country in HIGH_RISK_COUNTRIES if mentions(country)
    )
    labels.extend(
        "FATF Monitored: " + country for country in FATF_MONITORED_COUNTRIES if mentions(country)
    )
    labels.extend(
        f"Region: {group}"
        for group, members in REGIONAL_GROUPS.items()
        if any(mentions(member) for member in members)
    )

    # Drop repeats (should a list ever contain one) while keeping the order stable.
    return list(dict.fromkeys(labels))

def tag_jurisdictions(df: pd.DataFrame, text_cols=None):
    """Add a `jurisdictions` column listing the jurisdiction labels found per row.

    Parameters
    ----------
    df : frame holding the text columns.
    text_cols : columns whose text is searched; defaults to
        ``["title", "abstract"]``. Missing values count as empty text.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with a `jurisdictions` column produced by
        `extract_jurisdictions`. `df` itself is not modified, so no temporary
        helper column is left behind on the caller's frame.
    """
    cols = ["title", "abstract"] if text_cols is None else list(text_cols)
    if not cols:
        raise ValueError("text_cols must name at least one column")

    text_parts = [df[col].fillna("").astype(str) for col in cols]
    combined_text = text_parts[0]
    for part in text_parts[1:]:
        combined_text = combined_text + " " + part

    return df.assign(jurisdictions=combined_text.apply(extract_jurisdictions))


# === PREDICATE CRIME CLASSIFICATION ===
# Sourced from p. 6 of https://www.austrac.gov.au/sites/default/files/2024-07/2024%20AUSTRAC%20Money%20Laundering%20NRA.pdf


PREDICATE_CRIME_KEYWORDS = {
    "illicit drugs": ["drug trafficking", "methamphetamine", "heroin", "cocaine", "narcotics"],
    "tax and revenue crime": ["tax evasion", "tax fraud", "revenue fraud"],
    "government-funded program fraud": ["welfare fraud", "program fraud", "medicare fraud"],
    "scams": ["scam", "phishing", "online fraud", "investment scam"],
    "illicit tobacco": ["illicit tobacco", "illegal cigarettes", "tobacco smuggling"],
    "pure cybercrime": ["cybercrime", "hacking", "ransomware", "malware"],
    "identity crime": ["identity theft", "fake identity", "synthetic identity"],
    "corruption and bribery": ["corruption", "bribery", "kickback", "embezzlement"],
    "superannuation fraud": ["superannuation fraud", "pension fraud", "retirement fraud"],
    "child sexual exploitation": ["child exploitation", "child abuse material", "CEM"],
    "environmental crime": ["illegal logging", "wildlife trafficking", "environmental crime"],
    "payment fraud": ["credit card fraud", "payment fraud", "unauthorised transaction"],
    "firearms trafficking": ["arms trafficking", "illegal weapons", "gun smuggling"],
    "human trafficking": ["human trafficking", "forced labour", "modern slavery"],
    "intellectual property crime": ["counterfeit goods", "piracy", "intellectual property theft"],
}


def classify_predicate_crimes(df: pd.DataFrame, text_cols=None):
    """Tag every row of `df` with the predicate crimes whose keywords occur in its text.

    Parameters
    ----------
    df : frame holding the text columns.
    text_cols : columns whose text is searched; defaults to
        ``["title", "abstract"]``. Missing values count as empty text.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with a `predicate_crimes` column: a list of labels in
        `PREDICATE_CRIME_KEYWORDS` order (empty when nothing matches). `df`
        itself is left untouched.

    Matching rules
    --------------
    * Ordinary keywords match case-insensitively at the start of a word, so
      plurals still hit ("scam" -> "scams").
    * All-uppercase keywords are treated as acronyms and must appear as a whole
      word in exactly that case. Lower-casing "CEM" and matching it as a
      substring would flag every text containing "enforcement" or "placement".
    """
    cols = ["title", "abstract"] if text_cols is None else list(text_cols)
    if not cols:
        raise ValueError("text_cols must name at least one column")

    def keyword_pattern(keyword):
        escaped = re.escape(keyword)
        if keyword.isupper():
            return re.compile(rf"\b{escaped}\b")
        return re.compile(rf"\b{escaped}", re.IGNORECASE)

    # Compile once per call rather than lower-casing the text for every keyword.
    patterns_by_label = {
        label: [keyword_pattern(keyword) for keyword in keywords]
        for label, keywords in PREDICATE_CRIME_KEYWORDS.items()
    }

    def extract_predicates(text):
        # Dict order gives a stable, reproducible label order.
        return [
            label
            for label, patterns in patterns_by_label.items()
            if any(pattern.search(text) for pattern in patterns)
        ]

    text_parts = [df[col].fillna("").astype(str) for col in cols]
    combined_text = text_parts[0]
    for part in text_parts[1:]:
        combined_text = combined_text + " " + part

    return df.assign(predicate_crimes=combined_text.apply(extract_predicates))

# Run the three taggers in sequence, each receiving the previous result.
raw_df = (
    raw_df
    .pipe(classify_sectors)
    .pipe(tag_jurisdictions)
    .pipe(classify_predicate_crimes)
)


In [0]:
def _explode_tags(frame: pd.DataFrame, tag_col: str) -> pd.DataFrame:
    """One row per distinct (work id, tag) pair; rows without a tag are dropped.

    raw_df can hold the same work once per topic it was fetched under, so
    identical pairs are collapsed here before they reach the bridge tables.
    """
    return frame[["id", tag_col]].explode(tag_col).dropna().drop_duplicates()


def _count_publications(exploded: pd.DataFrame, tag_col: str) -> pd.DataFrame:
    """Distinct works per tag, largest count first."""
    return (
        exploded
        .groupby(tag_col)
        .agg(publication_count=("id", "nunique"))
        .reset_index()
        .sort_values("publication_count", ascending=False)
    )


predicate_exploded = _explode_tags(raw_df, "predicate_crimes")
predicate_df = _count_publications(predicate_exploded, "predicate_crimes")
predicate_df.to_csv(EXPORT_DIR / "predicate_crimes.csv", index=False)

sector_exploded = _explode_tags(raw_df, "sectors")
sector_df = _count_publications(sector_exploded, "sectors")
sector_df.to_csv(EXPORT_DIR / "sectors.csv", index=False)

jurisdiction_exploded = _explode_tags(raw_df, "jurisdictions")
jurisdiction_df = _count_publications(jurisdiction_exploded, "jurisdictions")
jurisdiction_df.to_csv(EXPORT_DIR / "jurisdictions.csv", index=False)

print("Extra export complete.")


In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS fincrime;


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


# helper to write a pandas df as a Delta table
def write_delta_from_pandas(pdf: pd.DataFrame, table_name: str, schema: str = "fincrime"):
    """Overwrite the Delta table `<schema>.<table_name>` with the contents of `pdf`.

    Parameters
    ----------
    pdf : pandas frame to persist.
    table_name : table name without the schema prefix.
    schema : schema (database) the table lives in.

    The frame is converted to a Spark DataFrame and saved in overwrite mode.
    Progress is reported with a printed message; nothing is returned.
    """
    sdf = spark.createDataFrame(pdf)
    full_name = f"{schema}.{table_name}"
    (
        sdf.write
        .mode("overwrite")
        .format("delta")
        # Let an overwrite replace the stored schema too; otherwise a re-run
        # whose columns differ from the existing table is rejected.
        .option("overwriteSchema", "true")
        .saveAsTable(full_name)
    )
    print(f"Written Delta table: {full_name}")

# Table name -> frame, written in this order:
# core bibliometrics first, then the AUSTRAC-style tags.
delta_tables = {
    "publications_per_year": pubs_per_year_df,
    "top_countries": pubs_by_country_df,
    "top_institutions": pubs_by_institution_df,
    "top_cited": top_cited_df,
    "works_tidy": works_flat_df,
    "predicate_crimes": predicate_df,
    "sectors": sector_df,
    "jurisdictions": jurisdiction_df,
}

for delta_table_name, frame in delta_tables.items():
    write_delta_from_pandas(frame, delta_table_name)


In [0]:
# Bridge tables (relational)
# Rebind instead of renaming in place so the cell has no hidden side effect on
# the objects produced by the earlier cell.
predicate_exploded = predicate_exploded.rename(columns={"predicate_crimes": "predicate_crime"})
sector_exploded = sector_exploded.rename(columns={"sectors": "sector"})
jurisdiction_exploded = jurisdiction_exploded.rename(columns={"jurisdictions": "jurisdiction"})

# inst_df still carries the raw nested `authorships` / `institutions` dict
# columns it was exploded from; a bridge table only needs the flat keys and
# attributes, one row per distinct combination.
institution_bridge_cols = [
    "id",
    "author_id",
    "author_name",
    "author_position",
    "institution_id",
    "institution_name",
    "institution_country_code",
]
institution_bridge_df = inst_df[institution_bridge_cols].drop_duplicates()

write_delta_from_pandas(works_flat_df,      "fact_works")
write_delta_from_pandas(predicate_exploded, "bridge_work_predicate")
write_delta_from_pandas(sector_exploded,    "bridge_work_sector")
write_delta_from_pandas(jurisdiction_exploded, "bridge_work_jurisdiction")
write_delta_from_pandas(institution_bridge_df, "bridge_work_institution")
