In [53]:
import os
import pickle

import morph_kgc
import pandas as pd
import rdflib

In [54]:
# morph-kgc configuration (INI syntax): one section naming the RML mapping file
# whose sources are materialized into a knowledge graph.
config = """
             [GTFS-Madrid-Bench]
             mappings: mapping.csv.ttl
         """

In [74]:
# Materialize the knowledge graph from all rows of the sources referenced by mapping.csv.ttl.
g = morph_kgc.materialize(config)

INFO | 2023-04-21 23:31:22,152 | Parallelization is not supported for win32 when running as a library. If you need to speed up your data integration pipeline, please run through the command line.
INFO | 2023-04-21 23:31:23,318 | 156 mapping rules retrieved.
INFO | 2023-04-21 23:31:23,381 | Mapping partition with 81 groups generated.
INFO | 2023-04-21 23:31:23,382 | Maximum number of rules within mapping group: 14.
INFO | 2023-04-21 23:31:23,383 | Mappings processed in 1.229 seconds.
INFO | 2023-04-21 23:31:24,507 | Number of triples generated in total: 2008.


In [75]:
# Re-bind `g` to the RML mapping itself (no longer the materialized graph) in order
# to experiment with rewriting its rml:source paths in the next cells.
g = rdflib.Graph()
g.parse('mapping.csv.ttl');

In [77]:
# Prefix every rml:source in the mapping graph `g` with ".aux/" so the mapping
# reads the auxiliary (diff) copies of the CSVs instead of the originals.
# The FILTER skips sources that are already prefixed, so re-running this cell
# does not produce ".aux/.aux/..." paths.
# Graph.update() modifies `g` in place and does not return query results, so
# its return value is not kept.
q = """
    PREFIX rml: <http://semweb.mmlab.be/ns/rml#>

    DELETE { ?h rml:source ?source . }
    INSERT { ?h rml:source ?new_source . }
    WHERE {
        ?h rml:source ?source .
        FILTER(!STRSTARTS(STR(?source), ".aux/"))
        BIND(CONCAT(".aux/", ?source) AS ?new_source)
    }
"""
g.update(q)

In [78]:
# Print the rml:source of every logical source in `g` (the mapping graph) to
# check the rewrite above; a source shared by several triples maps is printed more than once.
q = """
         PREFIX rml: <http://semweb.mmlab.be/ns/rml#>

         SELECT ?source {
            ?h rml:source ?source
         }
      """

q_res = g.query(q)

for row in q_res:
    print(row['source'])

.aux/data/STOPS.csv
.aux/data/FREQUENCIES.csv
.aux/data/SHAPES.csv
.aux/data/SHAPES.csv
.aux/data/ROUTES.csv
.aux/data/CALENDAR_DATES.csv
.aux/data/CALENDAR_DATES.csv
.aux/data/CALENDAR.csv
.aux/data/CALENDAR.csv
.aux/data/TRIPS.csv
.aux/data/AGENCY.csv
.aux/data/STOP_TIMES.csv
.aux/data/FEED_INFO.csv


## Reading the mapping as an RDF graph

In [30]:
import rdflib

In [10]:
# Load the RML mapping as a plain RDF graph so it can be inspected with SPARQL.
mapping = rdflib.Graph()
mapping.parse('mapping.csv.ttl')

<Graph identifier=Nf7c36b50715e4c10a225f9252bc6771e (<class 'rdflib.graph.Graph'>)>

In [11]:
# Number of triples in the mapping graph.
len(mapping)

942

In [70]:
# List the rml:source of every logical source in the original mapping; a CSV
# shared by several triples maps is printed once per map.
q = """
         PREFIX rml: <http://semweb.mmlab.be/ns/rml#>

         SELECT ?source {
            ?h rml:source ?source
         }
      """

q_res = mapping.query(q)

for row in q_res:
    print(row['source'])

data/STOP_TIMES.csv
data/TRIPS.csv
data/ROUTES.csv
data/AGENCY.csv
data/STOPS.csv
data/CALENDAR.csv
data/CALENDAR.csv
data/CALENDAR_DATES.csv
data/CALENDAR_DATES.csv
data/FEED_INFO.csv
data/SHAPES.csv
data/SHAPES.csv
data/FREQUENCIES.csv


