In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook: local master, with the
# default filesystem pointed at file:/// so bare paths resolve on the local disk.
spark = (
    SparkSession.builder
    .appName('Abalone')
    .config("spark.master", "local")
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .getOrCreate()
)
spark

In [2]:
# df = spark.read.csv('Original/abalone.data', inferSchema=True)
# df = df.toDF(*['Sex', 
#                'Length', 'Diameter', 'Height', 
#                'Whole weight', 'Shucked weight', 'Viscera weight', 'Shell weight', 
#                'Rings'])
# df.show()

In [3]:
# Load the training set: the first row supplies the column names and Spark infers
# each column's type from the data.
df = spark.read.csv(
    'train.csv',
    inferSchema=True,
    header=True,
)

In [4]:
# Print the column names and the types Spark inferred while reading the CSV.
df.printSchema()

In [5]:
# Preview the first rows of the training data.
df.show()

## Rename Columns

In [6]:
# Replace the two ambiguous 'Whole weight.N' column names with descriptive ones.
df = df.withColumnsRenamed(
    {
        'Whole weight.1': 'Shucked weight',
        'Whole weight.2': 'Viscera weight',
    }
)

In [7]:
# Confirm the column names after renaming.
df.columns

## Check Missing or Empty Values

In [8]:
from pyspark.sql.functions import col, trim
from functools import reduce


# First way of doing it: build a single boolean Column that is true for any row
# containing a "missing" value, where "missing" depends on the column:
#   - string columns: NULL, or empty after trimming whitespace
#   - 'id':           NULL only (presumably because 0 can be a valid id)
#   - anything else:  NULL or exactly 0
# The dtype lookup is built once here instead of once per column.
column_types = dict(df.dtypes)


def _is_missing(name):
    """Return the 'value is missing' predicate (a Column) for column `name`."""
    column = col(name)
    if column_types[name] == 'string':
        return column.isNull() | (trim(column) == '')
    if name == 'id':
        return column.isNull()
    return column.isNull() | (column == 0)


# OR the per-column predicates together: a row is flagged if any column is missing.
condition = reduce(lambda a, b: a | b, [_is_missing(c) for c in df.columns])

condition

In [9]:
# Second way of doing it: build the filter as a SQL expression string. This is less
# flexible than the Column-based version when the rule has to differ by data type.
# Column names are wrapped in backticks because some of them contain spaces
# (e.g. `Shucked weight`); unquoted, the expression would not parse.
column_types = dict(df.dtypes)  # built once rather than once per column
" OR ".join(
    f"(`{c}` IS NULL OR TRIM(`{c}`) = '')" if column_types[c] == 'string'
    else f"(`{c}` IS NULL OR `{c}` = 0)"
    for c in df.columns)

In [10]:
# Inspect the rows that match the missing/empty condition.
df.filter(condition).show()

In [11]:
# Keep only the rows that do NOT match the missing/empty condition.
df = df.filter(~condition)

## Exploration

In [12]:
# Number of rows in each Sex category.
df.groupBy('Sex').count().show() # The categories are considered evenly distributed

In [13]:
# NOTE: this import rebinds the names `min` and `max` to Spark aggregate functions,
# replacing Python's builtins at notebook scope. Later cells call min("Rings") /
# max("Rings") and depend on this binding.
from pyspark.sql.functions import min, max

# Smallest and largest value of the Rings column in the training data.
df.select(min("Rings").alias("MIN"), max("Rings").alias("MAX")).show()

In [14]:
# Names of every column Spark typed as double, in DataFrame order.
# Iterating df.dtypes (a list of (name, type) pairs) avoids rebuilding a dict for each
# column and avoids reusing `col` -- the imported Spark function -- as a variable name.
cols_name = [name for name, dtype in df.dtypes if dtype == 'double']

# Summary statistics for those columns, transposed so that each feature is a row.
df.select(cols_name).describe().toPandas().set_index('summary').transpose()

In [15]:
import pandas as pd

# Min / quartiles / max for every double column.
# approxQuantile accepts a list of columns and returns one list of quantiles per column,
# so all features are computed in a single call. The last argument (relative error 0)
# is unchanged from the per-column version.
# No loop variable named `col` is used here, so the Spark `col` function imported
# earlier stays intact at notebook scope.
quantile_probs = [0.0, 0.25, 0.5, 0.75, 1.0]
quartiles = dict(zip(cols_name, df.approxQuantile(cols_name, quantile_probs, 0)))

# One column per feature, one row per quantile.
quartiles_df = pd.DataFrame(quartiles, index=["Min", "25%", "50%", "75%", "Max"])
quartiles_df

