# Distributed Data Analytics on a Raspberry Pi Spark Cluster
## Honors Project – Knox DS Cluster

### What Is Spark?

Apache Spark is a distributed data processing engine.
Instead of running everything on one machine, as pandas does, Spark:

1. Splits the data into partitions
2. Sends the partitions to worker nodes
3. Processes them in parallel
4. Combines the results

In our case:

- 3 Raspberry Pis
- 4 Spark cores per Pi
- 12 total cores working together
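The four steps above can be imitated on a single machine. The sketch below is plain Python, not Spark. It uses a hypothetical helper, `split_into_partitions`, and a `concurrent.futures` thread pool as a stand-in for the 12 worker cores. Like Spark, it computes one partial sum per partition and then combines the partial results. Python threads share one interpreter lock, so the sketch shows the structure of the computation rather than a real CPU speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Hypothetical helper: cut `data` into contiguous, nearly equal slices."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def process_partition(partition):
    """Work done independently on one partition (here: a partial sum)."""
    return sum(partition)


def distributed_sum(data, num_partitions=12):
    partitions = split_into_partitions(data, num_partitions)          # 1. split
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:      # 2. hand out to "workers"
        partial_sums = list(pool.map(process_partition, partitions))  # 3. process in parallel
    return sum(partial_sums)                                          # 4. combine


if __name__ == "__main__":
    data = list(range(1_000_000))
    print(distributed_sum(data))  # 499999500000, the same as sum(data)
```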

### PySpark Made Simple (medium.com)
https://medium.com/@nomannayeem/pyspark-made-simple-from-basics-to-big-data-mastery-cb1d702968be

## Starting Spark

Following the Medium article, we start by creating a `SparkSession`, the entry point for working with DataFrames in Spark:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ClassDemobyRidham") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/16 08:29:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/16 08:29:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### How Spark Connects to the Cluster

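When you run this notebook:

1. The notebook itself is the Driver.
2. The Driver registers with the Spark Master and asks for resources.
3. The Master launches Executors on the worker Pis.
4. The Driver sends tasks directly to those Executors, which run them in parallel and report the results back to the Driver.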
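Here is a rough way to reason about the last step. Each executor core runs one task at a time, so a stage with more tasks than cores runs in "waves". The sketch below is a simplified, hypothetical model of the cluster described above, with 3 executors of 4 cores each. It assumes equal-length tasks and ignores data locality and stragglers, both of which real Spark scheduling takes into account.

```python
import math


def task_waves(num_tasks, num_executors=3, cores_per_executor=4):
    """Simplified model: how many rounds of tasks a stage needs
    when every core runs exactly one task at a time."""
    total_slots = num_executors * cores_per_executor
    return math.ceil(num_tasks / total_slots)


if __name__ == "__main__":
    print(task_waves(9))    # 1: nine partitions fit in the 12 available slots
    print(task_waves(100))  # 9: a 100-partition stage needs nine rounds
```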

## What Is HDFS?
https://www.geeksforgeeks.org/big-data/introduction-to-hadoop-distributed-file-systemhdfs/

## How HDFS Stores Data in Our Cluster

HDFS splits every file into fixed-size blocks (128 MB by default). Based on the replication factor, it stores copies of each block on different DataNodes, which in our case are the Pis.

For example, suppose a file is split into three blocks and the replication factor is 2. The blocks could then be stored like this:

- block 1 → pi-node1, pi-node2
- block 2 → pi-node2, pi-node3
- block 3 → pi-node1, pi-node3
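The block layout above can be reproduced with a small model. This is a simplified sketch, not HDFS's real placement policy, which also considers racks and the location of the writing client. It assumes the default 128 MB block size and places each block's replicas on distinct nodes in round-robin order. The node names come from the example, and the 300 MB file size is hypothetical.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default block size (128 MB)


def num_blocks(file_size_bytes, block_size=BLOCK_SIZE):
    """A file occupies ceil(size / block_size) blocks; the last one may be partial."""
    return math.ceil(file_size_bytes / block_size)


def place_replicas(n_blocks, nodes, replication=2):
    """Round-robin placement: each block's replicas land on `replication` distinct nodes."""
    if replication > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    layout = {}
    for b in range(n_blocks):
        layout[f"block {b + 1}"] = [nodes[(b + r) % len(nodes)] for r in range(replication)]
    return layout


if __name__ == "__main__":
    nodes = ["pi-node1", "pi-node2", "pi-node3"]
    size = 300 * 1024 * 1024              # hypothetical 300 MB file
    blocks = num_blocks(size)             # 3 blocks: 128 + 128 + 44 MB
    for block, replicas in place_replicas(blocks, nodes).items():
        print(block, "->", ", ".join(replicas))
    print("raw storage used (MB):", size * 2 // (1024 * 1024))  # replication factor 2 doubles it
```

With replication factor 2, any single Pi can fail and every block still has one surviving copy. The price is twice the raw disk usage.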


## What Is the Parquet File Format?
https://motherduck.com/learn-more/why-choose-parquet-table-file-format/

In [5]:
df = spark.read.parquet(
    "hdfs://pi1.knoxds.org:8020/datasets/yellow_tripdata_2025-01.parquet"
)

In [6]:
df.printSchema()
df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)





+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

                                                                                

## Reading a CSV from HDFS Using Spark

In [7]:
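# Only the header option is set (no inferSchema and no explicit schema),
# so Spark reads every column as a string
csv_df = spark.read.option("header", True).csv(
    "hdfs://pi1.knoxds.org:8020/datasets/yellow_tripdata_2025-01.csv"
)
csv_df.printSchema()
csv_df.count()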


root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- Airport_fee: string (nullable = true)
 |-- cbd_congestion_fee: string (nullable = true)



                                                                                

3475226

## Time Comparisons ## 

### For CSV

In [8]:
%%time
csv_df.select("trip_distance").groupBy().avg().collect()




CPU times: user 9.53 ms, sys: 11.2 ms, total: 20.7 ms
Wall time: 18.1 s


                                                                                

[Row()]

### For Parquet

In [9]:
%%time
df.select("trip_distance").groupBy().avg().collect()




CPU times: user 4.47 ms, sys: 4.18 ms, total: 8.65 ms
Wall time: 1.18 s


                                                                                

[Row(avg(trip_distance)=5.85512617884354)]

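The reported CPU time is small for both formats, but the wall times differ by more than an order of magnitude: 18.1 s for CSV vs 1.18 s for Parquet. `%%time` measures CPU time only for the notebook's own Python process, which just submits the job to Spark and waits for the result. The actual work is done by the executors on the worker Pis, so it shows up only in the wall time.

The CSV job is expensive because Spark must read every row from disk, move more data over the network, and parse the text of every line, even though only one column is needed. Parquet is a columnar format, so Spark reads only the trip_distance column and skips the bytes of all other columns. It also decodes typed binary values instead of parsing text. That is why the same computation finishes much faster on Parquet, and why columnar storage is so much more efficient for large-scale analytics.

One caveat: the CSV cell returned `[Row()]`, not an average. The CSV was read without a schema, so trip_distance is a string column. Called with no arguments, `groupBy().avg()` averages only numeric columns, so there was nothing to average. The 18.1 s therefore mostly reflects the cost of scanning and parsing the CSV. Computing the real average requires casting the column first, for example `col("trip_distance").cast("double")`.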
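The difference between the two layouts can be shown in plain Python. This toy model stores the same three hypothetical trips in two ways: row-wise, as a CSV does, and column-wise, as Parquet does. It then counts how many stored values each layout must touch to average `trip_distance`. It is not how Spark or Parquet actually encode data; real Parquet also compresses and encodes each column.

```python
# Row-oriented layout (CSV-like): each record keeps all of its fields together.
rows = [
    {"VendorID": 1, "trip_distance": 1.6, "fare_amount": 10.0, "tip_amount": 3.0},
    {"VendorID": 2, "trip_distance": 0.5, "fare_amount": 5.1, "tip_amount": 0.0},
    {"VendorID": 1, "trip_distance": 2.8, "fare_amount": 16.3, "tip_amount": 4.5},
]

# Column-oriented layout (Parquet-like): each column is stored contiguously.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def avg_row_layout(rows, field):
    """Models a row-oriented reader, which must read every field of every record."""
    values_touched = sum(len(r) for r in rows)
    values = [r[field] for r in rows]
    return sum(values) / len(values), values_touched


def avg_column_layout(columns, field):
    """Models column pruning: only the requested column is read."""
    values = columns[field]
    return sum(values) / len(values), len(values)


if __name__ == "__main__":
    print(avg_row_layout(rows, "trip_distance"))        # 12 values touched
    print(avg_column_layout(columns, "trip_distance"))  # 3 values touched
```

The taxi data has 20 columns, so the gap grows in proportion: a row layout reads roughly 20 values for each one the query needs.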

In [12]:
%%time
csv_df.filter(
    (csv_df.trip_distance > 20) & 
    (csv_df.passenger_count == 1)
).count()


# Parquet lets Spark skip work; CSV forces Spark to read it all.

26/02/16 08:47:20 WARN TaskSetManager: Lost task 1.0 in stage 26.0 (TID 75) (10.90.1.37 executor 1): org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '0.7' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"__gt__" was called from
line 2 in cell [13]

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toLongExact(UTF8StringUtils.scala:31)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toLongExact(UTF8StringUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown 

CPU times: user 17.2 ms, sys: 3.52 ms, total: 20.7 ms
Wall time: 1.08 s


NumberFormatException: [CAST_INVALID_INPUT] The value '2.03' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"__gt__" was called from
line 2 in cell [13]
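The error above can be reproduced by analogy in plain Python; this is not Spark code. A strict integer cast rejects the text `'0.7'`, while a cast to a floating-point type accepts it. The tiny CSV and the helper names `strict_int_cast` and `lenient_int_cast` are hypothetical. The lenient helper is only loosely modeled on Spark's `try_cast`.

```python
import csv
import io

# A tiny, made-up CSV. Like spark.read.csv without a schema,
# the csv module returns every field as a string.
text = "trip_distance,passenger_count\n0.7,1\n25.3,1\n21.0,2\n"
records = list(csv.DictReader(io.StringIO(text)))


def strict_int_cast(value):
    """Like a strict cast to BIGINT: malformed input raises an error."""
    return int(value)


def lenient_int_cast(value):
    """Loosely modeled on try_cast: malformed input becomes None."""
    try:
        return int(value)
    except ValueError:
        return None


if __name__ == "__main__":
    print(type(records[0]["trip_distance"]))  # <class 'str'>
    try:
        strict_int_cast("0.7")
    except ValueError as err:
        print("strict cast failed:", err)
    print(lenient_int_cast("0.7"))            # None
    # The real fix: convert to a floating-point type before comparing.
    long_single = [r for r in records
                   if float(r["trip_distance"]) > 20 and int(r["passenger_count"]) == 1]
    print(len(long_single))                   # 1
```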


In [11]:
%%time
df.filter(
    (df.trip_distance > 20) & 
    (df.passenger_count == 1)
).count()


# Spark pushes the filter into the Parquet scan: row groups whose min/max statistics
# rule out trip_distance > 20 are skipped, and only the two referenced columns are read

CPU times: user 6.17 ms, sys: 1.08 ms, total: 7.26 ms
Wall time: 937 ms


                                                                                

19281

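## Why Is Parquet So Efficient? ##

First, a caveat: the CSV query above never returned a count. Every CSV column was read as a string, so comparing `trip_distance` with the integer 20 made Spark cast the column to BIGINT. That cast failed on values such as `'0.7'`. Casting explicitly, for example `col("trip_distance").cast("double") > 20`, or reading the CSV with a schema avoids the error.

Even with correct types, CSV gives Spark little to skip. A CSV file carries no column statistics, so any filter pushed into the CSV reader can only be checked after each row has been read. The actual ```[> 20]``` filter runs at the end of this pipeline:

each row is read from disk ------> parsed from text ------> then filtered. With CSV, Spark does a lot of extra work.

With Parquet, Spark pushes the filter into the Parquet reader, and this saves work in two ways:

- Parquet stores min/max statistics for each column in every row group. A row group whose largest `trip_distance` is at most 20 can be skipped without being read, decompressed, or sent over the network.
- In the remaining row groups, Spark reads only the two columns the query references (`trip_distance` and `passenger_count`) and decodes typed binary values instead of parsing text. Those rows are then checked against ```[trip_distance > 20]```.

Long trips are likely scattered across most row groups in this dataset. Much of the speedup therefore probably comes from reading fewer columns rather than from skipping whole row groups.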
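Row-group skipping can be sketched in plain Python. In this simplified model, each hypothetical row group records the maximum `trip_distance` when it is written, as the statistics in a Parquet footer do. The reader discards any group whose maximum cannot satisfy `> 20` and decodes and filters only the surviving groups. The numbers are made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class RowGroup:
    trip_distance: list
    passenger_count: list
    # Parquet writes per-column min/max statistics into the file footer, so the
    # reader knows them before touching the column data. Here the maximum is
    # computed once, at "write" time.
    max_distance: float = field(init=False)

    def __post_init__(self):
        self.max_distance = max(self.trip_distance)


def count_long_single_trips(row_groups, min_distance=20.0, passengers=1):
    """Return (matching rows, row groups actually decoded)."""
    matches, groups_decoded = 0, 0
    for rg in row_groups:
        if rg.max_distance <= min_distance:
            continue  # statistics prove no row can match: skip the whole group
        groups_decoded += 1
        matches += sum(
            1
            for d, p in zip(rg.trip_distance, rg.passenger_count)
            if d > min_distance and p == passengers
        )
    return matches, groups_decoded


if __name__ == "__main__":
    groups = [
        RowGroup([1.2, 3.4, 0.8], [1, 2, 1]),    # max 3.4  -> skipped
        RowGroup([2.0, 5.5, 7.1], [1, 1, 3]),    # max 7.1  -> skipped
        RowGroup([22.5, 1.1, 30.0], [1, 1, 2]),  # max 30.0 -> decoded
    ]
    print(count_long_single_trips(groups))  # (1, 1)
```

If every row group held at least one trip over 20 miles, nothing could be skipped. In that case the savings would come only from column pruning and from avoiding text parsing.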

## Let's Test Whether Spark Actually Runs Work on Different Machines


In [15]:
sc = spark.sparkContext

rdd = sc.parallelize(range(100), 9)
# Creates an RDD (Resilient Distributed Dataset): an immutable, fault-tolerant collection
# holding the numbers 0–99, split into 9 partitions that Spark can process in parallel

def where_am_i(x):
    import socket
    return socket.gethostname()

rdd.map(where_am_i).distinct().collect()
# Each task reports the hostname of the machine it ran on; distinct() keeps one entry per machine

['epyc2', 'epyc3', 'epyc4']
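The placement of each number can be predicted. As far as we can tell from PySpark's source, `SparkContext.parallelize` cuts a Python `range` into contiguous slices with boundaries at `int(i * n / numSlices)`. The plain-Python sketch below reproduces that rule for `range(100)` in 9 partitions, giving eight partitions of 11 numbers and one of 12. Treat it as an approximation, not PySpark's own code.

```python
def slice_range(n, num_slices):
    """Contiguous slices of range(n), with boundaries at int(i * n / num_slices)."""
    bounds = [int(i * n / num_slices) for i in range(num_slices + 1)]
    return [range(bounds[i], bounds[i + 1]) for i in range(num_slices)]


if __name__ == "__main__":
    for i, part in enumerate(slice_range(100, 9)):
        print(f"partition {i}: {part.start}-{part.stop - 1} ({len(part)} numbers)")
```

The cluster has 12 cores, so all nine tasks can run at once. The three distinct hostnames show that those tasks were spread over three worker machines.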

In [14]:
df.rdd.getNumPartitions()

9

## Fibonacci

In [16]:
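# Deliberately naive recursive Fibonacci: its exponential running time makes it a CPU-bound benchmark
def fib(n):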
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

In [29]:
%%time
fib(40)

CPU times: user 24.5 s, sys: 12 ms, total: 24.5 s
Wall time: 24.5 s


102334155

In [35]:
%%time
nums = [35, 36, 37, 38, 39, 40]
rdd = sc.parallelize(nums, 6)   # 6 partitions
rdd.map(fib).collect()



CPU times: user 6.7 ms, sys: 12 ms, total: 18.7 ms
Wall time: 22.6 s


                                                                                

[9227465, 14930352, 24157817, 39088169, 63245986, 102334155]
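Why is the parallel run (22.6 s) barely faster than `fib(40)` alone (24.5 s), even though it computes six Fibonacci numbers? Each number became its own task, and a stage finishes only when its slowest task does. Counting recursive calls shows the imbalance. The plain-Python sketch below compares the total work with the work of the single largest task. It is an estimate that assumes run time is proportional to the number of calls.

```python
def call_counts(max_n):
    """calls[n] = number of invocations made by the naive recursive fib(n)."""
    calls = [1, 1]  # fib(0) and fib(1) return immediately
    for n in range(2, max_n + 1):
        calls.append(1 + calls[n - 1] + calls[n - 2])
    return calls


if __name__ == "__main__":
    nums = [35, 36, 37, 38, 39, 40]
    calls = call_counts(max(nums))
    total = sum(calls[n] for n in nums)    # work if run one after another
    longest = max(calls[n] for n in nums)  # work of the slowest single task
    print(f"serial work / slowest task = {total / longest:.2f}")  # 2.47
```

Under this estimate, running the six calls back to back would take about 2.5 times as long as `fib(40)` alone. The best possible speedup for this particular list is therefore about 2.5x, not 6x. `fib(40)` dominates, and the other five tasks finish while it is still running.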

## MLlib

In [13]:
from pyspark.sql.functions import col, when

ml_df = df.select(
    "trip_distance",
    "fare_amount",
    "passenger_count",
    "tip_amount"
).dropna()

ml_df = ml_df.withColumn(
    "label",
    when(col("tip_amount") > 5, 1).otherwise(0)
)
# Add a "label" column holding the target variable: trips with tip_amount greater than 5
# are labeled 1 (high tip), and all others are labeled 0 (low or no tip)


In [14]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["trip_distance", "fare_amount", "passenger_count"],
    outputCol="features"
)

final_df = assembler.transform(ml_df).select("features", "label")
# Combine the input columns into a single feature vector, the format Spark's ML estimators expect

In [15]:
train, test = final_df.randomSplit([0.8, 0.2], seed=42)


In [16]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10)
model = lr.fit(train)


26/02/10 09:43:29 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

## Idea ##

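Training runs distributed. In each iteration, every executor computes the loss and gradient over its own partitions, and only these aggregated values travel back to the driver. No single Pi needs to see all of the data.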
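This idea can be sketched without Spark. In data-parallel training, each partition computes the logistic-loss gradient over its own rows only, and the driver adds the partial gradients together. Because the gradient is a sum over rows, the result equals the gradient over the full data. The pure-Python sketch below uses made-up feature rows and a hypothetical weight vector. Spark's actual implementation adds further details, such as feature standardization (on by default) and an L-BFGS optimizer.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def partition_gradient(rows, weights):
    """Sum of logistic-loss gradients over one partition's (features, label) rows."""
    grad = [0.0] * len(weights)
    for features, label in rows:
        error = sigmoid(sum(w * x for w, x in zip(weights, features))) - label
        for j, x in enumerate(features):
            grad[j] += error * x
    return grad


def combine(gradients):
    """Driver-side step: add the per-partition gradient vectors element-wise."""
    return [sum(parts) for parts in zip(*gradients)]


if __name__ == "__main__":
    # Hypothetical rows: ([trip_distance, fare_amount, passenger_count], label)
    data = [([1.6, 10.0, 1.0], 0), ([12.3, 48.0, 1.0], 1),
            ([0.9, 6.5, 2.0], 0), ([18.7, 70.0, 1.0], 1)]
    weights = [0.05, 0.01, -0.1]
    partitions = [data[:2], data[2:]]  # e.g. two executors
    distributed = combine([partition_gradient(p, weights) for p in partitions])
    single_machine = partition_gradient(data, weights)
    print(all(abs(a - b) < 1e-9 for a, b in zip(distributed, single_machine)))  # True
```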

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

preds = model.transform(test)

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(preds)


                                                                                

0.9035212912563072

## Idea ##

0.9035 is the Area Under the ROC Curve (AUC), the default metric of Spark's BinaryClassificationEvaluator. An AUC of 0.5 corresponds to random guessing, while 1.0 indicates perfect separation. An AUC of about 0.90 means the model has strong discriminative power. Given a random positive (high-tip) trip and a random negative trip, the model assigns the higher score to the positive one about 90% of the time.
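The "random positive vs. random negative" reading of AUC can be checked directly. The plain-Python sketch below computes AUC as the fraction of (positive, negative) pairs in which the positive example gets the higher score, counting ties as one half. It runs in O(P×N) time, is meant only for illustration, and uses made-up scores. Spark's evaluator instead integrates the ROC curve. That gives the same value, apart from any down-sampling of the curve.

```python
def pairwise_auc(scores, labels):
    """Probability that a random positive outranks a random negative (ties count 0.5)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


if __name__ == "__main__":
    scores = [0.9, 0.8, 0.35, 0.6, 0.2, 0.1]
    labels = [1, 1, 1, 0, 0, 0]
    print(pairwise_auc(scores, labels))  # 8 of 9 pairs ranked correctly
```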

In [36]:
spark.stop()
