In [None]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [None]:
!pip install -q findspark

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
spark

In [None]:
df = spark.read.csv("/content/drive/My Drive/Colab Notebooks/superstore.csv", header=True, inferSchema=True)

In [None]:
# to check out the schema of the dataframe
df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)



In [None]:
# to display first 5 rows
df.show(5)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset 

In [None]:
# total number of rows in the dataframe
df.count()

9994

In [None]:
# to show specific columns of first 5 rows
df.select("Order ID","Order Date","Customer ID","Product Name").show(5)

+--------------+----------+-----------+--------------------+
|      Order ID|Order Date|Customer ID|        Product Name|
+--------------+----------+-----------+--------------------+
|CA-2016-152156| 11/8/2016|   CG-12520|Bush Somerset Col...|
|CA-2016-152156| 11/8/2016|   CG-12520|Hon Deluxe Fabric...|
|CA-2016-138688| 6/12/2016|   DV-13045|Self-Adhesive Add...|
|US-2015-108966|10/11/2015|   SO-20335|Bretford CR4500 S...|
|US-2015-108966|10/11/2015|   SO-20335|Eldon Fold 'N Rol...|
+--------------+----------+-----------+--------------------+
only showing top 5 rows



In [None]:
# describing the columns
df.describe().show()

+-------+------------------+--------------+----------+---------+--------------+-----------+------------------+-----------+-------------+--------+-------+------------------+-------+---------------+----------+------------+--------------------+------------------+------------------+------------------+------------------+
|summary|            Row ID|      Order ID|Order Date|Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|    City|  State|       Postal Code| Region|     Product ID|  Category|Sub-Category|        Product Name|             Sales|          Quantity|          Discount|            Profit|
+-------+------------------+--------------+----------+---------+--------------+-----------+------------------+-----------+-------------+--------+-------+------------------+-------+---------------+----------+------------+--------------------+------------------+------------------+------------------+------------------+
|  count|              9994|          9994|   

In [None]:
# to show distinct values
df.select("Ship Mode").distinct().show()

+--------------+
|     Ship Mode|
+--------------+
|   First Class|
|      Same Day|
|  Second Class|
|Standard Class|
+--------------+



In [None]:
# to show customers who are corporates and have ordered greater than 10 products
df_filtered = df.filter("Quantity > 10 AND Segment = 'Corporate'")
df_filtered.select("Customer ID", "Customer Name", "City").show(5)

+-----------+----------------+------------+
|Customer ID|   Customer Name|        City|
+-----------+----------------+------------+
|   JE-16165|  Justin Ellison|    Franklin|
|   DR-12880| Dan Reichenbach|     Chicago|
|   AG-10495| Andrew Gjertsen|Philadelphia|
|   JD-15895|Jonathan Doherty|  Belleville|
|   JD-15895|Jonathan Doherty|Philadelphia|
+-----------+----------------+------------+
only showing top 5 rows



In [None]:
# to find maximum profit in each city
from pyspark.sql.functions import max
df.groupBy("City").max("Profit").show()

+---------------+-----------+
|           City|max(Profit)|
+---------------+-----------+
|          Tyler|     4.5201|
|    Springfield|  2302.9671|
|        Edmonds|    311.652|
|          Tempe|     107.46|
|  Bowling Green|    74.9985|
|          Pasco|     121.99|
|         Auburn|   240.8595|
|North Las Vegas|  1644.2913|
|       Thornton|    89.5888|
|       Palatine|    23.2624|
|        Phoenix|   211.4955|
|     Plainfield|   621.9744|
|  Lake Elsinore|     17.745|
|     Georgetown|   195.9944|
|      Bethlehem|      0.711|
|         Wilson|     20.392|
|      Hollywood|     38.396|
|         Monroe|    217.767|
|       Woodland|    23.9984|
| Pembroke Pines|     50.396|
+---------------+-----------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

