In [1]:
import sys
sys.path.append('../src/')

%load_ext autoreload
%autoreload 2

In [134]:
import re
from ast import literal_eval
from pathlib import Path
from zipfile import ZipFile

import geopandas as gpd
import pandas as pd
import pdfplumber
import plotly.express as px
from tqdm import tqdm

PATH = Path.cwd().parent.joinpath('data')

### Goal

The goal of this notebook is to get the data on vessel ownership ready for analysis, for instance, with Neo4J.


### Data sources
1. List of vessels in the shadow fleet, compiled by the Kyiv School of Economics (KSE) Institute.
2. Vessel information (including ownership) obtained from the GFW API, using IMO numbers.
3. Event data from GFW API (port visits, loitering, AIS off-switching).
4. The same data, but manually downloaded from the GFW platform.
5. Vessel tracks of shadow fleet vessels, manually downloaded from GFW.
6. Vessel history information downloaded as pdfs from Equasis.
7. Company assets downloaded as pdfs from Equasis.

### Possible data issues

1. Completeness: we don't know if the data provided by KSE is complete. The definition of the shadow fleet used by KSE is: "In this paper we define the shadow fleet as consisting of non-G7/EU owned or managed vessels navigating without International Group (IG) protection and indemnity (P&I) insurance." This definition is debatable. Also, the exact dates on which insurance was in place are not known, only the months, which is rather imprecise. This should be double-checked if the issue becomes important. We should also be careful with the AIS off-switching data from GFW. It's an experimental feature and it only works in marine environments that are not too crowded, due to signal processing limitations of AIS receivers.
2. Correctness: vessels change owners regularly and the KSE data doesn't always seem to reflect the most current ownership data. This ownership data can be amended with data from Equasis and Global Fishing Watch, but these sources also don't always agree or don't show the ultimate beneficial owners. The sanction regime is also changing, so some vessels are sanctioned while this is not reflected in the KSE data. This should be manually updated. Due to changes in ownership and the covert actions of some vessels, we have to assume that AIS data isn't always correct. Positions can be spoofed and AIS gaps can be attributed to changes in ownership (and often accompanying MMSI changes) or reception issues, especially in crowded marine environments like the North Sea and the Strait of Gibraltar.

### Method

1. Create a dataframe with shadowfleet vessels and create a date range for each vessel when it's presumed to be uninsured or underinsured.
2. Query Global Fishing Watch to obtain information on identity (MMSIs for instance). 
3. Query Global Fishing Watch to obtain loitering, port visits and ais off switching events.
4. Parse Equasis data on vessels and companies and extract vessel and company specifics and history.
5. Perform some exploratory data analysis to identify problems and possibilities.

## Import data

### Import and clean shadowfleet

In [None]:
# Load the KSE shadow-fleet vessel list; its `imo` column drives the later lookups

kse_file = PATH / 'processed' / 'shadowfleet_kse.csv'
kse = pd.read_csv(kse_file)
len(kse)

### Import vessel data from GFW

In [None]:
# Query GFW for every distinct IMO in the KSE list; results are stored in vessels.json.
# NOTE(review): `sf` is not imported in any cell visible here - presumably a project
# module reachable through the '../src/' path entry; confirm it is imported first.

gfw_dump = PATH / 'raw_gfw' / 'api' / 'vessels.json'
unique_imos = kse.imo.unique()

vessels, owners = sf.get_vessels(
    query=unique_imos,
    filename=gfw_dump,
    field='imo',
    limit=10,
)

print(f'Imported {len(vessels)} vessels and {len(owners)} owners')

In [None]:
# Parse data: flatten the raw API dump into one record per self-reported identity
# and one owner record per (identity, registry owner) pair.

vessels = []
owners = []

