## Utilización, procesamiento y visualiazción de grandes volúmenes de datos.
##### Portafolio de análisis - Módulo 1 Big Data
> María de los Angeles Arista Huerta / *A01369984*

***Configurazión del entorno***

In [1]:
# Variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-arm64"
os.environ["SPARK_HOME"] = "/home/mc/spark/spark-3.2.2-bin-hadoop3.2"

# Buscando e inicializando la instalación de Spark
import findspark
findspark.find()
findspark.init()

In [2]:
# Sesion de  Spark
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('BD_Angie').getOrCreate()
spark_session

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/25 06:15:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


***Carga de datos***
##### *Dataset obtenido de https://www.kaggle.com/datasets/pigment/big-sales-data*

Para conocer el dataset se imprimen las variables y el esquemas de las mismas, de esta forma podemos saber cuales columnas contienen strings, enteros y flotantes.

In [3]:
# Carga del archivo csv, primera visualización
df  = spark_session.read.csv('Liquor_Sales.csv', header=True, inferSchema=True)
df.show()

                                                                                

+-------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+-------------+--------+--------------------+-------------+--------------------+-----------+--------------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|      Date|Store Number|          Store Name|             Address|           City|Zip Code|      Store Location|County Number|       County|Category|       Category Name|Vendor Number|         Vendor Name|Item Number|    Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+-------------+--------+--------------------+-------------+---------

In [4]:
# Esquema de los datos
df.printSchema()

root
 |-- Invoice/Item Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Store Number: integer (nullable = true)
 |-- Store Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Store Location: string (nullable = true)
 |-- County Number: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- Category: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Vendor Number: integer (nullable = true)
 |-- Vendor Name: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item Description: string (nullable = true)
 |-- Pack: integer (nullable = true)
 |-- Bottle Volume (ml): integer (nullable = true)
 |-- State Bottle Cost: double (nullable = true)
 |-- State Bottle Retail: double (nullable = true)
 |-- Bottles Sold: integer (nullable = true)
 |-- Sale (Dollars): double (nullable = true)
 |-- Volume Sold (Liters): doub

Después de ver los datos, se deben preparar para implementarlos a un modelo. De inicio se desea hacer una regresión lineal para predecir las ventas en dolares.

***Limpieza de datos***

Se comienza la limpieza mostrando la cantidad de datos nulos que posee nuestro dataset.

In [5]:
# Montrar cantidad de valores nulos
from pyspark.sql.functions import when,lit,count,isnan,col
df.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df.columns]).show()



+-------------------+----+------------+----------+-------+-----+--------+--------------+-------------+------+--------+-------------+-------------+-----------+-----------+----------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|Date|Store Number|Store Name|Address| City|Zip Code|Store Location|County Number|County|Category|Category Name|Vendor Number|Vendor Name|Item Number|Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----+------------+----------+-------+-----+--------+--------------+-------------+------+--------+-------------+-------------+-----------+-----------+----------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|                  0|   0|           

                                                                                

##### 79927 | Address
##### 79926 | City
##### 79971 | Zip Code
##### 1886023 | Store Location
##### 156731 | County Number
##### 156729 | County
##### 16974 | Category
##### 25040 | Category Name
##### 5 | Vendor Number
##### 3 | Vendor Name
##### 10 | State Bottle Cost
##### 10 | State Bottle Retail
##### 10 | Sale (Dollars)
Aqui se enlistan los valores nulos totales que poseen las variables, las no listadas no tienen datos nulos. Durante la visualización de los datos en tablue se tenía considerado utilizar la categoria de los licores para definir la venta, sin embargo al mostrar demasiado valor nulos y tener una gran cantidad de categorías, no se ha considerado para implementarlo en el modelo.

Tome la desición de primero separar las variables relevantes para el modelo y después eliminar los valores nulos, ya que varia demasiado la cantidad de nulos en ciertas variables, lo cual eliminarlas de inicio provocaría perder información importante.

Las variables a ocupar para el modelo son:
* Pack
* Bottle Volume (ml)
* State Bottle Cost
* State Bottle Retail
* Bottles Sold
* Volume Sold (Liters)
* Sale (Dollars)

In [6]:
# Variables relevantes
df = df.select(['Pack', 'Bottle Volume (ml)', 'State Bottle Cost', 'State Bottle Retail', 'Bottles Sold', 'Sale (Dollars)', 'Volume Sold (Liters)'])
df.show()

