Merge pull request #3874 from LBNL-UCB-STI/do/3867-analytic-python-scripts

Ability to run python scripts
nikolayilyin committed May 29, 2024
2 parents 98cc9fa + 0368274 commit 7fc2c7f
Showing 9 changed files with 189 additions and 51 deletions.
12 changes: 9 additions & 3 deletions docs/inputs.rst
@@ -1179,7 +1179,6 @@ There's the list of parameters responsible for writing out data produced by BEAM
beam.outputs.writeGraphs = true
beam.outputs.writePlansInterval = 0
beam.outputs.writeEventsInterval = 1
beam.outputs.writeAnalysis = true
beam.outputs.writeR5RoutesInterval = 0
beam.physsim.writeEventsInterval = 0
beam.physsim.events.fileOutputFormats = "csv" # valid options: xml(.gz) , csv(.gz), none - DEFAULT: csv.gz
@@ -1198,6 +1197,11 @@ There's the list of parameters responsible for writing out data produced by BEAM
beam.outputs.matsim.deleteITERSFolderFiles = ""
beam.outputs.matsim.deleteRootFolderFiles = ""
beam.outputs.stats.binSize = 3600
beam.outputs.analysis = {
iterationScripts = []
simulationScripts = []
processWaitTimeInMinutes = 5
}
# Skims configuration
beam.router.skim = {
keepKLatestSkims = 1
@@ -1262,7 +1266,6 @@ writing is disabled.
* outputs.writeGraphs: enable writing activity locations to #.activityLocations.png
* outputs.writePlansInterval: enable writing plans of persons at the iteration to #.plans.csv.gz
* outputs.writeEventsInterval: enable writing AgentSim events to #.events.csv.gz
* outputs.writeAnalysis: enable analysis with python script analyze_events.py and writing different data files
* outputs.writeR5RoutesInterval: enable writing routing requests/responses to files #.routingRequest.parquet, #.routingResponse.parquet, #.embodyWithCurrentTravelTime.parquet
* physsim.writeEventsInterval: enable writing physsim events to #.physSimEvents.csv.gz
* physsim.events.fileOutputFormats: file format for physsim event file; valid options: xml(.gz) , csv(.gz), none - DEFAULT: csv.gz
@@ -1280,6 +1283,9 @@ writing is disabled.
* outputs.matsim.deleteITERSFolderFiles: comma separated list of matsim iteration output files to be deleted before beam shutdown.
* outputs.matsim.deleteRootFolderFiles: comma separated list of matsim root output files to be deleted before beam shutdown.
* outputs.stats.binSize: bin size for various histograms.
* outputs.analysis.iterationScripts: array of python scripts that are run at the end of each iteration.
* outputs.analysis.simulationScripts: array of python scripts that are run at the end of the simulation.
* outputs.analysis.processWaitTimeInMinutes: how long (in minutes) Beam waits for the analysis scripts to finish after the simulation ends.
* router.skim.keepKLatestSkims: How many skim data iterations to keep
* router.skim.writeSkimsInterval: enable writing all skim data for a particular iteration to corresponding files
* router.skim.writeAggregatedSkimsInterval: enable writing all aggregated skim data (for all iterations) to corresponding files
@@ -1470,4 +1476,4 @@ Parameters that are not supported anymore
beam.physsim.relaxation.experiment5_1.percentToSimulate
beam.physsim.relaxation.experiment5_2.percentToSimulate
beam.urbansim.backgroundODSkimsCreator.*

beam.outputs.writeAnalysis
45 changes: 45 additions & 0 deletions docs/users.rst
@@ -220,6 +220,7 @@ Steps to add a new configuration :

.. image:: _static/figs/scala_test_configuration.png

.. _building-beam-docker-image:

BEAM in Docker image
^^^^^^^^^^^^^^^^^^^^
@@ -300,6 +301,50 @@ Each iteration of the run produces a sub-folder under the `ITERS` directory. Wit

In addition, raw outputs are available in the two events file (one from the AgentSim and one from the PhysSim, see :ref:`matsim-events` for more details), titled `%ITER%.events.csv` and `%ITER%.physSimEvents.xml.gz` respectively.

Data Analysis
^^^^^^^^^^^^^

One may execute Python scripts after each iteration or after the whole simulation. To do so, add the following
parameters to the Beam config file: ::

beam.outputs.analysis.iterationScripts = ["path/to/iteration/script1.py", "path/to/iteration/script2.py"]
beam.outputs.analysis.simulationScripts = ["path/to/simulation/script1.py", "path/to/simulation/script2.py"]

Beam passes the following command line arguments to each iteration script:

#. Beam config path
#. Iteration output directory path
#. Iteration number

Simulation scripts receive the following arguments:

#. Beam config path
#. Output directory path
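
A minimal iteration script that reads these arguments might look like the sketch below; only the argument handling
reflects what Beam actually passes, while the event-count metric is purely illustrative:

.. code-block:: python

    import sys
    from pathlib import Path

    # Beam invokes iteration scripts as: <script> <config path> <iteration dir> <iteration number>
    if len(sys.argv) < 4:
        sys.exit("This script is supposed to be run by Beam")

    config_path, iteration_dir, iteration_number = sys.argv[1], sys.argv[2], sys.argv[3]

    # Count the AgentSim events written for this iteration, if the uncompressed file exists.
    events_csv = Path(iteration_dir) / f"{iteration_number}.events.csv"
    if events_csv.exists():
        with events_csv.open() as f:
            n_events = sum(1 for _ in f) - 1  # subtract the header line
        print(f"iteration {iteration_number}: {n_events} events in {events_csv}")
    else:
        print(f"no events file found at {events_csv} (it may be gzipped or disabled)")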

`An example of an iteration script <https://github.com/LBNL-UCB-STI/beam/blob/develop/src/main/python/events_analysis/events_within_beam.py>`_.
If you need to use a specific version of Python (e.g. from a conda environment), you can provide a path to the Python
executable with the `--python-executable` command line argument when you start Beam. If a script needs a non-standard
Python library, install it into that environment.
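
For example, assuming the standard Gradle launcher and a purely hypothetical conda environment path, a run could be
started like this:

.. code-block:: bash

    ./gradlew :run -PappArgs="['--config', 'test/input/beamville/beam.conf', '--python-executable', '/home/user/miniconda3/envs/beam-analysis/bin/python']"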

The Beam docker image comes with some Python libraries preinstalled; `this command <https://github.com/LBNL-UCB-STI/beam/blob/develop/docker/beam-environment/Dockerfile-java-11#L16-L15>`_ in the Dockerfile installs them.
If you want to use other Python libraries inside the Beam docker image, you can add a similar instruction to the
`createDockerfile` task in the `build.gradle` file.

.. code-block:: groovy

    ...
    from 'beammodel/beam-environment:jdk-11-4.01'
    runCommand('pip install emoji')
    ...

Go to :ref:`building-beam-docker-image` to see how one can build a Beam docker image.
You also need to mount the directory that contains your Python scripts into the Beam container so that the scripts are
accessible from inside it. You can do that with the following docker argument:

.. code-block:: bash

    --mount source="/absolute/path/to/code",destination=/app/analysis,type=bind

Then reference the scripts by container paths such as `analysis/relative/path/to/script` in the
`beam.outputs.analysis.iterationScripts` parameter of the Beam config.
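
Putting the pieces together, a container start might look like the sketch below; the image name, the output mount and
the way the config path is passed are placeholders to adapt to your own setup:

.. code-block:: bash

    docker run \
        --mount source="/absolute/path/to/code",destination=/app/analysis,type=bind \
        --mount source="/absolute/path/to/output",destination=/app/output,type=bind \
        my-beam-image:latest --config=test/input/beamville/beam.conf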


Model Config
^^^^^^^^^^^^

32 changes: 32 additions & 0 deletions src/main/python/events_analysis/events_within_beam.py
@@ -0,0 +1,32 @@
import sys
import os
import json
from analyze_events import get_pooling_metrics
from analyze_events import get_pooling_sankey_diagram

# When beam executes an iteration python script it provides the following command line arguments
# 1. path to the beam config
# 2. path to the iteration output dir
# 3. the iteration number
if len(sys.argv) < 4:
    print("This script is supposed to be run by Beam")
    sys.exit(1)
iteration_path = sys.argv[2]
iteration_number = sys.argv[3]
possibleEventFiles = [f"{iteration_path}/{iteration_number}.events.csv",
f"{iteration_path}/{iteration_number}.events.csv.gz"]

event_file = next((x for x in possibleEventFiles if os.path.exists(x)), None)

if not event_file:
    print(f"Cannot find event files within {possibleEventFiles}")
else:
    print("getting pooling metrics from events file: " + event_file)
    pooling_metrics = get_pooling_metrics(event_file)
    output_dir = event_file.rsplit('/', 1)[0]
    with open('{}/pooling-metrics.json'.format(output_dir), 'w') as f:
        json.dump(pooling_metrics, f)
    unit = 1000.0
    get_pooling_sankey_diagram(pooling_metrics, output_dir, unit)
    print(json.dumps(pooling_metrics, indent=4))
    print("done")
11 changes: 10 additions & 1 deletion src/main/resources/beam-template.conf
@@ -918,7 +918,6 @@ beam.outputs.generalizedLinkStats.endTime = 108000
beam.outputs.defaultWriteInterval = 1
beam.outputs.writePlansInterval = "int | 0"
beam.outputs.writeEventsInterval = "int | 1"
beam.outputs.writeAnalysis = "boolean | true"
beam.outputs.writeR5RoutesInterval = "int | 0"
beam.physsim.writeEventsInterval = "int | 0"
beam.physsim.writePlansInterval = "int | 0"
@@ -944,6 +943,16 @@ beam.outputs.matsim.deleteRootFolderFiles = ""
#Delete MATSim files from ITERS folder:
#beam.outputs.matsim.deleteITERSFolderFiles="experienced_plans.xml.gz,legHistogram.txt,legHistogram_all.png,legHistogram_bike.png,legHistogram_cav.png,legHistogram_ride_hail.png,legHistogram_walk.png,legHistogram_car.png,legHistogram_walk_transit.png,tripdurations.txt"
beam.outputs.matsim.deleteITERSFolderFiles = ""
# post-processing with python scripts
beam.outputs.analysis = {
  #@optional
  iterationScripts = [string] | []
  #@optional
  simulationScripts = [string] | []
  # How long (in minutes) Beam waits for the analysis scripts to finish after the simulation ends
  processWaitTimeInMinutes = 60
}

##################################################################
# SPATIAL
##################################################################
10 changes: 4 additions & 6 deletions src/main/scala/beam/analysis/AnalysisProcessor.scala
@@ -1,12 +1,12 @@
package beam.analysis

import scala.concurrent._
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration.TimeUnit
import scala.sys.process._

import beam.utils.logging.ExponentialLazyLogging

case class PythonProcess(processOption: Option[Process]) {

def isRunning: Boolean = processOption match {
@@ -30,7 +30,7 @@ case class PythonProcess(processOption: Option[Process]) {
}
}

object AnalysisProcessor extends ExponentialLazyLogging {
object AnalysisProcessor extends LazyLogging {

def firePythonScriptAsync(scriptPath: String, args: String*): PythonProcess = {
firePythonAsync("python3", scriptPath, args: _*)
@@ -42,11 +42,9 @@ object AnalysisProcessor extends ExponentialLazyLogging {
val processLogger = ProcessLogger(
output => {
logger.info(s"Process Handler Stdout for $source: $output")
println(s"Process Handler Stdout for $source: $output")
},
output => {
logger.error(s"Process Handler Stderr for $source: $output")
println(s"Process Handler Stdout for $source: $output")
}
)
logger.info(s"Running python script: $scriptPath with args $args")
6 changes: 5 additions & 1 deletion src/main/scala/beam/sim/ArgumentsParser.scala
@@ -8,7 +8,7 @@ object ArgumentsParser {

private def buildParser: OptionParser[Arguments] = {
new scopt.OptionParser[Arguments]("beam") {
opt[String]("config")
opt[String]('c', "config")
.action((value, args) => {
args.copy(
config = Some(BeamConfigUtils.parseFileSubstitutingInputDirectory(value)),
@@ -20,6 +20,9 @@
else success
)
.text("Location of the beam config file")
opt[String]('p', "python-executable")
.action((value, args) => args.copy(pythonExecutable = Option(value).map(_.trim)))
.text("A python executable to be used with analytic scripts")
opt[String]("cluster-type")
.action((value, args) =>
args.copy(
@@ -76,6 +79,7 @@
case class Arguments(
configLocation: Option[String] = None,
config: Option[TypesafeConfig] = None,
pythonExecutable: Option[String] = None,
clusterType: Option[ClusterType] = None,
nodeHost: Option[String] = None,
nodePort: Option[String] = None,
46 changes: 25 additions & 21 deletions src/main/scala/beam/sim/BeamHelper.scala
@@ -115,29 +115,33 @@ trait BeamHelper extends LazyLogging with BeamValidationHelper {
parsedArgs: Arguments,
config: TypesafeConfig
): TypesafeConfig = {
config.withFallback(
ConfigFactory.parseMap(
(
Map(
"beam.cluster.enabled" -> parsedArgs.useCluster,
"beam.useLocalWorker" -> parsedArgs.useLocalWorker.getOrElse(
if (parsedArgs.useCluster) false else true
val primaryConfigEntries = ConfigFactory.parseMap(
parsedArgs.pythonExecutable.map("beam.outputs.analysis.pythonExecutable" -> _).toMap.asJava
)
val secondaryConfigEntries = ConfigFactory.parseMap(
(
Map(
"beam.cluster.enabled" -> parsedArgs.useCluster,
"beam.useLocalWorker" -> parsedArgs.useLocalWorker.getOrElse(
if (parsedArgs.useCluster) false else true
)
) ++ {
if (parsedArgs.useCluster)
Map(
"beam.cluster.clusterType" -> parsedArgs.clusterType.get.toString,
"akka.actor.provider" -> "akka.cluster.ClusterActorRefProvider",
"akka.remote.artery.canonical.hostname" -> parsedArgs.nodeHost.get,
"akka.remote.artery.canonical.port" -> parsedArgs.nodePort.get,
"akka.cluster.seed-nodes" -> java.util.Arrays
.asList(s"akka://ClusterSystem@${parsedArgs.seedAddress.get}")
)
) ++ {
if (parsedArgs.useCluster)
Map(
"beam.cluster.clusterType" -> parsedArgs.clusterType.get.toString,
"akka.actor.provider" -> "akka.cluster.ClusterActorRefProvider",
"akka.remote.artery.canonical.hostname" -> parsedArgs.nodeHost.get,
"akka.remote.artery.canonical.port" -> parsedArgs.nodePort.get,
"akka.cluster.seed-nodes" -> java.util.Arrays
.asList(s"akka://ClusterSystem@${parsedArgs.seedAddress.get}")
)
else Map.empty[String, Any]
}
).asJava
)
else Map.empty[String, Any]
}
).asJava
)
primaryConfigEntries
.withFallback(config)
.withFallback(secondaryConfigEntries)
}

def module(
51 changes: 34 additions & 17 deletions src/main/scala/beam/sim/BeamSim.scala
@@ -33,6 +33,7 @@ import beam.utils.watcher.MethodWatcher
import beam.utils._
import com.conveyal.r5.transit.TransportNetwork
import com.google.inject.Inject
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.lang3.StringUtils
import org.jfree.data.category.DefaultCategoryDataset
@@ -77,6 +78,7 @@ class BeamSim @Inject() (
private val beamConfigChangesObservable: BeamConfigChangesObservable,
private val routeHistory: RouteHistory,
private val rideHailIterationHistory: RideHailIterationHistory,
private val config: Config,
private val configHolder: BeamConfigHolder
) extends StartupListener
with IterationStartsListener
@@ -542,8 +544,6 @@
logger.info("Ending Iteration")
if (COLLECT_AND_CREATE_BEAM_ANALYSIS_AND_GRAPHS) {
delayMetricAnalysis.generateDelayAnalysis(event)

writeEventsAnalysisUsing(event)
}
vmInformationWriter.notifyIterationEnds(event)

@@ -555,27 +555,33 @@
beamServices.skims.dt_skimmer.displaySkimStats()
beamServices.skims.tc_skimmer.displaySkimStats()

// iteration python scripts
for {
scripts <- beamServices.beamConfig.beam.outputs.analysis.iterationScripts
script <- scripts
} {
val iterationPath = event.getServices.getControlerIO.getIterationPath(event.getIteration)
val configPath = config.getString("config")
val process = AnalysisProcessor.firePythonAsync(
getPythonExecutable,
script,
configPath,
iterationPath,
event.getIteration.toString
)
runningPythonScripts += process
}

if (beamConfig.beam.agentsim.agents.vehicles.linkSocAcrossIterations)
beamServices.beamScenario.setInitialSocOfPrivateVehiclesFromCurrentStateOfVehicles()

// Clear the state of private vehicles because they are shared across iterations
beamServices.beamScenario.privateVehicles.values.foreach(_.resetState())
}

private def writeEventsAnalysisUsing(event: IterationEndsEvent) = {
if (beamServices.beamConfig.beam.outputs.writeAnalysis) {
val writeEventsInterval = beamServices.beamConfig.beam.outputs.writeEventsInterval
val writeEventAnalysisInThisIteration = writeEventsInterval > 0 && event.getIteration % writeEventsInterval == 0
if (writeEventAnalysisInThisIteration) {
val currentEventsFilePath =
event.getServices.getControlerIO.getIterationFilename(event.getServices.getIterationNumber, "events.csv")
val pythonProcess = beam.analysis.AnalysisProcessor.firePythonScriptAsync(
"src/main/python/events_analysis/analyze_events.py",
if (new File(currentEventsFilePath).exists) currentEventsFilePath else currentEventsFilePath + ".gz"
)
runningPythonScripts += pythonProcess
}
}
private def getPythonExecutable = {
val pythonExecProp = "beam.outputs.analysis.pythonExecutable"
if (config.hasPath(pythonExecProp)) config.getString(pythonExecProp) else "python3"
}

private def dumpMatsimStuffAtTheBeginningOfSimulation(): Unit = {
@@ -651,11 +657,22 @@

deleteMATSimOutputFiles(event.getServices.getIterationNumber)

// simulation python scripts
for {
scripts <- beamServices.beamConfig.beam.outputs.analysis.simulationScripts
script <- scripts
} {
val outputPath = event.getServices.getControlerIO.getOutputPath
val configPath = config.getString("config")
val process = AnalysisProcessor.firePythonAsync(getPythonExecutable, script, configPath, outputPath)
runningPythonScripts += process
}

runningPythonScripts
.filter(process => process.isRunning)
.foreach(process => {
logger.info("Waiting for python process to complete running.")
process.waitFor(5, TimeUnit.MINUTES)
process.waitFor(beamServices.beamConfig.beam.outputs.analysis.processWaitTimeInMinutes, TimeUnit.MINUTES)
logger.info("Python process completed.")
})