with open(PATH.joinpath('raw_gfw', 'api', 'vessels.json'), 'r') as lines:
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines instead of failing in literal_eval

        # Every line is evaluated as one Python literal; do it once per line
        # rather than once per vessel and once per owner.
        response = literal_eval(line)
        query = response.get('query')

        for records in response.get('entries') or []:
            registry_owners = records.get('registryOwners') or []

            for vessel in records.get('selfReportedInfo') or []:
                vessel['query'] = query
                vessels.append(vessel)
                ssvid = vessel.get('ssvid')

                # Append a fresh dict per identity. Updating the registry dict in
                # place would append the same object for every identity of the
                # entry, leaving all of them with the last ssvid.
                for com in registry_owners:
                    owners.append({**com, 'query': query, 'mmsi': ssvid})

df_vessels = pd.DataFrame(vessels)

# And clean: keep identities that carry an IMO and belong to the KSE list
df_vessels = df_vessels[df_vessels.imo.notna()].copy()
df_vessels = df_vessels.astype({'imo': 'int', 'ssvid': 'int'})
df_vessels = df_vessels[df_vessels.imo.isin(kse.imo)].copy()
len(df_vessels)

### Extract vessel data equasis using PDF plumber

In [None]:
# Collect every table of every Equasis vessel report, tagged with the vessel's IMO
dfs = []
for file in PATH.joinpath('equasis', 'vessels').glob('*.pdf'):
    # The context manager closes the PDF even if parsing fails part-way
    with pdfplumber.open(file) as pdf:

        # Get imo from the first page. Reset per file so a report without an
        # 'imo: ' line can never inherit the number of the previous report.
        imo = None
        for lines in pdf.pages[0].extract_text_lines():
            if isinstance(lines, dict) and 'imo: ' in lines.get('text'):
                imo = lines.get('text').replace('imo: ', '')

        if imo is None:
            print(f'Skipping {file.name}: no IMO number found on the first page')
            continue

        # Get tables
        for page in pdf.pages:
            for table in page.extract_tables():
                if not table:
                    continue  # nothing to use as a header row

                keys = [x.replace('\n', ' ') for x in table[0]]
                # str() also turns a missing cell (None) into the text 'None'
                tabs = [
                    dict(zip(keys, [str(x).replace('\n', ' ') for x in tab]))
                    for tab in table[1:]
                ]
                df = pd.DataFrame(tabs)
                df['imo'] = imo
                dfs.append(df)

In [None]:
# Create relevant dataframes: route each extracted table to a bucket based on
# the column names in its header, then stack each bucket into one frame.

companies = []
inspections = []
human = []
names = []
flags = []
classifications = []
current_companies = []
current_info = []  # not filled in this cell; kept in case another cell reads it


for df in dfs:
    header = set(df.columns)
    if 'Company' in header:
        companies.append(df)
    elif 'Detention' in header:
        inspections.append(df)
    elif 'Human element deficiencies' in header:
        human.append(df)
    elif 'Name of ship' in header:
        names.append(df)
    elif {'Flag', 'Date of effect'} <= header:
        flags.append(df)
    # Both names must be present. Written as `'a' and 'b' in cols`, Python only
    # tests the second name because a non-empty string literal is always truthy.
    elif {'Classification society', 'Date of survey'} <= header:
        classifications.append(df)
    elif {'Name of company', 'Address'} <= header:
        current_companies.append(df)


# pd.concat raises ValueError on an empty list, i.e. when a bucket matched nothing
companies = pd.concat(companies)
inspections = pd.concat(inspections)
human = pd.concat(human)
names = pd.concat(names)
flags = pd.concat(flags)
classifications = pd.concat(classifications)
current_companies = pd.concat(current_companies)


In [None]:
# Clean dataframes

def clean_dates(df):
    """Parse every column whose name contains 'date' (any case) into datetimes.

    The qualifiers 'during ', 'since ' and 'before ' are removed first; values
    that still cannot be parsed become NaT. Dates are read day-first.
    The frame is modified in place and also returned.
    """
    qualifiers = {'during ': '', 'since ': '', 'before ': ''}
    date_columns = [name for name in df.columns if 'date' in name.lower()]

    for column in date_columns:
        stripped = df[column].replace(qualifiers, regex=True)
        df[column] = pd.to_datetime(stripped, format='mixed', dayfirst=True, errors='coerce')

    return df


