In [1]:
from collections import deque
import numpy as np
import random
import time
from collections import defaultdict
from sklearn.datasets import make_classification
from abc import ABC, abstractmethod
from sklearn.model_selection import train_test_split
import pandas as pd
from datetime import date, timedelta

In [None]:
np.random.seed(42)

# Generate 90 business days
start_date = "2025-01-01"
date_range = pd.date_range(start=start_date, periods=90, freq="B")

# Simulate realistic portfolio with drift and volatility
initial_value = 100000
daily_returns = np.random.normal(
    0.0005, 0.015, size=len(date_range)
)  # 0.05% drift, 1.5% daily vol

portfolio_values = [initial_value]
for ret in daily_returns[1:]:
    portfolio_values.append(portfolio_values[-1] * (1 + ret))

df = pd.DataFrame(
    {"portfolio_date": date_range, "total_portfolio_value_usd": portfolio_values}
)

print(f"Length: {len(df)}")
print(f"Start value: ${df['total_portfolio_value_usd'].iloc[0]:,.2f}")
print(f"End value: ${df['total_portfolio_value_usd'].iloc[-1]:,.2f}")
df.head()

Length: 90
Start value: $100,000.00
End value: $90,019.86


Unnamed: 0,portfolio_date,total_portfolio_value_usd
0,2025-01-01,100000.0
1,2025-01-02,99842.603548
2,2025-01-03,100862.528499
3,2025-01-06,103217.209398
4,2025-01-07,102906.288134


In [None]:
df["daily_returns"] = df["total_portfolio_value_usd"].pct_change()
risk_free_rate = 0.03
mean_returns = df["daily_returns"].mean()
std_dev_daily = df["daily_returns"].std()
annualized_volatility = std_dev_daily * np.sqrt(252)
sharpe_ratio = (mean_returns - risk_free_rate) / std_dev_daily
peak, trough = df["daily_returns"].max(), df["daily_returns"].min()
max_drawdown = (trough - peak) / peak * 100
var_95 = df["daily_returns"].quantile(0.05)


-2.2089527015353037


In [None]:
index = 20
assert (
    df["daily_returns_centered"].iloc[index]
    == df["daily_returns"].iloc[index] - mean_returns
)

# Pluang portfolio simulation

In [None]:
-- User accounts
CREATE TABLE users (
    user_id INT PRIMARY KEY,
    email VARCHAR(255),
    created_at TIMESTAMP,
    country_code VARCHAR(2),
    kyc_status VARCHAR(20)
);

-- Asset types available on platform
CREATE TABLE assets (
    asset_id INT PRIMARY KEY,
    asset_code VARCHAR(10),  -- e.g., 'AAPL', 'GOLD', 'BTC'
    asset_name VARCHAR(100),
    asset_type VARCHAR(20),  -- 'stock', 'commodity', 'crypto'
    currency VARCHAR(3)      -- 'USD', 'IDR', etc.
);

-- User transactions (buys and sells)
CREATE TABLE transactions (
    transaction_id INT PRIMARY KEY,
    user_id INT,
    asset_id INT,
    transaction_type VARCHAR(4),  -- 'buy' or 'sell'
    quantity DECIMAL(18, 8),
    price_per_unit DECIMAL(18, 4),  -- in asset currency
    transaction_date TIMESTAMP,
    fee_amount DECIMAL(18, 4),
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (asset_id) REFERENCES assets(asset_id)
);

-- Daily closing prices for all assets
CREATE TABLE daily_prices (
    price_id INT PRIMARY KEY,
    asset_id INT,
    price_date DATE,
    closing_price DECIMAL(18, 4),
    opening_price DECIMAL(18, 4),
    high_price DECIMAL(18, 4),
    low_price DECIMAL(18, 4),
    FOREIGN KEY (asset_id) REFERENCES assets(asset_id)
);

