###Trabajo Practico 2: Bautista Boeri 110898

Enlace a este notebook: https://colab.research.google.com/drive/1bLjl8MtPB0FRkDDXR9jeC1WLbUdigbGd?usp=sharing

###Carga de datos

Antes de comenzar con las consultas, cargamos todos los datasets a partir de los archivos guardados en el drive en formato pkl para una carga más veloz. Además, importamos
todas las librerías necesarias para el análisis y visualización de los datos.

In [None]:
import os
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
import builtins
import datetime

# ────────────────────────────────────────────────
# 1. Crear SparkSession y SQLContext
# ────────────────────────────────────────────────
spark = SparkSession.builder.appName("CSVtoRDD").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# ────────────────────────────────────────────────
# 2. Descargar los CSV desde la carpeta pública (si usás Colab)
# ────────────────────────────────────────────────
carpeta_drive = "https://drive.google.com/drive/folders/1UXAAJ-XgEe5F89U03eNdQ5NBcwzP78A2"
ruta_guardado = "/content/data"
os.makedirs(ruta_guardado, exist_ok=True)

# Descarga con gdown
!gdown --folder "{carpeta_drive}" -O "{ruta_guardado}"

# ────────────────────────────────────────────────
# 3. Listar archivos .csv
# ────────────────────────────────────────────────
archivos_csv = [f for f in os.listdir(ruta_guardado) if f.endswith(".csv")]
print("Archivos descargados:", archivos_csv)

# ────────────────────────────────────────────────
# 4. Leer CSVs con SQLContext y crear RDDs
# ────────────────────────────────────────────────
rdds = {}
spark_dfs = {}

for fname in archivos_csv:
    full_path = os.path.join(ruta_guardado, fname)

    # Leer CSV en DataFrame de Spark
    df = sqlContext.read.csv(full_path, header=True, inferSchema=True)

    # Eliminar columna _c0 si existe
    if "_c0" in df.columns:
        df = df.drop("_c0")

    # Guardar DataFrame y RDD
    spark_dfs[fname] = df
    rdds[fname] = df.rdd

    # Verificar contenido
    print(f"{fname}: {df.count()} filas, columnas: {df.columns}")



Retrieving folder contents
Processing file 1x8hlH5GyqD89YfAdKZ3cYvXmXBojWc4J categories.pkl
Processing file 1Gf21XJoj7HbnLcUMslUIyWefE0p1-tkX customers.pkl
Processing file 1njEPh1dPRiJWNjlAXQc-SrpG87sLCl_N inventory_logs.pkl
Processing file 1lM6RVI5aoEuyL3gUkWrQTL1r5MEWczmw order_items.pkl
Processing file 16O0Ohly99MnkXchEaANXe0_mMnT9b2tr orders.pkl
Processing file 1YV6x7BHM23Ykcm2rKEiSuCxd4gbZmS1J products.pkl
Processing file 19xSuuXS1-hwi_MtNIunCaVoQQqxSdgFK reviews.pkl
Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1x8hlH5GyqD89YfAdKZ3cYvXmXBojWc4J
To: /content/data/categories.pkl
100% 11.9k/11.9k [00:00<00:00, 21.4MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1Gf21XJoj7HbnLcUMslUIyWefE0p1-tkX
From (redirected): https://drive.google.com/uc?id=1Gf21XJoj7HbnLcUMslUIyWefE0p1-tkX&confirm=t&uuid=4df11141-890e-4b5f-b5ee-625ece8c0e0a
To: /content/data/custome

#Descuentos por Estado

Cuál es el estado que más descuentos tiene en total? y en promedio? Supongan que de una direccion del estilo: 3123 Alan Extension Port Andrea, MA 26926, “MA” es el estado.

**Introduccion**

Se busca analizar la cantidad total de descuentos que hay por estado. También, se ve
el promedio de estos descuentos.

**Hipotesis**

Se le adjudica el descuento al estado detallado en la dirección de envió. Ademas, se
tienen en cuenta las direcciones las cuales contengan dos letras en mayúscula despues de una coma para poder
identificar el estado. Se espera poder ver que estados tienen mas descuentos y también su
promedio. Para el promedio se utiliza las ordenes con descuento mayor a 0 para hacer un
promedio de los que si tienen descuento. Tambien se espera analizar obtenemos los mismos resultados que lo resuelto con pandas.

**Codigo**

In [None]:
#3123 Alan Extension Port Andrea, MA 26926, “MA” es el estado.
def extraer_estado(direccion):
  estado=direccion.split(",")
  if len(estado)<=1:
    return None
  estado=estado[1].strip().split()
  if len(estado)<=1:
    return None
  estado=estado[0]
  if estado.isnumeric():
    return None
  if not estado.isalpha() or not estado.isupper() or len(estado)!=2:
    return None
  return estado

def extraer_codigo_postal(direccion):
  codigo_postal=direccion.split(",")
  if len(codigo_postal)<=1:
    return None
  codigo_postal=codigo_postal[1].strip().split()
  if len(codigo_postal)<=1:
    return None

  codigo_postal=codigo_postal[1]
  if not codigo_postal.isnumeric():
    return None
  return codigo_postal
print(extraer_estado("3123 Alan Extension Port Andrea, MA 26926"))

MA


In [None]:
rdd_cp = rdds['orders.csv']\
    .filter(lambda x: x['discount_amount'] is not None and x['discount_amount'] > 0 and x['shipping_address'] is not None)\
    .map(lambda x: (extraer_estado(x['shipping_address']), (1,x['discount_amount'])))\
    .filter(lambda x: x[0] is not None)\
    .reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1]))\
    .cache()

