# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext already attached to this session, or create one.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext.
glueContext = GlueContext(sc)
# SparkSession taken from the GlueContext; later cells use it as `spark`.
spark = glueContext.spark_session
# Glue Job handle built on the same GlueContext (not referenced by the visible cells).
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.10 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: d37cb8b1-5333-489d-9893-751e095dd1ec
Applying the following default arguments:
--glue_kernel_version 1.0.10
--enable-glue-datacatalog true
Waiting for session d37cb8b1-5333-489d-9893-751e095dd1ec to get into ready status...
Session d37cb8b1-5333-489d-9893-751e095dd1ec 

#### Compute BTC technical indicators (SMA-200, EMA-50, RSI-14, MACD 12/26) from the Silver layer and write them to the Gold layer, partitioned by year/month


In [6]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, IntegerType

# Pin the Spark session time zone to UTC.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# S3 locations: Silver is read as input, Gold receives the computed indicators.
SILVER_PATH = "s3://btc-imat-bucket/btc_silver/"
GOLD_PATH   = "s3://btc-imat-bucket/btc_gold/"

# Output schema handed to applyInPandas: the timestamp first, then the four
# double-valued indicators, then the two integer partition keys.
schema = StructType(
    [StructField("date", TimestampType())]
    + [
        StructField(indicator, DoubleType())
        for indicator in ("sma_200", "ema_50", "rsi_14", "macd_12_26")
    ]
    + [StructField(part_key, IntegerType()) for part_key in ("year", "month")]
)

def compute(pdf: pd.DataFrame) -> pd.DataFrame:
    """Compute technical indicators over one price series.

    Args:
        pdf: Frame with a ``date`` column (anything ``pd.to_datetime`` can
            parse) and a ``close`` column. Rows may arrive in any order.

    Returns:
        A frame sorted by ``date`` with the columns ``date``, ``sma_200``,
        ``ema_50``, ``rsi_14``, ``macd_12_26``, ``year`` and ``month``
        (``year``/``month`` as int32). Each indicator is NaN until its
        warm-up window is filled. Rows whose ``date`` cannot be parsed are
        dropped.
    """
    # Drop undated rows first: a NaT date would otherwise contribute its price
    # to the indicator windows and then make the int32 cast of year/month raise.
    has_date = pd.to_datetime(pdf["date"], utc=True, errors="coerce").notna()
    pdf = pdf.loc[has_date.to_numpy()].sort_values("date").copy()

    # Non-numeric closes become NaN instead of raising.
    price = pd.to_numeric(pdf["close"], errors="coerce")

    # Trend indicators: 200-row simple mean and 50-row exponential mean,
    # each left as NaN until the full window is available.
    pdf["sma_200"] = price.rolling(200, min_periods=200).mean()
    pdf["ema_50"]  = price.ewm(span=50, adjust=False, min_periods=50).mean()

    # RSI-14: split row-to-row changes into gains and losses, then smooth each
    # recursively with alpha = 1/14.
    delta = price.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)

    avg_gain = gain.ewm(alpha=1/14, adjust=False, min_periods=14).mean()
    avg_loss = loss.ewm(alpha=1/14, adjust=False, min_periods=14).mean()

    # avg_loss == 0 with avg_gain > 0 yields rs = inf, which maps to RSI = 100.
    rs = avg_gain / avg_loss
    pdf["rsi_14"] = 100 - (100/(1+rs))

    # MACD: fast EMA minus slow EMA. The fast EMA also waits for 26
    # observations so both series start on the same row.
    ema12 = price.ewm(span=12, adjust=False, min_periods=26).mean()
    ema26 = price.ewm(span=26, adjust=False, min_periods=26).mean()
    pdf["macd_12_26"] = ema12 - ema26

    # Partition keys derived from the date, interpreted as UTC. No NaT can
    # remain here, so the integer cast is safe.
    dt = pd.to_datetime(pdf["date"], utc=True)
    pdf["year"] = dt.dt.year.astype("int32")
    pdf["month"] = dt.dt.month.astype("int32")

    return pdf[["date", "sma_200", "ema_50", "rsi_14", "macd_12_26", "year", "month"]]

# Read Silver (all partitions) and keep only the columns compute() needs.
# `date` is cast to timestamp and rows without a valid date are discarded.
# No orderBy here: the groupBy below reshuffles the rows and compute() sorts
# by date itself, so a global sort at this point would be wasted work.
df = (
    spark.read.parquet(SILVER_PATH)
         .withColumn("date", F.to_timestamp(F.col("date")))
         .select("date", "close")
         .where(F.col("date").isNotNull())
)

# Compute the KPIs over the full BTC history. Grouping on a constant key hands
# the whole series to compute() as one pandas DataFrame, so it is processed by
# a single task. The result carries exactly the columns declared in `schema`;
# the grouping key is not part of it.
kpi = (
    df.groupBy(F.lit(1).alias("grp"))
      .applyInPandas(compute, schema=schema)
)

# Write Gold partitioned by year/month. With Spark's default (static)
# partition overwrite mode this replaces everything already under GOLD_PATH.
(
    kpi.write
       .mode("overwrite")
       .partitionBy("year", "month")
       .parquet(GOLD_PATH)
)

print("✅ GOLD escrito en:", GOLD_PATH)

✅ GOLD escrito en: s3://btc-imat-bucket/btc_gold/


Comprobación

In [7]:
# Sanity check: read back one Gold partition (January 2026). The path is built
# from GOLD_PATH so it cannot drift from the location the write cell used.
# Reading the partition directory directly means the year/month partition
# columns are not part of the resulting frame.
df_check = spark.read.parquet(f"{GOLD_PATH}year=2026/month=1/")

df_check.orderBy(F.col("date")).show(5, truncate=False)  # first 5 rows by date
df_check.printSchema()

+-------------------+------------------+-----------------+-----------------+-------------------+
|date               |sma_200           |ema_50           |rsi_14           |macd_12_26         |
+-------------------+------------------+-----------------+-----------------+-------------------+
|2026-01-01 00:00:00|106899.97869999999|91640.76285166692|49.56008699758408|-795.8375275088911 |
|2026-01-02 00:00:00|106815.59785      |91573.75528885645|53.80162885227875|-580.0317092254554 |
|2026-01-03 00:00:00|106746.1944       |91537.07625792091|56.15338895533933|-347.99442471022485|
|2026-01-04 00:00:00|106679.68534999999|91538.62267917891|59.1289561947065 |-87.40128812752664 |
|2026-01-05 00:00:00|106626.06895      |91631.3676721523 |65.40166692802941|303.4015543589485  |
+-------------------+------------------+-----------------+-----------------+-------------------+
only showing top 5 rows

root
 |-- date: timestamp (nullable = true)
 |-- sma_200: double (nullable = true)
 |-- ema_50: double