# Desafio 4
# =========
* Parseo de un Array de Structs en un dataframe

In [45]:
import os
import requests
import json
import warnings
warnings.filterwarnings('ignore')

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [46]:
spark = SparkSession.builder.getOrCreate()

In [47]:
# Read JSON file into dataframe
df = spark.read.json("./input_ejemplos/MPE1004.json")
df.printSchema()
#df.show()

root
 |-- available_filters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- values: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- results: long (nullable = true)
 |-- available_sorts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- values: array (nullable = true)
 |    |    |    |-- element: struct (c

In [48]:
df_select = df.select("results.id", "results.sold_quantity", 'results.available_quantity')
df_select.show()

+--------------------+--------------------+--------------------+
|                  id|       sold_quantity|  available_quantity|
+--------------------+--------------------+--------------------+
|[MPE433108265, MP...|[6, 6, 3, 15, 15,...|[9, 3, 17, 18, 1,...|
+--------------------+--------------------+--------------------+



In [49]:
from pyspark.sql.functions import arrays_zip, col, explode, monotonically_increasing_id

df_ = (df_select    
    .withColumn("tmp", arrays_zip("id", "sold_quantity", 'available_quantity'))
    .withColumn("tmp", explode("tmp"))
    .select(col("tmp.id"), col("tmp.sold_quantity"), col("tmp.available_quantity"))).withColumn("rowId", monotonically_increasing_id()+1)
df_.show()

+------------+-------------+------------------+-----+
|          id|sold_quantity|available_quantity|rowId|
+------------+-------------+------------------+-----+
|MPE433108265|            6|                 9|    1|
|MPE434382765|            6|                 3|    2|
|MPE433853177|            3|                17|    3|
|MPE419883282|           15|                18|    4|
|MPE431714651|           15|                 1|    5|
|MPE438492919|            0|               100|    6|
|MPE429448587|            0|                50|    7|
|MPE439307195|            0|                 3|    8|
|MPE439307251|            0|                 3|    9|
|MPE437503507|            0|                10|   10|
|MPE438828260|            0|                 3|   11|
|MPE439307426|            0|                 3|   12|
|MPE440306037|            0|                 1|   13|
|MPE439307206|            0|                 3|   14|
|MPE431446248|            2|                23|   15|
|MPE439307250|            0|

In [50]:
df_4 = df_ \
            .withColumnRenamed("id","itemId") \
            .withColumnRenamed("sold_quantity","soldQuantity") \
            .withColumnRenamed("available_quantity","availableQuantity")
df_4.select('rowId', 'itemId', 'soldQuantity', 'availableQuantity').show(39)

+-----+------------+------------+-----------------+
|rowId|      itemId|soldQuantity|availableQuantity|
+-----+------------+------------+-----------------+
|    1|MPE433108265|           6|                9|
|    2|MPE434382765|           6|                3|
|    3|MPE433853177|           3|               17|
|    4|MPE419883282|          15|               18|
|    5|MPE431714651|          15|                1|
|    6|MPE438492919|           0|              100|
|    7|MPE429448587|           0|               50|
|    8|MPE439307195|           0|                3|
|    9|MPE439307251|           0|                3|
|   10|MPE437503507|           0|               10|
|   11|MPE438828260|           0|                3|
|   12|MPE439307426|           0|                3|
|   13|MPE440306037|           0|                1|
|   14|MPE439307206|           0|                3|
|   15|MPE431446248|           2|               23|
|   16|MPE439307250|           0|                3|
|   17|MPE43

# Desafio 5
# =========

* Agregar las visitas al DataFrame con datos de ventas

In [51]:
df_visit = spark.read.options(header='True', inferSchena='True').csv("./input_ejemplos/visits.csv")
 
df_visit.printSchema()

root
 |-- itemId: string (nullable = true)
 |-- visits: string (nullable = true)



In [52]:
df_visit.count()

39

In [53]:
df_4.count()

39

In [54]:
df_5 = df_4.select('itemId', 'soldQuantity').join(df_visit, on='itemId',how='left') # Could also use 'left_outer'
df_5 = df_5.filter(df_5.soldQuantity > 0)
df_5.show()

+------------+------------+------+
|      itemId|soldQuantity|visits|
+------------+------------+------+
|MPE433108265|           6|   203|
|MPE434382765|           6|   170|
|MPE433853177|           3|  1034|
|MPE419883282|          15|  1772|
|MPE431714651|          15|    33|
|MPE431446248|           2|  2242|
|MPE432990777|           1|   426|
|MPE440389411|           1|   158|
|MPE421767433|           4|   746|
|MPE432439269|           2|    42|
|MPE430002527|           1|    60|
|MPE428549082|           1|   352|
|MPE433933924|           1|    49|
|MPE432291284|           2|     6|
|MPE432728801|           1|    68|
|MPE433252062|           2|    92|
|MPE427140390|          10|    81|
+------------+------------+------+



# Desafio 6
# =========

* Agregar métricas a un DataFrame.

In [55]:
df_venta_visita = df_5.withColumn("conversionRate", df_5.soldQuantity/df_5.visits)
df_venta_visita.show()

+------------+------------+------+--------------------+
|      itemId|soldQuantity|visits|      conversionRate|
+------------+------------+------+--------------------+
|MPE433108265|           6|   203|0.029556650246305417|
|MPE434382765|           6|   170| 0.03529411764705882|
|MPE433853177|           3|  1034|0.002901353965183...|
|MPE419883282|          15|  1772|0.008465011286681716|
|MPE431714651|          15|    33| 0.45454545454545453|
|MPE431446248|           2|  2242|8.920606601248885E-4|
|MPE432990777|           1|   426|0.002347417840375587|
|MPE440389411|           1|   158|0.006329113924050633|
|MPE421767433|           4|   746|0.005361930294906166|
|MPE432439269|           2|    42|0.047619047619047616|
|MPE430002527|           1|    60|0.016666666666666666|
|MPE428549082|           1|   352|0.002840909090909091|
|MPE433933924|           1|    49| 0.02040816326530612|
|MPE432291284|           2|     6|  0.3333333333333333|
|MPE432728801|           1|    68|0.014705882352

In [56]:
from pyspark.sql import Window
import pyspark.sql.functions as psf

conver_r = Window.orderBy(psf.desc('conversionRate'))
df_6 = df_venta_visita.withColumn(
    "conversionRanking", 
    psf.dense_rank().over(conver_r)
).withColumn(
    "conversionRate", 
    psf.round(col('conversionRate'),4)
)
df_6.show()

+------------+------------+------+--------------+-----------------+
|      itemId|soldQuantity|visits|conversionRate|conversionRanking|
+------------+------------+------+--------------+-----------------+
|MPE431714651|          15|    33|        0.4545|                1|
|MPE432291284|           2|     6|        0.3333|                2|
|MPE427140390|          10|    81|        0.1235|                3|
|MPE432439269|           2|    42|        0.0476|                4|
|MPE434382765|           6|   170|        0.0353|                5|
|MPE433108265|           6|   203|        0.0296|                6|
|MPE433252062|           2|    92|        0.0217|                7|
|MPE433933924|           1|    49|        0.0204|                8|
|MPE430002527|           1|    60|        0.0167|                9|
|MPE432728801|           1|    68|        0.0147|               10|
|MPE419883282|          15|  1772|        0.0085|               11|
|MPE440389411|           1|   158|        0.0063

22/01/24 21:42:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/24 21:42:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/24 21:42:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


# Desafio 7
# =========

* Porcentaje de Stock

In [58]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window

df_7 = df_4.withColumn('stockPercentage', f.col('availableQuantity')/f.sum('availableQuantity').over(Window.partitionBy()))
df_7.withColumn(
    "stockPercentage", 
    f.round(col('stockPercentage')*100,2)
).orderBy('stockPercentage', ascending=False).select('itemId', 'availableQuantity', 'stockPercentage').show(39)

+------------+-----------------+---------------+
|      itemId|availableQuantity|stockPercentage|
+------------+-----------------+---------------+
|MPE433046443|              999|           70.3|
|MPE438492919|              100|           7.04|
|MPE436649728|              100|           7.04|
|MPE429448587|               50|           3.52|
|MPE431446248|               23|           1.62|
|MPE419883282|               18|           1.27|
|MPE433853177|               17|            1.2|
|MPE421767433|               11|           0.77|
|MPE432202936|               10|            0.7|
|MPE437503507|               10|            0.7|
|MPE440389411|                9|           0.63|
|MPE433108265|                9|           0.63|
|MPE440131689|                7|           0.49|
|MPE428549066|                5|           0.35|
|MPE432990777|                5|           0.35|
|MPE428549082|                4|           0.28|
|MPE438828260|                3|           0.21|
|MPE439307317|      

22/01/24 21:44:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/24 21:44:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/24 21:44:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
