In [None]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from neo4j import Query, GraphDatabase, RoutingControl, Result
import time

In [None]:
import pandas as pd
# Lift pandas' display truncation for both columns and rows.
for option_name in ("display.max_columns", "display.max_rows"):
    pd.set_option(option_name, None)

## Setup Spark

In [None]:
# Dotenv file that the next cell loads the Neo4j and input-file settings from.
env_file = ".env"

In [None]:
# Load the dotenv file when it exists, then read the settings from the process
# environment either way, so variables exported in the shell work as well.
if os.path.exists(env_file):
    load_dotenv(env_file, override=True)
else:
    print(f"File {env_file} not found.")

# Neo4j
NEO4J_URI = os.getenv('NEO4J_URI')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')
NEO4J_DATABASE = os.getenv('NEO4J_DATABASE')

# Files
PSC_FILE = os.getenv('PSC_FILE')

# Every setting is used unconditionally by later cells, so stop here with a
# message naming what is missing rather than failing later on a None value.
missing_settings = [
    setting_name
    for setting_name, setting_value in {
        'NEO4J_URI': NEO4J_URI,
        'NEO4J_USERNAME': NEO4J_USERNAME,
        'NEO4J_PASSWORD': NEO4J_PASSWORD,
        'NEO4J_DATABASE': NEO4J_DATABASE,
        'PSC_FILE': PSC_FILE,
    }.items()
    if setting_value is None
]
if missing_settings:
    raise RuntimeError(
        f"Missing required settings: {', '.join(missing_settings)} "
        f"(set them in {env_file} or in the environment)."
    )

In [None]:
# Maven coordinates of the Neo4j Spark connector pulled in through spark.jars.packages.
NEO4J_CONNECTOR = "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.10_for_spark_3"

# Session-wide settings: connector package, Neo4j connection details and driver memory limits.
spark_settings = {
    "spark.jars.packages": NEO4J_CONNECTOR,
    "url": NEO4J_URI,
    "neo4j.url": NEO4J_URI,
    "neo4j.authentication.basic.username": NEO4J_USERNAME,
    "neo4j.authentication.basic.password": NEO4J_PASSWORD,
    "neo4j.database": NEO4J_DATABASE,
    "spark.driver.memory": "8g",
    "spark.driver.maxResultSize": "2g",
}

# Local-mode session using all available cores.
session_builder = SparkSession.builder.appName("CompanyHouse").master("local[*]")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()

In [None]:
# Echo the session object as the cell output to confirm it was created.
spark

In [None]:
# Query the driver JVM's memory bean for its current heap usage.
jvm = spark.sparkContext._jvm
memory_bean = jvm.java.lang.management.ManagementFactory.getMemoryMXBean()
heap = memory_bean.getHeapMemoryUsage()

# Each figure is returned in bytes; print it in megabytes.
for heap_label, read_bytes in (
    ("Used (MB):", heap.getUsed),
    ("Committed (MB):", heap.getCommitted),
    ("Max (MB):", heap.getMax),
):
    print(heap_label, read_bytes() / 1024 / 1024)

In [None]:
# Scala version string reported by the driver JVM (the connector artifact above is a _2.12 build).
spark.sparkContext._jvm.scala.util.Properties.versionString()

In [None]:
# Show the spark.jars.packages value held by the running context.
spark.sparkContext.getConf().get("spark.jars.packages")

## Read Data

In [None]:
# The input is JSON Lines (one document per line, hence multiLine=false);
# PERMISSIVE mode keeps reading when some lines are malformed.
psc_reader = spark.read.options(multiLine="false", mode="PERMISSIVE")
psc_raw = psc_reader.json(PSC_FILE)

In [None]:
# Number of raw records read from PSC_FILE.
psc_raw.count()

In [None]:
# Print the schema Spark inferred from the JSON input.
psc_raw.printSchema()

In [None]:
# Record count per `data.kind`, collected to pandas for display.
psc_raw.groupBy(F.col('data.kind')).count().toPandas()

