<a href="https://colab.research.google.com/github/Ananya-Nair20/predective-log-analysis/blob/main/Log_analysis_using_device_logs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive # Import the 'drive' object

#Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


Data Cleaning

In [3]:
# prompt: I want to read multiple files and clean, preprocess multiple files and multiple columns. The columns are empolyee_name, user_id, email, role, projects,buisness_unit, function_unit, department, team, supervisor. write a code according to clean and preprocess 20+ files at the same time and all these columns in the same block of code

from google.colab import drive
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, trim
import os


# Define the path to your data files
data_dir = f"/content/drive/MyDrive/device_logs"
# Replace with the actual path

# Initialize SparkSession
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Columns to clean and preprocess
columns_to_clean = ["id", "date", "user", "pc", "file_tree",
                    "activity"]

# Function to clean a single DataFrame
def clean_dataframe(df_spark):
  for col_name in columns_to_clean:
    # Handling Missing Values: Fill nulls with empty string for string columns
    if df_spark.schema[col_name].dataType == 'string':
        df_spark = df_spark.na.fill('', subset=[col_name])
    else:
        df_spark = df_spark.na.fill(0, subset=[col_name])  # Or appropriate value for the column type

    # Removing Special Characters/Cleaning Strings
    #df_spark = df_spark.withColumn(col_name, regexp_replace(col(col_name), "[^a-zA-Z0-9 ]", ""))

    # Trimming Whitespace
    df_spark = df_spark.withColumn(col_name, trim(col(col_name)))

  # Removing Duplicates
  #df_spark = df_spark.dropDuplicates()

  return df_spark

# Loop through all files in the directory
for filename in os.listdir(data_dir):
    if filename.endswith(".csv"): #process only csv files
        file_path = os.path.join(data_dir, filename)
        print(f"Processing file: {filename}")

        try:
            # Load each CSV file into a Spark DataFrame
            df_spark = spark.read.csv(file_path, header=True, inferSchema=True)

            # Clean and preprocess the DataFrame
            df_spark = clean_dataframe(df_spark)

            # Output path for the cleaned file
            output_file_path = os.path.join('/content/drive/MyDrive/cleaned', filename.replace('.csv', '_clean.csv'))

            # Save the cleaned data
            df_spark.write.csv(output_file_path, header=True, mode='overwrite')

        except Exception as e:
            print(f"Error processing {filename}: {e}")

print("Data cleaning process completed.")


Processing file: device.csv
Data cleaning process completed.


In [4]:
# prompt: write a code to save this preprocessed data into a checkpoint so i wouldn't have to run the previous cells again and again

import os
from pyspark.sql import SparkSession

# Initialize SparkSession (if not already initialized)
spark = SparkSession.builder.appName("DataCheckpoint").getOrCreate()

# Define checkpoint directory
checkpoint_dir = "/content/drive/MyDrive/checkpoint"  # Replace with your desired checkpoint directory
os.makedirs(checkpoint_dir, exist_ok=True)

# Load the preprocessed data from the cleaned CSV files.  Assuming the cleaned data is in
# /content/drive/MyDrive/cleaned/
cleaned_data_dir = "/content/drive/MyDrive/cleaned"

# Check if checkpoint exists
if not os.path.exists(os.path.join(checkpoint_dir, "_SUCCESS")):
    print("Checkpoint does not exist. Loading and saving data.")

    # Initialize an empty list to store DataFrames
    all_dfs = []

    for filename in os.listdir(cleaned_data_dir):
        if filename.endswith("_clean.csv"):
            file_path = os.path.join(cleaned_data_dir, filename)
            try:
              df = spark.read.csv(file_path, header=True, inferSchema=True)
              all_dfs.append(df)
            except Exception as e:
              print(f"Error loading file {filename}: {e}")

    #Combine all dataframes together
    if all_dfs:
      combined_df = all_dfs[0]
      for i in range(1, len(all_dfs)):
          combined_df = combined_df.union(all_dfs[i])

      # Write the combined dataframe to the checkpoint directory
      combined_df.write.parquet(checkpoint_dir, mode="overwrite")
      # Create a success file to indicate completion
      with open(os.path.join(checkpoint_dir, "_SUCCESS"), "w") as f:
          pass

