In [1]:
#confirm java is running
!java -version

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [2]:
#install pyspark
!pip install pyspark



In [47]:
#import relevant packages
import os
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, TimestampType
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder
from pyspark.ml import pipeline
from pyspark.ml.linalg import DenseVector

import numpy as np
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
from tabulate import tabulate
import gc
from sklearn.datasets import load_iris
import csv

In [5]:
#build a spark session
spark = (SparkSession.builder
         .appName('Spark Fundamentals')
         .config('spark.executor.memory', '1G')
         .config('spark.executor.cores', '4')
         .getOrCreate())

In [6]:
spark.sparkContext.setLogLevel('INFO')

In [7]:
spark.version

'3.5.1'

# Working With Iris Dataset

In [28]:
#load the data & export to CSV
iris = load_iris()

iris_df = pd.DataFrame(
    data = iris.data,
    columns = iris.feature_names
)

iris_df['species'] = iris.target

iris_df['species_name'] = iris_df['species'].apply(lambda x: iris.target_names[x])

iris_df.to_csv("iris.csv", index = False)

In [29]:
#load the data for spark
url = '/content/iris.csv'

data = spark.read.format('csv')\
      .option("header", "true")\
      .option("inferSchema", "true")\
      .load(url)

data.cache()

DataFrame[sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, species: int, species_name: string]

In [21]:
#tally observations
data.count()

150

In [22]:
#check the schema
data.printSchema()

root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- species: integer (nullable = true)
 |-- species_name: string (nullable = true)



In [23]:
data.show(5)

+-----------------+----------------+-----------------+----------------+-------+------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|species|species_name|
+-----------------+----------------+-----------------+----------------+-------+------------+
|              5.1|             3.5|              1.4|             0.2|      0|      setosa|
|              4.9|             3.0|              1.4|             0.2|      0|      setosa|
|              4.7|             3.2|              1.3|             0.2|      0|      setosa|
|              4.6|             3.1|              1.5|             0.2|      0|      setosa|
|              5.0|             3.6|              1.4|             0.2|      0|      setosa|
+-----------------+----------------+-----------------+----------------+-------+------------+
only showing top 5 rows



In [24]:
#tally by species
data.groupBy('species').count().show()

+-------+-----+
|species|count|
+-------+-----+
|      1|   50|
|      2|   50|
|      0|   50|
+-------+-----+



In [25]:
#summary statistics of the data
data.describe().show()

+-------+------------------+-------------------+------------------+------------------+------------------+------------+
|summary| sepal length (cm)|   sepal width (cm)| petal length (cm)|  petal width (cm)|           species|species_name|
+-------+------------------+-------------------+------------------+------------------+------------------+------------+
|  count|               150|                150|               150|               150|               150|         150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|               1.0|        NULL|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|0.8192319205190406|        NULL|
|    min|               4.3|                2.0|               1.0|               0.1|                 0|      setosa|
|    max|               7.9|                4.4|               6.9|               2.5|                 2|   virginica|
+-------+------------------+-------------------+

In [30]:
#the data was loaded with species already converted to numerics
#the following showcases use of StringIndexer to make a new column

SIndexer = StringIndexer(inputCol='species', outputCol='species_index')
data_sample = SIndexer.fit(data).transform(data)

data_sample.show(5)

+-----------------+----------------+-----------------+----------------+-------+------------+-------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|species|species_name|species_index|
+-----------------+----------------+-----------------+----------------+-------+------------+-------------+
|              5.1|             3.5|              1.4|             0.2|      0|      setosa|          0.0|
|              4.9|             3.0|              1.4|             0.2|      0|      setosa|          0.0|
|              4.7|             3.2|              1.3|             0.2|      0|      setosa|          0.0|
|              4.6|             3.1|              1.5|             0.2|      0|      setosa|          0.0|
|              5.0|             3.6|              1.4|             0.2|      0|      setosa|          0.0|
+-----------------+----------------+-----------------+----------------+-------+------------+-------------+
only showing top 5 rows



