## Import necessary packages

In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

## Checking the versions

In [2]:
print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])

[('numpy', '1.20.3'), ('pandas', '1.3.4'), ('pyspark', '3.2.1')]


## Session Building

In [3]:
spark = pyspark.sql.SparkSession.builder.appName('Profit').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")



In [4]:
df = spark.read.csv('online.csv',header=True,inferSchema=True)
print(df.count())
df.show()

50
+---------------+--------------+---------+-------+---------+
|Marketing Spend|Administration|Transport|   Area|   Profit|
+---------------+--------------+---------+-------+---------+
|      114523.61|      136897.8| 471784.1|  Dhaka|192261.83|
|       162597.7|     151377.59|443898.53|    Ctg|191792.06|
|      153441.51|     101145.55|407934.54|Rangpur|191050.39|
|      144372.41|     118671.85|383199.62|  Dhaka|182901.99|
|      142107.34|      91391.77|366168.42|Rangpur|166187.94|
|       131876.9|      99814.71|362861.36|  Dhaka|156991.12|
|      134615.46|     147198.87|127716.82|    Ctg|156122.51|
|      130298.13|     145530.06|323876.68|Rangpur| 155752.6|
|      120542.52|     148718.95|311613.29|  Dhaka|152211.77|
|      123334.88|     108679.17|304981.62|    Ctg|149759.96|
|      101913.08|     110594.11|229160.95|Rangpur|146121.95|
|      100671.96|      91790.61|249744.55|    Ctg| 144259.4|
|       93863.75|     127320.38|249839.44|Rangpur|141585.52|
|       91992.39|    

In [5]:
df.printSchema()

root
 |-- Marketing Spend: double (nullable = true)
 |-- Administration: double (nullable = true)
 |-- Transport: double (nullable = true)
 |-- Area: string (nullable = true)
 |-- Profit: double (nullable = true)



In [6]:
print(df.columns)

['Marketing Spend', 'Administration', 'Transport', 'Area', 'Profit']


In [7]:
df.describe().toPandas().transpose()


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Marketing Spend,50,73721.61559999999,45902.25648230754,0.0,165349.2
Administration,50,121344.63959999995,28017.802755488683,51283.14,182645.56
Transport,50,211025.09780000005,122290.31072584528,0.0,471784.1
Area,50,,,Ctg,Rangpur
Profit,50,112012.63920000002,40306.180337650534,14681.4,192261.83


In [8]:
import six
for i in df.columns:
    if not( isinstance(df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation between Profit and ", i, df.stat.corr('Profit',i))

Correlation between Profit and  Marketing Spend 0.9379477570676916
Correlation between Profit and  Administration 0.20071656826872136
Correlation between Profit and  Transport 0.7477657217414768
Correlation between Profit and  Profit 1.0


In [9]:
from pyspark.sql.functions import isnull, when, count, col
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+---------------+--------------+---------+----+------+
|Marketing Spend|Administration|Transport|Area|Profit|
+---------------+--------------+---------+----+------+
|              0|             0|        0|   0|     0|
+---------------+--------------+---------+----+------+



In [10]:
df.select('Area').distinct().collect()


[Row(Area='Ctg'), Row(Area='Rangpur'), Row(Area='Dhaka')]

In [11]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)

In [12]:
Area_indexer = StringIndexer(inputCol='Area', outputCol='Area_index')

Area_encoder = OneHotEncoder(inputCol='Area_index', outputCol='Area_vec')

In [13]:
Area_encoder

OneHotEncoder_6a614c9b6ef7

In [14]:
assembler = VectorAssembler(inputCols=['Marketing Spend','Administration','Transport',
                                      'Area_vec'],
                           outputCol='features')

In [15]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

## Linear Regression

In [16]:
lr = LinearRegression(featuresCol='features',labelCol='Profit')

In [17]:
pipeline = Pipeline(stages=[Area_indexer,
                            Area_encoder,
                           
                           assembler, lr])

In [18]:
train, test = df.randomSplit([0.7, 0.3])

In [19]:
lr_model = pipeline.fit(train)

In [20]:
results = lr_model.transform(test)

In [21]:
train.show(5)

+---------------+--------------+---------+-------+--------+
|Marketing Spend|Administration|Transport|   Area|  Profit|
+---------------+--------------+---------+-------+--------+
|            0.0|      116983.8| 45173.06|    Ctg| 14681.4|
|            0.0|     135426.92|      0.0|    Ctg|42559.73|
|         542.05|      51743.15|      0.0|  Dhaka|35673.41|
|        1000.23|     124153.04|  1903.93|  Dhaka|64926.08|
|        1315.46|     115816.21|297114.46|Rangpur|49490.75|
+---------------+--------------+---------+-------+--------+
only showing top 5 rows



In [22]:
results

DataFrame[Marketing Spend: double, Administration: double, Transport: double, Area: string, Profit: double, Area_index: double, Area_vec: vector, features: vector, prediction: double]

In [23]:
results.select('Profit','prediction').show()

+---------+------------------+
|   Profit|        prediction|
+---------+------------------+
| 81229.06| 69202.64314068602|
| 78239.91| 77514.97873436662|
|101004.64|101586.38803344106|
|108733.99|111895.54408511499|
|118474.03|118636.56181053547|
|126992.93|118121.75581534335|
|141585.52| 128095.9461668768|
|132602.65|148703.65854959108|
|156991.12|161088.61177088245|
|166187.94|166993.93465618376|
|191050.39|178315.41442285356|
+---------+------------------+



In [24]:
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
my_eval = RegressionEvaluator(predictionCol='prediction',labelCol='Profit',metricName='r2')

In [26]:
auc = my_eval.evaluate(results)
auc

0.9306698600406349

## Random Forest

In [27]:
rf = RandomForestRegressor(predictionCol='prediction',labelCol='Profit',numTrees=20, maxDepth=20)

In [28]:
pipeline = Pipeline(stages=[Area_indexer,
                            Area_encoder,
                           
                           assembler, rf])

In [29]:
rf_model = pipeline.fit(train)

In [30]:
results = rf_model.transform(test)

In [31]:
results.select('Profit','prediction').show()

+---------+------------------+
|   Profit|        prediction|
+---------+------------------+
| 81229.06|       80451.70475|
| 78239.91| 89826.33575000001|
|101004.64|104113.49979166672|
|108733.99|121371.97999999998|
|118474.03|        114554.171|
|126992.93|       123922.1105|
|141585.52|137809.74275000003|
|132602.65|143069.72425000003|
|156991.12|       169497.2744|
|166187.94|       165204.4315|
|191050.39|       165204.4315|
+---------+------------------+



In [32]:
s

0.896356894648003