In [11]:
# Imports and Spark Configuration (Python/PySpark)
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix

# Create (or reuse) the SparkSession for this notebook.
# master("local") keeps everything on this machine; no cluster is required.
spark = (
    SparkSession.builder
    .appName("RDD MatrixMultiply")
    .master("local")
    .getOrCreate()
)

# The RDD API used below is reached through the session's SparkContext
sc = spark.sparkContext
print("Spark initialized successfully!")

Spark initialized successfully!


In [12]:
# Matrix A (2x3), held as an RDD of sparse (row, col, value) entries:
# [1, 0, 2]
# [0, 3, -1]
# Cells whose value is zero are simply left out of the list.
a_triples = [
    (0, 0, 1.0), (0, 2, 2.0),
    (1, 1, 3.0), (1, 2, -1.0),
]
entries_a = sc.parallelize([MatrixEntry(*triple) for triple in a_triples])

# Pull the entries back to the driver to show what was stored
print("Matrix A entries:")
for a_entry in entries_a.collect():
    print(f"({a_entry.i},{a_entry.j}) = {a_entry.value}")

Matrix A entries:
(0,0) = 1.0
(0,2) = 2.0
(1,1) = 3.0
(1,2) = -1.0


In [13]:
# Matrix B (3x2), held as an RDD of (row, col, value) entries:
# [1, 2]
# [3, 4]
# [5, 6]
# Entries are listed column by column: first column 0, then column 1.
b_triples = [
    (0, 0, 1.0), (1, 0, 3.0), (2, 0, 5.0),
    (0, 1, 2.0), (1, 1, 4.0), (2, 1, 6.0),
]
entries_b = sc.parallelize([MatrixEntry(*triple) for triple in b_triples])

# Pull the entries back to the driver to show what was stored
print("Matrix B entries:")
for b_entry in entries_b.collect():
    print(f"({b_entry.i},{b_entry.j}) = {b_entry.value}")

Matrix B entries:
(0,0) = 1.0
(1,0) = 3.0
(2,0) = 5.0
(0,1) = 2.0
(1,1) = 4.0
(2,1) = 6.0


In [14]:
# Matrix Multiplication Logic (Python)
# Step 1: Re-key both matrices on the shared inner index k, so that a join
# on that key pairs every A[i, k] with every B[k, j].
def _key_a_by_column(e):
    """Turn an entry of A into (column, (row, value))."""
    return (e.j, (e.i, e.value))


def _key_b_by_row(e):
    """Turn an entry of B into (row, (column, value))."""
    return (e.i, (e.j, e.value))


a_keyed = entries_a.map(_key_a_by_column)
b_keyed = entries_b.map(_key_b_by_row)

# Show both keyed RDDs; the second heading carries the blank separator line
for heading, keyed_rdd in (
    ("Matrix A keyed by column:", a_keyed),
    ("\nMatrix B keyed by row:", b_keyed),
):
    print(heading)
    for item in keyed_rdd.collect():
        print(item)

Matrix A keyed by column:
(0, (0, 1.0))
(2, (0, 2.0))
(1, (1, 3.0))
(2, (1, -1.0))

Matrix B keyed by row:
(0, (0, 1.0))
(1, (0, 3.0))
(2, (0, 5.0))
(0, (1, 2.0))
(1, (1, 4.0))
(2, (1, 6.0))


In [15]:
# Step 2: Perform join and multiplication (Python)
# The join pairs each A[i, k] with each B[k, j] on the shared index k.
# It is built once and cached: it feeds both the debug print below and the
# `product` pipeline, and without cache() each action would re-run the
# shuffle join from scratch.
joined = a_keyed.join(b_keyed).cache()


def _partial_product(record):
    """Convert one joined record into ((i, j), A[i, k] * B[k, j]).

    `record` has the shape (k, ((i, a_value), (j, b_value))) as produced by
    joining `a_keyed` with `b_keyed`.
    """
    _, ((row, a_value), (col, b_value)) = record
    return (row, col), a_value * b_value


product = (joined
           .map(_partial_product)
           # Sum the partial products over k for each output cell (i, j)
           .reduceByKey(lambda a, b: a + b)
           .map(lambda cell: MatrixEntry(cell[0][0], cell[0][1], cell[1])))

print("Intermediate products:")
for item in joined.collect():
    print(item)

print("\nAfter multiplication and grouping:")
for entry in product.collect():
    print(f"({entry.i},{entry.j}) = {entry.value}")

Intermediate products:
(0, ((0, 1.0), (0, 1.0)))
(0, ((0, 1.0), (1, 2.0)))
(2, ((0, 2.0), (0, 5.0)))
(2, ((0, 2.0), (1, 6.0)))
(2, ((1, -1.0), (0, 5.0)))
(2, ((1, -1.0), (1, 6.0)))
(1, ((1, 3.0), (0, 3.0)))
(1, ((1, 3.0), (1, 4.0)))

After multiplication and grouping:
(0,0) = 11.0
(1,1) = 6.0
(0,1) = 14.0
(1,0) = 4.0


In [16]:
# Final Result (Python)
# Wrap the RDD of MatrixEntry objects in a distributed CoordinateMatrix
result = CoordinateMatrix(product)

print("Final Matrix Multiplication Result (A × B):")
print("=" * 50)

# collect() gives no ordering guarantee, so sort by (row, col) on the driver
sorted_entries = list(result.entries.collect())
sorted_entries.sort(key=lambda e: (e.i, e.j))
for entry in sorted_entries:
    print(f"({entry.i},{entry.j}) = {entry.value}")

# Shut down the session; this also stops the underlying SparkContext,
# so no Spark cell can be run after this point without re-initialising.
spark.stop()
print("\nSpark session stopped successfully!")

Final Matrix Multiplication Result (A × B):
==================================================
(0,0) = 11.0
(0,1) = 14.0
(1,0) = 4.0
(1,1) = 6.0

Spark session stopped successfully!


In [17]:
# Alternative: NumPy Matrix Multiplication for Verification
import numpy as np

# The same two matrices as the Spark cells, written out densely
matrix_a = np.array([[1.0, 0.0, 2.0],
                     [0.0, 3.0, -1.0]])
matrix_b = np.array([[1.0, 2.0],
                     [3.0, 4.0],
                     [5.0, 6.0]])

# For 2-D arrays the @ operator is the ordinary matrix product
numpy_result = matrix_a @ matrix_b

print("NumPy Matrix Multiplication Result (for verification):")
print("Matrix A (2x3):")
print(matrix_a)
print("\nMatrix B (3x2):")
print(matrix_b)
print("\nResult A × B (2x2):")
print(numpy_result)

# Re-print in the (row,col) = value layout used by the Spark output,
# walking the array in row-major order and skipping zero cells.
print("\nNumPy result in coordinate format:")
for (i, j), cell_value in np.ndenumerate(numpy_result):
    if cell_value == 0:
        continue
    print(f"({i},{j}) = {cell_value}")

NumPy Matrix Multiplication Result (for verification):
Matrix A (2x3):
[[ 1.  0.  2.]
 [ 0.  3. -1.]]

Matrix B (3x2):
[[1. 2.]
 [3. 4.]
 [5. 6.]]

Result A × B (2x2):
[[11. 14.]
 [ 4.  6.]]

NumPy result in coordinate format:
(0,0) = 11.0
(0,1) = 14.0
(1,0) = 4.0
(1,1) = 6.0