else:
    print("Checkpoint exists. Loading data from checkpoint.")
    # Load the data from the checkpoint directory directly
    combined_df = spark.read.parquet(checkpoint_dir)


# Now combined_df holds your preprocessed data, loaded from the checkpoint if it exists
# or created from the cleaned data if not
print("Data successfully loaded")
# Continue with your analysis using combined_df
# Example
#combined_df.show(5)


Checkpoint exists. Loading data from checkpoint.
Data successfully loaded


FEATURE ENGINEERING:

>




In [9]:
from pyspark.sql.functions import countDistinct, size, split, date_format, to_timestamp, hour, dayofweek, count, when, lit
from pyspark.sql.types import StringType, TimestampType

# Assuming 'df_spark' is your cleaned DataFrame from the previous code.

# 1. Activity Count per User (per hour)
df_spark = df_spark.withColumn("timestamp", to_timestamp(col("date"), "dd/MM/yyyy HH:mm:ss"))
df_spark = df_spark.withColumn("date_only", date_format(col("timestamp"), "dd/MM/yyyy").cast(StringType()))
df_spark = df_spark.withColumn("time_only", date_format(col("timestamp"), "HH:mm:ss").cast(StringType()))

# Filter out rows with null timestamps before grouping
df_spark = df_spark.filter(col("timestamp").isNotNull())

activity_counts = df_spark.groupBy("user", "date_only", "time_only").agg(count("*").alias("activity_count_per_hour"))
df_spark = df_spark.join(activity_counts, ["user", "date_only", "time_only"], "left")

# Select necessary columns to avoid duplicates
df_spark = df_spark.select("id", "date", "user", "pc", "file_tree", "activity", "timestamp", "date_only", "time_only", "activity_count_per_hour")

# 2. Unique PCs Accessed per User
unique_pcs = df_spark.groupBy("user").agg(countDistinct("pc").alias("unique_pcs_accessed"))
df_spark = df_spark.join(unique_pcs, "user", "left")

# Select necessary columns again
df_spark = df_spark.select("id", "date", "user", "pc", "file_tree", "activity", "timestamp", "date_only", "time_only", "activity_count_per_hour", "unique_pcs_accessed")

# 3. File Path Depth - Handling empty paths
df_spark = df_spark.withColumn("file_path_depth", when(col("file_tree") != '', size(split(col("file_tree"), "/"))).otherwise(lit(0)))

# 4. Activity Frequency (per day) - Ensuring date_only is in the correct format for join
activity_frequency = df_spark.groupBy("user", "date_only").agg(count("*").alias("activity_frequency_per_day"))
df_spark = df_spark.join(activity_frequency, ["user", "date_only"], "left")

# Select necessary columns
df_spark = df_spark.select("id", "date", "user", "pc", "file_tree", "activity", "timestamp", "date_only", "time_only", "activity_count_per_hour", "unique_pcs_accessed", "file_path_depth", "activity_frequency_per_day")

# 5. Day of Week / Hour of Day
df_spark = df_spark.withColumn("day_of_week", dayofweek("timestamp"))
df_spark = df_spark.withColumn("hour_of_day", hour("timestamp"))

# Show the first 20 rows of the dataframe
df_spark.show()

