# Big Data Analytics [CN7031] CRWK 2024-25

## Group ID: CN7031_Group136_2024

### Group Members:
1. **Navya Athoti**  
    Email: u2793047@uel.ac.uk
2. **Phalguna Avalagunta**  
    Email: u2811669@uel.ac.uk
3. **Nikhil Sai Damera**  
    Email: u2810262@uel.ac.uk
4. **Sai Kishore Dodda**  
    Email: u2773584@uel.ac.uk

---


## Initiate and Configure Spark

In this section, we will initiate and configure Apache Spark, which is a powerful open-source processing engine for big data. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.


In [1]:
!pip3 install pyspark

# Cell 4 [Code]:
# Import required libraries
import os
print(f"JAVA_HOME: {os.environ.get('JAVA_HOME', 'Not set')}")
import sys

# environment variables
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import max as spark_max
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import time
from datetime import datetime


# Initialize Spark session
def initialize_spark():
    spark = (SparkSession.builder
            .appName('CN7031_Group136_2024')
            .config("spark.driver.memory", "4g")
            .config("spark.executor.memory", "4g")
            .config("spark.sql.shuffle.partitions", "100")
            .master("local[*]")
            .getOrCreate())
    return spark

spark = initialize_spark()

JAVA_HOME: C:\Program Files\Java\jdk-21


# Load Unstructured Data

In this section, we will load and process unstructured data. Unstructured data refers to information that does not have a predefined data model or is not organized in a predefined manner. This type of data is typically text-heavy, but may also contain data such as dates, numbers, and facts.

We will explore various techniques to handle and analyze unstructured data, including tokenization, vectorization, and the use of embeddings to capture semantic information.

In [2]:
def load_data(spark, path="web.log"):
    try:
        # Check if file exists
        if not os.path.exists(path):
            raise FileNotFoundError(f"File not found: {path}")
            
        data = spark.read.text(path)
        print(f"Successfully loaded {data.count()} log entries")
        return data
    except Exception as e:
        print(f"Error loading data: {str(e)}")
        raise

# Test the data loading
try:
    data = load_data(spark)
except Exception as e:
    print(f"Failed to load data: {str(e)}")


Successfully loaded 3000000 log entries


# Task 1: Data Processing using PySpark DataFrame [40 marks]

---

## DataFrame Creation with REGEX (10 marks)

Each member will define a custom schema using REGEX to extract specific metrics from the dataset.

### Student Metrics to Extract

- **Student 1: IP Address, Timestamp, HTTP Method**
    - **REGEX Example:** `(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] \"([A-Z]+)`

- **Student 2: HTTP Status Code, Response Size, Timestamp**
    - **REGEX Example:** `\".*\" (\d+) (\d+) \[(.*?)\]`

- **Student 3: URL Path, IP Address, Response Size**
    - **REGEX Example:** `\"[A-Z]+ (\/.*?) HTTP.* (\d+\.\d+\.\d+\.\d+) (\d+)`

- **Student 4: Log Message, HTTP Status Code, Timestamp**
    - **REGEX Example:** `\".*\" (\d+) .* \[(.*?)\] (.*)`

In [3]:
# Common imports and Spark initialization for all students
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from pyspark.sql.functions import regexp_extract, to_timestamp, col

spark = SparkSession.builder \
    .appName("Log Analysis") \
    .getOrCreate()

# Read the log file
logs_df = spark.read.text("web.log")

# Student 1 (Navya A) - IP Address, Timestamp, HTTP Method
student1_df = logs_df.select(
    regexp_extract(col("value"), r"(\d+\.\d+\.\d+\.\d+)", 1).alias("ip_address"),
    to_timestamp(
        regexp_extract(col("value"), r"\[(.*?)\]", 1),
        "dd/MMM/yyyy:HH:mm:ss"
    ).alias("timestamp"),
    regexp_extract(col("value"), r'"(\w+)', 1).alias("http_method")
)

# Student 2 - HTTP Status Code, Response Size, Timestamp
student2_df = logs_df.select(
    regexp_extract(col("value"), r'" (\d{3})', 1).alias("status_code"),
    regexp_extract(col("value"), r'" \d{3} (\d+)', 1).cast(IntegerType()).alias("response_size"),
    to_timestamp(
        regexp_extract(col("value"), r"\[(.*?)\]", 1),
        "dd/MMM/yyyy:HH:mm:ss"
    ).alias("timestamp")
)

# Student 3 - URL Path, IP Address, Response Size
student3_df = logs_df.select(
    regexp_extract(col("value"), r'"[A-Z]+ (.*?) HTTP', 1).alias("url_path"),
    regexp_extract(col("value"), r"(\d+\.\d+\.\d+\.\d+)", 1).alias("ip_address"),
    regexp_extract(col("value"), r'" \d{3} (\d+)', 1).cast(IntegerType()).alias("response_size")
)

# Student 4 - Log Message, HTTP Status Code, Timestamp
student4_df = logs_df.select(
    regexp_extract(col("value"), r'"(.*?)"', 1).alias("log_message"),
    regexp_extract(col("value"), r'" (\d{3})', 1).alias("status_code"),
    to_timestamp(
        regexp_extract(col("value"), r"\[(.*?)\]", 1),
        "dd/MMM/yyyy:HH:mm:ss"
    ).alias("timestamp")
)

# Function to validate and show results for each student's DataFrame
def validate_dataframe(df, student_num):
    print(f"\nStudent {student_num} DataFrame Schema:")
    df.printSchema()
    
    print(f"\nStudent {student_num} Sample Data:")
    df.show(5, truncate=False)
    
    # Count non-null values for each column
    print(f"\nStudent {student_num} Validation Counts:")
    df.select([
        sum(col(c).isNotNull().cast("int")).alias(f"{c}_count")
        for c in df.columns
    ]).show()

# Validate each student's DataFrame
validate_dataframe(student1_df, 1)
validate_dataframe(student2_df, 2)
validate_dataframe(student3_df, 3)
validate_dataframe(student4_df, 4)

