In [28]:
import json

from zep_cloud import ZepDataType
import typing
from dotenv import load_dotenv
from pydantic import BaseModel, Field, WithJsonSchema
from typing_extensions import Annotated
from typing import Optional
from zep_cloud.client import AsyncZep
import asyncio
import os

load_dotenv()

class ZepBaseText(BaseModel):
    description: typing.Optional[str] = None
    type: ZepDataType = "ZepText"

ZepText = Annotated[
    str | None,
    Field(default=None, zeptype="ZepText"),
    WithJsonSchema(ZepBaseText.model_json_schema(), mode='serialization'),
]

class ZepBaseNumber(BaseModel):
    description: typing.Optional[str] = None
    type: ZepDataType = "ZepFloat"

ZepNumber = Annotated[
    int | None,
    Field(default=None, zeptype="ZepNumber"),
    WithJsonSchema(ZepBaseNumber.model_json_schema(), mode='serialization'),
]

class ZepBaseFloat(BaseModel):
    description: typing.Optional[str] = None
    type: ZepDataType = "ZepFloat"

ZepFloat = Annotated[
    float | None,
    Field(default=None, zeptype="ZepFloat"),
    WithJsonSchema(ZepBaseNumber.model_json_schema(), mode='serialization'),
]

class ZepModel(BaseModel):
    _client: Optional[AsyncZep] = None

    def __init__(self, **data):
        super().__init__(**data)
        self._client = data.get('_client')

    @property
    def client(self):
        return self._client
    
    async def extract(self):
        model_schema = self.schema_json()
        extractor_result = await self._client.memory.extract_session_data("a2eb841bc24245fdb0901b0a87a865cd", last_n_messages=100, model_schema=model_schema)
        print("Extracting", result)
        for key, value in extractor_result.items():
            if hasattr(self, key):
                setattr(self, key, value)
        # result_json = json.dumps(result)
        # print("Extracting json", result_json)
        return self

API_KEY = os.environ.get("ZEP_API_KEY")
print(API_KEY)

client = AsyncZep(
        api_key=API_KEY,
        base_url="http://localhost:8000/api/v2"
    )

class TestModel (ZepModel):
    shopping_purpose: ZepText = Field(description="what is the user shopping for?" )
    shoe_size: ZepNumber = Field(description="what is the user's shoe size?" )
    price: ZepFloat = Field(description="How much was spent on shoes" )
    # b: ZepText = Field(description="This is a descriptionB")
    
t = TestModel(_client=client)

print(t.client)

# Create a task for the coroutine
task = asyncio.create_task(t.extract())

result = await asyncio.gather(task)

# p = TestModel(**result[0])
print("Result:", result[0].shoe_size)
print("Result:", result[0].price)

# Wait for the task to complete

# g = GuitarInfoModel(_client=client, session_id="session_id", balls="lskdjf")
# 
# g.extract()

z_1dWlkIjoiN2E0NjcxZmMtZDM5ZS00OTQ1LWE4YWMtODFjNTM5NTU5N2M1In0.um46UqJMM7XwfaG2BhwpLGVVrA4fxSZAzDbh_5cX3k9ickzBFZsp9NQFMLr8BN0tPpD2-7tUAv60eJsA907rEg
<zep_cloud.client.AsyncZep object at 0x11bffab00>
Extracting [TestModel(shopping_purpose='', shoe_size='10', price='12999')]
Result: 10
Result: 129.99
