In [None]:
from maplib import Mapping
import polars as pl
pl.Config.set_fmt_str_lengths(300)
from datetime import datetime

In [None]:
# Namespace IRI that is prepended to relationship names to form predicate IRIs;
# it is the same IRI as the `pan:` prefix declared in the templates and queries.
pan = "https://github.com/DataTreehouse/maplib_workshop/pan#"

### There are three date formats, so we have to do some work to parse them.

In [None]:
# The date layouts handled by parse_dates, e.g. "05-Jan-2010", "Jan 05, 2010"
# and "2010-01-05". parse_dates tries them in this order.
date_formats = [
    "%d-%b-%Y",
    "%b %d, %Y",
    "%Y-%m-%d",
]
date_format_1, date_format_2, date_format_3 = date_formats

In [None]:
def parse_dates(lf, colname, check=False):
    """Parse a string date column that mixes several date formats.

    Every format in the module-level ``date_formats`` list is tried with
    non-strict parsing (a value that does not match a format becomes null), and
    the first format that yields a date wins.

    Args:
        lf: polars LazyFrame that contains ``colname`` as a string column.
        colname: name of the column to parse.
        check: when True, the offending rows are collected to verify that every
            non-null input value was parsed by at least one format.

    Returns:
        A LazyFrame in which ``colname`` holds the parsed dates. The parsed
        column is placed after all the other columns.

    Raises:
        AssertionError: if ``check`` is True and some non-null value matches
            none of the formats.
    """
    source = pl.col(colname)
    # The candidates are built as expressions rather than as temporary columns,
    # so no helper column name can clash with a column already present in lf.
    parsed = pl.coalesce(
        [source.str.to_date(format=date_format, strict=False) for date_format in date_formats]
    )
    if check:
        unparsed = (
            lf.filter(parsed.is_null() & source.is_not_null())
            .select([colname])
            .collect()
        )
        if unparsed.height > 0:
            print("Unparsed dates:")
            print(unparsed[colname])
            # Raised explicitly: a bare `assert False` is stripped under
            # `python -O`, which would silently disable this check.
            raise AssertionError(
                f"{unparsed.height} value(s) in column {colname!r} match none of the date formats"
            )
    # All other columns first, then the parsed column under its original name.
    return lf.select([pl.all().exclude(colname), parsed.alias(colname)])

### More utility functions for data preparation

In [None]:
def split_to_list_column(lf, colname, newname):
    """Turn a ";"-separated string column into a list column.

    The values of ``colname`` are split on ";" and stored under ``newname``;
    the ``colname`` column is then dropped.
    """
    as_list = pl.col(colname).str.split(";").alias(newname)
    return lf.with_columns(as_list).drop(colname)


In [None]:
def create_node_id_uri(lf, node_id_col):
    """Replace a node id column by the corresponding node IRI strings.

    The ids are cast to strings and prefixed with the workshop's node id
    namespace; the result overwrites ``node_id_col``.
    """
    node_id_text = pl.col(node_id_col).cast(pl.Utf8)
    node_iri = "https://github.com/DataTreehouse/maplib_workshop/node_ids#" + node_id_text
    return lf.with_columns(node_iri.alias(node_id_col))

In [None]:
def clean_string(lf, col):
    """Strip characters from a string column that are unwanted in literals.

    Double quotes, backslashes, U+FFFD replacement characters and percent
    signs are removed, and newlines are replaced by a single space. The
    cleaned values overwrite ``col``.
    """
    # (text to find, replacement), applied in this order as literal matches.
    substitutions = (
        ("\"", ""),
        ("\n", " "),
        ("\\", ""),
        ("\uFFFD", ""),
        ("%", ""),
    )
    cleaned = pl.col(col)
    for needle, substitute in substitutions:
        cleaned = cleaned.str.replace_all(needle, substitute, literal=True)
    return lf.with_columns(cleaned)

### We can read and prepare the (legal) entities

In [None]:
# Lazily scan the entities and parse each of their date columns.
entities_lf = pl.scan_parquet("offshoreleaks/nodes-entities.parquet")
for date_col in ("incorporation_date", "inactivation_date", "struck_off_date", "dorm_date"):
    entities_lf = entities_lf.pipe(parse_dates, date_col, check=False)

