In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 44.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=6f83383f96f7bc890392eeab85a0c35824f36426055e0909341fa4fbafa71caa
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


# Dependencies

In [2]:
# To create the file's path
import os 
# To import the file into the colab enviroment 
from google.colab import drive
# To download files from cloud
from google.colab import files
# To connect to Spark and Create Row
from pyspark.sql import SparkSession, Row
# User defined function (UDF) to apply custom functions to dataset, col to return columns
from pyspark.sql.functions import *
# Import data types for casting/converting
from pyspark.sql.types import *
# Import StringIndexer to perform categorical transformation to numerical
from pyspark.ml.feature import StringIndexer
# Transforms multiple rows into a row vector
from pyspark.ml.feature import VectorAssembler
# Linear regression algorithm
from pyspark.ml.regression import LinearRegression
# For model loading
from pyspark.ml.regression import LinearRegressionModel

# Setting Up Drive and Spark Session

In [3]:
# Importing the drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Creating the path
path = os.path.join('drive','MyDrive','Colab Datasets','sales_detail_report.csv')
path

'drive/MyDrive/Colab Datasets/sales_detail_report.csv'

In [5]:
# Creating a Spark Session
spark = SparkSession.builder.appName('Spark Project').getOrCreate()
spark

# Data Preprocessing

In [6]:
# Reading the .csv from the mounted drive
df_original = spark.read.csv(path, header=True, sep=';', inferSchema=True)
df = df_original

In [7]:
# Data frame lenght
df.count()

11793728

In [8]:
# First rows
df.head(5)