In [None]:
rdd_cp.map(lambda x: (x[0],x[1][0])).takeOrdered(1, key= lambda x : -x[1])

[('HI', 13273)]

In [None]:
rdd_cp.map(lambda x: (x[0],x[1][1]/x[1][0])).takeOrdered(1, key=lambda x: -x[1])

[('CO', 50.416058305496556)]

**Conclusiones**

Se puede observar como HI es el estado que mas descuentos tiene(en cantidad) y el estado Co es el que mejor promedio de descuento tiene. Podemos notar como en pandas obtuvimos resultados distintos pues se extrae el estado distinto. En pandas extraiamamos el estado buscando dos mayusculas. De esa forma aparecian otros estados que aca no atrapamos. Igualmente, no sabemos si eran estados validos, pues necesitamos mas informacion sobre el contexto.

#Órdenes Devueltas y Códigos Postales

**Introduccion**

¿Cuáles son los 5 códigos postales más comunes para las órdenes con estado ‘Refunded’? ¿Y cuál es el nombre más frecuente entre los clientes de esas direcciones?

In [None]:
rdd_orders=rdds['orders.csv']
rdd_orders.take(1)

[Row(order_id=1, customer_id=447917, order_date='2024-07-26T03:04:05.462241', status='completed', payment_method='Digital Wallet', shipping_address='49599 Wesley Burg Richardview, AZ 30649', billing_address='USNV Morrison FPO AP 90901', discount_amount=0.0, tax_amount=None, shipping_cost=None, total_amount=25.96, currency='USD', created_at='2024-07-26T03:04:05.462241', updated_at='2024-07-27T03:04:05.462241', subtotal=0)]

In [None]:
rdd_customers=rdds['customers.csv']
rdd_customers.take(1)

[Row(customer_id=1, email='uthomas@example.net', first_name='Kayla', last_name='Smith', phone='329-672-0449x89707', date_of_birth='1948-05-20', gender='F', country='Brazil', city='south michelle', postal_code=70351, address=None, registration_date='2022-09-22T09:40:47.913063', last_login='2025-01-07T12:09:24.323425', is_active=True, customer_segment='Regular', marketing_consent=True)]

In [None]:
rdd_orders_rf=rdd_orders.filter(lambda x: x['status']=='Refunded' and x['shipping_address'] is not None).cache()
rdd_orders_rf_rbk=rdd_orders_rf.map(lambda x: (extraer_codigo_postal(x['shipping_address']),1)).filter(lambda x: x[0] is not None).reduceByKey(lambda x,y: x+y)
top_5_cp=rdd_orders_rf_rbk.takeOrdered(5, key=lambda x: -x[1])
top_5_cp