Flatten the nested `data` struct and drop ceased relations.

In [None]:
# A record with a `ceased_on` date counts as ceased even if its own flag says otherwise.
ceased_flag = F.when(F.col("ceased_on").isNotNull(), F.lit(True)).otherwise(F.col("ceased"))
# Keep records whose (recomputed) flag is false or missing.
still_active = F.col("ceased").isNull() | (F.col("ceased") == False)

# Promote the fields of the `data` struct to top-level columns next to company_number.
psc_flat = psc_raw.select(F.col("company_number"), F.col("data.*"))
psc_df = psc_flat.withColumn("ceased", ceased_flag).filter(still_active)

In [None]:
# Record count per `kind` after ceased relations were removed.
psc_df.groupBy(F.col('kind')).count().toPandas()

In [None]:
# Explode `natures_of_control` to one row per entry and count how often each value occurs.
psc_df.withColumn('nature_of_control', F.explode(F.col('natures_of_control'))).groupBy(F.col('nature_of_control')).count().toPandas()

### Individuals

In [None]:
# Individual PSCs / beneficial owners with their name parts and birth date
# pulled out of the nested structs, plus a derived `person_id`.
individuals_psc_df = (
    psc_df
    # psc_df already flattened `data.*`, so the record type is the top-level
    # `kind` column; reference it directly instead of the dropped `data` struct.
    .filter(
        F.col('kind').isin(
            'individual-person-with-significant-control',
            'individual-beneficial-owner',
        )
    )
    .withColumn('forename', F.col('name_elements.forename'))
    .withColumn('middle_name', F.col('name_elements.middle_name'))
    .withColumn('surname', F.col('name_elements.surname'))
    .withColumn('title', F.col('name_elements.title'))
    .withColumn('year_of_birth', F.col('date_of_birth.year'))
    .withColumn('month_of_birth', F.col('date_of_birth.month'))
    # person_id is a SHA-256 over the name parts and birth year/month.
    # NOTE(review): concat_ws skips NULL inputs, so records that differ only in
    # WHICH field is missing can hash to the same id — confirm this is acceptable.
    .withColumn("person_id", F.sha2(F.concat_ws("||", F.col("forename"), F.col('middle_name'), F.col('surname'), F.col('title'), F.col('year_of_birth'), F.col('month_of_birth')), 256))
    .select('company_number', 'person_id', 'forename', 'middle_name', 'surname', 'title', 'year_of_birth', 'month_of_birth', 'nationality', 'is_sanctioned', 'natures_of_control','notified_on')
)

Parse `natures_of_control`: keep only the ownership-related entries and extract the percentage bounds from them.

In [None]:
# The exploded control string, and the first / second number found in it.
s = F.col("ownership_control")
n1 = F.regexp_extract(s, r"(\d+)", 1)
n2 = F.regexp_extract(s, r"\d+.*?(\d+)", 1)

# Percentage wordings recognised in a control string, checked in this order.
range_wording = s.rlike(r"\d+-to-\d+-percent")
more_than_wording = s.rlike(r"more-than-\d+-percent")
or_more_wording = s.rlike(r"\d+-or-more-percent")
plain_wording = s.rlike(r"\d+-percent")

first_number = n1.cast("int")
second_number = n2.cast("int")

# Lower bound: always the first number (NULL when no wording matches).
pct_min_expr = (
    F.when(range_wording, first_number)
    .when(more_than_wording, first_number)
    .when(or_more_wording, first_number)
    .when(plain_wording, first_number)
)
# Upper bound: second number of a range, 100 for open-ended wordings,
# otherwise the single number itself.
pct_max_expr = (
    F.when(range_wording, second_number)
    .when(more_than_wording, F.lit(100))
    .when(or_more_wording, F.lit(100))
    .when(plain_wording, first_number)
)

individual_group_columns = [
    'company_number', 'person_id', 'forename', 'middle_name', 'surname', 'title',
    'year_of_birth', 'month_of_birth', 'nationality', 'is_sanctioned', 'notified_on',
]