# Register DataFrames as views for SQL queries later
student1_df.createOrReplaceTempView("student1_logs")
student2_df.createOrReplaceTempView("student2_logs")
student3_df.createOrReplaceTempView("student3_logs")
student4_df.createOrReplaceTempView("student4_logs")


Student 1 DataFrame Schema:
root
 |-- ip_address: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- http_method: string (nullable = true)


Student 1 Sample Data:
+--------------+-------------------+-----------+
|ip_address    |timestamp          |http_method|
+--------------+-------------------+-----------+
|88.211.105.115|2022-03-04 14:17:48|POST       |
|144.6.49.142  |2022-09-02 15:16:00|POST       |
|231.70.64.145 |2022-07-19 01:31:31|PUT        |
|219.42.234.172|2022-02-08 11:34:57|POST       |
|183.173.185.94|2023-08-29 03:07:11|GET        |
+--------------+-------------------+-----------+
only showing top 5 rows


Student 1 Validation Counts:
+----------------+---------------+-----------------+
|ip_address_count|timestamp_count|http_method_count|
+----------------+---------------+-----------------+
|         3000000|        3000000|          3000000|
+----------------+---------------+-----------------+


Student 2 DataFrame Schema:
root
 |-- status_code

# Task 2: Two Advanced DataFrame Analysis (20 marks)

Each member will write unique SQL queries for the analysis:

## SQL Query 1: Window Functions

- **Student 1: Rolling hourly traffic per IP**
    - **Description:** Calculate traffic count per IP over a sliding window.

- **Student 2: Session identification**
    - **Description:** Identify sessions based on timestamp gaps.

- **Student 3: Unique visitors per hour**
    - **Description:** Count distinct IPs for each hour.

- **Student 4: Average response size per status code**
    - **Description:** Compute averages grouped by status codes.

## SQL Query 2: Aggregation Functions

- **Student 1: Traffic patterns by URL path**
    - **Description:** Analyze URL visits by hour.

- **Student 2: Top 10 failed requests by size**
    - **Description:** Identify the largest failed requests.

- **Student 3: Response size distribution by status**
    - **Description:** Show min, max, and avg sizes for each status.

- **Student 4: Daily unique visitors**
    - **Description:** Count unique IPs per day.


In [4]:
# First, let's confirm what columns we have in each DataFrame
print("Available columns in student1_logs:")
spark.sql("SELECT * FROM student1_logs").printSchema()
print("\nAvailable columns in student2_logs:")
spark.sql("SELECT * FROM student2_logs").printSchema()

# Now let's modify our functions to only use available columns

# Student 2: Session identification (Modified)
def analyze_sessions():
    query = """
    WITH time_gaps AS (
        SELECT 
            timestamp,
            LAG(timestamp) OVER (
                ORDER BY timestamp
            ) as prev_timestamp,
            status_code
        FROM student2_logs
    )
    SELECT 
        timestamp,
        status_code,
        CASE 
            WHEN (unix_timestamp(timestamp) - unix_timestamp(prev_timestamp)) > 1800 
            OR prev_timestamp IS NULL 
            THEN 1 
            ELSE 0 
        END as new_session
    FROM time_gaps
    ORDER BY timestamp
    """
    return spark.sql(query)

# Student 3: Unique visitors per hour (Modified)
def analyze_unique_visitors():
    query = """
    SELECT 
        date_trunc('hour', timestamp) as hour,
        COUNT(*) as total_visits
    FROM student1_logs
    GROUP BY date_trunc('hour', timestamp)
    ORDER BY hour
    """
    return spark.sql(query)

# Student 4: Average response size per status code (Original - should work)
def analyze_avg_response_size():
    query = """
    SELECT 
        status_code,
        AVG(response_size) as avg_size,
        COUNT(*) as request_count,
        MIN(response_size) as min_size,
        MAX(response_size) as max_size
    FROM student2_logs
    GROUP BY status_code
    ORDER BY status_code
    """
    return spark.sql(query)

# Try executing the queries
try:
    print("\n=== Window Functions Analysis ===")
    
    print("\nStudent 2 - Session Identification:")
    analyze_sessions().show(5)
    
    print("\nStudent 3 - Hourly Visit Counts:")
    analyze_unique_visitors().show(5)
    
    print("\nStudent 4 - Response Size Analysis:")
    analyze_avg_response_size().show(5)

except Exception as e:
    print(f"An error occurred: {str(e)}")
    
    # Print the actual data for debugging
    print("\nSample data from student2_logs:")
    spark.sql("SELECT * FROM student2_logs LIMIT 5").show()

Available columns in student1_logs:
root
 |-- ip_address: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- http_method: string (nullable = true)


Available columns in student2_logs:
root
 |-- status_code: string (nullable = true)
 |-- response_size: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)


=== Window Functions Analysis ===

Student 2 - Session Identification:
+-------------------+-----------+-----------+
|          timestamp|status_code|new_session|
+-------------------+-----------+-----------+
|2022-01-01 00:01:34|        203|          1|
|2022-01-01 00:01:50|        414|          0|
|2022-01-01 00:02:15|        200|          0|
|2022-01-01 00:02:30|        203|          0|
|2022-01-01 00:02:45|        308|          0|
+-------------------+-----------+-----------+
only showing top 5 rows


Student 3 - Hourly Visit Counts:
+-------------------+------------+
|               hour|total_visits|
+-------------------+------------+
|20

# Task 3: Data Visualization (10 marks)

Each member will visualize the results of their unique SQL queries using different chart types.

### Student Visualization Type Examples

- **Student 1: Line Chart (Hourly Traffic)**
  - **Tool:** Matplotlib for rolling traffic visualization.

- **Student 2: Bar Chart (Top 10 Failed Requests)**
  - **Tool:** Seaborn for aggregated failure counts.

- **Student 3: Heatmap (Hourly Unique Visitors)**
  - **Tool:** Seaborn for visualizing traffic density.

- **Student 4: Pie Chart (Response Code Distribution)**
  - **Tool:** Matplotlib for status code proportions.

In [15]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Set style for better visualizations
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# ===================== Student 1: Line Chart of Rolling Traffic =====================
def analyze_rolling_traffic():
    # Example implementation, replace with actual logic
    return student1_df