[('70696', 5), ('87498', 4), ('66666', 4), ('93851', 4), ('47612', 4)]

In [None]:
cps=set()
for cp,_ in top_5_cp:
  cps.add(cp)
clientes=rdd_orders_rf.filter(lambda x: extraer_codigo_postal(x['shipping_address']) in cps)\
          .map(lambda x: (x['customer_id'],1)).reduceByKey(lambda x,y: x+y).takeOrdered(1, key=lambda x: -x[1])
rdd_clientes=rdd_customers.filter(lambda x: x['customer_id']==clientes[0][0]).map(lambda x: (x['first_name'],x['last_name']))
rdd_clientes.take(1)




[(' Michael', 'Jackson')]

Para cada tipo de pago y segmento de cliente, devolver la suma y el promedio expresado como porcentaje, de clientes activos y de consentimiento de marketing. Se valora que el output de la consulta tenga nombres claros y en español.

In [None]:
orders=rdds['orders.csv'].map(lambda x: (x['customer_id'],x['payment_method'] if x['payment_method'] is not None else 'Undefined'))
customers=rdds['customers.csv'].map(lambda x: (x['customer_id'],(x['customer_segment']if x['customer_segment'] is not None else 'Undefined',1 if x['marketing_consent']is True else 0,1 if x['is_active']is True else 0)))
joined = orders.join(customers)
joined.take(1)

[(489660, ('Bank Transfer ', ('Regular', 1, 0)))]

In [None]:
#  Clave: (tipo_pago, segmento), Valor: (consent, active, total)
grouped = joined.map(
    lambda x: (
        (x[1][0].strip().upper(), x[1][1][0].strip().upper()),
        (x[1][1][1], x[1][1][2], 1)
    )
)


reduced = grouped.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2])
)

import builtins # se importa para que se de cuenta de no usar el metodo round de spark

result = reduced.map(lambda x: {
    "Tipo de pago": x[0][0],
    "Segmento": x[0][1],
    "Total clientes": x[1][2],
    "Consentimiento marketing (suma)": x[1][0],
    "Consentimiento marketing (%)": builtins.round((x[1][0]/x[1][2]*100) if x[1][2]>0 else 0, 2),
    "Clientes activos (suma)": x[1][1],
    "Clientes activos (%)": builtins.round((x[1][1]/x[1][2]*100) if x[1][2]>0 else 0, 2)
})
result.take(5)



