dataflow/utils/storage.py (7 additions, 0 deletions)
@@ -40,6 +40,9 @@ def __init__(self,
         self.logger = get_logger()
 
     def _get_cache_file_path(self, step) -> str:
+        if step == -1:
+            self.logger.error("You must call storage.step() before reading or writing data. Please call storage.step() first for each operator step.")
+            raise ValueError("You must call storage.step() before reading or writing data. Please call storage.step() first for each operator step.")
         if step == 0:
             # If it's the first step, use the first entry file name
             return os.path.join(self.first_entry_file_name)
@@ -56,6 +59,10 @@ def reset(self):
 
     def _load_local_file(self, file_path: str, file_type: str) -> pd.DataFrame:
         """Load data from local file based on file type."""
+        # check if file exists
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"File {file_path} does not exist. Please check the path.")
+        # Load file based on type
         try:
             if file_type == "json":
                 return pd.read_json(file_path)
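A minimal sketch of how the two new guards surface to a caller, not part of the PR itself: the constructor arguments mirror the test file below, while the ./missing.jsonl path, the "demo" prefix, and the direct calls to the private helpers are illustrative assumptions.

from dataflow.utils.storage import FileStorage

storage = FileStorage(
    first_entry_file_name="./missing.jsonl",  # hypothetical, non-existent file
    cache_path="./cache",
    file_name_prefix="demo",
    cache_type="jsonl",
)

# step == -1 marks storage that has not been advanced with storage.step() yet,
# so reads and writes now fail with an explicit message.
try:
    storage._get_cache_file_path(-1)
except ValueError as e:
    print(e)  # "You must call storage.step() before reading or writing data. ..."

# A missing input file now fails fast instead of surfacing an opaque pandas read error.
try:
    storage._load_local_file("./missing.jsonl", "jsonl")
except FileNotFoundError as e:
    print(e)  # "File ./missing.jsonl does not exist. Please check the path."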
test/small_functions/translation.py (5 additions, 3 deletions)
@@ -5,7 +5,7 @@
 class GPT_generator():
     def __init__(self):
         self.storage = FileStorage(
-            first_entry_file_name="../../dataflow/example/GeneralTextPipeline/translation.jsonl",
+            first_entry_file_name="../../dataflow/example/GeneralTextPipeline/translation.jsonl1",
             cache_path="./cache",
             file_name_prefix="translation",
             cache_type="jsonl",
@@ -16,13 +16,15 @@ def __init__(self):
             model_name="gpt-4o",
             max_workers=2
         )
-        self.prompt_generator = PromptedGenerator(llm_serving = self.llm_serving)
+        self.prompt_generator = PromptedGenerator(
+            llm_serving = self.llm_serving,
+            system_prompt = "Please translate to Chinese.",
+        )
 
     def forward(self):
         # Initial filters
         self.prompt_generator.run(
             storage = self.storage.step(),
-            system_prompt = "Please translate to Chinese.",
             input_key = "raw_content",
         )
 
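For context, a condensed sketch of the updated wiring (identifiers are copied from the diff above; the LLM serving construction is elided, so llm_serving and storage stand in for the objects built in __init__): the system prompt now lives on the PromptedGenerator instance, and run() only takes the storage handle and the input key.

prompt_generator = PromptedGenerator(
    llm_serving=llm_serving,                      # serving object configured with gpt-4o above
    system_prompt="Please translate to Chinese.",
)
prompt_generator.run(
    storage=storage.step(),                       # FileStorage from the constructor above
    input_key="raw_content",
)

Since first_entry_file_name now points at translation.jsonl1, which presumably does not exist, the first read in this test should hit the new FileNotFoundError guard from storage.py rather than an unclear pandas error.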