# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook; to start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Create (or reuse) the Spark context and wrap it in a GlueContext.
# `spark` is the SparkSession that the following cells use to read and write S3.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is created but never initialised (job.init) or committed
# (job.commit) in the cells shown; confirm whether Glue job features are needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 2e0622f1-5b87-4669-8630-8c8fd6dadf7a
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 2e0622f1-5b87-4669-8630-8c8fd6dadf7a to get into ready status...
Session 2e0622f1-5b87-4669-8630-8c8fd6dadf7a ha

In [13]:
from pyspark.sql.functions import col, substring

# S3 locations: raw CSV files (source, two directory levels below the bucket
# root) and the silver-layer bucket (destination).
source_path = "s3://cargadatostradingview/*/*/*.csv"
destination_path = "s3://cargadatosplata/"

# Quick look at the raw data. Schema inference is NOT enabled here, so every
# column is read as a string; the first line of each file supplies the names.
df = (
    spark.read
    .option("header", "true")
    .csv(source_path)
)
df.show(10)

+-------------------+----------------+--------+--------+--------+--------+------------------+
|           datetime|          symbol|    open|    high|     low|   close|            volume|
+-------------------+----------------+--------+--------+--------+--------+------------------+
|2021-09-09 02:00:00|COINBASE:SHIBUSD|   1e-05|   9e-05|   6e-06|5.67e-05|2011405203216.0005|
|2021-09-10 02:00:00|COINBASE:SHIBUSD|5.67e-05|   6e-05|6.76e-06|7.02e-06|3803666278890.0015|
|2021-09-11 02:00:00|COINBASE:SHIBUSD|7.02e-06|7.35e-06|6.73e-06|6.77e-06|1663194647440.0005|
|2021-09-12 02:00:00|COINBASE:SHIBUSD|6.78e-06|7.19e-06|6.64e-06|6.91e-06| 859389740690.0024|
|2021-09-13 02:00:00|COINBASE:SHIBUSD|6.91e-06| 7.1e-06|6.17e-06|6.48e-06|  851872051559.002|
|2021-09-14 02:00:00|COINBASE:SHIBUSD|6.48e-06| 6.9e-06|6.44e-06|6.67e-06|  799842914475.005|
|2021-09-15 02:00:00|COINBASE:SHIBUSD|6.67e-06|6.91e-06| 6.6e-06|6.78e-06|1354279397992.0017|
|2021-09-16 02:00:00|COINBASE:SHIBUSD|6.78e-06|1.34e-05|6.52

ALMACENAMIENTO EN CAPA PLATA: conversión de los CSV de origen a Parquet, particionado por símbolo y año

In [21]:
from pyspark.sql.functions import col, substring

# S3 locations: raw CSV files (source) and the silver-layer bucket (destination).
source_path = "s3://cargadatostradingview/*/*/*.csv"
destination_path = "s3://cargadatosplata/"

# Read every CSV under the source prefix. inferSchema costs an extra pass over
# the data but gives typed columns wherever Spark can infer a type.
df = spark.read.option("header", "true").option("inferSchema", "true").csv(source_path)

# Partition key: the first four characters of "datetime" (the year, as a string).
df = df.withColumn("year", substring(col("datetime"), 1, 4))

# The whole source prefix is re-read on every run, so mode("append") duplicated
# every row each time this cell was executed. Dynamic partition overwrite
# replaces only the (symbol, year) partitions present in `df` and leaves any
# other partitions untouched, which makes the cell safe to re-run.
# The former option("basePath", ...) was removed: basePath is a read-side
# partition-discovery option and has no effect on writes.
(
    df.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("symbol", "year")
    .parquet(destination_path)
)
print("Proceso completado: Archivos CSV convertidos a Parquet y almacenados en S3 correctamente.")


Proceso completado: Archivos CSV convertidos a Parquet y almacenados en S3 correctamente.


ALMACENAMIENTO EN CAPA ORO: cálculo de indicadores técnicos (SMA, EMA, RSI, MACD) por símbolo y año

Funciones para los KPIs

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, avg, when, lit
from pyspark.sql.window import Window
import pyspark.sql.functions as F


# Función para calcular SMA
def calculate_sma(df, period=14):
    window_spec = Window.partitionBy("symbol", "year").orderBy("datetime").rowsBetween(-period, 0)
    return df.withColumn(f"SMA_{period}", F.avg("close").over(window_spec))

