Skip to content

Commit

Permalink
Merge pull request #390 from Parsely/feature/pass_config_file_object
Browse files Browse the repository at this point in the history
Allow passing config_file objects instead of always assuming we have a config.json file
  • Loading branch information
dan-blanchard committed Aug 24, 2017
2 parents 2107460 + 5248db0 commit ee50169
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 48 deletions.
3 changes: 1 addition & 2 deletions streamparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import logging

from . import bolt, cli, component, decorators, dsl, spout, storm
from . import bolt, cli, component, dsl, spout, storm
from .dsl import Grouping, Stream, Topology
from .storm import (BatchingBolt, Bolt, JavaBolt, JavaSpout, ReliableSpout,
ShellBolt, ShellSpout, Spout, StormHandler,
Expand All @@ -25,7 +25,6 @@
'bolt',
'cli',
'component',
'decorators',
'dsl',
'Grouping',
'JavaBolt',
Expand Down
32 changes: 0 additions & 32 deletions streamparse/decorators.py

This file was deleted.

53 changes: 39 additions & 14 deletions streamparse/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from thriftpy.rpc import make_client
from thriftpy.transport import TFramedTransportFactory

from .decorators import memoized
from .dsl.topology import Topology, TopologyType
from .thrift import Nimbus

Expand Down Expand Up @@ -143,27 +142,48 @@ def warn(msg, error_code=1):
print("{}: {}".format(yellow("warning"), msg))


@memoized
def get_config():
if not os.path.exists("config.json"):
die("No config.json found. You must run this command inside a "
"streamparse project directory.")
_config = None
def get_config(config_file=None):
"""
Parses the config file and returns it as a `dict`.
:param config_file: a `file`-like object that contains the config.json
contents. If `None`, we look for a file named
``config.json`` in the working directory.
:returns: a `dict` representing the parsed `config_file`.
"""
global _config
if _config is not None:
return _config

if config_file is None:
if not os.path.exists("config.json"):
die("No config.json found. You must run this command inside a "
"streamparse project directory.")

with open("config.json") as fp:
config = json.load(fp)
with open("config.json") as fp:
config = json.load(fp)
else:
config = json.load(config_file)
_config = config
return config


def get_topology_definition(topology_name=None):
def get_topology_definition(topology_name=None, config_file=None):
"""Fetch a topology name and definition file. If the topology_name is
None, and there's only one topology definiton listed, we'll select that
one, otherwise we'll die to avoid ambiguity.
:param topology_name: a `str`, the topology_name of the topology (without
.py extension).
:param config_file: a `file`-like object that contains the config.json
contents. If `None`, we look for a file named
``config.json`` in the working directory.
:returns: a `tuple` containing (topology_name, topology_file).
"""
config = get_config()
config = get_config(config_file=config_file)
topology_path = config["topology_specs"]
if topology_name is None:
topology_files = glob("{}/*.py".format(topology_path))
Expand All @@ -189,14 +209,18 @@ def get_topology_definition(topology_name=None):
return (topology_name, topology_file)


def get_env_config(env_name=None):
def get_env_config(env_name=None, config_file=None):
"""Fetch an environment name and config object from the config.json file.
If the name is None and there's only one environment, we'll select the
first, otherwise we'll die to avoid ambiguity.
:param config_file: a `file`-like object that contains the config.json
contents. If `None`, we look for a file named
``config.json`` in the working directory.
:returns: a `tuple` containing (env_name, env_config).
"""
config = get_config()
config = get_config(config_file=config_file)
if env_name is None and len(config["envs"]) == 1:
env_name = list(config["envs"].keys())[0]
elif env_name is None and len(config["envs"]) > 1:
Expand Down Expand Up @@ -261,10 +285,11 @@ def get_nimbus_client(env_config=None, host=None, port=None, timeout=7000):

_storm_workers = {}
def get_storm_workers(env_config):
"""Retrieves list of workers, optionally from nimbus
"""Retrieves list of workers, optionally from nimbus.
This function will look up the list of current workers from nimbus if
workers has not been defined in config.json
workers have not been defined in config.json.
:param env_config: The project's parsed config.
:type env_config: `dict`
Expand Down

0 comments on commit ee50169

Please sign in to comment.