-- Exchange rates to convert everything to a base currency
CREATE TABLE exchange_rates (
    rate_id INT PRIMARY KEY,
    from_currency VARCHAR(3),
    to_currency VARCHAR(3),  -- Base currency is 'USD'
    rate_date DATE,
    exchange_rate DECIMAL(18, 6)
);

Task 1: Write a query to calculate each user's current portfolio
  holdings (net position) for all assets. The result should include:
  - user_id
  - asset_code
  - asset_type
  - total_quantity_held (sum of buys minus sells)
  - average_cost_basis (weighted average purchase price in USD)
  - current_value_usd (using the most recent closing price converted to
  USD)

Task 1: Current Portfolio Holdings

Logic breakdown:

1. Calculate net positions per user-asset
- Sum buy transactions (positive quantity)
- Subtract sell transactions (negative quantity)
- Group by user_id and asset_id
2. Calculate weighted average cost basis
- For buys only: sum(quantity × price) / sum(quantity)
- Convert to USD using exchange rates at transaction dates
3. Get current value
- Join with most recent daily_prices closing_price
- Convert to USD using latest exchange rate
- Multiply by total_quantity_held
4. Key joins needed:
- transactions → assets (for asset details)
- transactions → exchange_rates (for historical conversion)
- assets → daily_prices (for current prices)
- Filter where net quantity > 0

In [None]:
WITH net_positions AS(
    SELECT 
        t.user_id,
        t.asset_id,
        SUM(
            CASE
                WHEN t.transaction_type = 'buy' THEN t.quantity
                WHEN t.transaction_type = 'sell' THEN -t.quantity
            END 
        ) AS total_quantity_held,
        SUM(
            CASE
                WHEN t.transaction_type = 'buy' THEN t.quantity * t.price_per_unit
                ELSE 0
            END
        ) / NULLIF(
            SUM(
                CASE
                    WHEN t.transaction_type = 'buy' THEN t.quantity
                    ELSE 0
                END
            ), 0
        ) AS average_cost_basis_local
    FROM 
        transactions as t
    GROUP BY
        t.user_id,
        t.asset_id
    HAVING
        SUM(CASE 
            WHEN t.transaction_type = 'buy' THEN t.quantity
            WHEN t.transaction_type = 'sell' THEN -t.quantity 
        END) > 0
),

last_price AS(
    SELECT
        dp.asset_id,
        dp.price_date,
        dp.closing_price,
        ROW_NUMBER() OVER(PARTITION BY dp.asset_id ORDER BY dp.price_date DESC) as rn
    FROM
        daily_prices as dp
),

currency_rates AS(
    SELECT
        er.from_currency,
        er.to_currency,
        er.rate_date,
        er.exchange_rate,
        ROW_NUMBER() OVER(PARTITION BY er.from_currency ORDER BY er.rate_date DESC) AS rn
    FROM
        exchange_rates as er
    WHERE
        er.to_currency = 'USD'
)

SELECT
    np.user_id,
    a.asset_code,
    a.asset_type,
    np.total_quantity_held,
    round(np.average_cost_basis_local * COALESCE(cr.exchange_rate,1), 4) AS average_cost_basis,
    round(np.total_quantity_held * lp.closing_price * COALESCE(cr.exchange_rate,1), 4) AS current_value_usd
FROM
    net_positions AS np
INNER JOIN
    assets AS a ON np.asset_id = a.asset_id
LEFT JOIN
    currency_rates AS cr ON a.currency = cr.from_currency AND cr.rn = 1
INNER JOIN
    last_price AS lp ON np.asset_id = lp.asset_id AND lp.rn = 1
ORDER BY
    np.user_id,
    a.asset_type,
    a.asset_code

# Simple rolling average and lagging

In [None]:
"""
SELECT date, closing_price
FROM StockPrices
"""

In [None]:
# Define start and end date
start_date = date(2024, 1, 1)
end_date = date(2024, 12, 31)

