In [25]:
import geopandas as gpd
import pandas as pd
from datetime import datetime, timedelta
import re
import dask.bag as db
import dask.dataframe as dd
from dask.delayed import delayed

# Importing pNeuma dataset

The pNeuma dataset files are fairly large and do not posses the same number of columns. Direct import into a dataframe is not possible. So, we process line by line, build a dict with the path object as a collection of points. Finally, create a geopandas dataframe from the list of dicts. 

A possible problem is that the whole dataset may not fit into ram, then we need to find a way to store information directñy in disk.

In [3]:
# Parse input file for time data
fname = '20181024_dX_0830_0900.csv'
def get_basedate(fname):
    pat = re.compile(r'^([0-9]{4})([0-9]{2})([0-9]{2})_dX_([0-9]{2})([0-9]{2})_([0-9]{2})([0-9]{2})\.csv')
    year, month, day, start_hour, start_min, end_hour, end_min = [int(x) for x in pat.findall(fname)[0]]
    basedate = datetime(year, month, day, start_hour, start_min)
    return basedate
basedate = get_basedate(fname)
# Now we can update the basedate with the time of each point in the dataset (given in seconds) 
basedate + timedelta(seconds=731.54)

datetime.datetime(2018, 10, 24, 8, 42, 11, 540000)

In [4]:
%%time
# Test with a single file
fname = '20181024_dX_0830_0900.csv'
all_points = 0
with open(f'pNeuma/{fname}') as f:
    basedate = get_basedate(fname)
    # Print column names
    print(f.readline())
    for line in f.readlines():
        line = line.strip().split(';')[:-1]
        track_id, type_, traveled_d, avg_speed = line[:4]
        lat = np.array(line[4::6], dtype=float)
        lon = np.array(line[5::6], dtype=float)
        speed = np.array(line[6::6], dtype=float)
        lon_acc = np.array(line[7::6], dtype=float)
        lat_acc = np.array(line[8::6], dtype=float)
        # It seems time in secs from the start of the experiment
        time = np.array(line[9::6], dtype=float)
        points = gpd.points_from_xy(lon, lat, crs='EPSG:4326')
        all_points += len(points)
        # Build time stamped points as a list of dicts
        # Convert seconds into hour, minute, second, microsecond
    
        #traj = [{'geometry':p, 't':basedate + timedelta(seconds=t)} 
        #        for p, t in zip(points,time)]
all_points

track_id; type; traveled_d; avg_speed; lat; lon; speed; lon_acc; lat_acc; time

CPU times: user 11min 47s, sys: 4.71 s, total: 11min 52s
Wall time: 11min 52s


32365163

It takes about 11 (my laptop) minutes to parse a single data file for all drones, without actually building any dataframe. We cannot hold complete data in RAM, so we need to design our analysis carefully to work online and save data to disk. This means building histograms, line plots, and estimating distributions the old school way, withput much interactivity. Other alternative is to use **Dask**.

## Using dask collections to read files

### Rearrange the data file so to have a point per row.
Sorted by ID, single huge file.

Several small files per road segment, to easy loading and analyzing data for road segment analysis.

Or propose something, this is a data engineering task, how can we store data efficiently for posterior analysis taking into account we cannot hold all data in ram? Does HDF5 or some database as sqlite be useful? Yo may ask your professors or some expert.

It seems the best bet is to save our data as parquet files, to speed up loading and indexing. So, a first task would be to build dask arrays of dataframes, and save them to parquet on disk. This should only be done once, so once its probably better to keep this process in a python script, not in the notebook, and abstract the process into a helper function, maybe csv_to_parquet?.

https://github.com/dask/dask-tutorial

We cannot share the datafiles in github, create a shared OneDrive folder?

In [34]:
# NOTE: it seems ids are not unique among files, we need to create our own unique ids.
# SOL: append id to ymd for unique ids TODO: need to verify per file IDs are indeed unique