In [63]:
# Distinct source paths referenced by the mapping (the query above repeats a
# source once per triples map that uses it).
all_sources = {str(result_row['source']) for result_row in q_res}

In [64]:
# Display the distinct source files.
all_sources

{'data/AGENCY.csv',
 'data/CALENDAR.csv',
 'data/CALENDAR_DATES.csv',
 'data/FEED_INFO.csv',
 'data/FREQUENCIES.csv',
 'data/ROUTES.csv',
 'data/SHAPES.csv',
 'data/STOPS.csv',
 'data/STOP_TIMES.csv',
 'data/TRIPS.csv'}

## Diff: extracting the rows added since the last snapshot

In [None]:
import os
import pickle
import pandas as pd

In [8]:
def diff_dir(path: str, snapshot: str):
    """Write, for every file in ``path``, the rows not yet recorded in ``snapshot``.

    Each regular file in ``path`` is read as CSV (all columns as strings) and
    compared with the copy stored in the snapshot. The rows that are not in the
    snapshot yet are written to ``<path>_new/<file name>``. If there are none,
    the file contains only the header. The snapshot is then updated to
    "old rows + new rows".

    Args:
        path: input data source directory. A trailing '/' or '\\' is ignored.
        snapshot: path of the pickled snapshot, a dict mapping
            ``"<path>/<file name>"`` to a DataFrame. Created if missing.
    """
    # Strip a trailing separator so "./data/" does not yield "./data/_new", a
    # directory inside the data dir that the listing below would then pick up.
    base = path.rstrip('/\\') or path
    new_data_dir = base + '_new'

    # Load the snapshot of rows already seen.
    # SECURITY: pickle.load can execute arbitrary code; only load trusted snapshots.
    if os.path.exists(snapshot):
        with open(snapshot, 'rb') as f:
            sp = pickle.load(file=f)
    else:
        sp = dict()  # first version: nothing recorded yet

    # Output directory for the new rows.
    os.makedirs(new_data_dir, exist_ok=True)

    for name in sorted(os.listdir(base)):
        filename = base + '/' + name
        if not os.path.isfile(filename):
            continue  # sub-directories cannot be read as CSV
        df_ds = pd.read_csv(filename, dtype=str)  # current source contents
        # A file missing from the snapshot (e.g. added after the first run) is
        # compared against an empty frame instead of raising KeyError.
        df_sp = sp.get(filename, pd.DataFrame())

        # Set difference "source rows - snapshot rows". Snapshot rows appear at
        # least twice and are removed by keep=False. Rows only in the
        # (de-duplicated) source appear once and survive. Unlike concatenating
        # each frame once, this neither drops rows duplicated inside the source
        # nor reports rows deleted from the source as new.
        sp_rows = df_sp.drop_duplicates()
        new_data = pd.concat([sp_rows, sp_rows, df_ds.drop_duplicates()]).drop_duplicates(keep=False)

        # Save the new rows next to the data dir.
        new_file_path = new_data_dir + '/' + name
        new_data.to_csv(new_file_path, index=False)
        if len(new_data) == 0:
            print("No new data in %s, created file %s" % (name, new_file_path))
        else:
            print("Saved new data to %s" % (new_file_path))

        # Current snapshot = old rows + new rows (no duplicates by construction).
        sp[filename] = pd.concat([df_sp, new_data])

    # Persist the updated snapshot.
    with open(snapshot, 'wb') as f:
        pickle.dump(obj=sp, file=f)
        print("Saved snapshot to", snapshot)

In [None]:
# Manual run of diff_dir (disabled): would write the rows of ./data not yet in
# snapshot.pkl to ./data_new/ and overwrite snapshot.pkl.
#diff_dir('./data', 'snapshot.pkl')