# One row per ownership-related control string, with its parsed bounds.
individual_control_rows = (
    individuals_psc_df
    .withColumn(
        "ownership_controls",
        F.expr("filter(natures_of_control, x -> lower(x) like '%ownership%')"),
    )
    .filter(F.size("ownership_controls") > 0)
    .withColumn("ownership_control", F.explode("ownership_controls"))
    .withColumn("pct_min", pct_min_expr)
    .withColumn("pct_max", pct_max_expr)
)

# Collapse back to one row per group: distinct control strings plus the widest bounds.
individuals_ownership_df = (
    individual_control_rows
    .groupBy(*individual_group_columns)
    .agg(
        F.collect_set("ownership_control").alias("ownership_controls"),
        F.min("pct_min").alias("ownership_pct_min"),
        F.max("pct_max").alias("ownership_pct_max"),
    )
)

Keep one row per individual, preferring a record flagged as sanctioned (then unknown, then not sanctioned).

In [None]:
# Rank each person's rows so the "most sanctioned" record comes first:
# is_sanctioned true (2) > null (1) > anything else (0).
sanction_priority = (
    F.when(F.col("is_sanctioned") == True, 2)
     .when(F.col("is_sanctioned").isNull(), 1)
     .otherwise(0)
)

# row_number() over an ordering with ties picks an arbitrary row, which made the
# retained nationality vary between runs. The extra sort keys break ties in a
# fixed way: latest notified_on first, then nationality, then company_number.
w = Window.partitionBy("person_id").orderBy(
    sanction_priority.desc(),
    F.col("notified_on").desc_nulls_last(),
    F.col("nationality").asc_nulls_last(),
    F.col("company_number").asc(),
)

