# Mining Massive Datasets Problem Set 10

Ruben Hartenstein, Taha Erkoc

# Exercise 1

### a) Advantage and Disadvantage of a Smaller Damping Factor β

*"This means Google can dictate the rate of convergence according to how small α is chosen to be. Consequently, Google engineers are forced to perform a delicate balancing act. The smaller α is, the faster the convergence, but the smaller α is, the less the true hyperlink structure of the web is used to determine web page importance. And slightly different values for α can produce very different PageRanks. Moreover, as α → 1, not only does convergence slow drastically, but sensitivity issues begin to surface as well" (Section 5.1 Rate of Convergence)*

A smaller damping factor makes the power iteration method converge faster. The subdominant eigenvalue of the Google matrix scales with $\beta$ (the paper's α), and the error shrinks roughly like that eigenvalue raised to the number of iterations, so a smaller $\beta$ needs fewer iterations to compute the PageRank vector to a given tolerance. This is crucial for extremely large matrices such as those in web-scale computations, where every iteration is expensive. The disadvantage is that a smaller damping factor reduces the influence of the actual hyperlink structure of the web: rankings rely less on link connectivity and more on the teleportation component, which makes them less reflective of the "real" importance of web pages.
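
The effect of $\beta$ on convergence can be checked on a toy graph. The following is a minimal sketch, not the implementation from Exercise 2: the four-node graph, the function name `iterations_to_converge`, and the tolerance `1e-10` are assumptions chosen for illustration.

```python
def iterations_to_converge(out_links, beta, tol=1e-10, max_iter=10_000):
    """Run power iteration with uniform teleportation and return the number of
    iterations until the L1 change between successive rank vectors is below tol.

    Assumes every node has at least one out-link (no dead ends).
    """
    nodes = sorted(out_links)
    n = len(nodes)
    rank = {v: 1.0 / n for v in nodes}
    for k in range(1, max_iter + 1):
        # Start with the teleport share, then add the damped link contributions.
        new_rank = {v: (1.0 - beta) / n for v in nodes}
        for v in nodes:
            share = beta * rank[v] / len(out_links[v])
            for w in out_links[v]:
                new_rank[w] += share
        diff = sum(abs(new_rank[v] - rank[v]) for v in nodes)
        rank = new_rank
        if diff < tol:
            return k
    return max_iter


# Hypothetical toy graph: 0 -> {1, 2}, 1 -> 2, 2 -> 0, 3 -> 2
toy_graph = {0: [1, 2], 1: [2], 2: [0], 3: [2]}

for beta in (0.5, 0.85, 0.95):
    print(f"beta = {beta}: {iterations_to_converge(toy_graph, beta)} iterations")
```

On this graph the iteration count should grow as $\beta$ approaches 1; the exact numbers depend on the graph and the tolerance.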


### b) Reducing Iterations by Adjusting Tolerance
*"Thus, a rough estimate of the number of iterations needed to converge to a tolerance level τ (measured by the residual, $x^{(k)T} P − x^{(k)T} = x^{(k+1)T} − x^{(k)T}$) is $\frac{\log_{10} \tau}{\log_{10} \alpha}$" (Section 5.1 Rate of Convergence)*

The relationship between the number of iterations (k) and tolerance is shown by:

$k = \frac{\log (τ)}{\log (α)}$

or

$$
k = \frac{\log(\tau)}{\log(\lambda_2)}
$$

where:
- $\tau$ is the fixed tolerance level,
- $\lambda_2 = \beta \cdot \lambda_2(M)$, the second-largest (subdominant) eigenvalue,
- $\beta$ is the damping factor (called α in the paper).


With the tolerance $\tau$ fixed, reducing the number of iterations from 1000 to 100 requires a smaller damping factor $\beta$. Dividing the estimate for $k_{\text{old}}$ by the one for $k_{\text{new}}$ cancels $\log(\tau)$:

$$
\frac{k_{\text{old}}}{k_{\text{new}}} = \frac{\log(\lambda_{2, \text{new}})}{\log(\lambda_{2, \text{old}})}
$$

Substituting the values:

$$
\frac{1000}{100} = \frac{\log(\lambda_{2, \text{new}})}{\log(\lambda_{2, \text{old}})}.
$$

Since $\lambda_2 = \beta \cdot \lambda_2(M)$:

$$
\log(\beta_{\text{new}} \cdot \lambda_2(M)) = 10 \cdot \log(\beta_{\text{old}} \cdot \lambda_2(M)).
$$

The paper's estimate uses α itself as the convergence rate, which corresponds to $|\lambda_2(M)| \approx 1$. Then $\log(\lambda_2(M)) \approx 0$ and the term drops out:

$$
\log(\beta_{\text{new}}) = 10 \cdot \log(\beta_{\text{old}})
$$

Solving for $\beta_{\text{new}}$:

$$
\beta_{\text{new}} = (\beta_{\text{old}})^{10}.
$$

#### Example:
For $\beta_{\text{old}} = 0.85$:

$$
\beta_{\text{new}} = (0.85)^{10} \approx 0.197.
$$
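
The estimate $k \approx \log(\tau) / \log(\beta)$ (taking $\lambda_2(M) \approx 1$) can be checked numerically. The sketch below is illustrative only: the helper names `estimated_iterations` and `beta_for_iterations` and the choice of $\tau$ are assumptions, not part of the exercise. At a fixed tolerance, cutting the iteration count by a factor $c$ multiplies $\log(\beta)$ by $c$, so $\beta_{\text{new}} = \beta_{\text{old}}^{c}$.

```python
import math


def estimated_iterations(tau, beta):
    """Rough iteration count k = log(tau) / log(beta) for tolerance tau."""
    return math.log(tau) / math.log(beta)


def beta_for_iterations(tau, k):
    """Invert the estimate: damping factor for which about k iterations reach tau."""
    return tau ** (1.0 / k)


beta_old = 0.85
# Assumption: pick tau so that beta_old needs about 1000 iterations under the estimate.
tau = beta_old ** 1000
beta_new = beta_for_iterations(tau, 100)

print(f"k_old ~ {estimated_iterations(tau, beta_old):.0f}")
print(f"beta_new = {beta_new:.4f}")
print(f"k_new ~ {estimated_iterations(tau, beta_new):.0f}")
```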


### c) Teleports and Adjustments to the Google Matrix
*"One of the first modifications to the basic PageRank model suggested by its founders was a change to the teleportation matrix $E$. Rather than using $\frac{1}{n}ee^T$, they used $ev^T$, where $v^T > 0$ is a probability vector called the personalization or teleportation vector. Since $v^T$ is a probability vector with positive elements, every node is still directly connected to every other node, thus, $P$ is irreducible. Using $v^T$ in place of $\frac{1}{n}e^T$ means that the teleportation probabilities are no longer uniformly distributed. Instead, each time a surfer teleports, he or she follows the probability distribution given in $v^T$ to jump to the next page (...) To produce a PageRank that is personalized for a particular user, only the constant vector $v^T$ added at each iteration must be modified" (Section 6.2)*

One way Google exploits this is for __spam reduction__: by altering the teleportation vector, Google can penalize pages associated with link farms or spam, reducing their PageRank and overall influence in the ranking.


They also intended to use this for __personalization__, with vector $v^T$ allowing for different PageRank vectors to be tailored to different user preferences or categories, such as sports or news. (*"However, it makes the once query-independent, user independent PageRankings user-dependent and more calculation-laden. Nevertheless, it seems this little personalization vector has had more significant side effects." (Section 6.2)*)
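
A minimal sketch of how the teleportation vector enters the iteration, assuming a small hypothetical graph and a hypothetical helper `pagerank_with_teleport`; it is not Google's implementation. Compared with the uniform case, the only change is that the teleport share added to page $i$ is $(1 - \beta) v_i$ instead of $(1 - \beta)/n$.

```python
def pagerank_with_teleport(out_links, v, beta=0.85, iterations=200):
    """Power iteration where teleporting surfers land according to v.

    out_links: dict node -> list of out-neighbors (no dead ends assumed).
    v: dict node -> teleport probability, positive and summing to 1.
    """
    rank = dict(v)  # any probability vector works as a starting point
    for _ in range(iterations):
        # Teleport share weighted by v, then the damped link contributions.
        new_rank = {node: (1.0 - beta) * v[node] for node in out_links}
        for node, targets in out_links.items():
            share = beta * rank[node] / len(targets)
            for target in targets:
                new_rank[target] += share
        rank = new_rank
    return rank


# Hypothetical toy graph; node 3 has no in-links, so its rank comes from teleports only.
graph = {0: [1, 2], 1: [2], 2: [0], 3: [2]}
uniform = {node: 0.25 for node in graph}
biased = {0: 0.1, 1: 0.1, 2: 0.1, 3: 0.7}  # e.g. a user who favors page 3

r_uniform = pagerank_with_teleport(graph, uniform)
r_biased = pagerank_with_teleport(graph, biased)
print("uniform:", {n: round(r, 4) for n, r in r_uniform.items()})
print("biased: ", {n: round(r, 4) for n, r in r_biased.items()})
```

Raising an entry of `v` raises the rank of that page, and lowering it has the opposite effect, which is the lever behind both the personalization and the spam-reduction use.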

# Exercise 2

In [5]:
def parse_graph(file_path):
    """
    Parses the Stanford web graph dataset and constructs dictionaries for in-neighbors and out-neighbors.
    
    Parameters:
        file_path (str): Path to the dataset file.
    
    Returns:
        dict: in_neighbors - Maps each node to a list of its in-neighbors.
        dict: out_neighbors - Maps each node to a list of its out-neighbors.
    """
    in_neighbors = {}
    out_neighbors = {}

    with open(file_path, 'r') as f:
        for line in f:
            # Skip comments and blank lines
            if line.startswith("#") or not line.strip():
                continue

            # Parse the edge (two whitespace-separated node ids per line)
            from_node, to_node = map(int, line.split())
            
            # Update out-neighbors
            if from_node not in out_neighbors:
                out_neighbors[from_node] = []
            out_neighbors[from_node].append(to_node)
            
            # Update in-neighbors
            if to_node not in in_neighbors:
                in_neighbors[to_node] = []
            in_neighbors[to_node].append(from_node)
    
    return in_neighbors, out_neighbors

# Example usage
file_path = 'web-Stanford_small.txt'
in_neighbors, out_neighbors = parse_graph(file_path)

# Display a portion of the data
print("In-Neighbors:", dict(list(in_neighbors.items())[:5]))
print("Out-Neighbors:", dict(list(out_neighbors.items())[:5]))


In-Neighbors: {6548: [1], 15409: [1], 57031: [6548], 13102: [15409], 17794: [2]}
Out-Neighbors: {1: [6548, 15409], 6548: [57031], 15409: [13102], 2: [17794, 25202, 53625, 54582, 64930, 73764, 84477, 98628, 100193, 102355, 105318, 105730, 115926, 140864, 163550, 164599, 175799, 178642, 181714, 190453, 204189, 204604, 210870, 213966, 225119, 241596, 243294, 246897, 251658, 252915, 280935], 252915: [2]}


In [6]:
from collections import deque

def find_dead_ends(file_path):
    """
    Find all dead-end pages in the graph and return their list in removal order.

    Parameters:
        file_path (str): Path to the dataset file.

    Returns:
        list: List of dead-end pages in their removal order.
    """
    # Parse the graph
    in_neighbors, out_neighbors = parse_graph(file_path)

    # Initialize out-degree array
    D = {node: len(out_neighbors.get(node, [])) for node in set(in_neighbors) | set(out_neighbors)}

    # Identify initial dead-ends
    q = deque([node for node, degree in D.items() if degree == 0])

    # Recursive elimination of dead-ends
    dead_ends = []
    visited = set()  # To ensure nodes are processed only once
    while q:
        node = q.popleft()
        if node not in visited:
            visited.add(node)
            dead_ends.append(node)
            for in_neighbor in in_neighbors.get(node, []):
                D[in_neighbor] -= 1
                if D[in_neighbor] == 0:
                    q.append(in_neighbor)

    return dead_ends

dead_ends = find_dead_ends(file_path)
print("Dead-End Pages (in Removal Order):", dead_ends)

Dead-End Pages (in Removal Order): [24584, 65551, 258066, 208925, 213021, 237599, 28703, 131107, 53286, 139311, 20530, 122938, 90175, 69696, 32833, 229441, 200770, 81987, 200772, 229449, 76, 114765, 262222, 204879, 118862, 80, 127058, 127062, 86103, 82011, 91, 213089, 163941, 151653, 16492, 108, 143470, 262255, 266351, 16494, 77938, 151673, 262269, 270464, 45184, 200832, 233602, 28809, 110731, 37003, 73872, 225425, 61586, 155801, 114844, 188574, 168094, 147616, 250016, 41122, 221346, 266402, 155811, 77992, 196782, 77999, 200890, 274619, 155849, 78030, 159951, 192719, 245969, 73939, 176342, 213209, 192738, 65766, 225511, 78056, 123114, 20715, 164079, 94452, 106740, 184565, 90359, 168189, 123134, 213249, 135428, 24838, 225552, 98577, 86291, 78103, 61721, 192798, 241952, 123171, 176422, 258348, 127278, 250159, 49456, 94512, 114994, 106804, 250164, 311, 160056, 57657, 213303, 147768, 86333, 49476, 217423, 242000, 49489, 106843, 164186, 78181, 135534, 24944, 94578, 156020, 115060, 12663, 13
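
To make the removal order concrete, here is a small self-contained sketch of the same queue-based idea on an in-memory edge list. The function name `dead_end_order` and the toy edges are hypothetical and independent of the dataset run above.

```python
from collections import defaultdict, deque


def dead_end_order(edges):
    """Return dead ends in the order they are removed, given (from, to) pairs."""
    in_neighbors = defaultdict(list)
    out_degree = defaultdict(int)
    for src, dst in edges:
        in_neighbors[dst].append(src)
        out_degree[src] += 1
        out_degree[dst] += 0  # make sure pure sinks appear with degree 0

    queue = deque(node for node, deg in out_degree.items() if deg == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        # Removing a dead end deletes one out-link of each of its in-neighbors.
        for pred in in_neighbors[node]:
            out_degree[pred] -= 1
            if out_degree[pred] == 0:
                queue.append(pred)
    return order


# Chain 1 -> 2 -> 3 -> 4 hanging off the cycle 5 <-> 6 via the edge 5 -> 1.
toy_edges = [(1, 2), (2, 3), (3, 4), (5, 6), (6, 5), (5, 1)]
print(dead_end_order(toy_edges))  # [4, 3, 2, 1]
```

Node 4 is the only initial dead end; removing it turns 3, then 2, then 1 into dead ends, while 5 and 6 survive because they still link to each other.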

In [8]:
from collections import defaultdict

def compute_pagerank(file_path, beta=0.8, epsilon=1e-4, max_iterations=5):
    """
    Computes the PageRank values using the Complete Algorithm.

    Parameters:
        file_path (str): Path to the dataset file.
        beta (float): Probability not to follow random teleport links.
        epsilon (float): Convergence tolerance level.
        max_iterations (int): Maximum number of iterations.
        
    Returns:
        dict: Final PageRank values for each node.
    """
    # Parse the graph
    in_neighbors, out_neighbors = parse_graph(file_path)
    
    # Get all nodes
    all_nodes = set(in_neighbors) | set(out_neighbors)
    N = len(all_nodes)
    
    # Initialize ranks
    pagerank = {node: 1 / N for node in all_nodes}
    
    for iteration in range(max_iterations):
        new_pagerank = defaultdict(float)
        leaked_rank = 0
        
        # Calculate contributions
        for node in all_nodes:
            if node in out_neighbors and len(out_neighbors[node]) > 0:
                contribution = beta * pagerank[node] / len(out_neighbors[node])
                for neighbor in out_neighbors[node]:
                    new_pagerank[neighbor] += contribution
            else:
                # Dead-end contribution (no out-links)
                leaked_rank += beta * pagerank[node]

        # Re-insert the leaked rank: dead-end mass plus the teleport share,
        # spread uniformly over all nodes so the ranks sum to 1 again
        for node in all_nodes:
            new_pagerank[node] += (leaked_rank + (1 - beta)) / N
        
        # Check for convergence
        diff = sum(abs(new_pagerank[node] - pagerank[node]) for node in all_nodes)
        pagerank = new_pagerank
        print(f"Iteration {iteration + 1}, Total Difference: {diff}")
        if diff < epsilon:
            break
    
    return pagerank

# Example usage
file_path = 'web-Stanford_small.txt'
pagerank = compute_pagerank(file_path, beta=0.8, epsilon=1e-12, max_iterations=5)

# Output PageRank values
for node, rank in sorted(pagerank.items()):
    print(f"Node {node}: PageRank = {rank}")

Iteration 1, Total Difference: 0.13084044950872642
Iteration 2, Total Difference: 0.05623703106929043
Iteration 3, Total Difference: 0.025831795190388418
Iteration 4, Total Difference: 0.016579639711373072
Iteration 5, Total Difference: 0.012317777294686324
Node 1: PageRank = 0.0005245190000505829
Node 2: PageRank = 0.026543740706355904
Node 3: PageRank = 0.0005245190000505829
Node 4: PageRank = 0.0005245190000505829
Node 5: PageRank = 0.0007054036498556903
Node 6: PageRank = 0.0013650120757502143
Node 7: PageRank = 0.0010617182732854155
Node 8: PageRank = 0.0005245190000505829
Node 9: PageRank = 0.0005245190000505829
Node 10: PageRank = 0.0009408836700604841
Node 11: PageRank = 0.0006990626571881938
Node 12: PageRank = 0.0005245190000505829
Node 13: PageRank = 0.0020027918440114582
Node 14: PageRank = 0.0005245190000505829
Node 15: PageRank = 0.0005369527308013729
Node 16: PageRank = 0.0008471479857245436
Node 17: PageRank = 0.0005245190000505829
Node 18: PageRank = 0.0013650120757502

# Exercise 3 (in the PDF)

# Exercise 4 (in the PDF)

# Exercise 5 (in the PDF)

# Exercise 6 (in the PDF)

# Exercise 7

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType, StringType

def load_spot_instance_prices(file_name):
    """
    Load a .gz file with spot instance prices into a Spark DataFrame.
    
    Parameters:
        file_name (str): Path to the compressed .gz file.
    
    Returns:
        pyspark.sql.DataFrame: A Spark DataFrame with the data.
    """
    # Initialize SparkSession
    spark = SparkSession.builder.appName("SpotInstancePricesLoader").getOrCreate()
    
    # Define schema with memory-efficient data types
    schema = StructType([
        StructField("Type", StringType(), True),  # Drop this column later
        StructField("Price", DoubleType(), True),
        StructField("Timestamp", TimestampType(), True),
        StructField("InstanceType", StringType(), True),
        StructField("ProductDescription", StringType(), True),
        StructField("AvailabilityZone", StringType(), True)
    ])
    
    # Read the .gz file into a DataFrame
    df = spark.read.csv(file_name, sep="\t", schema=schema, header=True)
    
    # Drop the "Type" column
    df = df.drop("Type")
    
    return df

# Example usage
# File path to test
file_name = "dataset-EC2-series/prices-ca-central-1-2019-05-24.txt.gz"

# Load the DataFrame
df = load_spot_instance_prices(file_name)

# Calculate the average price per combination of InstanceType and ProductDescription
avg_prices = (
    df.groupBy("InstanceType", "ProductDescription")
        .avg("Price")
        .withColumnRenamed("avg(Price)", "AveragePrice")
)

# Show the results
avg_prices.show()


+------------+------------------+--------------------+
|InstanceType|ProductDescription|        AveragePrice|
+------------+------------------+--------------------+
|   c4.xlarge|           Windows| 0.23910000000000034|
|    t3.micro|           Windows|0.012699999999999993|
|  t3.2xlarge|           Windows| 0.25780000000000025|
|  t2.2xlarge|           Windows|  0.1848999999999999|
|  m5.4xlarge|        Linux/UNIX| 0.25776746268656775|
|   t2.xlarge|        Linux/UNIX|  0.0614000000000001|
| m5.12xlarge|        Linux/UNIX|  1.2112850490196077|
|   i3.xlarge|           Windows| 0.28719999999999923|
| x1.16xlarge|           Windows|   5.144800000000003|
|   c5.xlarge|        Linux/UNIX| 0.05872099644128116|
|  c5.9xlarge|        Linux/UNIX|  0.6711771978021982|
|    t2.micro|           Windows|0.008399999999999982|
|m5d.12xlarge|           Windows|   2.936500000000008|
|    t2.small|        Linux/UNIX|0.007700000000000022|
|  c5.4xlarge|           Windows|  0.9672999999999973|
| c5.18xla