# Comprehensive Machine Learning and Data Manipulation with PySpark

This Jupyter Notebook demonstrates Apache Spark's capabilities through PySpark: initializing a Spark session; DataFrame operations such as selection, filtering, aggregation, joining, and sorting; running SQL queries; defining UDFs (user-defined functions); and applying machine learning algorithms for classification and regression. This guide covers:

1. **Creating a Spark Session**: Initialize Spark.
2. **Reading Data**: Load datasets into Spark DataFrames.
3. **Data Manipulation**:
   - Selection: Select specific columns from DataFrames.
   - Filtering: Filter rows based on conditions.
   - GroupBy and Aggregate: Perform aggregations on grouped data.
   - Joining: Merge datasets based on common keys.
   - Sorting: Arrange data by specified column values.
4. **SQL Queries**: Execute SQL commands within Spark.
5. **UDFs**: Implement and use custom functions in Spark.
6. **Data Preparation**: Feature engineering using DataFrame transformations.
7. **Machine Learning**:
   - Classification: Models such as Logistic Regression, Decision Trees, Random Forests, Gradient-Boosted Trees, Naive Bayes, Linear SVC, and the Multilayer Perceptron.
   - Regression: Linear and Decision Tree Regression.
8. **Model Evaluation**: Assessing accuracy, precision, etc.
9. **Pipeline Creation**: Building processing pipelines for reproducibility and workflow management.
10. **Cross-Validation**: Tuning model parameters using cross-validation techniques.

The code cells load the Titanic passenger data from `./titanic.csv`. Replace this path with the location of your copy of the dataset.


In [6]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import (
    LogisticRegression, DecisionTreeClassifier, RandomForestClassifier,
    GBTClassifier, NaiveBayes, LinearSVC,
)
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("Titanic ML with PySpark").getOrCreate()

# Load data
df = spark.read.csv('./titanic.csv', header=True, inferSchema=True)

# Show the data schema
df.printSchema()

# Data preprocessing
# Convert categorical variables to numeric indices; handleInvalid="keep" assigns nulls and unseen labels to an extra index
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in ["Sex", "Embarked"]
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Handle missing values and select features for modeling
df = df.na.fill({'Age': 30, 'Fare': 35})
df = df.select(col("Survived").alias("label"), col("Pclass"), col("Sex_index"), col("Age"), col("SibSp"), col("Parch"), col("Fare"), col("Embarked_index"))

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=["Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"], outputCol="features")
data = assembler.transform(df)

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
data = scaler.fit(data).transform(data)

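# Classification models, all trained on the scaled feature vector.
# NaiveBayes (multinomial by default) requires non-negative features; StandardScaler's
# default withMean=False only divides by the standard deviation, so values stay non-negative.
models = {
    "Logistic Regression": LogisticRegression(featuresCol="scaledFeatures"),
    "Decision Tree Classifier": DecisionTreeClassifier(featuresCol="scaledFeatures"),
    "Random Forest Classifier": RandomForestClassifier(featuresCol="scaledFeatures"),
    "Gradient-Boosted Tree Classifier": GBTClassifier(featuresCol="scaledFeatures"),
    "Naive Bayes": NaiveBayes(featuresCol="scaledFeatures"),
    "Linear SVC": LinearSVC(featuresCol="scaledFeatures")
}

# Each model is fit and scored on the same rows, so these are training-set accuracies
# and overstate how well the models will do on unseen passengers.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
results = {}
for name, model in models.items():
    fitted_model = model.fit(data)
    predictions = fitted_model.transform(data)
    accuracy = evaluator.evaluate(predictions)
    results[name] = accuracy
    print(f"{name} Accuracy: {accuracy:.2f}")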

# Clean up
spark.stop()


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)





Logistic Regression Accuracy: 0.80
Decision Tree Classifier Accuracy: 0.84
Random Forest Classifier Accuracy: 0.85
Gradient-Boosted Tree Classifier Accuracy: 0.89



Naive Bayes Accuracy: 0.79
Linear SVC Accuracy: 0.79


Apache Spark's MLlib also provides a basic neural network model, the Multilayer Perceptron Classifier (`MultilayerPerceptronClassifier`), which is a feedforward network for classification tasks. Here is how you might add it to the Titanic workflow above:

### Multilayer Perceptron Classifier
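The Multilayer Perceptron (MLP) is a neural network suited to classification problems. It consists of multiple layers of nodes, and each layer is fully connected to the next. In PySpark, you define an MLP by passing its layers as a list of integers. Each integer is the number of neurons in one layer, including the input and output layers. The first entry must equal the number of input features, and the last must equal the number of classes. Nodes in the hidden layers use the sigmoid (logistic) activation, and the output layer uses softmax.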
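To make the layer specification concrete, the hypothetical helper below counts the trainable parameters implied by a layer list. It assumes each layer is a standard fully connected (affine) layer with one bias per output node. For the `[7, 5, 4, 2]` architecture in the next cell, this gives (7*5 + 5) + (5*4 + 4) + (4*2 + 2) = 74. We would expect that to match the length of a fitted model's `weights` vector, but treat this as an assumption to check.

```python
def mlp_weight_count(layers):
    """Hypothetical helper: count weights and biases in a fully connected feedforward network.

    Each consecutive pair (n_in, n_out) contributes an n_in x n_out weight
    matrix plus n_out bias terms (assumed: one bias per output node).
    """
    return sum(n_in * n_out + n_out for n_in, n_out in zip(layers, layers[1:]))


feature_cols = ["Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"]
num_classes = 2  # Survived is 0 or 1
layers = [len(feature_cols), 5, 4, num_classes]  # hidden sizes 5 and 4, as in the cell below

print(layers)                    # [7, 5, 4, 2]
print(mlp_weight_count(layers))  # 74
```

Deriving the first and last entries from the feature list and class count, rather than hard-coding them, keeps the layer list consistent if a feature is added or dropped.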

Here's an example setup for using the Multilayer Perceptron Classifier:

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Initialize the Spark session
spark = SparkSession.builder.appName("Titanic Data Analysis").getOrCreate()

# Reload the data and display the schema
df = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
df.printSchema()

# Convert categorical variables to numeric indices
indexers = [
    StringIndexer(inputCol="Sex", outputCol="Sex_index", handleInvalid="keep"),
    StringIndexer(inputCol="Embarked", outputCol="Embarked_index", handleInvalid="keep")
]
pipeline = Pipeline(stages=indexers)
df_transformed = pipeline.fit(df).transform(df)

# Fill missing values
df_transformed = df_transformed.na.fill({'Age': 30, 'Fare': 35})

# Select features and rename the label column
df_transformed = df_transformed.select(
    col("Survived").alias("label"),
    "Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"
)

# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=["Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"],
    outputCol="features"
)
data = assembler.transform(df_transformed)

# Split data into training and test sets
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=1234)

# Layer sizes: 7 input nodes (one per assembled feature), hidden layers of 5 and 4 nodes,
# and 2 output nodes (Survived = 0 or 1)
layers = [7, 5, 4, 2]

# Initialize the MLP classifier (blockSize=128 is the default number of rows stacked per matrix during training)
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# Train the MLP Model
model = mlp.fit(trainingData)

# Evaluate the model on the test data
result_test = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(result_test)
print("Test set accuracy = " + str(test_accuracy))

# Evaluate the model on the training data
result_train = model.transform(trainingData)
train_accuracy = evaluator.evaluate(result_train)
print("Training set accuracy = " + str(train_accuracy))

# Clean up, stop Spark session
spark.stop()


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

Test set accuracy = 0.8461538461538461
Training set accuracy = 0.7884344146685472


### Explanation:
- **Model Training**: The model is trained on the training dataset.
- **Model Evaluation**: The trained model is evaluated on the test set, to measure how well it generalizes, and on the training set, to check for overfitting.
- **Accuracy Printout**: The cell prints accuracy on both sets so that performance on seen and unseen data can be compared.
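Whether a train/test gap counts as "large" depends on how noisy the test estimate is. A rough yardstick is the binomial standard error of an accuracy p measured on n examples, sqrt(p(1 - p) / n). The sketch below applies it to the accuracies printed above. The test-set size of 180 rows is an assumption (about 20% of the 891-row Kaggle Titanic training file), so substitute `testData.count()` for your own run. The helper name `accuracy_standard_error` is hypothetical.

```python
import math


def accuracy_standard_error(accuracy, n):
    """Hypothetical helper: binomial standard error of an accuracy estimated from n independent examples."""
    return math.sqrt(accuracy * (1.0 - accuracy) / n)


train_acc = 0.788  # training accuracy printed by the MLP cell (rounded)
test_acc = 0.846   # test accuracy printed by the MLP cell (rounded)
n_test = 180       # assumed test-set size; use testData.count() in practice

se = accuracy_standard_error(test_acc, n_test)
gap = test_acc - train_acc
print(f"standard error of test accuracy: {se:.3f}")
print(f"train/test gap: {gap:.3f} ({gap / se:.1f} standard errors)")
```

Under these assumptions, the gap is only about two standard errors of the test estimate. A different random split could plausibly narrow or reverse it.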

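Comparing the two accuracies shows how well the MLP has learned the patterns in the Titanic dataset and how well they carry over to unseen passengers. Training accuracy well above test accuracy suggests overfitting, while low accuracy on both sets suggests underfitting. In the run shown, test accuracy (0.85) is higher than training accuracy (0.79). Because the test set holds only about a fifth of the rows, this plausibly reflects the particular random split rather than either problem.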