# Original Code for Flume

In [None]:
import re

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

parts = [
    r'(?P<host>\S+)',                   # host %h
    r'\S+',                             # identd %l (unused)
    r'(?P<user>\S+)',                   # user %u
    r'\[(?P<time>.+)\]',                # time %t
    r'"(?P<request>.+)"',               # request "%r"
    r'(?P<status>[0-9]+)',              # status %>s
    r'(?P<size>\S+)',                   # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',               # referer "%{Referer}i"
    r'"(?P<agent>.*)"',                 # user agent "%{User-agent}i"
]
# Fields are separated by whitespace; the trailing \s*\Z forces the whole line
# to match (only trailing whitespace, e.g. a newline, may follow the last field).
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')


def extractURLRequest(line):
    """Return the URL path requested in one access-log line.

    The path is the second whitespace-separated token of the quoted request,
    e.g. ``"GET /index.html HTTP/1.1"`` -> ``"/index.html"``.

    Args:
        line: one raw log line; it must contain all fields of ``pattern``,
            including the quoted referer and user agent.

    Returns:
        The URL path as a string, or None when the line does not match
        ``pattern`` or the request has fewer than two tokens. Callers should
        filter out None values before counting.
    """
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]
    return None


if __name__ == "__main__":

    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)  # 1-second micro-batches

    # Receive log events from Flume on localhost:9092.
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9092)

    # Keep element [1] of each Flume record (the event body, i.e. the log line).
    lines = flumeStream.map(lambda x: x[1])
    # extractURLRequest returns None for lines it cannot parse; drop them so
    # they are not counted as a "None" URL.
    urls = lines.map(extractURLRequest).filter(lambda x: x is not None)

    # Reduce by URL over a 5-minute window sliding every second. The inverse
    # function makes the window incremental (it needs the checkpoint set
    # below); filterFunc drops URLs whose count has fallen back to 0 after
    # all of their hits have left the window.
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(
        lambda x, y: x + y, lambda x, y: x - y, 300, 1,
        filterFunc=lambda kv: kv[1] > 0)

    # Sort by count (descending) and print the results for each window.
    sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()

    # Window operations with an inverse function require a checkpoint directory.
    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()

###  Understanding the Log Parsing Pattern

Think of a **log line** as a *movie ticket stub*:  
It contains a lot of information squished into one line (seat number, time, movie name, buyer...), but we want to split it into meaningful fields.

The `parts = [...]` list defines **regular expressions (regex)** that act like **scissors**. Each regex cuts out one piece of information from the log line.

---

####  `parts = [...]`

- `r'(?P<host>\S+)'`  
  → Extracts the **host/client address**.  
  → Like the ticket stub printing the buyer’s computer IP.

- `r'\S+'`  
  → The identd field (`%l`), which is almost always `-`; we match it but don’t capture it.  
  → Like a small serial number on the ticket that we ignore.

- `r'(?P<user>\S+)'`  
  → Extracts the **username**.  
  → Like the buyer’s name on the ticket.

- `r'\[(?P<time>.+)\]'`  
  → Extracts the **timestamp**, which is inside square brackets.  
  → Like the ticket showing “Showtime: 7:30 pm”.

- `r'"(?P<request>.+)"'`  
  → Extracts the **request content**, e.g., `"GET /index.html HTTP/1.1"`.  
  → Like the ticket saying “You bought this movie”.

- `r'(?P<status>[0-9]+)'`  
  → Extracts the **status code** (200 = success, 404 = not found).  
  → Like the ticket saying “Valid = Yes/No”.

- `r'(?P<size>\S+)'`  
  → Extracts the **response size** (can be `-` if empty).  
  → Like the ticket showing “Popcorn size: medium/large”.

- `r'"(?P<referer>.*)"'`  
  → Extracts the **referrer** (the page where the request came from).  
  → Like the ticket saying “Bought from this ticket booth”.

- `r'"(?P<agent>.*)"'`  
  → Extracts the **user agent** (browser or device info).  
  → Like the ticket saying “Purchased via phone or computer”.

---



####  `pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')`

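- `\s+` = one or more whitespace characters, meaning *fields are separated by spaces*.  
- `join(parts)` = stitches all those “scissors” together into one complete rule, with `\s+` between each pair of fields.  
- `\s*\Z` = allows optional trailing whitespace and then requires the very end of the string, so the *entire* line must fit the format.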

 **Effect**:  
This creates a **log ticket stub scanner**.  
When you feed a log line into it, it slices the line into meaningful fields:  
`host, user, time, request, status, size, referer, agent`.

---


## **Summary**:  
This block of code defines a **log parsing template**.  
It transforms a messy one-line log entry into clean, labeled pieces of data, ready for analysis.

### Mapping URLs to Key-Value Pairs and Counting with a Window

```python
urls.map(lambda x: (x, 1))
```

This line converts each URL into a key-value pair:

- `"/index.html"` → `("/index.html", 1)`
- `"/about.html"` → `("/about.html", 1)`

Meaning: every time a URL appears, we count it as 1.

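#### Using `reduceByKeyAndWindow`

`reduceByKeyAndWindow` performs a reduce-by-key aggregation over a sliding time window. Because an inverse function is also supplied, Spark updates each window incrementally: it adds the counts of the batch entering the window and subtracts the counts of the batch leaving it, instead of recomputing all 5 minutes. This incremental mode is why checkpointing must be enabled.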


```python
urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(
    lambda x, y: x + y,  # Add counts for new data entering the window
    lambda x, y: x - y,  # Subtract counts for old data leaving the window
    300,                  # Window duration = 300 seconds (last 5 minutes)
    1                     # Slide interval = 1 second (update every second)
)
```

Intuitive understanding:

The program counts, every 1 second, how many times each URL has appeared in the last 5 minutes.

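This allows you to maintain a continuously updated, real-time URL popularity ranking.

Note: when an inverse function is used, a URL whose count falls back to 0 (all of its hits have left the window) stays in the results as `(url, 0)`. To drop it, also pass a `filterFunc` such as `lambda kv: kv[1] > 0`.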

# Run on Colab:

## Import Libraries and Initialize Spark Streaming

In [78]:
import re
import os
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import random

## Stop Any Previous StreamingContext (Keep the SparkContext)

In [79]:
# Stop a StreamingContext left over from a previous run of this notebook, but
# keep the SparkContext alive so it can be reused by the next cell.
try:
    ssc.stop(stopSparkContext=False, stopGraceFully=True)
except NameError:
    # First run in this session: no StreamingContext has been created yet.
    pass
except Exception as err:
    # Best-effort cleanup: stopping a stale context may fail (e.g. if its
    # SparkContext was already shut down). Report it instead of silently
    # hiding it, and carry on.
    print(f"Could not stop previous StreamingContext: {err}")

## Initialize SparkContext and StreamingContext

In [80]:
# Reuse an existing SparkContext if one is running, otherwise create one, and
# wrap it in a StreamingContext that cuts the stream into 1-second batches.
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sparkContext=sc, batchDuration=1)

# Windowed reductions with an inverse function need a checkpoint directory.
ssc.checkpoint(directory="/content/checkpoint")

## Define Log Parsing Function

In [81]:
# Log regex parser: one sub-pattern per field of an Apache "combined" log line.
parts = [
    r'(?P<host>\S+)',         # client host (%h)
    r'\S+',                   # identd (%l), matched but not captured
    r'(?P<user>\S+)',         # remote user (%u)
    r'\[(?P<time>.+)\]',      # timestamp in brackets (%t)
    r'"(?P<request>.+)"',     # quoted request line (%r)
    r'(?P<status>[0-9]+)',    # status code (%>s)
    r'(?P<size>\S+)',         # response size (%b), may be '-'
    r'"(?P<referer>.*)"',     # quoted Referer header
    r'"(?P<agent>.*)"',       # quoted User-agent header
]
# Whitespace between fields; only trailing whitespace may follow the last one.
pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')


def extractURLRequest(line):
    """Pull the requested URL path out of a single access-log line.

    Args:
        line: one raw log line.

    Returns:
        The second token of the quoted request (the path, e.g. "/index.html"),
        or None if the line doesn't fit ``pattern`` or the request has fewer
        than two tokens.
    """
    hit = pattern.match(line)
    if hit is None:
        return None
    request = hit.group("request")
    if not request:
        return None
    tokens = request.split()
    return tokens[1] if len(tokens) > 1 else None


# Function: `extractURLRequest`

```python
def extractURLRequest(line):
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]
```

The `extractURLRequest` function extracts the URL path from an Apache log line. Here's the full step-by-step explanation:

```python
1. Function Definition
def extractURLRequest(line):
    Defines a function called extractURLRequest.
    The parameter `line` represents one line of a log file (e.g., an Apache access log entry).

2. Match the Log Format
exp = pattern.match(line)
    `pattern` is the compiled regular expression (re.compile(...)) we created earlier to describe the log format.
    `pattern.match(line)` checks if the given log line follows that format.
    If it matches, it returns a match object (`exp`), otherwise None.

3. If the Line Matches
if exp:
    This condition ensures that we only continue processing if the line matches the expected format.

4. Extract the Request Field
request = exp.groupdict()["request"]
    `exp.groupdict()` converts the named groups in the regex (like (?P<request>.+)) into a Python dictionary.
    From this dictionary, we access the "request" field, which corresponds to the request string inside the log.
    Example log snippet: "GET /index.html HTTP/1.1"
        → Here, the "request" part is exactly that string.

5. Check If Request Exists
if request:
    Makes sure that the request string is not empty before proceeding.

6. Split the Request String
requestFields = request.split()
    Splits the request string by spaces.
    Example: "GET /index.html HTTP/1.1"
        → becomes: ["GET", "/index.html", "HTTP/1.1"]

7. Check If It Has Enough Fields
if (len(requestFields) > 1):
    Typically, a request has three parts:
        - Method (e.g., GET, POST)
        - Path/URL (e.g., /index.html)
        - Protocol (e.g., HTTP/1.1)
    This condition ensures the log entry is valid and has at least the method and the path.

8. Return the URL Path
return requestFields[1]
    Returns the second element of the list (index 1), which is the requested URL path.
    From the earlier example: ["GET", "/index.html", "HTTP/1.1"]
        → it returns: /index.html
    If any of the checks above fails, the function returns None instead.
```

# Summary
This function extracts the URL path from an Apache log line.

Example:
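Input log line (Apache "combined" format; the pattern also requires the quoted referer and user-agent fields at the end, so a line without them does not match and the function returns None):
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"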

Output:
/apache_pb.gif


## Setup Log Folder and Streaming Logic

In [82]:
# Use a folder to simulate a Flume stream: textFileStream treats each new file
# that appears in this directory as the next chunk of input lines.
log_dir = "/content/logs"
os.makedirs(log_dir, exist_ok=True)  # create folder if it doesn't exist

# Read files from folder as a "stream"; extractURLRequest returns None for
# lines it cannot parse, so drop those before counting.
lines = ssc.textFileStream(log_dir)
urls = lines.map(extractURLRequest).filter(lambda x: x is not None)


# Reduce by URL over a 5-minute window sliding every second.
# The inverse function (x - y) lets Spark update the window incrementally,
# which is why ssc.checkpoint was configured. filterFunc drops URLs whose count
# has fallen back to 0 once all their hits have left the window; without it
# they would stay in the result as (url, 0).
urlCounts = (
    urls.map(lambda x: (x, 1))
        .reduceByKeyAndWindow(lambda x, y: x + y,
                              lambda x, y: x - y,
                              300, 1,
                              filterFunc=lambda kv: kv[1] > 0)
)

# Sort by count (descending) and print the results for each window.
sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
sortedResults.pprint()

# --- Print every URL count in the current window, then the top URL
def printTopURL(rdd):
    """Print all (url, count) pairs of one windowed-count RDD and the top URL.

    Registered with ``foreachRDD`` below, so it runs once per batch; ``rdd``
    holds the counts for the current sliding window. Prints nothing when the
    RDD is empty.

    Args:
        rdd: RDD of ``(url, count)`` pairs.
    """
    # Sort and collect once: the top URL is simply the first element, so no
    # extra isEmpty() job or second sort + take(1) is needed, and the printed
    # "top" always agrees with the first line of the list.
    ranked = rdd.sortBy(lambda x: x[1], False).collect()
    if not ranked:
        return
    print("=== URL counts in current window ===")
    for u, c in ranked:
        print(f"{u}: {c}")
    print("=== Top URL ===")
    top_url, top_count = ranked[0]
    print(f"{top_url} with {top_count} hits\n")


# Run printTopURL on the windowed-count RDD produced for every batch.
urlCounts.foreachRDD(printTopURL)


## Start Streaming and Simulate Writing Log Files

In [83]:
ssc.start()
urls_pool = ['/index.html', '/about.html', '/contact.html', '/products.html', '/blog.html']

for batch in range(10):
    final_path = os.path.join(log_dir, f"log{batch}.txt")
    # textFileStream expects files to appear in the monitored folder
    # atomically. Write to a hidden temp file first (names starting with "."
    # are ignored by the stream), then rename it into place so Spark never
    # picks up a half-written file.
    tmp_path = os.path.join(log_dir, f".log{batch}.txt.tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        for _ in range(random.randint(5, 20)):  # 5-20 logs per batch
            url = random.choice(urls_pool)       # random URL
            # Apache %t format: [day/month/year:HH:MM:SS zone]
            f.write(f'127.0.0.1 - user [10/Oct/2023:13:55:{batch:02d} +0000] "GET {url} HTTP/1.1" 200 1024 "-" "Mozilla/5.0"\n')
    os.replace(tmp_path, final_path)
    time.sleep(2)  # 2 sec per batch log


-------------------------------------------
Time: 2025-09-09 13:08:31
-------------------------------------------
('/products.html', 3)
('/contact.html', 3)
('/index.html', 2)
('/blog.html', 2)

=== URL counts in this batch ===
/products.html: 3
/contact.html: 3
/index.html: 2
/blog.html: 2
=== Top URL ===
/products.html with 3 hits

-------------------------------------------
Time: 2025-09-09 13:08:32
-------------------------------------------
('/products.html', 3)
('/contact.html', 3)
('/index.html', 2)
('/blog.html', 2)



## Let the Stream Run, Then Stop

In [84]:
# Keep the stream running ~10 more seconds so the last files get processed.
time.sleep(10)

# Graceful stop: let received data finish processing, and also shut down the
# underlying SparkContext.
ssc.stop(stopGraceFully=True, stopSparkContext=True)


=== URL counts in this batch ===
/contact.html: 8
/blog.html: 6
/index.html: 5
/products.html: 4
/about.html: 3
=== Top URL ===
/contact.html with 8 hits

-------------------------------------------
Time: 2025-09-09 13:08:34
-------------------------------------------
('/contact.html', 8)
('/blog.html', 6)
('/index.html', 5)
('/products.html', 4)
('/about.html', 3)

=== URL counts in this batch ===
/contact.html: 8
/blog.html: 6
/index.html: 5
/products.html: 4
/about.html: 3
=== Top URL ===
/contact.html with 8 hits

-------------------------------------------
Time: 2025-09-09 13:08:35
-------------------------------------------
('/blog.html', 13)
('/contact.html', 12)
('/index.html', 9)
('/products.html', 7)
('/about.html', 5)

=== URL counts in this batch ===
/blog.html: 13
/contact.html: 12
/index.html: 9
/products.html: 7
/about.html: 5
=== Top URL ===
/blog.html with 13 hits

-------------------------------------------
Time: 2025-09-09 13:08:36
-----------------------------------