def visualize_rolling_traffic():
    # Get data from our previous SQL query
    rolling_traffic_df = analyze_rolling_traffic()
    pdf = rolling_traffic_df.toPandas()
    
    plt.figure(figsize=(12, 6))
    
    # Create line plot
    plt.plot(pdf['hour'], pdf['rolling_3hour_traffic'], 
             marker='o', linewidth=2, markersize=6)
    
    plt.title('3-Hour Rolling Traffic by Hour', fontsize=14, pad=20)
    plt.xlabel('Time', fontsize=12)
    plt.ylabel('Rolling Traffic Count', fontsize=12)
    
    # Rotate x-axis labels for better readability
    plt.xticks(rotation=45)
    plt.grid(True, linestyle='--', alpha=0.7)
    
    # Add annotations for peaks
    max_traffic = pdf['rolling_3hour_traffic'].max()
    plt.axhline(y=max_traffic, color='r', linestyle='--', alpha=0.3)
# ===================== Student 2: Bar Chart of Failed Requests =====================
def analyze_failed_requests():
    # Example implementation, replace with actual logic
    return student2_df

def visualize_failed_requests():
    # Get data from previous SQL query
    failed_requests_df = analyze_failed_requests()
    pdf = failed_requests_df.toPandas()
def visualize_failed_requests():
    # Get data from previous SQL query
    failed_requests_df = analyze_failed_requests()
    pdf = failed_requests_df.toPandas()
    
    plt.figure(figsize=(24, 12))
    
    # Create bar plot using Seaborn
    sns.barplot(x='status_code', y='response_size', data=pdf)
    
    plt.title('Top 10 Failed Requests by Response Size', fontsize=14, pad=20)
    plt.xlabel('Status Code', fontsize=12)
    plt.ylabel('Response Size', fontsize=12)
# ===================== Student 3: Heatmap of Hourly Traffic =====================
def analyze_unique_visitors():
    # Example implementation, replace with actual logic
    return student3_df

def visualize_hourly_traffic():
    # Get data from previous SQL query
    unique_visitors_df = analyze_unique_visitors()
    pdf = unique_visitors_df.toPandas()
    plt.tight_layout()
    plt.show()

# ===================== Student 3: Heatmap of Hourly Traffic =====================
def visualize_hourly_traffic():
    # Get data from previous SQL query
    unique_visitors_df = analyze_unique_visitors()
    pdf = unique_visitors_df.toPandas()
    
    # Reshape data for heatmap
    pdf['hour_of_day'] = pd.to_datetime(pdf['hour']).dt.hour
    pdf['day'] = pd.to_datetime(pdf['hour']).dt.date
    
    # Create pivot table for heatmap
    pivot_table = pdf.pivot_table(
        values='total_visits', 
        index='day',
        columns='hour_of_day',
        aggfunc='sum'
    ).fillna(0)
    
    plt.figure(figsize=(15, 8))
    
    # Create heatmap
    sns.heatmap(pivot_table, 
                cmap="YlGnBu", 
                linewidths=.5, 
                annot=True, 
                fmt="d")
# ===================== Student 4: Pie Chart of Response Codes =====================
def analyze_avg_response_size():
    # Example implementation, replace with actual logic
    return student4_df

def visualize_response_distribution():
    # Get data from previous SQL query
    response_dist_df = analyze_avg_response_size()
    pdf = response_dist_df.toPandas()
    plt.title('Traffic Density by Hour and Day', fontsize=14, pad=20)
    plt.xlabel('Hour of Day', fontsize=12)
    plt.ylabel('Date', fontsize=12)
    
    plt.tight_layout()
    plt.show()

# ===================== Student 4: Pie Chart of Response Codes =====================
def visualize_response_distribution():
    # Get data from previous SQL query
    response_dist_df = analyze_avg_response_size()
    pdf = response_dist_df.toPandas()
    
    plt.figure(figsize=(10, 10))
    
    # Create pie chart
    plt.pie(pdf['request_count'], 
            labels=pdf['status_code'],
            autopct='%1.1f%%',
            explode=[0.05] * len(pdf),
            shadow=True)
    
    plt.title('Distribution of HTTP Status Codes', fontsize=14, pad=20)
    plt.axis('equal')
    
    plt.tight_layout()
    plt.show()

# Function to generate all visualizations
def generate_all_visualizations():
    try:
        print("Generating visualizations for all students...")
        
        print("\nStudent 1 - Rolling Traffic Line Chart:")
        visualize_rolling_traffic()
        
        print("\nStudent 2 - Failed Requests Bar Chart:")
        visualize_failed_requests()
        
        print("\nStudent 3 - Traffic Density Heatmap:")
        visualize_hourly_traffic()
        
        print("\nStudent 4 - Response Codes Pie Chart:")
        visualize_response_distribution()
        
    except Exception as e:
        print(f"An error occurred while generating visualizations: {str(e)}")
        
        # Print debugging information
        print("\nDebugging information:")
        print("Available columns in the DataFrames:")
        spark.sql("SELECT * FROM student1_logs").printSchema()
        spark.sql("SELECT * FROM student2_logs").printSchema()

# Generate all visualizations
generate_all_visualizations()

Generating visualizations for all students...

Student 1 - Rolling Traffic Line Chart:
An error occurred while generating visualizations: 'hour'

Debugging information:
Available columns in the DataFrames:
root
 |-- ip_address: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- http_method: string (nullable = true)

root
 |-- status_code: string (nullable = true)
 |-- response_size: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



<Figure size 1200x600 with 0 Axes>

# Data Processing using PySpark RDD

## Task 1: Basic RDD Analysis (10 marks)
Each member will create a custom function to parse and process the log entries.


### Student Basic Extraction Examples
- **Student 1**: Extract Timestamp and IP  
    **Description**: Parse timestamp and IP address from logs.


- **Student 2**: Extract URL and HTTP Method  
    **Description**: Parse URL path and HTTP method from logs.


- **Student 3**: Extract Status Code and Response Size  
    **Description**: Parse HTTP status and response size from logs.