# Node IRIs, country lists and cleaned names, then keep only the columns that
# the tpl:entities template takes.
entities_lf = (
    entities_lf
    .pipe(create_node_id_uri, "node_id")
    .pipe(split_to_list_column, "countries", "country")
    .pipe(clean_string, "name")
    .select([
        "node_id", "name", "incorporation_date", "inactivation_date", "struck_off_date", "status", "country", "service_provider"])
)

In [None]:
# Execute the lazy query and materialize the entities as a DataFrame.
entities_df = entities_lf.collect()
# Preview the first five rows.
entities_df.head(5)

### We read and prepare the addresses of individuals and organizations

In [None]:
# Addresses: node IRIs and country lists; rows without an address are dropped
# before the address text is cleaned.
addresses_lf = (
    pl.scan_parquet("offshoreleaks/nodes-addresses.parquet")
    .pipe(create_node_id_uri, "node_id")
    .pipe(split_to_list_column, "countries", "country")
    .drop_nulls("address")
    .pipe(clean_string, "address")
    .select(["node_id", "address", "country"])
)

In [None]:
# Execute the lazy query and materialize the addresses as a DataFrame.
addresses_df = addresses_lf.collect()
# Preview the first five rows.
addresses_df.head(5)

### Read and prepare table of intermediaries

In [None]:
# Intermediaries: node IRIs, country lists and cleaned names.
intermediaries_lf = (
    pl.scan_parquet("offshoreleaks/nodes-intermediaries.parquet")
    .pipe(create_node_id_uri, "node_id")
    .pipe(split_to_list_column, "countries", "country")
    .pipe(clean_string, "name")
    .select(["node_id", "name", "status", "country"])
)

In [None]:
# Execute the lazy query and materialize the intermediaries as a DataFrame.
intermediaries_df = intermediaries_lf.collect()
# Preview the first five rows.
intermediaries_df.head(5)

### Read and prepare the table of officers, e.g. CFOs, CEOs

In [None]:
# Officers: country lists, node IRIs and cleaned names.
officers_lf = (
    pl.scan_parquet("offshoreleaks/nodes-officers.parquet")
    .pipe(split_to_list_column, "countries", "country")
    .pipe(create_node_id_uri, "node_id")
    .pipe(clean_string, "name")
    .select(["node_id", "name", "country"])
)

In [None]:
# Execute the lazy query and materialize the officers as a DataFrame.
officers_df = officers_lf.collect()
# Preview the first five rows.
officers_df.head(5)

### There are a few other nodes

In [None]:
# The remaining nodes come from a CSV file; internal_id is read as a string.
others_lf = (
    pl.scan_csv("offshoreleaks/nodes-others.csv", dtypes={"internal_id":pl.Utf8})
    .pipe(split_to_list_column, "countries", "country")
    .pipe(create_node_id_uri, "node_id")
    .pipe(clean_string, "name")
    .select(["node_id", "name", "country"])
)

In [None]:
# Execute the lazy query and materialize the other nodes as a DataFrame.
others_df = others_lf.collect()
# Preview the first five rows.
others_df.head(5)

### Now we prepare the relationships between nodes; these require quite a bit of cleaning up.

In [None]:
# Load the relationships and turn both endpoint ids into node IRIs, using the
# same helper as for the node tables.
relationships_lf = pl.scan_parquet("offshoreleaks/relationships.parquet")
relationships_lf = create_node_id_uri(relationships_lf, "node_id_start")
relationships_lf = create_node_id_uri(relationships_lf, "node_id_end")

# A single `link` value can hold several relationship names with different
# separators; normalize every separator to ",".
relationships_lf = relationships_lf.with_columns(
    pl.col("link").
    str.replace_all(" / ", ",", literal=True).
    str.replace_all("/", ",", literal=True).
    str.replace_all(" - ", ",", literal=True).
    str.replace_all(" & ", ",", literal=True).
    str.replace_all(";", ",", literal=True)
)

# Clean the text, split it on "," and give every name its own row.
relationships_lf = clean_string(relationships_lf, "link").with_columns(
    pl.col("link").str.split(",")).explode("link")

# Reduce each name to something usable as the local part of an IRI: trimmed,
# lower-case, "_" instead of spaces, and no non-word characters or digits.
# The pattern is a raw string because "\W" and "\." are invalid escape
# sequences in a normal string literal.
relationships_lf = relationships_lf.with_columns(
    pl.col("link").
    str.strip().
    str.to_lowercase().
    str.replace_all(" ", "_", literal=True).
    str.replace_all(r"[\W\.\n0-9]", "", literal=False)
)