# Get pandas series of date range
date = pd.date_range(start=start_date, end=end_date, freq="B")

# Display date
print(date)
len(date)

In [None]:
# Generate closing price data
np.random.seed = 42
closing_price = np.random.uniform(100, 110, size=len(date))
print(closing_price)

In [None]:
df = pd.DataFrame({"date": date, "closing_price": closing_price})

df.head()

In [None]:
df["SMA_7_day"] = df["closing_price"].rolling(window=7).mean()
df.head(20)

In [None]:
index = 13
assert np.isclose(
    df["closing_price"].iloc[index - 6 : index + 1].mean(), df["SMA_7_day"].iloc[index]
)

In [None]:
df["lag_1"] = df["closing_price"].shift(-1)
df.head(20)

# Find covariance of matrix

In [None]:
matrix = [[1, 2, 3], [4, 5, 6]]

In [None]:
# Calculate mean of each row
mean = []

# Change into list comprehension later
for row in matrix:
    mean_row = sum(row) / len(matrix[0])
    mean.append(mean_row)

print(mean)

In [None]:
# Center each row by subtracting it's mean
centered = []

for i in range(len(matrix)):
    centered_row = []
    for j in range(len(matrix[0])):
        centered_row.append(matrix[i][j] - mean[i])

    centered.append(centered_row)

print(centered)

In [None]:
# DRY: Convert matrix[0] to a common variable

multiplied = []
for j in range(len(matrix[0])):
    product = 1
    sum = 0
    for i in range(len(matrix)):
        product *= centered[i][j]
        sum += product
    print(sum)

print(multiplied)

In [None]:
# Covariance
covariance = sum(multiplied) / (len(multiplied) - 1)
print(covariance)

In [None]:
np.cov(matrix)

# Eigen values and eigenvectors

In [None]:
matrix = [[2, 1], [1, 2]]
n = len(matrix)
print(matrix)
print(n)

In [None]:
identity = []
for i in range(n):
    row = []
    for j in range(n):
        row.append(1 if i == j else 0)
    identity.append(row)

In [None]:
print(identity)

In [None]:
# Use quadratic equation to solve for lambda (eigenvalue)
## Store quadratic equation variables first
a = 1
b = -matrix[0][0] - matrix[1][1]
c = matrix[0][0] * matrix[1][1] - matrix[0][1] * matrix[1][0]
print(a)
print(b)
print(c)

## Calc lambda
lambda_1 = (-b + np.sqrt(b**2 - 4 * a * c)) / (2 * a)
lambda_2 = (-b - np.sqrt(b**2 - 4 * a * c)) / (2 * a)

print(lambda_1)
print(lambda_2)


In [None]:
eigenvalues = sorted([lambda_1, lambda_2], reverse=True)
eigenvalues

# Multiply matrix by scalar

In [None]:
matrix = [[1, 2, 3], [4, 5, 6]]
scalar = 2

In [None]:
n_rows = len(matrix)
n_cols = len(matrix[0])
print(n_rows)
print(n_cols)

In [None]:
for row in range(n_rows):
    for col in range(n_cols):
        matrix[row][col] *= 2

print(matrix)

In [None]:
list = [1, 2, 3, 4]
list = np.array(list)
list *= 2
list

# Calculate mean y row/col

In [None]:
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
mode = "column"

In [None]:
# Calc mean by col
n_cols = len(matrix[0])
n_rows = len(matrix)

In [None]:
# Calc mean by row
mean = [sum(row) / len(row) for row in matrix]
mean

In [None]:
# Calc mean by col
result = []

mean = [
    sum(matrix[row][col] for row in range(n_rows)) / n_rows for col in range(n_cols)
]

for col in range(n_cols):
    column_sum = 0
    for row in range(n_rows):
        column_sum += matrix[row][col]
    mean = column_sum / n_rows
    result.append(mean)

# Reshape matrix

