# Christos Stylidis
# Assignment 2: Spark - Graph Analysis

## Input file description

You are given the `p2p-Gnutella08.txt` input file. It is a Comma Separated Values (CSV) file that stores a snapshot of the Gnutella P2P network from August 8 2002. The snapshot is provided in the form of a directed graph, where each row represents a connection between two servers.

`ServerFrom_ID,ServerTo_ID`

The tuple contains two fields (columns):

* `ServerFrom_ID`: the ID of the connection's source server.
* `ServerTo_ID`: the ID of the connection's target server.


## Tasks

You will write 4 Spark jobs **here** by using PySpark:

* The first job will construct the graph's adjacency lists. That is, for each server ID, the task will build a list of all the incoming connections in the form `ServerID, [ServerFrom1_ID, ServerFrom2_ID, ...]`

* The second job will count the number of nodes and edges in the graph.

* The third job will count the number of nodes for each outdegree. That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?

* The fourth job will count the number of nodes for each indegree.

## Deliverables

**There will be a single deliverable, this notebook**. You will organize your answers according to the provided structure, which is identical to the example notebooks that were uploaded to the e-learning platform. **Please write your full name in both the notebook's filename and the notebook's title (first line of first cell)**.

Then, upload the file to the e-learning platform.


## Answer

## Job 1

In [9]:
!hadoop fs -put p2p-Gnutella08.txt /user/hadoop/p2p-Gnutella08.txt

In [20]:
from pyspark.sql import SparkSession

def initialize_spark(app_name):
    """Initialize and return a Spark session."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def read_data(file_path, sc):
    """Read the input file and return RDD of lines."""
    return sc.textFile(file_path)

def parse_edge(line):
    """Parse an edge from a line, with validation."""
    tokens = line.split(",")
    if len(tokens) != 2 or not tokens[0].isdigit() or not tokens[1].isdigit():
        return None
    return tokens[1], tokens[0]

def create_adjacency_list(edges):
    """Transform edges into adjacency lists."""
    return edges.filter(lambda x: x is not None).groupByKey().mapValues(set)

def main():
    # Initialize Spark session
    spark = initialize_spark("GraphAnalysisOptimized")
    sc = spark.sparkContext

    # Read the data file
    file_path = "p2p-Gnutella08.txt"
    lines = read_data(file_path, sc)

    # Parse the data into edges
    edges = lines.map(parse_edge)

    # Transform into adjacency lists
    adjacency_lists = create_adjacency_list(edges)

    # Output the results
    results = adjacency_lists.take(10)
    for result in results:
        print(result)

if __name__ == "__main__":
    main()


('1', {'0'})
('2', {'0', '3192', '553', '3552', '2546', '4752'})
('3', {'0', '2032', '3820', '2796', '2901', '5267', '3459', '992', '180', '1487', '1784', '5654', '4160', '174', '3693', '3495', '5724', '1046', '5792', '2084', '3526', '2176', '3077', '3515', '3026', '2873', '3890', '3551', '2490', '1676', '2822', '3544', '1454', '4260', '2252', '762', '15', '2465', '1199', '4003', '2695', '820', '1082', '504', '6132', '3042', '409', '2699', '4553', '4002', '1477', '2241', '1564', '36', '2546', '3660', '120', '1059', '4405', '3505', '3577', '123', '1541', '4179', '2313', '3213', '366', '1161', '4482', '2733', '507', '2986', '30', '580', '1739', '1527', '3176'})
('4', {'339', '481', '6092', '0', '4211', '4275', '4306', '1934', '2146', '427', '1907', '5928', '195', '4382', '1458', '4353', '2881', '2066', '665', '5809', '3937', '1113', '6175', '720', '423', '830', '371', '4598', '3515', '1477', '434', '263', '4297', '4470', '1784', '4445', '2350', '3856', '553', '1586', '1866', '2621', '306

## Job 2

In [23]:
# Initialize Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GraphAnalysis_Count").getOrCreate()
sc = spark.sparkContext

# Read the data file
file_path = "p2p-Gnutella08.txt"  # Update with the correct path
lines = sc.textFile(file_path)

# Parse the data
edges = lines.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0], tokens[1]))

# Counting nodes
nodes_from = edges.map(lambda x: x[0])
nodes_to = edges.map(lambda x: x[1])
unique_nodes = nodes_from.union(nodes_to).distinct().count()

# Counting edges
total_edges = edges.count()

print(f"Total number of nodes (servers): {unique_nodes}")
print(f"Total number of edges (connections): {total_edges}")


                                                                                

Total number of nodes (servers): 6301
Total number of edges (connections): 20777


                                                                                

## Job 3

In [25]:
from pyspark.sql import SparkSession

def initialize_spark(app_name):
    """Initialize and return a Spark session."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def read_data(file_path, sc):
    """Read the input file and return RDD of lines."""
    return sc.textFile(file_path)

def parse_edge(line):
    """Parse an edge from a line, with validation."""
    tokens = line.split(",")
    if len(tokens) != 2 or not tokens[0].isdigit() or not tokens[1].isdigit():
        return None
    return tokens[0], tokens[1]