In [86]:
def load_kg(aux_data_path: str, mapping_file: str, snapshot_file: str, old_graph: rdflib.Graph,
            config_section: str = 'GTFS-Madrid-Bench'):
    """Materialize only the source rows that are not yet in the snapshot.

    Every CSV referenced by ``rml:source`` in ``mapping_file`` is diffed against
    ``snapshot_file``. The new rows go to ``<aux_data_path>/<source path>``.
    The mapping is then rewritten to read those auxiliary files, morph-kgc
    materializes it, and the result is merged into ``old_graph``. The snapshot
    is written only after materialization succeeds, so a failed run does not
    mark rows as already processed.

    Args:
        aux_data_path: directory for the auxiliary CSVs and the rewritten
            mapping. A trailing '/' or '\\' is ignored. It must not be empty or
            the current directory, otherwise the source files would be
            overwritten.
        mapping_file: path to the RML mapping whose ``rml:source`` values are
            CSV paths.
        snapshot_file: pickled dict ``{source path: DataFrame of rows already
            materialized}``. Created on the first run.
        old_graph: None on the first run, otherwise the graph to extend.
        config_section: section name used in the generated morph-kgc config.

    Returns:
        The newly materialized graph, or ``old_graph + new_graph`` when
        ``old_graph`` is not None.

    Raises:
        ValueError: if ``aux_data_path`` is empty or names the current directory.
    """
    aux_dir = aux_data_path.rstrip('/\\')
    if not aux_dir or os.path.normpath(aux_dir) == os.curdir:
        raise ValueError("aux_data_path must name a separate directory, got %r" % aux_data_path)
    # The same prefix is used for the files written below and for the rewritten
    # rml:source values, so both always point at the same place.
    aux_prefix = aux_dir + '/'

    # Load the snapshot of rows already materialized.
    # SECURITY: pickle.load can execute arbitrary code; only load trusted snapshots.
    if os.path.exists(snapshot_file):
        with open(snapshot_file, 'rb') as f:
            sp = pickle.load(file=f)
    else:
        sp = dict()  # first version

    # Read the mapping and collect its rml:source triples.
    mapping_graph = rdflib.Graph().parse(mapping_file)
    rml_source = rdflib.URIRef('http://semweb.mmlab.be/ns/rml#source')
    source_triples = list(mapping_graph.triples((None, rml_source, None)))
    # Several triples maps can share a source file; diff each file once.
    all_sources = sorted({str(src) for _, _, src in source_triples})

    os.makedirs(aux_dir, exist_ok=True)

    # Calculate the diff between every source file and its snapshot.
    for source_file in all_sources:
        df_ds = pd.read_csv(source_file, dtype=str)  # current source contents
        # Sources added to the mapping after the first run are not in the
        # snapshot yet; treat them as empty instead of raising KeyError.
        df_sp = sp.get(source_file, pd.DataFrame())

        # Set difference "source rows - snapshot rows". Snapshot rows appear at
        # least twice and are removed by keep=False. Rows only in the
        # (de-duplicated) source appear once and survive.
        sp_rows = df_sp.drop_duplicates()
        new_data = pd.concat([sp_rows, sp_rows, df_ds.drop_duplicates()]).drop_duplicates(keep=False)

        # Save the new rows under the auxiliary directory, mirroring the source path.
        new_file_path = aux_prefix + source_file
        os.makedirs(os.path.dirname(new_file_path), exist_ok=True)
        new_data.to_csv(new_file_path, index=False)
        if len(new_data) == 0:
            print("No new data in %s, created empty file %s." % (source_file, new_file_path))
        else:
            print("Found new data in %s, saved to file %s." % (source_file, new_file_path))

        # Pending snapshot = old rows + new rows; persisted after materialization.
        sp[source_file] = pd.concat([df_sp, new_data])

    # Point every rml:source at its auxiliary copy, using aux_data_path
    # rather than a hard-coded directory.
    for subject, _, src in source_triples:
        mapping_graph.remove((subject, rml_source, src))
        mapping_graph.add((subject, rml_source, rdflib.Literal(aux_prefix + str(src))))

    # Save the rewritten mapping. Use basename so a mapping_file containing
    # directories still yields a file inside aux_dir.
    new_mapping_file = aux_prefix + '.aux_' + os.path.basename(mapping_file)
    mapping_graph.serialize(new_mapping_file)

    config = "[%s]\nmappings: %s" % (config_section, new_mapping_file)
    new_graph = morph_kgc.materialize(config)

    # Only now record the rows as processed.
    with open(snapshot_file, 'wb') as f:
        pickle.dump(obj=sp, file=f)
    print("Saved snapshot to", snapshot_file)

    # TODO: delete the auxiliary data dir?
    return new_graph if old_graph is None else old_graph + new_graph