- Length: There are outliers with small values (based on the quartiles). The std is not considered large. \
- Diameter: **Similar to Length**. The std is considered small. \
- Height: There are outliers with both large and small values. The std is considered small. \
- Whole Weight: **Similar to Height**. The std is considered to be in a normal range given the differences between the 25%, 50% and 75% quartiles. \
- Shucked Weight: **Similar to Whole Weight** *(including the std)*. \
- Viscera Weight: There are outliers with small values. *Could the maximum also be considered an outlier?* The std is large, which may be due to the Min and Max. \
- Shell Weight: **Similar to Whole Weight** *(including the std)*.

## Visualisation

In [16]:
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns

import numpy as np

In [17]:
# Plotly doesn't support PySpark DataFrames, so convert the Spark DataFrame to a
# pandas DataFrame for the plotting cells below.
pandas_df = df.toPandas()

In [18]:
# Check the pandas dtypes produced by the conversion.
pandas_df.dtypes

### Boxplot

In [19]:
# One boxplot per double-typed feature, split by Sex, laid out on a 4x3 grid.
fig, axs = plt.subplots(4, 3, figsize=(15, 20))
axs = axs.flatten()

# The loop variable is `feature` rather than `col`, so the Spark `col` function
# imported earlier is not overwritten at notebook scope.
for index, feature in enumerate(cols_name):
    ax = axs[index]
    sns.boxplot(x='Sex', y=feature, data=pandas_df, ax=ax)

    ax.set_title(f'Boxplot for {feature} By Sex', fontdict={"fontsize": 16}, pad=10)
    ax.set_xlabel('Sex', fontsize=16)
    ax.set_ylabel(f'{feature}', fontsize=16)
    ax.tick_params(axis='both', which='major', labelsize=12)

# Remove the grid cells that did not receive a plot.
for ax in axs[len(cols_name):]:
    fig.delaxes(ax)

plt.tight_layout(pad=3.0)

plt.show()

### Histogram

In [20]:
# Density histogram of every double-typed feature, one outline per Sex, on a 4x3 grid.
fig, axs = plt.subplots(4, 3, figsize=(15, 20))
axs = axs.flatten()

# Number of equal-width bins per feature. A fixed bin count adapts to each feature's
# range; a fixed 0.1 bin width leaves only a handful of bins for features that span
# well under 1.
n_bins = 30

# Computed once: the Sex categories, in order of first appearance.
sex_values = list(pandas_df['Sex'].unique())
# hist() needs exactly one colour per dataset.
colors = ['red', 'green', 'blue'][:len(sex_values)]

# `feature` (not `col`) is the loop variable so the Spark `col` function imported
# earlier is not overwritten at notebook scope.
for index, feature in enumerate(cols_name):
    ax = axs[index]

    # One Series of values per Sex category.
    data_list = [pandas_df.loc[pandas_df["Sex"] == sex, feature] for sex in sex_values]

    # linspace includes the maximum as the last bin edge; np.arange(0, max, 0.1) stopped
    # short of it, so the largest values were silently left out of the histogram.
    bins = np.linspace(0.0, pandas_df[feature].max(), n_bins + 1)

    ax.hist(data_list, bins=bins, density=True, histtype='step',
            stacked=False, color=colors, label=sex_values,
            alpha=0.5)

    # The histograms are overlaid outlines (stacked=False), so the title no longer
    # calls them "Stacked".
    ax.set_title(f'Histogram for {feature} by Sex')
    ax.set_xlabel(feature)
    ax.set_ylabel('Density')
    ax.legend()

# Remove the grid cells that did not receive a plot.
for ax in axs[len(cols_name):]:
    fig.delaxes(ax)

# Adjust layout
plt.tight_layout(pad=3.0)

Based on the **Boxplot** and **Histogram**, there is no clear difference between Male and Female abalone. We therefore assume that, unlike many other animals in nature, male and female abalone do not differ noticeably in these measurements. Infants do show a noticeable difference, which makes sense because they are still small.

### Hexbin

In [21]:
# Hexbin density of Rings (x) against every double-typed feature (y), on a 4x3 grid.
# Bin counts are shown on a log colour scale (bins='log').
fig, axs = plt.subplots(4, 3, figsize=(15, 20))
axs = axs.flatten()

# `feature` (not `col`) is the loop variable so the Spark `col` function imported
# earlier is not overwritten at notebook scope. The unused `colors` list was removed.
for index, feature in enumerate(cols_name):
    ax = axs[index]

    ax.hexbin(pandas_df['Rings'], pandas_df[feature], gridsize=30, bins='log', cmap='BuPu')

    ax.set_title(f'Hexbin Plot for {feature}')
    ax.set_xlabel('Rings')
    ax.set_ylabel(feature)

