### Analyse search terms on the e-commerce web server


##### In this assignment you will download the search term data set for the e-commerce web server and run analytic queries on it.


In [1]:
# Install spark
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.4.4.tar.gz (311.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m311.4/311.4 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m35.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.4-py2.py3-none-any.whl size=311905466 sha256=781d3551dbad64106ff3a7c391c4fdc3208f6b10af0510c290a2a0f37341e6f4
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/4e/66/db/939eb1c49afb8a7fd2c4e393ad34e12b77db67bb4cc974c00e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.4
C

In [5]:
# Import findspark y otras librerías iniciales
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [6]:
# Start session
# Crear SparkContext y SparkSession

#sc = SparkContext() # A veces iniciar el contexto explícitamente así puede dar problemas si ya existe uno.
                     # SparkSession.builder.getOrCreate() usualmente maneja la creación del contexto.

spark = SparkSession \
    .builder \
    .appName("ECommerceSearchAnalysis") \
    .getOrCreate()

sc = spark.sparkContext # Obtener el SparkContext de la SparkSession es más seguro

print("SparkSession y SparkContext iniciados.")
print("Spark version:", spark.version)
print("SparkContext version:", sc.version)

SparkSession y SparkContext iniciados.
Spark version: 2.4.3
SparkContext version: 2.4.3


In [7]:
# Download The search term dataset
# https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv

# Usaremos un comando de shell para descargar el archivo a la ubicación donde Spark pueda leerlo.
# Primero, vamos a definir el nombre del archivo.
file_name = "searchterms.csv"
url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv"

# Descargar usando wget
!wget -O {file_name} {url}

# Verificar que el archivo se descargó (opcional, pero bueno para depurar)
!ls -l {file_name}

--2025-05-07 03:48:34--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/searchterms.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 233457 (228K) [text/csv]
Saving to: ‘searchterms.csv’


2025-05-07 03:48:34 (48.4 MB/s) - ‘searchterms.csv’ saved [233457/233457]

-rw-r--r-- 1 jupyterlab resources 233457 Sep 29  2022 searchterms.csv


In [8]:
# Load the csv into a spark dataframe
# Spark puede inferir el esquema y sabe que es un CSV con encabezado.

# Definir la ruta al archivo descargado
# Si la celda anterior lo descargó en el directorio actual del notebook, solo el nombre es suficiente.
file_path = file_name # "searchterms.csv"

# Leer el CSV en un DataFrame de Spark
# Es importante decirle a Spark que el CSV tiene una cabecera y que infiera el esquema.
search_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Mostrar una muestra para verificar que se cargó correctamente
search_df.show(5)
search_df.printSchema()

                                                                                

+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows

root
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- searchterm: string (nullable = true)



In [9]:
# Print the number of rows and columns

num_rows = search_df.count()
num_cols = len(search_df.columns)

print(f"El DataFrame tiene {num_rows} filas.")
print(f"El DataFrame tiene {num_cols} columnas.")
print(f"Nombres de las columnas: {search_df.columns}")

[Stage 3:>                                                          (0 + 1) / 1]

El DataFrame tiene 10000 filas.
El DataFrame tiene 4 columnas.
Nombres de las columnas: ['day', 'month', 'year', 'searchterm']


                                                                                

In [10]:
# Print the top 5 rows

print("Las primeras 5 filas del DataFrame son:")
search_df.show(5) # show() por defecto muestra 20, pero podemos especificar 5.
# Alternativamente, para obtener una lista de objetos Row:
# top_5_rows_list = search_df.head(5)
# for row in top_5_rows_list:
# print(row)

Las primeras 5 filas del DataFrame son:
+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows



In [11]:
# Find out the datatype of the column searchterm?

# Imprimir el esquema completo para ver todos los tipos de datos
search_df.printSchema()

# Para obtener específicamente el tipo de dato de la columna 'searchterm'
# El archivo CSV parece tener columnas como 'day', 'month', 'year', 'searchterm'
# Vamos a asumir que la columna de interés se llama 'searchterm'
# Si el nombre de la columna es diferente después de cargar el CSV (por ejemplo, con espacios o mayúsculas/minúsculas),
# ajústalo aquí. Las cabeceras en el CSV son probablemente 'Day', 'Month', 'Year', 'Search Term'.
# Spark a menudo reemplaza espacios con guiones bajos o los maneja, pero es bueno verificar.
# Por ahora, asumiré que Spark la cargó como 'searchterm' o 'Search Term'.
# Si la cabecera original es 'Search Term', Spark podría haberla cargado como `Search Term` o `Search_Term`

# Primero, veamos los nombres exactos de las columnas después de la carga
print(f"Nombres de columna reales: {search_df.columns}")

# Asumiendo que la columna se llama 'searchterm' o 'Search Term'
# Ajusta el nombre de la columna si es necesario después de ver search_df.columns
column_name_to_check = None
if 'searchterm' in search_df.columns:
    column_name_to_check = 'searchterm'
elif 'Search Term' in search_df.columns: # El CSV original probablemente tiene esta cabecera
    column_name_to_check = 'Search Term'
elif 'Search_Term' in search_df.columns: # Spark a veces reemplaza espacios con _
    column_name_to_check = 'Search_Term'


if column_name_to_check:
    searchterm_dtype = search_df.schema[column_name_to_check].dataType
    print(f"El tipo de dato de la columna '{column_name_to_check}' es: {searchterm_dtype}")
else:
    print("ERROR: La columna 'searchterm' (o una variante) no fue encontrada. Por favor, verifica los nombres de columna.")

root
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- searchterm: string (nullable = true)

Nombres de columna reales: ['day', 'month', 'year', 'searchterm']
El tipo de dato de la columna 'searchterm' es: StringType


In [13]:
# How many times was the term `gaming laptop` searched?
from pyspark.sql.functions import col

# Asegúrate de usar el nombre correcto de la columna que contiene los términos de búsqueda
# Asumiremos que es column_name_to_check de la celda anterior, o directamente 'Search Term'
# si lo confirmaste.
# Si la columna se cargó como `Search Term` (con espacio), necesitas usar comillas invertidas: col("`Search Term`")

# Primero, verifica el nombre exacto de la columna de términos de búsqueda de la celda anterior
# Si es 'Search Term' (con espacio):
search_term_column_actual_name = "Searchterm" # Ajusta si es diferente

count_gaming_laptop = search_df.filter(col(f"`{search_term_column_actual_name}`") == "gaming laptop").count()

print(f"El término 'gaming laptop' fue buscado {count_gaming_laptop} veces.")

El término 'gaming laptop' fue buscado 499 veces.


In [14]:
# Print the top 5 most frequently used search terms?
from pyspark.sql.functions import desc

# De nuevo, usa el nombre correcto de la columna de términos de búsqueda
# search_term_column_actual_name = "Search Term" # (del paso anterior)

top_5_search_terms = search_df.groupBy(f"`{search_term_column_actual_name}`") \
                              .count() \
                              .orderBy(desc("count")) \
                              .limit(5)

print("Los 5 términos de búsqueda más frecuentes son:")
top_5_search_terms.show(truncate=False)

Los 5 términos de búsqueda más frecuentes son:




+-------------+-----+
|Searchterm   |count|
+-------------+-----+
|mobile 6 inch|2312 |
|mobile 5g    |2301 |
|mobile latest|1327 |
|laptop       |935  |
|tablet wifi  |896  |
+-------------+-----+



                                                                                

In [15]:
# The pretrained sales forecasting model is available at the below url
# https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz

model_file_name = "model.tar.gz"
model_url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz"

# Descargar el modelo
!wget -O {model_file_name} {model_url}

# Verificar
!ls -l {model_file_name}

# Los modelos de SparkML a menudo se guardan como directorios.
# Un archivo .tar.gz es un archivo comprimido. Necesitamos descomprimirlo y extraerlo.
# El nombre del directorio del modelo suele ser el mismo que el del archivo sin .tar.gz, o
# puede estar contenido dentro del tar.
# Vamos a crear un directorio y extraerlo allí.
model_dir_name = "sales_forecast_model_dir" # Nombre del directorio donde extraeremos el modelo
!mkdir -p {model_dir_name}
!tar -xzf {model_file_name} -C {model_dir_name} # Extraer en el directorio especificado

# Verificar el contenido del directorio del modelo
!ls -l {model_dir_name}
# Deberías ver subdirectorios como 'metadata', 'stages', etc., si es un modelo de Pipeline.
# O 'metadata' y 'data' si es un modelo simple.

--2025-05-07 03:50:29--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1490 (1.5K) [application/x-tar]
Saving to: ‘model.tar.gz’


2025-05-07 03:50:29 (6.16 MB/s) - ‘model.tar.gz’ saved [1490/1490]

-rw-r--r-- 1 jupyterlab resources 1490 Sep 29  2022 model.tar.gz
total 4
drwxr-xr-x 1 jupyterlab resources 4096 Mar 16  2022 sales_prediction.model


In [18]:
# Load the sales forecast model.
from pyspark.ml.regression import LinearRegressionModel # <--- CAMBIO IMPORTANTE AQUÍ

# model_dir_name fue definido en la celda anterior como "sales_forecast_model_dir"
path_to_model_files = f"{model_dir_name}/sales_prediction.model"

print(f"Contenido del directorio del modelo ({path_to_model_files}):")
!ls -l {path_to_model_files}

try:
    sales_model = LinearRegressionModel.load(path_to_model_files) # <--- CAMBIO IMPORTANTE AQUÍ
    print(f"Modelo de pronóstico de ventas (LinearRegressionModel) cargado exitosamente desde: {path_to_model_files}")
    # Puedes imprimir algunas propiedades del modelo para verificar
    print(f"Coeficientes del modelo: {sales_model.coefficients}")
    print(f"Intercepto del modelo: {sales_model.intercept}")
except Exception as e:
    print(f"Error al cargar el modelo: {e}")
    print("Verifica la ruta y el tipo de modelo.")

Contenido del directorio del modelo (sales_forecast_model_dir/sales_prediction.model):
total 8
drwxr-xr-x 1 jupyterlab resources 4096 Mar 16  2022 data
drwxr-xr-x 1 jupyterlab resources 4096 Mar 16  2022 metadata


[Stage 15:>                                                         (0 + 1) / 1]

Modelo de pronóstico de ventas (LinearRegressionModel) cargado exitosamente desde: sales_forecast_model_dir/sales_prediction.model
Coeficientes del modelo: [6.522567861288859]
Intercepto del modelo: -13019.989140447298


                                                                                

In [20]:
# Using the sales forecast model, predict the sales for the year of 2023.
# Para hacer una predicción, necesitamos crear un DataFrame con las características
# que el modelo espera. No sabemos qué características son sin más información
# sobre el modelo.
# Asumamos que el modelo espera una característica de 'year' o alguna característica temporal.

# Esto es MUY ESPECULATIVO y probablemente necesite ser ajustado
# basado en las características reales que el modelo 'sales_model' espera.

# Si el modelo espera una columna 'features' que es un vector del año:
from pyspark.ml.feature import VectorAssembler

# Crear un DataFrame con el año para el que queremos predecir
year_to_predict_data = [(2023,)] # Asumiendo que el modelo toma el año como una característica escalar
year_df_schema = ["year_feature"] # Nombre de la columna temporal
predict_df_raw = spark.createDataFrame(year_to_predict_data, year_df_schema)

# Si el modelo (o su pipeline) espera una columna llamada 'features'
# y esta se crea a partir de 'year_feature':
try:
    
    
    assembler_pred = VectorAssembler(inputCols=["year_feature"], outputCol="features_for_pred")
    predict_df_assembled = assembler_pred.transform(predict_df_raw)
    
    # Seleccionar solo la columna de características si el modelo la espera directamente
    # (y no es un PipelineModel que la busque desde el nombre original)
    # predict_df_final = predict_df_assembled.select("features_for_pred")
    # Renombrar 'features_for_pred' a 'features' si el modelo espera 'features'
    predict_df_final = predict_df_assembled.withColumnRenamed("features_for_pred", "features").select("features")

    print("DataFrame para predicción:")
    predict_df_final.show()
    
    # Hacer la predicción
    # La columna de predicción suele llamarse 'prediction' por defecto
    forecast = sales_model.transform(predict_df_final) # Usar predict_df_raw si sales_model es un Pipeline que maneja el ensamblaje
                                                        # Usar predict_df_final si sales_model es un estimador que espera "features"
    
    print("Predicción de ventas para 2023:")
    forecast.select("prediction").show()

except Exception as e:
    print(f"Error al hacer la predicción: {e}")
    print("No se pudo realizar la predicción. Esto puede deberse a que:")
    print("1. El modelo no se cargó correctamente.")
    print("2. El DataFrame de entrada para la predicción no tiene el esquema/características que el modelo espera.")
    print("   - Necesitamos saber qué columnas de entrada y en qué formato (ej. vector 'features') espera el 'sales_model'.")
    print("   - Si 'sales_model' es un PipelineModel, podría esperar las columnas originales antes del ensamblaje.")

DataFrame para predicción:
+--------+
|features|
+--------+
|[2023.0]|
+--------+

Predicción de ventas para 2023:
+------------------+
|        prediction|
+------------------+
|175.16564294006457|
+------------------+