[Row(Store No.='CL104', POS Terminal No.='CL10401', Transaction No.=180560, Date='01/02/18', Shifted Date=None, VAT %=19, Area HO Currency Code='CLP', Net Amount='27,723.00', VAT Amount='5,267.00', Item No.='BY1610', Size Description='8'),
 Row(Store No.='CL104', POS Terminal No.='CL10401', Transaction No.=180563, Date='01/02/18', Shifted Date=None, VAT %=19, Area HO Currency Code='CLP', Net Amount='-52,933.00', VAT Amount='-10,057.00', Item No.='BA8899', Size Description='9-'),
 Row(Store No.='CL104', POS Terminal No.='CL10401', Transaction No.=180564, Date='01/02/18', Shifted Date=None, VAT %=19, Area HO Currency Code='CLP', Net Amount='52,933.00', VAT Amount='10,057.00', Item No.='BA8899', Size Description='8-'),
 Row(Store No.='CL104', POS Terminal No.='CL10401', Transaction No.=180569, Date='01/02/18', Shifted Date=None, VAT %=19, Area HO Currency Code='CLP', Net Amount='13,437.00', VAT Amount='2,553.00', Item No.='BQ0759', Size Description='XL'),
 Row(Store No.='CL104', POS Termi

In [9]:
# Showing columns name
df.columns

['Store No.',
 'POS Terminal No.',
 'Transaction No.',
 'Date',
 'Shifted Date',
 'VAT %',
 'Area HO Currency Code',
 'Net Amount',
 'VAT Amount',
 'Item No.',
 'Size Description']

In [10]:
# Columns and data types 
df.dtypes

[('Store No.', 'string'),
 ('POS Terminal No.', 'string'),
 ('Transaction No.', 'int'),
 ('Date', 'string'),
 ('Shifted Date', 'string'),
 ('VAT %', 'int'),
 ('Area HO Currency Code', 'string'),
 ('Net Amount', 'string'),
 ('VAT Amount', 'string'),
 ('Item No.', 'string'),
 ('Size Description', 'string')]

In [11]:
# Dropping useless columns 
df = df.drop('POS Terminal No.', 'Shifted Date', 'VAT %', 'Area HO Currency Code', 'VAT Amount', 'Size Description')
df.head(3)

[Row(Store No.='CL104', Transaction No.=180560, Date='01/02/18', Net Amount='27,723.00', Item No.='BY1610'),
 Row(Store No.='CL104', Transaction No.=180563, Date='01/02/18', Net Amount='-52,933.00', Item No.='BA8899'),
 Row(Store No.='CL104', Transaction No.=180564, Date='01/02/18', Net Amount='52,933.00', Item No.='BA8899')]

In [12]:
# Loop through each column and rename it
for col in df.columns:
  new_col = col.replace(" ","_").lower().replace(".","")
  print (f'"{col}" RENAMED TO: "{new_col}"\n')
  df = df.withColumnRenamed(col, new_col)

print (f"{df.columns}")

"Store No." RENAMED TO: "store_no"

"Transaction No." RENAMED TO: "transaction_no"

"Date" RENAMED TO: "date"

"Net Amount" RENAMED TO: "net_amount"

"Item No." RENAMED TO: "item_no"

['store_no', 'transaction_no', 'date', 'net_amount', 'item_no']


## Dropping NA values

In [14]:
df = df.na.drop(how='any')

## Change data type

In [16]:
df.show(2)

+--------+--------------+--------+----------+-------+
|store_no|transaction_no|    date|net_amount|item_no|
+--------+--------------+--------+----------+-------+
|   CL104|        180560|01/02/18| 27,723.00| BY1610|
|   CL104|        180563|01/02/18|-52,933.00| BA8899|
+--------+--------------+--------+----------+-------+
only showing top 2 rows



In [17]:
df.dtypes

[('store_no', 'string'),
 ('transaction_no', 'int'),
 ('date', 'string'),
 ('net_amount', 'string'),
 ('item_no', 'string')]

* 'date' from string to date format
* 'net_amount' from string to int, replace ","

In [18]:
# Apply an user defined function to clean

# Define the UDF
udf_replace = udf(lambda x: x.replace(',',''))

# Select the column to replace then apply the function and cast to change it's data type
df = df.withColumn(
    'net_amount',
    udf_replace(
        df['net_amount']
        ).cast(IntegerType())
    )

df.show(3)
df.dtypes

+--------+--------------+--------+----------+-------+
|store_no|transaction_no|    date|net_amount|item_no|
+--------+--------------+--------+----------+-------+
|   CL104|        180560|01/02/18|     27723| BY1610|
|   CL104|        180563|01/02/18|    -52933| BA8899|
|   CL104|        180564|01/02/18|     52933| BA8899|
+--------+--------------+--------+----------+-------+
only showing top 3 rows



[('store_no', 'string'),
 ('transaction_no', 'int'),
 ('date', 'string'),
 ('net_amount', 'int'),
 ('item_no', 'string')]

In [19]:
# Test: change date string MM/dd/yy to date format yy-MM-dd
df.select('date', to_date('date', 'MM/dd/yy').alias('date_formatted')).show(2)
df.select('date', to_date('date', 'MM/dd/yy').alias('date_formatted')).dtypes

+--------+--------------+
|    date|date_formatted|
+--------+--------------+
|01/02/18|    2018-01-02|
|01/02/18|    2018-01-02|
+--------+--------------+
only showing top 2 rows



[('date', 'string'), ('date_formatted', 'date')]

In [20]:
# Change in place
df = df.withColumn('date', to_date('date', 'MM/dd/yy'))
df.show(3)

+--------+--------------+----------+----------+-------+
|store_no|transaction_no|      date|net_amount|item_no|
+--------+--------------+----------+----------+-------+
|   CL104|        180560|2018-01-02|     27723| BY1610|
|   CL104|        180563|2018-01-02|    -52933| BA8899|
|   CL104|        180564|2018-01-02|     52933| BA8899|
+--------+--------------+----------+----------+-------+
only showing top 3 rows



In [21]:
# Export cleaned data frame into a single partition 
df.repartition(1).write.csv("pyspark_sales_forescasting")

# Modelling

## Vectorizing the Data 

In [22]:
input_cols = ['store_no', 'item_no']
output_cols = [col+'_index' for col in input_cols]

In [23]:
# Create indexer instance
indexer = StringIndexer(
    inputCols=input_cols,
    outputCols=output_cols
    )

In [24]:
# Get a crossed list of non-index and indexed columns
crossed_cols = [col_name for sublist in zip(input_cols, output_cols) for col_name in sublist]
crossed_cols

['store_no', 'store_no_index', 'item_no', 'item_no_index']

In [25]:
# Insert the data frame and then transform it
indexed_df = indexer.fit(df).transform(df)

In [26]:
# Show the new data frame
indexed_df.show(3, truncate=False)

+--------+--------------+----------+----------+-------+--------------+-------------+
|store_no|transaction_no|date      |net_amount|item_no|store_no_index|item_no_index|
+--------+--------------+----------+----------+-------+--------------+-------------+
|CL104   |180560        |2018-01-02|27723     |BY1610 |32.0          |20355.0      |
|CL104   |180563        |2018-01-02|-52933    |BA8899 |32.0          |23292.0      |
|CL104   |180564        |2018-01-02|52933     |BA8899 |32.0          |23292.0      |
+--------+--------------+----------+----------+-------+--------------+-------------+
only showing top 3 rows



In [27]:
# Confirms unique values between non-indexed and indexed columns
for col in crossed_cols:
  print(f"Column '{col}' - Unique Values: {indexed_df.select(col).distinct().count()}")

Column 'store_no' - Unique Values: 78
Column 'store_no_index' - Unique Values: 78
Column 'item_no' - Unique Values: 38452
Column 'item_no_index' - Unique Values: 38452


In [28]:
# Convert date to numeric
indexed_df = indexed_df.withColumn('unix_date', unix_timestamp(indexed_df['date']))

In [29]:
indexed_df.show(3)

+--------+--------------+----------+----------+-------+--------------+-------------+----------+
|store_no|transaction_no|      date|net_amount|item_no|store_no_index|item_no_index| unix_date|
+--------+--------------+----------+----------+-------+--------------+-------------+----------+
|   CL104|        180560|2018-01-02|     27723| BY1610|          32.0|      20355.0|1514851200|
|   CL104|        180563|2018-01-02|    -52933| BA8899|          32.0|      23292.0|1514851200|
|   CL104|        180564|2018-01-02|     52933| BA8899|          32.0|      23292.0|1514851200|
+--------+--------------+----------+----------+-------+--------------+-------------+----------+
only showing top 3 rows



In [30]:
# Selecting the independent variables and dependant variable
x_cols = ['unix_date', 'store_no_index', 'item_no_index']
y_col = 'net_amount'

In [31]:
# Vectorizing the independent variables
featureassembler=VectorAssembler(inputCols=x_cols,

                                outputCol=str(x_cols))

vectorized_df=featureassembler.transform(indexed_df)

In [32]:
vectorized_df.show(3, truncate=False)

+--------+--------------+----------+----------+-------+--------------+-------------+----------+------------------------------------------------+
|store_no|transaction_no|date      |net_amount|item_no|store_no_index|item_no_index|unix_date |['unix_date', 'store_no_index', 'item_no_index']|
+--------+--------------+----------+----------+-------+--------------+-------------+----------+------------------------------------------------+
|CL104   |180560        |2018-01-02|27723     |BY1610 |32.0          |20355.0      |1514851200|[1.5148512E9,32.0,20355.0]                      |
|CL104   |180563        |2018-01-02|-52933    |BA8899 |32.0          |23292.0      |1514851200|[1.5148512E9,32.0,23292.0]                      |
|CL104   |180564        |2018-01-02|52933     |BA8899 |32.0          |23292.0      |1514851200|[1.5148512E9,32.0,23292.0]                      |
+--------+--------------+----------+----------+-------+--------------+-------------+----------+-----------------------------------

In [33]:
# Shriking the data frame with vectorized independent variables and dependent variable
vectorized_df = vectorized_df.select(str(x_cols), y_col)

In [34]:
vectorized_df.show(3, truncate=False)

+------------------------------------------------+----------+
|['unix_date', 'store_no_index', 'item_no_index']|net_amount|
+------------------------------------------------+----------+
|[1.5148512E9,32.0,20355.0]                      |27723     |
|[1.5148512E9,32.0,23292.0]                      |-52933    |
|[1.5148512E9,32.0,23292.0]                      |52933     |
+------------------------------------------------+----------+
only showing top 3 rows



## Fitting the Model

In [35]:
# Split the data into train and test validation
train_data, test_data= vectorized_df.randomSplit([0.8,0.2])

In [36]:
# Creating the Linear Regressor object
regressor = LinearRegression(featuresCol=str(x_cols), labelCol=y_col)


In [37]:
# Fitting the data into
regressor = regressor.fit(train_data)
regressor

LinearRegressionModel: uid=LinearRegression_fdab7d7e6ea8, numFeatures=3

## Evaluating the Model

In [38]:
# Metrics: Intercept
regressor.intercept

-27536.480864434543

In [39]:
# Metrics: Coefficients
regressor.coefficients

DenseVector([0.0, 52.1383, 0.2307])

In [40]:
# Making predictions about the test data
predicted_results = regressor.evaluate(test_data)

In [41]:
# Showing 
predicted_results.predictions.show(3, truncate=False)



+------------------------------------------------+----------+------------------+
|['unix_date', 'store_no_index', 'item_no_index']|net_amount|prediction        |
+------------------------------------------------+----------+------------------+
|[1.5148512E9,0.0,7.0]                           |5034      |12632.298837674582|
|[1.5148512E9,0.0,7.0]                           |5034      |12632.298837674582|
|[1.5148512E9,0.0,12.0]                          |15118     |12633.452178866686|
+------------------------------------------------+----------+------------------+
only showing top 3 rows



In [42]:
# Performance Metrics
predicted_results.r2, predicted_results.meanAbsoluteError, predicted_results.meanSquaredError

(0.011389349500087298, 12412.562681188234, 386420196.7691038)

## Saving the Model

In [43]:
# Saving the model
regressor.save('pyspark_regressor_sales_forecasting')

In [110]:
# Download the Model
zip_filename = 'pyspark_regressor_sales_forecasting.zip'

if os.path.exists(zip_filename):
  os.remove(zip_filename)

# Zip output input
os.system(f"zip -r {zip_filename} pyspark_regressor_sales_forecasting/*") 

# Download the Zip file
files.download(zip_filename)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>