### Technical Demonstration ACID operations - Mono Architecture

In [1]:
# Print information about the active SparkContext
sc

In [2]:
# Import SparkSession (main entry point to Spark) and PySpark SQL functions
from pyspark.sql import SparkSession, functions as F

# Create or get an existing SparkSession
# This is how we initialize Spark in Python so we can read/write data, run SQL queries, etc.
spark = SparkSession.builder.appName('NoACID').getOrCreate()

25/10/12 17:17:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load a structured dataset (the wine-quality CSV) into a Spark DataFrame,
df = spark.read.csv("/CA1/ACID/winequality-red.csv", header = True, inferSchema = True, sep = ";")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
# Bring data to memory, cache the data
df.cache()

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

In [5]:
# It is an action in spark, creates a spark job and count 
# all the rows using parallel processing (it is controlled by Spark) across the cpu cores in the local machine
df.count()

1599

In [6]:
# Shows the dataFrame schema (columns + types)
df

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

###### Check for Nulls in the dataset

In [7]:
# Check for null values
# For each column c, create a new column named c that contains the number of null values in that column.
df_result = df.select([
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])

# Convert to pandas for a better visualisation
df_result_pd = df_result.toPandas().T  # Transpose for vertical display
df_result_pd.columns = ["null_count"] # Add column name null count
display(df_result_pd)


Unnamed: 0,null_count
fixed acidity,0
volatile acidity,0
citric acid,0
residual sugar,0
chlorides,0
free sulfur dioxide,0
total sulfur dioxide,0
density,0
pH,0
sulphates,0


###### Check for Duplicate rows in the dataset

In [8]:
# Find duplicates rows
total_rows = df.count()
unique_rows = df.dropDuplicates().count()
duplicates = total_rows - unique_rows

print(f"Total rows: {total_rows}")
print(f"Unique rows: {unique_rows}")
print(f"Duplicate rows: {duplicates}")

Total rows: 1599
Unique rows: 1359
Duplicate rows: 240


In [9]:
# Delete duplicates
df = df.dropDuplicates()

In [10]:
# Find duplicates rows
total_rows = df.count() # total number of rows
unique_rows = df.dropDuplicates().count() # number of distinct rows, after removing duplicates only one copy of each unique record remains.
duplicates = total_rows - unique_rows # duplicates

print(f"Total rows: {total_rows}")
print(f"Unique rows: {unique_rows}")
print(f"Duplicate rows: {duplicates}")

Total rows: 1359
Unique rows: 1359
Duplicate rows: 0


###### Prepare the dataset for inserting a row and for training neural network models. A copy of an existing row is created and modified (its label changed) and then inserted in the dataset to demonstrate that this simple architecture does not support ACID transactions.

In [11]:
# Add a column id + column label (quality >= 7 -> good wine)
df = (df.withColumn("id", F.monotonically_increasing_id())
         .withColumn("label", (F.col("quality") >= 7).cast("int")))

In [12]:
# Shows the new dataFrame schema (columns + types) after adding columns id and label
df

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int, id: bigint, label: int]

In [13]:
# select -> This is a transformation, meaning Spark is defining what you want to do 
# (create a new logical plan), but not executing it yet. In other words, 
# create a new logical DataFrame that only keeps these three columns: id, label, and quality
# show -> this is the action that triggers Spark to actually execute the plan
df.select("id", "label", "quality").show()

+---+-----+-------+
| id|label|quality|
+---+-----+-------+
|  0|    0|      4|
|  1|    0|      5|
|  2|    1|      7|
|  3|    1|      7|
|  4|    0|      6|
|  5|    0|      5|
|  6|    0|      5|
|  7|    1|      7|
|  8|    0|      5|
|  9|    0|      6|
| 10|    0|      6|
| 11|    0|      5|
| 12|    0|      6|
| 13|    0|      6|
| 14|    0|      6|
| 15|    1|      7|
| 16|    0|      6|
| 17|    0|      5|
| 18|    1|      7|
| 19|    1|      8|
+---+-----+-------+
only showing top 20 rows



In [14]:
# reorganize the dataframe, removing the column quality
# Python list comprehension with all the columns removing id, quality and label 
feature_cols = [c for c in df.columns if c not in ("id", "quality", "label")]
# add the id column at the beggining and label at the end
df = df.select("id", *feature_cols, "label")

