<img src="../../images/NxLogoTransparent.png" alt="Nx Icon" width=200px align=right /> 


# Workshop: DataFrame Transformations with Snowpark

### Scenario:

In this exercise, you will use the Snowpark API to examine a raw dataset and transform it into a DataFrame that analysts in your organization can easily explore.

### Steps:
1. Install the libraries and configure the connection parameters.
2. Connect and create a Session object.
3. Examine the datasets.
4. Explore the datasets and run transformations.
5. (Appendix) Use a UDF to perform transformations.

### 1. Install the libraries and configure the connection parameters.


Initialize the configuration variables with the parameters needed to connect to your Snowflake account.
> **&#128221; Note:** Update the values with the details of your free trial account.

In [None]:
%pip install ipython-sql

In [15]:
import os
import getpass
from urllib.parse import quote

# Load Jupyter/IPython sql magic
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [16]:
# Gather account credentials
sf_account   = input('Snowflake Account: ') #Example: qz79006.us-east-2.aws
sf_user      = input('Snowflake User: ') #Example: WORKSHOP_USER
sf_password  = getpass.getpass('Snowflake Password: ') # Hidden input. Example: WORKSHOP_USER_PASSWORD

# Generate default object names
wh_name    = "COMPUTE_WH"
db_name    = "SNOWPARK_DEMO_DB"

print("\r\nAccount credentials gathered. Select the next code cell to continue.")


Account credentials gathered. Select the next code cell to continue.


### 2. Connect and create a Session object.

The following code block creates a `Session` object and connects to your Snowflake account.

*You do not need to modify anything. Just run the code block.*
> &#10071; On Windows, the path to the `spp_utils_python.ipynb` file must be updated.

In [17]:
# Import Snowpark Session
from snowflake.snowpark import Session

connection_parameters = {
    "account": sf_account.upper(),
    "user": sf_user.upper(),
    "password": sf_password
}  

session = Session.builder.configs(connection_parameters).create()
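
The cell above passes only the account, user, and password; the `wh_name` and `db_name` values gathered in step 1 are not part of that dictionary. A minimal sketch of how they could be included, assuming the `warehouse` and `database` connection keys of `Session.builder.configs()`. The helper name `build_connection_parameters` is hypothetical and not part of the workshop, and no connection is opened here:

```python
def build_connection_parameters(account, user, password, warehouse=None, database=None):
    """Return a dict suitable for Session.builder.configs() (hypothetical helper)."""
    params = {
        "account": account.upper(),
        "user": user.upper(),
        "password": password,
    }
    # Optional keys are added only when a value was provided
    if warehouse:
        params["warehouse"] = warehouse
    if database:
        params["database"] = database
    return params


# Placeholder values based on the examples in step 1
params = build_connection_parameters(
    "qz79006.us-east-2.aws", "workshop_user", "not-a-real-password",
    warehouse="COMPUTE_WH", database="SNOWPARK_DEMO_DB",
)
print(sorted(params))
```

The resulting dictionary could then be passed to `Session.builder.configs(params).create()` in place of `connection_parameters`.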

### 3. Examine the datasets.

Import the `functions` and `types` modules to use them during the workshop.

Create two `DataFrame` objects from the `campaign_spend` and `monthly_revenue` tables, then examine them to find out what kind of information these datasets contain.

*You do not need to edit anything in the following cell. Just run it.*

In [18]:
# Import Snowpark functions and types
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *

# Create a DataFrame for the table 
snow_df_campaing_spend = session.table('SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CAMPAIGN_SPEND')
snow_df_revenue = session.table('SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.MONTHLY_REVENUE')

print('\nTable CAMPAIGN_SPEND')
for field in snow_df_campaing_spend.schema.fields:
    print(field)

print('\nTable MONTHLY_REVENUE')
for field in snow_df_revenue.schema.fields:
    print(field)


Table CAMPAIGN_SPEND
StructField('CAMPAIGN', StringType(16777216), nullable=True)
StructField('CHANNEL', StringType(16777216), nullable=True)
StructField('DATE', DateType(), nullable=True)
StructField('TOTAL_CLICKS', LongType(), nullable=True)
StructField('TOTAL_COST', LongType(), nullable=True)
StructField('ADS_SERVED', LongType(), nullable=True)

Table MONTHLY_REVENUE
StructField('YEAR', LongType(), nullable=True)
StructField('MONTH', LongType(), nullable=True)
StructField('REVENUE', DoubleType(), nullable=True)


### 4. Explore the datasets and run transformations.

#### 4.1 Select columns from the snow_df_campaing_spend DataFrame.

Select the following fields from the `snow_df_campaing_spend` DataFrame using the `select()` method (remember to wrap the column names in a call to the `col()` function):
- CAMPAIGN
- CHANNEL
- DATE
- TOTAL_CLICKS
- TOTAL_COST
- ADS_SERVED


*TIP: Want to take a look at the results of your transformations? Use the* `show()` *method to run the query and display the first ten rows.*

> **&#128221; Note:** See the [Snowpark API Reference (Python)](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/index.html#snowpark-api-reference-python).


In [19]:
snow_df_spend = (snow_df_campaing_spend
        .select(
             col("CAMPAIGN")
            ,col("CHANNEL")
            ,col("DATE")
            ,col("TOTAL_CLICKS")
            ,col("TOTAL_COST")
            ,col("ADS_SERVED")
          ))

# The following 'action' statement executes the transformations and shows the first ten results
snow_df_spend.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

#### 4.2 Total spend per year and month for all channels.

Let's transform the data so that we can see the total cost per year/month per channel, using the Snowpark DataFrame functions `group_by()` and `agg()`.

In [20]:
from snowflake.snowpark.functions import month,year,col,sum

snow_df_spend_per_channel = (snow_df_spend
    .group_by(year('DATE'), month('DATE'), 'CHANNEL')
    .agg(sum('TOTAL_COST').as_('TOTAL_COST'))
    .with_column_renamed('"YEAR(DATE)"', "YEAR")
    .with_column_renamed('"MONTH(DATE)"', "MONTH")
    .sort('YEAR', 'MONTH'))

print("Total Spend per Year and Month For All Channels")
snow_df_spend_per_channel.show()

Total Spend per Year and Month For All Channels
---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |search_engine  |516431        |
|2012    |5        |video          |516729        |
|2012    |5        |email          |517208        |
|2012    |5        |social_media   |517618        |
|2012    |6        |video          |501098        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |email          |501947        |
|2012    |7        |search_engine  |522780        |
|2012    |7        |email          |518405        |
---------------------------------------------------
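
The grouping and aggregation in this step can be mirrored in plain Python, which may help when reasoning about what `group_by()` and `agg()` compute. This is only an illustrative sketch that runs locally without Snowflake; the sample rows are copied from the output in section 4.1, and the variable names are assumptions:

```python
from collections import defaultdict
from datetime import date

# Sample rows (CHANNEL, DATE, TOTAL_COST) copied from the output in section 4.1
rows = [
    ("video", date(2012, 6, 3), 1762),
    ("video", date(2012, 6, 2), 678),
    ("search_engine", date(2012, 6, 3), 471),
    ("social_media", date(2017, 12, 28), 591),
]

# Group by (year, month, channel) and sum the cost, like group_by(...).agg(sum(...))
totals = defaultdict(int)
for channel, day, cost in rows:
    totals[(day.year, day.month, channel)] += cost

# Sort by year, then month (the channel name breaks ties in this sketch)
for key in sorted(totals):
    print(key, totals[key])
```

The two `video` rows from June 2012 collapse into a single group whose cost is the sum of both rows.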



#### 4.3 Total spend across all channels.

This transformation pivots the channels into columns, which lets us join the result to the revenue table so that our input features and the target variable are in a single table for model training.

Generate aliases for the columns to make them more user-friendly.
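
The reshaping performed in this step, turning one row per channel into one column per channel, can be illustrated in plain Python. This is a local sketch rather than the Snowpark implementation; the rows are copied from the output in section 4.2, and missing channels default to 0 here, whereas a SQL `PIVOT` would yield NULL:

```python
# (YEAR, MONTH, CHANNEL, TOTAL_COST) rows copied from the output in section 4.2
long_rows = [
    (2012, 5, "search_engine", 516431),
    (2012, 5, "video", 516729),
    (2012, 5, "email", 517208),
    (2012, 5, "social_media", 517618),
    (2012, 6, "video", 501098),
    (2012, 6, "search_engine", 506497),
    (2012, 6, "social_media", 504679),
    (2012, 6, "email", 501947),
]
channels = ["search_engine", "social_media", "video", "email"]

# One output row per (year, month), one column per channel
wide = {}
for year_, month_, channel, cost in long_rows:
    row = wide.setdefault((year_, month_), dict.fromkeys(channels, 0))
    row[channel] += cost

for (year_, month_), row in sorted(wide.items()):
    # Upper-case the keys, mirroring the aliases applied with as_()
    print(year_, month_, {name.upper(): value for name, value in row.items()})
```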

In [21]:
snow_df_spend_per_month = (snow_df_spend_per_channel
    .pivot('CHANNEL', ['search_engine', 'social_media', 'video', 'email'])
    .sum('TOTAL_COST')
    .sort('YEAR', 'MONTH'))
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)

print("Total Spend Across All Channels")

# The following 'action' statement executes the transformations and shows the first ten results
snow_df_spend_per_month.show()

Total Spend Across All Channels
---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |4

#### 4.4 Total revenue per year and month.

Now let's transform the revenue data into revenue per year/month using the `group_by()` and `agg()` functions.

In [22]:
snow_df_revenue_per_month = (snow_df_revenue
    .group_by('YEAR', 'MONTH')
    .agg(sum('REVENUE'))
    .sort('YEAR', 'MONTH')
    .with_column_renamed('SUM(REVENUE)', 'REVENUE'))

print("Total Revenue per Year and Month")
snow_df_revenue_per_month.show()

Total Revenue per Year and Month
---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2012    |11       |3185894.64  |
|2012    |12       |3334570.96  |
|2013    |1        |3316455.44  |
|2013    |2        |2995042.21  |
---------------------------------



#### 4.5 Join the total spend with the total revenue per year and month across all channels.

Next, we join this revenue data with the transformed campaign spend data so that our inputs (that is, cost per channel) and target variable (that is, revenue) can be loaded into a single table for further analysis and model training.
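
Joining on `["YEAR","MONTH"]` with the default join type keeps only the year/month pairs present in both DataFrames (an inner join). A plain-Python sketch of that behavior, using rows copied from the outputs of sections 4.3 and 4.4; the dictionary layout is an assumption made for illustration:

```python
# Rows copied from the outputs of sections 4.3 and 4.4, keyed by (YEAR, MONTH)
spend = {
    (2012, 5): {"SEARCH_ENGINE": 516431, "SOCIAL_MEDIA": 517618, "VIDEO": 516729, "EMAIL": 517208},
    (2012, 6): {"SEARCH_ENGINE": 506497, "SOCIAL_MEDIA": 504679, "VIDEO": 501098, "EMAIL": 501947},
}
revenue = {
    (2012, 5): 3264300.11,
    (2012, 6): 3208482.33,
    (2012, 7): 3311966.98,  # no matching spend row in this sample, so it is dropped
}

# Inner join: keep only the keys present on both sides
joined = [
    {"YEAR": y, "MONTH": m, **spend[(y, m)], "REVENUE": revenue[(y, m)]}
    for (y, m) in sorted(spend.keys() & revenue.keys())
]
for row in joined:
    print(row)
```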


In [23]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])

print("Total Spend and Revenue per Year and Month Across All Channels")
snow_df_spend_and_revenue_per_month.show()

Total Spend and Revenue per Year and Month Across All Channels
----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711   

#### 4.6 Examine the query execution plan.

Snowpark lets you view the DataFrame's query and its execution plan using the `explain()` function.

In [24]:
snow_df_spend_and_revenue_per_month.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "SEARCH_ENGINE" AS "SEARCH_ENGINE", "SOCIAL_MEDIA" AS "SOCIAL_MEDIA", "VIDEO" AS "VIDEO", "EMAIL" AS "EMAIL" FROM ( SELECT "YEAR", "MONTH", "'search_engine'" AS "SEARCH_ENGINE", "'social_media'" AS "SOCIAL_MEDIA", "'video'" AS "VIDEO", "'email'" AS "EMAIL" FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT "CAMPAIGN", "CHANNEL", "DATE", "TOTAL_CLICKS", "TOTAL_COST", "ADS_SERVED" FROM SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CAMPAIGN_SPEND) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST) PIVOT (sum("TOTAL_COST") FOR "CHANNEL" IN ('search_engine', 'social_media', 'video', 'email'))) ORDER BY "YEAR" ASC NULL

#### 4.7 Save the transformed data.

We will save the transformed data to a Snowflake table, `SPEND_AND_REVENUE_PER_MONTH`, so that it can be used for further analysis and/or to train a model.

In [25]:
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.SPEND_AND_REVENUE_PER_MONTH')

### 5. Use a UDF to perform transformations.

#### 5.1 Select columns from the dfSpendRevenuePerMonthUDF DataFrame.

Create a DataFrame from the `SPEND_AND_REVENUE_PER_MONTH` table and examine it to find out what information is available in this dataset.
Select the following fields from the `dfSpendRevenuePerMonthUDF` DataFrame using the `select()` method (remember to wrap the column names in a call to the `col()` function):

- YEAR
- MONTH
- SEARCH_ENGINE
- SOCIAL_MEDIA
- VIDEO
- EMAIL

In [26]:
tableName = 'SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.SPEND_AND_REVENUE_PER_MONTH'
dfSpendRevenuePerMonthUDF = session.table(tableName)

dfOnTimeReporting = (dfSpendRevenuePerMonthUDF
    .select(
        col("YEAR")
        ,col("MONTH")
        ,col("SEARCH_ENGINE")
        ,col("SOCIAL_MEDIA")
        ,col("VIDEO")
        ,col("EMAIL")
        ))
dfOnTimeReporting.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
------------

#### 5.2 Use the UDF to examine the highest investment spend.

Call the `findTopInvestment` UDF using the `call_builtin` function to find the highest campaign investment for each month.
Pass the UDF the following columns, each followed by its name as a text label:
- SEARCH_ENGINE
- SOCIAL_MEDIA
- VIDEO
- EMAIL


> **&#128221; Note:** UDF code
```
create or replace function SNOWPARK_DEMO_DB.PUBLIC.FINDTOPINVESTMENT(val1 int, text1 text, val2 int, text2 text, val3 int, text3 text, val4 int, text4 text)
returns string
language python
runtime_version = '3.8'
handler = 'FINDTOPINVESTMENT'
as
$$
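def FINDTOPINVESTMENT(val1, text1, val2, text2, val3, text3, val4, text4):
    # Pair each amount with its label; a list keeps every amount even when two are equal
    pairs = [(val1, text1), (val2, text2), (val3, text3), (val4, text4)]
    max_val, max_text = max(pairs, key=lambda pair: pair[0])
    sum_val = sum(value for value, _ in pairs)

    # Share of the total, rounded to two decimals
    porc_val = round(max_val / sum_val * 100, 2)

    return f"{max_text} with {porc_val} of total"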
$$;
```
> &#10071; The UDF must be deployed in your account beforehand.
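
Before deploying, the handler logic can be tried locally as an ordinary Python function. The sketch below is a stand-in that mirrors the UDF's intent, not the deployed code: the name `find_top_investment` is hypothetical, the amounts are kept as a list of pairs so that equal values are not lost, and the percentage is rounded to two decimals on the assumption that this is what the sample output reflects.

```python
def find_top_investment(val1, text1, val2, text2, val3, text3, val4, text4):
    """Local stand-in for the UDF handler (hypothetical name, not deployed anywhere)."""
    # A list of pairs keeps every amount, even when two channels spent the same
    pairs = [(val1, text1), (val2, text2), (val3, text3), (val4, text4)]
    max_val, max_text = max(pairs, key=lambda pair: pair[0])
    total = sum(value for value, _ in pairs)
    # Assumption: round to two decimals, as the sample output suggests
    share = round(max_val / total * 100, 2)
    return f"{max_text} with {share} of total"


# Values for 2012-05, copied from the table in section 5.1
print(find_top_investment(
    516431, "SEARCH_ENGINE", 517618, "SOCIAL_MEDIA",
    516729, "VIDEO", 517208, "EMAIL",
))
# prints: SOCIAL_MEDIA with 25.03 of total
```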


In [27]:
dfOnTimeReporting = (dfSpendRevenuePerMonthUDF
    .select(
        col("YEAR")
        ,col("MONTH")
        ,col("SEARCH_ENGINE")
        ,col("SOCIAL_MEDIA")
        ,col("VIDEO")
        ,col("EMAIL")
        ,call_builtin(
            "SNOWPARK_DEMO_DB.PUBLIC.FINDTOPINVESTMENT"
            ,col("SEARCH_ENGINE"), "SEARCH_ENGINE"
            ,col("SOCIAL_MEDIA"), "SOCIAL_MEDIA"
            ,col("VIDEO"), "VIDEO"
            ,col("EMAIL"), "EMAIL").alias("TOP INVESTMENT")
        )
)

    
dfOnTimeReporting.show()

---------------------------------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"TOP INVESTMENT"                   |
---------------------------------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |SOCIAL_MEDIA with 25.03 of total   |
|2012    |6        |506497           |504679          |501098   |501947   |SEARCH_ENGINE with 25.15 of total  |
|2012    |7        |522780           |521395          |522762   |518405   |SEARCH_ENGINE with 25.07 of total  |
|2012    |8        |519959           |520537          |520685   |521584   |EMAIL with 25.04 of total          |
|2012    |9        |507211           |507404          |511364   |507363   |VIDEO with 25.15 of total          |
|2012    |10       |518942           |520863          |522768   |519950   |VIDEO with 25.1 of total     

### &#10071; `Shut Down Kernel`
> After completing the activities in this notebook, shut it down.