In [2]:
import requests
import time
import urllib
from neo4j import GraphDatabase
import gzip
import pandas as pd
from datetime import date, datetime
from collections import defaultdict
from pathlib import Path
import csv
from neo4j import Driver
import datapane as dp
import folium


In [3]:
URI = "bolt://localhost:7687"
USER = ""
PASS = ""
DATABASE = "neo4j"
DATA_FOLDER = "data"

RINF_TOKEN_ENDPOINT = "https://rinf.era.europa.eu/api/token"
RINF_OP_ENDPOINT = "https://rinf.era.europa.eu/api/OperationalPoints"
DOMAIN_USER = "" # Needed for proxy :/ e.g. l039017. REMOVE; DON'T COMMIT
DOMAIN_PWD = "" # Needed for proxy :/ REMOVE; DON'T COMMIT
# HTTP_PROXY  = f"http://{DOMAIN_USER}:{DOMAIN_PWD}@proxy.oebb.at:8080" # Needed when executing in OEBB env.
PROXYDICT = { 
              # "http"  : HTTP_PROXY,
              # "https": HTTP_PROXY
            }

In [4]:
from neo4j import Transaction
from pandas import DataFrame


class OP_TNS:
    CreatedAt: datetime
    LatitudeEpsg4326: float
    LongitudeEpsg4326: float
    Name: str
    OpType: str
    UniqueOPID: str
    OpTafTapCode: str
    ValidToUtc: datetime
    ValidFromUtc: datetime

def create_rinf_http_session(RINF_TOKEN_ENDPOINT, PROXYDICT):
    session = requests.session()
    session.proxies = PROXYDICT
    response = session.post(RINF_TOKEN_ENDPOINT, data="grant_type=password&username=dev.tnöw@railcargo.com&password=Dev.tnw1492")
    response.raise_for_status()
    token = response.json().get('access_token')
    print('Auth Token received from RINF API')
    session.headers.update(Authorization = f"Bearer {token}")
    return session

def load_ops_from_rinf(session):
    opResponse = session.get('https://rinf.era.europa.eu/api/OperationalPoints?$expand=TafTAPCodes')
    opResponse.raise_for_status()
    opIdsInRinf = []
    for op in opResponse.json()["value"]:
        op['OPTafTapCode'] = op['TafTAPCodes'][0]["Value"] if len(op['TafTAPCodes']) > 0 else None
        opIdsInRinf.append(op)
    # opIdsInRinf = list(set(opIdsInRinf)) # Removed duplicates (RINF API contains multiple states per OPID)
    print(f"Found {len(opIdsInRinf)} OPs in RINF API")
    return opIdsInRinf

def df_to_csv(df: DataFrame, filename):
    path = DATA_FOLDER + "/" + filename

    csv = df.to_csv(encoding="utf-8-sig", index=False, sep=';')

    with open(path, "bw") as f:
        b = bytes(csv, encoding="utf-8-sig")
        f.write(b)

def df_to_csv_gz(df, filename):
    path = DATA_FOLDER + "/" + filename

    csv = df.to_csv(encoding="utf-8", index=False)

    with open(path + ".gz", "bw") as f:
        b = gzip.compress(bytes(csv, encoding="utf-8"), compresslevel=9)
        f.write(b)
        
def load_objects_from_gzip(path):
    df = pd.read_csv(path, compression='gzip')
    return df

def ops_from_neo4j_query(tx: Transaction) -> list:
    query = ''' MATCH (a:Op)-[hd:HAS_DETAILS]-(opd:OpDetails) 
                WHERE hd.VersionValidFrom <= datetime() 
                AND (hd.VersionValidTo >= datetime() OR hd.VersionValidTo IS NULL)
                RETURN properties(opd)'''
    result = tx.run(query)

    ops = []
    for record in result:
        dict = record.values()[0]
        ops.append(dict)

    return ops

def load_ops_from_neo4j(DATABASE, driver: Driver):
    with driver.session(database=DATABASE) as session:
        ops = session.write_transaction(ops_from_neo4j_query)
        print(f"Found {len(ops)} OPs in DB")
    return ops