In [None]:
a = [[1, 2, 3, 4], [5, 6, 7, 8]]
new_shape = (4, 2)

In [None]:
flat = [elem for row in a for elem in row]
reshaped = [flat[i : i + 2] for i in range(0, len(flat), 2)]

In [None]:
reshaped

# Transpose matrix

In [None]:
a = [[1, 2, 3], [4, 5, 6]]
type(a)

In [None]:
tranposed = []

transposed = [[a[row][col] for row in range(len(a))] for col in range(len(a[0]))]

In [None]:
transposed

# Matrix dot vector

In [None]:
a = [[1, 2], [2, 4]]
b = [1, 2]

result = np.dot(a, b)
result

In [None]:
n_rows = len(a)
n_cols = len(b)

In [None]:
if n_rows != n_cols
    return -1

In [None]:
result = []

for row in range(len(a)):
    dot = sum(a[row][col] * b[col] for col in range(len(b)))
    result.append(dot)

print(result)

# Positional encoding

In [None]:
def positional_encoding(seq_len, d_model):
    """
    Args:
        seq_len: Max sequence length
        d_model: Embedding dimension
    Returns:
        PE matrix (seq_len, d_model)
    """
    pe = np.zeros((seq_len, d_model))
    position = np.arange(seq_len).reshape(-1, 1)
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    pe[:, 0::2] = np.sin(position * div_term)
    pe[:, 1::2] = np.cos(position * div_term)

    return pe

In [None]:
seq_len = 10
d_model = 10

In [None]:
pe = positional_encoding(10, 10)
print(pe)

# Self attention mechanism implementation

In [None]:
# Why is the softmax using np.max() and not max()?
# Give me the formula for softmax, and also why axis=-1 and keepdims=True
# softmax(x) = np.exp(x)/sum(np.exp(x))
def softmax(x, axis=-1):
    exp_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

In [None]:
def self_attention(X, W_q, W_k, W_v):
    """
    Args:
        X: Input matrix (seq_len, d_model)
        W_q: Query weight matrix (d_model, d_k)
        W_k: Key weight matrix (d_model, d_k)
        W_v: Value weight matrix (d_model, d_k)

    Returns:
        Output matrix (seq_len, d_v)
    """

    # Project inputs to QKV
    Q = X @ W_q
    K = X @ W_k
    V = X @ W_v

    # Calculate attention score
    d_k = Q.shape[-1]
    scores = Q @ K.T / np.sqrt(d_k)  # (seq_len, seq_len)

    # Apply softmax to get attention weights
    attention_weights = softmax(scores, axis=-1)  # (seq_len, seq_len)

    # Weighted sum of values
    output = attention_weights @ V  # (seq_len, d_v)

    return output, attention_weights

In [None]:
def multi_head_attention(X, num_heads, W_q, W_k, W_v, W_o):
    seq_len, d_model = X.shape
    d_k = d_model // num_heads

    # Project and split into heads
    Q = (X @ W_q).reshape(seq_len, num_heads, d_k)
    K = (X @ W_k).reshape(seq_len, num_heads, d_k)
    V = (X @ W_v).reshape(seq_len, num_heads, d_k)

    # Transpose for batche computation: (num_heads, seq_len, d_k)
    Q = Q.transpose(1, 0, 2)
    K = K.transpose(1, 0, 2)
    V = V.transpose(1, 0, 2)

    # Attention per head
    scores = Q @ K.transpose(0, 2, 1) / np.sqrt(d_k)  # (num_heads, seq_len, seq_len)
    attention_weights = softmax(scores, axis=-1)
    head_outputs = attention_weights @ V  # (num_heads, seq_len, d_k)

    # Concat heads and project
    concat = head_outputs.transpose(1, 0, 2).reshape(
        seq_len, d_model
    )  # (seq_len, d_model)
    output = concat @ W_o  # (seq_len, d_model). W_o (d_model, d_model)

    return output, attention_weights

