In [1]:
# vikasan
import os
import findspark
# Point this kernel at the local JDK and Spark installs before findspark
# initialises; both variables are overwritten unconditionally.
for env_var, install_dir in (
    ('JAVA_HOME', '/usr/lib/jvm/java-11-openjdk-amd64'),
    ('SPARK_HOME', '/home/vik/spark-3.5.6-bin-hadoop3'),
):
    os.environ[env_var] = install_dir

findspark.init()

In [2]:
# vikasan
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Build the session FIRST. Creating SparkContext("local", ...) beforehand made
# getOrCreate() reuse that single-threaded context, so master("local[*]") and
# the driver-memory setting below were silently ignored. It also made this cell
# fail on re-run, because a second SparkContext cannot be constructed.
spark = (
    SparkSession.builder
    .appName("Pyspark new")
    .master("local[*]")
    .config("spark.executor.memory", "6G")
    .config("spark.driver.memory", "6G")
    .config("spark.sql.shuffle.partitions", "300")
    .config("spark.memory.fraction", "0.8")
    .getOrCreate()
)

# Keep `sc` available for any cell that uses the low-level context directly.
sc = spark.sparkContext


25/10/24 10:39:55 WARN Utils: Your hostname, VKSN resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/24 10:39:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/24 10:39:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.functions import *
from pyspark.ml import Pipeline

In [4]:
# Load the two saved anomaly result sets: z-score (statistical) outliers and
# the k-means (behavioural) anomalies.
statistical_path = "file:///home/vik/BDA_miniproj/results/statistical_outliers.parquet"
behavioural_path = "file:///home/vik/BDA_miniproj/results/final_behavioural_anomalies_m2.parquet"

statistical_anomalies = spark.read.parquet(statistical_path)
behavioural_anomalies = spark.read.parquet(behavioural_path)

                                                                                

In [5]:
# Row counts of each result set (each count triggers a Spark job).
n_statistical = statistical_anomalies.count()
print(f"Statistical Anomalies (Paths): {n_statistical}")

n_behavioural = behavioural_anomalies.count()
print(f"Behavioural Anomalies (Articles): {n_behavioural}")

Statistical Anomalies (Paths): 119506
Behavioural Anomalies (Articles): 12


In [6]:
# Inspect the column names of the behavioural (k-means) result set.
behavioural_anomalies.columns

['curr', 'features', 'prediction', 'distance_to_center']

In [7]:
# Inspect the column names of the statistical (z-score) result set.
statistical_anomalies.columns

['prev',
 'curr',
 'type',
 'n',
 'type_encoded',
 'normalized_clicks',
 'zscore',
 'abs_zscore']

In [9]:
# The k-means results give us the anomalous articles: keep the article name
# (stored in `curr`) and its distance to the assigned cluster centre.
anomalous_articles = (
    behavioural_anomalies
    .withColumnRenamed("curr", "article")
    .select("article", "distance_to_center")
)

In [10]:
# The z-score results give us the anomalous paths: per destination article,
# count how many outlier paths end there and average their z-scores.
paths_by_destination = statistical_anomalies.groupBy("curr")
anomalous_paths = paths_by_destination.agg(
    count("*").alias("hot_path_count"),
    avg("zscore").alias("avg_zscore"),
)

In [11]:
# An inner join keeps only the articles flagged by both methods.
same_article = anomalous_articles["article"] == anomalous_paths["curr"]
common_anomalies = anomalous_articles.join(anomalous_paths, on=same_article, how="inner")

In [12]:
# Number of articles present in both anomaly sets.
overlap_count = common_anomalies.count()
print(f"Found {overlap_count} articles that are BOTH behavioural and statistical anomalies:")

                                                                                

Found 7 articles that are BOTH behavioural and statistical anomalies:


In [13]:
# Show the overlapping articles, largest distance first.
summary_cols = ["article", "distance_to_center", "hot_path_count", "avg_zscore"]
(
    common_anomalies
    .select(*summary_cols)
    .orderBy("distance_to_center", ascending=False)
    .show(truncate=False)
)

