In [2]:
!pip --quiet install pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
# Importing necessary libraries for data manipulation and processing
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, dayofweek, weekofyear, year, month
from pyspark.sql.types import FloatType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import sum as _sum

In [4]:
# Setting up the Spark session
spark = SparkSession.builder.master("local").appName("Sales Prediction").getOrCreate()

In [15]:
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [5]:
# Reading and loading the dataset
ds = spark.read.format("csv").options(header="true", inferSchema="true", delimiter=",").load("dataset.csv")

In [6]:
ds

DataFrame[IdFacturaCab: int, Documento: string, Identificacion: string, NombreCliente: string, IdFarmacia: string, Nombre: string, Descripcion: string, IdMedicamento: string, Medicamento: string, Unidad: string, Fraccion: int, PrecioUnitario: double, PrecioParcial: double, Descuento: double, Iva: double, PrecioTotal: double, TotalVenta: double, EstadoDocumento: string, Observacion: string, FechaEntero: int, FechaCreacion: string, UsuarioCreacion: string, FechaActualiza: string, UsuarioActualiza: string]

In [7]:
# Data cleaning and preprocessing
ds = ds.withColumn("PrecioUnitario", col("PrecioUnitario").cast(FloatType())) \
       .withColumn("IdMedicamento", col("IdMedicamento").cast(IntegerType())) \
       .withColumn("IdFarmacia", col("IdFarmacia").cast(IntegerType())) \
       .withColumn("PrecioParcial", col("PrecioParcial").cast(FloatType())) \
       .withColumn("Descuento", col("Descuento").cast(FloatType())) \
       .withColumn("Iva", col("Iva").cast(FloatType())) \
       .withColumn("PrecioTotal", col("PrecioTotal").cast(FloatType())) \
       .withColumn("TotalVenta", col("TotalVenta").cast(FloatType())) \
       .withColumn("FechaCreacion", to_date(col("FechaCreacion"), "yyyy-MM-dd")) \
       .withColumn("FechaActualiza", to_date(col("FechaActualiza"), "yyyy-MM-dd"))

In [8]:
# Filtering and transforming the dataset
df = ds.filter((col("EstadoDocumento") == "EMI") & (col("PrecioUnitario") <= 201) & (col("PrecioTotal") <= 250))
df = df.withColumn("Anio", year(col("FechaCreacion"))) \
       .withColumn("Mes", month(col("FechaCreacion"))) \
       .withColumn("DiaSemana", dayofweek(col("FechaCreacion"))) \
       .withColumn("Mes", weekofyear(col("FechaCreacion")))

In [9]:
# Identifying top-selling medications
best_sellers = df.groupBy("Medicamento").sum("TotalVenta").orderBy(col("sum(TotalVenta)").desc()).head(10)
best_seller_medications = [row["Medicamento"] for row in best_sellers]
dff = df.filter(col("Medicamento").isin(best_seller_medications))

In [10]:
# Selecting and grouping required columns for analysis
dw = dff.select("IdFarmacia", "IdMedicamento", "Mes", "TotalVenta")
df_grouped = dw.groupBy("IdFarmacia", "IdMedicamento", "Mes").agg(_sum("TotalVenta").alias("VentaMensual"))

In [11]:
df_grouped

DataFrame[IdFarmacia: int, IdMedicamento: int, Mes: int, VentaMensual: double]

In [12]:
# Feature engineering for the model
vectorAssembler = VectorAssembler(inputCols=['IdFarmacia', 'IdMedicamento', 'Mes'], outputCol='features')
df_features = vectorAssembler.transform(df_grouped).select('features', 'VentaMensual')

In [13]:
# Splitting data into training and test sets
(trainingData, testData) = df_features.randomSplit([0.75, 0.25], seed=0)

In [27]:
trainingData.show(n=7, vertical=False)

