# Fish Weight Prediction

##### This notebook uses a dataset of fish species together with several of their physical characteristics: vertical, diagonal, and cross length, height, and width. We will try to predict the weight of a fish from these characteristics, using linear regression to see how the weight relates to them.

1. **Species**: Species name of fish
2. **Weight**: Weight of fish in gram
3. **Length1**: Vertical length in cm
4. **Length2**: Diagonal length in cm
5. **Length3**: Cross length in cm
6. **Height**: Height in cm
7. **Width**: Diagonal width in cm

In [0]:
# List the files in the DBFS upload directory to check that Fish.csv is available
dbutils.fs.ls("dbfs:/FileStore/tables/")

[FileInfo(path='dbfs:/FileStore/tables/Admission_Chance.csv', name='Admission_Chance.csv', size=12905, modificationTime=1720190058000),
 FileInfo(path='dbfs:/FileStore/tables/Cancer.csv', name='Cancer.csv', size=125204, modificationTime=1720190099000),
 FileInfo(path='dbfs:/FileStore/tables/Credit_Default.csv', name='Credit_Default.csv', size=101152, modificationTime=1720190106000),
 FileInfo(path='dbfs:/FileStore/tables/Customer_Purchase.csv', name='Customer_Purchase.csv', size=1489, modificationTime=1720190113000),
 FileInfo(path='dbfs:/FileStore/tables/Fish.csv', name='Fish.csv', size=6349, modificationTime=1720190119000),
 FileInfo(path='dbfs:/FileStore/tables/Ice_Cream.csv', name='Ice_Cream.csv', size=4872, modificationTime=1720190124000),
 FileInfo(path='dbfs:/FileStore/tables/Test1.csv', name='Test1.csv', size=108, modificationTime=1720158698000),
 FileInfo(path='dbfs:/FileStore/tables/Test2.csv', name='Test2.csv', size=192, modificationTime=1720158698000),
 FileInfo(path='dbfs:

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Reuse the active Spark session if there is one; otherwise create it under this app name
spark = (
    SparkSession.builder
    .appName('Fish Weight Prediction')
    .getOrCreate()
)

In [0]:
# Display the session object to confirm it was created
spark

In [0]:
# Load the fish measurements: the first CSV row supplies the column names,
# and column types are inferred from the data instead of defaulting to string.
df_pyspark = spark.read.csv(
    'dbfs:/FileStore/tables/Fish.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Inspect the column names and the types inferred while reading the CSV
df_pyspark.printSchema()

root
 |-- Category: integer (nullable = true)
 |-- Species: string (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Width: double (nullable = true)
 |-- Length1: double (nullable = true)
 |-- Length2: double (nullable = true)
 |-- Length3: double (nullable = true)



In [0]:
# Display the DataFrame's summary representation (column names and types)
df_pyspark

DataFrame[Category: int, Species: string, Weight: double, Height: double, Width: double, Length1: double, Length2: double, Length3: double]

In [0]:
# Preview the first rows of the raw data
df_pyspark.show()

+--------+-------+------+-------+------+-------+-------+-------+
|Category|Species|Weight| Height| Width|Length1|Length2|Length3|
+--------+-------+------+-------+------+-------+-------+-------+
|       1|  Bream| 242.0|  11.52|  4.02|   23.2|   25.4|   30.0|
|       1|  Bream| 290.0|  12.48|4.3056|   24.0|   26.3|   31.2|
|       1|  Bream| 340.0|12.3778|4.6961|   23.9|   26.5|   31.1|
|       1|  Bream| 363.0|  12.73|4.4555|   26.3|   29.0|   33.5|
|       1|  Bream| 430.0| 12.444| 5.134|   26.5|   29.0|   34.0|
|       1|  Bream| 450.0|13.6024|4.9274|   26.8|   29.7|   34.7|
|       1|  Bream| 500.0|14.1795|5.2785|   26.8|   29.7|   34.5|
|       1|  Bream| 390.0|  12.67|  4.69|   27.6|   30.0|   35.0|
|       1|  Bream| 450.0|14.0049|4.8438|   27.6|   30.0|   35.1|
|       1|  Bream| 500.0|14.2266|4.9594|   28.5|   30.7|   36.2|
|       1|  Bream| 475.0|14.2628|5.1042|   28.4|   31.0|   36.2|
|       1|  Bream| 500.0|14.3714|4.8146|   28.7|   31.0|   36.2|
|       1|  Bream| 500.0|

# 1. Clean the DataFrame

In [0]:
# Discard every row that contains a null/NaN in any column
# (leaves the data unchanged when nothing is missing)
df_pyspark = df_pyspark.dropna()

In [0]:
# List the column names to decide which ones to keep for modelling
df_pyspark.columns

['Category',
 'Species',
 'Weight',
 'Height',
 'Width',
 'Length1',
 'Length2',
 'Length3']

In [0]:
# Keep the integer species code and the numeric measurements;
# the Species name (a string column) is left out.
numeric_columns = ['Category', 'Weight', 'Height', 'Width', 'Length1', 'Length2', 'Length3']
df_pyspark = df_pyspark.select(*numeric_columns)

In [0]:
# Preview the data after the column selection
df_pyspark.show()

+--------+------+-------+------+-------+-------+-------+
|Category|Weight| Height| Width|Length1|Length2|Length3|
+--------+------+-------+------+-------+-------+-------+
|       1| 242.0|  11.52|  4.02|   23.2|   25.4|   30.0|
|       1| 290.0|  12.48|4.3056|   24.0|   26.3|   31.2|
|       1| 340.0|12.3778|4.6961|   23.9|   26.5|   31.1|
|       1| 363.0|  12.73|4.4555|   26.3|   29.0|   33.5|
|       1| 430.0| 12.444| 5.134|   26.5|   29.0|   34.0|
|       1| 450.0|13.6024|4.9274|   26.8|   29.7|   34.7|
|       1| 500.0|14.1795|5.2785|   26.8|   29.7|   34.5|
|       1| 390.0|  12.67|  4.69|   27.6|   30.0|   35.0|
|       1| 450.0|14.0049|4.8438|   27.6|   30.0|   35.1|
|       1| 500.0|14.2266|4.9594|   28.5|   30.7|   36.2|
|       1| 475.0|14.2628|5.1042|   28.4|   31.0|   36.2|
|       1| 500.0|14.3714|4.8146|   28.7|   31.0|   36.2|
|       1| 500.0|13.7592| 4.368|   29.1|   31.5|   36.4|
|       1| 340.0|13.9129|5.0728|   29.5|   32.0|   37.3|
|       1| 600.0|14.9544|5.1708

In [0]:
# Confirm that only numeric columns remain
df_pyspark.printSchema()

root
 |-- Category: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Width: double (nullable = true)
 |-- Length1: double (nullable = true)
 |-- Length2: double (nullable = true)
 |-- Length3: double (nullable = true)



# 2. Prepare the DataFrame

In [0]:
# Predictor columns for the regression; Weight is the target and is therefore excluded.
# NOTE(review): Category is an integer species code, so passing it as a plain numeric
# feature makes the model treat species as an ordered quantity - consider one-hot encoding.
feature_columns = ['Category','Height','Width','Length1','Length2','Length3']

# Pack the predictors into a single "features" vector column, then keep only
# that vector and the target.
# NOTE: this cell replaces df_pyspark, so it cannot be re-run on its own without
# re-running the cells above it first.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_pyspark = (
    assembler
    .transform(df_pyspark)
    .select("features", "Weight")
)

In [0]:
# Preview the assembled feature vectors next to the target
df_pyspark.show()

+--------------------+------+
|            features|Weight|
+--------------------+------+
|[1.0,11.52,4.02,2...| 242.0|
|[1.0,12.48,4.3056...| 290.0|
|[1.0,12.3778,4.69...| 340.0|
|[1.0,12.73,4.4555...| 363.0|
|[1.0,12.444,5.134...| 430.0|
|[1.0,13.6024,4.92...| 450.0|
|[1.0,14.1795,5.27...| 500.0|
|[1.0,12.67,4.69,2...| 390.0|
|[1.0,14.0049,4.84...| 450.0|
|[1.0,14.2266,4.95...| 500.0|
|[1.0,14.2628,5.10...| 475.0|
|[1.0,14.3714,4.81...| 500.0|
|[1.0,13.7592,4.36...| 500.0|
|[1.0,13.9129,5.07...| 340.0|
|[1.0,14.9544,5.17...| 600.0|
|[1.0,15.438,5.58,...| 600.0|
|[1.0,14.8604,5.28...| 700.0|
|[1.0,14.938,5.197...| 700.0|
|[1.0,15.633,5.133...| 610.0|
|[1.0,14.4738,5.72...| 650.0|
+--------------------+------+
only showing top 20 rows



# 3. Split the DataFrame

In [0]:
# Randomly assign roughly 80% of the rows to training and 20% to testing;
# the seed is fixed so the split can be reproduced.
train_data, test_data = df_pyspark.randomSplit(weights=[0.8, 0.2], seed=42)

# 4. Train the Model

In [0]:
# Set up a linear regression that predicts Weight from the assembled feature vector
lr = LinearRegression(labelCol="Weight", featuresCol="features")

# Estimate the coefficients using the training split only
lr_model = lr.fit(train_data)

# 5. Evaluate the Model

In [0]:
# Score the held-out test rows with the fitted model
predictions = lr_model.transform(test_data)

# Compute RMSE, MAE and MSE with identically configured evaluators
# (only the metric name varies between them)
metrics = {
    metric_name: RegressionEvaluator(
        labelCol="Weight", predictionCol="prediction", metricName=metric_name
    ).evaluate(predictions)
    for metric_name in ("rmse", "mae", "mse")
}
rmse, mae, mse = metrics["rmse"], metrics["mae"], metrics["mse"]

# Calculate MAPE (Mean Absolute Percentage Error).
# `abs` here is pyspark.sql.functions.abs (imported at the top), not the Python builtin.
predictions = predictions.withColumn("absolute_error", abs(col("prediction") - col("Weight")))
# A percentage error is undefined when the true Weight is 0. nullif() turns a zero
# divisor into NULL, so the quotient is NULL for such rows and mean() skips them,
# instead of depending on Spark's mode-dependent division-by-zero behaviour.
predictions = predictions.selectExpr("*", "absolute_error / nullif(Weight, 0) AS percentage_error")
mean_percentage_error = predictions.selectExpr("mean(percentage_error) as MAPE").collect()[0]["MAPE"]
# mean() yields None when no row has a usable percentage error; report NaN rather than crash
mape = float("nan") if mean_percentage_error is None else mean_percentage_error * 100

# Print the coefficients and intercept for linear regression
# (coefficients are in the same order as the assembled feature columns)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

# Print the evaluation metrics
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"Mean Squared Error (MSE) on test data: {mse}")
print(f"Mean Absolute Percentage Error (MAPE) on test data: {mape}%")




Coefficients: [35.49218657758219,60.205693552170466,2.104162706107436,87.64007047629201,-6.086893869639924,-56.481856043755876]
Intercept: -640.5095040591044
Root Mean Squared Error (RMSE) on test data: 192.96307170017775
Mean Absolute Error (MAE) on test data: 125.10917522565474
Mean Squared Error (MSE) on test data: 37234.747039967944
Mean Absolute Percentage Error (MAPE) on test data: 267.5630248285055%


In [0]:
# Preview a few test rows: model output next to the true weight and the input features
sample_columns = ["prediction", "Weight", "features"]
predictions.select(*sample_columns).show(5)

+-----------------+------+--------------------+
|       prediction|Weight|            features|
+-----------------+------+--------------------+
|380.5439443293234| 430.0|[1.0,12.444,5.134...|
|432.3453959285865| 450.0|[1.0,13.6024,4.92...|
|527.1207177060547| 340.0|[1.0,13.9129,5.07...|
|552.2866125257455| 500.0|[1.0,14.3714,4.81...|
|616.8918308237536| 600.0|[1.0,15.438,5.58,...|
+-----------------+------+--------------------+
only showing top 5 rows



In [0]:
# Save the trained linear regression model.
# write().overwrite() replaces a model left behind by an earlier run; a plain
# save() raises when the target path already exists, which breaks re-running the notebook.
model_path = "./Internship_Sem-6_models/Fish_Weight_Prediction_model"
lr_model.write().overwrite().save(model_path)

In [0]:
# Check that the saved model directory exists on DBFS (expects data/ and metadata/ entries)
dbutils.fs.ls("dbfs:/Internship_Sem-6_models/Fish_Weight_Prediction_model")

[FileInfo(path='dbfs:/Internship_Sem-6_models/Fish_Weight_Prediction_model/data/', name='data/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Internship_Sem-6_models/Fish_Weight_Prediction_model/metadata/', name='metadata/', size=0, modificationTime=0)]