# Exponential moving average (EMA) of "close".
def calculate_ema(df, period=14):
    """Add an ``EMA_{period}`` column with the exponential moving average of ``close``.

    The previous version computed ``alpha * close + (1 - alpha) * prev_close``,
    a two-point weighted average rather than an EMA. A real EMA is recursive:

        EMA_0 = close_0
        EMA_t = alpha * close_t + (1 - alpha) * EMA_{t-1},   alpha = 2 / (period + 1)

    (the recurrence used by pandas ``ewm(span=period, adjust=False)``).
    Window functions cannot read their own previous output, so each row collects
    the closes seen so far in its (symbol, year) partition, ordered by
    ``datetime``, and folds them with that recurrence.

    Args:
        df: DataFrame with ``symbol``, ``year``, ``datetime`` and ``close``.
        period: EMA span; must be >= 1.

    Returns:
        ``df`` with ``prev_close`` (previous row's close, kept so the output
        schema matches earlier versions) and ``EMA_{period}`` appended.

    Raises:
        ValueError: if ``period`` < 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")
    alpha = 2 / (period + 1)
    window_spec = Window.partitionBy("symbol", "year").orderBy("datetime")
    history = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)

    df = df.withColumn("prev_close", F.lag("close", 1).over(window_spec))

    # All non-null closes up to and including this row, oldest first
    # (collect_list skips nulls). Cost is quadratic in partition size; that is
    # fine for daily bars, but for intraday data consider applyInPandas instead.
    df = df.withColumn(
        "_ema_closes", F.collect_list(col("close").cast("double")).over(history)
    )
    # Seeding the fold with the first close and then folding over the whole
    # list is exact: the first step gives alpha*c0 + (1-alpha)*c0 == c0.
    ema = F.aggregate(
        col("_ema_closes"),
        F.element_at(col("_ema_closes"), 1),
        lambda acc, price: alpha * price + (1 - alpha) * acc,
    )
    # Rows with no non-null close so far get null instead of indexing an empty array.
    df = df.withColumn(f"EMA_{period}", F.when(F.size(col("_ema_closes")) > 0, ema))

    return df.drop("_ema_closes")

# Relative Strength Index (RSI).
def calculate_rsi(df, period=14):
    """Add an ``RSI`` column (0-100) computed from ``close``.

    Average gain and average loss are simple moving averages over the last
    ``period`` price changes within each (symbol, year) partition ordered by
    ``datetime``. This is the SMA-based (Cutler) variant, not Wilder's smoothing.

    Args:
        df: DataFrame with ``symbol``, ``year``, ``datetime`` and numeric ``close``.
        period: Number of price changes averaged; must be >= 1.

    Returns:
        ``df`` with ``change``, ``gain``, ``loss`` and ``RSI`` columns appended.

    Raises:
        ValueError: if ``period`` < 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")
    ordered = Window.partitionBy("symbol", "year").orderBy("datetime")
    # Inclusive bounds: -(period - 1)..0 spans exactly `period` rows
    # (the previous -period..0 averaged period + 1 rows).
    trailing = ordered.rowsBetween(-(period - 1), Window.currentRow)

    df = df.withColumn("change", col("close") - F.lag("close", 1).over(ordered))
    # Leave gain/loss null when there is no previous close (first row of a
    # partition), so the averages skip it instead of counting a fake zero change.
    df = df.withColumn("gain", when(col("change") > 0, col("change")).when(col("change") <= 0, 0))
    df = df.withColumn("loss", when(col("change") < 0, -col("change")).when(col("change") >= 0, 0))

    avg_gain = F.avg("gain").over(trailing)
    avg_loss = F.avg("loss").over(trailing)

    # 100 - 100 / (1 + avg_gain / avg_loss) == 100 * avg_gain / (avg_gain + avg_loss).
    # This form yields 100 when there were no losses (the old division by a zero
    # avg_loss produced null). It yields null only when the price did not move
    # in the window (0/0; Spark returns null unless ANSI mode is enabled).
    rsi = 100 * avg_gain / (avg_gain + avg_loss)

    return df.withColumn("RSI", rsi)

