Skip to content

Commit

Permalink
add env as a passing parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
yuli_han authored and yuli_han committed Feb 27, 2024
1 parent d40f160 commit 41f5f25
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions api/py/ai/chronon/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def download_only_once(url, path):
@retry_decorator(retries=3, backoff=50)
def download_jar(version, jar_type="uber", release_tag=None, spark_version="2.4.0"):
assert (
spark_version in SUPPORTED_SPARK
spark_version in SUPPORTED_SPARK
), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}"
scala_version = SCALA_VERSION_FOR_SPARK[spark_version]
maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None)
Expand Down Expand Up @@ -237,14 +237,16 @@ def set_runtime_env(args):
- Environment variables derived from args (like app_name)
- conf.metaData.modeToEnvMap for the mode (set on config)
- team environment per context and mode set on teams.json
- default team environment per context and mode set on teams.json
- production team environment per mode set on teams.json
- default production team environment per context and mode set on teams.json
- Common Environment set in teams.json
"""
environment = {
"common_env": {},
"conf_env": {},
"default_env": {},
"team_env": {},
"production_team_env ": {},
"cli_args": {},
}
conf_type = None
Expand All @@ -262,7 +264,7 @@ def set_runtime_env(args):
)
if args.conf and effective_mode:
try:
context, conf_type, team, _ = args.conf.split("/")[-4:]
_, conf_type, team, _ = args.conf.split("/")[-4:]
except Exception as e:
logging.error(
"Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format(
Expand All @@ -272,6 +274,7 @@ def set_runtime_env(args):
raise e
if not team:
team = "default"
context = args.env
logging.info(
f"Context: {context} -- conf_type: {conf_type} -- team: {team}"
)
Expand All @@ -298,8 +301,8 @@ def set_runtime_env(args):
environment["team_env"] = (
teams_json[team].get(context, {}).get(effective_mode, {})
)
environment["dev_team_env"] = (
teams_json[team].get("dev", {}).get(effective_mode, {})
environment["production_team_env"] = (
teams_json[team].get("production", {}).get(effective_mode, {})
)
environment["default_env"] = (
teams_json.get("default", {})
Expand All @@ -317,10 +320,10 @@ def set_runtime_env(args):
[
k
for k in [
"chronon",
conf_type,
args.mode.replace("-", "_") if args.mode else None,
]
"chronon",
conf_type,
args.mode.replace("-", "_") if args.mode else None,
]
if k is not None
]
)
Expand All @@ -329,12 +332,7 @@ def set_runtime_env(args):
environment["cli_args"]["CHRONON_DRIVER_JAR"] = args.chronon_jar
environment["cli_args"]["CHRONON_ONLINE_JAR"] = args.online_jar
environment["cli_args"]["CHRONON_ONLINE_CLASS"] = args.online_class
# If the job is running on airflow, ignore the dev team environment.
if 'AIRFLOW_CTX_EXECUTION_DATE' in os.environ:
order = ["conf_env", "team_env", "default_env", "common_env", "cli_args"]
else:
# If the job is running locally for testing, dev team environment should be prioritized.
order = ["conf_env", "dev_team_env", "team_env", "default_env", "common_env", "cli_args"]
order = ["conf_env", "team_env", "production_team_env", "default_env", "common_env", "cli_args"]
print("Setting env variables:")
for key in os.environ:
if any([key in environment[set_key] for set_key in order]):
Expand Down Expand Up @@ -375,7 +373,7 @@ def __init__(self, args, jar_path):
raise e
possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES
assert (
args.mode in possible_modes
args.mode in possible_modes
), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format(
args.mode, self.conf, self.conf_type, possible_modes
)
Expand Down Expand Up @@ -451,7 +449,7 @@ def run(self):
)
if self.mode == "streaming":
assert (
len(filtered_apps) == 1
len(filtered_apps) == 1
), "More than one found, please kill them all"
print("All good. No need to start a new app.")
return
Expand Down Expand Up @@ -576,6 +574,12 @@ def set_defaults(parser):
required=False,
help="Conf param - required for every mode except fetch",
)
parser.add_argument(
"--env",
required=False,
default='dev',
help="Running environment - default to be dev"
)
parser.add_argument("--mode", choices=MODE_ARGS.keys())
parser.add_argument("--ds", help="the end partition to backfill the data")
parser.add_argument(
Expand All @@ -597,7 +601,7 @@ def set_defaults(parser):
parser.add_argument(
"--online-jar",
help="Jar containing Online KvStore & Deserializer Impl. "
+ "Used for streaming and metadata-upload mode.",
+ "Used for streaming and metadata-upload mode.",
)
parser.add_argument(
"--online-class",
Expand All @@ -614,7 +618,7 @@ def set_defaults(parser):
parser.add_argument(
"--online-jar-fetch",
help="Path to script that can pull online jar. "
+ "This will run only when a file doesn't exist at location specified by online_jar",
+ "This will run only when a file doesn't exist at location specified by online_jar",
)
parser.add_argument(
"--sub-help",
Expand All @@ -639,7 +643,7 @@ def set_defaults(parser):
parser.add_argument(
"--render-info",
help="Path to script rendering additional information of the given config. "
+ "Only applicable when mode is set to info",
+ "Only applicable when mode is set to info",
)
set_defaults(parser)
pre_parse_args, _ = parser.parse_known_args()
Expand Down

0 comments on commit 41f5f25

Please sign in to comment.