<a href="https://colab.research.google.com/github/Mandavidu/Mandavidu/blob/main/Cloud_Computing_HW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update # Update the apt package index.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Extract the tgz archive.
!pip install -q findspark # Install findspark, which adds PySpark to sys.path at runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [810 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:7 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,082 kB]
Hit:8 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:9 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,695 kB]
Hit:10 https://ppa.laun

# **1. Word count**

In [2]:
import re
from collections import Counter

# Read data from n-grams.txt
with open("/content/sample_data/n-grams.txt", "r") as file:
    text = file.read()

# Convert the text to lowercase
text_lower = text.lower()

# Remove punctuation marks
text_cleaned = re.sub(r'[^\w\s]', '', text_lower)

# Tokenize the text into words
words = text_cleaned.split()

# Generate n-grams for n = 1 to 5
ngrams = {}
for n in range(1, 6):
    ngrams[n] = [(words[i:i+n]) for i in range(len(words)-n+1)]

# Count occurrences of n-grams
ngram_counts = {}
for n, ngram_list in ngrams.items():
    ngram_counts[n] = Counter([tuple(ngram) for ngram in ngram_list])

# Finally, print the 5 most frequent n-grams for each value of n
for n, counts in ngram_counts.items():
    print(f"======5 most frequent {n}-grams======")
    print("index count ngram")
    for i, (ngram, count) in enumerate(counts.most_common(5), start=1):
        ngram_str = " ".join(ngram)
        print(f"{i}. {count} '{ngram_str}'")



index count ngram
1. 5 'to'
2. 3 'care'
3. 2 'you'
4. 2 'needs'
5. 2 'we'
index count ngram
1. 1 'to know'
2. 1 'know you'
3. 1 'you is'
4. 1 'is to'
5. 1 'to love'
index count ngram
1. 1 'to know you'
2. 1 'know you is'
3. 1 'you is to'
4. 1 'is to love'
5. 1 'to love you'
index count ngram
1. 1 'to know you is'
2. 1 'know you is to'
3. 1 'you is to love'
4. 1 'is to love you'
5. 1 'to love you hello'
index count ngram
1. 1 'to know you is to'
2. 1 'know you is to love'
3. 1 'you is to love you'
4. 1 'is to love you hello'
5. 1 'to love you hello world'
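
The sliding-window step used above can also be written with `zip`, which yields tuples directly and so avoids the later `tuple(...)` conversion. This is only a minimal sketch: the helper name `sliding_ngrams` is hypothetical, and the sample sentence is the opening words visible in the output above, not the full contents of `n-grams.txt`.

```python
from collections import Counter


def sliding_ngrams(words, n):
    """Return every window of n consecutive tokens as a tuple."""
    # zip stops at the shortest shifted copy, so no index arithmetic is needed
    return list(zip(*(words[i:] for i in range(n))))


# Sample tokens taken from the start of the output shown above
words = "to know you is to love you".split()

bigrams = sliding_ngrams(words, 2)
unigram_counts = Counter(sliding_ngrams(words, 1))

print(bigrams)
print(unigram_counts.most_common(2))
```

Because the windows are already tuples, they can be passed straight to `Counter`.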


# **2. Serializing problem**

PySpark UDFs can be inefficient because every value has to be serialized between the JVM and the Python worker. The function passed to `F.udf` is itself pickled, together with every variable it closes over, and sent to the worker nodes, so everything it references must be serializable. A UDF that referenced `self.stop_words` would pull the entire `StopWordFilter` instance into the closure; the `apply` method therefore copies the list into a local variable before passing `is_stopword` to `F.udf`.
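
As a rough illustration of why it matters what a UDF references, the sketch below uses the standard-library `pickle` as a stand-in for the serializer PySpark applies to UDFs. That is a simplifying assumption: PySpark's own pickling handles more cases, such as lambdas. `FilterWithHandle` and `can_pickle` are hypothetical names, and the lock merely represents any driver-side resource that cannot be shipped to workers.

```python
import pickle
import threading
from dataclasses import dataclass, field
from typing import List


@dataclass
class FilterWithHandle:
    """Hypothetical filter that also holds a non-picklable resource."""
    stop_words: List[str]
    # The lock stands in for a driver-side handle (session, connection, ...)
    handle: threading.Lock = field(default_factory=threading.Lock)


def can_pickle(obj) -> bool:
    """Return True if the object survives pickling, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


f = FilterWithHandle(["BadBoy"])
whole_object_ok = can_pickle(f)           # what a reference to self would ship
local_copy_ok = can_pickle(f.stop_words)  # what a local copy of the list ships
print(whole_object_ok, local_copy_ok)
```

Shipping only the list keeps the serialized payload small and independent of whatever else the object holds.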

In [3]:
from typing import List
from dataclasses import dataclass
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F


@dataclass
class StopWordFilter:
    stop_words: List[str]

    def apply(self, df: DataFrame) -> DataFrame:
        stop_words = self.stop_words

        def is_stopword(word: str) -> bool:
            return word in stop_words

        df = df.withColumn(
            "is_stop_word",
            F.udf(is_stopword, returnType=T.BooleanType())("word"))
        return df


if __name__ == '__main__':
    spark = SparkSession.builder.appName("example").getOrCreate()

    # Create a DataFrame
    data = [("Alice",), ("Bob",), ("BadBoy",)]
    df = spark.createDataFrame(data, schema=['word'])

    # initialize the class
    stop_word_filter = StopWordFilter(["BadBoy"])

    # apply the filter
    df = stop_word_filter.apply(df)
    df.show()

+------+------------+
|  word|is_stop_word|
+------+------------+
| Alice|       false|
|   Bob|       false|
|BadBoy|        true|
+------+------------+



**Solution:**

To keep the UDF serializable, `is_stopword` must only reference objects that can be pickled. This can be done with a lambda or a regular Python function defined inside the `apply` method, closing over the local `stop_words` list. I used a lambda inside `apply` and modified the code as follows:

In [4]:
from typing import List
from dataclasses import dataclass
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F


@dataclass
class StopWordFilter:
    stop_words: List[str]

    def apply(self, df: DataFrame) -> DataFrame:
        stop_words = self.stop_words

        # Wrap a lambda that closes over the local list in a UDF
        is_stopword = F.udf(lambda word: word in stop_words, T.BooleanType())

        # Apply the UDF to create a new column
        df = df.withColumn("is_stop_word", is_stopword("word"))
        return df


if __name__ == '__main__':
    spark = SparkSession.builder.appName("example").getOrCreate()

    # Create a DataFrame
    data = [("Alice",), ("Bob",), ("BadBoy",)]
    df = spark.createDataFrame(data, schema=['word'])

    # initialize the class
    stop_word_filter = StopWordFilter(["BadBoy"])

    # apply the filter
    df = stop_word_filter.apply(df)
    df.show()


+------+------------+
|  word|is_stop_word|
+------+------------+
| Alice|       false|
|   Bob|       false|
|BadBoy|        true|
+------+------------+



# **3. Page Rank**
PageRank is an algorithm used by Google to rank web pages in its search engine results.

In [43]:
import networkx as nx

def page_rank(input_file, num_iterations, damping_factor):
    # Read the input file and extract necessary information
    with open(input_file, 'r') as file:
        lines = file.readlines()

    num_pages = len(lines) - 2

    # Create a directed graph using NetworkX.
    G = nx.DiGraph()

    # Add nodes to the graph
    for i in range(1, num_pages + 1):
        G.add_node(str(i))

    # Add edges to the graph
    for line in lines[2:]:
        src, *destinations = map(str.strip, line.split())
        for dest in destinations:
            G.add_edge(dest, src)  # Reverse the direction to match the example

    # Compute PageRank with the formula given in the lecture:
    # PR(p) = (1 - d) + d * (PR(p1)/L(p1) + PR(p2)/L(p2) + ... + PR(pk)/L(pk))
    # where p1..pk are the pages linking to p and L(pi) is the out-degree of pi.
    pr = {}
    for page in G.nodes:
        pr[page] = (1 - damping_factor)  # Initialize with (1 - d)

    for _ in range(num_iterations):
        new_pr = {}
        for page in G.nodes:
            new_pr[page] = (1 - damping_factor)  # Start each update from (1 - d)
            for predecessor in G.predecessors(page):
                new_pr[page] += pr[predecessor] / G.out_degree(predecessor) * damping_factor

        pr = new_pr

    return pr

def print_page_rank(pr):
    # Find the page with the highest PageRank value
    highest_page = max(pr, key=pr.get)
    highest_pr_value = pr[highest_page]
    # Sort the pages by PageRank value
    sorted_pages = sorted(pr.items(), key=lambda x: x[1], reverse=True)

    # Print the page with the highest PageRank value first
    print(f"Page {highest_page}: has higher rank = {highest_pr_value}")

    # Print the 5 pages with the highest PageRank values
    for i, (page, pr_value) in enumerate(sorted_pages[:5], 1):
        print(f"Page {page}: has rank: = {pr_value}")

input_file = "/content/sample_data/input_1.txt"
num_iterations = 100  # Number of update iterations
damping_factor = 0.85  # Damping factor as given in the lecture slides
result = page_rank(input_file, num_iterations, damping_factor)
print_page_rank(result)

Page 1: has higher rank = 1.918918764179665
Page 1: has rank: = 1.918918764179665
Page 2: has rank: = 0.6936936461331438
Page 3: has rank: = 0.6936936461331438
Page 4: has rank: = 0.6936936461331438
Page 5: has rank: = 0.15000000000000002
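
The update rule used in `page_rank` can be checked by hand on a small graph without NetworkX. The following is a minimal sketch under stated assumptions: the three-page graph and the helper name `pagerank_step` are made up for illustration and are unrelated to `input_1.txt`.

```python
def pagerank_step(pr, out_links, d=0.85):
    """One synchronous update: PR(p) = (1 - d) + d * sum(PR(q) / L(q))."""
    new_pr = {page: 1 - d for page in pr}
    for src, targets in out_links.items():
        for dst in targets:
            # src passes an equal share of its rank to every page it links to
            new_pr[dst] += d * pr[src] / len(targets)
    return new_pr


# Hypothetical graph: A -> B, A -> C, B -> C, C -> A
out_links = {"A": ["B", "C"], "B": ["C"], "C": ["A"]}

pr = {page: 1 - 0.85 for page in out_links}
for _ in range(100):
    pr = pagerank_step(pr, out_links)

print(pr)
print(sum(pr.values()))
```

With this non-normalized form of the formula, and when every page has at least one outgoing link, the ranks sum to the number of pages once the iteration has converged, which makes a convenient sanity check.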


# **4. Linear Regression Model**

In [51]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("LinearRegression") \
    .getOrCreate()

# Load the dataset
data = spark.read.csv("/content/sample_data/BostonHousing.csv", header=True, inferSchema=True)

# Target column
target_column = 'medv'

# Step 1: Combine all feature columns (every column except medv) into a
#         single vector column named 'Attributes'
feature_columns = [col for col in data.columns if col != target_column]

# Create a vector assembler
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="Attributes")

# Transform the data to incorporate the assembled features
data = assembler.transform(data)

# Step 2: Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=123)

# Step 3: Create a Linear Regression model
lr = LinearRegression(featuresCol="Attributes", labelCol=target_column)

# Fit the model to the training data
lr_model = lr.fit(train_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Show predictions next to the actual values
# predictions.select("prediction", target_column, *feature_columns).show()

# Step 4: Analyze the model statistically
# RMSE and R-squared come from the model's evaluation summary on the test set;
# MSE and MAE are computed with RegressionEvaluator on the predictions.
evaluation_summary = lr_model.evaluate(test_data)
print("Root Mean Squared Error (RMSE):", evaluation_summary.rootMeanSquaredError)

evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE):", mse)

evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE):", mae)

print("R-squared:", evaluation_summary.r2)
spark.stop()


Root Mean Squared Error (RMSE): 3.952468980908394
Mean Squared Error (MSE): 15.622011045043038
Mean Absolute Error (MAE): 2.9764433568995194
R-squared: 0.8024391787661578
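
To make the four reported metrics concrete, the sketch below computes them by hand with the usual definitions, in plain Python. The helper name `regression_metrics` and the sample values are made up for illustration and are not taken from `BostonHousing.csv`. The last lines check that the RMSE printed above is the square root of the printed MSE.

```python
import math


def regression_metrics(y_true, y_pred):
    """Compute MSE, RMSE, MAE and R-squared from two equal-length lists."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)   # total sum of squares
    mse = ss_res / n
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }


# Made-up actual and predicted values
y_true = [24.0, 21.6, 34.7, 33.4]
y_pred = [25.0, 20.6, 32.7, 35.4]
metrics = regression_metrics(y_true, y_pred)
print(metrics)

# Consistency check on the figures reported above: RMSE squared equals MSE
reported_rmse = 3.952468980908394
reported_mse = 15.622011045043038
rmse_matches_mse = math.isclose(reported_rmse ** 2, reported_mse, rel_tol=1e-9)
print(rmse_matches_mse)
```

RMSE and MAE are in the same units as `medv`, so they are easier to interpret than MSE. R-squared is unitless and gives the share of the target's variance that the model explains.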