+-----------------+------------------+--------------+------------------+
|article          |distance_to_center|hot_path_count|avg_zscore        |
+-----------------+------------------+--------------+------------------+
|Main_Page        |448097.19018891774|20            |4.123577427271675 |
|A                |81777.4127286     |1             |7.1803893195856725|
|Issue_(genealogy)|14582.065861567951|1             |3.461547060366284 |
|Incumbent        |1260.7850077946496|1             |3.2390137055078316|
|Killed_in_action |1155.622536228467 |2             |3.52295686180159  |
|Undefined        |977.6561377904477 |1             |3.3188823466757147|
|United_States    |936.8134634205683 |4             |4.158346551435512 |
+-----------------+------------------+--------------+------------------+



## Demo (sub-sampling) for k-means

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import *

In [9]:
# Demo training set: a 3% sample without replacement, with a fixed seed.   #wsl-vik
train_path = "file:///home/vik/BDA_miniproj/dataset/train_clickstream.parquet"
train_df = spark.read.parquet(train_path).sample(
    withReplacement=False,
    fraction=0.03,
    seed=42,
)

In [10]:
# Report how many rows the sampled training set contains.
sample_rows = train_df.count()
print(f"Using a demo sample of {sample_rows} rows.")



Using a demo sample of 1077273 rows.


                                                                                

In [11]:
# Test set (the Month 2 clickstream), read in full.   #wsl-vik
test_path = "file:///home/vik/BDA_miniproj/dataset/final_test_clickstream_m2.parquet"
test_df2 = spark.read.parquet(test_path)

In [12]:
# `sum` and `count` below are the pyspark.sql.functions versions brought in by
# the star import, not the Python builtins.
clicks = col("normalized_clicks")

# Inbound profile, keyed by destination article (`curr`).
in_agg = (
    train_df
    .groupBy("curr")
    .agg(
        countDistinct("prev").alias("in_degree"),   # distinct incoming sources
        sum(clicks).alias("in_clicks"),             # total normalized inbound clicks
        count("*").alias("in_events"),              # number of inbound records
    )
)

# Outbound profile, keyed by source article (`prev`); the key is renamed to
# `curr` so that both aggregates share a single join column.
out_agg = (
    train_df
    .groupBy("prev")
    .agg(
        countDistinct("curr").alias("out_degree"),  # distinct outgoing targets
        sum(clicks).alias("out_clicks"),            # total normalized outbound clicks
        count("*").alias("out_events"),             # number of outbound records
    )
    .withColumnRenamed("prev", "curr")
)

In [13]:
# The full outer join keeps articles that appear only as a source or only as a
# destination; their missing numeric aggregates become 0.
joined_features = in_agg.join(out_agg, on="curr", how="full_outer").fillna(0)

# Derived features. The +1.0 in each denominator avoids division by zero.
out_over_in = col("out_clicks") / (col("in_clicks") + lit(1.0))
in_over_out_events = col("in_events") / (col("out_events") + lit(1.0))

article_features = (
    joined_features
    .withColumn("ratio_out_in", out_over_in)
    .withColumn("bounce_rate", 1 - out_over_in)
    .withColumn("in_out_event_ratio", in_over_out_events)
)


In [14]:
# Columns fed to the model; the order fixes the layout of the assembled vector.
feature_cols = [
    "in_degree",
    "out_degree",
    "in_clicks",
    "out_clicks",
    "ratio_out_in",
    "bounce_rate",
    "in_out_event_ratio",
]

In [13]:
# Preview the first ten feature rows, then report how many articles there are.
feature_preview = article_features.select("curr", *feature_cols)
feature_preview.show(10, truncate=False)
print("Total articles:", article_features.count())

                                                                                