# Feature depency resolver

In [None]:
class FeatureDependencyResolver:
    def __init__(self, dependencies: dict):
        # Build graph structure
        pass

    def topological_sort(self) -> list:
        # Return valid computation order
        pass

    def has_cycle(self) -> bool:
        # Return True if circular dependency exists
        pass

    def compute_parallel_batches(self) -> list:
        # Return list of batches (each batch is a list of features)
        pass

In [None]:
dependencies = {
    "raw_price": [],
    "raw_volume": [],
    "returns": ["raw_price"],
    "volatility": ["returns"],
    "volume_ma": ["raw_volume"],
    "price_volume_corr": ["returns", "volume_ma"],
    "risk_score": ["volatility", "price_volume_corr"],
}

In [None]:
# Build in-degree map
## Build a new dict that has feature as key, and the no. of dependencies as value
in_degree = {feature: len(deps) for feature, deps in dependencies.items()}
print(in_degree)

In [None]:
# Initialize queue with zero in-degree nodes
queue = deque([feature for feature, degree in in_degree.items() if degree == 0])
print(queue)

In [None]:
result = []

while queue:
    current = queue.popleft()
    result.append(current)

    # Decrease in-degree for features depending on current
    for feature, deps in dependencies.items():
        if current in deps:
            in_degree[feature] -= 1
            if in_degree[feature] == 0:
                queue.append(feature)

# Check for cycle
if len(result) != len(dependencies):
    raise ValueError("Circular dependency detected")

In [None]:
result

In [None]:
in_degree = {feature: len(dependency) for feature, dependency in dependencies.items()}

In [None]:
queue = deque(feature for feature, depdendency in in_degree.items() if depdendency == 0)
queue

In [None]:
batch_num = {}

In [None]:
while queue:
    current = queue.popleft()
    if not dependencies[current]:
        batch_num[current] = 0
    else:
        batch_num[current] = (
            max(batch_num[dependency] for dependency in dependencies[current]) + 1
        )

    for feature, dependency in dependencies.items():
        if current in dependency:
            in_degree[feature] -= 1
            if in_degree[feature] == 0:
                queue.append(feature)

max_batch = max(batch_num.values())
batches = [[] for _ in range(max_batch + 1)]

for feature, batch in batch_num.items():
    batches[batch].append(feature)

In [None]:
batches

In [None]:
def compute_parallel_batches(dependencies: dict) -> list:
    """
    Returns list of batches where each batch can execute in parallel.
    Raises ValueError if circular dependency detected.
    """
    # Step 1: Build reverse graph (who depends on me?)
    graph = {feature: [] for feature in dependencies}
    for feature, deps in dependencies.items():
        for dep in deps:
            graph[dep].append(feature)

    # Step 2: Count incoming dependencies
    in_degree = {feature: len(deps) for feature, deps in dependencies.items()}

    # Step 3: Start with features that have no dependencies
    queue = deque([f for f, degree in in_degree.items() if degree == 0])

    # Step 4: Process in topological order, assign batch numbers
    batch_num = {}

    while queue:
        current = queue.popleft()

        # Assign batch: 0 if no deps, else max(dep batches) + 1
        if len(dependencies[current]) == 0:
            batch_num[current] = 0
        else:
            batch_num[current] = (
                max(batch_num[dep] for dep in dependencies[current]) + 1
            )

        # Reduce in-degree for features that depend on current
        for dependent in graph[current]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    # Step 5: Check for cycles
    if len(batch_num) != len(dependencies):
        raise ValueError("Circular dependency detected")

    # Step 6: Group features by batch number
    max_batch = max(batch_num.values())
    batches = [[] for _ in range(max_batch + 1)]
    for feature, batch in batch_num.items():
        batches[batch].append(feature)

    return batches


