In [1]:
!pip install pyspark



In [7]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import re

In [3]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [29]:
rddCategorias = sc.textFile("categories.csv")
rddProductos = sc.textFile("products.csv")
rddResenias = sc.textFile("reviews.csv")
rddClientes = sc.textFile("customers.csv")
rddOrdenes = sc.textFile("orders.csv")
rddOrdenesProductos = sc.textFile("order_items.csv")
rddRegistrosInventario = sc.textFile("inventory_logs.csv")

In [27]:
import re
import csv
from io import StringIO

def parsear_linea_csv(linea):
    if linea.startswith(','):
        return None
    try:
        archivo_csv = StringIO(linea)
        lector = csv.reader(archivo_csv)
        linea = next(lector)
        return [campo.lower().strip() for campo in linea]
    except:
        return None

## Consulta 1: Cuál es el estado que más descuentos tiene en total? y en promedio?

In [30]:
def extraer_estado_y_descuento (linea):
    billing_address = linea[7]
    discount_amount_str = linea[8]

    matching = re.search(r',?\s*([A-Za-z]{2})\s*\d{5}', billing_address, flags=re.IGNORECASE)
    estado = matching.group(1).upper()
    descuento = float(discount_amount_str)

    return (estado, descuento)

rddOrdenes_parseadas = rddOrdenes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenes_filtradas = rddOrdenes_parseadas.filter(lambda linea: len(linea) > 8 and linea[8] != '' and linea[7] != '' and linea[7] != 'undefined' and float(linea[8]) > 0)

rddOrdenes_con_estado_descuento = rddOrdenes_filtradas.map(extraer_estado_y_descuento)

rddOrdenes_con_estado_descuento.cache()

total_descuento_por_estado = rddOrdenes_con_estado_descuento.reduceByKey(lambda x, y: x + y)

conteo_descuento_por_estado = rddOrdenes_con_estado_descuento.mapValues(lambda x: (x, 1))

total_descuento_y_conteo_por_estado = conteo_descuento_por_estado.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

promedio_descuento_por_estado = total_descuento_y_conteo_por_estado.mapValues(lambda x: x[0] / x[1] if x[1] > 0 else 0)

estado_mas_descuentos = total_descuento_por_estado.reduce(lambda x, y: x if x[1] > y[1] else y)

estado_mas_descuentos_promedio = promedio_descuento_por_estado.reduce(lambda x, y: x if x[1] > y[1] else y)

display(estado_mas_descuentos, estado_mas_descuentos_promedio)

('AE', 1625258.6600000004)

('NC', 50.67375117676878)

## Consulta 2: ¿Cuáles son los 5 códigos postales más comunes para las órdenes con estado ‘Refunded’? ¿Y cuál es el nombre más frecuente entre los clientes de esas direcciones?

In [31]:
def extraer_id_y_codigo_postal(linea):
    customer_id = linea[2]
    billing_address = linea[7]

    matching = re.search(r'\s+(\d{5})\s*$', billing_address)
    if matching:
        codigo_postal = matching.group(1)
        return (customer_id, codigo_postal)
    return None

rddOrdenes_parseadas = rddOrdenes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenes_filtradas = rddOrdenes_parseadas.filter(lambda linea: len(linea) > 7 and linea[2] != '' and linea[4] == 'refunded' and linea[7] != 'undefined' and linea[7] != '')

rddOrdenes_con_id_codigo_postal = rddOrdenes_filtradas.map(extraer_id_y_codigo_postal).filter(lambda x: x is not None)

rddOrdenes_con_id_codigo_postal.cache()

cantidad_ordenes_por_codigo_postal = rddOrdenes_con_id_codigo_postal.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)

top_5_codigos_postales = cantidad_ordenes_por_codigo_postal.takeOrdered(5, key=lambda x: -x[1])

top_5_codigos = [codigo for codigo, count in top_5_codigos_postales]