+----------------+------------------+
|        features|      VentaMensual|
+----------------+------------------+
|[1.0,2267.0,2.0]|  684.220005273819|
|[1.0,2267.0,3.0]| 508.7100067138672|
|[1.0,2267.0,4.0]|249.58000326156616|
|[1.0,2267.0,5.0]| 314.9900028705597|
|[1.0,2267.0,6.0]| 518.7599935531616|
|[1.0,2267.0,7.0]| 359.2100052833557|
|[1.0,2267.0,8.0]|186.07000017166138|
+----------------+------------------+
only showing top 7 rows



In [16]:
# Training a linear regression model
lr = LinearRegression(featuresCol='features', labelCol='VentaMensual')
lrModel = lr.fit(trainingData)

In [17]:
# Making predictions on the test data and evaluating the model
predictions = lrModel.transform(testData)
evaluator = RegressionEvaluator(labelCol='VentaMensual', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print('Root Mean Squared Error (RMSE) on test data =', rmse)

Root Mean Squared Error (RMSE) on test data = 4977.83824494952


In [18]:
# Setting up for making new predictions
from pyspark.sql import Row
sc = spark.sparkContext

In [19]:
# Example of predicting sales for a specific case
mes = 25
id_med = 2267
id_farm = 1
new_data = sc.parallelize([Row(IdFarmacia=id_farm, IdMedicamento=id_med, Mes=mes)])
new_df = spark.createDataFrame(new_data)
new_df2 = vectorAssembler.transform(new_df)
new_predictions = lrModel.transform(new_df2)
new_predictions.show()

+----------+-------------+---+-----------------+-----------------+
|IdFarmacia|IdMedicamento|Mes|         features|       prediction|
+----------+-------------+---+-----------------+-----------------+
|         1|         2267| 25|[1.0,2267.0,25.0]|409.5655878307707|
+----------+-------------+---+-----------------+-----------------+



In [20]:
# Interpreting the prediction result
pred = float(new_predictions.select('prediction').pandas_api().iloc[0])
reporte = f"El valor estimado de ventas del artículo {id_med} en la farmacia {id_farm} para el mes Nª{mes} es de ${pred:.2f} aproximadamente."
print(reporte)

El valor estimado de ventas del artículo 2267 en la farmacia 1 para el mes Nª25 es de $409.57 aproximadamente.


In [21]:
# Collecting unique medication IDs for further analysis
unique_ids = df_grouped.select("IdMedicamento").distinct().rdd.flatMap(lambda x: x).collect()

In [24]:
# Generating predictions for multiple combinations of farmacies, medications, and months
M = []
for id_med in unique_ids:
    for id_farm in range(1, 4):
        for mes in range(1, 25):
            new_data = sc.parallelize([Row(IdFarmacia=id_farm, IdMedicamento=id_med, Mes=mes)])
            new_df = spark.createDataFrame(new_data)
            new_df2 = vectorAssembler.transform(new_df)
            new_predictions = lrModel.transform(new_df2)
            pred = float(new_predictions.select('prediction').pandas_api().iloc[0])
            m = [id_med, id_farm, mes, pred]
            M.append(m)

In [25]:
# Creating a DataFrame from the predictions and exporting it
dt = pd.DataFrame(M, columns=[['IdFarmacia', 'IdMedicamento', 'Mes', 'Inference']])
# dt.to_csv('data.csv', index=False)

In [26]:
dt

Unnamed: 0,IdFarmacia,IdMedicamento,Mes,Inference
0,78158,1,1,5842.436214
1,78158,1,2,5872.942515
2,78158,1,3,5903.448816
3,78158,1,4,5933.955118
4,78158,1,5,5964.461419
...,...,...,...,...
715,8823,3,20,2178.612357
716,8823,3,21,2209.118658
717,8823,3,22,2239.624959
718,8823,3,23,2270.131260


In this documentation, each step in the script is clearly explained, helping users understand the purpose and functionality of different sections. This includes data preprocessing, model training, prediction, and exporting results, making it more accessible for users who may not be familiar with PySpark or the specific objectives of this notebook.