indexers = [StringIndexer(inputCol="Ship Mode", outputCol="Ship Mode Index") , StringIndexer(inputCol="Customer ID", outputCol="Customer ID Index"), 
            StringIndexer(inputCol="Customer Name", outputCol="Customer Name Index"), StringIndexer(inputCol="Segment", outputCol="Segment Index"),
            StringIndexer(inputCol="City", outputCol="City Index"), StringIndexer(inputCol="State", outputCol="State Index"),
            StringIndexer(inputCol="Region", outputCol="Region Index"), StringIndexer(inputCol="Category", outputCol="Category Index"),
            StringIndexer(inputCol="Sales", outputCol="Sales Index"), StringIndexer(inputCol="Quantity", outputCol="Quantity Index"),
            StringIndexer(inputCol="Discount", outputCol="Discount Index")]


pipeline = Pipeline(stages=indexers)
new_df = pipeline.fit(df).transform(df)

new_df.show()

+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+---------------+-----------------+-------------------+-------------+----------+-----------+------------+--------------+-----------+--------------+--------------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|Ship Mode Index|Customer ID Index|Customer Name Index|Segment Index|City Index|State Index|Region Index|Category Index|Sales Index|Quantity Index|Discount Index|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+---

In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ["Ship Mode Index", "Customer ID Index", "Customer Name Index", "Segment Index", "City Index", 
                                               "State Index", "Region Index", "Category Index", "Sales Index", "Quantity Index", "Discount Index"], 
                                                outputCol = 'Features')
dataset = vectorAssembler.transform(new_df)
dataset = dataset.select(['Features', 'Profit'])
dataset.show(3)

+--------------------+-------+
|            Features| Profit|
+--------------------+-------+
|[1.0,717.0,717.0,...|41.9136|
|[1.0,717.0,717.0,...|219.582|
|[1.0,539.0,537.0,...| 6.8714|
+--------------------+-------+
only showing top 3 rows



In [None]:
splits = dataset.randomSplit([0.8, 0.2])
train_set = splits[0]
test_set = splits[1]

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = "Features", labelCol='Profit', maxIter=20, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_set)

print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [2.6265463400705684,-0.0023543838926686003,-0.012959842823215162,2.8487541078506586,-0.03631123910503697,1.3060903694874213,-4.360683880100757,24.09477271819643,0.004428120241835511,1.016687829255581,-13.220149143492911]
Intercept: 18.474273434345115
RMSE: 219.498389
r2: 0.032648


In [None]:
train_set.describe().show()

+-------+------------------+
|summary|            Profit|
+-------+------------------+
|  count|              7991|
|   mean| 29.31127337004122|
| stddev|223.18568640586852|
|    min|         -6599.978|
|    max|         6719.9808|
+-------+------------------+



In [None]:
predictions = lr_model.transform(test_set)
predictions.select("Prediction","Profit","Features").show()

+------------------+-------+--------------------+
|        Prediction| Profit|            Features|
+------------------+-------+--------------------+
|25.140233471309653| 68.976|(11,[0,1,2,3,4,8]...|
|26.329826869407192|11.2308|(11,[0,1,2,3,4,8]...|
|22.515606964257273| 6.2208|(11,[0,1,2,3,4,9]...|
| 28.03514886435923|  0.864|(11,[0,1,2,4,5,8]...|
|  26.8731912649859| 9.6192|(11,[0,1,2,4,5,8]...|
| 19.12880113056666| 4.8588|(11,[0,1,2,4,5,8]...|
|23.224285503284847| 2.2518|(11,[0,1,2,4,5,8]...|
|42.105753019950626|11.1564|(11,[0,1,2,4,7,8]...|
| 46.16811477257488| 2.3232|(11,[0,1,2,4,7,8]...|
|  60.4334029879711|15.4872|(11,[0,1,2,4,7,8]...|
|34.648756743859934|10.4148|(11,[0,1,2,4,7,8]...|
| 21.16843217050135|10.3071|(11,[0,1,2,4,8],[...|
| 20.61792867454396| 0.6258|(11,[0,1,2,4,8],[...|
|20.496470575907313|23.9688|(11,[0,1,2,4,8],[...|
|17.744708754519067| 1.5288|(11,[0,1,2,4,8],[...|
|24.257297953390108| 1.7901|(11,[0,1,2,4,8],[...|
| 16.55915550078182| 9.3312|(11,[0,1,2,4,8],[...|