# Moving Average Convergence Divergence (MACD).
def calculate_macd(df, short_period=12, long_period=26, signal_period=9):
    """Add ``short_ema``, ``long_ema``, ``MACD`` and ``Signal_Line`` columns.

    ``MACD`` is the short EMA minus the long EMA of ``close`` (both produced by
    ``calculate_ema``). ``Signal_Line`` is a simple moving average of ``MACD``
    over the last ``signal_period`` rows. Note that the conventional MACD signal
    line is an EMA, so values differ from charting tools.

    Args:
        df: DataFrame with ``symbol``, ``year``, ``datetime`` and numeric ``close``.
        short_period: Span of the fast EMA.
        long_period: Span of the slow EMA.
        signal_period: Rows averaged for the signal line; must be >= 1.

    Returns:
        ``df`` with the columns above (plus whatever ``calculate_ema`` adds).

    Raises:
        ValueError: if ``signal_period`` < 1.
    """
    if signal_period < 1:
        raise ValueError(f"signal_period must be >= 1, got {signal_period}")
    df = calculate_ema(df, short_period).withColumnRenamed(f"EMA_{short_period}", "short_ema")
    df = calculate_ema(df, long_period).withColumnRenamed(f"EMA_{long_period}", "long_ema")

    df = df.withColumn("MACD", col("short_ema") - col("long_ema"))

    # Inclusive bounds: -(signal_period - 1)..0 spans exactly `signal_period`
    # rows (the previous -signal_period..0 averaged one row too many).
    window_spec = (
        Window.partitionBy("symbol", "year")
        .orderBy("datetime")
        .rowsBetween(-(signal_period - 1), Window.currentRow)
    )
    df = df.withColumn("Signal_Line", F.avg("MACD").over(window_spec))

    return df

# Los datos se leen desde S3 en las celdas siguientes (ver `source_path`).




In [None]:
# Smoke test: compute every indicator on the raw CSVs and preview a few rows
# before anything is written to the gold layer.

from pyspark.sql.functions import col, year  # make sure `year` is in scope

# The CSV is read without schema inference, so "close" arrives as a string and
# is cast to double; the partition year is taken from "datetime".
df = (
    spark.read.option("header", "true").csv(source_path)
    .withColumn("year", year(col("datetime")))
    .withColumn("close", col("close").cast("double"))
)

# Same 14-row period for SMA, EMA and RSI, applied in that order; MACD uses its defaults.
for indicator in (calculate_sma, calculate_ema, calculate_rsi):
    df = indicator(df, period=14)
df = calculate_macd(df)

df_selected = df.select("datetime", "symbol", "SMA_14", "EMA_14", "RSI", "MACD", "Signal_Line")

# Show the first 10 rows without truncating column values.
df_selected.show(10, truncate=False)



DataFrame[datetime: string, symbol: string, open: string, high: string, low: string, close: double, volume: string, year: int, SMA_14: double, prev_close: double, EMA_14: double, change: double, gain: double, loss: double, RSI: double, short_ema: double, long_ema: double, MACD: double, Signal_Line: double]


In [33]:
# Gold layer: technical indicators per symbol, partitioned by symbol and year.

from pyspark.sql.functions import col, year  # make sure `year` is in scope

# NOTE(review): this re-reads the raw CSVs instead of the silver Parquet written
# earlier; consider reading from the silver layer to follow the medallion flow.
df = spark.read.option("header", "true").csv(source_path)
df = df.withColumn("year", year(col("datetime")))

# Without inferSchema every CSV column is a string. Cast all price/volume
# columns (not only "close") so the gold Parquet stores numbers, not text.
for numeric_col in ("open", "high", "low", "close", "volume"):
    df = df.withColumn(numeric_col, col(numeric_col).cast("double"))

# Apply the indicators
df = calculate_sma(df, period=14)
df = calculate_ema(df, period=14)
df = calculate_rsi(df, period=14)
df = calculate_macd(df)

# Write Parquet to the gold layer. The whole source is re-read on every run, so
# mode("append") duplicated every row on each execution; dynamic partition
# overwrite replaces only the (symbol, year) partitions present in `df`.
destination_gold_path = "s3://cargadatosoro/"
(
    df.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("symbol", "year")
    .parquet(destination_gold_path)
)

print("KPIs calculados y almacenados en S3 correctamente en la capa oro.")


KPIs calculados y almacenados en S3 correctamente en la capa oro.