# Keep only the top-ranked row of every person.
individuals_df = (
    individuals_ownership_df.withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

In [None]:
# Reduce to the person-level attributes that become :Individual node properties.
person_columns = [
    'person_id', 'forename', 'middle_name', 'surname', 'title',
    'year_of_birth', 'month_of_birth', 'nationality', 'is_sanctioned',
]
individuals_df = individuals_df.select(*person_columns).distinct()

In [None]:
# Number of individual rows prepared for loading.
individuals_df.count()

In [None]:
# Relationship rows between an individual and a company, with the parsed ownership bounds.
individual_ownership_columns = [
    'company_number', 'person_id', 'ownership_controls',
    'notified_on', 'ownership_pct_min', 'ownership_pct_max',
]
individual_ownership_df = individuals_ownership_df.select(*individual_ownership_columns).distinct()

In [None]:
# Number of individual-to-company ownership rows prepared for loading.
individual_ownership_df.count()

### Legal Entities

In [None]:
# Legal-person PSCs / beneficial owners, reduced to the columns used downstream.
legal_entities_psc_df = (
    psc_df
    # psc_df already flattened `data.*`, so the record type is the top-level
    # `kind` column; reference it directly instead of the dropped `data` struct.
    .filter(
        F.col('kind').isin(
            'legal-person-person-with-significant-control',
            'legal-person-beneficial-owner',
        )
    )
    .select('company_number', 'name', 'is_sanctioned', 'natures_of_control', 'notified_on')
)

**TODO:** consider making use of the legal entities' addresses.

In [None]:
# TODO: disabled sketch — select each legal entity's name and principal office address.
# legal_entities_addresses_df = (
#     psc_df
#     .filter(
#         (F.col('data.kind') == 'legal-person-person-with-significant-control') |
#         (F.col('data.kind') == 'legal-person-beneficial-owner')
#     )
#     .select('name', 'principal_office_address')
# )

Parse `natures_of_control`: keep only the ownership-related entries and extract the percentage bounds from them.

In [None]:
# The exploded control string, and the first / second number found in it.
s = F.col("ownership_control")
n1 = F.regexp_extract(s, r"(\d+)", 1)
n2 = F.regexp_extract(s, r"\d+.*?(\d+)", 1)

# Percentage wordings recognised in a control string, checked in this order.
range_wording = s.rlike(r"\d+-to-\d+-percent")
more_than_wording = s.rlike(r"more-than-\d+-percent")
or_more_wording = s.rlike(r"\d+-or-more-percent")
plain_wording = s.rlike(r"\d+-percent")

first_number = n1.cast("int")
second_number = n2.cast("int")

# Lower bound: always the first number (NULL when no wording matches).
pct_min_expr = (
    F.when(range_wording, first_number)
    .when(more_than_wording, first_number)
    .when(or_more_wording, first_number)
    .when(plain_wording, first_number)
)
# Upper bound: second number of a range, 100 for open-ended wordings,
# otherwise the single number itself.
pct_max_expr = (
    F.when(range_wording, second_number)
    .when(more_than_wording, F.lit(100))
    .when(or_more_wording, F.lit(100))
    .when(plain_wording, first_number)
)

legal_entity_group_columns = ["company_number", "name", "notified_on", "is_sanctioned"]

# One row per ownership-related control string, with its parsed bounds.
legal_entity_control_rows = (
    legal_entities_psc_df
    .withColumn(
        "ownership_controls",
        F.expr("filter(natures_of_control, x -> lower(x) like '%ownership%')"),
    )
    .filter(F.size("ownership_controls") > 0)
    .withColumn("ownership_control", F.explode("ownership_controls"))
    .withColumn("pct_min", pct_min_expr)
    .withColumn("pct_max", pct_max_expr)
)

# Collapse back to one row per group: distinct control strings plus the widest bounds.
legal_entities_ownership_df = (
    legal_entity_control_rows
    .groupBy(*legal_entity_group_columns)
    .agg(
        F.collect_set("ownership_control").alias("ownership_controls"),
        F.min("pct_min").alias("ownership_pct_min"),
        F.max("pct_max").alias("ownership_pct_max"),
    )
)

Keep one row per legal entity, preferring a record flagged as sanctioned (then unknown, then not sanctioned).

In [None]:
# Order each name's rows by sanction status: true (2) > null (1) > anything else (0).
entity_sanction_priority = (
    F.when(F.col("is_sanctioned") == True, 2)
    .when(F.col("is_sanctioned").isNull(), 1)
    .otherwise(0)
)
w = Window.partitionBy("name").orderBy(entity_sanction_priority.desc())

# Number the rows inside each name and keep only the first one.
ranked_legal_entities = legal_entities_ownership_df.withColumn("rn", F.row_number().over(w))
legal_entities_df = ranked_legal_entities.filter(F.col("rn") == 1).drop("rn")

In [None]:
# Reduce to the attributes that become :LegalEntity node properties.
legal_entity_columns = ['name', 'is_sanctioned']
legal_entities_df = legal_entities_df.select(*legal_entity_columns).distinct()

In [None]:
# Relationship rows between a legal entity and a company, with the parsed ownership bounds.
legal_entity_ownership_columns = [
    'company_number', 'name', 'ownership_controls',
    'notified_on', 'ownership_pct_min', 'ownership_pct_max',
]
legal_entities_ownership_df = legal_entities_ownership_df.select(*legal_entity_ownership_columns).distinct()

In [None]:
# Preview five relationship rows as a pandas frame.
legal_entities_ownership_df.limit(5).toPandas()

### Corporate

In [None]:
# Corporate-entity PSCs / beneficial owners that carry a registration number.
corporate_psc_df = (
    psc_df
    # psc_df already flattened `data.*`, so the record type is the top-level
    # `kind` column; reference it directly instead of the dropped `data` struct.
    .filter(
        F.col('kind').isin(
            'corporate-entity-person-with-significant-control',
            'corporate-entity-beneficial-owner',
        )
    )
    .withColumn('registration_number', F.col('identification.registration_number'))
    .select('company_number', 'name', 'natures_of_control', 'notified_on', 'registration_number')
    # for now only select known registration companies
    .filter(F.col('registration_number').isNotNull())
)

Parse `natures_of_control`: keep only the ownership-related entries and extract the percentage bounds from them.

In [None]:
# The exploded control string, and the first / second number found in it.
s = F.col("ownership_control")
n1 = F.regexp_extract(s, r"(\d+)", 1)
n2 = F.regexp_extract(s, r"\d+.*?(\d+)", 1)

# Percentage wordings recognised in a control string, checked in this order.
range_wording = s.rlike(r"\d+-to-\d+-percent")
more_than_wording = s.rlike(r"more-than-\d+-percent")
or_more_wording = s.rlike(r"\d+-or-more-percent")
plain_wording = s.rlike(r"\d+-percent")

first_number = n1.cast("int")
second_number = n2.cast("int")

# Lower bound: always the first number (NULL when no wording matches).
pct_min_expr = (
    F.when(range_wording, first_number)
    .when(more_than_wording, first_number)
    .when(or_more_wording, first_number)
    .when(plain_wording, first_number)
)
# Upper bound: second number of a range, 100 for open-ended wordings,
# otherwise the single number itself.
pct_max_expr = (
    F.when(range_wording, second_number)
    .when(more_than_wording, F.lit(100))
    .when(or_more_wording, F.lit(100))
    .when(plain_wording, first_number)
)

corporate_group_columns = ["company_number", "registration_number", "notified_on"]

# One row per ownership-related control string, with its parsed bounds.
corporate_control_rows = (
    corporate_psc_df
    .withColumn(
        "ownership_controls",
        F.expr("filter(natures_of_control, x -> lower(x) like '%ownership%')"),
    )
    .filter(F.size("ownership_controls") > 0)
    .withColumn("ownership_control", F.explode("ownership_controls"))
    .withColumn("pct_min", pct_min_expr)
    .withColumn("pct_max", pct_max_expr)
)

# Collapse back to one row per group: distinct control strings plus the widest bounds.
corporate_psc_ownership_df = (
    corporate_control_rows
    .groupBy(*corporate_group_columns)
    .agg(
        F.collect_set("ownership_control").alias("ownership_controls"),
        F.min("pct_min").alias("ownership_pct_min"),
        F.max("pct_max").alias("ownership_pct_max"),
    )
)

**TODO:** consider handling corporate entities that have no registration number.

In [None]:
# TODO: disabled sketch — collect corporate PSCs without a registration number.
# NOTE(review): corporate_psc_df already drops rows with a null registration_number,
# so this filter would have to run on the data before that step to return anything.
# non_existing_corporate = (
#     corporate_psc_df.filter(F.col('registration_number').isNull())
# )

In [None]:
# Relationship rows between an owning company (by registration number) and the owned company.
corporate_ownership_columns = [
    'company_number', 'ownership_controls', 'notified_on',
    'registration_number', 'ownership_pct_min', 'ownership_pct_max',
]
corporate_psc_ownership_df = corporate_psc_ownership_df.select(*corporate_ownership_columns).distinct()

## Load data

### Connection to Neo4j

In [None]:
# Write the individuals as :Individual nodes through the Neo4j Spark connector
# and report the elapsed seconds as the cell output.
# NOTE(review): the write uses Append mode and the uniqueness constraints are only
# created in later cells — confirm that re-running this cell cannot duplicate nodes.
individual_node_options = {
    "labels": ":Individual",
    "node.keys": "person_id",
    "batch.size": "1000",
}

t1 = time.time()
(
    individuals_df.write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**individual_node_options)
    .save()
)
t2 = time.time()
t2 - t1

