In [1]:
from pyspark.sql import SparkSession

# Spark session
spark = SparkSession.builder.appName("PySparkSteppingUp").getOrCreate()

print("Apache Spark version:", spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/01 19:45:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/01 19:45:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/02/01 19:45:13 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Apache Spark version: 3.5.4


In [2]:
# Read the CSV file into a DataFrame
df = spark.read.csv("data/sample.csv", header=True, inferSchema=True)

# Display the first few rows of the DataFrame
df.show()

+---+-------+---+------+-----------+
| id|   name|age|salary| department|
+---+-------+---+------+-----------+
|  1|  Alice| 30| 70000|         HR|
|  2|    Bob| 35| 80000|Engineering|
|  3|Charlie| 25| 50000|  Marketing|
|  4|  David| 40| 90000|Engineering|
|  5|    Eva| 28| 60000|         HR|
|  6|  Frank| 32| 75000|  Marketing|
|  7|   Gina| 27| 55000|Engineering|
|  8|  Harry| 31| 70000|         HR|
|  9|    Ivy| 29| 60000|  Marketing|
| 10|   Jack| 33| 80000|Engineering|
| 11|   Kate| 26| 50000|         HR|
| 12|   Lily| 34| 75000|  Marketing|
| 13|   Mike| 28| 60000|Engineering|
| 14|  Nancy| 30| 70000|         HR|
| 15|  Oscar| 32| 80000|  Marketing|
+---+-------+---+------+-----------+



In [3]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



## Window Functions - More advanced "Aggregations"

> Window functions allow you to perform calculations across a set of rows that are related to the current row.

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank

# Define a window partitioned by 'department' and ordered by 'salary' in descending order
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Apply the window function to compute the rank of employees within their department
df_with_rank = df.withColumn("salary_rank", rank().over(window_spec))
df_with_rank.show()

# The 'Window.partitionBy' divides data into groups (departments),
# and 'orderBy' sorts employees within each group by salary in desc order. 
# The 'rank' function assigns a rank to each row based on the order.

+---+-------+---+------+-----------+-----------+
| id|   name|age|salary| department|salary_rank|
+---+-------+---+------+-----------+-----------+
|  4|  David| 40| 90000|Engineering|          1|
|  2|    Bob| 35| 80000|Engineering|          2|
| 10|   Jack| 33| 80000|Engineering|          2|
| 13|   Mike| 28| 60000|Engineering|          4|
|  7|   Gina| 27| 55000|Engineering|          5|
|  1|  Alice| 30| 70000|         HR|          1|
|  8|  Harry| 31| 70000|         HR|          1|
| 14|  Nancy| 30| 70000|         HR|          1|
|  5|    Eva| 28| 60000|         HR|          4|
| 11|   Kate| 26| 50000|         HR|          5|
| 15|  Oscar| 32| 80000|  Marketing|          1|
|  6|  Frank| 32| 75000|  Marketing|          2|
| 12|   Lily| 34| 75000|  Marketing|          2|
|  9|    Ivy| 29| 60000|  Marketing|          4|
|  3|Charlie| 25| 50000|  Marketing|          5|
+---+-------+---+------+-----------+-----------+



## Custom transformations using User Defined Functions (UDF)

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# custom function that classifies salary
def classify_salary(salary):
    if salary >= 80000:
        return "High"
    elif salary >= 60000:
        return "Medium"
    else:
        return "Low"

# Register the function as a UDF
classify_salary_udf = udf(classify_salary, StringType())

# Apply the UDF to create a new column 'salary_category'
df_with_category = df.withColumn("salary_category", classify_salary_udf(col("salary")))
df_with_category.show()

# The UDF 'classify_salary' categorizes each employee's salary.
# UDFs should be used when built-in functions do not meet our requirements.

[Stage 6:>                                                          (0 + 1) / 1]

+---+-------+---+------+-----------+---------------+
| id|   name|age|salary| department|salary_category|
+---+-------+---+------+-----------+---------------+
|  1|  Alice| 30| 70000|         HR|         Medium|
|  2|    Bob| 35| 80000|Engineering|           High|
|  3|Charlie| 25| 50000|  Marketing|            Low|
|  4|  David| 40| 90000|Engineering|           High|
|  5|    Eva| 28| 60000|         HR|         Medium|
|  6|  Frank| 32| 75000|  Marketing|         Medium|
|  7|   Gina| 27| 55000|Engineering|            Low|
|  8|  Harry| 31| 70000|         HR|         Medium|
|  9|    Ivy| 29| 60000|  Marketing|         Medium|
| 10|   Jack| 33| 80000|Engineering|           High|
| 11|   Kate| 26| 50000|         HR|            Low|
| 12|   Lily| 34| 75000|  Marketing|         Medium|
| 13|   Mike| 28| 60000|Engineering|         Medium|
| 14|  Nancy| 30| 70000|         HR|         Medium|
| 15|  Oscar| 32| 80000|  Marketing|           High|
+---+-------+---+------+-----------+----------

                                                                                

### Efficient Join operations - using Broadcast variables

In [6]:
from pyspark.sql import Row
from pyspark.sql.functions import broadcast

# Create a small DataFrame for department details
dept_data = [
    Row(department="HR", location="New York"),
    Row(department="Engineering", location="San Francisco"),
    Row(department="Marketing", location="Chicago")
]
dept_df = spark.createDataFrame(dept_data)

# Perform a broadcast join between the employee DataFrame and the department DataFrame
joined_df = df.join(broadcast(dept_df), on="department", how="left")
joined_df.show()

# The 'broadcast' function hints to Spark that dept_df is small, 

# and it should be broadcasted to all worker nodes for a more efficient join.

[Stage 7:>                                                          (0 + 8) / 8]

+-----------+---+-------+---+------+-------------+
| department| id|   name|age|salary|     location|
+-----------+---+-------+---+------+-------------+
|         HR|  1|  Alice| 30| 70000|     New York|
|Engineering|  2|    Bob| 35| 80000|San Francisco|
|  Marketing|  3|Charlie| 25| 50000|      Chicago|
|Engineering|  4|  David| 40| 90000|San Francisco|
|         HR|  5|    Eva| 28| 60000|     New York|
|  Marketing|  6|  Frank| 32| 75000|      Chicago|
|Engineering|  7|   Gina| 27| 55000|San Francisco|
|         HR|  8|  Harry| 31| 70000|     New York|
|  Marketing|  9|    Ivy| 29| 60000|      Chicago|
|Engineering| 10|   Jack| 33| 80000|San Francisco|
|         HR| 11|   Kate| 26| 50000|     New York|
|  Marketing| 12|   Lily| 34| 75000|      Chicago|
|Engineering| 13|   Mike| 28| 60000|San Francisco|
|         HR| 14|  Nancy| 30| 70000|     New York|
|  Marketing| 15|  Oscar| 32| 80000|      Chicago|
+-----------+---+-------+---+------+-------------+



                                                                                

### Caching and Persisting intermediate results

In [7]:
# Cache the DataFrame after performing a transformation
cached_df = df_with_category.cache()
cached_df.show()

# Using 'cache()' will persist the DataFrame in memory, reducing the time needed for subsequent actions.

+---+-------+---+------+-----------+---------------+
| id|   name|age|salary| department|salary_category|
+---+-------+---+------+-----------+---------------+
|  1|  Alice| 30| 70000|         HR|         Medium|
|  2|    Bob| 35| 80000|Engineering|           High|
|  3|Charlie| 25| 50000|  Marketing|            Low|
|  4|  David| 40| 90000|Engineering|           High|
|  5|    Eva| 28| 60000|         HR|         Medium|
|  6|  Frank| 32| 75000|  Marketing|         Medium|
|  7|   Gina| 27| 55000|Engineering|            Low|
|  8|  Harry| 31| 70000|         HR|         Medium|
|  9|    Ivy| 29| 60000|  Marketing|         Medium|
| 10|   Jack| 33| 80000|Engineering|           High|
| 11|   Kate| 26| 50000|         HR|            Low|
| 12|   Lily| 34| 75000|  Marketing|         Medium|
| 13|   Mike| 28| 60000|Engineering|         Medium|
| 14|  Nancy| 30| 70000|         HR|         Medium|
| 15|  Oscar| 32| 80000|  Marketing|           High|
+---+-------+---+------+-----------+----------

## More analytics with DataFrames :)

### Pivoting Data for aggregation

Pivot operations allows us to transform data from rows to columns, which is useful for creating summary tables.

In [8]:
pivot_df = df_with_category.groupBy("department") \
    .pivot("salary_category", ["Low", "Medium", "High"]) \
    .avg("salary")

pivot_df.show()

# 'pivot()' reshapes the DataFrame by pivoting on the 'salary_category'.
# This creates columns for each category with the average salary as values.

+-----------+-------+-------+-----------------+
| department|    Low| Medium|             High|
+-----------+-------+-------+-----------------+
|Engineering|55000.0|60000.0|83333.33333333333|
|         HR|50000.0|67500.0|             NULL|
|  Marketing|50000.0|70000.0|          80000.0|
+-----------+-------+-------+-----------------+



#### Filtering with multiple conditions

In [9]:
# from pyspark.sql.functions import expr

# Filter employees who are either in 'Engineering' or have a 'High' salary category
advanced_filter_df = df_with_category.filter(
    (col("department") == "Engineering") | (col("salary_category") == "High")
)
advanced_filter_df.show()

# The expression uses logical OR (|) to combine filtering conditions.

+---+-----+---+------+-----------+---------------+
| id| name|age|salary| department|salary_category|
+---+-----+---+------+-----------+---------------+
|  2|  Bob| 35| 80000|Engineering|           High|
|  4|David| 40| 90000|Engineering|           High|
|  7| Gina| 27| 55000|Engineering|            Low|
| 10| Jack| 33| 80000|Engineering|           High|
| 13| Mike| 28| 60000|Engineering|         Medium|
| 15|Oscar| 32| 80000|  Marketing|           High|
+---+-----+---+------+-----------+---------------+



## ML pipelines with PySpark

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Assuming our dataset is enhanced with categorical features (using 'department' here)
# Step 1: Convert the 'department' column to numeric indices
dept_indexer = StringIndexer(inputCol="department", outputCol="dept_index")

# Step 2: One-hot encode the indexed department
dept_encoder = OneHotEncoder(inputCol="dept_index", outputCol="dept_vec")

# Step 3: Assemble features (using age and the one-hot encoded department)
assembler = VectorAssembler(inputCols=["age", "dept_vec"], outputCol="features")

# Step 4: Initialize the linear regression estimator (predicting salary)
lr = LinearRegression(featuresCol="features", labelCol="salary")

# Create the pipeline with all stages
pipeline = Pipeline(stages=[dept_indexer, dept_encoder, assembler, lr])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df)
predictions = pipeline_model.transform(df)
predictions.select("name", "department", "age", "salary", "prediction").show()