+----+------------------+-----------------+-------------------+------------+--------------+--------------------+
|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|
+----+------------------+-----------------+-------------------+------------+--------------+--------------------+
|   6|               500|             4.89|               7.34|           2|         14.68|                 1.0|
|   6|               750|            14.99|              22.49|           6|        134.94|                 4.5|
|  12|              1000|             4.34|               6.51|          12|         78.12|                12.0|
|  12|               500|             5.54|               8.31|           1|          8.31|                 0.5|
|   6|              1750|             31.0|              46.49|           2|         92.98|                 3.5|
|   6|               750|             12.5|              18.75|           6|         112.5|     

In [7]:
# Eliminar datos nulos y verificar que ya se eliminaron
df = df.na.drop()
df.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df.columns]).show()



+----+------------------+-----------------+-------------------+------------+--------------+--------------------+
|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|
+----+------------------+-----------------+-------------------+------------+--------------+--------------------+
|   0|                 0|                0|                  0|           0|             0|                   0|
+----+------------------+-----------------+-------------------+------------+--------------+--------------------+



                                                                                

Al eliminar los valores nulos, se procede con el modelo, que en este caso se implementa una regresión lineal

***Modelo de regresion lineal***

Los pasos a realizar para el modelo son los siguientes:
* Importar librerias necesarias 
* Transformar las columnas relevantes para generar una columna vectorial
* Almacenar la columna vectorial y la columna de la variable a predecir para utilizarlas como entrada al modelo
* Dividir el dataset en datos de entrenamiento y pruebas (80/20) 
* Definir el modelo de regresión lineal

In [8]:
#Librerias necesarias
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [9]:
# Transformador - combina las columnas para generar una columna vectorial
lassembler = VectorAssembler(
    inputCols = ['Pack','Bottle Volume (ml)','State Bottle Cost','State Bottle Retail','Bottles Sold','Volume Sold (Liters)'],
    outputCol = 'features')

In [10]:
output = lassembler.transform(df)