In [15]:
# Return a Python list containing the names of all the columns in the spark dataFrame, in order.
df.columns

['id',
 'fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'label']

In [16]:
# shuffle the dataframe using a seed (always the same order), then select the first element.
# then return a list with one element. [0] returns the first row of that list  
row = df.orderBy(F.rand(42)).limit(1).collect()[0]
# return the id
bad_id = row["id"]
# return the label for that row
orig_label = row["label"]
# Change the label
fix_label = 1 - orig_label

In [17]:
# build a condition id == bad_id
cond = F.col("id") == bad_id
# .withColumn()-> adds or replaces a column, transofrmation
# F.lit(fix_label)-> creates a literal constant value for every row
# where() -> tells Spark which rows you want, transformation
# From DataFrame df, take the rows that satisfy cond,
# and assign (or overwrite) their label column with the constant value fix_label
row_fix = df.where(cond).withColumn("label", F.lit(fix_label))

###### The following two rows have identical feature values but different labels

In [18]:
# Convert that Spark Row object into a Python dictionary
print(row.asDict())

{'id': 255, 'fixed acidity': 8.9, 'volatile acidity': 0.875, 'citric acid': 0.13, 'residual sugar': 3.45, 'chlorides': 0.088, 'free sulfur dioxide': 4.0, 'total sulfur dioxide': 14.0, 'density': 0.9994, 'pH': 3.44, 'sulphates': 0.52, 'alcohol': 11.5, 'label': 0}


In [19]:
print(row_fix.first().asDict())

{'id': 255, 'fixed acidity': 8.9, 'volatile acidity': 0.875, 'citric acid': 0.13, 'residual sugar': 3.45, 'chlorides': 0.088, 'free sulfur dioxide': 4.0, 'total sulfur dioxide': 14.0, 'density': 0.9994, 'pH': 3.44, 'sulphates': 0.52, 'alcohol': 11.5, 'label': 1}


###### HDFS CSV (DIRTY): INSERT  -> do not support ACID properties (no consistency)

In [20]:
# Write dataframe to hadoop
# Write the file in only one partition because I use coalesce, if I dont use coalesce it can create multiple partitions
# for the same file
df.coalesce(1).write.csv(
    path="/CA1/ACID/dirty/winequality-red",
    mode="overwrite",
    header=True
)

                                                                                

In [21]:
# Append the corrected row to hadoop (keeps old version of the row too)
row_fix.coalesce(1).write.csv(
    path="/CA1/ACID/dirty/winequality-red",
    mode="append",
    header=True
)

In [22]:
# read the csv from hadoop with sep = , because the files were written by Spark with the default delimiter = comma
df_in = spark.read.csv("/CA1/ACID/dirty/winequality-red", header = True, sep = ",")

# define list comprehension to create a list from df_in.
cols = [
    F.col(c).cast("double") if c not in ("id", "label") else F.col(c)
    for c in df_in.columns
]

# select(...) select the columns and transform the types because in csv they are strings
# column id is cast to bigint and label to int
# select(*cols) unpack the column type and pass them as arguments to select()
# withcolumn the same
# it is a transformation, nothing is executed yet
df_dirty = (df_in
               .select(*cols)
               .withColumn("id",F.col("id").cast("bigint"))
               .withColumn("label",F.col("label").cast("int")))

In [23]:
# how many partitions (tasks) are created to read the file 
df_in.rdd.getNumPartitions()

2

In [24]:
# One row more than before because we duplicate a row with different label
df_dirty.count()

1360

In [25]:
# select the column which is inserted twice
# group the DataFrame df_dirty by the column id
# count how many rows there are for that id
# count how many distinct values of label exist for that id
# condition where id = bad_id 
(df_dirty.groupBy("id")
 .agg(F.count("*").alias("n"), F.countDistinct("label").alias("label_versions"))
 .where(df_dirty["id"]==bad_id)).show()


+---+---+--------------+
| id|  n|label_versions|
+---+---+--------------+
|255|  2|             2|
+---+---+--------------+



In [26]:
# Replace invalid characters with underscores, sanitize the columns
for c in df_dirty.columns:
    df_dirty = df_dirty.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))

In [27]:
df_dirty.count()

1360

### Train Keras ANN

In [28]:
#import sys
#print(sys.executable)  # sanity check: this is your system Python
#%pip install --user --break-system-packages "tensorflow>=2.16"  # or tensorflow-cpu