- **Student 4**: Extract Log Message and IP Address  
    **Description**: Parse log messages and corresponding IP addresses.

In [23]:
import re

# Student 1: Extract Timestamp and IP
def extract_timestamp_ip(log_line):
    """Extract timestamp and IP address from log entry"""
    if isinstance(log_line, str):
        text = log_line
    else:
        text = log_line.value  # For DataFrame rows
        
    ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
    timestamp_pattern = r'\[([^\]]+)\]'
    
    try:
        ip = re.search(ip_pattern, text).group(1)
        timestamp = re.search(timestamp_pattern, text).group(1)
        return (ip, timestamp)
    except Exception as e:
        return ("Error", "Error")

# Student 2: Extract URL and HTTP Method
def extract_url_method(log_line):
    """Extract URL path and HTTP method from log entry"""
    if isinstance(log_line, str):
        text = log_line
    else:
        text = log_line.value  # For DataFrame rows
        
    pattern = r'"(GET|POST|PUT|DELETE)\s+([^"\s]+)'
    
    try:
        match = re.search(pattern, text)
        if match:
            return (match.group(1), match.group(2))
        return ("Not Found", "Not Found")
    except Exception as e:
        return ("Error", "Error")

# Student 3: Extract Status Code and Response Size
def extract_status_size(log_line):
    """Extract HTTP status code and response size from log entry"""
    if isinstance(log_line, str):
        text = log_line
    else:
        text = log_line.value  # For DataFrame rows
        
    pattern = r'HTTP/[\d.]+" (\d{3}) (\d+)'
    
    try:
        match = re.search(pattern, text)
        if match:
            return (int(match.group(1)), int(match.group(2)))
        return (0, 0)
    except Exception as e:
        return (0, 0)

# Student 4: Extract Log Message and IP Address
def extract_message_ip(log_line):
    """Extract log message and IP address from log entry"""
    if isinstance(log_line, str):
        text = log_line
    else:
        text = log_line.value  # For DataFrame rows
        
    ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
    message_pattern = r'(?:Warning|Update|Debug|Error|Info):(.*?)(?:\s*$)'
    
    try:
        ip = re.search(ip_pattern, text).group(1)
        message = re.search(message_pattern, text).group(1).strip()
        return (ip, message)
    except Exception as e:
        return ("Error", "Error")

def analyze_logs(data):
    """Analyze logs using the existing Spark session"""
    print("\nProcessing log entries...")
    print("\n" + "="*50 + "\n")
    
    try:
        # Convert DataFrame to RDD of log lines
        logs_rdd = data.rdd.map(lambda row: row[0])
        
        # Student 1 Analysis
        print("STUDENT 1 - TIMESTAMP AND IP EXTRACTION")
        print("-" * 40)
        rdd1_results = logs_rdd.map(extract_timestamp_ip).collect()
        for ip, timestamp in rdd1_results[:5]:  # Show first 5 results
            print(f"IP Address: {ip}")
            print(f"Timestamp:  {timestamp}\n")

        print("\n" + "="*50 + "\n")
        
        # Student 2 Analysis
        print("STUDENT 2 - URL AND HTTP METHOD EXTRACTION")
        print("-" * 40)
        rdd2_results = logs_rdd.map(extract_url_method).collect()
        for method, url in rdd2_results[:5]:  # Show first 5 results
            print(f"HTTP Method: {method}")
            print(f"URL Path:    {url}\n")

        print("\n" + "="*50 + "\n")
        
        # Student 3 Analysis
        print("STUDENT 3 - STATUS CODE AND RESPONSE SIZE EXTRACTION")
        print("-" * 40)
        rdd3_results = logs_rdd.map(extract_status_size).collect()
        for status, size in rdd3_results[:5]:  # Show first 5 results
            print(f"Status Code:   {status}")
            print(f"Response Size: {size} bytes\n")

        print("\n" + "="*50 + "\n")
        
        # Student 4 Analysis
        print("STUDENT 4 - LOG MESSAGE AND IP EXTRACTION")
        print("-" * 40)
        rdd4_results = logs_rdd.map(extract_message_ip).collect()
        for ip, message in rdd4_results[:5]:  # Show first 5 results
            print(f"IP Address:  {ip}")
            print(f"Log Message: {message}\n")
            
    except Exception as e:
        print(f"Error during processing: {str(e)}")

# Execute the analysis
analyze_logs(data)


Processing log entries...


STUDENT 1 - TIMESTAMP AND IP EXTRACTION
----------------------------------------
IP Address: 88.211.105.115
Timestamp:  04/Mar/2022:14:17:48

IP Address: 144.6.49.142
Timestamp:  02/Sep/2022:15:16:00

IP Address: 231.70.64.145
Timestamp:  19/Jul/2022:01:31:31

IP Address: 219.42.234.172
Timestamp:  08/Feb/2022:11:34:57

IP Address: 183.173.185.94
Timestamp:  29/Aug/2023:03:07:11



STUDENT 2 - URL AND HTTP METHOD EXTRACTION
----------------------------------------
HTTP Method: POST
URL Path:    /history/missions/

HTTP Method: POST
URL Path:    /security/firewall/

HTTP Method: PUT
URL Path:    /web-development/countdown/

HTTP Method: POST
URL Path:    /networking/technology/

HTTP Method: GET
URL Path:    /security/firewall/



STUDENT 3 - STATUS CODE AND RESPONSE SIZE EXTRACTION
----------------------------------------
Status Code:   414
Response Size: 12456 bytes

Status Code:   0
Response Size: 0 bytes

Status Code:   201
Response Size: 33093 bytes

St

## Task 2: Two Advanced RDD Analysis (30 marks)
Each member will perform unique advanced processing tasks.

### Student Advanced Analysis Examples
- **Student 1**: Calculate hourly visit counts per IP  
    **Description**: Count visits grouped by hour and IP.
- **Student 2**: Identify top 10 URLs by visit count  
    **Description**: Aggregate visit counts and rank top URLs.
- **Student 3**: Find average response size per URL  
    **Description**: Compute average response size for each URL.