def topological_sort(dependencies: dict) -> list:
    """Returns features in valid computation order."""
    batches = compute_parallel_batches(dependencies)
    return [feature for batch in batches for feature in batch]


def has_cycle(dependencies: dict) -> bool:
    """Returns True if circular dependency exists."""
    try:
        compute_parallel_batches(dependencies)
        return False
    except ValueError:
        return True

In [None]:
dependencies = {
    "raw_price": [],
    "raw_volume": [],
    "returns": ["raw_price"],
    "volatility": ["returns"],
    "volume_ma": ["raw_volume"],
    "price_volume_corr": ["returns", "volume_ma"],
    "risk_score": ["volatility", "price_volume_corr"],
}

# Get parallel batches
batches = compute_parallel_batches(dependencies)
print(batches)

# EWM Covariance

In [None]:
returns, y = make_classification(n_features=4, random_state=42)
print(returns.shape)
print(returns[:5])

In [None]:
def ewm_covariance(returns: np.ndarray, decay: float) -> np.ndarray:
    """
    Calculates exponentially weighted covariance matric
    Args:
        returns: Array of shape (n_days, n_assets) of daily returns
        decay: Lambda parameter
    Returns:
        Covariance matrix
    """
    # Get number of rows for weights calculation
    n_days, n_assets = returns.shape

    # Get exponents. Last row gets biggest number, oldest gets smallest
    exponents = np.arange(n_days - 1, -1, -1)

    # Calculate weights from exponents
    weights = (1 - decay) * decay**exponents

    # Normalize the weights
    weights = weights / weights.sum()

    # Calculated weighted mean
    weighted_mean = np.dot(weights, returns)

    # Get centered value
    centered = returns - weighted_mean

    # Math trick to calculate covariance
    ## Calculate sqrt weights
    sqrt_weights = np.sqrt(weights).reshape(-1, 1)
    ## Calculated centered_weighted
    centered_weighted = centered * sqrt_weights
    cov_matrix = centered_weighted.T @ centered_weighted

    return cov_matrix


In [None]:
cov_matrix = ewm_covariance(returns, decay=0.94)
cov_matrix

# Rolling risk metrics

In [None]:
# Input data
np.random.seed(42)
df = pd.DataFrame(
    {
        "dates": pd.date_range("2023-01-01", periods=260, freq="B"),
        "daily_return": pd.Series(np.random.normal(0.001, 0.02, 260)),
    }
)

df.tail()

In [None]:
df["std_dev"] = df["daily_return"].rolling(window=20).std()
df["volatility"] = df["std_dev"] * np.sqrt(252)
df.tail()

In [None]:
df["mean_return"] = df["daily_return"].rolling(window=20).mean()
df["sharpe_ratio"] = df["mean_return"] / df["std_dev"] * np.sqrt(252)
df.tail()

In [None]:
df["cumulative_wealth"] = (1 + df["daily_return"]).cumprod()
df["rolling_peak"] = df["cumulative_wealth"].rolling(window=60).max()

In [None]:
df.head(20)

In [None]:
df["max_drawdown"] = df["drawdown"].rolling(window=60).min()

In [None]:
print(df["max_drawdown"].min())

# Feature Transformer Network

In [None]:
data = np.array([[1000, 50], [2000, 100], [1500, 75], [3000, 150]])
display(data)

In [None]:
class BaseTransformer(ABC):
    """Abstract base class for all transformers"""

    @abstractmethod
    def fit(self, X):
        """Learn parameters from X. Must be implemented by subclasses"""
        pass

    @abstractmethod
    def transform(self, X):
        """Apply transformatiion using learned parameters"""
        pass

    def fit_transform(self, X):
        """Convenience method: fir then transform"""
        self.fit(X)
        return self.transform(X)