rddClientes_con_id_nombre = rddClientes.map(parsear_linea_csv).filter(lambda linea: linea is not None and len(linea) > 3 and linea[1] != '' and linea[3] != 'undefined' and linea[3] != '').map(lambda linea: (linea[1], linea[3]))

rddOrdenes_con_id_codigoPostal_nombre = rddOrdenes_con_id_codigo_postal.join(rddClientes_con_id_nombre)

rddOrdenes_top_5_codigos_postales = rddOrdenes_con_id_codigoPostal_nombre.filter(lambda x: x[1][0] in top_5_codigos)

cantidad_ordenes_por_nombre = rddOrdenes_top_5_codigos_postales.map(lambda x: (x[1][1], 1)).reduceByKey(lambda x, y: x + y)

nombre_mas_frecuente = cantidad_ordenes_por_nombre.reduce(lambda x, y: x if x[1] > y[1] else y)

display(top_5_codigos_postales, nombre_mas_frecuente)

[('31571', 6), ('09045', 5), ('14396', 5), ('73291', 5), ('91623', 5)]

('jamie', 1)

## Consulta 3: Para cada tipo de pago y segmento de cliente, devolver la suma y el promedio expresado como porcentaje, de clientes activos y de consentimiento de marketing.

In [None]:
rddOrdenes_parseadas = rddOrdenes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenes_filtradas = rddOrdenes_parseadas.filter(lambda linea: len(linea) > 5 and linea[1] != '' and linea[5] != '' and linea[5] != 'undefined')

rddClientes_parseados = rddClientes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddClientes_filtrados = rddClientes_parseados.filter(lambda linea: len(linea) > 16 and linea[1] != '' and linea[14] != '' and linea[15] != '' and linea[16] != '' and linea[15] != 'undefined')

rddOrdenes_con_id_MetodoPago = rddOrdenes_filtradas.map(lambda linea: (linea[1], linea[5])).reduceByKey(lambda x, y: x)

rddClientes_reducidos = rddClientes_filtrados.map(lambda linea: (linea[1], (linea[14], linea[15], linea[16])))

rddOrdenes_clientes = rddOrdenes_con_id_MetodoPago.join(rddClientes_reducidos)

rddPagos_segmentos_clientes = rddOrdenes_clientes.map(lambda x: ((x[1][0], x[1][1][1]), (x[1][1][0] == 'true', x[1][1][2] == 'true')))

suma_y_cantidad_clientes = rddPagos_segmentos_clientes.mapValues(lambda x: (int(x[0]), int(x[1]), 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))

resultado = suma_y_cantidad_clientes.mapValues(lambda x: (x[0], x[1], (x[0]/x[2])*100 if x[2] > 0 else 0, (x[1]/x[2])*100 if x[2] > 0 else 0))

display(resultado.collect())