# Tidy each Equasis table: rename columns, parse dates, cast the IMO to int and,
# for history tables, derive an end date from the next record's start date.

def _tidy_role(role):
    """Apply the role-label clean-up shared by the company tables."""
    return (role.str.replace(' T', '', regex=False)
                .str.replace('manager/', 'manager /', regex=False)
                .str.replace('/Com', '/ Com', regex=False))


# First clean up the companies dataframe
companies = companies.drop(columns='Sources')
companies.columns = ['company', 'role', 'start_date', 'imo']
companies['role'] = _tidy_role(companies['role'])
companies = clean_dates(companies)
companies['imo'] = companies['imo'].astype(int)

# Create end date column: a role ends when the next holder of that role starts
companies = companies.sort_values(by=['imo', 'role', 'start_date'])
companies['end_date'] = companies.groupby(['imo', 'role'])['start_date'].shift(-1)


# Clean up names dataframe
names = clean_dates(names)
names.columns = ['vessel_name', 'start_date', 'source', 'imo']
names = names.sort_values(by=['imo', 'start_date'])
names['end_date'] = names.groupby('imo')['start_date'].shift(-1)
names['imo'] = names['imo'].astype(int)

flags = clean_dates(flags)
flags.columns = ['flag', 'start_date', 'source', 'imo']
flags = flags.sort_values(by=['imo', 'start_date'])
flags['end_date'] = flags.groupby('imo')['start_date'].shift(-1)
flags['imo'] = flags['imo'].astype(int)

classifications = clean_dates(classifications)
classifications.columns = ['classification_society', 'date_of_survey', 'source', 'imo']
classifications['imo'] = classifications['imo'].astype(int)

inspections = clean_dates(inspections)
inspections.columns = ['authority', 'port', 'date', 'detention', 'PSC_organisation',
                       'inspection_type', 'duration', 'number_of_deficiencies', 'imo']
inspections['imo'] = inspections['imo'].astype(int)
# The concatenated index restarts at 0 for every source table, so sorting on it
# would interleave the rows of a vessel whose inspections span several tables.
# Discard it and use a stable sort so rows keep their extraction order per vessel.
inspections = inspections.reset_index(drop=True).sort_values('imo', kind='stable')
# The extraction cell wraps every cell in str(), so a missing cell arrives as the
# text 'None' (or as ''); turn those into real missing values or ffill skips them.
# NOTE(review): assumes 'None' / '' never occur as genuine values in these columns.
text_cols = ['authority', 'port', 'detention']
inspections[text_cols] = inspections[text_cols].mask(inspections[text_cols].isin(['None', '']))
# Fill continuation rows from the row above, never across vessels
carried = ['authority', 'port', 'date', 'detention']
inspections[carried] = inspections.groupby('imo')[carried].ffill()

current_companies = clean_dates(current_companies)
current_companies.columns = ['company_imo', 'role', 'company', 'address', 'start_date', 'imo']
current_companies['role'] = _tidy_role(current_companies['role'])
current_companies['imo'] = current_companies['imo'].astype(int)

In [None]:
# Persist each cleaned table as processed/owners_<name>.csv

df_names = ['companies', 'names', 'flags', 'classifications', 'inspections', 'current_companies']
dataframes = [companies, names, flags, classifications, inspections, current_companies]

out_dir = PATH.joinpath('processed')
for position, name in enumerate(df_names):
    df_ = dataframes[position]
    df_.to_csv(out_dir.joinpath(f'owners_{name}.csv'), index=False)

### Extract company info using pdf plumber

In [725]:
# Collect the fleet tables (those with a 'Gross tonnage' column) from every
# Equasis company report, tagged with the company details from the first page.
dfs = []