- **Student 4**: Detect failed requests per IP  
    **Description**: Identify IPs with the most failed requests.

In [21]:
from pyspark.sql.functions import *
from datetime import datetime
import matplotlib.pyplot as plt
import re # Regular expressions

def advanced_rdd_analysis(data):
    """Perform advanced RDD analysis for all students"""
    # Convert DataFrame to RDD
    logs_rdd = data.rdd.map(lambda row: row[0])
    
    print("\nAdvanced RDD Analysis Results")
    print("=" * 50)

    # Student 1: Calculate hourly visit counts per IP
    def student1_analysis(logs_rdd):
        print("\nStudent 1 - Hourly Visit Counts per IP")
        print("-" * 40)
        
        def extract_hour_ip(log_line):
            ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
            timestamp_pattern = r'\[([^\]]+)\]'
            try:
                ip = re.search(ip_pattern, log_line).group(1)
                timestamp_str = re.search(timestamp_pattern, log_line).group(1)
                # Parse timestamp and extract hour
                dt = datetime.strptime(timestamp_str, '%d/%b/%Y:%H:%M:%S')
                hour = dt.strftime('%H:00')
                return ((ip, hour), 1)
            except Exception as e:
                return (("Error", "Error"), 0)

        # Map-Reduce to count visits
        hourly_counts = logs_rdd \
            .map(extract_hour_ip) \
            .reduceByKey(lambda x, y: x + y) \
            .map(lambda x: (x[0][0], x[0][1], x[1])) \
            .sortBy(lambda x: (x[0], x[1]))

        print("\nSample of Hourly Visit Counts:")
        for ip, hour, count in hourly_counts.take(10):
            print(f"IP: {ip}, Hour: {hour}, Visits: {count}")

        # Calculate total visits per IP
        total_visits = hourly_counts \
            .map(lambda x: (x[0], x[2])) \
            .reduceByKey(lambda x, y: x + y) \
            .sortBy(lambda x: x[1], ascending=False)

        print("\nTop 5 IPs by Total Visits:")
        for ip, total in total_visits.take(5):
            print(f"IP: {ip}, Total Visits: {total}")

    # Student 2: Identify top 10 URLs by visit count
    def student2_analysis(logs_rdd):
        print("\nStudent 2 - Top 10 URLs by Visit Count")
        print("-" * 40)
        
        def extract_url(log_line):
            pattern = r'"(?:GET|POST|PUT|DELETE)\s+([^"\s]+)'
            try:
                url = re.search(pattern, log_line).group(1)
                return (url, 1)
            except Exception:
                return ("Error", 0)

        # Calculate URL visit counts
        url_counts = logs_rdd \
            .map(extract_url) \
            .reduceByKey(lambda x, y: x + y) \
            .sortBy(lambda x: x[1], ascending=False)

        print("\nTop 10 Most Visited URLs:")
        for url, count in url_counts.take(10):
            print(f"URL: {url}")
            print(f"Visit Count: {count}\n")

        # Calculate percentage of total traffic for top URLs
        total_visits = url_counts.map(lambda x: x[1]).sum()
        print("\nTraffic Distribution:")
        for url, count in url_counts.take(5):
            percentage = (count / total_visits) * 100
            print(f"URL: {url}")
            print(f"Percentage of Total Traffic: {percentage:.2f}%\n")

    # Student 3: Find average response size per URL
    def student3_analysis(logs_rdd):
        print("\nStudent 3 - Average Response Size per URL")
        print("-" * 40)
        
        def extract_url_size(log_line):
            url_pattern = r'"(?:GET|POST|PUT|DELETE)\s+([^"\s]+)'
            size_pattern = r'HTTP/[\d.]+" \d{3} (\d+)'
            try:
                url = re.search(url_pattern, log_line).group(1)
                size = int(re.search(size_pattern, log_line).group(1))
                return (url, (size, 1))
            except Exception:
                return ("Error", (0, 0))

        # Calculate average response size
        avg_sizes = logs_rdd \
            .map(extract_url_size) \
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
            .mapValues(lambda x: x[0] / x[1] if x[1] > 0 else 0) \
            .sortBy(lambda x: x[1], ascending=False)

        print("\nURL Response Size Analysis:")
        for url, avg_size in avg_sizes.take(10):
            print(f"URL: {url}")
            print(f"Average Response Size: {avg_size:.2f} bytes\n")

        # Calculate size distribution statistics
        all_sizes = avg_sizes.map(lambda x: x[1]).collect()
        if all_sizes:
            print("\nResponse Size Statistics:")
            print(f"Maximum Average Size: {max(all_sizes):.2f} bytes")
            print(f"Minimum Average Size: {min(all_sizes):.2f} bytes")
            print(f"Overall Average Size: {sum(all_sizes)/len(all_sizes):.2f} bytes")

   # Student 4: Detect failed requests per IP
    def student4_analysis(logs_rdd):
        print("\nStudent 4 - Failed Requests Analysis per IP")
        print("-" * 40)
        
        def extract_ip_status(log_line):
            ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
            status_pattern = r'HTTP/[\d.]+" (\d{3})'
            try:
                ip = re.search(ip_pattern, log_line).group(1)
                status = int(re.search(status_pattern, log_line).group(1))
                is_failed = 1 if status >= 400 else 0
                return (ip, (is_failed, 1))  # Count both failed and total requests
            except Exception:
                return ("Error", (0, 0))

        # Calculate failed requests statistics
        ip_stats = logs_rdd \
            .map(extract_ip_status) \
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
            .mapValues(lambda x: {
                'failed': x[0],
                'total': x[1],
                'failure_rate': (x[0] / x[1] * 100) if x[1] > 0 else 0
            }) \
            .sortBy(lambda x: x[1]['failed'], ascending=False)

        print("\nFailed Requests Analysis:")
        for ip, stats in ip_stats.take(10):
            print(f"IP: {ip}")
            print(f"Failed Requests: {stats['failed']}")
            print(f"Total Requests: {stats['total']}")
            print(f"Failure Rate: {stats['failure_rate']:.2f}%\n")

        # Calculate overall statistics
        stats_collected = ip_stats.collect()
        if stats_collected:
            total_failed = sum(stats[1]['failed'] for stats in stats_collected)
            total_requests = sum(stats[1]['total'] for stats in stats_collected)
            overall_failure_rate = (total_failed / total_requests * 100) if total_requests > 0 else 0
            
            print("\nOverall Statistics:")
            print(f"Total Failed Requests: {total_failed}")
            print(f"Total Requests: {total_requests}")
            print(f"Overall Failure Rate: {overall_failure_rate:.2f}%")

            # Additional Analysis
            # Find IPs with highest failure rates (minimum 10 requests)
            high_failure_ips = [(ip, stats) for ip, stats in stats_collected 
                              if stats['total'] >= 10]
            high_failure_ips.sort(key=lambda x: x[1]['failure_rate'], reverse=True)
            
            print("\nTop 5 IPs with Highest Failure Rates (min. 10 requests):")
            for ip, stats in high_failure_ips[:5]:
                print(f"IP: {ip}")
                print(f"Failure Rate: {stats['failure_rate']:.2f}%")
                print(f"Failed Requests: {stats['failed']}")
                print(f"Total Requests: {stats['total']}\n")
    

    # Student 4: Detect failed requests per IP
    def student4_analysis(logs_rdd):
        print("\nStudent 4 - Failed Requests Analysis per IP")
        print("-" * 40)
        
        def extract_ip_status(log_line):
            ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
            status_pattern = r'HTTP/[\d.]+" (\d{3})'
            try:
                ip = re.search(ip_pattern, log_line).group(1)
                status = int(re.search(status_pattern, log_line).group(1))
                is_failed = 1 if status >= 400 else 0
                return (ip, (is_failed, 1))
            except Exception as e:
                return ("Error", (0, 0))

        # Process the data
        ip_stats = logs_rdd \
            .map(extract_ip_status) \
            .filter(lambda x: x[0] != "Error") \
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
            .map(lambda x: (
                x[0], 
                {
                    'failed': x[1][0],
                    'total': x[1][1],
                    'failure_rate': round((x[1][0] / x[1][1] * 100), 2) if x[1][1] > 0 else 0
                }
            ))

        # Sort by number of failed requests
        top_failed = ip_stats.sortBy(lambda x: x[1]['failed'], ascending=False)

        print("\nTop 10 IPs by Failed Requests:")
        for ip, stats in top_failed.take(10):
            print(f"IP: {ip}")
            print(f"Failed Requests: {stats['failed']}")
            print(f"Total Requests: {stats['total']}")
            print(f"Failure Rate: {stats['failure_rate']:.2f}%\n")

        # Calculate overall statistics
        all_stats = ip_stats.collect()
        if all_stats:
            total_failed = sum(stats[1]['failed'] for stats in all_stats)
            total_requests = sum(stats[1]['total'] for stats in all_stats)
            overall_rate = round((total_failed / total_requests * 100), 2) if total_requests > 0 else 0

            print("Overall Statistics:")
            print(f"Total Failed Requests: {total_failed}")
            print(f"Total Requests: {total_requests}")
            print(f"Overall Failure Rate: {overall_rate:.2f}%\n")

            # Find IPs with highest failure rates (min 10 requests)
            high_failure_rates = ip_stats \
                .filter(lambda x: x[1]['total'] >= 10) \
                .sortBy(lambda x: x[1]['failure_rate'], ascending=False)

            print("Top 5 IPs with Highest Failure Rates (min 10 requests):")
            for ip, stats in high_failure_rates.take(5):
                print(f"IP: {ip}")
                print(f"Failure Rate: {stats['failure_rate']:.2f}%")
                print(f"Failed: {stats['failed']}/{stats['total']} requests\n")
        else:
            print("No valid data found for analysis")

    # Execute all students' analyses
    try:
        student1_analysis(logs_rdd)
        print("\n" + "="*50)
        student2_analysis(logs_rdd)
        print("\n" + "="*50)
        student3_analysis(logs_rdd)
        print("\n" + "="*50)
        student4_analysis(logs_rdd)
    except Exception as e:
        print(f"Error during analysis: {str(e)}")
        raise