def line_to_df(line, date_cols):
    """ Line is a string delimited by ; with trailing newline.
    Create a small df with each line and return it. """
    
    line = line.strip().split(';')[:-1]
    track_id, type_, traveled_d, avg_speed = line[:4]
    lon = np.array(line[5::6], dtype=float)
    lat = np.array(line[4::6], dtype=float)
    #speed = np.array(line[6::6], dtype=float)
    #lon_acc = np.array(line[7::6], dtype=float)
    #lat_acc = np.array(line[8::6], dtype=float)
    
    # Date information
    n = len(lon)
    ymd, hour, mins = date_cols
    ID = np.full(n, int(str(ymd) + track_id), dtype=int)
    
    # It seems time in secs from the start of the experiment chunk
    # Since precision given in sec with two decimals,
    # Rewrite in miliseconds since 8 (integer, avoid precision issues)
    secs = np.array((np.array(line[9::6], dtype=float) + (hour - 8)*3600)*1000, dtype=int)
    
    df = pd.DataFrame({'ID': ID, 'lon': lon, 'lat': lat, 'secs': secs})
    
    return df    

In [38]:
pat = re.compile(r'^([0-9]{8})_dX_([0-9]{2})([0-9]{2})_([0-9]{2})([0-9]{2})\.csv')
def get_date_cols(fname, pat):
    ymd, start_hour, start_min, end_hour, end_min = [int(x) for x in pat.findall(fname)[0]]
    return ymd, start_hour, start_min


fname = '20181024_dX_0830_0900.csv'
date_cols = get_date_cols(fname, pat)

f = open(f'pNeuma/{fname}')
f.readline()
dfs = [delayed(line_to_df)(l, date_cols) for l in f]
df = dd.from_delayed(dfs)

# Write dask df to parquet
df.to_parquet(fname.split('.')[0] + '-points.parquet')
f.close()

In [41]:
df = dd.read_parquet(fname.split('.')[0] + '-points.parquet')
df.head()

Unnamed: 0,ID,lon,lat,secs
0,201810241,23.724906,37.984642,0
1,201810241,23.724901,37.984643,40
2,201810241,23.724896,37.984643,80
3,201810241,23.724892,37.984644,120
4,201810241,23.724887,37.984645,160


In [42]:
len(df)

32365163

In [11]:
# Save dataframe into parquet format, indexed by ID, categorize type, think how to store date, 
# perhaps split into several columns, otherwise will be stored as a byte arrays with generic object type.
# Storing them at differetn columns may be more useful to filter by day, hour, etc.

### Now, repeat, but with a vehicle per row, and arrays in columns, so building paths is easy.

This repeats our data, which wastes disk, but if we win cpu time, then I believe is worth it.

# Dowloading the road network of the area of interest

Use osmnx, once you have the polygon this is easy, then create a geopandas dataframe with edges to find the nearest edge for each point in the pneuma dataset in order to build per road data views.

You'll need to get the polygon by hand (draw over google earth for example) or find the convex hull of all points (harder but more elegant, may be tricky to find an efficient way to calculate this, since we have hundreds of millions of points).

To darw the polygon you may use: http://apps.headwallphotonics.com/

In [1]:
import osmnx



https://towardsdatascience.com/geospatial-operations-at-scale-with-dask-and-geopandas-4d92d00eb7e8

https://www.quansight.com/post/spatial-filtering-at-scale-with-dask-and-spatialpandas

# Use datashader to visualize point density and/or trajectory density ovelayed on the map.

I think datashader lets us add points in chunks with seems convinient. This is likely an easy visualization, and shoulb be done first.

https://examples.pyviz.org/census/census.html

# Visualizing distance and velocity along road segments.

For each road egment of the network, normalize distance and velocities using the total road length and create a line plot and line density plot for each road (maybe a density plot for all roads) of vel vs distance, vel vs time, and dist vs time.

# Histograms for each road segment

Create velocity histograms for each road segmente, full, split by vehicle type, and by day time and by road type (this info may be available from OSM).

# Fit several target distributions for each road segmente using maximum likelihood.

This is the main objective, mut we may not have enough time.