In [None]:
# Write the legal entities as :LegalEntity nodes through the Neo4j Spark connector
# and report the elapsed seconds as the cell output.
# NOTE(review): the write uses Append mode and the uniqueness constraints are only
# created in later cells — confirm that re-running this cell cannot duplicate nodes.
legal_entity_node_options = {
    "labels": ":LegalEntity",
    "node.keys": "name",
    "batch.size": "1000",
}

t1 = time.time()
(
    legal_entities_df.write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**legal_entity_node_options)
    .save()
)
t2 = time.time()
t2 - t1

In [None]:
# Python driver used by the cells below to run Cypher directly (counts, constraints).
# NOTE(review): no driver.close() is visible in this part of the notebook — confirm it is closed at the end.
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))

In [None]:
# Count every node in the database and show the result as a pandas frame.
node_count_query = """
    MATCH (n) RETURN COUNT(n) as Count
    """
driver.execute_query(
    node_count_query,
    database_=NEO4J_DATABASE,
    routing_=RoutingControl.READ,
    result_transformer_=Result.to_df,
)

In [None]:
# Make person_id unique across :Individual nodes (no-op if the constraint already exists).
individual_constraint_query = """
        CREATE CONSTRAINT individual_id IF NOT EXISTS FOR (i:Individual) REQUIRE i.person_id IS UNIQUE;
    """
