# PySpark DataFrame

A PySpark DataFrame is a distributed collection of data organized into named columns. It is similar in concept to a table in a relational database or a DataFrame in pandas. PySpark DataFrame is built on top of RDD (Resilient Distributed Dataset), which allows for distributed processing of large datasets across a cluster of computers.

# Key characteristics of PySpark DataFrame include:

Distributed Computing: Data in a PySpark DataFrame is distributed across multiple nodes in a cluster, allowing for parallel processing and scalability.

Immutable: Similar to RDDs, PySpark DataFrames are immutable, meaning that once created, they cannot be changed. However, transformations can be applied to create new DataFrames.

Lazy Evaluation: PySpark DataFrame operations are lazily evaluated, meaning that transformations are not executed immediately. Instead, they are queued up and executed only when an action is called, such as show() or count().

Schema: PySpark DataFrame has a defined schema, which specifies the data types of each column. This schema allows for efficient data storage and processing.

SQL Interface: PySpark DataFrame provides a SQL-like interface for querying data using SQL queries, allowing users familiar with SQL to easily manipulate and analyze data.

Integration with Libraries: PySpark DataFrame integrates seamlessly with other Python libraries like pandas, allowing for easy conversion between PySpark DataFrame and pandas DataFrame.


# list of PySpark DataFrame operations 

        1] select(): Select specific columns from the DataFrame.

        2] filter(): Filter rows based on a condition.

        3] groupBy(): Group rows based on one or more columns.

        4] agg(): Perform aggregate functions like sum, avg, count, etc., on grouped data.

        5] orderBy(): Sort the DataFrame based on one or more columns.

        6] join(): Join two DataFrames based on a condition.

        7] withColumn(): Add a new column or replace an existing one.

        8] drop(): Drop specified columns from the DataFrame.

        9] count(): Count the number of rows in the DataFrame.

        10] sum(): Compute the sum of values in a column.

        11] avg(): Compute the average of values in a column.

        12] min(): Find the minimum value in a column.

        13] max(): Find the maximum value in a column.

        14] groupBy() + pivot(): Pivot the DataFrame based on a column.

        15] fillna(): Replace missing values with specified values.

        16] show(): Display a preview of the DataFrame.

        17] sample(): Sample a fraction of rows from the DataFrame.

In [1]:
# Importing the SparkSession module from the pyspark.sql package
from pyspark.sql import SparkSession

# Importing specific functions like col, when, count from the pyspark.sql.functions module
from pyspark.sql.functions import col, when, count

# Importing the VectorAssembler module from the pyspark.ml.feature package
from pyspark.ml.feature import VectorAssembler

# Importing the StringIndexer and OneHotEncoder modules from the pyspark.ml.feature package
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Importing the Pipeline module from the pyspark.ml package
from pyspark.ml import Pipeline

# Importing the MinMaxScaler module from the pyspark.ml.feature package
from pyspark.ml.feature import MinMaxScaler

# Importing the LinearRegression module from the pyspark.ml.regression package
from pyspark.ml.regression import LinearRegression

# Importing the RegressionEvaluator module from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import RegressionEvaluator



## Read CSV File into Spark DataFrame


In [2]:
# Creating a SparkSession with a specified application name "DataFrame Preprocessing" using the SparkSession.builder method
spark = SparkSession.builder.appName("DataFrame Preprocessing").getOrCreate()

# Reading a CSV file "big_mart.csv" into a DataFrame named "dataset" with the first row as header
dataset = spark.read.csv("./big_mart.csv",header=True)

In [3]:
# displaying  the dataframe using PySparks's show() method 
dataset.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|   Category|City_Type|Variance_of_sales|  mean_comparison|Data_direction|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|          FDP10|       12.6|         Low Fat|    0.127469857|         Snack Foods|107.7622|           OUT027|                     1985|     Medium|              T

## Print Schema of the Dataset