#### Create the initial graph

In [90]:
# First incremental load: diff the sources against snapshot.pkl and materialize
# only the new rows (no previous graph to extend).
g = load_kg(
    aux_data_path='.aux',
    mapping_file='mapping.csv.ttl',
    snapshot_file='snapshot.pkl',
    old_graph=None,
)

INFO | 2023-04-21 23:36:52,727 | Parallelization is not supported for win32 when running as a library. If you need to speed up your data integration pipeline, please run through the command line.


No new data in data/FEED_INFO.csv, created empty file .aux/data/FEED_INFO.csv.
No new data in data/SHAPES.csv, created empty file .aux/data/SHAPES.csv.
No new data in data/STOP_TIMES.csv, created empty file .aux/data/STOP_TIMES.csv.
No new data in data/CALENDAR_DATES.csv, created empty file .aux/data/CALENDAR_DATES.csv.
Found new data in data/AGENCY.csv, saved to file .aux/data/AGENCY.csv.
No new data in data/ROUTES.csv, created empty file .aux/data/ROUTES.csv.
No new data in data/STOPS.csv, created empty file .aux/data/STOPS.csv.
No new data in data/FREQUENCIES.csv, created empty file .aux/data/FREQUENCIES.csv.
No new data in data/TRIPS.csv, created empty file .aux/data/TRIPS.csv.
No new data in data/CALENDAR.csv, created empty file .aux/data/CALENDAR.csv.
Saved snapshot to snapshot.pkl


INFO | 2023-04-21 23:36:53,914 | 156 mapping rules retrieved.
INFO | 2023-04-21 23:36:53,970 | Mapping partition with 81 groups generated.
INFO | 2023-04-21 23:36:53,971 | Maximum number of rules within mapping group: 14.
INFO | 2023-04-21 23:36:53,971 | Mappings processed in 1.243 seconds.
INFO | 2023-04-21 23:36:54,872 | Number of triples generated in total: 7.


In [91]:
# Number of triples in the graph returned by load_kg.
len(g)

2008

#### Add new triples

After the sources are updated, the new total number of triples should be greater than or equal to the previous total.

In [12]:
# Re-run on the (possibly updated) sources and merge the new triples into `g`.
# Keyword names match load_kg's signature. The output below appears to come
# from an earlier version of the function (it mentions ./data_new).
g = load_kg(aux_data_path='.aux', mapping_file='mapping.csv.ttl', snapshot_file='snapshot.pkl', old_graph=g)

INFO | 2023-04-21 16:05:36,173 | Parallelization is not supported for win32 when running as a library. If you need to speed up your data integration pipeline, please run through the command line.


Saved new data to ./data_new/AGENCY.csv
No new data in CALENDAR.csv, created file ./data_new/CALENDAR.csv
No new data in CALENDAR_DATES.csv, created file ./data_new/CALENDAR_DATES.csv
No new data in FEED_INFO.csv, created file ./data_new/FEED_INFO.csv
No new data in FREQUENCIES.csv, created file ./data_new/FREQUENCIES.csv
No new data in ROUTES.csv, created file ./data_new/ROUTES.csv
No new data in SHAPES.csv, created file ./data_new/SHAPES.csv
No new data in STOPS.csv, created file ./data_new/STOPS.csv
No new data in STOP_TIMES.csv, created file ./data_new/STOP_TIMES.csv
No new data in TRIPS.csv, created file ./data_new/TRIPS.csv
Saved snapshot to snapshot.pkl