[{'Tipo de pago': 'DEBIT CARD',
  'Segmento': 'REGULAR',
  'Total clientes': 390574,
  'Consentimiento marketing (suma)': 273540,
  'Consentimiento marketing (%)': 70.04,
  'Clientes activos (suma)': 351591,
  'Clientes activos (%)': 90.02},
 {'Tipo de pago': 'UNDEFINED',
  'Segmento': 'UNDEFINED',
  'Total clientes': 38798,
  'Consentimiento marketing (suma)': 27029,
  'Consentimiento marketing (%)': 69.67,
  'Clientes activos (suma)': 34851,
  'Clientes activos (%)': 89.83},
 {'Tipo de pago': 'BANK TRANSFER',
  'Segmento': 'UNDEFINED',
  'Total clientes': 65325,
  'Consentimiento marketing (suma)': 45603,
  'Consentimiento marketing (%)': 69.81,
  'Clientes activos (suma)': 58775,
  'Clientes activos (%)': 89.97},
 {'Tipo de pago': 'UNDEFINED',
  'Segmento': 'REGULAR',
  'Total clientes': 230845,
  'Consentimiento marketing (suma)': 161836,
  'Consentimiento marketing (%)': 70.11,
  'Clientes activos (suma)': 207565,
  'Clientes activos (%)': 89.92},
 {'Tipo de pago': 'DEBIT CARD',
 

Para los productos que contienen en su descripción la palabra “stuff” (sin importar mayúsculas o minúsculas), calcular el peso total de su inventario agrupado por marca, mostrar sólo la marca y el peso total de las 5 más pesadas.

In [None]:
products=rdds['products.csv']
products.take(1)

[Row(product_id='1', product_name='Centralized attitude-oriented synergy ', category_id=164, brand='undefined', price=120.22, cost=49.48, stock_quantity=754, weight_kg=None, dimensions='194x144x13 ', description='Hotel quickly amount care meet. Likely consider agreement citizen organization.', is_active=True, created_at=' 2024-05-12T00:44:32.354038')]

In [None]:
stuff_products = products.filter(
    lambda x: x['description'] is not None and 'stuff' in x['description'].lower().strip()
)

print(stuff_products.count())

stuff_products = stuff_products.map(
    lambda x: (
        'UNDEFINED' if x['brand'] is None else x['brand'].strip().upper(),
        (x['weight_kg'] if x['weight_kg'] else 0) * (x['stock_quantity'] if x['stock_quantity'] else 0)
    )
)

stuff_products = stuff_products.reduceByKey(lambda a, b: a + b)

top5 = stuff_products.takeOrdered(5, key=lambda x: -x[1])

top5


12411


[('UNDEFINED', 9484173.560000002),
 ('3M', 1851279.46),
 ('ADIDAS', 1578522.37),
 ('HASBRO', 1506279.13),
 ('WAYFAIR', 1448357.5499999998)]

Calculen el porcentaje de productos cuyo stock es al menos 20% más alto que el stock promedio de su marca. Por ejemplo, si el stock promedio de la marca Adidas fuera 100, para los productos de dicha marca la condición será que tengan un stock mayor a 120, y luego se deberá calcular qué porcentaje del total de productos cumple con esta condición.

In [None]:
products=rdds['products.csv']
products.take(1)

[Row(product_id='1', product_name='Centralized attitude-oriented synergy ', category_id=164, brand='undefined', price=120.22, cost=49.48, stock_quantity=754, weight_kg=None, dimensions='194x144x13 ', description='Hotel quickly amount care meet. Likely consider agreement citizen organization.', is_active=True, created_at=' 2024-05-12T00:44:32.354038')]

In [None]:
stock_promedio_por_marca = products.filter(lambda x: x['stock_quantity']is not None)\
.map(lambda x: ('undefined'if x['brand']is None else x['brand'].strip().lower(),(x['stock_quantity'],1)))\
    .reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))\
    .map(lambda x: (x[0],x[1][0]/x[1][1]))
stock_promedio_por_marca.take(1)

[('mobil', 505.56068232662193)]

In [None]:
# Producto con su marca y stock
producto_stock = products.filter(lambda x: x['stock_quantity'] is not None and x['stock_quantity']>0) \
    .map(lambda x: (
        x['product_id'],
        (x['brand'].strip().lower() if x['brand'] is not None else 'undefined', x['stock_quantity'])
    ))

marca_producto_stock = producto_stock.map(lambda x: (x[1][0], (x[0], x[1][1])))

joined = marca_producto_stock.join(stock_promedio_por_marca)
# => (marca, ((product_id, stock), stock_promedio))

productos_20 = joined.filter(lambda x: x[1][0][1] >= x[1][1] * 1.2).map(lambda x: (x[1][0][0],x[1][0][1]))

porcentaje = builtins.round(productos_20.count() / products.count() * 100, 2)
print(f"{porcentaje}%")


27.57%


Obtener la cantidad de órdenes que no hayan comprado ninguno de los 10 productos más vendidos. Preguntar si esta bien hacer asi con el set de los top 10

In [None]:
order_items=rdds['order_items.csv']
order_items.take(1)

[Row(order_item_id=1, order_id=19900399, product_id=940377, quantity=6, unit_price=569.28, line_total=3415.68, discount_amount=0.0)]

In [None]:
top10=order_items.map(lambda x: (x['product_id'],1)).reduceByKey(lambda x,y: x+y).takeOrdered(10, key=lambda x: -x[1])
top10=[x[0] for x in top10]
top10_set=set(top10)
print(top10_set)


{905852, 974404, 962918, 969738, 982924, 990386, 992501, 977590, 942268, 937470}


In [None]:
orders_with_top10=order_items.filter(lambda x: x['product_id'] in top10_set).map(lambda x: x['order_id']).distinct()
rdds['order_items.csv'].map(lambda x: x['order_id']).distinct().count() - orders_with_top10.count()


