# INSTALAR PYSPARK
- Se requiere Java (una JVM) para iniciar una SparkSession; la celda siguiente solo verifica que esté instalado.

In [None]:
# Check that a JVM is available by printing the installed Java version.
# This shell command only checks; it does not install Java.
!java -version


In [None]:
# Instalar PYSPARK
!pip install pyspark


# INICIALIZAR una SPARK SESSION

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create the SparkSession for this notebook, or reuse the active one if it already exists.
spark = (
    SparkSession.builder
    .appName("coingecko_challenge")
    .getOrCreate()
)


In [None]:
# Sanity-check the SparkSession: print its version, the application name and the author,
# one item per line.
print(
    f"Spark version: {spark.version}",
    f'AppName: "{spark.sparkContext.appName}"',
    "By: Hector Cruz",
    sep=" \n",
)

# LEER ARCHIVO FUENTE

## NOTAS:
- Subir el archivo fuente "btc_challenge.csv" a la sesión activa del notebook en Google Colab (queda en `/content/btc_challenge.csv`, la ruta que usa la celda de lectura).

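- O bien, montar Google Drive y leer el archivo directamente desde su carpeta ejecutando el siguiente código (en ese caso, actualizar la ruta en la celda de lectura a la ubicación del archivo dentro de `/content/drive/`):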
```
from google.colab import drive
drive.mount('/content/drive/')
```





In [150]:
# Define the explicit read schema for btc_challenge.csv.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.sql.functions import col, current_timestamp

# "date" is read as plain text (it is converted to a DATE in the data-preparation step);
# every remaining column is read as a nullable double.
btc_schema = StructType(
    [StructField("date", StringType())]
    + [
        StructField(column_name, DoubleType())
        for column_name in ("vol", "open", "high", "low", "close")
    ]
)



In [151]:
# Read the source CSV using the explicit btc_schema; the first line of the file is a header.
# NOTE(review): this path assumes the file was uploaded to the Colab session; adjust it
# if reading from a mounted Drive folder instead.
btc_df = spark.read.csv(
    "/content/btc_challenge.csv",
    schema=btc_schema,
    header=True,
)

In [None]:
btc_df.printSchema()  # Show the schema Spark applied to the raw CSV read.

In [None]:
# Read check: display the first 5 rows of the raw DataFrame.
btc_df.show(5)

# PREPARACIÓN DE DATOS

In [156]:
# Convert the "date" column from a dd-MM-yyyy string into a DATE column (same name).
from pyspark.sql.functions import to_date, col

# NOTE(review): values that do not match the pattern presumably end up null rather than
# failing here — verify there are no null dates after this step.
btc_date_df = btc_df.withColumn(
    "date",
    to_date("date", "dd-MM-yyyy"),
)


In [None]:
# Check the conversion: "date" should now be listed as a date column.
btc_date_df.printSchema()

In [None]:
# Check the conversion (2): display the first 5 rows with the converted dates.
btc_date_df.show(5)

In [161]:
# Add a "mes" (month) column formatted as a two-digit "MM" string, e.g. "01".
# Optional: it is used to sanity-check aggregations and as the RANK window partition.
# NOTE(review): "MM" drops the year, so the same month of different years shares one
# group — confirm the data covers a single year, or use "yyyy-MM" instead.
from pyspark.sql.functions import date_format

btc_month_df = btc_date_df.select(
    "*",
    date_format("date", "MM").alias("mes"),
)


In [None]:
btc_month_df.printSchema()  # Confirm the new "mes" column was appended.

In [None]:
# Check the change: display the first 5 rows including the "mes" column.
btc_month_df.show(5)

In [None]:
# Aggregation check: total of the "close" column per month ("mes").
# Spark's sum is imported under an alias so it does not shadow Python's builtin sum
# for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum

# groupBy output order is not guaranteed, so sort by month for a stable, readable table.
btc_month_df.groupBy("mes").agg(spark_sum("close")).orderBy("mes").show()

# WINDOW FUNCTIONS

In [165]:
# RANK: within each month ("mes"), rank the rows by closing price, highest close = 1.
# rank() gives tied closes the same rank and leaves gaps after a tie.
# NOTE(review): "mes" has no year component; if the data spans several years, the same
# month of different years is ranked together.
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank


win_spec = Window.partitionBy("mes").orderBy(desc("close"))
btc_win_df = btc_month_df.withColumn("rank", rank().over(win_spec))


In [None]:
btc_win_df.show(10)  # Display the first 10 ranked rows.

In [167]:
# MOVING AVG: trailing, row-based windows ordered by date. Each covers the current row
# plus the n-1 rows before it (fewer at the start of the data).
# NOTE(review): these count rows, not calendar days — they equal n-day windows only if
# there is exactly one row per date with no gaps; TODO confirm for btc_challenge.csv.
# The windows have no partitionBy, so Spark evaluates them on a single partition; this is
# acceptable only for a small dataset (assumption — confirm size).
from pyspark.sql.window import Window
from pyspark.sql.functions import avg


def _trailing_rows_window(n_rows):
    """Return a date-ordered window spanning the current row and the n_rows - 1 rows before it."""
    return Window.orderBy("date").rowsBetween(-(n_rows - 1), Window.currentRow)


ma_7d = _trailing_rows_window(7)
ma_14d = _trailing_rows_window(14)
ma_30d = _trailing_rows_window(30)



In [None]:
# Trailing moving averages of the closing price over the 7-, 14- and 30-row windows.
# Fix: the 14- and 30-day columns previously reused the 7-row window, so all three
# averages were identical.
btc_ma_df = (
    btc_month_df
    .withColumn("7day_avg", avg("close").over(ma_7d))
    .withColumn("14day_avg", avg("close").over(ma_14d))
    .withColumn("30day_avg", avg("close").over(ma_30d))
    .select(["date", "open", "close", "7day_avg", "14day_avg", "30day_avg"])
)
btc_ma_df.show(40)