for file in PATH.joinpath('equasis', 'companies').glob('*.pdf'):
    # The context manager closes the PDF even if parsing fails part-way
    with pdfplumber.open(file) as pdf:

        # The first table's second row holds the company details in one cell,
        # split here on ' : '; the replace() calls strip the next field's label.
        info = pdf.pages[0].extract_tables()[0][1][0].split(' : ')
        imo = info[1].replace('\nName of company', '')
        name = info[2].replace('\nAddress', '')
        address = info[3].replace('\nLast update', '').replace('C/O: ', '')
        last_update = info[4]

        # Get tables
        for page in pdf.pages:
            for table in page.extract_tables():
                if not table or 'Gross\ntonnage' not in table[0]:
                    continue

                keys = [x.replace('\n', ' ') for x in table[0]]
                # str() also turns a missing cell (None) into the text 'None'
                tabs = [
                    dict(zip(keys, [str(x).replace('\n', ' ') for x in tab]))
                    for tab in table[1:]
                ]
                df = pd.DataFrame(tabs)
                df['company_imo'] = imo
                df['name'] = name
                df['address'] = address
                df['last_update'] = last_update
                dfs.append(df)

df = pd.concat(dfs).reset_index(drop=True)
len(df)

15292

In [726]:
# Clean: snake_case the column labels and parse the report's last-update date

df.columns = [label.lower().replace(' ', '_') for label in df.columns]
df['last_update'] = pd.to_datetime(df['last_update'], format='%d/%m/%Y', errors='coerce')

def extract_date(text):
    """Return the leftmost `dd/mm/yyyy` date or 4-digit run in `text`, else None."""
    found = re.search(r'\d{2}/\d{2}/\d{4}|\d{4}', text)
    if found is None:
        return None
    return found.group(0)
def extract_text(text):
    """Return the text before the first '(' without surrounding whitespace, or None if that is empty."""
    head = re.split(r'\(', text, maxsplit=1)[0]
    return head.strip() or None


# Split the combined 'acting_as_(since)' cell into a role and a start date
df['since'] = df['acting_as_(since)'].apply(extract_date)
df['role'] = df['acting_as_(since)'].apply(extract_text)
df = df[(df.imo.notna()) & (df.since.notna())].copy()
# A bare year becomes 1 January of that year so a single format parses every row
df['since'] = df['since'].apply(lambda x: '01/01/' + x if len(x) < 5 else x)
df['since'] = pd.to_datetime(df['since'], format='%d/%m/%Y', errors='coerce')
df = df.drop(columns='acting_as_(since)')

# Blank or non-numeric cells become 0 instead of aborting the whole cell.
# The extraction cell stringifies cells, so a missing value can arrive as
# '' or as the text 'None', and int('None') would raise.
for col in ('imo', 'year_of_build'):
    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
# The company IMO is the join key: keep the strict cast so a bad value fails loudly
df['company_imo'] = df['company_imo'].astype(int)

df.to_csv(PATH.joinpath('processed', 'owners_companies_details_vessels.csv'), index=False)

In [727]:
# Extract just company information, not vessels: one record per company report.
# Note that this rebinds `companies` to a plain list of dicts.

companies = []
for file in PATH.joinpath('equasis', 'companies').glob('*.pdf'):
    # Only the first page is needed; the context manager closes the PDF afterwards
    with pdfplumber.open(file) as pdf:
        info = pdf.pages[0].extract_tables()[0][1][0].split(' : ')

    # Each replace() strips the label of the following field from the value
    companies.append({'imo': info[1].replace('\nName of company', ''),
                      'name': info[2].replace('\nAddress', ''),
                      'address': info[3].replace('\nLast update', '').replace('C/O: ', ''),
                      'last_update': info[4]})

equasis_records = pd.DataFrame(companies)
len(equasis_records)

1703

In [728]:
# Clean


