In [None]:
# Prerequisite: upload the Mahout and Spark .tar archives to Google Drive,
# then mount that same Drive account in Google Colab so later cells can read them.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


: 

In [None]:
!apt-get update -qq
!apt-get install -y -qq openjdk-8-jdk-headless wget tar
!apt install -y pigz > /dev/null

In [None]:
import os
import subprocess
from google.colab import drive

# Path of the Spark archive inside the mounted Google Drive (adjust to your layout).
drive_tar = '/content/drive/MyDrive/spark-2.2.0-bin-hadoop2.7.tgz'  # change this
local_tar = '/content/spark-2.2.0-bin-hadoop2.7.tgz'
extract_dir = '/content/spark'

# --- Copy from Drive ---
os.makedirs(extract_dir, exist_ok=True)
print("Copying from Drive to Colab...")
# check=True aborts the cell if the copy fails instead of continuing with a missing archive.
subprocess.run(["cp", drive_tar, local_tar], check=True)

# --- Step 1: Multi-threaded decompression ---
print("Decompressing with 2 threads using pigz...")
# -f overwrites a .tar left behind by an earlier run, so the cell can be re-executed.
subprocess.run(["pigz", "-d", "-f", "-p", "2", local_tar], check=True)

# --- Step 2: Extract .tar ---
# `pigz -d` replaces the archive with its decompressed name:
#   "x.tgz" -> "x.tar"   and   "x.tar.gz" -> "x.tar"
# so derive the .tar path from the suffix actually present on local_tar.
if local_tar.endswith(".tgz"):
    tar_file = local_tar[:-len(".tgz")] + ".tar"
elif local_tar.endswith(".gz"):
    tar_file = local_tar[:-len(".gz")]
else:
    tar_file = local_tar
print(f"Extracting {tar_file} to {extract_dir}...")
subprocess.run(["tar", "-xf", tar_file, "-C", extract_dir], check=True)

# Only reached when every step above succeeded (each call uses check=True).
print(f"\n✅ Extraction complete! Files are in: {extract_dir}")


In [None]:
// Lab Example (Scala / Spark GraphX)
// This cell is Scala, not Python: paste it into the spark-shell prompt
// (using :paste mode keeps the multi-line expressions together).
// Basic Imports
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create 10 vertices (id, name)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David"),
  (5L, "Eve"),
  (6L, "Frank"),
  (7L, "Grace"),
  (8L, "Hannah"),
  (9L, "Ivy"),
  (10L, "Jack")
))

// Create some edges (srcId, dstId, relationship)
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "follow"),
  Edge(4L, 5L, "friend"),
  Edge(5L, 6L, "follow"),
  Edge(6L, 7L, "friend"),
  Edge(7L, 8L, "follow"),
  Edge(8L, 9L, "friend"),
  Edge(9L, 10L, "follow"),
  Edge(10L, 1L, "friend"),
  Edge(1L, 5L, "friend"),
  Edge(2L, 6L, "follow")
))

// Build the Graph
val graph = Graph(vertices, edges)

// Compute the degree of each vertex (in + out)
val degrees: VertexRDD[Int] = graph.degrees

// Keep only the vertices whose total degree is at least 3
val highDegreeVertices = degrees.filter { case (id, deg) => deg >= 3 }

// Collect just the ids of those vertices to the driver
val highDegreeIDs = highDegreeVertices.map(_._1).collect()

// Broadcast the id list so the subgraph predicate can read it on the executors
val highDegreeIDs_b = sc.broadcast(highDegreeIDs)

// Restrict the graph to the high-degree vertices (the vertex predicate checks the broadcast ids)
val subgraph = graph.subgraph(
  vpred = (id, attr) => highDegreeIDs_b.value.contains(id)
)

// In-degrees inside the subgraph
val inSub = subgraph.inDegrees

// Out-degrees inside the subgraph
val outSub = subgraph.outDegrees


// Save vertices and edges as CSV for Python / NetworkX
// Coalesce to 1 partition so we get a single CSV file
// (saveAsTextFile writes a directory; the data lands in its part-00000 file.
//  By default Spark refuses to write into a directory that already exists,
//  so remove old /content/vertices_csv and /content/edges_csv before re-running.)
graph.vertices
  .map { case (id, attr) => s"$id,$attr" }
  .coalesce(1)
  .saveAsTextFile("/content/vertices_csv")

graph.edges
  .map(e => s"${e.srcId},${e.dstId},${e.attr}")
  .coalesce(1)
  .saveAsTextFile("/content/edges_csv")



In [None]:
import os
#Run command to open scala shell
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark/spark-2.2.0-bin-hadoop2.7'
os.environ['PATH'] += f":{os.environ['SPARK_HOME']}/bin"

!spark-shell --master local[*] --driver-memory 2g


#If it does not work run last cell  again
# Run previous cell as practice example

In [14]:
#Delete folder /content/vertices_csv and all its contents
!rm -rf /content/vertices.csv


In [None]:
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt

# Load the single-partition exports written by the Spark example.
# The files have no header row, so column names are supplied here.
vertices = pd.read_csv("/content/vertices_csv/part-00000", names=["id", "name"])
edges = pd.read_csv("/content/edges_csv/part-00000", names=["src", "dst", "relation"])

# Create a directed graph (or use nx.Graph() for undirected)
G = nx.DiGraph()

# Add all nodes in one call, each carrying its `name` attribute
# (bulk insertion instead of a row-by-row iterrows() loop).
G.add_nodes_from(
    (node_id, {"name": name})
    for node_id, name in zip(vertices["id"], vertices["name"])
)

# Add all edges in one call, each carrying its `relation` attribute.
G.add_edges_from(
    (src, dst, {"relation": relation})
    for src, dst, relation in zip(edges["src"], edges["dst"], edges["relation"])
)

# Draw the graph. spring_layout is randomised; a fixed seed makes the
# figure identical on every run.
pos = nx.spring_layout(G, seed=42)
nx.draw(G, pos, with_labels=True, labels=nx.get_node_attributes(G, 'name'))
edge_labels = nx.get_edge_attributes(G, 'relation')
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
plt.show()
