**Big Data Tools - Final Task**

github: riifo

(codes mostly from Moodle code examples of lecture6)

dataset: https://www.kaggle.com/datasets/prajwal6362venom/choclate-sales-project

**Logistic and Linear Regression**

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=bf8730d1d12e75230228faa7fa12db81faacc6646b0c25e6af2f4eaa8ee3ae6c
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Classification").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [3]:
# Read in functions we will need
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

In [4]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [6]:
data = spark.read.csv('/content/drive/My Drive/chocolate.csv', inferSchema=True, header=True)

In [7]:
data.printSchema()

root
 |-- Sales Person: string (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- cost per unit: integer (nullable = true)
 |-- Cost: string (nullable = true)
 |-- Profit: string (nullable = true)
 |-- profit %: string (nullable = true)



In [10]:
data.groupBy("Amount").count().show(100)

+--------+-----+
|  Amount|count|
+--------+-----+
| $7,189 |    1|
|    $42 |    1|
| $4,858 |    1|
| $8,155 |    1|
|   $182 |    1|
| $9,051 |    1|
| $8,862 |    1|
| $9,632 |    1|
|$10,129 |    1|
|   $819 |    2|
| $1,778 |    1|
| $7,483 |    1|
| $1,302 |    1|
|   $861 |    1|
| $6,657 |    2|
|   $497 |    1|
|   $238 |    1|
| $6,608 |    1|
| $5,551 |    1|
|$10,073 |    1|
| $6,300 |    1|
| $2,114 |    2|
| $1,085 |    1|
| $6,909 |    1|
| $5,474 |    1|
| $9,660 |    1|
| $4,760 |    1|
| $4,417 |    1|
|   $154 |    1|
| $7,812 |    1|
| $2,471 |    1|
|   $854 |    1|
| $1,561 |    1|
| $5,306 |    1|
| $6,734 |    1|
| $1,400 |    1|
| $5,775 |    1|
| $9,443 |    1|
| $1,701 |    1|
| $3,402 |    1|
| $4,438 |    1|
| $1,274 |    1|
|   $189 |    1|
| $4,326 |    1|
| $1,638 |    2|
| $6,986 |    1|
| $2,919 |    2|
| $5,439 |    1|
| $2,100 |    1|
| $3,262 |    1|
| $7,777 |    2|
|   $357 |    1|
|$15,610 |    1|
| $6,118 |    2|
| $5,194 |    1|
| $6,860 |    

In [11]:
# Declare values you will need
# Collect the column names as a list
input_columns = data.columns 
print(input_columns)

['Sales Person', 'Geography', 'Product', 'Amount', 'Units', 'cost per unit', 'Cost', 'Profit', 'profit %']


In [12]:
# keep only the indepent variables:
input_columns = input_columns[1:-1]

In [13]:
print(input_columns)

['Geography', 'Product', 'Amount', 'Units', 'cost per unit', 'Cost', 'Profit']


In [14]:
dependent_var = 'Amount'

In [15]:
# change label (class variable) to string type to prep for reindexing
# Pyspark is expecting a zero indexed integer for the label column. 
# Just in case our data is not in that format... we will treat it by using the StringIndexer built in method
#Rename and change to string type
renamed = data.withColumn("label_str", data[dependent_var].cast(StringType()))
renamed

DataFrame[Sales Person: string, Geography: string, Product: string, Amount: string, Units: int, cost per unit: int, Cost: string, Profit: string, profit %: string, label_str: string]

In [16]:
#Pyspark is expecting the this naming convention 
indexer = StringIndexer(inputCol="label_str", outputCol="label")
indexed = indexer.fit(renamed).transform(renamed)

In [17]:
data.schema['Cost'].dataType

StringType()

In [18]:
# Convert all string type data in the input column list to numeric
# Otherwise the Algorithm will not be able to process it

# Also we will use these lists later on
numeric_inputs = []
string_inputs = []
for column in input_columns:
    # First identify the string vars in your input column list
    if str(indexed.schema[column].dataType) == 'StringType()':
        # Set up your String Indexer function
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
        # Then call on the indexer you created here
        indexed = indexer.fit(indexed).transform(indexed)
        # Rename the column to a new name so you can disinguish it from the original
        new_col_name = column+"_num"
        # Add the new column name to the string inputs list
        string_inputs.append(new_col_name)
    else:
        # If no change was needed, take no action 
        # And add the numeric var to the num list
        numeric_inputs.append(column)

In [19]:
print(string_inputs)

['Geography_num', 'Product_num', 'Amount_num', 'Cost_num', 'Profit_num']


In [20]:
print(numeric_inputs)

['Units', 'cost per unit']


In [21]:
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [22]:
output

DataFrame[features: vector, label: double]

In [23]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

Features scaled to range: [0.000000, 1000.000000]


In [24]:
scalerModel = scaler.fit(output)

In [25]:
# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')

In [26]:
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 45.0|[217.142857142857...|
|210.0|[874.285714285714...|
| 27.0|[280.0,642.857142...|
|259.0|[548.571428571428...|
| 75.0|[788.571428571428...|
|249.0|[822.857142857142...|
| 95.0|[102.857142857142...|
|174.0|[400.0,500.0,400....|
|  3.0|[142.857142857142...|
| 18.0|[22.8571428571428...|
| 49.0|[880.0,714.285714...|
|142.0|[274.285714285714...|
| 14.0|[228.571428571428...|
|112.0|[102.857142857142...|
| 89.0|[445.714285714285...|
|  9.0|[125.714285714285...|
| 24.0|[165.714285714285...|
| 67.0|[645.714285714285...|
|143.0|[274.285714285714...|
|257.0|[308.571428571428...|
+-----+--------------------+
only showing top 20 rows



In [27]:
train,test = final_data.randomSplit([0.7,0.3])

In [28]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Logistic Regression

In [29]:
classifier = LogisticRegression()
fitModel = classifier.fit(train)

In [30]:
predictions = fitModel.transform(test)

In [31]:
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction"
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy: {0:.2f}".format(accuracy),"%") #     print("Test Error = %g " % (1.0 - accuracy))
print(" ")

Accuracy: 0.00 %
 


In [32]:
# Load the Summary
trainingSummary = fitModel.summary

# General Describe
trainingSummary.predictions.describe().show()

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print(" ")
print("objectiveHistory: (scaled loss + regularization) at each iteration")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print(" ")
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print(" ")
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print(" ")
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

# Generate confusion matrix and print (includes accuracy)
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(" ")
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

+-------+------------------+------------------+
|summary|             label|        prediction|
+-------+------------------+------------------+
|  count|               227|               227|
|   mean|118.56387665198238|118.56387665198238|
| stddev| 81.76839252891959| 81.76839252891959|
|    min|               0.0|               0.0|
|    max|             267.0|             267.0|
+-------+------------------+------------------+

 
objectiveHistory: (scaled loss + regularization) at each iteration
5.439962985866309
4.584053186814962
1.8805450641702472
1.871564339930548
1.2834006851321165
0.9366156260906833
0.7017660301026495
0.4912355625924652
0.3747065150769765
0.2684919976058607
0.1987975774716919
0.11831723399274868
0.09985150396438586
0.055468492031782834
0.040769780990268886
0.02997331338214479
0.019002879072964954
0.010900675434447558
0.007633674863727023
0.006786598886408559
0.0033108039418502985
0.0018501126243534473
0.001153744213846479
0.0005358302500121422
0.00030702051786440

Linear Regression

In [33]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [40]:
data = spark.read.csv('/content/drive/My Drive/chocolate.csv', inferSchema=True, header=True)

In [44]:
input_columns = data.columns 
print(input_columns)

['Sales Person', 'Geography', 'Product', 'Amount', 'Units', 'cost per unit', 'Cost', 'Profit', 'profit %']


In [45]:
input_columns = input_columns[1:-1]

In [46]:
dependent_var = 'Amount'

In [47]:
renamed = data.withColumn("label_str", data[dependent_var].cast(StringType()))
renamed

DataFrame[Sales Person: string, Geography: string, Product: string, Amount: string, Units: int, cost per unit: int, Cost: string, Profit: string, profit %: string, label_str: string]

In [48]:
indexer = StringIndexer(inputCol="label_str", outputCol="label")
indexed = indexer.fit(renamed).transform(renamed)

In [49]:
# Convert all string type data in the input column list to numeric
# Otherwise the Algorithm will not be able to process it

# Also we will use these lists later on
numeric_inputs = []
string_inputs = []
for column in input_columns:
    # First identify the string vars in your input column list
    if str(indexed.schema[column].dataType) == 'StringType()':
        # Set up your String Indexer function
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
        # Then call on the indexer you created here
        indexed = indexer.fit(indexed).transform(indexed)
        # Rename the column to a new name so you can disinguish it from the original
        new_col_name = column+"_num"
        # Add the new column name to the string inputs list
        string_inputs.append(new_col_name)
    else:
        # If no change was needed, take no action 
        # And add the numeric var to the num list
        numeric_inputs.append(column)

In [50]:
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [51]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

Features scaled to range: [0.000000, 1000.000000]


In [52]:
scalerModel = scaler.fit(output)

In [53]:
# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')

In [54]:
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 45.0|[217.142857142857...|
|210.0|[874.285714285714...|
| 27.0|[280.0,642.857142...|
|259.0|[548.571428571428...|
| 75.0|[788.571428571428...|
|249.0|[822.857142857142...|
| 95.0|[102.857142857142...|
|174.0|[400.0,500.0,400....|
|  3.0|[142.857142857142...|
| 18.0|[22.8571428571428...|
| 49.0|[880.0,714.285714...|
|142.0|[274.285714285714...|
| 14.0|[228.571428571428...|
|112.0|[102.857142857142...|
| 89.0|[445.714285714285...|
|  9.0|[125.714285714285...|
| 24.0|[165.714285714285...|
| 67.0|[645.714285714285...|
|143.0|[274.285714285714...|
|257.0|[308.571428571428...|
+-----+--------------------+
only showing top 20 rows



In [55]:
train,test = final_data.randomSplit([0.7,0.3])

In [41]:
# Split the data into training and test sets
#(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [56]:
# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")

In [58]:
# Fit the model to the training data
lrModel = lr.fit(train)

In [59]:
# Use the model to make predictions on the test data
predictions = lrModel.transform(test)

In [60]:
# Evaluate the performance of the model using Root Mean Square Error (RMSE)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [61]:
print("Root Mean Square Error (RMSE) on test data = %g" % rmse)

Root Mean Square Error (RMSE) on test data = 2.07945e-13