# Country names searched for in company addresses. Order matters: the first
# entry found in an address wins. Each name is listed once.
# NOTE(review): 'United Ara Emirates' looks like a typo of 'United Arab Emirates';
# it is kept because it may deliberately match a misspelt address - confirm.
countries = [
    'Netherlands', 'Belgium', 'Germany', 'United Kingdom', 'France', 'Spain', 'Portugal', 'Italy',
    'Greece', 'Turkey', 'Marshall Islands', 'United Arab Emirates', 'Saudi Arabia', 'Russia', 'China', 'Japan',
    'South Korea', 'Taiwan', 'Philippines', 'Indonesia', 'Australia', 'New Zealand', 'Norway', 'Denmark',
    'Sweden', 'Finland', 'Belarus', 'Egypt', 'South Africa', 'Brazil', 'Argentina', 'Chile', 'Peru',
    'Panama', 'Cook Islands', 'Bermuda', 'Cayman Islands', 'Bahamas', 'Honduras', 'Costa Rica', 'Nicaragua',
    'British Virgin Islands',
    'Seychelles', 'Mauritius', 'Madagascar', 'Mozambique', 'Tanzania', 'Kenya', 'Somalia', 'Djibouti', 'Yemen',
    'Oman', 'India',
    'Liberia', 'Switzerland', 'Singapore', 'Hong Kong', 'Vietnam', 'Thailand', 'Malaysia', 'Bangladesh',
    'Sri Lanka', 'Pakistan',
    'Antigua & Barbuda', 'St Kitts & Nevis', 'St Vincent & Grenadines', 'Grenada', 'St Lucia', 'Barbados',
    'Trinidad & Tobago',
    'Cyprus', 'Malta', 'Lebanon', 'United Ara Emirates', 'Georgia', 'Isle of Man', 'Jersey', 'Guernsey',
    'Gibraltar',
    'Latvia', 'Lithuania', 'Estonia', 'Poland', 'Ukraine', 'Romania', 'Bulgaria', 'Croatia', 'Slovenia',
    'Bosnia & Herzegovina',
    'Kazakhstan', 'Uzbekistan', 'Kyrgyzstan', 'Tajikistan', 'Turkmenistan', 'Afghanistan', 'Iran', 'Iraq',
    'Nigeria', 'Gabon', 'Equatorial Guinea', 'Cameroon', 'Congo', 'Angola', 'Namibia', 'Botswana', 'Zimbabwe',
    'Zambia', 'Malawi',
    'Azerbaijan', 'Armenia', 'Moldova', 'Montenegro', 'Serbia', 'Kosovo', 'Albania', 'North Macedonia',
    'Hungary', 'Slovakia',
    'Ireland', 'Canada', 'Canary Islands', 'Greenland', 'Iceland', 'Faroe Islands', 'Puerto Rico',
    'Dominican Republic',
    'USA', 'Monaco', 'Kuwait', 'Libya',
]

def extract_country(address):
    """Return the first entry of `countries` that occurs in `address`.

    Matching is a case-sensitive substring test performed in list order, so
    when an address mentions several names the one listed first wins.
    Returns None when nothing matches or when `address` is not a string
    (for example a missing value).
    """
    if not isinstance(address, str):
        return None
    return next((country for country in countries if country in address), None)

# Flatten multi-line addresses, derive the country, and build the join key
# (company name without spaces, upper-cased) used to match the ownership data.
flat_address = equasis_records['address'].str.replace('\n', ' ')
equasis_records['address'] = flat_address
equasis_records['country'] = flat_address.apply(extract_country)

equasis_records = equasis_records.rename(columns={'imo': 'company_imo'})
equasis_records['key'] = equasis_records['name'].str.replace(' ', '').str.upper().str.strip()

In [729]:
# Load the processed company list and add the same normalised-name key
company_list_file = PATH / 'processed' / 'equasis_company_details.csv'
df_company_list = pd.read_csv(company_list_file)
df_company_list['key'] = df_company_list['company'].str.replace(' ', '').str.upper().str.strip()
df_company_list['company_imo'].nunique()

1678