In [None]:
class Standard_Scaler(BaseTransformer):
    """Standardize features by remvoing mean andscalingto unit  variance"""

    def __init__(self):
        self.mean_ = None
        self.std_ = None

    def fit(self, X):
        """Compute mean and std from training data"""
        X = np.array(X)

        # Compute mean and std
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)

        # Handle edge case
        self.std_ = np.where(self.std_ == 0, 1, self.std_)

        return self  # Return self for method chaining

    def transform(self, X):
        "Apply standardization using store mean and std"
        # Check if fitted
        if self.mean_ is None or self.std_ is None:
            raise ValueError("StandardScaler is not fitted. Call fit() first")
        X = np.array(X)
        return (X - self.mean_) / self.std_

In [None]:
class Log_Transformer(BaseTransformer):
    """Log transform doesn't need to fit anything, but we mark as fitted"""

    def __init__(self):
        self.fitted_ = False

    def fit(self, X):
        X = np.array(X)

        # Check for negative values
        if np.any(X < 0):
            raise ValueError(
                "Data contains negative values. LogTransformer does not support negative values"
            )

        self.fitted_ = True
        return self

    def transform(self, X):
        X = np.array(X)

        if not self.fitted_:
            raise ValueError("Log_Transformer not fitted. Call fit() first")

        # Check for negative values
        if np.any(X < 0):
            raise ValueError(
                "Data contains negative values. LogTransformer does not support negative values"
            )

        # Use log1p to handle 0 values better
        return np.log1p(X)

In [None]:
class Pipeline:
    """Chain multiple transformers together"""

    def __init__(self, transformers):
        """
        Args:
            transformers: List of transformer instances
        """
        self.transformers = transformers

    def fit(self, X):
        X_transformed = np.array(X)

        for transformer in self.transformers:
            # Fit on current data
            transformer.fit(X_transformed)
            # Transform for next step
            X_transformed = transformer.transform(X_transformed)

        return self

    def transform(self, X):
        """Apply all transformers seqeuntially"""
        X_transformed = np.array(X)

        for transformer in self.transformers:
            # Transform data
            X_transformed = transformer.transform(X_transformed)

        return X_transformed

    def fit_transform(self, X):
        """Fit and transform in one step"""
        self.fit(X)
        return self.transform(X)

In [None]:
# Create randomize train and test data
X, y = make_classification(
    n_samples=20,
    n_features=3,
    n_informative=2,
    n_redundant=0,
    n_classes=2,
    random_state=42,
)

# Create train test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Check shape
print(X_train.shape)
print(X_test.shape)


In [None]:
transformers_list = [Log_Transformer(), Standard_Scaler()]
pipeline = Pipeline(transformers_list)

X_train_transformed = pipeline.fit_transform(abs(X_train))
X_test_transformed = pipeline.transform(abs(X_test))

In [None]:
scaler = Standard_Scaler()
print(scaler)

# Hashmaps to check duplicates

In [None]:
transactions = [
    {"id": "txn_001", "user_id": "U100", "amount": 50.00, "timestamp": 1000},
    {"id": "txn_002", "user_id": "U100", "amount": 50.00, "timestamp": 1003},
    {"id": "txn_003", "user_id": "U100", "amount": 50.00, "timestamp": 1500},
    {"id": "txn_004", "user_id": "U200", "amount": 75.00, "timestamp": 1001},
    {"id": "txn_005", "user_id": "U200", "amount": 75.00, "timestamp": 1008},
    {"id": "txn_006", "user_id": "U100", "amount": 50.00, "timestamp": 1006},
]
T = 10

In [None]:
# Function to check for duplicates in transactions
def check_transaction_duplicates(transactions: list, T: float) -> list:
    duplicate_index = []

    for i in range(len(transactions)):
        for j in range(i + 1, len(transactions)):
            txn1 = transactions[i]
            txn2 = transactions[j]

            # Check the 3 conditions
            same_user = txn1["user_id"] == txn2["user_id"]
            same_amount = txn1["amount"] == txn2["amount"]
            within_time = abs(txn1["timestamp"] - txn2["timestamp"]) <= T

            if same_user and same_amount and within_time:
                duplicate_index.append((txn1["id"], txn2["id"]))

    return duplicate_index

