In [5]:
from pyspark.sql import SparkSession

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [4]:

# Start a Spark session for this notebook (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

# Load the online-retail dataset stored as Parquet and preview its first rows.
df = spark.read.parquet("datasets/online_retail_set")
df.show()


+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+---+----+-----+-------+------+-------------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|             Sales|Day|Week|Month|Entrada|Salida|InventarioAcumulado|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+------------------+---+----+-----+-------+------+-------------------+
| 489437|    10002|INFLATABLE POLITI...|      12|2009-12-01 09:08:00| 0.85|    15362.0|United Kingdom|              10.2|  1|  49|   12|     12|     0|                 12|
| 490063|    10002|INFLATABLE POLITI...|       1|2009-12-03 13:49:00| 0.85|    17271.0|United Kingdom|              0.85|  3|  49|   12|      1|     0|                 14|
| 490063|    10002|INFLATABLE POLITI...|       1|2009-12-03 13:49:00| 0.85|    17271.0|United Kingdom|              0.85|  3|  49|   12|    

In [8]:
# Encode the categorical 'Country' column as a numeric index.
# handleInvalid='keep' maps countries that were never seen while fitting
# (randomSplit can leave a rare country only in the test split) to an extra
# index instead of raising an error when the fitted pipeline transforms
# unseen data. Indices for countries seen in training are unchanged.
indexer = StringIndexer(inputCol='Country', outputCol='CountryIndex', handleInvalid='keep')

# Predictor columns, assembled into a single vector column for the regressor.
features = ['Quantity', 'Price', 'CountryIndex', 'Day', 'Week', 'Month']
assembler = VectorAssembler(inputCols=features, outputCol='features')

# Linear regression predicting the 'InventarioAcumulado' column.
lr = LinearRegression(featuresCol='features', labelCol='InventarioAcumulado')

# Chain indexing, assembly and regression so they are fitted as one unit.
pipeline = Pipeline(stages=[indexer, assembler, lr])

# 80/20 train/test split; the fixed seed keeps the split reproducible.
# NOTE(review): 'InventarioAcumulado' appears to be a running total over time;
# a random split mixes earlier and later rows, which may leak information.
# Consider a time-based split on 'InvoiceDate' — TODO confirm intent.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Fit every pipeline stage on the training split only.
model = pipeline.fit(train_data)

# Apply the fitted pipeline to the held-out split.
# NOTE(review): the displayed predictions (~4800) are far from the labels
# (1-24); consider adding a quantitative evaluation (e.g. RMSE) of this model.
predictions = model.transform(test_data)

In [10]:

# Show each actual label next to the model's prediction and the input features.
predictions.select(['InventarioAcumulado', 'prediction'] + features).show()

+-------------------+------------------+--------+-----+------------+---+----+-----+
|InventarioAcumulado|        prediction|Quantity|Price|CountryIndex|Day|Week|Month|
+-------------------+------------------+--------+-----+------------+---+----+-----+
|                 24| 4849.655454344459|      24| 1.25|         0.0|  1|  49|   12|
|                 12| 4824.452855967288|      12| 6.75|         0.0|  1|  49|   12|
|                 12| 4824.452855967288|      12| 6.75|         0.0|  1|  49|   12|
|                  3| 4808.218752008031|       3| 5.95|         0.0|  1|  49|   12|
|                 16| 4833.680708847137|      16| 3.39|         0.0|  1|  49|   12|
|                 12| 4827.323836183569|      12| 1.45|         0.0|  1|  49|   12|
|                  3| 4809.139632454761|       3| 4.25|         0.0|  1|  49|   12|
|                  2| 4805.933456563286|       2| 6.75|         0.0|  1|  49|   12|
|                  1| 4802.348094605511|       1| 9.95|         0.0|  1|  49

In [11]:
# Persist the fitted pipeline model. overwrite() lets this cell be re-run
# (e.g. Restart Kernel -> Run All) without failing because the target
# directory already exists from a previous run.
model.write().overwrite().save("datasets/path_model")