# Remove the grid cells that did not receive a plot.
for ax in axs[len(cols_name):]:
    fig.delaxes(ax)

# Adjust layout
plt.tight_layout(pad=3.0)

### Pair Plot

In [22]:
# Execute too long
# plot_features = cols_name + ['Sex', 'Rings']
# sns.pairplot(pandas_df[plot_features], hue = 'Sex')

### Scatter Plot

In [23]:
# Scatter plot of Rings (x) against every double-typed feature (y), coloured by Sex,
# on a 4x3 grid.
fig, axs = plt.subplots(4, 3, figsize=(15, 20))
axs = axs.flatten()

# `feature` (not `col`) is the loop variable so the Spark `col` function imported
# earlier is not overwritten at notebook scope.
for index, feature in enumerate(cols_name):
    ax = axs[index]

    sns.scatterplot(x='Rings', y=feature, data=pandas_df, hue='Sex', ax=ax)

    ax.set_title(f'Scatter Plot for Rings vs {feature}')
    ax.set_xlabel('Rings')
    ax.set_ylabel(feature)
    ax.legend()

# Remove the grid cells that did not receive a plot.
for ax in axs[len(cols_name):]:
    fig.delaxes(ax)

# Adjust layout
plt.tight_layout(pad=3.0)

The dimensions (**Length, Diameter, Height**) of abalone grow logarithmically with Rings: the dimensions don't change much as the abalone grow older. All **weights**, however, grow exponentially with Rings: while the dimensions stay nearly constant, the weight keeps increasing with age. We assume the shell (dimensions) doesn't change much, but the meat becomes heavier (the shell muscle grows bigger).

### 3D Scatter Plot for Length, Diameter, Height

In [24]:
# sample_df = pandas_df.groupby('Sex').apply(lambda x: x.sample(n=min(len(x), 500))).reset_index(drop=True)

In [25]:
# Interactive 3D scatter of the three size measurements, coloured by Sex.
fig = px.scatter_3d(
    pandas_df,
    x='Length',
    y='Diameter',
    z='Height',
    color='Sex',
)
fig.show()

## Test Data

In [26]:
# Load the test set and give it the same column names as the training DataFrame.
test_data = (
    spark.read.csv('test.csv', header=True, inferSchema=True)
    .withColumnsRenamed({
        'Whole weight.1': 'Shucked weight',
        'Whole weight.2': 'Viscera weight',
    })
)

test_data.show()

## Modelling

In [27]:
# train_data, test_data = train_data.randomSplit([0.7, 0.3])

# print(f"Train Data: {train_data.count()} rows")
# print(f"Test Data: {test_data.count()} rows")

### Linear Regression

In [28]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# --- Categorical branch: Sex -> numeric index -> one-hot vector ---
indexer = StringIndexer(
    inputCol='Sex',
    outputCol='Sex_Index',
)
encoder = OneHotEncoder(
    inputCol='Sex_Index',
    outputCol='Sex_OneHot',
)

# --- Numeric branch: pack the double columns into one vector, then min-max scale it ---
scaler_assembler = VectorAssembler(
    inputCols=cols_name,
    outputCol="numerical_features",
)
scaler = MinMaxScaler(
    inputCol="numerical_features",
    outputCol="scaled_features",
)

# --- Join both branches into the single feature vector the regressor consumes ---
final_assembler = VectorAssembler(
    inputCols=["Sex_OneHot", "scaled_features"],
    outputCol="independent_features",
)

regressor = LinearRegression(
    featuresCol='independent_features',
    labelCol='Rings',
)

# Stages run in list order when the pipeline is fitted or applied.
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    scaler_assembler,
    scaler,
    final_assembler,
    regressor,
])

In [29]:
# Train on the full cleaned training DataFrame (no hold-out split is made here).
train_data = df

# Fit every stage of the Linear Regression pipeline.
model = pipeline.fit(train_data)

In [30]:
# The last stage of the fitted pipeline is the regression model; print its parameters.
print(f'Coefficient: {model.stages[-1].coefficients}')
print(f'Intercept: {model.stages[-1].intercept}')

In [31]:
# pred = model.evaluate(test_data)
# pred.predictions.show()

In [32]:
# pred.meanAbsoluteError,pred.meanSquaredError

In [33]:
# Run the fitted pipeline on the test data and preview the prediction for each id.
pred = model.transform(test_data)
pred.select('id', 'prediction').show()

