In [1]:
# Implement a PySpark script to handle any missing values and scale numerical features.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder \
    .appName("HandleMissingValuesAndScale") \
    .getOrCreate()

data = [
    (1.0, 0.1, None),
    (2.0, None, 0.2),
    (None, 0.3, 0.3),
    (4.0, 0.4, None),
    (5.0, 0.5, 0.5)
]

columns = ["feature1", "feature2", "feature3"]
df = spark.createDat
print("Initial DataFrame:")
df.show()

imputer = Imputer(
    inputCols=["feature1", "feature2", "feature3"],
    outputCols=["feature1_imputed", "feature2_imputed", "feature3_imputed"]
)

assembler = VectorAssembler(
    inputCols=["feature1_imputed", "feature2_imputed", "feature3_imputed"],
    outputCol="features"
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)

pipeline = Pipeline(stages=[imputer, assembler, scaler])

# fit and transform the pipeline
pipeline_model = pipeline.fit(df)
scaled_df = pipeline_model.transform(df)

print("DataFrame after handling missing values and scaling:")
scaled_df.select("feature1", "feature2", "feature3", "scaled_features").show(truncate=False)

spark.stop()



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.1.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3/dist-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/usr/lib/python3/dist-packages/traitlets/config/application.py", line 846, in launch_instance
    app.start()
  File "/usr/lib/python3/dist-packages/ipykernel/kernelapp.py", line 677, in start
    s

AttributeError: _ARRAY_API not found

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/07 10:29:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


AttributeError: 'SparkSession' object has no attribute 'createDat'

In [2]:
# Develop a PySpark script that uses the K-means algorithm to cluster data points.

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder \
    .appName("KMeansClustering") \
    .getOrCreate()

data = [
    (0.0, 0.0),
    (0.1, 0.1),
    (0.2, 0.2),
    (9.0, 9.0),
    (9.1, 9.1),
    (9.2, 9.2)
]

columns = ["x", "y"]
df = spark.createDataFrame(data, columns)

print("Initial DataFrame:")
df.show()

assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")

feature_df = assembler.transform(df)
feature_df.show()

kmeans = KMeans(k=2, seed=1)  # change k and see what happens
model = kmeans.fit(feature_df)

predictions = model.transform(feature_df)

print("Cluster Centers:")
for center in model.clusterCenters():
    print(center)

print("Predictions:")
predictions.select("x", "y", "prediction").show()

spark.stop()


24/10/07 10:29:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Initial DataFrame:
+---+---+
|  x|  y|
+---+---+
|0.0|0.0|
|0.1|0.1|
|0.2|0.2|
|9.0|9.0|
|9.1|9.1|
|9.2|9.2|
+---+---+

+---+---+---------+
|  x|  y| features|
+---+---+---------+
|0.0|0.0|(2,[],[])|
|0.1|0.1|[0.1,0.1]|
|0.2|0.2|[0.2,0.2]|
|9.0|9.0|[9.0,9.0]|
|9.1|9.1|[9.1,9.1]|
|9.2|9.2|[9.2,9.2]|
+---+---+---------+



24/10/07 10:29:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


Cluster Centers:
[9.1 9.1]
[0.1 0.1]
Predictions:
+---+---+----------+
|  x|  y|prediction|
+---+---+----------+
|0.0|0.0|         1|
|0.1|0.1|         1|
|0.2|0.2|         1|
|9.0|9.0|         0|
|9.1|9.1|         0|
|9.2|9.2|         0|
+---+---+----------+



In [3]:
# Develop a PySpark script that labels data points as anomalies based on their cluster
# assignments


from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import numpy as np

spark = SparkSession.builder \
    .appName("AnomalyDetection") \
    .getOrCreate()

data = [
    (0.0, 0.0),
    (0.1, 0.1),
    (0.2, 0.2),
    (9.0, 9.0),
    (9.1, 9.1),
    (9.2, 9.2),
    (11.0, 11.0)  # outlier
]

columns = ["x", "y"]
df = spark.createDataFrame(data, columns)

print("Initial DataFrame:")
df.show()

assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
feature_df = assembler.transform(df)

kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(feature_df)

predictions = model.transform(feature_df)

centers = model.clusterCenters() # this is how you get the centers

def label_anomaly(features, prediction):
    center = centers[int(prediction)]
    distance = np.linalg.norm(np.array(features) - np.array(center))
    threshold = 1.0  # Define a threshold for anomaly detection
    return "anomaly" if distance > threshold else "normal"

# udf is user defined fucntions, applys any conditin on all the values in a col
label_anomaly_udf = udf(label_anomaly, StringType())

labeled_predictions = predictions.withColumn(
    "anomaly_label", label_anomaly_udf(col("features"), col("prediction"))
)

