Skip to content

Commit

Permalink
first version
Browse files Browse the repository at this point in the history
  • Loading branch information
anitagraser committed Dec 2, 2018
1 parent eb12c64 commit 59e11b5
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Expand Up @@ -102,3 +102,10 @@ venv.bak/

# mypy
.mypy_cache/
.project
.pydevproject
.idea/misc.xml
.idea/modules.xml
.idea/processing_edgebundling.iml
.idea/workspace.xml
.settings/org.eclipse.core.resources.prefs
32 changes: 32 additions & 0 deletions __init__.py
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-

"""
***************************************************************************
trajectoryProviderPlugin.py
---------------------
Date : December 2018
Copyright : (C) 2018 by Anita Graser
Email : anitagraser@gmx.at
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""

__author__ = 'Anita Graser'
__date__ = 'Dec 2018'
__copyright__ = '(C) 2018, Anita Graser'

# This will get replaced with a git SHA1 when you do a git archive

__revision__ = '$Format:%H$'

from .trajectoryProviderPlugin import TrajectoryProviderPlugin


def classFactory(iface):
return TrajectoryProviderPlugin()
180 changes: 180 additions & 0 deletions clipTrajectoriesByExtentAlgorithm.py
@@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-

"""
***************************************************************************
trajectoriesFromPointLayer.py
---------------------
Date : December 2018
Copyright : (C) 2018 by Anita Graser
Email : anitagraser@gmx.at
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""

__author__ = 'Anita Graser'
__date__ = 'Dec 2018'
__copyright__ = '(C) 2018, Anita Graser'

# This will get replaced with a git SHA1 when you do a git archive

__revision__ = '$Format:%H$'

import os

import pandas as pd
import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import Point, LineString, Polygon
from shapely.affinity import translate
from datetime import datetime, timedelta

from qgis.PyQt.QtCore import QCoreApplication, QVariant
from qgis.PyQt.QtGui import QIcon

from qgis.core import (QgsField,QgsFields,
QgsGeometry,
QgsFeature,
QgsFeatureSink,
QgsFeatureRequest,
QgsProcessing,
QgsProcessingAlgorithm,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterString,
QgsProcessingParameterExtent,
QgsProcessingParameterField,
QgsProcessingParameterNumber,
QgsProcessingParameterBoolean,
QgsProcessingParameterFeatureSink,
QgsProcessingParameterEnum,
QgsWkbTypes
)

from .trajectory import Trajectory
from .trajectoryUtils import trajectories_from_qgis_point_layer

pluginPath = os.path.dirname(__file__)


class ClipTrajectoriesByExtentAlgorithm(QgsProcessingAlgorithm):
# script parameters
INPUT = 'INPUT'
TRAJ_ID_FIELD = 'OBJECT_ID_FIELD'
TIMESTAMP_FIELD = 'TIMESTAMP_FIELD'
TIMESTAMP_FORMAT = 'TIMESTAMP_FORMAT'
EXTENT = 'EXTENT'
OUTPUT = 'OUTPUT'

def __init__(self):
super().__init__()

def name(self):
return "clip_traj_extent"

def icon(self):
return QIcon(os.path.join(pluginPath, "icons", "icon.png"))

def tr(self, text):
return QCoreApplication.translate("clip_traj_extent", text)

def displayName(self):
return self.tr("Clip trajectories by extent")

def group(self):
return self.tr("Basic")

def groupId(self):
return "TrajectoryBasic"

def shortHelpString(self):
return self.tr("""
<h3>Trajectories from point layer</h3>
<p>Todo</p>
""")

def helpUrl(self):
return "https://github.com/anitagraser/processing-trajectory"

def createInstance(self):
return type(self)()

def initAlgorithm(self, config=None):
# input layer
self.addParameter(QgsProcessingParameterFeatureSource(
name=self.INPUT,
description=self.tr("Input point layer"),
types=[QgsProcessing.TypeVectorPoint]))
# fields
self.addParameter(QgsProcessingParameterField(
name=self.TRAJ_ID_FIELD,
description=self.tr("Trajectory ID field"),
defaultValue="trajectory_id",
parentLayerParameterName=self.INPUT,
type=QgsProcessingParameterField.Any,
allowMultiple=False,
optional=False))
self.addParameter(QgsProcessingParameterField(
name=self.TIMESTAMP_FIELD,
description=self.tr("Timestamp field"),
defaultValue="t",
parentLayerParameterName=self.INPUT,
type=QgsProcessingParameterField.Any,
allowMultiple=False,
optional=False))
self.addParameter(QgsProcessingParameterString(
name=self.TIMESTAMP_FORMAT,
description=self.tr("Timestamp field"),
defaultValue="%Y-%m-%d %H:%M:%S+00",
optional=False))
self.addParameter(QgsProcessingParameterExtent(
name=self.EXTENT,
description=self.tr("Extent"),
optional=False))
# output layer
self.addParameter(QgsProcessingParameterFeatureSink(
name=self.OUTPUT,
description=self.tr("Clipped trajectories"),
type=QgsProcessing.TypeVectorLine))

def processAlgorithm(self, parameters, context, feedback):
input_layer = self.parameterAsSource(parameters, self.INPUT, context)
traj_id_field = self.parameterAsFields(parameters, self.TRAJ_ID_FIELD, context)[0]
timestamp_field = self.parameterAsFields(parameters, self.TIMESTAMP_FIELD, context)[0]
timestamp_format = self.parameterAsString(parameters, self.TIMESTAMP_FORMAT, context)
extent = self.parameterAsExtent(parameters, self.EXTENT, context)