# Execute the analysis
advanced_rdd_analysis(data)


Advanced RDD Analysis Results

Student 1 - Hourly Visit Counts per IP
----------------------------------------

Sample of Hourly Visit Counts:
IP: 1.1.104.46, Hour: 14:00, Visits: 1
IP: 1.1.108.121, Hour: 07:00, Visits: 1
IP: 1.1.118.39, Hour: 16:00, Visits: 1
IP: 1.1.119.183, Hour: 20:00, Visits: 1
IP: 1.1.120.92, Hour: 12:00, Visits: 1
IP: 1.1.126.167, Hour: 07:00, Visits: 1
IP: 1.1.134.30, Hour: 06:00, Visits: 1
IP: 1.1.134.82, Hour: 06:00, Visits: 1
IP: 1.1.138.78, Hour: 14:00, Visits: 1
IP: 1.1.153.25, Hour: 14:00, Visits: 1

Top 5 IPs by Total Visits:
IP: 10.45.78.214, Total Visits: 2
IP: 100.59.95.187, Total Visits: 2
IP: 102.169.147.90, Total Visits: 2
IP: 104.126.92.191, Total Visits: 2
IP: 108.226.151.237, Total Visits: 2


Student 2 - Top 10 URLs by Visit Count
----------------------------------------

Top 10 Most Visited URLs:
URL: /data-analysis/apollo/
Visit Count: 25440

URL: /software/data/
Visit Count: 25382

URL: /web-development/missions/
Visit Count: 25343

URL: /h

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got list.

## Task 3: Optimization and LSEPI Considerations (10 marks)
Each member chooses two unique methods for optimization.

### Student Optimization Methods
- **Student 1**: Partition Strategies, Caching
- **Student 2**: Caching, Bucketing & Indexing
- **Student 3**: Partition Strategies, Bucketing & Indexing
- **Student 4**: Caching, Partition Strategies

In [26]:
from pyspark.sql.functions import *
import builtins
import time
import re