In [34]:
from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Keep only the two output columns, as the Decision Tree and Random Forest cells do.
# Without this select, `pred` still carries every input column and every intermediate
# pipeline column, and all of them end up in the CSV written from it.
pred = pred.select('id', F.col('prediction').alias('Rings'))

# F.min / F.max are referenced explicitly so this cell does not depend on the names
# `min` / `max` having been rebound to the Spark functions by an earlier import.
pred.select(F.min("Rings").alias("MIN"), F.max("Rings").alias("MAX")).show()

# Raise any prediction below 1 up to 1 (presumably because a ring count under 1 is not
# meaningful), then show the range again to confirm the lower bound.
pred = pred.withColumn('Rings', F.when(F.col('Rings') < 1, 1).otherwise(F.col('Rings')))
pred.select(F.min("Rings").alias("MIN"), F.max("Rings").alias("MAX")).show()

In [35]:
# Save as a single CSV file using Pandas (the row index is not written).
# NOTE(review): the destination is an absolute, machine-specific path; it must exist
# on the machine running the notebook.
pred.toPandas().to_csv('D:/Self-Learning/Spark/Abalone/LR_pred.csv', index=False)

# answer.write.csv('D:/Self-Learning/Spark/Abalone/LR_pred', header=True)

### Polynomial Regression

In [36]:
from pyspark.ml.feature import PolynomialExpansion

# --- Categorical branch: Sex -> numeric index -> one-hot vector ---
indexer = StringIndexer(
    inputCol='Sex',
    outputCol='Sex_Index',
)
encoder = OneHotEncoder(
    inputCol='Sex_Index',
    outputCol='Sex_OneHot',
)

# --- Numeric branch: assemble the double columns, expand them to degree-2 polynomial
# terms, then min-max scale the expanded vector ---
polynomial_assembler = VectorAssembler(
    inputCols=cols_name,
    outputCol="numerical_features",
)
poly_expansion = PolynomialExpansion(
    degree=2,
    inputCol="numerical_features",
    outputCol="poly_features",
)
scaler = MinMaxScaler(
    inputCol="poly_features",
    outputCol="scaled_features",
)

# --- Join both branches into the single feature vector the regressor consumes ---
final_assembler = VectorAssembler(
    inputCols=["Sex_OneHot", "scaled_features"],
    outputCol="independent_features",
)

regressor = LinearRegression(
    featuresCol='independent_features',
    labelCol='Rings',
)

# Stages run in list order when the pipeline is fitted or applied.
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    polynomial_assembler,
    poly_expansion,
    scaler,
    final_assembler,
    regressor,
])

In [37]:
# train_data, test_data = train_data.randomSplit([0.7, 0.3])

# print(f"Train Data: {train_data.count()} rows")
# print(f"Test Data: {test_data.count()} rows")

In [38]:
# Train on the full cleaned training DataFrame (no hold-out split is made here).
train_data = df

# Fit every stage of the Polynomial Regression pipeline.
model = pipeline.fit(train_data)

In [39]:
# print(f'Coefficient: {model.stages[-1].coefficients}')
# print(f'Intercept: {model.stages[-1].intercept}')

In [40]:
# pred = model.evaluate(test_data)
# pred.predictions.show()

In [41]:
# pred.meanAbsoluteError,pred.meanSquaredError

In [42]:
# Run the fitted pipeline on the test data and preview the prediction for each id.
pred = model.transform(test_data)
pred.select('id', 'prediction').show()

In [43]:
# Keep only the two output columns, as the Decision Tree and Random Forest cells do.
# Without this select, `pred` still carries every input column and every intermediate
# pipeline column, and all of them end up in the CSV written from it.
pred = pred.select('id', F.col('prediction').alias('Rings'))

# F.min / F.max are referenced explicitly so this cell does not depend on the names
# `min` / `max` having been rebound to the Spark functions by an earlier import.
pred.select(F.min("Rings").alias("MIN"), F.max("Rings").alias("MAX")).show()

# Raise any prediction below 1 up to 1 (presumably because a ring count under 1 is not
# meaningful), then show the range again to confirm the lower bound.
pred = pred.withColumn('Rings', F.when(F.col('Rings') < 1, 1).otherwise(F.col('Rings')))
pred.select(F.min("Rings").alias("MIN"), F.max("Rings").alias("MAX")).show()

In [44]:
# Save the Polynomial Regression predictions as a single CSV file using pandas.
# NOTE(review): the destination is an absolute, machine-specific path; it must exist
# on the machine running the notebook.
pred.toPandas().to_csv('D:/Self-Learning/Spark/Abalone/PR_pred.csv', index=False)

# answer.write.csv('D:/Self-Learning/Spark/Abalone/LR_pred', header=True)