[(('credit card', 'budget'),
  (12366, 9560, 90.14433590902465, 69.68945910482577)),
 (('credit card', 'regular'),
  (37168, 28856, 89.9907994770229, 69.86586605975498)),
 (('digital wallet', 'premium'),
  (12529, 9718, 89.9813272048262, 69.79316288422866)),
 (('digital wallet', 'regular'),
  (37513, 29101, 90.03696236559139, 69.84687019969277)),
 (('paypal', 'budget'), (12339, 9576, 89.96718920889538, 69.82136347065257)),
 (('cash on delivery', 'budget'),
  (12328, 9488, 90.26872666032072, 69.47353005784579)),
 (('paypal', 'premium'), (12483, 9659, 90.12996389891697, 69.74007220216608)),
 (('bank transfer', 'budget'),
  (12457, 9703, 89.9032909930716, 70.02742494226328)),
 (('credit card', 'premium'),
  (12426, 9768, 89.36996547756041, 70.25316455696202)),
 (('cash on delivery', 'premium'),
  (12522, 9739, 89.85361653272102, 69.8837543053961)),
 (('digital wallet', 'budget'),
  (12326, 9645, 90.23426061493412, 70.60761346998537)),
 (('cash on delivery', 'regular'),
  (37426, 29168, 89

## Consulta 4: Para los productos que contienen en su descripción la palabra “stuff” (sin importar mayúsculas o minúsculas), calcular el peso total de su inventario agrupado por marca, mostrar sólo la marca y el peso total de las 5 más pesadas.

In [None]:
rddProductos_parseados = rddProductos.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddProductos_filtrados = rddProductos_parseados.filter(lambda linea: len(linea) > 10 and linea[4] != '' and linea[4] != 'undefined' and linea[8] != '' and 'stuff' in linea[10])

rddProductos_con_marca_peso = rddProductos_filtrados.map(lambda linea: (linea[4], float(linea[8])))

peso_total_por_marca = rddProductos_con_marca_peso.reduceByKey(lambda x, y: x + y)

top_5_marcas_pesadas = peso_total_por_marca.takeOrdered(5, key=lambda x: -x[1])

display(top_5_marcas_pesadas)

[('3m', 4250.86),
 ('wayfair', 4080.1700000000005),
 ('adidas', 4057.3400000000006),
 ('nike', 3614.9600000000005),
 ('hasbro', 3338.5799999999995)]

## Consulta 5: Calcular el porcentaje de productos cuyo stock es al menos 20% más alto que el stock promedio de su marca

In [None]:
total_productos = rddProductos.count()

rddProductos_parseados = rddProductos.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddProductos_filtrados = rddProductos_parseados.filter(lambda linea: len(linea) > 7 and linea[4] != '' and linea[4] != 'undefined' and linea[7] != '')

rddProductos_con_marca_stock = rddProductos_filtrados.map(lambda linea: (linea[4], float(linea[7])))

rddProductos_con_marca_stock.cache()

stock_promedio_por_marca = rddProductos_con_marca_stock.map(lambda x: (x[0], (x[1], 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).mapValues(lambda x: x[0] / x[1])

rddProductos_con_stock_por_marca = rddProductos_con_marca_stock.join(stock_promedio_por_marca)

rddProductos_con_stock_al_menos_20_porciento_mas_alto = rddProductos_con_stock_por_marca.filter(lambda x: x[1][0] > ((0.2 * x[1][1]) + x[1][1]))

porcentaje_productos_al_menos_20_porciento_mas_alto = rddProductos_con_stock_al_menos_20_porciento_mas_alto.count() / total_productos * 100

display(porcentaje_productos_al_menos_20_porciento_mas_alto)

25.075404230747083

## Consulta 6: Obtener la cantidad de órdenes que no hayan comprado ninguno de los 10 productos más vendidos.

In [None]:
rddRegistrosInventario_parseados = rddRegistrosInventario.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddRegistrosInventario_filtrados = rddRegistrosInventario_parseados.filter(lambda linea: len(linea) > 5 and linea[2] != '' and linea[5] == 'sale' and linea[3] == 'out')

rddRegistrosInventario_productos_vendidos = rddRegistrosInventario_filtrados.map(lambda linea: (linea[2], 1)).reduceByKey(lambda x, y: x + y)

top_10_productos_vendidos_lista = rddRegistrosInventario_productos_vendidos.takeOrdered(10, key=lambda x: -x[1])

top_10_productos_ids = [id_producto for id_producto, cuenta in top_10_productos_vendidos_lista]

top_10_productos_ids_rdd = sc.parallelize(top_10_productos_ids).map(lambda x: (x, True))

rddOrdenesProductos_parseados = rddOrdenesProductos.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenesProductos_filtrados = rddOrdenesProductos_parseados.filter(lambda linea: len(linea) > 3 and linea[2] != '' and linea[3] != '')

rddOrdenesProductos_producto_orden = rddOrdenesProductos_filtrados.map(lambda linea: (linea[2], linea[1]))

rddOrdenes_sin_top_10_productos = rddOrdenesProductos_producto_orden.leftOuterJoin(top_10_productos_ids_rdd).filter(lambda x: x[1][1] is None)

ids_ordenes_unicos_sin_top_10 = rddOrdenes_sin_top_10_productos.map(lambda x: x[1][0]).distinct()

cantidad_ordenes_unicas_sin_top_10 = ids_ordenes_unicos_sin_top_10.count()

display(cantidad_ordenes_unicas_sin_top_10)

285243

## Consulta 7: ¿Cuáles son los productos más vendidos en cada categoría?

In [None]:
def encontrar_producto_mas_vendido(productos):
    if not productos:
        return None
    return max(productos, key=lambda x: x[1])

rddRegistrosInventario_parseados = rddRegistrosInventario.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddRegistrosInventario_filtrados = rddRegistrosInventario_parseados.filter(lambda linea: len(linea) > 5 and linea[2] != '' and linea[5] == 'sale' and linea[3] == 'out')

rddRegistrosInventario_productos_vendidos = rddRegistrosInventario_filtrados.map(lambda linea: (linea[2], 1)).reduceByKey(lambda x, y: x + y)

rddCategorias_parseadas = rddCategorias.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddCategorias_filtradas = rddCategorias_parseadas.filter(lambda linea: len(linea) > 2 and linea[1] != '' and linea[2] != '' and linea[2] != 'undefined')

rddCategorias_nombres = rddCategorias_filtradas.map(lambda linea: (linea[1], linea[2]))

rddProductos_parseados = rddProductos.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddProductos_filtrados = rddProductos_parseados.filter(lambda linea: len(linea) > 3 and linea[1] != '' and linea[3] != '')

rddProductos_info = rddProductos_filtrados.map(lambda linea: (linea[1], (linea[2], linea[3])))

rddProductos_vendidos_info = rddProductos_info.join(rddRegistrosInventario_productos_vendidos)

rddProductos_por_categoria = rddProductos_vendidos_info.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1])))

rddProductos_por_categoria_con_nombre = rddProductos_por_categoria.join(rddCategorias_nombres)

rddProductos_por_categoria_final = rddProductos_por_categoria_con_nombre.map(lambda x: (x[1][1], (x[1][0][0], x[1][0][1])))

producto_mas_vendido_por_categoria = rddProductos_por_categoria_final.groupByKey().mapValues(encontrar_producto_mas_vendido)

display(producto_mas_vendido_por_categoria.collect())

[('adhesives', ('secured secondary approach', 3)),
 ('smartphones', ('re-engineered system-worthy moratorium', 2)),
 ('grilling', ('cross-platform 5thgeneration knowledgebase', 2)),
 ('software', ('vision-oriented transitional parallelism', 2)),
 ('action figures', ('decentralized solution-oriented open architecture', 3)),
 ('sporting event tickets', ('future-proofed regional emulation', 2)),
 ('living room', ('future-proofed hybrid software', 2)),
 ('power tools', ('expanded static product', 2)),
 ('blu-ray', ('assimilated asynchronous moratorium', 2)),
 ('travel packages', ('customer-focused systemic neural-net', 2)),
 ('figurines', ('multi-lateral multi-tasking middleware', 2)),
 ('video games', ('multi-lateral eco-centric core', 2)),
 ('baby clothing', ('function-based 24hour groupware', 2)),
 ('cameras', ('re-engineered intermediate initiative', 3)),
 ('merchandise', ('inverse upward-trending local area network', 3)),
 ('outerwear', ('managed zero-defect concept', 2)),
 ('outdoor 

## Consulta 8: ¿Cuáles son las palabras más utilizadas en opiniones positivas y en opiniones negativas? (una opinión es positiva cuando tiene una puntuación mayor a 3 puntos y es negativa cuando es menor a 3 puntos)

In [None]:
rddResenias_parseadas = rddResenias.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddResenias_filtradas = rddResenias_parseadas.filter(lambda linea: len(linea) > 6 and linea[4] != '' and linea[6] != '' and linea[6] != 'undefined')

rddResenias_puntuacion_opinion = rddResenias_filtradas.map(lambda linea: (float(linea[4]), linea[6]))

rddResenias_puntuacion_opinion.cache()

rddResenias_positivas = rddResenias_puntuacion_opinion.filter(lambda x: x[0] > 3)

rddResenias_negativas = rddResenias_puntuacion_opinion.filter(lambda x: x[0] < 3)

rddResenias_positivas_palabras = rddResenias_positivas.flatMap(lambda x: re.findall(r'\b\w+\b', x[1]))

rddResenias_negativas_palabras = rddResenias_negativas.flatMap(lambda x: re.findall(r'\b\w+\b', x[1]))

palabras_positivas_mas_usadas = rddResenias_positivas_palabras.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).takeOrdered(3, key=lambda x: -x[1])

palabras_negativas_mas_usadas = rddResenias_negativas_palabras.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).takeOrdered(3, key=lambda x: -x[1])

display(palabras_positivas_mas_usadas, palabras_negativas_mas_usadas)

[('stuff', 1773), ('buy', 1772), ('answer', 1760)]

[('check', 265), ('game', 265), ('amount', 263)]

## Consulta 9: ¿Cuáles son los estados con mayor cantidad de ventas?

In [None]:
def extraer_estado(linea):
    billing_address = linea[7]

    matching = re.search(r',?\s*([A-Za-z]{2})\s*\d{5}', billing_address, flags=re.IGNORECASE)
    estado = matching.group(1).upper()

    return estado

rddOrdenes_parseadas = rddOrdenes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenes_filtradas = rddOrdenes_parseadas.filter(lambda linea: len(linea) > 7 and linea[7] != '' and linea[7] != 'undefined' and linea[4] == 'completed')

rddOrdenes_con_estado = rddOrdenes_filtradas.map(extraer_estado)

ventas_por_estado = rddOrdenes_con_estado.map(lambda linea: (linea, 1)).reduceByKey(lambda x, y: x + y)

estados_con_mas_ventas = ventas_por_estado.takeOrdered(3, key=lambda x: -x[1])

display(estados_con_mas_ventas)

[('AE', 97669), ('AP', 97534), ('AA', 96728)]

## Consulta 10: ¿Cuál es el valor promedio de una orden de compra para cada segmento de cliente?

In [None]:
rddOrdenes_parseadas = rddOrdenes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddOrdenes_filtradas = rddOrdenes_parseadas.filter(lambda linea: len(linea) > 11 and linea[2] != '' and linea[11] != '')

rddClientes_parseados = rddClientes.map(parsear_linea_csv).filter(lambda linea: linea is not None)

rddClientes_filtrados = rddClientes_parseados.filter(lambda linea: len(linea) > 15 and linea[1] != '' and linea[15] != '' and linea[15] != 'undefined')

rddOrdenes_con_id_valor = rddOrdenes_filtradas.map(lambda linea: (linea[2], float(linea[11])))

rddClientes_reducidos = rddClientes_filtrados.map(lambda linea: (linea[1], (linea[15])))

rddOrdenes_clientes = rddOrdenes_con_id_valor.join(rddClientes_reducidos)

rddOrdenes_clientes_segmentos = rddOrdenes_clientes.map(lambda x: (x[1][1], (x[1][0], 1)))

valor_promedio_por_segmento = rddOrdenes_clientes_segmentos.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).mapValues(lambda x: x[0] / x[1])

display(valor_promedio_por_segmento.collect())

[('regular', 24.997134690962728),
 ('budget', 24.99471581416989),
 ('premium', 24.99973741902044)]