In [5]:
# Load data from RINF

driver = GraphDatabase.driver(URI, auth=(USER, PASS))

session = create_rinf_http_session(RINF_TOKEN_ENDPOINT, PROXYDICT)

opIdsInRinf = load_ops_from_rinf(session)

df = pd.DataFrame(opIdsInRinf)
df.columns

print(df)

df_to_csv_gz(df, f'RINF_OPs.csv')



Auth Token received from RINF API
Found 60731 OPs in RINF API
            ID  VersionID                                Name  \
0            2        245                          Pampilhosa   
1            2        256                              PIREAS   
2            2        271      Rakenduspunkt TallinnVäikepiir   
3            2        274                     Valstybės siena   
4            2        379                             Banovci   
...        ...        ...                                 ...   
60726  5661845        616       Zone portuaire de Lauterbourg   
60727  5661917        616                    GPMB - Le Verdon   
60728  5662149        616                 GPMB - Bassens Aval   
60729  5662245        616                GPMB - Bassens Amont   
60730  5662318        616  Grand Port Maritime de La Rochelle   

                        Type    Country          ValidityDateStart  \
0                    station   Portugal  2018-01-01T00:00:00+01:00   
1         passeng

In [6]:
# Load data from neo4j

path = Path.cwd() / DATA_FOLDER
path.mkdir(parents=True, exist_ok=True)

with GraphDatabase.driver(URI, auth=(USER, PASS)) as driver:
    ops = load_ops_from_neo4j(DATABASE, driver)
    op_tns = pd.DataFrame(ops)

Found 51757 OPs in DB


In [7]:
# Load data from csv.gz

df = load_objects_from_gzip(f'{DATA_FOLDER}/2022-08-25_RINF_OPs.csv.gz')

In [8]:
import dtale

dtale.show(df, drop_index = True)







In [70]:
# Transform/Harmonize RINF dataframe

from math import isnan
from pandas import NaT
from pytz import utc


op_rinf_harmonized = pd.DataFrame(df, columns=['UOPID', 'Latitude', 'Longitude', 'Name', 'OPTafTapCode', 'Country', 'ValidityDateStart', 'ValidityDateEnd'])
op_rinf_harmonized = op_rinf_harmonized.rename(columns={"UOPID": "UniqueOPID", "Latitude": "LatitudeEpsg4326", "Longitude": "LongitudeEpsg4326"}).reset_index(drop=True).sort_values(['Country', 'UniqueOPID'])

op_tns_harmonized = pd.DataFrame(op_tns, columns=["UniqueOPID", "LatitudeEpsg4326", "LongitudeEpsg4326", "Name", "OpTafTapCode", "CountryCode", "ValidFromUtc", "ValidToUtc"])
op_tns_harmonized = op_tns_harmonized.rename(columns={"CountryCode": "Country", "OpTafTapCode": "OPTafTapCode", "ValidFromUtc": "ValidityDateStart", "ValidToUtc": "ValidityDateEnd"}).reset_index(drop=True).sort_values(['Country', 'UniqueOPID'])

op_rinf_harmonized.ValidityDateStart = pd.to_datetime(op_rinf_harmonized.ValidityDateStart, errors = 'coerce')
op_rinf_harmonized.ValidityDateEnd = pd.to_datetime(op_rinf_harmonized.ValidityDateEnd, errors = 'coerce')
op_tns_harmonized.ValidityDateStart = pd.to_datetime(op_tns_harmonized.ValidityDateStart)
op_tns_harmonized.ValidityDateEnd = pd.to_datetime(op_tns_harmonized.ValidityDateEnd, errors = 'coerce')