+--------------------------------------------------------------+---------+----------+------------------+------------------+------------------+-------------------+------------------+
|curr                                                          |in_degree|out_degree|in_clicks         |out_clicks        |ratio_out_in      |bounce_rate        |in_out_event_ratio|
+--------------------------------------------------------------+---------+----------+------------------+------------------+------------------+-------------------+------------------+
|!!                                                            |2        |1         |5.886104031450156 |2.639057329615259 |0.3832438948877593|0.6167561051122408 |1.0               |
|!!!                                                           |1        |0         |4.430816798843313 |0.0               |0.0               |1.0                |1.0               |
|!Oka_Tokat                                                    |2        |0         |5.231

[Stage 26:>                                                         (0 + 1) / 1]

Total articles: 1024497


                                                                                

In [15]:
# Pipeline stages: assemble the feature vector, standardise it (centre and
# scale to unit standard deviation), then cluster with k-means (k=5, seeded).
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)
kmeans = KMeans(
    featuresCol="features",
    predictionCol="prediction",
    k=5,
    seed=42,
)

pipeline = Pipeline(stages=[assembler, scaler, kmeans])
pipeline_model = pipeline.fit(article_features)

# Pull out the fitted scaler and k-means stages (pipeline positions 1 and 2)
# so later cells can apply them to other data.
fitted_stages = pipeline_model.stages
scaler_model = fitted_stages[1]
kmeans_model = fitted_stages[2]

print("K-means model trained on Month 1 data.")
print("Cluster Centers:")
for centroid in kmeans_model.clusterCenters():
    print(centroid)

[Stage 213:>                                                        (0 + 1) / 1]

K-means model trained on Month 1 data.
Cluster Centers:
[-0.19821922  0.00304121 -0.21472373  0.00236253  0.0017686  -0.0017686
 -1.01700462]
[ 9.45275017e+02  3.63347844e+01  9.20977962e+02  3.18284056e+01
 -7.84398806e-05  7.84398806e-05 -5.76481126e-01]
[-2.99032905e-01  6.80861850e+02 -3.35845326e-01  6.93799321e+02
  6.94150813e+02 -6.94150813e+02 -1.23276180e+00]
[ 0.06127835 -0.00391597  0.06389158 -0.0035879  -0.00302528  0.00302528
  0.44015834]
[ 1.27054552e+00  1.73473884e-03  1.44810699e+00  2.11827279e-03
 -2.91573878e-03  2.91573878e-03  3.31545384e+00]


                                                                                

In [16]:
# Month 2 feature engineering: the same aggregates and derived ratios as the
# training features, computed on the test clickstream.
clicks_m2 = col("normalized_clicks")

# Inbound profile per destination article.
in_agg_m2 = (
    test_df2
    .groupBy("curr")
    .agg(
        countDistinct("prev").alias("in_degree"),
        sum(clicks_m2).alias("in_clicks"),
        count("*").alias("in_events"),
    )
)

# Outbound profile per source article, re-keyed to `curr` for the join.
out_agg_m2 = (
    test_df2
    .groupBy("prev")
    .agg(
        countDistinct("curr").alias("out_degree"),
        sum(clicks_m2).alias("out_clicks"),
        count("*").alias("out_events"),
    )
    .withColumnRenamed("prev", "curr")
)

# The full outer join keeps one-sided articles; missing numeric values become 0.
joined_features_m2 = in_agg_m2.join(out_agg_m2, on="curr", how="full_outer").fillna(0)

# The +1.0 in each denominator avoids division by zero.
out_over_in_m2 = col("out_clicks") / (col("in_clicks") + lit(1.0))
in_over_out_events_m2 = col("in_events") / (col("out_events") + lit(1.0))

article_features_m2 = (
    joined_features_m2
    .withColumn("ratio_out_in", out_over_in_m2)
    .withColumn("bounce_rate", 1 - out_over_in_m2)
    .withColumn("in_out_event_ratio", in_over_out_events_m2)
)

# Assembling Month 2 features
feature_cols = [
    "in_degree",
    "out_degree",
    "in_clicks",
    "out_clicks",
    "ratio_out_in",
    "bounce_rate",
    "in_out_event_ratio",
]