# This pipeline preprocesses categorical data and numerical features before applying linear regression.

25/02/01 19:45:24 WARN Instrumentation: [673185dc] regParam is zero, which might cause numerical instability and overfitting.
25/02/01 19:45:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/02/01 19:45:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/02/01 19:45:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


+-------+-----------+---+------+------------------+
|   name| department|age|salary|        prediction|
+-------+-----------+---+------+------------------+
|  Alice|         HR| 30| 70000|  67026.9058295965|
|    Bob|Engineering| 35| 80000| 80264.57399103155|
|Charlie|  Marketing| 25| 50000|   51654.708520179|
|  David|Engineering| 40| 90000| 95399.10313901394|
|    Eva|         HR| 28| 60000|60973.094170403536|
|  Frank|  Marketing| 32| 75000| 72843.04932735435|
|   Gina|Engineering| 27| 55000|56049.327354259716|
|  Harry|         HR| 31| 70000| 70053.81165919297|
|    Ivy|  Marketing| 29| 60000| 63762.33183856492|
|   Jack|Engineering| 33| 80000| 74210.76233183859|
|   Kate|         HR| 26| 50000| 54919.28251121058|
|   Lily|  Marketing| 34| 75000| 78896.86098654731|
|   Mike|Engineering| 28| 60000|59076.233183856195|
|  Nancy|         HR| 30| 70000|  67026.9058295965|
|  Oscar|  Marketing| 32| 80000| 72843.04932735435|
+-------+-----------+---+------+------------------+