def count_outdegree(nodes_to):
    """Count outdegree for each node."""
    return nodes_to.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

def count_nodes_for_outdegree(outdegree_counts):
    """Count the number of nodes for each outdegree."""
    return outdegree_counts.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

def main():
    # Initialize Spark session
    spark = initialize_spark("GraphAnalysisOutdegree")
    sc = spark.sparkContext

    # Read the data file
    file_path = "p2p-Gnutella08.txt"
    lines = read_data(file_path, sc)

    # Parse the data into edges
    edges = lines.map(parse_edge)

    # Count outdegree for each node
    nodes_to = edges.map(lambda x: (x[1], None))  # Only consider destination nodes
    outdegree_counts = count_outdegree(nodes_to)

    # Count the number of nodes for each outdegree
    nodes_for_outdegree = count_nodes_for_outdegree(outdegree_counts)

    # Output the results
    results = nodes_for_outdegree.collect()
    for result in results:
        print(f"Nodes with outdegree {result[0]}: {result[1]}")

if __name__ == "__main__":
    main()


Nodes with outdegree 1: 2452
Nodes with outdegree 6: 227
Nodes with outdegree 77: 2
Nodes with outdegree 59: 1
Nodes with outdegree 73: 2
Nodes with outdegree 51: 1
Nodes with outdegree 74: 1
Nodes with outdegree 49: 1
Nodes with outdegree 7: 144
Nodes with outdegree 5: 333
Nodes with outdegree 4: 559
Nodes with outdegree 12: 23
Nodes with outdegree 3: 868
Nodes with outdegree 2: 1287
Nodes with outdegree 70: 3
Nodes with outdegree 11: 29
Nodes with outdegree 54: 1
Nodes with outdegree 85: 1
Nodes with outdegree 50: 1
Nodes with outdegree 71: 3
Nodes with outdegree 60: 3
Nodes with outdegree 82: 1
Nodes with outdegree 81: 4
Nodes with outdegree 83: 1
Nodes with outdegree 19: 2
Nodes with outdegree 66: 2
Nodes with outdegree 14: 13
Nodes with outdegree 52: 1
Nodes with outdegree 20: 4
Nodes with outdegree 13: 19
Nodes with outdegree 67: 3
Nodes with outdegree 21: 2
Nodes with outdegree 57: 1
Nodes with outdegree 38: 1
Nodes with outdegree 47: 2
Nodes with outdegree 30: 1
Nodes with outd

## Job 4

In [None]:
from pyspark.sql import SparkSession

def initialize_spark(app_name):
    """Initialize and return a Spark session."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def read_data(file_path, sc):
    """Read the input file and return RDD of lines."""
    return sc.textFile(file_path)

def parse_edge(line):
    """Parse an edge from a line, with validation."""
    tokens = line.split(",")
    if len(tokens) != 2 or not tokens[0].isdigit() or not tokens[1].isdigit():
        return None
    return tokens[1], tokens[0]

def count_indegree(nodes_from):
    """Count indegree for each node."""
    return nodes_from.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

def count_nodes_for_indegree(indegree_counts):
    """Count the number of nodes for each indegree."""
    return indegree_counts.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

def main():
    # Initialize Spark session
    spark = initialize_spark("GraphAnalysisIndegree")
    sc = spark.sparkContext

    # Read the data file
    file_path = "p2p-Gnutella08.txt"
    lines = read_data(file_path, sc)

    # Parse the data into edges
    edges = lines.map(parse_edge)

    # Count indegree for each node
    nodes_from = edges.map(lambda x: (x[1], None))  # Only consider source nodes
    indegree_counts = count_indegree(nodes_from)

    # Count the number of nodes for each indegree
    nodes_for_indegree = count_nodes_for_indegree(indegree_counts)

    # Output the results
    results = nodes_for_indegree.collect()
    for result in results:
        print(f"Nodes with indegree {result[0]}: {result[1]}")

if __name__ == "__main__":
    main()


Nodes with indegree 10: 1531
Nodes with indegree 9: 372
Nodes with indegree 1: 294
Nodes with indegree 3: 16
Nodes with indegree 7: 9
Nodes with indegree 2: 28
Nodes with indegree 5: 107
Nodes with indegree 8: 44
Nodes with indegree 6: 10
Nodes with indegree 4: 28
Nodes with indegree 13: 2
Nodes with indegree 14: 2
Nodes with indegree 18: 2
Nodes with indegree 25: 1
Nodes with indegree 12: 3
Nodes with indegree 29: 1
Nodes with indegree 19: 1
Nodes with indegree 47: 1
Nodes with indegree 34: 1
Nodes with indegree 24: 1
Nodes with indegree 22: 1
Nodes with indegree 28: 1
Nodes with indegree 11: 1
Nodes with indegree 17: 4
Nodes with indegree 46: 1
Nodes with indegree 48: 1
Nodes with indegree 31: 1
Nodes with indegree 41: 1


In [None]:
## Resources
"https://spark.apache.org/docs/latest/graphx-programming-guide.html""
"https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations"
"https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html"