# Names shorter than two characters are replaced by "unknown". The result has
# to be aliased to "link": an unaliased when/then/otherwise takes its name from
# the `then` branch and would be added as a new column instead of replacing
# `link`. pl.lit makes explicit that "unknown" is a value, not a column name.
relationships_lf = relationships_lf.with_columns(
    pl.when(pl.col("link").str.n_chars() < 2)
    .then(pl.lit("unknown"))
    .otherwise(pl.col("link"))
    .alias("link")
)

# Turn both the relationship type and the cleaned name into predicate IRIs.
relationships_lf = relationships_lf.with_columns(
    (pan + pl.col("rel_type")).alias("rel_type"),
    (pan + pl.col("link")).alias("link")
)
relationships_lf = relationships_lf.select(["node_id_start", "node_id_end", "link", "rel_type"])

In [None]:
# Execute the lazy query and materialize the relationships as a DataFrame.
relationships_df = relationships_lf.collect()
# Preview the first five rows.
relationships_df.head(5)

### We define templates to instantiate triples using stOTTR, which is a terse syntax for OTTR

In [None]:
# stOTTR templates, one per table that is expanded into triples further down:
#   - tpl:type_labels takes no arguments and labels the five node classes.
#   - tpl:entities / addresses / intermediaries / officers / others take the
#     columns of the corresponding node table; each types the node through
#     tpl:named_node / tpl:node and attaches its countries through tpl:country.
#   - tpl:relationships and tpl:specific_relationships use ?rel_type and ?link,
#     respectively, as the predicate between the start node and the end node.
# NOTE(review): the `??name` form presumably marks a parameter as optional, and
# `cross | ... ++?country` presumably emits one triple per element of the
# country list -- confirm against the stOTTR specification.
# NOTE(review): no @prefix is declared for `ottr:` and `rdfs:`; presumably
# maplib predefines them -- confirm.
doc = """
@prefix pan:<https://github.com/DataTreehouse/maplib_workshop/pan#>.
@prefix tpl:<https://github.com/DataTreehouse/maplib_workshop/templates#>.
@prefix xsd:<http://www.w3.org/2001/XMLSchema#>.

tpl:type_labels [ ] :: {
  ottr:Triple(pan:Entity,rdfs:label,"Entity"),
  ottr:Triple(pan:Address,rdfs:label,"Address"),
  ottr:Triple(pan:Intermediary,rdfs:label,"Intermediary"),
  ottr:Triple(pan:Officer,rdfs:label,"Officer"),
  ottr:Triple(pan:Other,rdfs:label,"Other"),
} .

tpl:entities [ xsd:anyURI ?node_id, ??name, ??incorporation_date, 
               ??inactivation_date, ??struck_off_date, ??status, 
               ??country, ??service_provider ] :: {
  tpl:named_node(?node_id, ?name, pan:Entity),
  ottr:Triple(?node_id,pan:incorporation_date,?incorporation_date) ,
  ottr:Triple(?node_id,pan:inactivation_date,?inactivation_date) ,
  ottr:Triple(?node_id,pan:struck_off_date,?struck_off_date) ,
  ottr:Triple(?node_id,pan:status,?status) ,
  tpl:country(?node_id, ?country) ,
  ottr:Triple(?node_id,pan:service_provider,?service_provider)
} . 

tpl:addresses [ xsd:anyURI ?node_id, ?address, ??country ] :: {
  tpl:named_node(?node_id, ?address, pan:Address),
  ottr:Triple(?node_id, rdfs:label, ?address),
  ottr:Triple(?node_id, pan:address, ?address),
  tpl:country(?node_id, ?country)
} . 

tpl:intermediaries [ xsd:anyURI ?node_id, ??name, ??status, ??country ] :: {
  tpl:named_node(?node_id, ?name, pan:Intermediary),
  ottr:Triple(?node_id,pan:status,?status) ,
  tpl:country(?node_id, ?country)
} . 

tpl:officers [ xsd:anyURI ?node_id, ??name, ??country ] :: {
  tpl:named_node(?node_id, ?name, pan:Officer),
  tpl:country(?node_id, ?country)
} . 

tpl:others [ xsd:anyURI ?node_id, ??name, ??country ] :: {
  tpl:named_node(?node_id, ?name, pan:Other),
  tpl:country(?node_id, ?country),
} . 

tpl:relationships [xsd:anyURI ?node_id_start, xsd:anyURI ?node_id_end, ?rel_type] :: {
    ottr:Triple(?node_id_start, ?rel_type, ?node_id_end),
} .

tpl:specific_relationships [xsd:anyURI ?node_id_start, xsd:anyURI ?node_id_end, ?link] :: {
    ottr:Triple(?node_id_start, ?link, ?node_id_end),
} .

tpl:named_node [ ?node_id, ?name, ?type ] :: {
  tpl:node(?node_id, ?type),
  ottr:Triple(?node_id, rdfs:label, ?name),
} .

tpl:node [?node_id, ?type] :: {
    ottr:Triple(?node_id, a, pan:Node),
    ottr:Triple(?node_id, a, ?type),
} . 

tpl:country [?node_id, ?country] :: {
  cross | ottr:Triple(?node_id, pan:country, ++?country)
} .
"""