In [4]:
# Print the schema of the DataFrame "dataset"
dataset.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- City_Type: string (nullable = true)
 |-- Variance_of_sales: string (nullable = true)
 |-- mean_comparison: string (nullable = true)
 |-- Data_direction: string (nullable = true)



# type casting features

In [5]:
# converting features to float
column_names = ['Item_Weight','Item_MRP','Item_Outlet_Sales']
# for cols in column_names:
#     dataset = dataset.withColumn([cols, col(cols).cast('float')])
dataset = dataset.select(*[col(cols).cast('float').alias(cols) if cols in column_names else col(cols) for cols in dataset.columns])


In [6]:
dataset.printSchema()  # display datatype for the features in the dataframe

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: float (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: float (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: float (nullable = true)
 |-- Category: string (nullable = true)
 |-- City_Type: string (nullable = true)
 |-- Variance_of_sales: string (nullable = true)
 |-- mean_comparison: string (nullable = true)
 |-- Data_direction: string (nullable = true)



In [7]:
dataset.show() # to display record by default 20

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|   Category|City_Type|Variance_of_sales|  mean_comparison|Data_direction|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|          FDP10|       12.6|         Low Fat|    0.127469857|         Snack Foods|107.7622|           OUT027|                     1985|     Medium|              T

# counting total rows

In [8]:
dataset.count()  # total rows

8523

# reading the first row

In [9]:
dataset.first()  # to display only the first record

Row(Item_Identifier='FDP10', Item_Weight=12.600000381469727, Item_Fat_Content='Low Fat', Item_Visibility='0.127469857', Item_Type='Snack Foods', Item_MRP=107.76219940185547, Outlet_Identifier='OUT027', Outlet_Establishment_Year='1985', Outlet_Size='Medium', Outlet_Location_Type='Tier 3', Outlet_Type='Supermarket Type3', Item_Outlet_Sales=4022.763671875, Category='Healthy', City_Type='Village', Variance_of_sales='1841.47', mean_comparison='greater_than_mean', Data_direction='Left tailed')

In [10]:
dataset.show(15)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|   Category|City_Type|Variance_of_sales|  mean_comparison|Data_direction|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|          FDP10|       12.6|         Low Fat|    0.127469857|         Snack Foods|107.7622|           OUT027|                     1985|     Medium|              T

# data filteration (same as SQL)

In [11]:
filter_by_weight = dataset.filter("Item_Weight <= 20")   # to filter out record based on condiction

In [12]:
filter_by_weight.show(21)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|   Category|City_Type|Variance_of_sales|  mean_comparison|Data_direction|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+
|          FDP10|       12.6|         Low Fat|    0.127469857|         Snack Foods|107.7622|           OUT027|                     1985|     Medium|              T

In [13]:
# filtering data on two features
dataset.select('Item_Type','Outlet_Size').filter("Item_Outlet_Sales > 2000").show(50)  

+--------------------+-----------+
|           Item_Type|Outlet_Size|
+--------------------+-----------+
|         Snack Foods|     Medium|
|         Hard Drinks|     Medium|
|        Baking Goods|     Medium|
|Fruits and Vegeta...|     Medium|
|         Snack Foods|     Medium|
|         Snack Foods|     Medium|
|        Baking Goods|     Medium|
|           Breakfast|     Medium|
|           Household|     Medium|
|         Snack Foods|     Medium|
|         Snack Foods|     Medium|
|              Canned|     Medium|
|  Health and Hygiene|     Medium|
|       Starchy Foods|     Medium|
|           Household|     Medium|
|        Frozen Foods|     Medium|
|        Baking Goods|     Medium|
|           Household|     Medium|
|               Dairy|     Medium|
|              Canned|     Medium|
|        Baking Goods|     Medium|
|              Canned|     Medium|
|           Household|     Medium|
|             Seafood|     Medium|
|         Hard Drinks|     Medium|
|         Snack Food

# features description

In [14]:
dataset.describe().show() # same functionality as "describe()" function in pandas

+-------+---------------+------------------+----------------+--------------------+-------------+-----------------+-----------------+-------------------------+-----------+--------------------+-----------------+------------------+-----------+---------+--------------------+-----------------+--------------+
|summary|Item_Identifier|       Item_Weight|Item_Fat_Content|     Item_Visibility|    Item_Type|         Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type| Item_Outlet_Sales|   Category|City_Type|   Variance_of_sales|  mean_comparison|Data_direction|
+-------+---------------+------------------+----------------+--------------------+-------------+-----------------+-----------------+-------------------------+-----------+--------------------+-----------------+------------------+-----------+---------+--------------------+-----------------+--------------+
|  count|           8523|              8523|            8523|                8523|   

# groupby with count

In [15]:
dataset.groupby('Item_Fat_Content').count().show() # same as "value_counts()" for pandas

+----------------+-----+
|Item_Fat_Content|count|
+----------------+-----+
|         Low Fat| 5517|
|         Regular| 3006|
+----------------+-----+



# converting pyspark dataframe to pandas

In [16]:
dataset.groupby('Item_Fat_Content').count().toPandas()   # displaying record in pandas

Unnamed: 0,Item_Fat_Content,count
0,Low Fat,5517
1,Regular,3006


# groupby aggregate functions

In [17]:
dataset.groupby(['Item_Fat_Content','Outlet_Type']).agg({'Item_Outlet_Sales': 'mean','Item_MRP': "mean"}).show() # group by with average value

+----------------+-----------------+------------------+----------------------+
|Item_Fat_Content|      Outlet_Type|     avg(Item_MRP)|avg(Item_Outlet_Sales)|
+----------------+-----------------+------------------+----------------------+
|         Low Fat|Supermarket Type3| 138.4701006613487|    3643.9465082341976|
|         Low Fat|Supermarket Type2|141.88061356305278|    2008.8711346272241|
|         Low Fat|Supermarket Type1|140.72770716127314|    2288.0356293845025|
|         Low Fat|Supermarket Type4| 141.5806591331536|    341.39201943147265|
|         Regular|Supermarket Type3| 142.2432228897557|     3785.873982654918|
|         Regular|Supermarket Type4|137.89624885276513|    336.91241408655884|
|         Regular|Supermarket Type2|141.31262228994657|    1971.2663408915203|
|         Regular|Supermarket Type1| 142.1054834078967|    2367.7955709007697|
+----------------+-----------------+------------------+----------------------+



In [18]:
dataset.groupby(['Item_Fat_Content','Outlet_Type']).agg({'Item_Outlet_Sales': 'mean','Item_MRP': "mean"}).toPandas().set_index('Item_Fat_Content')

Unnamed: 0_level_0,Outlet_Type,avg(Item_MRP),avg(Item_Outlet_Sales)
Item_Fat_Content,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Low Fat,Supermarket Type3,138.470101,3643.946508
Low Fat,Supermarket Type2,141.880614,2008.871135
Low Fat,Supermarket Type1,140.727707,2288.035629
Low Fat,Supermarket Type4,141.580659,341.392019
Regular,Supermarket Type3,142.243223,3785.873983
Regular,Supermarket Type4,137.896249,336.912414
Regular,Supermarket Type2,141.312622,1971.266341
Regular,Supermarket Type1,142.105483,2367.795571


In [19]:
# creating a new feature
mean_value = dataset.select('Item_Outlet_Sales').agg({'Item_Outlet_Sales': 'mean'}).collect()[0][0]
new_dataset = dataset.withColumn('reduced_to_mean', dataset.Item_Outlet_Sales / mean_value)

In [20]:
new_dataset.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+-------------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|   Category|City_Type|Variance_of_sales|  mean_comparison|Data_direction|    reduced_to_mean|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-----------+---------+-----------------+-----------------+--------------+-------------------+
|          FDP10|       12.6|         Low Fat|    0.127469857|         Snack Foods|107.7622|           

# counting total null values for individual feature

Null Values In the Dataframe

In [21]:
# display the total null values in the features
null_values = dataset.select([count(when(col(c).isNull(),c)).alias(c) for c in dataset.columns])
null_values.show()

+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+--------+---------+-----------------+---------------+--------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|Category|City_Type|Variance_of_sales|mean_comparison|Data_direction|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+--------+---------+-----------------+---------------+--------------+
|              0|          0|               0|              0|        0|       0|                0|                        0|          0|                   0|          0|                0|       0|        0|                0|    

total columns in the dataframe same as pandas "columns" attribute

In [22]:
dataset.columns # returen list of columns

['Item_Identifier',
 'Item_Weight',
 'Item_Fat_Content',
 'Item_Visibility',
 'Item_Type',
 'Item_MRP',
 'Outlet_Identifier',
 'Outlet_Establishment_Year',
 'Outlet_Size',
 'Outlet_Location_Type',
 'Outlet_Type',
 'Item_Outlet_Sales',
 'Category',
 'City_Type',
 'Variance_of_sales',
 'mean_comparison',
 'Data_direction']

# Selecting features for prediction

In [23]:
new_dataset = dataset.select("Item_Weight","Item_Type","Outlet_Size","Item_MRP","Item_Outlet_Sales") # featur selection

In [24]:
new_dataset.show()

+-----------+--------------------+-----------+--------+-----------------+
|Item_Weight|           Item_Type|Outlet_Size|Item_MRP|Item_Outlet_Sales|
+-----------+--------------------+-----------+--------+-----------------+
|       12.6|         Snack Foods|     Medium|107.7622|        4022.7637|
|       12.6|         Hard Drinks|     Medium|113.2834|         2303.668|
|       12.6|        Baking Goods|     Medium|144.5444|        4064.0432|
|       12.6|Fruits and Vegeta...|     Medium|128.0678|        2797.6917|
|       12.6|         Snack Foods|     Medium| 36.9874|         388.1614|
|       12.6|         Snack Foods|     Medium| 87.6198|         2180.495|
|       12.6|Fruits and Vegeta...|     Medium| 38.2848|         484.7024|
|       12.6|         Snack Foods|     Medium|255.8356|         2543.356|
|       12.6|        Baking Goods|     Medium|171.3764|         3091.975|
|       12.6|           Breakfast|     Medium| 155.963|         3285.723|
|       12.6|           Household|    

In [25]:
independent_features = new_dataset.drop("Item_Outlet_Sales")

In [26]:
independent_features.show()

+-----------+--------------------+-----------+--------+
|Item_Weight|           Item_Type|Outlet_Size|Item_MRP|
+-----------+--------------------+-----------+--------+
|       12.6|         Snack Foods|     Medium|107.7622|
|       12.6|         Hard Drinks|     Medium|113.2834|
|       12.6|        Baking Goods|     Medium|144.5444|
|       12.6|Fruits and Vegeta...|     Medium|128.0678|
|       12.6|         Snack Foods|     Medium| 36.9874|
|       12.6|         Snack Foods|     Medium| 87.6198|
|       12.6|Fruits and Vegeta...|     Medium| 38.2848|
|       12.6|         Snack Foods|     Medium|255.8356|
|       12.6|        Baking Goods|     Medium|171.3764|
|       12.6|           Breakfast|     Medium| 155.963|
|       12.6|           Household|     Medium|149.9708|
|       12.6|         Snack Foods|     Medium|178.5344|
|       12.6|         Snack Foods|     Medium|121.7098|
|       12.6|              Canned|     Medium|180.5976|
|       12.6|  Health and Hygiene|     Medium|22

In [27]:
new_dataset.printSchema()

root
 |-- Item_Weight: float (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Item_MRP: float (nullable = true)
 |-- Item_Outlet_Sales: float (nullable = true)



In [28]:
new_dataset.printSchema()

root
 |-- Item_Weight: float (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Item_MRP: float (nullable = true)
 |-- Item_Outlet_Sales: float (nullable = true)



In [29]:
string_columns = [col_name for col_name, col_type in new_dataset.dtypes if col_type == "string"]

# Data Encoding (creating dummies)

In [30]:
# onehotencodeing for the categorical features
stages = []

for col in string_columns:
   
    string_indexer = StringIndexer(inputCol=col, outputCol=col + "_index")
    one_hot_encoder = OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded")
    stages += [string_indexer, one_hot_encoder]

pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(new_dataset)
transformed_df = pipeline_model.transform(new_dataset)
transformed_df.show()

+-----------+--------------------+-----------+--------+-----------------+---------------+-----------------+-----------------+-------------------+
|Item_Weight|           Item_Type|Outlet_Size|Item_MRP|Item_Outlet_Sales|Item_Type_index|Item_Type_encoded|Outlet_Size_index|Outlet_Size_encoded|
+-----------+--------------------+-----------+--------+-----------------+---------------+-----------------+-----------------+-------------------+
|       12.6|         Snack Foods|     Medium|107.7622|        4022.7637|            1.0|   (15,[1],[1.0])|              0.0|      (2,[0],[1.0])|
|       12.6|         Hard Drinks|     Medium|113.2834|         2303.668|           11.0|  (15,[11],[1.0])|              0.0|      (2,[0],[1.0])|
|       12.6|        Baking Goods|     Medium|144.5444|        4064.0432|            6.0|   (15,[6],[1.0])|              0.0|      (2,[0],[1.0])|
|       12.6|Fruits and Vegeta...|     Medium|128.0678|        2797.6917|            0.0|   (15,[0],[1.0])|              0.0

# dropping unwanted columns

In [31]:
final_dataframe = transformed_df.drop('Item_Type','Outlet_Size','Item_Type_index','Outlet_Size_index')

In [32]:
final_dataframe.show()

+-----------+--------+-----------------+-----------------+-------------------+
|Item_Weight|Item_MRP|Item_Outlet_Sales|Item_Type_encoded|Outlet_Size_encoded|
+-----------+--------+-----------------+-----------------+-------------------+
|       12.6|107.7622|        4022.7637|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|113.2834|         2303.668|  (15,[11],[1.0])|      (2,[0],[1.0])|
|       12.6|144.5444|        4064.0432|   (15,[6],[1.0])|      (2,[0],[1.0])|
|       12.6|128.0678|        2797.6917|   (15,[0],[1.0])|      (2,[0],[1.0])|
|       12.6| 36.9874|         388.1614|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6| 87.6198|         2180.495|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6| 38.2848|         484.7024|   (15,[0],[1.0])|      (2,[0],[1.0])|
|       12.6|255.8356|         2543.356|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|171.3764|         3091.975|   (15,[6],[1.0])|      (2,[0],[1.0])|
|       12.6| 155.963|         3285.723|  (15,[14],[

# creating feature vectors using VectorAssembler

In [33]:
x = final_dataframe.drop('Item_Outlet_Sales')

In [34]:
x.show()

+-----------+--------+-----------------+-------------------+
|Item_Weight|Item_MRP|Item_Type_encoded|Outlet_Size_encoded|
+-----------+--------+-----------------+-------------------+
|       12.6|107.7622|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|113.2834|  (15,[11],[1.0])|      (2,[0],[1.0])|
|       12.6|144.5444|   (15,[6],[1.0])|      (2,[0],[1.0])|
|       12.6|128.0678|   (15,[0],[1.0])|      (2,[0],[1.0])|
|       12.6| 36.9874|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6| 87.6198|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6| 38.2848|   (15,[0],[1.0])|      (2,[0],[1.0])|
|       12.6|255.8356|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|171.3764|   (15,[6],[1.0])|      (2,[0],[1.0])|
|       12.6| 155.963|  (15,[14],[1.0])|      (2,[0],[1.0])|
|       12.6|149.9708|   (15,[2],[1.0])|      (2,[0],[1.0])|
|       12.6|178.5344|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|121.7098|   (15,[1],[1.0])|      (2,[0],[1.0])|
|       12.6|180.5976|  

In [35]:
# converting input features as vectors
assembler = VectorAssembler(inputCols=x.columns, outputCol="features")
output = assembler.transform(final_dataframe)


In [36]:
output.show()

+-----------+--------+-----------------+-----------------+-------------------+--------------------+
|Item_Weight|Item_MRP|Item_Outlet_Sales|Item_Type_encoded|Outlet_Size_encoded|            features|
+-----------+--------+-----------------+-----------------+-------------------+--------------------+
|       12.6|107.7622|        4022.7637|   (15,[1],[1.0])|      (2,[0],[1.0])|(19,[0,1,3,17],[1...|
|       12.6|113.2834|         2303.668|  (15,[11],[1.0])|      (2,[0],[1.0])|(19,[0,1,13,17],[...|
|       12.6|144.5444|        4064.0432|   (15,[6],[1.0])|      (2,[0],[1.0])|(19,[0,1,8,17],[1...|
|       12.6|128.0678|        2797.6917|   (15,[0],[1.0])|      (2,[0],[1.0])|(19,[0,1,2,17],[1...|
|       12.6| 36.9874|         388.1614|   (15,[1],[1.0])|      (2,[0],[1.0])|(19,[0,1,3,17],[1...|
|       12.6| 87.6198|         2180.495|   (15,[1],[1.0])|      (2,[0],[1.0])|(19,[0,1,3,17],[1...|
|       12.6| 38.2848|         484.7024|   (15,[0],[1.0])|      (2,[0],[1.0])|(19,[0,1,2,17],[1...|


In [37]:
df = output.select('features','Item_Outlet_Sales')

In [38]:
df.show()

+--------------------+-----------------+
|            features|Item_Outlet_Sales|
+--------------------+-----------------+
|(19,[0,1,3,17],[1...|        4022.7637|
|(19,[0,1,13,17],[...|         2303.668|
|(19,[0,1,8,17],[1...|        4064.0432|
|(19,[0,1,2,17],[1...|        2797.6917|
|(19,[0,1,3,17],[1...|         388.1614|
|(19,[0,1,3,17],[1...|         2180.495|
|(19,[0,1,2,17],[1...|         484.7024|
|(19,[0,1,3,17],[1...|         2543.356|
|(19,[0,1,8,17],[1...|         3091.975|
|(19,[0,1,16,17],[...|         3285.723|
|(19,[0,1,4,17],[1...|        4363.6533|
|(19,[0,1,3,17],[1...|        2854.9504|
|(19,[0,1,3,17],[1...|         4097.333|
|(19,[0,1,7,17],[1...|        7968.2944|
|(19,[0,1,9,17],[1...|        6976.2524|
|(19,[0,1,15,17],[...|        5262.4834|
|(19,[0,1,10,17],[...|           898.83|
|(19,[0,1,2,17],[1...|        1808.9786|
|(19,[0,1,4,17],[1...|         5555.435|
|(19,[0,1,5,17],[1...|         6024.158|
+--------------------+-----------------+
only showing top

# scaling features

In [39]:
# feature scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)

In [40]:
scaledData.show()

+--------------------+-----------------+--------------------+
|            features|Item_Outlet_Sales|      scaledFeatures|
+--------------------+-----------------+--------------------+
|(19,[0,1,3,17],[1...|        4022.7637|(19,[0,1,3,17],[0...|
|(19,[0,1,13,17],[...|         2303.668|(19,[0,1,13,17],[...|
|(19,[0,1,8,17],[1...|        4064.0432|(19,[0,1,8,17],[0...|
|(19,[0,1,2,17],[1...|        2797.6917|(19,[0,1,2,17],[0...|
|(19,[0,1,3,17],[1...|         388.1614|(19,[0,1,3,17],[0...|
|(19,[0,1,3,17],[1...|         2180.495|(19,[0,1,3,17],[0...|
|(19,[0,1,2,17],[1...|         484.7024|(19,[0,1,2,17],[0...|
|(19,[0,1,3,17],[1...|         2543.356|(19,[0,1,3,17],[0...|
|(19,[0,1,8,17],[1...|         3091.975|(19,[0,1,8,17],[0...|
|(19,[0,1,16,17],[...|         3285.723|(19,[0,1,16,17],[...|
|(19,[0,1,4,17],[1...|        4363.6533|(19,[0,1,4,17],[0...|
|(19,[0,1,3,17],[1...|        2854.9504|(19,[0,1,3,17],[0...|
|(19,[0,1,3,17],[1...|         4097.333|(19,[0,1,3,17],[0...|
|(19,[0,

In [41]:
scaledData = scaledData.drop('features')

In [42]:
scaledData.show()

+-----------------+--------------------+
|Item_Outlet_Sales|      scaledFeatures|
+-----------------+--------------------+
|        4022.7637|(19,[0,1,3,17],[0...|
|         2303.668|(19,[0,1,13,17],[...|
|        4064.0432|(19,[0,1,8,17],[0...|
|        2797.6917|(19,[0,1,2,17],[0...|
|         388.1614|(19,[0,1,3,17],[0...|
|         2180.495|(19,[0,1,3,17],[0...|
|         484.7024|(19,[0,1,2,17],[0...|
|         2543.356|(19,[0,1,3,17],[0...|
|         3091.975|(19,[0,1,8,17],[0...|
|         3285.723|(19,[0,1,16,17],[...|
|        4363.6533|(19,[0,1,4,17],[0...|
|        2854.9504|(19,[0,1,3,17],[0...|
|         4097.333|(19,[0,1,3,17],[0...|
|        7968.2944|(19,[0,1,7,17],[0...|
|        6976.2524|(19,[0,1,9,17],[0...|
|        5262.4834|(19,[0,1,15,17],[...|
|           898.83|(19,[0,10,17],[0....|
|        1808.9786|(19,[0,1,2,17],[0...|
|         5555.435|(19,[0,1,4,17],[0...|
|         6024.158|(19,[0,1,5,17],[0...|
+-----------------+--------------------+
only showing top

# train and test split

In [43]:
# splitting the dataframe into train and test
train,test = scaledData.randomSplit([0.80,0.20])

In [44]:
train.show()

+-----------------+--------------------+
|Item_Outlet_Sales|      scaledFeatures|
+-----------------+--------------------+
|            33.29|(19,[0,1,4,17],[0...|
|            33.29|(19,[0,1,10,17],[...|
|          33.9558|(19,[0,1,3,18],[0...|
|          35.2874|(19,[0,1,12,17],[...|
|           36.619|(19,[0,1,5,17],[0...|
|           36.619|(19,[0,1,5,18],[0...|
|          37.2848|(19,[0,1,8,18],[0...|
|          37.9506|(19,[0,1,5,18],[0...|
|          37.9506|(19,[0,1,7,18],[0...|
|          37.9506|(19,[0,1,9,17],[0...|
|          37.9506|(19,[0,1,13,18],[...|
|          38.6164|(19,[0,1,8,18],[0...|
|           39.948|(19,[0,1,14,18],[...|
|          40.6138|(19,[0,1,6,18],[0...|
|          40.6138|(19,[0,1,10,18],[...|
|          41.2796|(19,[0,1,4,18],[0...|
|          41.2796|(19,[0,1,5,17],[0...|
|          41.2796|(19,[0,1,5,18],[0...|
|          41.9454|(19,[0,1,4,17],[0...|
|          42.6112|(19,[0,1,3,18],[0...|
+-----------------+--------------------+
only showing top

In [45]:
test.show()

+-----------------+--------------------+
|Item_Outlet_Sales|      scaledFeatures|
+-----------------+--------------------+
|          34.6216|(19,[0,1,9,18],[0...|
|          37.9506|(19,[0,1,5,18],[0...|
|          38.6164|(19,[0,1,5,18],[0...|
|           39.948|(19,[0,1,16,17],[...|
|          41.9454|(19,[0,1,5,18],[0...|
|          41.9454|(19,[0,1,7,17],[0...|
|          44.6086|(19,[0,1,4,17],[0...|
|          45.2744|(19,[0,1,2,17],[0...|
|          45.9402|(19,[0,1,3,18],[0...|
|          47.9376|(19,[0,1,11,17],[...|
|          48.6034|(19,[0,1,8,18],[0...|
|          50.6008|(19,[0,1,3,18],[0...|
|          57.2588|(19,[0,1,3,17],[0...|
|          59.2562|(19,[0,1,3,17],[0...|
|          61.2536|(19,[0,1,9,18],[0...|
|           63.251|(19,[0,1,3,17],[0...|
|          69.2432|(19,[0,1,4,17],[0...|
|          71.9064|(19,[0,1,11,17],[...|
|           73.238|(19,[0,1,4],[0.94...|
|           73.238|(19,[0,1,13,17],[...|
+-----------------+--------------------+
only showing top

In [46]:
train.count() , test.count()

(6876, 1647)

# Linear Regression

In [47]:
# linear regression to predict the sales
lin_reg = LinearRegression(featuresCol='scaledFeatures',labelCol='Item_Outlet_Sales')
lin_model = lin_reg.fit(train)

# evaluating the model

In [48]:
print("coefficients: ",lin_model.coefficients)
print('intercept: ',lin_model.intercept)

coefficients:  [-47.526892717811165,3694.23651229249,-47.23581347992821,-99.97850838426577,-108.99847753146935,-114.02608172920395,-140.08238301123225,-14.759205993893962,-86.35949251766314,-131.1207458524887,-116.12472487702583,-90.45145023908395,-91.22189181205908,-36.77962192881197,-201.50688069908944,-0.33586738933393506,-151.53315078316604,-8.670267447095629,-424.9127602566422]
intercept:  707.5812017346295


# model summary

In [49]:
# evaluating the model using regression metrices
model_summary = lin_model.summary
print('RMSE: ',model_summary.rootMeanSquaredError)
print('R2Score: ',model_summary.r2*100)

RMSE:  1389.449130294381
R2Score:  33.78847416580976


# prediction on test set

In [50]:
prediction = lin_model.transform(test) # predicting the sales

In [51]:
prediction.show()

+-----------------+--------------------+------------------+
|Item_Outlet_Sales|      scaledFeatures|        prediction|
+-----------------+--------------------+------------------+
|          34.6216|(19,[0,1,9,18],[0...|176.31795476902312|
|          37.9506|(19,[0,1,5,18],[0...| 280.1085876122519|
|          38.6164|(19,[0,1,5,18],[0...|281.14034210030024|
|           39.948|(19,[0,1,16,17],[...| 653.4846514848872|
|          41.9454|(19,[0,1,5,18],[0...| 311.3875148348544|
|          41.9454|(19,[0,1,7,17],[0...|  815.248104162974|
|          44.6086|(19,[0,1,4,17],[0...| 778.2025556270044|
|          45.2744|(19,[0,1,2,17],[0...| 846.0633204789615|
|          45.9402|(19,[0,1,3,18],[0...| 392.7785272716899|
|          47.9376|(19,[0,1,11,17],[...| 843.8649681193923|
|          48.6034|(19,[0,1,8,18],[0...| 449.7251304734849|
|          50.6008|(19,[0,1,3,18],[0...| 486.2420613306261|
|          57.2588|(19,[0,1,3,17],[0...|1008.0973465970196|
|          59.2562|(19,[0,1,3,17],[0...|

# evaluating test result for linear regression

In [52]:
# model evaluation for the test data
linear_model_evaluator = RegressionEvaluator(predictionCol='prediction',labelCol='Item_Outlet_Sales',metricName='r2')
print('rsquared_for_test_data: ',linear_model_evaluator.evaluate(prediction))

rsquared_for_test_data:  0.313935295035073