def measure_execution_time(func):
    """Decorator to measure execution time of functions"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Execution time: {execution_time:.2f} seconds")
        return result, execution_time
    return wrapper

class Student1Optimization:
    """Student 1: Partition Strategies and Caching"""
    
    def __init__(self, data, spark):
        self.data = data
        self.spark = spark
        
    @measure_execution_time
    def baseline_analysis(self):
        """Baseline analysis without optimizations"""
        print("\nStudent 1 - Baseline Analysis (No Optimization)")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        result = rdd.map(self.extract_data).groupByKey().mapValues(len).collect()
        return result
        
    @measure_execution_time
    def optimized_with_partitioning(self):
        """Analysis with custom partitioning"""
        print("\nStudent 1 - Analysis with Custom Partitioning")
        print("-" * 50)
        
        def custom_partitioner(key):
            return builtins.sum(ord(c) for c in str(key)) % 10
            return sum(ord(c) for c in str(key)) % 10
            
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_data)
                    .partitionBy(10, custom_partitioner)
                    .groupByKey()
                    .mapValues(len)
                    .collect())
        return result
        
    @measure_execution_time
    def optimized_with_caching(self):
        """Analysis with caching"""
        print("\nStudent 1 - Analysis with Caching")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        cached_rdd = rdd.map(self.extract_data).cache()
        result = cached_rdd.groupByKey().mapValues(len).collect()
        cached_rdd.unpersist()
        return result
        
    @staticmethod
    def extract_data(log_line):
        """Extract IP and timestamp from log line"""
        ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
        try:
            ip = re.search(ip_pattern, log_line).group(1)
            return (ip, 1)
        except:
            return ("error", 1)

class Student2Optimization:
    """Student 2: Caching and Bucketing & Indexing"""
    
    def __init__(self, data, spark):
        self.data = data
        self.spark = spark
        
    @measure_execution_time
    def baseline_analysis(self):
        """Baseline analysis without optimizations"""
        print("\nStudent 2 - Baseline Analysis (No Optimization)")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_url_status)
                    .groupByKey()
                    .mapValues(lambda x: sum(1 for _ in x))
                    .collect())
        return result
        
    @measure_execution_time
    def optimized_with_caching(self):
        """Analysis with caching"""
        print("\nStudent 2 - Analysis with Caching")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        cached_rdd = rdd.map(self.extract_url_status).cache()
        result = (cached_rdd.groupByKey()
                          .mapValues(lambda x: sum(1 for _ in x))
                          .collect())
        cached_rdd.unpersist()
        return result
        
    @measure_execution_time
    def optimized_with_bucketing(self):
        """Analysis with bucketing"""
        print("\nStudent 2 - Analysis with Bucketing")
        print("-" * 50)
        
        # Create temporary view
        df = self.data.selectExpr("value as log_line")
        df.createOrReplaceTempView("logs")
        
        # SQL query with bucketing
        bucketed_df = self.spark.sql("""
            SELECT 
                regexp_extract(log_line, '"(?:GET|POST|PUT|DELETE)\\s+([^"\\s]+)', 1) as url,
                count(*) as count
            FROM logs
            GROUP BY url
            ORDER BY count DESC
        """)
        
        return bucketed_df.collect()
        
    @staticmethod
    def extract_url_status(log_line):
        """Extract URL and status code from log line"""
        url_pattern = r'"(?:GET|POST|PUT|DELETE)\s+([^"\s]+)'
        status_pattern = r'HTTP/[\d.]+" (\d{3})'
        try:
            url = re.search(url_pattern, log_line).group(1)
            status = re.search(status_pattern, log_line).group(1)
            return (url, status)
        except:
            return ("error", "0")

class Student3Optimization:
    """Student 3: Partition Strategies and Bucketing & Indexing"""
    
    def __init__(self, data, spark):
        self.data = data
        self.spark = spark
        
    @measure_execution_time
    def baseline_analysis(self):
        """Baseline analysis without optimizations"""
        print("\nStudent 3 - Baseline Analysis (No Optimization)")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_url_size)
                    .groupByKey()
                    .mapValues(lambda x: sum(x)/len(x))
                    .collect())
        return result
        
    @measure_execution_time
    def optimized_with_partitioning(self):
        """Analysis with custom partitioning"""
        print("\nStudent 3 - Analysis with Custom Partitioning")
        print("-" * 50)
        
        def url_partitioner(url):
            return sum(ord(c) for c in str(url)) % 8
            
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_url_size)
                    .partitionBy(8, url_partitioner)
                    .groupByKey()
                    .mapValues(lambda x: sum(x)/len(x))
                    .collect())
        return result
        
    @measure_execution_time
    def optimized_with_bucketing(self):
        """Analysis with bucketing"""
        print("\nStudent 3 - Analysis with Bucketing")
        print("-" * 50)
        
        # Create temporary view
        df = self.data.selectExpr("value as log_line")
        df.createOrReplaceTempView("logs")
        
        # SQL query with bucketing
        bucketed_df = self.spark.sql("""
            SELECT 
                regexp_extract(log_line, '"(?:GET|POST|PUT|DELETE)\\s+([^"\\s]+)', 1) as url,
                avg(cast(regexp_extract(log_line, 'HTTP/[\\d.]+" \\d{3} (\\d+)', 1) as int)) as avg_size
            FROM logs
            GROUP BY url
            ORDER BY avg_size DESC
        """)
        
        return bucketed_df.collect()
        
    @staticmethod
    def extract_url_size(log_line):
        """Extract URL and response size from log line"""
        url_pattern = r'"(?:GET|POST|PUT|DELETE)\s+([^"\s]+)'
        size_pattern = r'HTTP/[\d.]+" \d{3} (\d+)'
        try:
            url = re.search(url_pattern, log_line).group(1)
            size = int(re.search(size_pattern, log_line).group(1))
            return (url, size)
        except:
            return ("error", 0)

class Student4Optimization:
    """Student 4: Caching and Partition Strategies"""
    
    def __init__(self, data, spark):
        self.data = data
        self.spark = spark
        
    @measure_execution_time
    def baseline_analysis(self):
        """Baseline analysis without optimizations"""
        print("\nStudent 4 - Baseline Analysis (No Optimization)")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_ip_status)
                    .groupByKey()
                    .mapValues(lambda x: sum(1 for status in x if int(status) >= 400))
                    .collect())
        return result
        
    @measure_execution_time
    def optimized_with_caching(self):
        """Analysis with caching"""
        print("\nStudent 4 - Analysis with Caching")
        print("-" * 50)
        
        rdd = self.data.rdd.map(lambda row: row[0])
        cached_rdd = rdd.map(self.extract_ip_status).cache()
        result = (cached_rdd.groupByKey()
                          .mapValues(lambda x: sum(1 for status in x if int(status) >= 400))
                          .collect())
        cached_rdd.unpersist()
        return result
        
    @measure_execution_time
    def optimized_with_partitioning(self):
        """Analysis with custom partitioning"""
        print("\nStudent 4 - Analysis with Custom Partitioning")
        print("-" * 50)
        
        def ip_partitioner(ip):
            try:
                first_octet = sum(ord(c) for c in str(ip.split('.')[0]))
                return first_octet % 8
            except:
                return 0
            
        rdd = self.data.rdd.map(lambda row: row[0])
        result = (rdd.map(self.extract_ip_status)
                    .partitionBy(8, ip_partitioner)
                    .groupByKey()
                    .mapValues(lambda x: sum(1 for status in x if int(status) >= 400))
                    .collect())
        return result
        
    @staticmethod
    def extract_ip_status(log_line):
        """Extract IP and status code from log line"""
        ip_pattern = r'^(\d+\.\d+\.\d+\.\d+)'
        status_pattern = r'HTTP/[\d.]+" (\d{3})'
        try:
            ip = re.search(ip_pattern, log_line).group(1)
            status = re.search(status_pattern, log_line).group(1)
            return (ip, status)
        except:
            return ("error", "0")

def run_all_optimizations(data, spark):
    """Run all students' optimization analyses"""
    
    print("\nRunning Optimization Analyses for All Students")
    print("=" * 60)
    
    # Student 1
    print("\nStudent 1 - Partition Strategies and Caching")
    print("=" * 40)
    s1 = Student1Optimization(data, spark)
    baseline1, time1 = s1.baseline_analysis()
    partition1, time2 = s1.optimized_with_partitioning()
    cache1, time3 = s1.optimized_with_caching()
    print(f"\nPerformance Improvement:")
    print(f"Partitioning: {((time1 - time2)/time1)*100:.2f}% faster")
    print(f"Caching: {((time1 - time3)/time1)*100:.2f}% faster")
    
    # Student 2
    print("\nStudent 2 - Caching and Bucketing")
    print("=" * 40)
    s2 = Student2Optimization(data, spark)
    baseline2, time1 = s2.baseline_analysis()
    cache2, time2 = s2.optimized_with_caching()
    bucket2, time3 = s2.optimized_with_bucketing()
    print(f"\nPerformance Improvement:")
    print(f"Caching: {((time1 - time2)/time1)*100:.2f}% faster")
    print(f"Bucketing: {((time1 - time3)/time1)*100:.2f}% faster")
    
    # Student 3
    print("\nStudent 3 - Partition Strategies and Bucketing")
    print("=" * 40)
    s3 = Student3Optimization(data, spark)
    baseline3, time1 = s3.baseline_analysis()
    partition3, time2 = s3.optimized_with_partitioning()
    bucket3, time3 = s3.optimized_with_bucketing()
    print(f"\nPerformance Improvement:")
    print(f"Partitioning: {((time1 - time2)/time1)*100:.2f}% faster")
    print(f"Bucketing: {((time1 - time3)/time1)*100:.2f}% faster")
    
    # Student 4
    print("\nStudent 4 - Caching and Partition Strategies")
    print("=" * 40)
    s4 = Student4Optimization(data, spark)
    baseline4, time1 = s4.baseline_analysis()
    cache4, time2 = s4.optimized_with_caching()
    partition4, time3 = s4.optimized_with_partitioning()
    print(f"\nPerformance Improvement:")
    print(f"Caching: {((time1 - time2)/time1)*100:.2f}% faster")
    print(f"Partitioning: {((time1 - time3)/time1)*100:.2f}% faster")

