# PPO para Cripto con Mecanismos de Seguridad

# In [None]:

import numpy as np
import torch, torch.nn as nn, torch.optim as optim

class CryptoEnv:
    """Single-asset, long-only trading environment on a synthetic price path.

    The price series is generated once in ``__init__`` as
    ``20000 * exp(cumsum(normal(0, 0.0015)))`` and reused for every episode.

    Actions passed to :meth:`step`:
        * ``1`` -- if flat, convert all cash (minus fee) into the asset.
        * ``2`` -- if long, sell the whole position (minus fee).
        * anything else, or an action that does not apply -- no trade.

    Args:
        T: Number of points in the price path; an episode ends once the
            time index reaches ``T - 1``.
        fee: Proportional fee applied to every buy and sell.
        window: Number of past prices (in addition to the current one)
            included in an observation; observations have length
            ``window + 1``.
        seed: Seed for the NumPy generator that draws the price path.

    Raises:
        ValueError: If ``window`` is negative.
    """

    def __init__(self, T=4000, fee=0.001, window=50, seed=0):
        if window < 0:
            raise ValueError("window must be non-negative")
        self.T = T
        self.fee = fee
        self.window = window
        rng = np.random.default_rng(seed)
        self.p = 20000 * np.exp(np.cumsum(rng.normal(0, 0.0015, size=T)))
        self.reset()

    def reset(self):
        """Restart the episode at ``t = 0`` with NAV 1.0 and no position.

        Returns:
            The first observation.
        """
        self.t = 0
        # Float from the start, matching the values written by ``step``.
        self.pos = 0.0
        self.cash = 1.0
        self.nav = 1.0
        self.entry = None
        self.max_nav = 1.0
        return self._obs()

    def _obs(self):
        """Return the z-scored recent prices as a float32 vector.

        The slice covers up to ``window + 1`` prices ending at the current
        time index.  Early in an episode, when fewer prices are available,
        the vector is left-padded with zeros to a fixed length.
        """
        lo = max(0, self.t - self.window)
        x = self.p[lo:self.t + 1]
        # 1e-6 keeps the division finite when the slice has zero variance
        # (e.g. the single price available at t = 0).
        x = (x - x.mean()) / (x.std() + 1e-6)
        x = np.pad(x, (self.window + 1 - len(x), 0))
        return x.astype(np.float32)

    def step(self, a):
        """Apply action ``a`` at the current price and advance one tick.

        Args:
            a: Action code (see the class docstring).

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``info`` is
            ``{"nav": <current NAV>}``.
        """
        done = False
        price = self.p[self.t]
        if a == 1 and self.pos == 0:
            size = self.cash * (1 - self.fee)
            self.pos = size / price
            self.cash = 0.0
            self.entry = price
        elif a == 2 and self.pos > 0:
            self.cash = self.pos * price * (1 - self.fee)
            self.pos = 0.0
            self.entry = None

        # Mark to market at the current price and track the running peak
        # so the drawdown can be penalised.
        self.nav = self.cash + self.pos * price
        self.max_nav = max(self.max_nav, self.nav)
        dd = (self.max_nav - self.nav) / self.max_nav
        # NOTE(review): this rewards the *level* of log-NAV at every step,
        # not the per-step change -- confirm that this is intended.
        reward = np.log(self.nav + 1e-12) - 0.5 * dd

        self.t += 1
        if self.t >= self.T - 1:
            # Episode end: any open position is closed at the new price.
            # The reward above was computed before this liquidation.
            if self.pos > 0:
                self.cash = self.pos * self.p[self.t] * (1 - self.fee)
                self.pos = 0.0
            self.nav = self.cash
            done = True
        return self._obs(), float(reward), done, {"nav": self.nav}

class ActorCritic(nn.Module):
    """Two-headed MLP: a shared Tanh trunk feeding a policy and a value head.

    Args:
        obs: Size of the last dimension of the input.
        acts: Number of logits produced by the policy head.
    """

    def __init__(self, obs=51, acts=3):
        super().__init__()
        hidden = 128
        # Layers are created in the order trunk -> policy head -> value head.
        trunk_layers = [
            nn.Linear(obs, hidden),
            nn.Tanh(),
            nn.Linear(hidden, hidden),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*trunk_layers)
        self.pi = nn.Linear(hidden, acts)
        self.v = nn.Linear(hidden, 1)

    def forward(self, x):
        """Return ``(logits, value)`` computed from the shared features."""
        features = self.net(x)
        logits = self.pi(features)
        value = self.v(features)
        return logits, value

def _compute_gae(rewards, values, dones, gamma, lam):
    """Return generalised-advantage estimates for one rollout.

    Args:
        rewards: Float array of length ``n`` with per-step rewards.
        values: Float array of length ``n + 1``; the last entry is the
            bootstrap value of the state following the rollout.
        dones: Float array of length ``n``; 1.0 where the step ended an
            episode, else 0.0.
        gamma: Discount factor.
        lam: GAE smoothing factor.
    """
    adv = np.zeros_like(rewards)
    running = 0.0
    for i in reversed(range(len(rewards))):
        # The mask stops both the bootstrap and the running sum from
        # leaking across an episode boundary.
        mask = 1.0 - dones[i]
        delta = rewards[i] + gamma * mask * values[i + 1] - values[i]
        running = delta + gamma * lam * mask * running
        adv[i] = running
    return adv