### Decision Tree Regression

In [45]:
from pyspark.ml.regression import DecisionTreeRegressor

# --- Categorical branch: Sex -> numeric index -> one-hot vector ---
indexer = StringIndexer(
    inputCol='Sex',
    outputCol='Sex_Index',
)
encoder = OneHotEncoder(
    inputCol='Sex_Index',
    outputCol='Sex_OneHot',
)

# --- Numeric branch: pack the double columns into one vector, then min-max scale it ---
scaler_assembler = VectorAssembler(
    inputCols=cols_name,
    outputCol="numerical_features",
)
scaler = MinMaxScaler(
    inputCol="numerical_features",
    outputCol="scaled_features",
)

# --- Join both branches into the single feature vector the regressor consumes ---
final_assembler = VectorAssembler(
    inputCols=["Sex_OneHot", "scaled_features"],
    outputCol="independent_features",
)

# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.DecisionTreeRegressor.html#pyspark.ml.regression.DecisionTreeRegressor
regressor = DecisionTreeRegressor(
    featuresCol="independent_features",
    labelCol="Rings",
)

# Stages run in list order when the pipeline is fitted or applied.
pipeline = Pipeline(stages=[
    indexer,
    encoder,
    scaler_assembler,
    scaler,
    final_assembler,
    regressor,
])

In [46]:
# Train on the full cleaned training DataFrame (no hold-out split is made here).
train_data = df

# Fit every stage of the Decision Tree pipeline.
model = pipeline.fit(train_data)

In [47]:
# Run the fitted pipeline on the test data and preview the prediction for each id.
pred = model.transform(test_data)
pred.select('id', 'prediction').show()

In [48]:
# Reduce the output to the id and the prediction, exposing the prediction as 'Rings'.
pred = pred.select('id', F.col('prediction').alias('Rings'))

# Range of the predictions before clipping.
pred.select(
    min("Rings").alias("MIN"),
    max("Rings").alias("MAX"),
).show()

# Replace every prediction below 1 with 1; leave all other values as they are.
pred = pred.withColumn(
    'Rings',
    F.when(F.col('Rings') < 1, 1).otherwise(F.col('Rings')),
)

# Range of the predictions after clipping.
pred.select(
    min("Rings").alias("MIN"),
    max("Rings").alias("MAX"),
).show()

In [49]:
# Save the Decision Tree predictions as a single CSV file using pandas.
# NOTE(review): the destination is an absolute, machine-specific path; it must exist
# on the machine running the notebook.
pred.toPandas().to_csv('D:/Self-Learning/Spark/Abalone/DT_pred.csv', index=False)

### Random Forest Regression

In [50]:
from pyspark.ml.regression import RandomForestRegressor

# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html#pyspark.ml.regression.RandomForestRegressor
# A fixed seed makes the fitted forest, and therefore the saved predictions, the same
# on every run; without it the randomised training differs between runs.
regressor = RandomForestRegressor(featuresCol="independent_features", labelCol="Rings",
                                  seed=42)


# The preprocessing stages (indexer, encoder, scaler_assembler, scaler, final_assembler)
# are the objects created in the Decision Tree cell; only the regressor is new here.
pipeline = Pipeline(stages=[indexer, encoder, scaler_assembler,
                            scaler, final_assembler, regressor])

In [51]:
# Train on the full cleaned training DataFrame (no hold-out split is made here).
train_data = df

# Fit every stage of the Random Forest pipeline.
model = pipeline.fit(train_data)

In [52]:
# Run the fitted pipeline on the test data and preview the prediction for each id.
pred = model.transform(test_data)
pred.select('id', 'prediction').show()

In [53]:
# Reduce the output to the id and the prediction, exposing the prediction as 'Rings'.
pred = pred.select('id', F.col('prediction').alias('Rings'))

# Range of the predictions before clipping.
pred.select(
    min("Rings").alias("MIN"),
    max("Rings").alias("MAX"),
).show()

# Replace every prediction below 1 with 1; leave all other values as they are.
pred = pred.withColumn(
    'Rings',
    F.when(F.col('Rings') < 1, 1).otherwise(F.col('Rings')),
)

# Range of the predictions after clipping.
pred.select(
    min("Rings").alias("MIN"),
    max("Rings").alias("MAX"),
).show()

In [54]:
# Save the Random Forest predictions as a single CSV file using pandas.
# NOTE(review): the destination is an absolute, machine-specific path; it must exist
# on the machine running the notebook.
pred.toPandas().to_csv('D:/Self-Learning/Spark/Abalone/RF_pred.csv', index=False)