op_rinf_harmonized['IsCurrentlyValid'] = op_rinf_harmonized.apply(lambda x: 'No validity' if type(x.ValidityDateStart) is float else 'Valid' if (x.ValidityDateStart.replace(tzinfo=None) <= datetime.now() and (type(x.ValidityDateEnd) is float or (x.ValidityDateEnd is NaT or x.ValidityDateEnd.replace(tzinfo=None) >= datetime.now()))) else 'Invalid', axis=1)
op_tns_harmonized['IsCurrentlyValid'] = op_tns_harmonized.apply(lambda x: 'No validity' if type(x.ValidityDateStart) is float else 'Valid' if (x.ValidityDateStart.replace(tzinfo=None) <= datetime.now() and (type(x.ValidityDateEnd) is float or (x.ValidityDateEnd is NaT or x.ValidityDateEnd.replace(tzinfo=None) >= datetime.now()))) else 'Invalid', axis=1)


In [None]:
# Filter data for current valid OPs only (maybe invalidated ones still are interesting?)

# op_rinf_harmonized = op_rinf_harmonized[op_rinf_harmonized["IsCurrentlyValid"].isin(['Valid', 'No validity'])]


In [11]:
# Create HTML report

from codecs import unicode_escape_encode
from encodings.utf_8_sig import encode


ops_in_tns_missing_in_rinf = op_tns_harmonized[~op_tns_harmonized["UniqueOPID"].isin(op_rinf_harmonized["UniqueOPID"])].sort_values(['Country', 'UniqueOPID'])
ops_in_tns_missing_in_rinf.to_csv(DATA_FOLDER + '/ops_in_tns_missing_in_rinf.csv', encoding='utf-8-sig', sep=';')

ops_in_rinf_missing_in_tns = op_rinf_harmonized[~op_rinf_harmonized["UniqueOPID"].isin(op_tns_harmonized["UniqueOPID"])].sort_values(['Country', 'UniqueOPID'])
ops_in_rinf_missing_in_tns.to_csv(DATA_FOLDER + '/ops_in_rinf_missing_in_tns.csv', encoding='utf-8-sig', sep=';')


map_tns_missing = folium.Map(location=[48.210033, 16.363449],zoom_start=5)
ops_in_tns_missing_in_rinf.apply(lambda row:folium.CircleMarker(location=[row["LatitudeEpsg4326"], row["LongitudeEpsg4326"]], radius=2, tooltip=(str(("Unique OP ID: " + row["UniqueOPID"] + "</br>" + row["Name"]).encode('raw_unicode_escape')))[2:-1])
                                             .add_to(map_tns_missing), axis=1)


map_rinf_missing = folium.Map(location=[48.210033, 16.363449],zoom_start=5)
ops_in_rinf_missing_in_tns.apply(lambda row:folium.CircleMarker(location=[row["LatitudeEpsg4326"], row["LongitudeEpsg4326"]], radius=2, tooltip=(str(("Unique OP ID: " + row["UniqueOPID"] + "</br>" + row["Name"]).encode('raw_unicode_escape')))[2:-1])
                                             .add_to(map_rinf_missing), axis=1)

table = dp.DataTable(ops_in_tns_missing_in_rinf, 'OPs in TNS that are not in RINF')
table2 = dp.DataTable(ops_in_rinf_missing_in_tns, 'OPs in RINF that are not in TNS')
report = dp.Report(blocks=[dp.HTML("<h1>OPs in TNS that are not in RINF</h1>"), dp.Group(blocks=[table, dp.Plot(map_tns_missing)]), dp.Divider(),dp.HTML("<h1>OPs in RINF that are not in TNS</h1>"), table2, dp.Plot(map_rinf_missing)])
report.save(path=DATA_FOLDER + "/RINF_Report.html", formatting=dp.ReportFormatting(width=dp.ReportWidth.FULL))

# ops_found_in_rinf = op_rinf_harmonized[op_rinf_harmonized["UniqueOPID"].isin(op_tns_harmonized["UniqueOPID"])]
# ops_found_in_rinf.compare(op_tns_harmonized, align_axis='UniqueOPID')


# print(html)
# ProfileReport(
#     op_tns[~op_tns["UniqueOPID"].isin(op_rinf["UniqueOPID"])]
#     )
# conf = {'title': 'Test', 'notebook',}
# ProfileReport(ops_in_tns_missing_in_rinf, config={'title': 'Test'})

NameError: name 'op_tns_harmonized' is not defined

In [None]:
# Push new RINF report to git