In [32]:
#confirm column names
data.columns

['sepal length (cm)',
 'sepal width (cm)',
 'petal length (cm)',
 'petal width (cm)',
 'species',
 'species_name']

In [33]:
#remove flower names
df = data.select("species", "sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)")
df.show(5)

+-------+-----------------+----------------+-----------------+----------------+
|species|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-------+-----------------+----------------+-----------------+----------------+
|      0|              5.1|             3.5|              1.4|             0.2|
|      0|              4.9|             3.0|              1.4|             0.2|
|      0|              4.7|             3.2|              1.3|             0.2|
|      0|              4.6|             3.1|              1.5|             0.2|
|      0|              5.0|             3.6|              1.4|             0.2|
+-------+-----------------+----------------+-----------------+----------------+
only showing top 5 rows



In [34]:
#define the species column as a dense vector (e.g. label) for the future models

input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [35]:
#create a dataframe holding the labels and features
df_index = spark.createDataFrame(input_data, ['label', 'features'])

In [36]:
df_index.show(5)

+-----+-----------------+
|label|         features|
+-----+-----------------+
|    0|[5.1,3.5,1.4,0.2]|
|    0|[4.9,3.0,1.4,0.2]|
|    0|[4.7,3.2,1.3,0.2]|
|    0|[4.6,3.1,1.5,0.2]|
|    0|[5.0,3.6,1.4,0.2]|
+-----+-----------------+
only showing top 5 rows



In [38]:
#scaling to normalize the data

stdScaler = StandardScaler(inputCol = "features", outputCol = "features_scaled")

scaler = stdScaler.fit(df_index)

df_scaler = scaler.transform(df_index)

df_scaler.show(5)