output_fields = QgsFields()
output_fields.append(QgsField(traj_id_field, QVariant.String))

(sink, dest_id) = self.parameterAsSink(parameters, self.OUTPUT, context,
output_fields,
QgsWkbTypes.LineString,
input_layer.sourceCrs())

trajectories = trajectories_from_qgis_point_layer(input_layer, timestamp_field, traj_id_field, timestamp_format)

xmin = extent.xMinimum()
xmax = extent.xMaximum()
ymin = extent.yMinimum()
ymax = extent.yMaximum()
polygon = Polygon([(xmin,ymin), (xmin,ymax), (xmax,ymax), (xmax,ymin), (xmin,ymin)])

intersections = []
for traj in trajectories:
for intersection in traj.intersection(polygon):
intersections.append(intersection)

for traj in intersections:
line = QgsGeometry.fromWkt(traj.to_linestring().wkt)
f = QgsFeature()
f.setGeometry(line)
f.setAttributes([traj.id])
sink.addFeature(f, QgsFeatureSink.FastInsert)

# default return type for function
return {self.OUTPUT: dest_id}
Binary file added icons/icon.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions metadata.txt
@@ -0,0 +1,21 @@
[general]
name=Trajectory tools for Processing
description=This is a collection of tools for handling trajectory data
about=TODO
category=Plugins
version=1.0
qgisMinimumVersion=3.0
qgisMaximumVersion=4.0

icon=icons/icon.png
tags=trajectories, movement

author=Anita Graser
email=anitagraser@gmx.at

homepage=https://github.com/anitagraser/processing-trajectory
tracker=https://github.com/anitagraser/processing-trajectory/issues
repository=https://github.com/anitagraser/processing-trajectory

experimental=True
deprecated=False
141 changes: 141 additions & 0 deletions test/testTrajectory.py
@@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-

"""
***************************************************************************
testTrajectory.py
---------------------
Date : December 2018
Copyright : (C) 2018 by Anita Graser
Email : anitagraser@gmx.at
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
ATTENTION!
If you use OSGeo4W, you need to run the following command first:
call C:\OSGeo4W64\bin\py3_env.bat
python3 testTrajectory.py -v
"""

import sys
import unittest
import pandas as pd
import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import Point, LineString, Polygon
from shapely.affinity import translate
from datetime import datetime, timedelta

sys.path.append("..")

from trajectory import Trajectory

class TestTrajectory(unittest.TestCase):

def test_two_intersections_with_same_polygon(self):
polygon = Polygon([(5,-5), (7,-5), (7,12), (5,12), (5,-5)])
data = [{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'})
traj = Trajectory(1,geo_df)
intersections = traj.intersection(polygon)
result = []
for x in intersections:
result.append(x.to_linestring())
expected_result = [LineString([(5,0),(6,0),(7,0)]),LineString([(7,10),(5,10)])]
self.assertEqual( result , expected_result)

def test_duplicate_traj_points(self):
polygon = Polygon([(5,-5), (7,-5), (7,5), (5,5), (5,-5)])
data = [{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,12,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'} )
traj = Trajectory(1,geo_df)
intersections = traj.intersection(polygon)
result = []
for x in intersections:
result.append(x.to_linestring())
expected_result = [LineString([(5,0),(6,0),(6,0),(7,0)])]
self.assertEqual( result , expected_result)

def test_one_intersection(self):
polygon = Polygon([(5,-5), (7,-5), (7,5), (5,5), (5,-5)])
data = [{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'} )
traj = Trajectory(1,geo_df)
intersections = traj.intersection(polygon)
result = []
for x in intersections:
result.append(x.to_linestring())
expected_result = [LineString([(5,0),(6,0),(7,0)])]
self.assertEqual( result , expected_result)

def test_one_intersection_reversed(self):
polygon = Polygon([(5,-5), (7,-5), (7,5), (5,5), (5,-5)])
data = [{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'} )
traj = Trajectory(1,geo_df)
intersections = traj.intersection(polygon)
result = []
for x in intersections:
result.append(x.to_linestring())
expected_result = [LineString([(7,0),(6,0),(5,0)])]
self.assertEqual( result , expected_result)

def test_milliseconds(self):
polygon = Polygon([(5,-5), (7,-5), (8,5), (5,5), (5,-5)])
data = [{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'} )
traj = Trajectory(1,geo_df)
result = traj.intersection(polygon)[0].to_linestring().wkt
expected_result = "LINESTRING (7.5 0, 6 0, 5 0)"
self.assertEqual( result , expected_result)

def test_no_intersection(self):
polygon = Polygon([(105,-5), (107,-5), (107,12), (105,12), (105,-5)])
data = [{'id':1, 'geometry':Point(0,0), 't':datetime(2018,1,1,12,0,0)},
{'id':1, 'geometry':Point(6,0), 't':datetime(2018,1,1,12,10,0)},
{'id':1, 'geometry':Point(10,0), 't':datetime(2018,1,1,12,15,0)},
{'id':1, 'geometry':Point(10,10), 't':datetime(2018,1,1,12,30,0)},
{'id':1, 'geometry':Point(0,10), 't':datetime(2018,1,1,13,0,0)}]
df = pd.DataFrame(data).set_index('t')
geo_df = GeoDataFrame(df, crs={'init': '31256'})
traj = Trajectory(1,geo_df)
result = traj.intersection(polygon)
expected_result = []
self.assertEqual( result , expected_result)


if __name__ == '__main__':
unittest.main()

0 comments on commit 59e11b5

Please sign in to comment.