def ppo_train(epochs=2, steps=2048, gamma=0.99, lam=0.95, clip=0.2):
    """Train an ``ActorCritic`` on ``CryptoEnv`` with clipped PPO.

    Each epoch collects ``steps`` transitions (resetting the environment
    whenever an episode ends), computes GAE advantages, and runs four
    full-batch gradient updates on the clipped surrogate objective plus
    a value loss and an entropy bonus.

    Args:
        epochs: Number of collect-then-update iterations.
        steps: Transitions collected per epoch.
        gamma: Discount factor.
        lam: GAE smoothing factor.
        clip: PPO clipping range for the probability ratio.

    Returns:
        The trained ``ActorCritic`` module.
    """
    env = CryptoEnv()
    ac = ActorCritic().train()
    opt = torch.optim.Adam(ac.parameters(), lr=3e-4)
    update_passes = 4

    for _ in range(epochs):
        s = env.reset()
        buf = []
        for _ in range(steps):
            # Rollout data are treated as constants during the update, so
            # no autograd graph is needed here.
            with torch.no_grad():
                logits, v = ac(torch.tensor(s).unsqueeze(0))
            dist = torch.distributions.Categorical(logits=logits)
            a_t = dist.sample()
            a = a_t.item()
            s2, r, d, _info = env.step(a)
            # Store the log-prob as a scalar.  Keeping the (1,)-shaped
            # tensor would stack to (steps, 1) and broadcast against the
            # (steps,) new log-probs into a (steps, steps) ratio matrix.
            buf.append((s, a, r, dist.log_prob(a_t).item(), v.item(), d))
            s = env.reset() if d else s2

        with torch.no_grad():
            _, last_v = ac(torch.tensor(s).unsqueeze(0))
        values = np.array([b[4] for b in buf] + [last_v.item()])
        rewards = np.array([b[2] for b in buf])
        dones = np.array([b[5] for b in buf], dtype=np.float64)
        adv = _compute_gae(rewards, values, dones, gamma, lam)
        ret = adv + values[:-1]

        states = torch.tensor(np.stack([b[0] for b in buf])).float()
        acts = torch.tensor([b[1] for b in buf], dtype=torch.long)
        old_logp = torch.tensor([b[3] for b in buf], dtype=torch.float32)
        adv_t = torch.tensor(adv).float()
        ret_t = torch.tensor(ret).float()

        for _ in range(update_passes):
            logits, v = ac(states)
            dist = torch.distributions.Categorical(logits=logits)
            logp = dist.log_prob(acts)
            ratio = torch.exp(logp - old_logp)
            obj1 = ratio * adv_t
            obj2 = torch.clamp(ratio, 1 - clip, 1 + clip) * adv_t
            pi_loss = -(torch.min(obj1, obj2)).mean()
            # squeeze(-1) drops only the trailing value dimension.
            v_loss = ((v.squeeze(-1) - ret_t) ** 2).mean()
            loss = pi_loss + 0.5 * v_loss - 0.01 * dist.entropy().mean()
            opt.zero_grad()
            loss.backward()
            opt.step()
    return ac

class NConsecutiveFilter:
    """Replace an action with 0 once it fills the whole N-action window.

    The filter remembers the most recent ``N`` actions passed to
    :meth:`allow`.  When that window is full, holds a single distinct
    value, and that value is not 0, :meth:`allow` returns 0 instead of
    the action.

    Args:
        N: Length of the action window.
    """

    def __init__(self, N=3):
        self.N = N
        self.hist = []

    def allow(self, action):
        """Record ``action`` and return it, or 0 if it is suppressed."""
        self.hist.append(action)
        self.hist = self.hist[-self.N:]

        # Window not yet full: nothing to suppress.
        if len(self.hist) != self.N:
            return action
        # Mixed actions in the window: pass through.
        if len(set(self.hist)) != 1:
            return action
        # A full window of 0 is passed through unchanged.
        if self.hist[-1] == 0:
            return action
        return 0

class SmurfingGuard:
    """Override the action with 0 while the recent mean return is too low.

    Args:
        threshold: The action is replaced by 0 when the mean of the last
            20 returns is strictly below this value.
    """

    def __init__(self, threshold=-0.002):
        self.threshold = threshold

    def allow(self, action, recent_returns):
        """Return ``action``, or 0 if the recent mean return is below threshold.

        With fewer than 20 returns available the mean is taken to be 0.0.
        """
        lookback = 20
        if len(recent_returns) < lookback:
            mean_ret = 0.0
        else:
            mean_ret = np.mean(recent_returns[-lookback:])
        return 0 if mean_ret < self.threshold else action

# Smoke run executed at module level: train for two epochs with the
# remaining hyper-parameters at their defaults and discard the model.
_ = ppo_train(epochs=2)
print("Demo PPO + filtros lista para extender.")