In [29]:
# import libraries to train ANN
import pandas as pd, numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow import keras
from tensorflow.keras import layers

2025-10-12 17:20:04.230878: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [29]:
# This list names the input columns — the variables that the model will use to make predictions
feature_cols = [
    "fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar",
    "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide",
    "density", "ph", "sulphates", "alcohol"
]

# this function takes a spark dataframe, as input and returns 
# NumPy arrays (X, y) — the standard format expected by TensorFlow
def to_numpy(sdf):
    # This collects the entire spark dataFrame into memory as a pandas dataFrame, it is an action
    # Trigger Spark to execute a job and produce a result.
    pdf = sdf.toPandas()
    # Select only the columns in the list above, and convert those Pandas columns into a 2D NumPy array
    # the data is stored as 32-bit floats (what TensorFlow expects).
    X = pdf[feature_cols].to_numpy(dtype=np.float32)
    # Select the label column and Converts it to a 1D NumPy array of integers
    y = pdf["label"].to_numpy(dtype=np.int64)
    return X, y

In [30]:
# call the function to_numpy
X, y = to_numpy(df_dirty)

In [31]:
# Split dataset into training and testing subsets
# test_size= 0.2 -> 20% of the data will go to the test set, and 80% will go to the training set
# If you have 1,599 samples: Training set is ~1,279 rows and Test set is 320 rows approximately
# random_state -> Ensure the same split every time you run the code, otherwise different 20% of rows go to X_test each time 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=7)

In [32]:
# Wrap the NumPy array X_train inside a pandas dataFrame to call describe
# we check values min and max to verify if they have different scales
pd.DataFrame(X_train).describe()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10
count,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0,1088.0
mean,8.330882,0.5317,0.273199,2.502252,0.088859,15.912683,46.302849,0.996734,3.309127,0.655882,10.421829
std,1.736495,0.186346,0.19606,1.308558,0.052025,10.561567,31.897776,0.001862,0.153015,0.162066,1.067141
min,4.6,0.12,0.0,0.9,0.012,1.0,6.0,0.99007,2.74,0.33,8.4
25%,7.1,0.39,0.09,1.9,0.07,7.0,22.0,0.995615,3.21,0.55,9.5
50%,7.9,0.52,0.26,2.2,0.079,13.0,38.0,0.99671,3.31,0.62,10.2
75%,9.3,0.64,0.43,2.6,0.091,22.0,62.0,0.997853,3.4,0.73,11.1
max,15.9,1.58,1.0,15.5,0.611,68.0,165.0,1.00369,4.01,2.0,14.9


In [33]:
# we standardize because different features have different scales, and we want them all to 
# have equal importance for the model
# fit on the training set, use the same mean and std from training data to scale the test set
# it means that both datasets are on the same scale
sc = StandardScaler().fit(X_train)
X_train, X_test = sc.transform(X_train), sc.transform(X_test)

###### First Model

In [34]:
# import libraries from tensorflow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# build a model with two hidden layers
# Dense -> fully connected layer (each neuron connects to every neuron in the next layer)
# d is the number of input features
# activation function relu for both hidden layers
# The output layer has one neuron because it is a binary classification (good wine vs bad wine)
# The activation function is sigmoid, so the network outputs is a probability: 
# close to 0 -> class 0 (bad wine)
# close to 1 -> class 1 (good wine)
# optimizer adam -> to adjusts learning rates during training
# loss binary_crossentropy -> used for binary classification
# metric accuracy -> to measure performance
def build_model(d):
    model = Sequential()
    # Input + Hidden Layer 1
    model.add(Dense(64, input_dim=d, activation="relu"))
    
    # Hidden Layer 2
    model.add(Dense(32, activation="relu"))
    
    # Output Layer
    model.add(Dense(1, activation="sigmoid"))
    
    # Compile
    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return model


In [35]:
# build the model
model = build_model(X_train.shape[1])
# trains the neural network
# epochs = 200 -> Feeds training data through the model 200 times
# give the model unseen data (validation data) to evaluate after each epoch
# the model trains on mini-batches of 16 examples at a time
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=200, batch_size=16, verbose=2) # -> Model 1

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
2025-10-10 14:49:34.234344: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


