In [6]:
import pathway as pw
# Small demo table of pet owners; it is the input streamed by the HTTP sink below.
pets = pw.debug.table_from_markdown(
    "owner pet \n"
    " Alice dog \n"
    " Bob cat \n"
    " Alice cat"
)

### `API` endpoint as a `sink`

In [None]:
# Let's say we need to send the stream of changes of `pets` to a particular endpoint.
# Each change is rendered through a tab-separated payload template whose
# {table.<name>} placeholders are filled per row. `time` and `diff` are not
# columns of `pets` — presumably Pathway's change metadata (processing time and
# +1/-1 diff); confirm against the pw.io.http.write docs.
message_template_tokens = [
    "owner={table.owner}",
    "pet={table.pet}",
    "time={table.time}",
    "diff={table.diff}",
]
message_template = "\t".join(message_template_tokens)

pw.io.http.write(
    pets,
    "http://www.example.com/api/event",
    method="POST",
    format="custom",
    request_payload_template=message_template,
)

### Pathway User-defined Functions (`UDF`)

In Pathway, UDFs (User-Defined Functions) are one of the most powerful and essential tools: they allow us to `inject custom logic` into Pathway's lazy, dataflow-driven model of computation.

In [None]:
import time

t = pw.debug.table_from_markdown('''a b \n 1 2 \n 3 4 \n 5 6''')


# Turn a plain Python function into a Pathway UDF; auto_executor() leaves the
# choice of executor to Pathway.
@pw.udf(executor=pw.udfs.auto_executor())
def long_running_function(a: int, b: int) -> int:
    """Return a * b after a short artificial delay that simulates slow work."""
    time.sleep(0.1)
    return a * b


# Apply the UDF row by row, producing a single `res` column.
result_2 = t.select(
    res=long_running_function(pw.this.a, pw.this.b),
)
pw.debug.compute_and_print(result_2, include_id=False)

res
2
12
30


In [9]:
class VerySophisticatedUDF(pw.UDF):
    """Class-based UDF computing ``round((a * b) ** exponent, 2)``.

    Subclassing ``pw.UDF`` lets the UDF carry configuration (here the
    exponent) that is fixed when the instance is created.
    """

    exponent: float

    def __init__(self, exponent: float) -> None:
        super().__init__()
        self.exponent = exponent

    def __wrapped__(self, a: int, b: int) -> float:
        # Per-row logic: raise the product to the configured exponent and
        # round to two decimals.
        intermediate = (a * b) ** self.exponent
        return round(intermediate, 2)


func = VerySophisticatedUDF(1.5)
# Refer to both input columns through pw.this for consistency.
res = t.select(result=func(pw.this.a, pw.this.b))
pw.debug.compute_and_print(res, include_id=True)


            | result
^X1MXHYY... | 2.83
^YYY4HAB... | 41.57
^Z3QWT29... | 164.32


#### `Caching` at the UDF level

1. Pathway can cache UDF results (enabled per UDF via the `cache_strategy` argument of `pw.udf`, as in the cell below) to avoid recomputing results when the same data and logic are used again.
2. A `cache hit` happens when:
    - Same input values (e.g., same string like "teja")
    - Same function logic (UDF hasn't changed)
    - Function is deterministic (i.e., no randomness or side effects)
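3. Conceptually, the cache key is derived from the function and its input values, roughly:
```cache_key = hash(function) + hash(input_values)```
   The UDF only receives column values (not the row pointer), so rows with identical inputs should share a cache entry. Check the reference below for the exact scheme.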
4. **Reference** : https://pathway.com/developers/api-docs/udfs#pathway.udfs.CacheStrategy

In [None]:
import time
from pathway.internals.udfs.caches import DiskCache, InMemoryCache
t = pw.debug.table_from_markdown('''a b \n 1 2 \n 3 4 \n 5 6''')

# Same UDF as before, but with its results cached on disk.
# `cache_strategy` expects a CacheStrategy *instance*. Passing the bare
# `DiskCache` class would make Pathway call its methods without `self`.
# (`InMemoryCache()` is the in-process alternative.)
# NOTE(review): DiskCache may need persistence configured to survive restarts —
# confirm against the pw.udfs.DiskCache docs.
@pw.udf(executor=pw.udfs.auto_executor(), cache_strategy=DiskCache())
def long_running_function(a: int, b: int) -> int:
    """Return a * b after a short artificial delay that simulates slow work."""
    time.sleep(0.1)
    return a * b

result_2 = t.select(res=long_running_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_2, include_id=False)

### `Debugging` in pathway

**Reference** : https://pathway.com/developers/api-docs/debug

In [12]:
import pathway as pw
t1 = pw.debug.table_from_markdown('''pet \n Dog \n Cat''')

# Rename `pet` to `animal` and attach a constant `desc` column.
t2 = t1.select(
    animal=pw.this.pet,
    desc="fluffy",
)

In [13]:
# Materialize t2 and print its rows without the row-id column.
pw.debug.compute_and_print(t2, include_id=False)

animal | desc
Cat    | fluffy
Dog    | fluffy


### Pathway Indexing (`RAG`)

**Reference** : https://pathway.com/developers/api-docs/indexing

### Pathway `DataTypes`

1. `pw.this.id` - pw.ColumnReference
2. `pw.apply(lambda x: x.upper(), pw.this.name)` - pw.ColumnExpression
3. `unique row identity` - pw.Pointer
4. `Schema` - pw.Schema

### `Transformations` in pathway

##### Performing the transformation using async calls 
Reference : https://pathway.com/developers/api-docs/pathway

In [None]:
import asyncio
from typing import Any

class OutputSchema(pw.Schema):
    ret: int

class AsyncIncrementTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    """Asynchronously maps each row's ``value`` to ``ret = value + 1``."""

    async def invoke(self, value: int) -> dict[str, Any]:
        # Simulates a slow asynchronous call (e.g. a remote request).
        await asyncio.sleep(0.1)
        return {"ret": value + 1}

input = pw.debug.table_from_markdown('''
  | value
1 | 42
2 | 44
''')

# `with_options` is configured on the transformer *instance* (it returns the
# transformer itself), so caching is attached here rather than in the class
# bases. As with UDFs, the cache strategy must be an instance, not a class.
# `DiskCache` comes from the import in the UDF-caching cell above.
result = (
    AsyncIncrementTransformer(input_table=input)
    .with_options(cache_strategy=DiskCache())
    .successful
)

pw.debug.compute_and_print(result, include_id=False)


ret
43
45


##### Changing the data type of columns (casting JSON values)

In [18]:
import pandas as pd


class InputSchema(pw.Schema):
    # Each row carries a dict in the `data` column.
    data: dict


# Two rows whose "value" field holds a boolean.
dt = pd.DataFrame(
    data={
        "data": [
            {"value": True},
            {"value": False},
        ]
    }
)
table = pw.debug.table_from_pandas(dt, schema=InputSchema)

# Extract the "value" field from `data` and cast it to bool.
result = table.select(
    result=pw.this.data.get("value").as_bool()
)

pw.debug.compute_and_print(result, include_id=False)

result
False
True


In [19]:
# Two rows whose "value" field holds a float.
dt = pd.DataFrame(
    data={
        "data": [
            {"value": 1.5},
            {"value": 3.14},
        ]
    }
)
table = pw.debug.table_from_pandas(dt, schema=InputSchema)

# Extract the "value" field from `data` and cast it to float.
result = table.select(
    result=pw.this.data.get("value").as_float()
)

pw.debug.compute_and_print(result, include_id=False)

result
1.5
3.14


In [20]:
# Similarly, there are `.as_str()`, `.as_int()` and `.to_string()`.