+-----+-----------------+--------------------+
|label|         features|     features_scaled|
+-----+-----------------+--------------------+
|    0|[5.1,3.5,1.4,0.2]|[6.15892840883878...|
|    0|[4.9,3.0,1.4,0.2]|[5.9174018045706,...|
|    0|[4.7,3.2,1.3,0.2]|[5.67587520030241...|
|    0|[4.6,3.1,1.5,0.2]|[5.55511189816831...|
|    0|[5.0,3.6,1.4,0.2]|[6.03816510670469...|
+-----+-----------------+--------------------+
only showing top 5 rows



In [39]:
df_scaled = df_scaler.drop("features")

In [40]:
#train / test split the model

train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed = 123)

In [41]:
train_data.show(5)

+-----+--------------------+
|label|     features_scaled|
+-----+--------------------+
|    0|[5.19282199176603...|
|    0|[5.31358529390013...|
|    0|[5.31358529390013...|
|    0|[5.43434859603422...|
|    0|[5.55511189816831...|
+-----+--------------------+
only showing top 5 rows



## Machine Learning

In [47]:
#build ML models

model = ['Decision Tree', 'Random Forest', 'Naive Bayes']
model_results = []

In [49]:
#decision tree

decision_tree = DecisionTreeClassifier(labelCol = "label", featuresCol = "features_scaled")

tree_model = decision_tree.fit(train_data)

tree_predict = tree_model.transform(test_data)

#evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")

model_accuracy = evaluator.evaluate(tree_predict)

model_results.extend([[model[0], '{:.2}'.format(model_accuracy)]])

In [50]:
#random forest

random_forest = RandomForestClassifier(labelCol = "label", featuresCol="features_scaled")

forest_model = random_forest.fit(train_data)

forest_predict = forest_model.transform(test_data)

#evaluate the model
rf_evaluator = MulticlassClassificationEvaluator(labelCol = 'label', predictionCol = 'prediction', metricName = 'accuracy')

rf_model_accuracy = rf_evaluator.evaluate(forest_predict)

model_results.extend([[model[1], '{:.2}'.format(rf_model_accuracy)]])

In [51]:
#Naive Bayes

nv_bayes = NaiveBayes(smoothing = 1.0, modelType = "multinomial", labelCol = "label", featuresCol="features_scaled")

nbayes_model = nv_bayes.fit(train_data)

nbayes_predict = nbayes_model.transform(test_data)

#evaluate model

nv_evaluation = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = 'prediction', metricName = "accuracy")

nv_model_accuracy = nv_evaluation.evaluate(nbayes_predict)

model_results.extend([[model[2], '{:.2}'.format(nv_model_accuracy)]])

In [52]:
gc.collect()

867

In [53]:
print(tabulate(model_results, headers=['Classifier Models', 'Accuracy']))

Classifier Models      Accuracy
-------------------  ----------
Decision Tree              0.93
Random Forest              0.97
Naive Bayes                0.97


# Working with Big Data

In [2]:
#we'll download a large dataset of Amazon reviews using curl

!curl -O https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/review_categories/Automotive.jsonl.gz
!gunzip -f Automotive.jsonl.gz

In [5]:
#the data is zipped, how big is it?

print(f'The size of the zipped file is: {os.path.getsize("/content/Automotive.jsonl") / (1024 ** 3):.2f} GB')

The size of the zipped file is: 8.13 GB


In [14]:
#start a spark session

spark = SparkSession.builder.appName("AutomotiveData").getOrCreate()

print(f'The Spark Version is {spark.version}')

The Spark Version is 3.5.1


In [None]:
#let's try reading the data via pandas

test_df = pd.read_json('/content/Automotive.jsonl', lines = True)
print(test_df)

In [None]:
#if you run the above code, the session will fail due to all available RAM being used.

#clearly, the data is too massive to be handled with conventional packages.

#Spark is required!

In [6]:
#let's confirm the columns of the data

with open('/content/Automotive.jsonl', 'r') as file:
  line = file.readline()
  record = json.loads(line)
  print(pd.Series(record))

rating                                                             5.0
title                          It fit our 2012 Chevy Colorado perfect!
text                 Item came as described! It fit our 2012 Chevy ...
images                                                              []
asin                                                        B01LZA8SGZ
parent_asin                                                 B0BV88374L
user_id                                   AGXVBIUFLFGMVLATYXHJYL4A5Q7Q
timestamp                                                1513092936205
helpful_vote                                                         0
verified_purchase                                                 True
dtype: object


In [50]:
#making a PySpark Dataframe

schema = StructType([
    StructField("rating", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("text", StringType(), True),
    StructField("images", StringType(), True),
    StructField("asin", StringType(), True),
    StructField("parent_asin", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("helpful_vote", IntegerType(), True),
    StructField("verified_purchase", IntegerType(), True)
])

In [51]:
spark_df = spark.read.schema(schema).json('/content/Automotive.jsonl')

In [52]:
print(spark_df.show(5))

+------+--------------------+--------------------+------+----------+-----------+--------------------+--------------------+------------+-----------------+
|rating|               title|                text|images|      asin|parent_asin|             user_id|           timestamp|helpful_vote|verified_purchase|
+------+--------------------+--------------------+------+----------+-----------+--------------------+--------------------+------------+-----------------+
|   5.0|It fit our 2012 C...|Item came as desc...|    []|B01LZA8SGZ| B0BV88374L|AGXVBIUFLFGMVLATY...|+49918-01-03 17:2...|           0|             NULL|
|   5.0|    Easy to put on!!|Ease of applicati...|    []|B0B2WGS5ND| B0B2WGS5ND|AFE337D2J37YRU5U6...|+54545-07-22 13:5...|           0|             NULL|
|   5.0|             Perfect|Nice quality, fra...|    []|B00A0GV20Q| B00A0GV20Q|AEVWAM3YWN5URJVJI...|+45975-02-05 15:4...|           0|             NULL|
|   2.0|      Not waterproof|Description said ...|    []|B08C27WWVG| B08C27W

In [53]:
spark_df.printSchema()

root
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- images: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- helpful_vote: integer (nullable = true)
 |-- verified_purchase: integer (nullable = true)



In [18]:
#depending on resources, the summary statistics make take over 10min to compute
spark_df.describe().show()

+-------+------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+
|summary|            rating|               title|        text|              images|                asin|         parent_asin|             user_id|      helpful_vote|vertified_purchase|
+-------+------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+
|  count|          19955450|            19955450|    19955450|            19955450|            19955450|            19955450|            19955450|          19955450|                 0|
|   mean| 4.182790716320604|                 NaN|    Infinity|                NULL| 4.609848654133604E9| 4.615182888455246E9|                NULL|0.6502186119581368|              NULL|
| stddev|1.3815796733887575|                 NaN|         NaN|             

In [43]:
spark_df.select("title").show(5)

+--------------------+
|               title|
+--------------------+
|It fit our 2012 C...|
|    Easy to put on!!|
|             Perfect|
|      Not waterproof|
|           Very nice|
+--------------------+
only showing top 5 rows



In [55]:
spark_df.filter(spark_df["verified_purchase"].isNull()).count()

19955450

In [56]:
spark_df.select(F.countDistinct("verified_purchase")).show()

+---------------------------------+
|count(DISTINCT verified_purchase)|
+---------------------------------+
|                                0|
+---------------------------------+



In [57]:
spark_df.select("verified_purchase").distinct().show()

+-----------------+
|verified_purchase|
+-----------------+
|             NULL|
+-----------------+



In [19]:
spark_df.groupBy('rating').count().show()

+------+--------+
|rating|   count|
+------+--------+
|   1.0| 2302427|
|   4.0| 2172554|
|   3.0| 1185429|
|   2.0|  851553|
|   5.0|13443487|
+------+--------+



## Spark SQL

In [26]:
#let's run some SQL statements

spark_df.createOrReplaceGlobalTempView("spark_df")

In [28]:
spark.sql("SELECT rating, helpful_vote \
          FROM global_temp.spark_df \
          WHERE rating >= 4 AND helpful_vote >= 10").show()

+------+------------+
|rating|helpful_vote|
+------+------------+
|   5.0|          11|
|   5.0|          54|
|   5.0|          12|
|   5.0|          83|
|   4.0|          12|
|   4.0|          15|
|   5.0|          13|
|   5.0|          17|
|   4.0|          57|
|   4.0|          14|
|   5.0|         135|
|   4.0|          36|
|   5.0|          10|
|   5.0|          21|
|   5.0|          13|
|   5.0|          11|
|   5.0|          14|
|   5.0|         175|
|   5.0|          31|
|   5.0|          11|
+------+------------+
only showing top 20 rows



In [34]:
spark.sql("""SELECT title, timestamp, helpful_vote, verified_purchase \
            FROM global_temp.spark_df \
            WHERE verified_purchase IS NULL \
            AND rating <= 2""").show()

+-----+---------+------------+------------------+
|title|timestamp|helpful_vote|vertified_purchase|
+-----+---------+------------+------------------+
+-----+---------+------------+------------------+



In [41]:
spark.sql("""SELECT title, timestamp, helpful_vote, verified_purchase \
            FROM global_temp.spark_df \
            WHERE verified_purchase = '1' \
            LIMIT 10
            """).show()

+-----+---------+------------+------------------+
|title|timestamp|helpful_vote|vertified_purchase|
+-----+---------+------------+------------------+
+-----+---------+------------+------------------+



In [39]:
spark.sql("""
          SELECT rating, max(helpful_vote) \
          FROM global_temp.spark_df \
          GROUP BY rating \
          ORDER BY max(helpful_vote) \
          LIMIT 10
          """).show()

+------+-----------------+
|rating|max(helpful_vote)|
+------+-----------------+
|   2.0|             1514|
|   4.0|             1846|
|   3.0|             2036|
|   1.0|             2134|
|   5.0|             2822|
+------+-----------------+

