In [1]:
import os
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import yaml

import graphframes as gf
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column

while not Path("data") in Path(".").iterdir():
    os.chdir("..")

plt.style.use("seaborn-white")
conf_dict = yaml.safe_load(Path("config/conf.yaml").read_text())

checkpoint_dir = str(Path("spark-checkpoints").absolute())
graphframes_jar_path = str(
    Path(
        ".venv/lib/python3.9/site-packages/pyspark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar"
    ).absolute()
)

spark_conf = (
    SparkConf()
    .set("spark.jars", graphframes_jar_path)
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
)

sc = SparkContext(conf=spark_conf).getOrCreate()
sc.setCheckpointDir(checkpoint_dir)
sc.setLogLevel("ERROR")

spark = SparkSession.builder.config("spark.driver.memory", "8g").getOrCreate()

22/06/25 19:05:29 WARN Utils: Your hostname, domvwt-XPS-13-9305 resolves to a loopback address: 127.0.1.1; using 192.168.0.24 instead (on interface wlp164s0)
22/06/25 19:05:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/06/25 19:05:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/25 19:05:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/25 19:05:30 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
import src.dataprep as dp

In [4]:
conf_dict

{'ownership_data_raw': 'data/raw/open-ownership-data.jsonl',
 'companies_house_data_raw': 'data/raw/BasicCompanyDataAsOneFile-2022-05-01.csv',
 'companies_interim': 'data/interim/companies.parquet',
 'relationships_interim': 'data/interim/relationships.parquet',
 'persons_interim': 'data/interim/persons.parquet',
 'addresses_interim': 'data/interim/addresses.parquet',
 'companies_house_interim': 'data/interim/companies-info.parquet',
 'companies_processed': 'data/processed/companies.parquet',
 'relationships_processed': 'data/processed/relationships.parquet',
 'persons_processed': 'data/processed/persons.parquet',
 'addresses_processed': 'data/processed/addresses.parquet',
 'nodes': 'data/graph/nodes.parquet',
 'edges': 'data/graph/edges.parquet',
 'connected_components': 'data/graph/connected-components.parquet'}

In [5]:
persons_interim_df = spark.read.parquet(conf_dict["persons_interim"])
companies_processed_df = spark.read.parquet(conf_dict["companies_processed"])
relationships_processed_df = spark.read.parquet(conf_dict["relationships_processed"])
persons_processed_df = spark.read.parquet(conf_dict["persons_processed"])
nodes_filtered_df = spark.read.parquet("data/graph/component-nodes.parquet")
edges_filtered_df = spark.read.parquet("data/graph/component-edges.parquet")

In [12]:
dp.process_persons(persons_interim_df).show(5)

+--------------------+----------+---------------+----------+------------+--------------------+--------------------------+---------------+---------+-----------------+----+--------------------+--------------------+-----------+------+-------------+--------------------+---------------+-------+--------------------+---------------------+
|           addresses| birthDate|dissolutionDate|entityType|foundingDate|         identifiers|incorporatedInJurisdiction|interestedParty|interests|missingInfoReason|name|               names|       nationalities| personType|source|statementDate|         statementID|  statementType|subject|                name|nationalities[0].code|
+--------------------+----------+---------------+----------+------------+--------------------+--------------------------+---------------+---------+-----------------+----+--------------------+--------------------+-----------+------+-------------+--------------------+---------------+-------+--------------------+---------------------