INFO | 2023-04-21 16:05:37,418 | 156 mapping rules retrieved.
INFO | 2023-04-21 16:05:37,480 | Mapping partition with 81 groups generated.
INFO | 2023-04-21 16:05:37,481 | Maximum number of rules within mapping group: 14.
INFO | 2023-04-21 16:05:37,482 | Mappings processed in 1.307 seconds.
INFO | 2023-04-21 16:05:38,492 | Number of triples generated in total: 7.


In [None]:
# Triple count after the incremental load; expected to be >= the previous count.
len(g)

#### Test query

In [None]:
# Stops whose location type is LocationType/2, with optional description,
# coordinates and wheelchair accessibility. Only the stop and its coordinates
# are printed; unbound OPTIONAL values presumably print as None.
q3 = """
         PREFIX gtfs: <http://vocab.gtfs.org/terms#>
         PREFIX geo: <http://www.w3.org/2003/01/geo/wgs84_pos#>
         PREFIX dct: <http://purl.org/dc/terms/>

         SELECT * WHERE {
             ?stop a gtfs:Stop . 
             ?stop gtfs:locationType ?location .
             OPTIONAL { ?stop dct:description ?stopDescription . }
             OPTIONAL { 
                 ?stop geo:lat ?stopLat . 
                 ?stop geo:long ?stopLong .
             }
             OPTIONAL {?stop gtfs:wheelchairAccessible ?wheelchairAccessible . }
             FILTER (?location=<http://transport.linkeddata.es/resource/LocationType/2>)
         }
      """

q3_res = g.query(q3)

for row in q3_res:
    print(row['stop'], row['stopLat'], row['stopLong'])

In [15]:
# Agencies and their fare URLs in `g`. This query gets its own name instead of
# reusing `q3`, which holds an unrelated stop query.
agency_query = """
    PREFIX gtfs: <http://vocab.gtfs.org/terms#>

    SELECT ?agency ?url WHERE {
        ?agency a gtfs:Agency .
        ?agency gtfs:fareUrl ?url .
    }
"""

for row in g.query(agency_query):
    print(row['agency'], row['url'])

http://transport.linkeddata.es/madrid/agency/00000000000000000001 https://www.crtm.es/billetes-y-tarifas
http://transport.linkeddata.es/madrid/agency/00000000000000000002 https://www.crtm.es/billeaaaaaaaaaaaaaaa


## Snapshot inspection

In [13]:
# Load the snapshot dict {source path: DataFrame of rows already processed}.
# SECURITY: pickle.load can execute arbitrary code; only open trusted snapshots.
with open('snapshot.pkl', 'rb') as f:
    snapshot = pickle.load(file=f)

In [14]:
# load_kg keys the snapshot by the rml:source paths in the mapping
# (e.g. 'data/AGENCY.csv'); './data/AGENCY.csv' was the key format of diff_dir.
snapshot['data/AGENCY.csv']

Unnamed: 0,agency_id,agency_name,agency_url,agency_timezone,agency_lang,agency_phone,agency_fare_url
0,1,1,http://www.crtm.es,1,1,1,https://www.crtm.es/billetes-y-tarifas
1,2,3,http://www.crtm.es,1,1,1,https://www.crtm.es/billeaaaaaaaaaaaaaaa


TODO:

- Read the mapping

In [None]:
# Toy frames for the concat/drop_duplicates diff trick: df1 is df2 plus the row
# (9, 10, 11); df3 is an empty frame standing in for a first, empty snapshot.
columns = ["a", "b", "c"]
df1 = pd.DataFrame([[1, 4, 6], [2, 5, 7], [3, 6, 8], [9, 10, 11]], columns=columns)
df2 = pd.DataFrame([[1, 4, 6], [2, 5, 7], [3, 6, 8]], columns=columns)
df3 = pd.DataFrame()

In [None]:
# Show the three toy frames.
display(df1)
display(df2)
display(df3)

In [None]:
# Concatenating with an empty frame leaves df1's rows unchanged.
pd.concat([df1, df3])

In [None]:
# find differences: rows present in exactly one of df1/df2 survive keep=False
# (here only the extra row of df1, so the length is 1)
len(pd.concat([df1, df2]).drop_duplicates(keep=False))