In [730]:
# Ownership records that are still open or ended on/after 2022-01-01.
# end_date is compared as text here, which relies on ISO 'YYYY-MM-DD' ordering.
company_vessels = pd.read_csv(PATH / 'processed' / 'owners_companies.csv')
ended_recently = company_vessels['end_date'] >= '2022-01-01'
still_open = company_vessels['end_date'].isna()
company_vessels = company_vessels[ended_recently | still_open].copy().reset_index(drop=True)
# Align one misspelt company name with its spelling in the Equasis company records
company_vessels = company_vessels.replace({'NNK- KAMCHATNEFTEPRODUK JSC': 'NNK-KAMCHATNEFTEPRODUKT JSC'})
company_vessels['key'] = company_vessels['company'].str.replace(' ', '').str.upper().str.strip()
len(company_vessels)

5083

In [731]:
# Left join: every ownership record is kept, with company details where the key matches
merged = company_vessels.merge(equasis_records, how='left', on='key')

In [732]:
# Spot check: the company whose spelling was corrected in the ownership data
equasis_records.loc[equasis_records['name'].str.contains('KAMCHATNEFTE')]

Unnamed: 0,company_imo,name,address,last_update,country,key
1654,5138401,NNK-KAMCHATNEFTEPRODUKT JSC,"ul Kosmonavtov 1, Petropavlovsk-Kamchatskiy, K...",01/10/2024,Russia,NNK-KAMCHATNEFTEPRODUKTJSC


In [733]:
# Companies that found no match in the Equasis company records
merged.loc[merged['company_imo'].isna(), 'company'].value_counts()

company
UNKNOWN                       273
K&O SHIPMANAGEMENT FZE         17
DENIZCILIK VE GEMI             13
MEDITERRANEANMASTE FREIGHT      2
Name: count, dtype: int64

In [734]:
merged.to_csv(PATH / 'processed' / 'company_vessels_final.csv', index=False)

### Create dataframe with uninsured/underinsured ranges

In [None]:
# Build one uninsured period per vessel (first flagged month start to last
# flagged month end) and attach the earliest sanction date from the KSE list.

# Monthly columns are those whose label contains '202'; the first three are skipped.
# NOTE(review): the reason for skipping three columns is not visible here - confirm.
cols = [col for col in kse.columns if '202' in col]
month_cols = cols[3:]

# One record per (vessel, month column) with a non-null value
rows = [
    {'imo': row.imo, 'start_date': f'01-{col[:-5]}-{col[-4:]}'}
    for _, row in kse.iterrows()
    for col in month_cols
    if not pd.isnull(row[col])
]

uninsured = pd.DataFrame(rows)
uninsured['start_date'] = pd.to_datetime(uninsured['start_date'],
                                         format='%d-%m-%Y',
                                         dayfirst=True)
# start_date is already a datetime; MonthEnd(0) moves it to the last day of its month
uninsured['end_date'] = uninsured['start_date'] + pd.tseries.offsets.MonthEnd(0)

# Create date ranges: earliest start and latest end per vessel
uninsured = uninsured.groupby('imo', as_index=False).agg(
    start_date=('start_date', 'min'),
    end_date=('end_date', 'max'),
)

# And add sanction date to the uninsured dataframe
sanction_dates = kse[['imo', 'earliest_sanction_date']]
uninsured = uninsured.merge(sanction_dates, on='imo', how='left')

uninsured.to_csv(PATH.joinpath('processed', 'uninsured.csv'), index=False)

### Find activity in ship to ship transfer areas

Import csv files, filter on time period and area and write to parquet file.

In [78]:
# Ship-to-ship transfer polygons, reprojected to EPSG:4326
sts_file = PATH / 'geo' / 'sts_locations.geojson'
sts_locations = gpd.read_file(sts_file).to_crs('EPSG:4326')
sts_locations.head()

Unnamed: 0,Name,geometry
0,Malta,"POLYGON ((14.11697 36.12737, 14.11697 35.69324..."
1,Laconia,"POLYGON ((22.57003 36.84774, 22.37132 36.48697..."
2,Augusta,"POLYGON ((15.00922 37.52169, 15.00922 37.12402..."
3,Dakar,"POLYGON ((-17.68226 14.78504, -17.68226 13.659..."
4,Lome,"POLYGON ((1.80680 6.13829, 0.19884 6.13829, 0...."