In [17]:
# Reuse the assembler from the training pipeline so the Month 2 vectors use the
# same input columns, in the same order, as the vectors the model was fitted on.
assembled_m2 = assembler.transform(article_features_m2)

In [18]:
# Apply the scaler fitted during training (not a new fit) and keep only the
# article key and the scaled vector.
scaled_m2 = scaler_model.transform(assembled_m2)
scaled_test_df = scaled_m2.select("curr", "features")
print("Month 2 data successfully scaled using Month 1's rules.")

Month 2 data successfully scaled using Month 1's rules.


In [19]:
# Cluster centres of the fitted model, looked up by cluster index when
# measuring distances.
cluster_centers = kmeans_model.clusterCenters()

# Assign every Month 2 article to a cluster (adds the `prediction` column).
predictions_m2 = kmeans_model.transform(scaled_test_df)

In [20]:
def get_distance(features, prediction):
    """Return the squared distance from `features` to its assigned cluster centre.

    Despite the name, the value comes from `squared_distance`, so no square
    root is taken.

    Args:
        features: vector object exposing `squared_distance(other)`.
        prediction: integer index into the module-level `cluster_centers`.

    Returns:
        The squared distance as a Python float.
    """
    assigned_center = cluster_centers[prediction]
    squared = features.squared_distance(assigned_center)
    return float(squared)

In [21]:
# Wrap the Python helper as a Spark UDF returning a double, then attach the
# per-article value as a new column.
distance_udf = udf(get_distance, returnType=DoubleType())
distance_col = distance_udf(col("features"), col("prediction"))

anomalies_df = predictions_m2.withColumn("distance_to_center", distance_col)

In [22]:
# Mean and standard deviation of the distance column over `anomalies_df`.
# NOTE: `anomalies_df` is derived from `predictions_m2`, i.e. the Month 2 test
# data, so these statistics (and any threshold built from them) describe
# Month 2 itself. The printed labels previously said "Month 1", which did not
# match the data being summarised.
distance_stats = anomalies_df.select(
    mean("distance_to_center").alias("mean_distance"),
    stddev("distance_to_center").alias("stddev_distance")
).first()

mean_distance = distance_stats["mean_distance"]
stddev_distance = distance_stats["stddev_distance"]

print(f"Mean Distance (Month 2): {mean_distance}")
print(f"StdDev Distance (Month 2): {stddev_distance}")



Mean Distance (Month 1): 62.49285058887141
StdDev Distance (Month 1): 59797.31928747039


                                                                                

In [23]:
# Flag articles whose distance lies more than three standard deviations above
# the mean.
sigma_multiplier = 3
anomaly_threshold = mean_distance + sigma_multiplier * stddev_distance
print(f"Anomaly Threshold (Distance > {anomaly_threshold}):")

exceeds_threshold = col("distance_to_center") > anomaly_threshold
final_behavioural_anomalies = anomalies_df.where(exceeds_threshold)

Anomaly Threshold (Distance > 179454.45071300003):


In [24]:
# Report how many articles were flagged, then list them, largest distance first.
n_flagged = final_behavioural_anomalies.count()
print(f"Found {n_flagged} formally classified behavioural anomalies in Month 2:")

ranked_anomalies = final_behavioural_anomalies.orderBy(col("distance_to_center").desc())
ranked_anomalies.show(truncate=False)

                                                                                

Found 6 formally classified behavioural anomalies in Month 2:


25/10/24 10:46:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/10/24 10:46:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/10/24 10:46:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/10/24 10:46:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------+
|curr          |features                                                                                                                                         |prediction|distance_to_center |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------------+
|other-empty   |[-0.29903290464061344,5067.854531831927,-0.33584532596657535,4783.387836792241,4785.802860547091,-4785.802860547086,-1.2327617958472552]         |2         |6.945367197001752E7|
|Main_Page     |[6694.080854672584,251.34141834302864,6524.571218462289,222.66857664976078,-1.1534895042959579E-4,1.1534895042961575E-4,-0.5609301477030884]     |1         |6.453167377216124E7|
|other-search  |[-0.2990329046

                                                                                