99490

# Se analiza la cantidad de incidentes ocacionadas por danios o por robos a lo largo de los registros del inventario. agrupado por categoria padre y tipo de incidente

In [None]:
incidentes=rdds['inventory_logs.csv'].filter(lambda x: x['reason']is not None and x['reason'].strip().lower() in ['theft', 'damage'])\
.map(lambda x: ((x['product_id'],x['reason'].strip().lower()),1))\
.reduceByKey(lambda x,y: x+y).map(lambda x: (x[0][0],(x[0][1],x[1])))
incidentes.take(1)
#(productid, (tipoincidente,cantidad))

[(985794, ('damage', 1))]

In [None]:
categories=rdds['categories.csv'].map(lambda x: (x['category_id'],x['parent_category'].strip().lower() if x['parent_category'] is not None else 'undefined'))
categories.take(1)

[(1, 'electronics')]

In [None]:
rdd_product=rdds['products.csv'].map(lambda x: (x['category_id'],x['product_id']))
rdd_product.take(1)
#

[(164, '1')]

In [None]:
product_padre=categories.join(rdd_product).map(lambda x: (int(x[1][1]),x[1][0]))
product_padre.take(1)
# (product id ,categoriapadre)

[(35, 'undefined')]

In [None]:
incides_por_producto=product_padre.join(incidentes)

In [None]:
incidentes_por_categoria_y_inc=incides_por_producto.map(lambda x: ((x[1][0],x[1][1][0]),x[1][1][1])).reduceByKey(lambda x,y: x+y)
incidentes_por_categoria_y_inc.collect()

[(('undefined', 'damage'), 5350),
 (('clothing', 'damage'), 1165),
 (('home & garden', 'theft'), 1441),
 (('toys & games', 'damage'), 1120),
 (('shoes', 'theft'), 1798),
 (('shoes', 'damage'), 1716),
 (('travel', 'damage'), 1687),
 (('travel', 'theft'), 1766),
 (('pet care', 'damage'), 1728),
 (('pet care', 'theft'), 1731),
 (('handmade', 'damage'), 1481),
 (('collectibles', 'damage'), 1755),
 (('tickets & experiences', 'damage'), 1725),
 (('tickets & experiences', 'theft'), 1676),
 (('stationery', 'theft'), 1476),
 (('electronics', 'damage'), 1112),
 (('electronics', 'theft'), 1099),
 (('clothing', 'theft'), 1134),
 (('books', 'damage'), 1419),
 (('home & garden', 'damage'), 1480),
 (('toys & games', 'theft'), 1126),
 (('food & beverages', 'theft'), 1127),
 (('baby & kids', 'theft'), 1388),
 (('office supplies', 'theft'), 1476),
 (('tools & hardware', 'theft'), 1807),
 (('tools & hardware', 'damage'), 1639),
 (('art & crafts', 'theft'), 1479),
 (('garden & outdoor', 'damage'), 1796),


# Profit por categoria padre

In [None]:
products=rdds['products.csv']
products.take(1)

[Row(product_id='1', product_name='Centralized attitude-oriented synergy ', category_id=164, brand='undefined', price=120.22, cost=49.48, stock_quantity=754, weight_kg=None, dimensions='194x144x13 ', description='Hotel quickly amount care meet. Likely consider agreement citizen organization.', is_active=True, created_at=' 2024-05-12T00:44:32.354038')]

In [None]:
categories=rdds['categories.csv']
categories.take(1)

[Row(category_id=1, category_name='Smartphones', parent_category='ELECTRONICS', created_at=None)]

In [None]:
category_product_profit = products.filter(lambda x: x['cost'] is not None and x['cost'] > 0 and x['price'] is not None and x['price'] > 0)\
.map(lambda x: (x['category_id'],(x['price'] / x['cost'],1)))
category_product_profit.take(1)

[(164, (2.4296685529506874, 1))]

In [None]:
categories=categories.map(lambda x: (x['category_id'],x['parent_category'].strip().lower() if x['parent_category'] is not None else 'undefined'))
categories.take(1)

[(1, 'electronics')]

In [None]:
padre_profit=category_product_profit.join(categories).map(lambda x: (x[1][1],x[1][0])).reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])).map(lambda x: (x[0],x[1][0]/x[1][1]))
padre_profit.collect()

