In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt update
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m987.4/987.4 kB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for PyDrive (setup.py) ... [?25l[?25hdone
Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 https://cli.github.com/packages stable InRelease
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,275 

In [3]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

# The Spark session and context are created in the next cell.

In [5]:
# Obtain the active SparkSession, building a new one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Keep a handle on the session's underlying SparkContext for the RDD code below.
sc = spark.sparkContext

In [6]:
# Sanity check: display the class of the `sc` object.
type(sc)

In [7]:
from google.colab import drive

# Mount Google Drive at /content/drive so the CSV files stored there can be read.
drive.mount("/content/drive")

Mounted at /content/drive


# 4) Para los productos que contienen en su descripción la palabra “stuff” (sin importar mayúsculas o minúsculas), calcular el peso total de su inventario agrupado por marca, mostrar sólo la marca y el peso total de las 5 más pesadas.

In [8]:
# Folder (inside the mounted Drive) that holds the CSV inputs.
DATA_PATH = "/content/drive/MyDrive/CienciaDeDatos/TP1/data/"

sqlContext = SQLContext(sc)


def _csv_columns_as_rdd(file_name, columns):
    """Read a CSV under DATA_PATH and return the chosen columns as an RDD of Rows.

    The file is read with a header row and schema inference enabled.
    """
    frame = sqlContext.read.csv(DATA_PATH + file_name, header=True, inferSchema=True)
    return frame.select(*columns).rdd


# Only the columns needed by the analysis are kept from each file.
rdd_products = _csv_columns_as_rdd(
    "products.csv", ["product_id", "description", "weight_kg", "brand"]
)

rdd_inventory_logs = _csv_columns_as_rdd(
    "inventory_logs.csv", ["product_id", "quantity_change"]
)




In [43]:
# Substring searched for in product descriptions (the comparison lower-cases both sides).
DESCRIPTION_KEYWORD = "stuff"
# Number of brands kept in the final ranking.
BRAND_AMOUNT_TO_SHOW = 5

def normalize_category(value):
    """Return a title-cased, trimmed label, or None for missing-like input.

    The value is converted with ``str``, stripped and lower-cased. If the
    result is one of "nan", "na", "undefined", "none" or the empty string
    (or if ``value`` itself is None), None is returned; otherwise the cleaned
    text is returned in title case.
    """
    if value is None:
        return None
    cleaned = str(value).strip().lower()
    missing_markers = ("nan", "na", "undefined", "none", "")
    return None if cleaned in missing_markers else cleaned.title()

def _to_product_pair(row):
    """Turn a product Row into (product_id, (description, weight, brand))."""
    details = (
        normalize_category(row["description"]),
        float(row["weight_kg"]),
        normalize_category(row["brand"]),
    )
    return int(row["product_id"]), details


def _matches_keyword(pair):
    """Keep pairs that have a description and a brand and whose description contains the keyword."""
    description, weight, brand = pair[1]
    if not description or weight is None or brand is None:
        return False
    return DESCRIPTION_KEYWORD.lower() in description.lower()


def _to_brand_weight(joined):
    """Map a joined record to (brand, unit weight * inventory quantity)."""
    _product_id, ((_description, weight, brand), quantity) = joined
    return brand, weight * quantity


# Products with a weight, keyed by product_id and restricted to keyword matches.
rdd_products_nm = (
    rdd_products
    .filter(lambda product: product["weight_kg"] is not None)
    .map(_to_product_pair)
    .filter(_matches_keyword)
)

# Sum of quantity_change per product_id, ignoring log rows without a quantity.
rdd_inventory_amount = (
    rdd_inventory_logs
    .filter(lambda log: log["quantity_change"] is not None)
    .map(lambda log: (int(log["product_id"]), int(log["quantity_change"])))
    .reduceByKey(lambda total, change: total + change)
)

# Inner join: only products that appear in both RDDs survive.
rdd_products_inventory = rdd_products_nm.join(rdd_inventory_amount)

# Total weight per brand.
weight_count_per_brand = (
    rdd_products_inventory
    .map(_to_brand_weight)
    .reduceByKey(lambda left, right: left + right)
)

# Negating the total makes takeOrdered return the heaviest brands first.
top_brands = weight_count_per_brand.takeOrdered(
    BRAND_AMOUNT_TO_SHOW, key=lambda brand_total: -brand_total[1]
)

print(top_brands)


[('Stubhub', 92855.6), ('Cricut', 88683.57), ('Bosch', 70196.71), ('Pilot', 67907.37), ('Universal Music', 64708.659999999996)]


# Conclusiones:
Se obtuvieron las cinco marcas con mayor peso total:
 - Stubhub: 92855.60 kg
 - Cricut:  88683.57 kg
 - Bosch: 70196.71 kg
 - Pilot:  67907.37 kg
 - Universal Music: 64708.66 kg

Lamentablemente, esta vez se observaron diferencias notables respecto del procesamiento anterior, realizado sin Spark: los pesos totales por marca no coinciden en todos los casos. Stubhub y Pilot arrojan exactamente el mismo valor en ambas consultas, pero Bosch y Cricut difieren, y en el top 5 sin Spark aparece Sony Music mientras que con Spark aparece Universal Music. Se intentó extensamente hacer coincidir los resultados, sin éxito. Es probable que la diferencia se deba a un manejo distinto de los valores inválidos y nulos; en cualquier caso, indica un error en el código de al menos una de las dos consultas.

### Resultados sin Spark:
 - Stubhub: 92855.60 kg
 - Bosch: 92757.51 kg
 - Cricut:  82866.07 kg
 - Sony Music: 81667.15 kg
 - Pilot:  67907.37 kg
