Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add supports for stream mode #79

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,35 @@ To list all the conversation Id's you had with Claude , you can use the list_all
conversation_id = conversation['uuid']
print(conversation_id)

## Send Message
## Query

To send a message to Claude, you can use the send_message method. You need to provide the prompt and the conversation ID:
To query Claude, you can use the query method. You need to provide the prompt and the conversation ID:

prompt = "Hello, Claude!"
conversation_id = "<conversation_id>" or claude_api.create_new_chat()['uuid']
response = claude_api.query(prompt, conversation_id)
print(response)

Also supported stream mode:

prompt = "Hello, Claude!"
conversation_id = "<conversation_id>" or claude_api.create_new_chat()['uuid']
response = claude_api.send_message(prompt, conversation_id)
print(response)
response = claude_api.query(prompt, conversation_id, stream=True)
for r in response:
print(r, end="")

## Send Message with attachment
## Query with attachment

You can send any type of attachment to claude to get responses using attachment argument in send_message().
You can send any type of attachment to claude to get responses using attachment argument in query().
Note: Claude currently supports only some file types.

{ You can also add timeout if you need ,using timeout parameter[default set to 500] }

prompt = "Hey,Summarize me this document.!"
conversation_id = "<conversation_id>" or claude_api.create_new_chat()['uuid']
response = claude_api.send_message(prompt, conversation_id,attachment="path/to/file.pdf",timeout=600)
response = claude_api.query(prompt, conversation_id,attachment="path/to/file.pdf",timeout=600)
print(response)


## Delete Conversation

To delete a conversation, you can use the delete_conversation method:
Expand Down
86 changes: 70 additions & 16 deletions claude-api/claude_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from curl_cffi import requests
import requests as req
import re
import typing


class Client:
Expand Down Expand Up @@ -75,6 +76,21 @@ def list_all_conversations(self):

# Send Message to Claude
def send_message(self, prompt, conversation_id, attachment=None,timeout=500):
return self._non_stream_query(
prompt=prompt,
conversation_id=conversation_id,
attachment=attachment,
timeout=timeout,
)

def _query_stream(
self,
prompt,
conversation_id,
attachment=None,
timeout=500,
) -> typing.Generator[dict, None, None]:

url = "https://claude.ai/api/append_message"

# Upload attachment if provided
Expand Down Expand Up @@ -118,22 +134,60 @@ def send_message(self, prompt, conversation_id, attachment=None,timeout=500):
'Sec-Fetch-Site': 'same-origin',
'TE': 'trailers'
}

response = requests.post( url, headers=headers, data=payload,impersonate="chrome110",timeout=500)
decoded_data = response.content.decode("utf-8")
decoded_data = re.sub('\n+', '\n', decoded_data).strip()
data_strings = decoded_data.split('\n')
completions = []
for data_string in data_strings:
json_str = data_string[6:].strip()
data = json.loads(json_str)
if 'completion' in data:
completions.append(data['completion'])

answer = ''.join(completions)

# Returns answer
return answer

with requests.Session() as s:
response = s.post( url, headers=headers, data=payload,impersonate="chrome110",timeout=timeout, stream=True)
for line in response.iter_lines():
data_string = line.decode("utf-8")
if not data_string.startswith('data:'):
continue
json_str = data_string[6:].strip()
data = json.loads(json_str)
if 'completion' in data:
yield data['completion']

def _non_stream_query(
self,
prompt,
conversation_id,
attachment=None,
timeout=500,
) -> str:
completion = ""

for data in self._query_stream(
prompt=prompt,
conversation_id=conversation_id,
attachment=attachment,
timeout=timeout,
):
completion += data

return completion

def query(
self,
prompt,
conversation_id,
attachment=None,
timeout=500,
stream=False,
) -> typing.Union[str, typing.Generator[str, None, None]]:
if not stream:
return self.send_message(
prompt=prompt,
conversation_id=conversation_id,
attachment=attachment,
timeout=timeout,
)
else:
return self._query_stream(
prompt=prompt,
conversation_id=conversation_id,
attachment=attachment,
timeout=timeout,
)


# Deletes the conversation
def delete_conversation(self, conversation_id):
Expand Down