### We create a mapping object (a graph) based on our templates.

In [None]:
# Build the mapping from the template document; the expand, query and write
# calls in the following cells all go through this object.
m = Mapping([doc])

### We can now expand the templates with the tables / DataFrames

In [None]:
# tpl:type_labels has no parameters, so it is expanded without a DataFrame.
m.expand("tpl:type_labels")
# Each node table is expanded with the template of the same name.
# NOTE(review): the third argument presumably names the column(s) that uniquely
# identify a row -- confirm against the maplib documentation.
m.expand("tpl:entities", entities_df, ["node_id"])
m.expand("tpl:addresses", addresses_df, ["node_id"])
m.expand("tpl:intermediaries", intermediaries_df, ["node_id"])
m.expand("tpl:officers", officers_df, ["node_id"])
m.expand("tpl:others", others_df, ["node_id"])

In [None]:
# One triple per relationship, with the relationship type as the predicate.
relcols = ["node_id_start", "node_id_end", "rel_type"]
rel_type_edges = relationships_df.select(relcols)
m.expand("tpl:relationships", rel_type_edges, relcols)

In [None]:
# One triple per relationship that has a link name, with that name as the
# predicate; rows without a link are left out.
relcols = ["node_id_start", "node_id_end", "link"]
link_edges = relationships_df.select(relcols).drop_nulls("link")
m.expand("tpl:specific_relationships", link_edges, relcols)

### We have a look at the different relationship types

In [None]:
# Distinct relationship type IRIs, i.e. the predicates used by tpl:relationships.
relationships_df.get_column("rel_type").unique()

### We are immediately able to query the graph

In [None]:
# Labels of the officers that have the country "Norway", together with the
# labels of the nodes they are connected to through pan:officer_of.
df = m.query("""
PREFIX pan:<https://github.com/DataTreehouse/maplib_workshop/pan#>
PREFIX rdfs:<http://www.w3.org/2000/01/rdf-schema#>
SELECT ?oname ?cname WHERE {
    ?officer a pan:Officer .
    ?officer rdfs:label ?oname .
    ?officer pan:officer_of ?c .
    ?c rdfs:label ?cname .
    ?officer pan:country "Norway" .
}
""")
# Display the query result.
df

### Query results are again DataFrames, which we can manipulate further. 

In [None]:
# Keep only the rows whose company label contains "wilh", ignoring case.
cname_lowercase = pl.col("cname").str.to_lowercase()
df.filter(cname_lowercase.str.contains("wilh"))

### Let's find all Norwegian officers, ordered by the number of companies they are officers of (descending)

In [None]:
# For every officer with the country "Norway": the number of labels of nodes
# reached through pan:officer_of, highest count first, limited to 20 rows.
m.query("""
PREFIX pan:<https://github.com/DataTreehouse/maplib_workshop/pan#>
PREFIX rdfs:<http://www.w3.org/2000/01/rdf-schema#>
SELECT ?officer ?oname (COUNT(?cname) as ?ccount) WHERE {
    ?officer a pan:Officer .
    ?officer rdfs:label ?oname .
    ?officer pan:officer_of ?c .
    ?c rdfs:label ?cname .
    ?officer pan:country "Norway" .
}
GROUP BY ?oname ?officer
ORDER BY DESC(?ccount)
LIMIT 20
""")

In [None]:
# Just a backup.. "https://github.com/DataTreehouse/maplib_workshop/node_ids#80063253"

### Finally, we can write 2.7 GB of triples.

In [None]:
# Write the contents of the mapping to the file "leaks.nt" in the working
# directory (presumably in N-Triples format, going by the method name).
m.write_ntriples("leaks.nt")