In [11]:
# Resultado de la columna generada (columna vectorial)
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|[6.0,500.0,4.89,7...|
|[6.0,750.0,14.99,...|
|[12.0,1000.0,4.34...|
|[12.0,500.0,5.54,...|
|[6.0,1750.0,31.0,...|
|[6.0,750.0,12.5,1...|
|[12.0,500.0,4.96,...|
|[12.0,1000.0,4.05...|
|[12.0,500.0,4.9,7...|
|[6.0,1750.0,8.2,1...|
|[24.0,375.0,7.65,...|
|[12.0,1000.0,5.5,...|
|[12.0,750.0,4.5,6...|
|[12.0,750.0,3.36,...|
|[12.0,750.0,27.0,...|
|[6.0,750.0,17.97,...|
|[6.0,1750.0,20.25...|
|[12.0,750.0,5.44,...|
|[6.0,750.0,18.08,...|
|[12.0,750.0,3.42,...|
+--------------------+
only showing top 20 rows



In [12]:
# Almacenar la columna vectorial y la columna de la variable a predecir
loan_data = output.select('features', 'Sale (Dollars)')
loan_data.show()

+--------------------+--------------+
|            features|Sale (Dollars)|
+--------------------+--------------+
|[6.0,500.0,4.89,7...|         14.68|
|[6.0,750.0,14.99,...|        134.94|
|[12.0,1000.0,4.34...|         78.12|
|[12.0,500.0,5.54,...|          8.31|
|[6.0,1750.0,31.0,...|         92.98|
|[6.0,750.0,12.5,1...|         112.5|
|[12.0,500.0,4.96,...|          7.44|
|[12.0,1000.0,4.05...|        145.92|
|[12.0,500.0,4.9,7...|          7.35|
|[6.0,1750.0,8.2,1...|          73.8|
|[24.0,375.0,7.65,...|         11.48|
|[12.0,1000.0,5.5,...|          99.0|
|[12.0,750.0,4.5,6...|          81.0|
|[12.0,750.0,3.36,...|        120.96|
|[12.0,750.0,27.0,...|         486.0|
|[6.0,750.0,17.97,...|         26.96|
|[6.0,1750.0,20.25...|        182.28|
|[12.0,750.0,5.44,...|          97.8|
|[6.0,750.0,18.08,...|        162.78|
|[12.0,750.0,3.42,...|          5.13|
+--------------------+--------------+
only showing top 20 rows



In [13]:
# División del dataset en 80% de datos para entrenamiento y 20% de datos para pruebas
train, test = loan_data.randomSplit([0.8, 0.2])

In [14]:
# Definición del modelo
model = LinearRegression(maxIter=10,
                         featuresCol = 'features', 
                         labelCol = 'Sale (Dollars)', 
                         regParam = 0.3)

***Entrenamiento***

Continuamos con el entrenamiento del modelo, dando como entrada los datos de entrenamiento.

Al termino del entrenamiento se utilizaran metricas para monitorear el rendimiento del entrenamiento. 
Las metricas utilizada son:
* Mean Squared Error
* Mean Absolute Error
* r2

In [15]:
# Entrenamiento del modelo
modelo = model.fit(train)

22/11/25 06:19:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/25 06:19:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/25 06:20:21 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [16]:
# Impresión de metricas para el desempeño del modelo
print("Coefficients: %s" % str(modelo.coefficients))
print("Intercept: %s" % str(modelo.intercept))

trainingSummary = modelo.summary
trainingSummary.residuals.show()
print("MSE: %f" % trainingSummary.rootMeanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)
print("r2: %f" % trainingSummary.r2)



Coefficients: [-4.9534996388720245,-0.04847003048185474,3.3598193956201663,1.454646977175727,8.040970878635838,5.85068721506168]
Intercept: 48.41406596043427


[Stage 16:>                                                         (0 + 1) / 1]

+-------------------+
|          residuals|
+-------------------+
|-152.12835773058842|
| 32.634610646617205|
|  36.93464655477334|
|-123.47045104985062|
|-230.72282324229047|
| -98.82079582215465|
|-29.789590441541236|
|-393.37584603609895|
|-393.37584603609895|
| -573.9581660580989|
| 1556.0929847526481|
| 1556.0929847526481|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
| -576.8190158376956|
+-------------------+
only showing top 20 rows

MSE: 221.977614
MAE: 51.410410
r2: 0.774909


                                                                                

***Evaluación***

Posterior al entrenamiento, se realiza la evaluación del modelo. En esta ocasión la entrada son los datos de pruebas. Con ello esperamos de salida las prediciones de las ventas en dolares. 
Finalmente utilizando las mismas metricas que en el entrenamiento, se monitorea el rendimiento de la predición.
Las metricas utilizada son:

* Mean Squared Error
* Mean Absolute Error
* r2

In [17]:
# Evaluacion del modelo
test_results = modelo.evaluate(test)

                                                                                

In [18]:
# Generar prediciones
predictions_data = test.select('features')
predictions = modelo.transform(predictions_data)
predictions.show()

[Stage 19:>                                                         (0 + 1) / 1]

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|[1.0,750.0,53.83,...| 330.28783751320896|
|[1.0,750.0,53.83,...| 330.28783751320896|
|[1.0,750.0,137.88...|  783.6390158376956|
|[1.0,750.0,137.88...|  783.6390158376956|
|[1.0,750.0,137.88...|  796.0680021276278|
|[1.0,750.0,155.99...|  884.0081034622415|
|[1.0,750.0,5800.0...|  32161.91822577589|
|[1.0,1000.0,27.15...|  159.3490619048599|
|[1.0,2250.0,43.5,...|  196.6758738201071|
|[1.0,2250.0,43.5,...| 239.08590804515637|
|[1.0,3000.0,5.52,...| -45.76581256534277|
|[1.0,3000.0,5.52,...| -45.76581256534277|
|[1.0,3000.0,5.52,...|-20.172780041521875|
|[1.0,3000.0,5.52,...| 31.013285006119872|
|[1.0,3000.0,28.6,...| 107.73172995921523|
|[1.0,3000.0,29.3,...|  86.01795033836298|
|[1.0,3000.0,29.72...|  88.34550208014416|
|[1.0,3000.0,29.72...|  88.34550208014416|
|[1.0,3000.0,29.72...|  88.34550208014416|
|[1.0,3000.0,29.72...|  88.34550208014416|
+----------

                                                                                

In [19]:
# Impresión de metricas para el desempeño de las pruebas
test_results.residuals.show()
print("MSE: %f" % test_results.rootMeanSquaredError)
print("MAE: %f" % test_results.meanAbsoluteError)
print("r2: %f" % test_results.r2)

[Stage 20:>                                                         (0 + 1) / 1]

+-------------------+
|          residuals|
+-------------------+
|-168.78783751320896|
|-168.78783751320896|
| -576.8190158376956|
| -576.8190158376956|
| -382.4280021276278|
| -650.0181034622415|
| -23461.91822577589|
| -118.6190619048599|
| -131.4258738201071|
| -43.33590804515637|
|  54.04581256534277|
|  54.04581256534277|
|  36.73278004152188|
|  2.106714993880125|
|-21.931729959215232|
| -42.06795033836298|
| -43.76550208014416|
| -43.76550208014416|
| -43.76550208014416|
| -43.76550208014416|
+-------------------+
only showing top 20 rows

MSE: 223.836623
MAE: 51.333868
r2: 0.752768


                                                                                

Para concluir se obtiene una r2 en entrenamiento de 77 y una r2 en pruebas de 75