## Hyperparameter Tuning

In [11]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Create a parameter grid for tuning the linear regression model
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define a regression evaluator (using RMSE as the metric)
evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="rmse")

# Set up the CrossValidator
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3  # Usingg 3-fold cross-validation
)

# Run cross-validation, and choose the best set of parameters.
cv_model = crossval.fit(df)
best_model = cv_model.bestModel

# Make predictions using the best model found
cv_predictions = best_model.transform(df)
cv_predictions.select("name", "department", "age", "salary", "prediction").show()


# CrossValidator tests different parameter combinations defined in paramGrid.

# The best model is selected based on the lowest RMSE (Root-Mean squared Error).

25/02/01 19:45:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+-------+-----------+---+------+------------------+
|   name| department|age|salary|        prediction|
+-------+-----------+---+------+------------------+
|  Alice|         HR| 30| 70000|  67026.7185513027|
|    Bob|Engineering| 35| 80000|  80264.6054316039|
|Charlie|  Marketing| 25| 50000| 51654.95601220027|
|  David|Engineering| 40| 90000|  95398.9054438366|
|    Eva|         HR| 28| 60000|60972.998546409624|
|  Frank|  Marketing| 32| 75000| 72842.97602932606|
|   Gina|Engineering| 27| 55000| 56049.72541203156|
|  Harry|         HR| 31| 70000| 70053.57855374925|
|    Ivy|  Marketing| 29| 60000| 63762.39602198644|
|   Jack|Engineering| 33| 80000| 74210.88542671081|
|   Kate|         HR| 26| 50000| 54919.27854151654|
|   Lily|  Marketing| 34| 75000| 78896.69603421915|
|   Mike|Engineering| 28| 60000|59076.585414478104|
|  Nancy|         HR| 30| 70000|  67026.7185513027|
|  Oscar|  Marketing| 32| 80000| 72842.97602932606|
+-------+-----------+---+------+------------------+

