In [1]:
%load_ext IPython.extensions.autoreload
%autoreload 2

In [2]:
import sys
from pathlib import Path

def find_src_folder(current_path: Path, folder_name: str = 'src') -> Path:
    search_directories = [current_path] + list(current_path.parents)
    for parent in search_directories:
        if parent.name == folder_name:
            return parent.parent
    return current_path

src_path = find_src_folder(Path.cwd(), 'src')
sys.path.append(str(src_path))

# Extracción de información de productos y reseñas de Amazon Reviews
Se tomará el repositorio de datos público de reseñas de productos de Amazon Reviews. Se trata de un conjunto de información recopilada en el 2023 referente a productos de diversas categorías. Actualmente se plantea el estudio de únicamente los productos pertenecientes al entorno tecnológico.

Específicamente, se eligieron las siguientes categorías:
- Accesorios y Celulares
- Electrónica
- Software
- Videojuegos

## Importación de librerias necesarias

In [3]:
import os
from pyspark.sql import functions as F, types as T, DataFrame
from pyspark.sql import SparkSession
import html, re

In [4]:
from src.utils import SparkUtils
from src.config import BASE_JSON_PATH

  from pkg_resources import parse_version





## Definición de variables necesarias

In [5]:
REGENERATE_INTERMEDIATE = True

In [6]:
spark_utils = SparkUtils('consume')

In [7]:
spark = spark_utils.spark

## Importar información de caracterización de productos
Se carga información de productos, incluyendo características abstraídas en su descripción, categoría principal, cantidad de referencias, calificación promedio, precio y código interno (parent_asin). Este último código refleja la categoría general del producto, descartando características particulares como color o presentación, lo cuál no se considerará dentro del estudio.

El esquema de los metadatos de items se compone de los campos descritos a continuación:

In [8]:
item_schema = T.StructType([
    T.StructField("title", T.StringType(), True),
    T.StructField("main_category", T.StringType(), True),
    T.StructField("features", T.ArrayType( T.StringType() ), True),
    T.StructField("description", T.ArrayType( T.StringType() ), True),
    T.StructField("average_rating", T.FloatType(), True),
    T.StructField("rating_number", T.IntegerType(), True),
    T.StructField("price", T.DoubleType(), True),
    T.StructField("store", T.StringType(), True),
    T.StructField("parent_asin", T.StringType(), True),
    T.StructField("categories", T.ArrayType(T.StringType()), True),
    T.StructField("details", T.StringType(), True),
    T.StructField("images", T.ArrayType(T.StringType()), True),
])

#### Se consideran 4 categorìas: Software, Electrónicos, Celulares & Accesorios y Videojuegos.

In [9]:
def load_category_data( path ) -> DataFrame:
    file = (
        spark.read
            .format('json')
            .option("quote",'/"')
            .schema(item_schema)
            .load(path)
        )

    return file

In [10]:
def merge_dfs( *args ):
    base = args[0]
    for i in range(1, len(args)):
        base = base.unionByName( args[i] )

    return base

In [11]:
if REGENERATE_INTERMEDIATE:
    meta_software = load_category_data( f"{BASE_JSON_PATH}/meta_categories/meta_Software.jsonl" )
    meta_electronics = load_category_data( f"{BASE_JSON_PATH}/meta_categories/meta_Electronics.jsonl" )
    meta_Cell_Phones_and_Accessories = load_category_data( f"{BASE_JSON_PATH}/meta_categories/meta_Cell_Phones_and_Accessories.jsonl" )
    meta_Video_Games = load_category_data( f"{BASE_JSON_PATH}/meta_categories/meta_Video_Games.jsonl" )

    unified_meta_items = merge_dfs(
        meta_software, meta_electronics,
        meta_Cell_Phones_and_Accessories, meta_Video_Games
    )

    (
        unified_meta_items.write
            .format('delta')
            .mode('overwrite')
            .save(
                spark_utils.path('meta_items')
            )
    )

unified_meta_items = spark.read.format('delta').load(spark_utils.path('meta_items'))

##### Cantidad de productos totales distintos

Se encuentran un total de 7'854.088 productos distintos dentro de las categorías preseleccionadas

In [12]:
unified_meta_items.count()

3125022

Se revisa la cantidad de productos padres distintos, es decir, productos reales descartando variantes como color y presentaciòn

Descartando varianetes de productos, se encuentran 89.251 productos diferentes entre las categorías seleccionadas

In [13]:
unified_meta_items[['parent_asin']].distinct().count()

3125022

## Importar información de referencias de usuarios del producto
Se carga información de opiniones de usuarios sobre diferentes productos. Los productos se califican en un rango de 1 a 5, incluyendo un título y texto, así mismo, se incluye información de imágenes, código de la variante del producto, código padre del producto (descartando variantes como color, presentación, etc), identificador de usuario, hora de la revisión, y calificación de la referencia (helpful).

In [14]:
review_schema = T.StructType([
    T.StructField("rating", T.FloatType(), True),
    T.StructField("title", T.StringType(), True),
    T.StructField("text", T.StringType(), True),
    T.StructField("timestamp", T.StringType(), True),
    T.StructField("helpful_vote", T.IntegerType(), True),
    T.StructField("parent_asin", T.StringType(), True),
    T.StructField("images", T.ArrayType(T.StringType()), True),
])

In [15]:
def load_review_data( path ) -> DataFrame:
    file = (
        spark.read
            .format('json')
            .option("quote",'/"')
            .schema(review_schema)
            .load(path)
        )

    return file

In [16]:
if REGENERATE_INTERMEDIATE:

    software = load_review_data( f"{BASE_JSON_PATH}/review_categories/Software.jsonl" )
    electronics = load_review_data( f"{BASE_JSON_PATH}/review_categories/Electronics.jsonl" )
    cell_Phones_and_Accessories = load_review_data( f"{BASE_JSON_PATH}/review_categories/Cell_Phones_and_Accessories.jsonl" )
    video_Games = load_review_data( f"{BASE_JSON_PATH}/review_categories/Video_Games.jsonl" )

    unified_reviews = (
        software.unionByName(
            electronics.unionByName(
                cell_Phones_and_Accessories.unionByName(
                    video_Games
                )
            )
        )
    )

    (
        unified_reviews.write
            .format('delta')
            .mode('overwrite')
            .save(spark_utils.path('reviews'))
    )

unified_reviews = spark.read.format('delta').load(spark_utils.path('reviews'))

Se encuentran un total de 222'614.055 reseñas de productos diferentes para las categorías preselccionadsa

In [17]:
unified_reviews.count()

74204685

## Limpieza de textos
Algunos elementos de referencias de productos poseen carácteres derivados de HTML y carácteres especiales. Al tratarse de una representación semántica, se requiere de la transformación de dichos textos

In [18]:
def clean_text(text):
    if text is None:
        return None
    text = html.unescape(text)
    text = re.sub(r'<[^>]+>', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

In [19]:
clean_text_udf = F.udf(clean_text, T.StringType())

In [20]:
if REGENERATE_INTERMEDIATE:

    unified_reviews_clean = (
        unified_reviews.withColumn(
            'text', clean_text_udf( F.col('text') )
        )
    )

    (
        unified_reviews_clean.write
            .format('delta')
            .mode('overwrite')
            .save(spark_utils.path('reviews_clean'))
    )

unified_reviews_clean = spark.read.parquet(spark_utils.path('reviews_clean'))

In [21]:
unified_reviews.count()

74204685