# Run all optimizations with SparkSession
run_all_optimizations(data, spark)


Running Optimization Analyses for All Students

Student 1 - Partition Strategies and Caching

Student 1 - Baseline Analysis (No Optimization)
--------------------------------------------------
Execution time: 38.66 seconds

Student 1 - Analysis with Custom Partitioning
--------------------------------------------------
Execution time: 52.99 seconds

Student 1 - Analysis with Caching
--------------------------------------------------
Execution time: 45.80 seconds

Performance Improvement:
Partitioning: -37.06% faster
Caching: -18.46% faster

Student 2 - Caching and Bucketing

Student 2 - Baseline Analysis (No Optimization)
--------------------------------------------------


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 303.0 failed 1 times, most recent failure: Lost task 7.0 in stage 303.0 (TID 1127) (192.168.1.159 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\rdd.py", line 4260, in map_values_fn
    return kv[0], f(kv[1])
                  ^^^^^^^^
  File "C:\Users\HP\AppData\Local\Temp\ipykernel_7876\2416317130.py", line 90, in <lambda>
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 174, in wrapped
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 866, in sum
    return _invoke_function_over_columns("sum", col)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in _invoke_function_over_columns
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in <genexpr>
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
                                    ^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\column.py", line 65, in _to_java_column
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1239, in process
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\rdd.py", line 4260, in map_values_fn
    return kv[0], f(kv[1])
                  ^^^^^^^^
  File "C:\Users\HP\AppData\Local\Temp\ipykernel_7876\2416317130.py", line 90, in <lambda>
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 174, in wrapped
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 866, in sum
    return _invoke_function_over_columns("sum", col)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in _invoke_function_over_columns
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in <genexpr>
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
                                    ^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\HP\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\column.py", line 65, in _to_java_column
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
