# Zenpli challenge

### Part 1.2


# Install Java, Spark, and Findspark
The following code block installs Apache Spark 3.4.0, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy to set up Spark in quick-start environments (Google Colab, etc.).

In [1]:
!rm -rf spark-3.4.0-bin-hadoop3.tgz
!rm -rf spark-3.4.0-bin-hadoop3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark --quiet

Environment variables for findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"
os.environ["SPARK_VERSION"] = "3.4"
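
Before calling `findspark.init()`, it can help to confirm that these variables point to directories that actually exist, since a mistyped path may otherwise surface later as a less obvious error. A minimal sketch; the helper `missing_dirs` is our own name and is not part of findspark:

```python
import os

def missing_dirs(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory."""
    return [name for name in var_names
            if not os.path.isdir(environ.get(name, ""))]

# An empty list means both paths exist on this machine
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

If the list is not empty, the install cell above probably did not finish, or the paths differ on the current machine.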

### Imports


In [3]:
import findspark
findspark.init()

In [5]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (ArrayType, DateType, DoubleType, IntegerType, LongType,
                               StringType, StructField, StructType, TimestampType)


In [19]:
import pandas as pd
import numpy as np
import time
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

We define the Spark configuration and create the session

In [7]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .set("spark.sql.parquet.mergeSchema", "true") \
    .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") \
    .set("spark.sql.caseSensitive", "true") \
    .set("spark.storage.memoryFraction", 1) \
    .set("spark.executor.memory", "20g") \
    .set("spark.driver.memory", "20g") \
    .set("spark.cores.max", 5) \
    .set("spark.executor.cores", 5)

In [8]:
sc = SparkContext(conf=conf)

# Local
spark = SparkSession \
    .builder \
    .appName("zenpli-challenge-app") \
    .config('spark.sql.session.timeZone', 'UTC') \
    .config(conf=conf) \
    .getOrCreate()

We define the appropriate schema

In [9]:
schema = StructType([
    StructField("key_1", StringType()),
    StructField("date_2", TimestampType()),
    StructField("cont_3", DoubleType()),
    StructField("cont_4", DoubleType()),
    StructField("disc_5", IntegerType()),
    StructField("disc_6", IntegerType()),
    StructField("cat_7", StringType()),
    StructField("cat_8", StringType()),
    StructField("cont_9", DoubleType()),
    StructField("cont_10", DoubleType())
])

We define the variable holding the path to the local file

In [15]:
data_path = 'file:///content/backend-dev-data-dataset.txt'

We run the read

In [16]:
df = spark.read \
        .schema(schema) \
        .option("header",True) \
        .option("delimiter",",") \
        .option("quote", "\"") \
        .option("escape", "\"") \
        .option("unescapedQuoteHandling", "STOP_AT_DELIMITER") \
        .csv(data_path)
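
The read above sets both `quote` and `escape` to `"`, which matches the common CSV convention of writing a literal quote inside a quoted field as two consecutive quotes. As an analogy only (Spark uses its own parser), Python's standard `csv` module follows the same convention with `doublequote=True`. The sample line below is hypothetical and is not taken from the dataset:

```python
import csv
import io

# Hypothetical two-column sample; the second field contains quotes and a comma
sample = 'key_1,cat_7\nab1234,"say ""hi"", then leave"\n'

rows = list(csv.reader(io.StringIO(sample),
                       delimiter=",", quotechar='"', doublequote=True))

print(rows[0])  # ['key_1', 'cat_7']
print(rows[1])  # ['ab1234', 'say "hi", then leave']
```

The doubled quotes collapse into single ones, and the comma inside the quoted field does not split it.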

We display the number of records

In [17]:
df.count()

572364

We display the schema

In [18]:
df.printSchema()

root
 |-- key_1: string (nullable = true)
 |-- date_2: timestamp (nullable = true)
 |-- cont_3: double (nullable = true)
 |-- cont_4: double (nullable = true)
 |-- disc_5: integer (nullable = true)
 |-- disc_6: integer (nullable = true)
 |-- cat_7: string (nullable = true)
 |-- cat_8: string (nullable = true)
 |-- cont_9: double (nullable = true)
 |-- cont_10: double (nullable = true)



We display the first 10 records

In [20]:
df.show(10, truncate=False)

+------+-------------------+-------+------+------+------+--------+---------+------+-------+
| key_1|             date_2| cont_3|cont_4|disc_5|disc_6|   cat_7|    cat_8|cont_9|cont_10|
+------+-------------------+-------+------+------+------+--------+---------+------+-------+
|HC2030|2016-11-16 00:00:00| 622.27| -2.36|     2|     6|frequent|    happy|  0.24|   0.25|
|sP8147|2004-02-18 00:00:00|1056.16| 59.93|     2|     8|   never|    happy|  1.94|   2.29|
|Cq3823|2007-03-25 00:00:00| 210.73|-93.94|     1|     1|   never|    happy| -0.11|   -0.1|
|Hw9428|2013-12-28 00:00:00|1116.48| 80.58|     3|    10|   never|surprised|  1.27|   1.15|
|xZ0360|2003-08-25 00:00:00| 1038.3| 12.37|     6|    17|   never|    happy|  1.76|   1.76|
|IK2721|2012-10-19 00:00:00| 835.17|  16.3|     4|    11|frequent|surprised|  2.04|    2.3|
|iK8875|2005-02-04 00:00:00| 769.02| 75.69|     3|     2|   never|    happy| -1.53|  -1.56|
|qd0312|2014-11-17 00:00:00| 273.11|  66.2|     1|     8|frequent|surprised|  2.

1.   Normalize a column (any of the continuous-valued ones);





In [22]:
# Assemble the column into the vector format that MinMaxScaler requires
assembler = VectorAssembler(inputCols=["cont_3"], outputCol="cont_3_vector")
dfn = assembler.transform(df)

# Initialize the MinMaxScaler
scaler = MinMaxScaler(inputCol="cont_3_vector", outputCol="cont_3_norm")

# Fit + transform to normalize the column in the DataFrame
scaler_model = scaler.fit(dfn)
df_normalized = scaler_model.transform(dfn)

# Keep the relevant columns, sorted by the original value
df_normalized = df_normalized.select("key_1", "cont_3", "cont_3_norm").orderBy(F.col("cont_3"))

# Get the min and max of the original column
min_value = dfn.select(F.min("cont_3")).first()[0]
max_value = dfn.select(F.max("cont_3")).first()[0]

# Get the min and max of the normalized column
min_scaled_value = df_normalized.select(F.min("cont_3_norm")).first()[0]
max_scaled_value = df_normalized.select(F.max("cont_3_norm")).first()[0]

# Print the original and normalized ranges
print(f"Original range of column 'cont_3': [{min_value}, {max_value}]")
print(f"Normalized range of new column 'cont_3_norm': [{min_scaled_value}, {max_scaled_value}]")

# Show the results
df_normalized.show()

Original range of column 'cont_3': [8.78, 127527.83]
Normalized range of new column 'cont_3_norm': [[0.0], [1.0]]
+------+------+--------------------+
| key_1|cont_3|         cont_3_norm|
+------+------+--------------------+
|Vg2186|  8.78|               [0.0]|
|Fr0592| 13.91|[4.02292833894229...|
|CT5216| 14.01|[4.10134799467216...|
|PI4573| 14.27|[4.30523909956982...|
|tf6780| 14.31|[4.33660696186177...|
|af8802|  16.2|[5.81873845515630...|
|xl1206| 16.23|[5.84226435187526...|
|tm2866| 16.27|[5.87363221416721...|
|nf1924| 16.27|[5.87363221416721...|
|ib3166| 16.27|[5.87363221416721...|
|nr4259|  16.6|[6.13241707807578...|
|xg4886| 16.99|[6.43825373542227...|
|tL2721| 17.64|[6.94798149766642...|
|vR6289| 17.99|[7.22245029272096...|
|Ov1552| 18.03|[7.25381815501292...|
|uH1020| 18.27|[7.44202532876460...|
|YL9808| 18.45|[7.58318070907836...|
|Nf8845| 18.71|[7.78707181397603...|
|Xr6024| 18.78|[7.84196557298693...|
|WC0621| 18.85|[7.89685933199784...|
+------+------+----
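
For reference, `MinMaxScaler` with its default output range of [0, 1] rescales each value x as (x - min) / (max - min). The sketch below applies that formula in plain Python to the minimum and maximum printed above, and reproduces the value shown for the second row of the table. The helper name `min_max_scale` is ours and is not part of Spark:

```python
def min_max_scale(x, lo, hi):
    """Rescale x from [lo, hi] to [0, 1] (assumes hi > lo)."""
    return (x - lo) / (hi - lo)

# Minimum and maximum of cont_3 as printed in the notebook output
COL_MIN, COL_MAX = 8.78, 127527.83

print(min_max_scale(8.78, COL_MIN, COL_MAX))       # 0.0
print(min_max_scale(127527.83, COL_MIN, COL_MAX))  # 1.0
print(min_max_scale(13.91, COL_MIN, COL_MAX))      # approximately 4.0229e-05
```

Because the column maximum (127527.83) is far larger than most values, nearly all normalized values end up very close to 0, which is why the table shows numbers on the order of 1e-05.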

2.   Filter a column by a given value (any of the categorical ones);

In [23]:
# Find the cat_8 value with the most records, then keep only the rows that have it
cat_8_with_most_records = df.groupBy("cat_8") \
    .agg(F.count(F.col("key_1")).alias("cnt_keys")) \
    .orderBy(F.col("cnt_keys").desc()) \
    .first()[0]

df_filtered_range = df.filter(F.col("cat_8") == cat_8_with_most_records)
df_filtered_range.show()

+------+-------------------+-------+-------+------+------+--------+-----+------+-------+
| key_1|             date_2| cont_3| cont_4|disc_5|disc_6|   cat_7|cat_8|cont_9|cont_10|
+------+-------------------+-------+-------+------+------+--------+-----+------+-------+
|HC2030|2016-11-16 00:00:00| 622.27|  -2.36|     2|     6|frequent|happy|  0.24|   0.25|
|sP8147|2004-02-18 00:00:00|1056.16|  59.93|     2|     8|   never|happy|  1.94|   2.29|
|Cq3823|2007-03-25 00:00:00| 210.73| -93.94|     1|     1|   never|happy| -0.11|   -0.1|
|xZ0360|2003-08-25 00:00:00| 1038.3|  12.37|     6|    17|   never|happy|  1.76|   1.76|
|iK8875|2005-02-04 00:00:00| 769.02|  75.69|     3|     2|   never|happy| -1.53|  -1.56|
|mb3668|2002-02-26 00:00:00|2369.77| 165.12|     2|     7|   never|happy| -1.11|  -1.15|
|Ch8767|2019-06-26 00:00:00| 259.95| -21.14|     4|     7|   never|happy|  0.06|   0.05|
|MB6485|2009-08-05 00:00:00|2432.88|  -9.38|     1|     9|frequent|happy| -1.15|   null|
|ja8141|2019-11-16 00
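
The same two steps (find the most frequent category, then keep only the matching rows) can be sketched in plain Python on the first seven rows printed by `df.show` earlier. This is an illustration on a handful of rows, not a replacement for the Spark job; `sample` and the other variable names are ours:

```python
from collections import Counter

# (key_1, cat_8) pairs copied from the first seven rows shown by df.show(10)
sample = [
    ("HC2030", "happy"), ("sP8147", "happy"), ("Cq3823", "happy"),
    ("Hw9428", "surprised"), ("xZ0360", "happy"),
    ("IK2721", "surprised"), ("iK8875", "happy"),
]

# Step 1: the category with the most rows (groupBy + count + orderBy desc + first)
most_common_cat = Counter(cat for _, cat in sample).most_common(1)[0][0]

# Step 2: keep only the rows in that category (df.filter)
filtered_keys = [key for key, cat in sample if cat == most_common_cat]

print(most_common_cat)  # happy
print(filtered_keys)    # ['HC2030', 'sP8147', 'Cq3823', 'xZ0360', 'iK8875']
```

The surviving keys appear in the same order as the first rows of the filtered Spark output above.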


3.   Group certain columns (any that correspond to dates).

In [26]:
df.createOrReplaceTempView("data")

date_df = spark.sql("""
  select date(date_2) as date_2,
         round(avg(cont_3),3) as avg_cont_3,
         sum(disc_5) as sum_disc_5,
         count(cat_7) as cnt_cat_7,
         count(distinct cat_7) as cntd_cat_7
  from data
  group by 1
  order by 1 asc
""")

date_df.show()

+----------+----------+----------+---------+----------+
|    date_2|avg_cont_3|sum_disc_5|cnt_cat_7|cntd_cat_7|
+----------+----------+----------+---------+----------+
|2000-01-01|  1307.283|       261|       80|         4|
|2000-01-02|  1566.393|       278|       71|         3|
|2000-01-03|  1633.249|       207|       60|         4|
|2000-01-04|  1714.456|       308|       80|         4|
|2000-01-05|  1906.198|       231|       64|         4|
|2000-01-06|  1222.739|       244|       72|         4|
|2000-01-07|  1795.397|       271|       79|         4|
|2000-01-08|  1630.553|       272|       80|         4|
|2000-01-09|  1519.671|       273|       76|         3|
|2000-01-10|  1825.371|       284|       74|         4|
|2000-01-11|  1663.886|       271|       80|         4|
|2000-01-12|   1457.76|       231|       63|         3|
|2000-01-13|  1553.177|       277|       78|         3|
|2000-01-14|   1743.32|       253|       68|         4|
|2000-01-15|  1332.923|       254|       81|    
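
To see what each aggregate in the query contributes, the sketch below runs a query of the same shape on four hypothetical rows, using Python's built-in `sqlite3` as a stand-in for Spark SQL. The rows are made up for illustration and are not taken from the dataset, and SQLite's dialect differs from Spark's in other respects. The point it illustrates is that `count(cat_7)` skips NULL values while `count(distinct cat_7)` counts each non-NULL value once:

```python
import sqlite3

# Hypothetical rows (date_2, cont_3, disc_5, cat_7); not taken from the dataset
rows = [
    ("2000-01-01 00:00:00", 10.0, 1, "never"),
    ("2000-01-01 00:00:00", 20.0, 2, "frequent"),
    ("2000-01-01 00:00:00", 30.0, 3, None),  # NULL cat_7 is skipped by count(cat_7)
    ("2000-01-02 00:00:00", 5.5, 4, "never"),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table data (date_2 text, cont_3 real, disc_5 integer, cat_7 text)")
conn.executemany("insert into data values (?, ?, ?, ?)", rows)

result = conn.execute("""
    select date(date_2)          as day,
           round(avg(cont_3), 3) as avg_cont_3,
           sum(disc_5)           as sum_disc_5,
           count(cat_7)          as cnt_cat_7,
           count(distinct cat_7) as cntd_cat_7
    from data
    group by day
    order by day asc
""").fetchall()
conn.close()

for row in result:
    print(row)
# ('2000-01-01', 20.0, 6, 2, 2)
# ('2000-01-02', 5.5, 4, 1, 1)
```

The first day has three rows, but `cnt_cat_7` is 2 because one `cat_7` is NULL.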