In [2]:
import findspark
# Run findspark before the `pyspark` imports in the next cell: findspark.init()
# is documented to locate the local Spark installation and make PySpark importable.
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession
import os
import time

In [4]:
# Build (or reuse, if one already exists) a local Spark session that uses
# every available core ("local[*]") under the application name below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('desafio_nubimetrics')
    .getOrCreate()
)

# Desafío 3

In [5]:
# Name of the input file for challenge 3
filename_d3 = "Sellers.json"

# Load the JSON file into a dataframe (schema is inferred by Spark)
sellers = spark.read.format("json").load(filename_d3)

In [6]:
# Print the inferred schema to see which nested fields are available under `body`
sellers.printSchema()

root
 |-- body: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- buyer_reputation: struct (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- country_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- logo: string (nullable = true)
 |    |-- nickname: string (nullable = true)
 |    |-- permalink: string (nullable = true)
 |    |-- points: long (nullable = true)
 |    |-- registration_date: string (nullable = true)
 |    |-- seller_reputation: struct (nullable = true)
 |    |    |-- level_id: string (nullable = true)
 |    |    |-- power_seller_status: string (nullable = true)
 |    |    |-- transactions: struct (nullable = true)
 |    |    |    |-- canceled: long (nullable = true)
 |    |    |    |-- completed: long (nullable = true)
 |    |    |    |-- period: s

In [7]:
# Keep only the fields of interest, all of which live inside the `body` struct,
# and store them in a new flat dataframe.
campos_body = ["site_id", "id", "nickname", "points"]
sellers_info = sellers.select(*["body." + campo for campo in campos_body])

In [8]:
# Preview the selected seller columns (show() prints the first 20 rows by default)
sellers_info.show()

+-------+---------+--------------------+------+
|site_id|       id|            nickname|points|
+-------+---------+--------------------+------+
|    MPE|298734964|      MARIELATAQUIRE|     2|
|    MPE|183049329|         MURO8709951|    -3|
|    MPE| 94592189|          ILLARYPERU|    -2|
|    MPE|520133997|     ISABELLADELPOZO|     1|
|    MPE|684964436|         PHMO1747353|     0|
|    MPE|685079498|MELISSASUSANAARVA...|     0|
|    MPE|646068761|   YOMIDELGADOSNCHEZ|     0|
|    MPE|685310649|DONATILDONATILDEC...|     0|
|    MPE|685419864|        VANESSAURNER|     0|
|    MPE|285674870|    ERICKLOPEZUSMSYA|     2|
|    MPE|685275449|DANIELARUIZRIDRIGUES|     0|
|    MPE| 48893023|        MARCELASUSAN|     1|
|    MPE|603331827|         COVA1031117|     0|
|    MPE|205264135|         GOBR7283790|     1|
|    MPE|580279940|        LORDVENCEDOR|     0|
|    MPE|300834652| DANIELAHILARIORAMOS|     0|
|    MPE|270322958|JHONANTHONYCAYLLA...|     0|
|    MPE|684554092|OSORIOCOLQUIJESSM...|

In [9]:
# Split the sellers into three dataframes according to the sign of `points`.
puntos = sellers_info["points"]

# Sellers with a positive score
positivo = sellers_info.where(puntos > 0)

# Sellers with a negative score
negativo = sellers_info.where(puntos < 0)

# Sellers with a score of exactly zero
cero = sellers_info.where(puntos == 0)

# Display the three groups in the same order: positive, negative, zero
for grupo in (positivo, negativo, cero):
    grupo.show()

+-------+---------+----------------+------+
|site_id|       id|        nickname|points|
+-------+---------+----------------+------+
|    MPE|298734964|  MARIELATAQUIRE|     2|
|    MPE|520133997| ISABELLADELPOZO|     1|
|    MPE|285674870|ERICKLOPEZUSMSYA|     2|
|    MPE| 48893023|    MARCELASUSAN|     1|
|    MPE|205264135|     GOBR7283790|     1|
+-------+---------+----------------+------+

+-------+---------+-----------+------+
|site_id|       id|   nickname|points|
+-------+---------+-----------+------+
|    MPE|183049329|MURO8709951|    -3|
|    MPE| 94592189| ILLARYPERU|    -2|
+-------+---------+-----------+------+

+-------+---------+--------------------+------+
|site_id|       id|            nickname|points|
+-------+---------+--------------------+------+
|    MPE|684964436|         PHMO1747353|     0|
|    MPE|685079498|MELISSASUSANAARVA...|     0|
|    MPE|646068761|   YOMIDELGADOSNCHEZ|     0|
|    MPE|685310649|DONATILDONATILDEC...|     0|
|    MPE|685419864|        VANES

In [10]:
#%% Store the results

# Date component of the output path. The "/" separators make each part a
# directory level.
# NOTE(review): this yields month/year/day (e.g. 06/2021/15) — confirm that this
# ordering, rather than year/month/day, is the one required.
fecha = time.strftime("%m/%Y/%d")

# first() fetches a single row instead of collecting the whole dataframe to the
# driver just to read one value.
primera_fila = sellers_info.first()
if primera_fila is None:
    raise ValueError("sellers_info is empty; cannot determine site_id for the output path")
sitio = primera_fila["site_id"]

# Create the required folder structure: <site>/<date>/<category>.
# exist_ok=True keeps the cell idempotent, so re-running the notebook on the
# same day does not fail with FileExistsError.
carpeta_base = str(sitio) + "/" + str(fecha)
for categoria in ("positivo", "negativo", "cero"):
    os.makedirs(carpeta_base + "/" + categoria, exist_ok=True)

In [11]:
# Save each group as a CSV file under <site>/<date>/<category>/.
# Each entry is (dataframe, category folder, file name).
salidas_csv = [
    (positivo, 'positivo', 'positive_sellers_with_spark.csv'),
    (negativo, 'negativo', 'negative_sellers_with_spark.csv'),
    (cero, 'cero', 'cero_sellers_with_spark.csv'),
]

for grupo_df, carpeta, archivo in salidas_csv:
    ruta_csv = str(sitio) + '/' + str(fecha) + '/' + carpeta + '/' + archivo
    # header=True: pandas expects a boolean here (the string 'true' only worked
    # because it is truthy).
    # index=False: do not write pandas' synthetic row index as an extra,
    # unnamed first column — it is not part of the seller data.
    grupo_df.toPandas().to_csv(ruta_csv, header=True, sep=',', index=False)


# Desafío 4

In [12]:
# Name of the input file for challenge 4
filename_d4 = "MPE1004.json"

# Load the JSON file into a dataframe and print the inferred schema
MPE1004 = spark.read.format("json").load(filename_d4)
MPE1004.printSchema()

root
 |-- available_filters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- values: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- results: long (nullable = true)
 |-- available_sorts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- values: array (nullable = true)
 |    |    |    |-- element: struct (c

In [125]:
import pyspark.sql.functions as F
from pyspark.sql import Window

# `results` is an array of structs, so selecting "results.<field>" yields one
# array per field. Zip the three arrays element-wise and explode them so that
# every product becomes its own row.
products_arrays = MPE1004.select("results.id", "results.sold_quantity", "results.available_quantity")
products_info = (
    products_arrays
    .withColumn("tmp", F.arrays_zip("id", "sold_quantity", "available_quantity"))
    .withColumn("tmp", F.explode("tmp"))
    .select(F.col("tmp.id"), F.col("tmp.sold_quantity"), F.col("tmp.available_quantity"))
)

# monotonically_increasing_id() only guarantees unique, increasing values — not
# consecutive ones (the partition id is encoded in the upper bits), so using it
# directly gives 1..N only when the data happens to sit in a single partition.
# Use it purely as an ordering key and number the rows with row_number() instead.
# The un-partitioned window moves all rows to one partition, which is acceptable
# for a result set of this size.
products_info = (
    products_info
    .withColumn("_orden", F.monotonically_increasing_id())
    .withColumn("rowId", F.row_number().over(Window.orderBy("_orden")))
    .select("rowId", "id", "sold_quantity", "available_quantity")
)

products_info.show(40) # Result of challenge 4

+-----+------------+-------------+------------------+
|rowId|          id|sold_quantity|available_quantity|
+-----+------------+-------------+------------------+
|    1|MPE433108265|            6|                 9|
|    2|MPE434382765|            6|                 3|
|    3|MPE433853177|            3|                17|
|    4|MPE419883282|           15|                18|
|    5|MPE431714651|           15|                 1|
|    6|MPE438492919|            0|               100|
|    7|MPE429448587|            0|                50|
|    8|MPE439307195|            0|                 3|
|    9|MPE439307251|            0|                 3|
|   10|MPE437503507|            0|                10|
|   11|MPE438828260|            0|                 3|
|   12|MPE439307426|            0|                 3|
|   13|MPE440306037|            0|                 1|
|   14|MPE439307206|            0|                 3|
|   15|MPE431446248|            2|                23|
|   16|MPE439307250|        

# Desafío 5

In [127]:
# Name of the input file for challenge 5
filename_d5 = "visits.csv"

# Read the CSV file (first line is the header) into a dataframe.
# inferSchema=True makes Spark type `visits` as a number; without it every CSV
# column is read as a string, which would sort lexicographically and rely on
# implicit casts in later arithmetic.
data_visits = spark.read.csv(filename_d5, header = True, inferSchema = True)

data_visits.show()

+------------+------+
|      itemId|visits|
+------------+------+
|MPE433108265|   203|
|MPE434382765|   170|
|MPE433853177|  1034|
|MPE419883282|  1772|
|MPE431714651|    33|
|MPE438492919|  1160|
|MPE429448587|  2669|
|MPE439307195|    36|
|MPE439307251|   257|
|MPE437503507|   292|
|MPE438828260|   102|
|MPE439307426|    29|
|MPE440306037|    50|
|MPE439307206|   120|
|MPE431446248|  2242|
|MPE439307250|   108|
|MPE439510012|    56|
|MPE439307317|   183|
|MPE439307286|   267|
|MPE439307385|    22|
+------------+------+
only showing top 20 rows



In [134]:
# Attach the visit counts to the challenge-4 dataframe, matching the product
# `id` against the `itemId` column of the visits data (inner join).
misma_publicacion = products_info["id"] == data_visits["itemId"]
visits_info = (
    products_info
    .join(data_visits, on=misma_publicacion, how="inner")
    .select("id", "sold_quantity", "visits")
)

# Drop the items that have no sales
visits_with_sold = visits_info.where(visits_info["sold_quantity"] != 0)

visits_with_sold.show() # Result of challenge 5

+------------+-------------+------+
|          id|sold_quantity|visits|
+------------+-------------+------+
|MPE433108265|            6|   203|
|MPE434382765|            6|   170|
|MPE433853177|            3|  1034|
|MPE419883282|           15|  1772|
|MPE431714651|           15|    33|
|MPE431446248|            2|  2242|
|MPE432990777|            1|   426|
|MPE440389411|            1|   158|
|MPE421767433|            4|   746|
|MPE432439269|            2|    42|
|MPE430002527|            1|    60|
|MPE428549082|            1|   352|
|MPE433933924|            1|    49|
|MPE432291284|            2|     6|
|MPE432728801|            1|    68|
|MPE433252062|            2|    92|
|MPE427140390|           10|    81|
+------------+-------------+------+



# Desafío 6

In [171]:
from pyspark.sql import Window

# Conversion rate = units sold / visits, rounded to 4 decimal places
conversion_rate = F.round(F.col("sold_quantity") / F.col("visits"), 4)

# Rank over the whole dataframe, highest conversion rate first
ranking_window = Window.orderBy(F.col("conversionRate").desc())

# Add the "conversionRate" column to the previous dataframe, then the dense
# ranking computed over it in descending order.
conversion_data = (
    visits_with_sold
    .withColumn("conversionRate", conversion_rate)
    .withColumn("conversionRanking", F.dense_rank().over(ranking_window))
)

conversion_data.show() # Result of challenge 6

+------------+-------------+------+--------------+-----------------+
|          id|sold_quantity|visits|conversionRate|conversionRanking|
+------------+-------------+------+--------------+-----------------+
|MPE431714651|           15|    33|        0.4545|                1|
|MPE432291284|            2|     6|        0.3333|                2|
|MPE427140390|           10|    81|        0.1235|                3|
|MPE432439269|            2|    42|        0.0476|                4|
|MPE434382765|            6|   170|        0.0353|                5|
|MPE433108265|            6|   203|        0.0296|                6|
|MPE433252062|            2|    92|        0.0217|                7|
|MPE433933924|            1|    49|        0.0204|                8|
|MPE430002527|            1|    60|        0.0167|                9|
|MPE432728801|            1|    68|        0.0147|               10|
|MPE419883282|           15|  1772|        0.0085|               11|
|MPE440389411|            1|   158

# Desafío 7

In [209]:
# Keep only the columns needed for the stock analysis
stock_by_item = products_info.select("id", "available_quantity")

# Total stock across all items, brought back to the driver as a plain number
total_stock = stock_by_item.select("available_quantity").groupBy().sum().collect()[0][0]

# Share of the total stock held by each item, as a percentage with 2 decimals
stock_share = F.round(F.col("available_quantity") / total_stock * 100, 2)

# Add the stockPercentage column and order the rows from largest to smallest share
percentage_stock = (
    stock_by_item
    .withColumn("stockPercentage", stock_share)
    .sort(F.col("stockPercentage").desc())
)

percentage_stock.show(40) # Result of challenge 7

+------------+------------------+---------------+
|          id|available_quantity|stockPercentage|
+------------+------------------+---------------+
|MPE433046443|               999|           70.3|
|MPE438492919|               100|           7.04|
|MPE436649728|               100|           7.04|
|MPE429448587|                50|           3.52|
|MPE431446248|                23|           1.62|
|MPE419883282|                18|           1.27|
|MPE433853177|                17|            1.2|
|MPE421767433|                11|           0.77|
|MPE432202936|                10|            0.7|
|MPE437503507|                10|            0.7|
|MPE440389411|                 9|           0.63|
|MPE433108265|                 9|           0.63|
|MPE440131689|                 7|           0.49|
|MPE428549066|                 5|           0.35|
|MPE432990777|                 5|           0.35|
|MPE428549082|                 4|           0.28|
|MPE438828260|                 3|           0.21|
