feat: Dagster Data pipeline #798

Draft · wants to merge 18 commits into main
Changes from 1 commit
@@ -6,17 +6,19 @@


import asyncio
from collections import namedtuple
from datetime import datetime

GPU_CONFIG = gpu.A10G()

MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
MODEL_ID = os.environ.get("MODEL_ID")
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]


def download_model():
    import subprocess
    import os
    MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
    MODEL_ID = os.environ.get("MODEL_ID")
    print(f'MODEL_ID={MODEL_ID}')
    subprocess.run(
        [
@@ -33,12 +35,13 @@ def download_model():
"tabbyml/tabby:0.5.5",
add_python="3.11",
)
.env({"MODEL_ID": os.environ.get("MODEL_ID")})
.dockerfile_commands("ENTRYPOINT []")
.copy_local_dir(local_path='./modal/tabby_python_client/tabby_python_client', remote_path='/root/tabby_python_client')
.pip_install(
"git+https://github.com/TabbyML/tabby.git#egg=tabby-python-client&subdirectory=experimental/eval/tabby-python-client",
"httpx",
"pandas"
)
.copy_local_file(local_path="/tmp/tabby_model_id", remote_path="/tmp/tabby_model_id")
.run_function(download_model)
)

@@ -62,16 +65,17 @@ def __enter__(self):

        my_env = os.environ.copy()
        my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1"
        MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
        MODEL_ID = os.environ.get("MODEL_ID")
        print(f'MODEL_ID={MODEL_ID}')

        LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
        self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + LAUNCH_FLAGS, env=my_env)
        self.client = Client("http://127.0.0.1:8000", timeout=240)

        # Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs.
        def webserver_ready():
            try:
                socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
                socket.create_connection(("127.0.0.1", 8000), timeout=30).close()
Member:

Suggested change
-                socket.create_connection(("127.0.0.1", 8000), timeout=30).close()
+                socket.create_connection(("127.0.0.1", 8000), timeout=1).close()

Line 91 already contains retry logic, no need to increase the timeout.

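For reference, the polling pattern the reviewer is pointing at, as a self-contained sketch; the retry loop itself sits outside this hunk, so its exact shape below is an assumption:

```python
import socket
import time

def wait_for_webserver(host: str = "127.0.0.1", port: int = 8000, deadline_s: float = 120.0) -> bool:
    """Poll until the server accepts TCP connections or the deadline passes."""
    start = time.monotonic()
    while time.monotonic() - start < deadline_s:
        try:
            # A 1s per-attempt timeout is enough; the loop supplies the patience.
            socket.create_connection((host, port), timeout=1).close()
            return True
        except (socket.timeout, ConnectionRefusedError):
            time.sleep(0.5)  # brief pause between attempts
    return False
```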

                return True
            except (socket.timeout, ConnectionRefusedError):
                # Check if launcher webserving process has exited.
@@ -99,7 +103,7 @@ async def health(self):
        return resp.to_dict()

    @method()
    async def complete(self, language, crossfile_context, index, row):
    async def complete(self, language: str, index: int, prompt: str, prediction: bool):
        from tabby_python_client.api.v1 import completion
        from tabby_python_client.models import (
            CompletionRequest,
@@ -111,15 +115,10 @@ async def complete(self, language, crossfile_context, index, row):
        from tabby_python_client import errors
        import pandas as pd

        if 'prediction' in row and not pd.isnull(row['prediction']):
            # if prediction exists, just skip
Member:

If a prediction already exists, you can simply not call the `complete` function.
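A sketch of that suggestion, reusing the names from this diff (`df`, `model`, `language`, `raw_prompt`): the caller filters out already-predicted rows, and `complete()` could then drop its `prediction` parameter. The helper name and signature below are hypothetical:

```python
import asyncio
import pandas as pd

# Assumes `df`, `model`, and `language` come from the surrounding script,
# and that complete() no longer takes a `prediction` flag.
async def dispatch_missing(df: pd.DataFrame, model, language: str):
    # Only rows without an existing prediction are sent to the model.
    rows_todo = [
        (index, row) for index, row in df.iterrows()
        if 'prediction' not in row or pd.isnull(row['prediction'])
    ]
    return await asyncio.gather(
        *[model.complete.remote.aio(language, index, row['raw_prompt'])
          for index, row in rows_todo]
    )
```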

        if prediction:
            return None, None, None

        if crossfile_context:
            prompt = row["crossfile_context"]["text"] + row["prompt"]
        else:
            prompt = row["prompt"]

        groundtruth = row["groundtruth"]


        request = CompletionRequest(
            language=language, debug_options=DebugOptions(raw_prompt=prompt)
@@ -141,10 +140,14 @@ async def complete(self, language, crossfile_context, index, row):
        except Exception as e:
            return index, None, f"error type: {type(e)}"


def write_log(log: str):
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open('./modal/log.txt', 'a') as f:
        f.write(f"{now} : {log}")
        f.write("\n")

@stub.local_entrypoint()
async def main(language, file):
async def main(language: str):
    import json
    import pandas as pd

@@ -157,45 +160,60 @@ async def main(language, file):
    print(health_resp)
    assert(health_resp['model'] == MODEL_ID)

    whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file

    if file == 'line_completion.jsonl':
        crossfile_context = False
    else:
        crossfile_context = True

    objs = []
    with open(whole_path_file) as fin:
        for line in fin:
            obj = json.loads(line)
            objs.append(obj)

    df = pd.DataFrame(objs)

    outputs = await asyncio.gather(*[model.complete.remote.aio(language, crossfile_context, index, row) for index, row in df.iterrows()])

    skipped = 0
    success = 0
    error = 0

    for index, prediction, error_msg in outputs:
        if index is None:
            skipped += 1
        elif prediction is not None:
            df.loc[index, 'prediction'] = prediction
            success += 1
        else:
            df.loc[index, 'error'] = error_msg
            error += 1
    print(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")

    with open(whole_path_file, 'w') as fout:
        for index, row in df.iterrows():
            json.dump(row.to_dict(), fout)
            fout.write('\n')


    with open("log.txt", "a") as flog:
        flog.write(f"model: {MODEL_ID}; language: {language}; file: {file}")
        flog.write(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")
        flog.write("\n\n")

    for file in ['line_completion.jsonl', 'line_completion_rg1_bm25.jsonl', 'line_completion_oracle_bm25.jsonl']:
Member:

Please extract a function for this, e.g. `read_pandas_frame`.

Contributor Author:

I'm not sure what this meant. Extract a function for the "for" loop, or a function for reading all three files?

Member:

Either interface is fine; the point is just to split the main function into smaller chunks for better readability.
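One plausible shape for that extraction; the helper name comes straight from the review comment, and the body mirrors the loading loop below:

```python
import json
import pandas as pd

# Hypothetical extraction of the per-file loading code from main().
def read_pandas_frame(model_id: str, language: str, file: str) -> pd.DataFrame:
    whole_path_file = "./data/" + model_id.split("/")[-1] + "/" + language + "/" + file
    objs = []
    with open(whole_path_file) as fin:
        for line in fin:
            obj = json.loads(line)
            # Cross-file variants keep their prompt inside crossfile_context.
            if file == 'line_completion.jsonl':
                obj['raw_prompt'] = obj['prompt']
            else:
                obj['raw_prompt'] = obj['crossfile_context']['text']
            objs.append(obj)
    return pd.DataFrame(objs)
```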


        whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file
        objs = []
        with open(whole_path_file) as fin:
            for line in fin:
                obj = json.loads(line)
                if file == 'line_completion.jsonl':
                    obj['raw_prompt'] = obj['prompt']
                else:
                    obj['raw_prompt'] = obj['crossfile_context']['text']
                objs.append(obj)

        df = pd.DataFrame(objs)

        write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: length = {len(df)}")

        # Yield successive `size`-row slices so only one batch of remote
        # completion calls is in flight at a time.
        def chunker(seq, size):
            return (seq[pos:pos + size] for pos in range(0, len(seq), size))

        def get_prediction(row):
            # True if this row already carries a non-null prediction.
            if 'prediction' in row and not pd.isnull(row['prediction']):
                return True
            else:
                return False

        skipped = 0
        success = 0
        error = 0

        for group in chunker(df, 30):
            outputs = await asyncio.gather(*[model.complete.remote.aio(language, index, row['raw_prompt'], get_prediction(row)) for index, row in group.iterrows()])

            for index, prediction, error_msg in outputs:
                if index is None:
                    skipped += 1
                elif prediction is not None:
                    df.loc[index, 'prediction'] = prediction
                    success += 1
                else:
                    df.loc[index, 'error'] = error_msg
                    error += 1

        write_log(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")

        whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file

        with open(whole_path_file, 'w') as fout:
            for index, row in df.iterrows():
                row_dict = row.to_dict()
                json.dump(row_dict, fout)
                fout.write('\n')

        write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: end!\n")
@@ -0,0 +1,23 @@
__pycache__/
build/
dist/
*.egg-info/
.pytest_cache/

# pyenv
.python-version

# Environments
.env
.venv

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# JetBrains
.idea/

/coverage.xml
/.coverage
@@ -0,0 +1,89 @@
# tabby-python-client
A client library for accessing Tabby Server

## Usage
First, create a client:

```python
from tabby_python_client import Client

client = Client(base_url="https://api.example.com")
```

If the endpoints you're going to hit require authentication, use `AuthenticatedClient` instead:

```python
from tabby_python_client import AuthenticatedClient

client = AuthenticatedClient(base_url="https://api.example.com", token="SuperSecretToken")
```

Now call your endpoint and use your models:

```python
from tabby_python_client.models import MyDataModel
from tabby_python_client.api.my_tag import get_my_data_model
from tabby_python_client.types import Response

my_data: MyDataModel = get_my_data_model.sync(client=client)
# or if you need more info (e.g. status_code)
response: Response[MyDataModel] = get_my_data_model.sync_detailed(client=client)
```

Or do the same thing with an async version:

```python
from tabby_python_client.models import MyDataModel
from tabby_python_client.api.my_tag import get_my_data_model
from tabby_python_client.types import Response

my_data: MyDataModel = await get_my_data_model.asyncio(client=client)
response: Response[MyDataModel] = await get_my_data_model.asyncio_detailed(client=client)
```

By default, when you're calling an HTTPS API it will attempt to verify that SSL is working correctly. Using certificate verification is highly recommended most of the time, but sometimes you may need to authenticate to a server (especially an internal server) using a custom certificate bundle.

```python
client = AuthenticatedClient(
base_url="https://internal_api.example.com",
token="SuperSecretToken",
verify_ssl="/path/to/certificate_bundle.pem",
)
```

You can also disable certificate validation altogether, but beware that **this is a security risk**.

```python
client = AuthenticatedClient(
base_url="https://internal_api.example.com",
token="SuperSecretToken",
verify_ssl=False
)
```

There are more settings on the generated `Client` class that let you control runtime behavior; check out the docstring on that class for more info.
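For example, a minimal sketch: the `timeout` keyword also appears in this PR's Modal script, while any other constructor options should be checked against the generated docstring:

```python
from tabby_python_client import Client

# timeout mirrors Client("http://127.0.0.1:8000", timeout=240) used
# elsewhere in this PR; consult the generated docstring for other options.
client = Client(base_url="https://api.example.com", timeout=240)
```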

Things to know:
1. Every path/method combo becomes a Python module with four functions:
1. `sync`: Blocking request that returns parsed data (if successful) or `None`
1. `sync_detailed`: Blocking request that always returns a `Response`, optionally with `parsed` set if the request was successful.
1. `asyncio`: Like `sync` but async instead of blocking
1. `asyncio_detailed`: Like `sync_detailed` but async instead of blocking

1. All path/query params and bodies become method arguments.
1. If your endpoint had any tags on it, the first tag will be used as a module name for the function (my_tag above)
1. Any endpoint which did not have a tag will be in `tabby_python_client.api.default`

## Building / publishing this Client
This project uses [Poetry](https://python-poetry.org/) to manage dependencies and packaging. Here are the basics:
1. Update the metadata in pyproject.toml (e.g. authors, version)
1. If you're using a private repository, configure it with Poetry
1. `poetry config repositories.<your-repository-name> <url-to-your-repository>`
1. `poetry config http-basic.<your-repository-name> <username> <password>`
1. Publish the client with `poetry publish --build -r <your-repository-name>` or, if for public PyPI, just `poetry publish --build`

If you want to install this client into another project without publishing it (e.g. for development) then:
1. If that project **is using Poetry**, you can simply do `poetry add <path-to-this-client>` from that project
1. If that project is not using Poetry:
1. Build a wheel with `poetry build -f wheel`
1. Install that wheel from the other project `pip install <path-to-wheel>`
@@ -0,0 +1,16 @@
[tool.black]
line-length = 120
target_version = ['py38', 'py39', 'py310', 'py311']
exclude = '''
(
/(
| \.git
| \.venv
| \.mypy_cache
)/
)
'''

[tool.isort]
line_length = 120
profile = "black"
@@ -0,0 +1,18 @@
import pathlib

from setuptools import find_packages, setup

here = pathlib.Path(__file__).parent.resolve()
long_description = (here / "README.md").read_text(encoding="utf-8")

setup(
    name="tabby-python-client",
    version="0.4.0-dev",
    description="A client library for accessing Tabby Server",
    long_description=long_description,
    long_description_content_type="text/markdown",
    packages=find_packages(),
    python_requires=">=3.8, <4",
    install_requires=["httpx >= 0.15.0, < 0.25.0", "attrs >= 21.3.0", "python-dateutil >= 2.8.0, < 3"],
    package_data={"tabby_python_client": ["py.typed"]},
)
@@ -0,0 +1,7 @@
""" A client library for accessing Tabby Server """
from .client import AuthenticatedClient, Client

__all__ = (
    "AuthenticatedClient",
    "Client",
)
@@ -0,0 +1 @@
""" Contains methods for accessing the API """