In [None]:
# Complexity is O(n^2)
start_time = time.time()
duplicate_index = check_transaction_duplicates(transactions, T)
print(f"Time taken: {time.time() - start_time}")
print(duplicate_index)

In [None]:
groups = defaultdict(list)

for txn in transactions:
    key = (txn["user_id"], txn["amount"])
    groups[key].append(txn)

display(groups)

In [None]:
def find_duplicate_transactions(
    transactions: list[dict], T: int
) -> list[tuple[str, str]]:
    # Step 1: Group transactions by (user_id, amount)
    groups = defaultdict(list)

    for txn in transactions:
        key = (txn["user_id"], txn["amount"])
        groups[key].append(txn)

    # Step 2: For each group, find duplicates
    duplicates = []

    for key, txn_list in groups.items():
        txn_list.sort(key=lambda x: x["timestamp"])

        for i in range(len(txn_list)):
            for j in range(i + 1, len(txn_list)):
                time_diff = txn_list[j]["timestamp"] - txn_list[i]["timestamp"]

                if time_diff > T:
                    break

                duplicates.append((txn_list[i]["id"], txn_list[j]["id"]))

    return duplicates

In [None]:
# Complexity is O(n)
start_time = time.time()
duplicates = find_duplicate_transactions(transactions, T)
print(f"Time taken: {time.time() - start_time}")
print(duplicates)

In [None]:
expected = ("txn_001", "txn_002")
assert duplicates[0] == expected, f"Expected {expected}, got {duplicates[0]}"

# Price stream class

In [None]:
class PriceStream:
    def __init__(self):
        self.prices = deque()
        self.total_sum = 0
        self.count = 0
        self.max_deque = deque()
        self.min_deque = deque()

    # Add new price
    def add(self, price) -> None:
        if isinstance(price, (list, tuple)):
            for p in price:
                self._add_single(p)
        else:
            self._add_single(price)

    # Add new price helper
    def _add_single(self, price: float) -> None:
        self.prices.append(price)
        self.total_sum += price
        self.count += 1

        # Imnplement max_deque
        while self.max_deque and self.max_deque[-1] < price:
            self.max_deque.pop()
        self.max_deque.append(price)

        # Imnplement min_deque
        while self.min_deque and self.min_deque[-1] > price:
            self.min_deque.pop()
        self.min_deque.append(price)

    # Get max price from the current stream
    def get_max(self) -> float:
        return self.max_deque[0]

    # Get min price from the current stream
    def get_min(self) -> float:
        return self.min_deque[0]

    # Get mean price from the current steam
    def get_mean(self) -> float:
        return self.total_sum / self.count

    # Remove oldest price in stream
    def remove_oldest(self) -> list:
        oldest_px = self.prices.popleft()
        self.total_sum -= oldest_px
        self.count -= 1

        if self.max_deque and self.max_deque[0] == oldest_px:
            self.prices.popleft()

        if self.min_deque and self.min_deque[0] == oldest_px:
            self.prices.popleft()

    def __repr__(self):
        return f"Current stream: {self.prices}"


In [None]:
stream = PriceStream()

In [None]:
stream.add(10)
stream.add(5)
stream.add(8)
stream.add(11)
stream.add([random.randint(1, 50) for _ in range(10)])
print(stream)

In [None]:
max_px = stream.get_max()
print(max_px)

In [None]:
min_px = stream.get_min()
print(min_px)

In [None]:
mean_px = stream.get_mean()
print(mean_px)

In [None]:
stream.remove_oldest()
print(stream)

In [None]:
mean_px = stream.get_mean()
print(mean_px)

In [None]:
min_px = stream.get_min()
print(min_px)

In [None]:
min_px = stream.get_max()
print(max_px)