[autoreload of src.dataprep failed: Traceback (most recent call last):
  File "/home/domvwt/projects/msc-thesis/.venv/lib/python3.9/site-packages/IPython/extensions/autoreload.py", line 257, in check
    superreload(m, reload, self.old_objects)
  File "/home/domvwt/projects/msc-thesis/.venv/lib/python3.9/site-packages/IPython/extensions/autoreload.py", line 455, in superreload
    module = reload(module)
  File "/home/domvwt/.pyenv/versions/3.9.12/lib/python3.9/importlib/__init__.py", line 169, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 613, in _exec
  File "<frozen importlib._bootstrap_external>", line 846, in exec_module
  File "<frozen importlib._bootstrap_external>", line 983, in get_code
  File "<frozen importlib._bootstrap_external>", line 913, in source_to_code
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/domvwt/projects/msc-thesis/src/dataprep.py", line 192
    F.col("names").getItem(

In [None]:
print(f"Node count: {nodes_filtered_df.count():,}")
print(f"Edge count: {edges_filtered_df.count():,}")

Node count: 205,984
Edge count: 204,644


In [None]:
nodes_filtered_df.groupBy("isCompany").count().show()

+---------+------+
|isCompany| count|
+---------+------+
|     true|171748|
|    false| 34236|
+---------+------+



### Neo4j

In [None]:
import textwrap as tw
from pyspark.sql import DataFrame


def sanitise_string(column: str) -> Column:
    """Replace single quotes with double for neo4j import compatibility."""
    return F.regexp_replace(
        F.regexp_replace(F.col(column), pattern='"', replacement="'"),
        pattern="'",
        replacement="''",
    )


def convert_to_csv(df: DataFrame, output_name: str) -> None:
    csv_path = f"data/neo4j/{output_name}.csv"
    parquet_csv_path = f"{csv_path}.parquet"
    string_cols_sanitised = [
        sanitise_string(col[0]).alias(col[0]) if col[1] == "string" else col[0]
        for col in df.dtypes
    ]
    sanitised_df = df.select(*string_cols_sanitised)
    sanitised_df.coalesce(1).write.csv(
        parquet_csv_path, header=False, escape="", mode="overwrite"
    )
    csv_txt_path = next(Path.cwd().glob(f"{parquet_csv_path}/part-00000*.csv"))
    Path(csv_txt_path).rename(csv_path)


def make_neo4j_statement(df: DataFrame, name: str, entity_type: str) -> None:
    acc = []

    for col in df.columns:
        acc.append(f"{col}: row.{col}")

    statement = f"""\
        :auto USING PERIODIC COMMIT LOAD CSV WITH HEADERS FROM "file:///{name}.csv" AS row
        CREATE (:{entity_type.capitalize()} """
    statement += "{" + ", ".join(acc) + "});"
    statement = tw.dedent(statement)
    print(statement)
    print()

In [None]:
def join_graph_features_to_entities(
    entity_df: DataFrame, nodes_df: DataFrame
) -> DataFrame:
    nodes_df = nodes_df.drop("isCompany").withColumnRenamed("id", "statementID")
    return entity_df.join(nodes_df, on=["statementID"], how="inner")

In [None]:
nodes_filtered_df.filter(~F.col("isCompany")).show()

+--------------------+---------+------------+--------+---------+-------------+--------+
|                  id|isCompany|   component|inDegree|outDegree|triangleCount|pagerank|
+--------------------+---------+------------+--------+---------+-------------+--------+
| 1034841021205811543|    false| 34359784699|       0|        1|            0|       0|
|10441516196839861294|    false| 68719539121|       0|       13|            0|       0|
|10479431246556618495|    false|429496771468|       0|       11|            0|       0|
|10519184071906386747|    false| 34359785157|       0|        2|            0|       0|
|10599056856969872098|    false|566935730925|       0|        3|            0|       0|
|10611275659210887798|    false| 51539614510|       0|        1|            0|       0|
|10638019556212718020|    false|214748408114|       0|        3|            0|       0|
|10946430016329891743|    false|  8589971433|       0|        1|            0|       0|
|11019066695878997252|    false|

In [None]:
companies_filtered_df = join_graph_features_to_entities(
    companies_processed_df, nodes_filtered_df
)
persons_filtered_df = join_graph_features_to_entities(
    persons_processed_df, nodes_filtered_df
)

# TODO

- Connect company information back to nodes df to make companies nodes df
- Connect person information back to nodes df to make persons nodes df


In [None]:
convert_to_csv(companies_filtered_df, "companies")
convert_to_csv(persons_filtered_df, "persons")
convert_to_csv(edges_filtered_df, "relationships")

                                                                                

In [None]:
make_neo4j_statement(companies_filtered_df, "companies", "company")
make_neo4j_statement(persons_filtered_df, "persons", "person")

:auto USING PERIODIC COMMIT LOAD CSV WITH HEADERS FROM "file:///companies.csv" AS row
CREATE (:Company {statementID: row.statementID, name: row.name, foundingDate: row.foundingDate, dissolutionDate: row.dissolutionDate, countryCode: row.countryCode, companiesHouseID: row.companiesHouseID, openCorporatesID: row.openCorporatesID, openOwnershipRegisterID: row.openOwnershipRegisterID, CompanyCategory: row.CompanyCategory, CompanyStatus: row.CompanyStatus, Accounts_AccountCategory: row.Accounts_AccountCategory, SICCode_SicText_1: row.SICCode_SicText_1, component: row.component, inDegree: row.inDegree, outDegree: row.outDegree, triangleCount: row.triangleCount, pagerank: row.pagerank});

:auto USING PERIODIC COMMIT LOAD CSV WITH HEADERS FROM "file:///persons.csv" AS row
CREATE (:Person {statementID: row.statementID, birthDate: row.birthDate, nationality: row.nationality, component: row.component, inDegree: row.inDegree, outDegree: row.outDegree, triangleCount: row.triangleCount, pagerank: ro

In [None]:
persons_filtered_df.printSchema()

root
 |-- statementID: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- component: long (nullable = true)
 |-- inDegree: integer (nullable = true)
 |-- outDegree: integer (nullable = true)
 |-- triangleCount: long (nullable = true)
 |-- pagerank: long (nullable = true)



In [None]:
edges_filtered_df.printSchema()

root
 |-- src: string (nullable = true)
 |-- interestedPartyIsPerson: boolean (nullable = true)
 |-- dst: string (nullable = true)
 |-- minimumShare: double (nullable = true)



```cypher
:auto USING PERIODIC COMMIT LOAD CSV WITH HEADERS FROM "file:///relationships.csv" AS row
MATCH
  (a:Person),
  (b:Company)
WHERE a.statementID = row.interestedPartyStatementID AND b.statementID = row.subjectStatementID
CREATE (a)-[r:Ownership {minimumShare: row.minimumShare}]->(b);

:auto USING PERIODIC COMMIT LOAD CSV WITH HEADERS FROM "file:///relationships.csv" AS row
MATCH
  (a:Company),
  (b:Company)
WHERE a.statementID = row.interestedPartyStatementID AND b.statementID = row.subjectStatementID
CREATE (a)-[r:Ownership {minimumShare: row.minimumShare}]->(b)
```