driver.execute_query(
    individual_constraint_query,
    database_=NEO4J_DATABASE,
    routing_=RoutingControl.WRITE,
    result_transformer_=Result.to_df,
)

In [None]:
# Make name unique across :LegalEntity nodes (no-op if the constraint already exists).
legal_entity_constraint_query = """
        CREATE CONSTRAINT legalentity_name IF NOT EXISTS FOR (l:LegalEntity) REQUIRE l.name IS UNIQUE;
    """
driver.execute_query(
    legal_entity_constraint_query,
    database_=NEO4J_DATABASE,
    routing_=RoutingControl.WRITE,
    result_transformer_=Result.to_df,
)

In [None]:
# List the constraints currently defined in the database as a pandas frame.
show_constraints_query = """
    SHOW CONSTRAINTS
    """
driver.execute_query(
    show_constraints_query,
    database_=NEO4J_DATABASE,
    routing_=RoutingControl.READ,
    result_transformer_=Result.to_df,
)

In [None]:
# Create OWNS relationships from :Individual (matched on person_id) to :Company
# (matched on company_number); the remaining columns are written with the relationship.
# The frame is collapsed to a single partition before writing.
individual_owns_options = {
    "relationship": "OWNS",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Individual",
    "relationship.source.node.keys": "person_id:person_id",
    "relationship.target.labels": ":Company",
    "relationship.target.node.keys": "company_number:company_number",
    "batch.size": "10000",
}

t1 = time.time()
(
    individual_ownership_df
    .repartition(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**individual_owns_options)
    .save()
)
t2 = time.time()
t2 - t1

In [None]:
# Create OWNS relationships from :LegalEntity (matched on name) to :Company
# (matched on company_number); the remaining columns are written with the relationship.
# The frame is collapsed to a single partition before writing.
legal_entity_owns_options = {
    "relationship": "OWNS",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":LegalEntity",
    "relationship.source.node.keys": "name:name",
    "relationship.target.labels": ":Company",
    "relationship.target.node.keys": "company_number:company_number",
    "batch.size": "1000",
}

t1 = time.time()
(
    legal_entities_ownership_df
    .repartition(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**legal_entity_owns_options)
    .save()
)
t2 = time.time()
t2 - t1

In [None]:
# Create OWNS relationships between two :Company nodes: the owner is matched by
# comparing the row's registration_number with the node's company_number, the
# owned company by company_number.
# The frame is collapsed to a single partition before writing.
corporate_owns_options = {
    "relationship": "OWNS",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Company",
    "relationship.source.node.keys": "registration_number:company_number",
    "relationship.target.labels": ":Company",
    "relationship.target.node.keys": "company_number:company_number",
    "batch.size": "1000",
}

t1 = time.time()
(
    corporate_psc_ownership_df
    .repartition(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**corporate_owns_options)
    .save()
)
t2 = time.time()
t2 - t1