Epoch 1/200
68/68 - 1s - 14ms/step - accuracy: 0.8649 - loss: 0.4218 - val_accuracy: 0.8529 - val_loss: 0.3914
Epoch 2/200
68/68 - 0s - 3ms/step - accuracy: 0.8658 - loss: 0.3007 - val_accuracy: 0.8750 - val_loss: 0.3493
Epoch 3/200
68/68 - 0s - 5ms/step - accuracy: 0.8833 - loss: 0.2775 - val_accuracy: 0.8566 - val_loss: 0.3403
Epoch 4/200
68/68 - 0s - 3ms/step - accuracy: 0.8833 - loss: 0.2664 - val_accuracy: 0.8713 - val_loss: 0.3382
Epoch 5/200
68/68 - 0s - 4ms/step - accuracy: 0.8888 - loss: 0.2616 - val_accuracy: 0.8676 - val_loss: 0.3416
Epoch 6/200
68/68 - 0s - 3ms/step - accuracy: 0.8897 - loss: 0.2564 - val_accuracy: 0.8676 - val_loss: 0.3508
Epoch 7/200
68/68 - 0s - 2ms/step - accuracy: 0.8934 - loss: 0.2523 - val_accuracy: 0.8750 - val_loss: 0.3491
Epoch 8/200
68/68 - 0s - 3ms/step - accuracy: 0.8897 - loss: 0.2475 - val_accuracy: 0.8713 - val_loss: 0.3464
Epoch 9/200
68/68 - 0s - 2ms/step - accuracy: 0.9007 - loss: 0.2457 - val_accuracy: 0.8750 - val_loss: 0.3566
Epoch 10/

Epoch 75/200
68/68 - 0s - 4ms/step - accuracy: 0.9697 - loss: 0.0963 - val_accuracy: 0.8787 - val_loss: 0.6756
Epoch 76/200
68/68 - 0s - 3ms/step - accuracy: 0.9688 - loss: 0.0950 - val_accuracy: 0.8787 - val_loss: 0.7015
Epoch 77/200
68/68 - 0s - 3ms/step - accuracy: 0.9697 - loss: 0.0952 - val_accuracy: 0.8787 - val_loss: 0.7190
Epoch 78/200
68/68 - 0s - 3ms/step - accuracy: 0.9688 - loss: 0.0937 - val_accuracy: 0.8750 - val_loss: 0.7338
Epoch 79/200
68/68 - 0s - 3ms/step - accuracy: 0.9669 - loss: 0.0909 - val_accuracy: 0.8750 - val_loss: 0.7321
Epoch 80/200
68/68 - 0s - 4ms/step - accuracy: 0.9752 - loss: 0.0890 - val_accuracy: 0.8750 - val_loss: 0.7380
Epoch 81/200
68/68 - 0s - 6ms/step - accuracy: 0.9724 - loss: 0.0893 - val_accuracy: 0.8750 - val_loss: 0.7425
Epoch 82/200
68/68 - 0s - 3ms/step - accuracy: 0.9715 - loss: 0.0875 - val_accuracy: 0.8787 - val_loss: 0.7511
Epoch 83/200
68/68 - 0s - 4ms/step - accuracy: 0.9752 - loss: 0.0829 - val_accuracy: 0.8676 - val_loss: 0.7706
E

Epoch 149/200
68/68 - 0s - 3ms/step - accuracy: 0.9972 - loss: 0.0209 - val_accuracy: 0.8713 - val_loss: 1.4096
Epoch 150/200
68/68 - 0s - 3ms/step - accuracy: 0.9963 - loss: 0.0236 - val_accuracy: 0.8640 - val_loss: 1.3851
Epoch 151/200
68/68 - 0s - 3ms/step - accuracy: 0.9991 - loss: 0.0215 - val_accuracy: 0.8603 - val_loss: 1.4617
Epoch 152/200
68/68 - 0s - 3ms/step - accuracy: 0.9991 - loss: 0.0198 - val_accuracy: 0.8676 - val_loss: 1.4864
Epoch 153/200
68/68 - 0s - 3ms/step - accuracy: 0.9991 - loss: 0.0224 - val_accuracy: 0.8640 - val_loss: 1.4875
Epoch 154/200
68/68 - 0s - 3ms/step - accuracy: 0.9972 - loss: 0.0212 - val_accuracy: 0.8603 - val_loss: 1.4461
Epoch 155/200
68/68 - 0s - 3ms/step - accuracy: 0.9835 - loss: 0.0423 - val_accuracy: 0.8603 - val_loss: 1.5103
Epoch 156/200
68/68 - 0s - 3ms/step - accuracy: 0.9945 - loss: 0.0250 - val_accuracy: 0.8493 - val_loss: 1.4853
Epoch 157/200
68/68 - 0s - 3ms/step - accuracy: 0.9972 - loss: 0.0207 - val_accuracy: 0.8640 - val_loss:

<keras.src.callbacks.history.History at 0x794b53cead20>

###### Second Model (includes techniques to improve training and generalization)

In [36]:
# import libraries from tensorflow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, BatchNormalization, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# build a model with two hidden layers
# Dense -> fully connected layer (each neuron connects to every neuron in the next layer)
# d is the number of input features
# activation function relu for both hidden layers
# The output layer has one neuron because it is a binary classification (good wine vs bad wine)
# The activation function is sigmoid, so the network outputs is a probability: 
# close to 0 -> class 0 (bad wine)
# close to 1 -> class 1 (good wine)
# optimizer adam -> to adjusts learning rates during training
# loss binary_crossentropy -> used for binary classification
# metric accuracy -> to measure performance
# Three core techniques were added to make neural networks train better and 
# generalize well (Dropout, BatchNormalization and L2)
# EarlyStopping and ReduceLROnPlateau are callbacks that help your model train smarter 
# and avoid overfitting or wasting epochs
def build_model1(d):
    model = Sequential()
    
    # Input + Hidden Layer 1
    model.add(Dense(64, input_dim=d, activation="relu", kernel_regularizer=l2(0.001)))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    
    # Hidden Layer 2
    model.add(Dense(32, activation="relu", kernel_regularizer=l2(0.001)))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    
    # Output Layer
    model.add(Dense(1, activation="sigmoid"))
    
    # Compile
    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return model

es  = EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True)
rlr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-5)

In [37]:
# build the model
model1 = build_model1(X_train.shape[1])
# trains the neural network
# epochs = 200 -> Feeds training data through the model 200 times
# give the model unseen data (validation data) to evaluate after each epoch
# the model trains on mini-batches of 16 examples at a time
model1.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=200, batch_size=16, verbose=2, callbacks=[es, rlr]) # -> Model 2) 

Epoch 1/200
68/68 - 2s - 28ms/step - accuracy: 0.5864 - loss: 0.8440 - val_accuracy: 0.8088 - val_loss: 0.6016 - learning_rate: 0.0010
Epoch 2/200
68/68 - 0s - 5ms/step - accuracy: 0.7270 - loss: 0.6184 - val_accuracy: 0.8382 - val_loss: 0.5234 - learning_rate: 0.0010
Epoch 3/200
68/68 - 0s - 3ms/step - accuracy: 0.7978 - loss: 0.5459 - val_accuracy: 0.8676 - val_loss: 0.4562 - learning_rate: 0.0010
Epoch 4/200
68/68 - 0s - 3ms/step - accuracy: 0.8300 - loss: 0.4643 - val_accuracy: 0.8750 - val_loss: 0.4228 - learning_rate: 0.0010
Epoch 5/200
68/68 - 0s - 3ms/step - accuracy: 0.8585 - loss: 0.4189 - val_accuracy: 0.8676 - val_loss: 0.3998 - learning_rate: 0.0010
Epoch 6/200
68/68 - 0s - 3ms/step - accuracy: 0.8575 - loss: 0.4174 - val_accuracy: 0.8750 - val_loss: 0.3849 - learning_rate: 0.0010
Epoch 7/200
68/68 - 0s - 3ms/step - accuracy: 0.8704 - loss: 0.3934 - val_accuracy: 0.8824 - val_loss: 0.3845 - learning_rate: 0.0010
Epoch 8/200
68/68 - 0s - 3ms/step - accuracy: 0.8704 - loss: 

<keras.src.callbacks.history.History at 0x794ba15be2d0>

In [38]:
# Print versions of PySpark, Python and Spark
import sys, pyspark
print("Python exe:", sys.executable)  
print("PySpark ver:", pyspark.__version__)
print("Spark:", spark.version)


Python exe: /usr/bin/python3
PySpark ver: 3.5.6
Spark: 3.5.6