print("Labeled Predictions:")
labeled_predictions.select("x", "y", "prediction", "anomaly_label").show()

spark.stop()


Initial DataFrame:
+----+----+
|   x|   y|
+----+----+
| 0.0| 0.0|
| 0.1| 0.1|
| 0.2| 0.2|
| 9.0| 9.0|
| 9.1| 9.1|
| 9.2| 9.2|
|11.0|11.0|
+----+----+

Labeled Predictions:



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.1.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/lplab/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 218, in <module>
  File "/home/lplab/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, in manager
  File "/home/lplab/.local/lib/python3.10/site-packages/pyspark

AttributeError: _ARRAY_API not found
AttributeError: _ARRAY_API not found

A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.1.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/lplab/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 218, in <module>
  File "/home/lplab/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, 



+----+----+----------+-------------+
|   x|   y|prediction|anomaly_label|
+----+----+----------+-------------+
| 0.0| 0.0|         1|       normal|
| 0.1| 0.1|         1|       normal|
| 0.2| 0.2|         1|       normal|
| 9.0| 9.0|         0|       normal|
| 9.1| 9.1|         0|       normal|
| 9.2| 9.2|         0|       normal|
|11.0|11.0|         0|      anomaly|
+----+----+----------+-------------+



In [4]:
pip install numpy --upgrade

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [4]:
# Implement code to evaluate the effectiveness of the K-means clustering model in detecting
# anomalies.

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import pandas as pd
import math

spark = SparkSession.builder \
    .appName("KMeansAnomalyEvaluation") \
    .getOrCreate()

data = [
    (0.0, 0.0, 0),   # Normal
    (0.1, 0.1, 0),   # Normal
    (0.2, 0.2, 0),   # Normal
    (9.0, 9.0, 1),   # Anomaly
    (9.1, 9.1, 1),   # Anomaly
    (9.2, 9.2, 1),   # Anomaly
    (10.0, 10.0, 1)  # Anomaly
]

columns = ["x", "y", "label"]
df = spark.createDataFrame(data, columns)

print("Initial DataFrame:")
df.show()

assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
feature_df = assembler.transform(df)

kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(feature_df)

predictions = model.transform(feature_df)

predictions = predictions.withColumnRenamed("prediction", "cluster")
predictions.show()

# Step 5: Define anomaly detection function
def label_anomaly(features, prediction):
    center = model.clusterCenters()[int(prediction)]
    distance = []
    for i in range(len(features)):
        distance.append(math.sqrt(features[i]**2+center[i]**2))
    threshold = 2.0  # Define a threshold for anomaly detection
    for i in range(len(features)):
        if distance[i] > threshold:
            return 1    # 1 for anomaly, 0 for normal

def calculate_accuracy(true, predicted):
    correct = sum(1 for true_label, pred_label in zip(true, predicted) if true_label == pred_label)
    accuracy = correct / len(true)  # Divide by total number of predictions
    return accuracy

# Step 6: Collect results for evaluation
results = []
for row in predictions.collect():
    features = row.features.toArray()
    prediction = row.cluster
    label = row.label
    anomaly_label = label_anomaly(features, prediction)
    results.append((label, anomaly_label))

# Convert results to a DataFrame for evaluation
results_df = spark.createDataFrame(results, ["label", "predicted_anomaly"])

# Step 7: Evaluate effectiveness using accuracy
result_pandas = results_df.toPandas()
accuracy = calculate_accuracy(result_pandas['label'], result_pandas['predicted_anomaly'])

# Show accuracy
print(f"Accuracy of the K-means anomaly detection model: {accuracy:.2f}")

# Stop the Spark session
spark.stop()


Initial DataFrame:
+----+----+-----+
|   x|   y|label|
+----+----+-----+
| 0.0| 0.0|    0|
| 0.1| 0.1|    0|
| 0.2| 0.2|    0|
| 9.0| 9.0|    1|
| 9.1| 9.1|    1|
| 9.2| 9.2|    1|
|10.0|10.0|    1|
+----+----+-----+

+----+----+-----+-----------+-------+
|   x|   y|label|   features|cluster|
+----+----+-----+-----------+-------+
| 0.0| 0.0|    0|  (2,[],[])|      1|
| 0.1| 0.1|    0|  [0.1,0.1]|      1|
| 0.2| 0.2|    0|  [0.2,0.2]|      1|
| 9.0| 9.0|    1|  [9.0,9.0]|      0|
| 9.1| 9.1|    1|  [9.1,9.1]|      0|
| 9.2| 9.2|    1|  [9.2,9.2]|      0|
|10.0|10.0|    1|[10.0,10.0]|      0|
+----+----+-----+-----------+-------+



TypeError: '>' not supported between instances of 'list' and 'float'