+--------------------+-------------------+-------+-------+--------------------+----------+-------------------+----------+---------+-----------------------+-------------------+---------------+--------------------------+-----------+-----------+
|                  id|               date|   user|     pc|           file_tree|  activity|          timestamp| date_only|time_only|activity_count_per_hour|unique_pcs_accessed|file_path_depth|activity_frequency_per_day|day_of_week|hour_of_day|
+--------------------+-------------------+-------+-------+--------------------+----------+-------------------+----------+---------+-----------------------+-------------------+---------------+--------------------------+-----------+-----------+
|{W0H4-S5OD03FD-56...|01/02/2010 07:48:30|ANC1950|PC-4921|R:\;R:\23svS11;R:...|   Connect|2010-02-01 07:48:30|01/02/2010| 07:48:30|                      1|                  1|              1|                         4|          2|          7|
|{Y3V7-B9RC11WO-81...|10/01/

In [10]:
# prompt: create a checkpoint for saving the previous output

# Checkpoint the DataFrame
checkpoint_dir_features = "/content/drive/MyDrive/checkpoint_features"  # Replace with your desired checkpoint directory
os.makedirs(checkpoint_dir_features, exist_ok=True)

# Check if checkpoint exists
if not os.path.exists(os.path.join(checkpoint_dir_features, "_SUCCESS")):
    print("Checkpoint for features does not exist. Saving data.")

    # Write the combined dataframe to the checkpoint directory
    df_spark.write.parquet(checkpoint_dir_features, mode="overwrite")
    # Create a success file to indicate completion
    with open(os.path.join(checkpoint_dir_features, "_SUCCESS"), "w") as f:
        pass
else:
    print("Checkpoint for features exists. Loading data from checkpoint.")
    # Load the data from the checkpoint directory directly
    df_spark = spark.read.parquet(checkpoint_dir_features)

print("Data successfully checkpointed.")


Checkpoint for features exists. Loading data from checkpoint.
Data successfully checkpointed.


In [11]:
from pyspark.sql.functions import rand

# Assuming df_spark is your DataFrame with features

# Split the data into training, validation, and testing sets (80%, 10%, 10%)
train_df, test_df = df_spark.randomSplit([0.8, 0.2], seed=42)
test_df, val_df = test_df.randomSplit([0.5,0.5], seed=42)

# Show the sizes of the splits
print(f"Train set size: {train_df.count()}")
print(f"Validation set size: {val_df.count()}")
print(f"Test set size: {test_df.count()}")

# Save the splits to parquet files (optional)
# train_df.write.parquet("/content/drive/MyDrive/splits/train", mode="overwrite")
# val_df.write.parquet("/content/drive/MyDrive/splits/val", mode="overwrite")
# test_df.write.parquet("/content/drive/MyDrive/splits/test", mode="overwrite")


Train set size: 1241822
Validation set size: 154616
Test set size: 155390


In [12]:
# prompt: write a code to create a checkpoint to save the above code

# Assuming df_spark is your DataFrame with features from the previous code.

# Checkpoint the DataFrame
checkpoint_dir_final = "/content/drive/MyDrive/checkpoint_final"  # Replace with your desired checkpoint directory
os.makedirs(checkpoint_dir_final, exist_ok=True)

# Check if checkpoint exists
if not os.path.exists(os.path.join(checkpoint_dir_final, "_SUCCESS")):
    print("Checkpoint for final data does not exist. Saving data.")

    # Write the combined dataframe to the checkpoint directory
    df_spark.write.parquet(checkpoint_dir_final, mode="overwrite")
    # Create a success file to indicate completion
    with open(os.path.join(checkpoint_dir_final, "_SUCCESS"), "w") as f:
        pass
else:
    print("Checkpoint for final data exists. Loading data from checkpoint.")
    # Load the data from the checkpoint directory directly
    df_spark = spark.read.parquet(checkpoint_dir_final)

print("Data successfully checkpointed.")


Checkpoint for final data exists. Loading data from checkpoint.
Data successfully checkpointed.


In [15]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assuming 'df_spark' is defined from the previous code.
# Define features (excluding the target variable 'activity' and irrelevant columns)
feature_cols = ["activity_count_per_hour", "unique_pcs_accessed", "file_path_depth", "activity_frequency_per_day", "day_of_week", "hour_of_day"]

# Split the data into training, validation, and testing sets - This is important to recreate the splits
train_data, temp_data = df_spark.randomSplit([0.7, 0.3], seed=42)  # 70% for training, 30% for temp
validation_data, test_data = temp_data.randomSplit([0.66, 0.34], seed=42) # 30% of the total data (20% of the original)


# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = assembler.transform(train_data)
validation_data = assembler.transform(validation_data)
test_data = assembler.transform(test_data)


# Initialize the Random Forest Classifier
rf_classifier = RandomForestClassifier(labelCol="activity_frequency_per_day", featuresCol="features", numTrees=100)


# Fit the model to the training data
model = rf_classifier.fit(train_data)


# Make predictions on the validation set
predictions_validation = model.transform(validation_data)


# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="activity_frequency_per_day", predictionCol="prediction", metricName="accuracy")
accuracy_validation = evaluator.evaluate(predictions_validation)

print(f"Validation Accuracy: {accuracy_validation}")

# Make predictions on the test set
predictions_test = model.transform(test_data)

# Evaluate the model on the test set
accuracy_test = evaluator.evaluate(predictions_test)

print(f"Test Accuracy: {accuracy_test}")

IllegalArgumentException: activity_count_per_hour does not exist. Available: id, date, user, pc, file_tree, activity

In [None]:
# prompt: save the above code using a checkpoint so i don't have to keep running it again and again

# Save the current state of the notebook using a checkpoint
# This will create a checkpoint file in the specified directory
# You can then restore the notebook to this state later.
checkpoint_dir_final = "/content/drive/MyDrive/checkpoint_final"
df_spark.write.parquet(checkpoint_dir_final, mode="overwrite")
# Create a success file to indicate completion
with open(os.path.join(checkpoint_dir_final, "_SUCCESS"), "w") as f:
    pass

print("Checkpoint created successfully!")


In [2]:
# prompt: write a code for hyperparameter tuning using gridsearch

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create ParamGrid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(rf_classifier.numTrees, [50, 100, 200])
             .addGrid(rf_classifier.maxDepth, [5, 10, 20])
             .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="activity_frequency_per_day", predictionCol="prediction", metricName="accuracy")

# Create CrossValidator
cv = CrossValidator(estimator=rf_classifier, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)


# Run cross-validation
cvModel = cv.fit(train_data)


# Make predictions on the validation set using the best model
bestModel = cvModel.bestModel
predictions_validation = bestModel.transform(validation_data)

# Evaluate the best model on the validation set
accuracy_validation = evaluator.evaluate(predictions_validation)
print(f"Validation Accuracy (best model): {accuracy_validation}")


# Make predictions on the test set using the best model
predictions_test = bestModel.transform(test_data)

# Evaluate the best model on the test set
accuracy_test = evaluator.evaluate(predictions_test)
print(f"Test Accuracy (best model): {accuracy_test}")

# Print the best hyperparameters
print(f"Best hyperparameters: {cvModel.bestModel.extractParamMap()}")


NameError: name 'rf_classifier' is not defined

Hyperparameter Tuning:

In [None]:
# prompt: write a code for hyperparameter tuning this isolation forest algorithm using gridsearchCV

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create ParamGrid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf_classifier.numTrees, [10, 50, 100]) \
    .addGrid(rf_classifier.maxDepth, [5, 10, 20]) \
    .addGrid(rf_classifier.impurity, ['gini', 'entropy']) \
    .build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="activity_frequency_per_day", predictionCol="prediction", metricName="accuracy")

# Create CrossValidator
crossval = CrossValidator(estimator=rf_classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # Adjust numFolds as needed

# Run cross-validation
cvModel = crossval.fit(train_data)

# Make predictions on the validation set using the best model
best_predictions_validation = cvModel.transform(validation_data)
accuracy_validation = evaluator.evaluate(best_predictions_validation)
print(f"Validation Accuracy (after tuning): {accuracy_validation}")

# Make predictions on the test set using the best model
best_predictions_test = cvModel.transform(test_data)
accuracy_test = evaluator.evaluate(best_predictions_test)
print(f"Test Accuracy (after tuning): {accuracy_test}")

# Print the best hyperparameters
print("Best hyperparameters:", cvModel.bestModel.extractParamMap())


KeyboardInterrupt: 