In [89]:
# Clip every downloaded track to the STS polygons and collect the points that
# fall inside them in a single parquet file.

file_path = PATH.joinpath('processed', 'sts_tracks.parquet')

# Start from a clean file: the loop appends, so re-running this cell on top of
# an earlier result would store every point twice.
file_path.unlink(missing_ok=True)

# sorted() gives a reproducible row order and lets tqdm show a total
for vessel in tqdm(sorted(PATH.joinpath('raw_gfw', 'tracks').rglob('*.zip'))):
    name = vessel.stem.split(' - ')[0]
    # Context managers close the archive and its members after reading
    with ZipFile(vessel) as zf:
        for file in zf.namelist():
            if not file.endswith('.csv'):
                continue
            with zf.open(file) as handle:
                df = pd.read_csv(handle)
            df.timestamp = pd.to_datetime(df.timestamp).dt.tz_localize(None)
            df['name'] = name
            gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat), crs=4326)
            gdf = gdf.clip(sts_locations)
            df = gdf.drop(['geometry', 'seg_id'], axis=1)

            if df.empty:
                continue  # no point of this track lies inside an STS area

            # The first write creates the file, later writes append to it
            df.to_parquet(file_path, engine='fastparquet', append=file_path.exists())


0it [00:00, ?it/s]

1056it [02:04,  8.48it/s]


#### Create file of all tracks

In [None]:
# Stack the manually downloaded GFW identity files into one lookup table
identity_files = PATH.joinpath('raw_gfw', 'identity').glob('*.csv*')
dfs = [pd.read_csv(file) for file in identity_files]

df = pd.concat(dfs).reset_index(drop=True)
df = df.loc[:, ['id', 'flag', 'mmsi', 'imo', 'shipname', 'callsign']].copy()
df = df[df.imo.notna()].copy()
df = df.astype({'imo': int, 'mmsi': int})
# Same normalisation as the track file names: upper case, no spaces
df['shipname'] = df['shipname'].str.upper().str.strip().str.replace(' ', '')
len(df)

In [None]:
# Write all track points from 2022-01-01 onwards to one parquet file, with the
# IMO attached via the normalised ship name taken from the zip file name.

file_path = PATH.joinpath('processed', 'tracks.parquet')

# Start from a clean file: the loop appends, so re-running this cell on top of
# an earlier result would store every point twice.
file_path.unlink(missing_ok=True)

# Exact duplicate (imo, shipname) pairs would multiply every track row in the
# merge. A ship name that maps to several IMOs still yields one row per IMO.
name_to_imo = df[['imo', 'shipname']].drop_duplicates()

# NOTE(review): this uses glob() while the STS cell uses rglob() on the same
# folder, so zips in sub-folders are skipped here - confirm that is intended.
for file in PATH.joinpath('raw_gfw', 'tracks').glob('*.zip'):
    name = file.stem.split(' - ')[0].strip().upper().replace(' ', '')
    # Context managers close the archive and its members after reading
    with ZipFile(file) as zf:
        for f in zf.namelist():
            if not f.endswith('.csv'):
                continue
            with zf.open(f) as handle:
                vessel = pd.read_csv(handle)
            vessel = vessel.drop(columns='seg_id')
            vessel['name'] = name
            vessel = pd.merge(vessel,
                              name_to_imo,
                              left_on='name',
                              right_on='shipname',
                              how='left')
            vessel = vessel.drop(columns='name')
            # round() returns a new frame; without the assignment it has no effect
            vessel = vessel.round({'lon': 3, 'lat': 3, 'course': 0, 'speed': 1})
            vessel['timestamp'] = pd.to_datetime(vessel.timestamp)
            vessel = vessel[vessel.timestamp >= '2022-01-01'].copy()

            if vessel.empty:
                continue  # nothing from 2022 onwards in this track

            # The first write creates the file, later writes append to it
            vessel.to_parquet(file_path, engine='fastparquet', append=file_path.exists())