[('shoes', 1.959918234826427),
 ('stationery', 1.9668827118497516),
 ('grocery & gourmet food', 1.961805623042106),
 ('handmade', 1.9596242741945904),
 ('industrial & scientific', 1.9620134162767187),
 ('tickets & experiences', 1.9679209714105521),
 ('musical instruments', 1.9581192106856062),
 ('electronics', 1.962056652334012),
 ('books', 1.953318222761027),
 ('baby & kids', 1.9650375075694704),
 ('undefined', 1.9627697464431964),
 ('pet care', 1.9586721482656282),
 ('garden & outdoor', 1.9649590483322132),
 ('clothing', 1.9596314593924506),
 ('automotive', 1.958333312666731),
 ('games & virtual goods', 1.965023573875599),
 ('pet supplies', 1.9659007048513224),
 ('travel', 1.9622007331549582),
 ('health & beauty', 1.9677724568952004),
 ('toys & games', 1.9616580027049286),
 ('furniture', 1.9565270490776407),
 ('kitchen & dining', 1.964982404314798),
 ('art & crafts', 1.9714027840307682),
 ('collectibles', 1.9623116342762086),
 ('sports & outdoors', 1.9678647590557847),
 ('music & mov

El producto con mejor promedio en reviews con al menos 5 reviews

In [None]:
reviews=rdds['reviews.csv']
reviews.take(1)
#

[Row(review_id='1', customer_id=473261, product_id='933144', rating=5, title='Reveal threat.', comment='Hospital them face left space. Business answer although add sound country.', is_verified_purchase=None, helpful_votes=None, created_at=None)]

In [None]:
productos_con_5_reviews=reviews.filter(lambda x: x['product_id']is not None and x['rating'] is not None)\
.map(lambda x: (x['product_id'],(x['rating'],1)))\
.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])).filter(lambda x: x[1][1]>=5)\
.map(lambda x: (x[0],(x[1][0]/x[1][1]),x[1][1]))
productos_con_5_reviews.takeOrdered(1, key=lambda x: -x[1])

[('996442', 5.0, 5)]

Ver queanio hay datos de todo el mes




In [None]:
orders=rdds['orders.csv']
orders.take(1)

[Row(order_id=1, customer_id=447917, order_date='2024-07-26T03:04:05.462241', status='completed', payment_method='Digital Wallet', shipping_address='49599 Wesley Burg Richardview, AZ 30649', billing_address='USNV Morrison FPO AP 90901', discount_amount=0.0, tax_amount=None, shipping_cost=None, total_amount=25.96, currency='USD', created_at='2024-07-26T03:04:05.462241', updated_at='2024-07-27T03:04:05.462241', subtotal=0)]

In [None]:

def parse_date(x):
    try:
        date = datetime.datetime.strptime(x['order_date'], '%Y-%m-%dT%H:%M:%S.%f')
        return (x['order_id'], (date.year, date.month))  # guardo también el id
    except:
        return None

In [None]:
orders_parsed = orders.map(parse_date).filter(lambda x: x is not None).cache()


In [None]:
anio_mes = orders_parsed.map(lambda x: x[1])
print(anio_mes.take(5))


[(2024, 7), (2025, 3), (2023, 11), (2025, 2), (2025, 3)]


In [None]:
meses_por_anio = anio_mes.distinct() \
    .map(lambda x: (x[0], 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .filter(lambda x: x[1] == 12)

anios = [x[0] for x in meses_por_anio.collect()]
print(anios)

[2024, 2023]


In [None]:
orders_con_anios_completos = orders_parsed.filter(lambda x: x[1][0] in anios)
mes_cantidad = orders_con_anios_completos \
    .map(lambda x: (x[1][1], 1)) \
    .reduceByKey(lambda x, y: x + y)

print(mes_cantidad.take(5))

[(8, 185396), (9, 195341), (1, 99075), (10, 218031), (2, 100901)]
