<a href="https://colab.research.google.com/github/EliRub1/Introduction-to-Cloud-Computing/blob/main/ItCC_Tut10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the latest Apache Spark version
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz

# Install findspark to connect Python with Spark
!pip install -q findspark


In [None]:
# os gives access to the process environment variables
import os
# findspark locates the Spark installation for Python
import findspark

# Publish the Java home directory and the unpacked Spark directory through
# environment variables; Spark requires both to run.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
    }
)

# Make pyspark importable from this Python process
findspark.init()


In [None]:
# SparkSession is the entry point to Spark functionality
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Big Data Example")  # name of this Spark application
    .getOrCreate()                # return an existing session or create a new one
)

In [None]:
# Sample rows, each a (name, price) tuple
data = [("Tal", 120), ("Uri", 90), ("Dina", 150)]
# Column names matching the tuple positions
columns = ["name", "price"]
# Build a DataFrame from the rows and column names via the SparkSession
df = spark.createDataFrame(data, columns)
# Print only the rows whose price is greater than 100
df.where(df.price > 100).show()


In [None]:
import kagglehub

# Dataset handle on Kaggle: "<owner>/<dataset-name>"
dataset_handle = "datafiniti/consumer-reviews-of-amazon-products"

# Download the latest version and remember where the files were placed
path = kagglehub.dataset_download(dataset_handle)

print("Path to dataset files:", path)

In [None]:
# Read the CSV data from the location returned by kagglehub (`path`, set by the
# download cell) rather than a hard-coded /kaggle/input directory, so the read
# works wherever the dataset files were actually stored.
# header=True takes column names from the first line; inferSchema=True lets
# Spark infer the column types from the data.
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()
# Print the first 50 rows
df.show(50)

In [None]:
# Reviews with a rating of 4 or higher count as positive
positive_reviews = df.where(df["`reviews.rating`"] >= 4)

# Print up to 50 positive reviews with their rating, without truncating the text
(
    positive_reviews
    .select("`reviews.text`", "`reviews.rating`")
    .show(50, truncate=False)
)


In [None]:
# Number of reviews per numerical rating, largest group first.
# Only rows whose rating satisfies `>= 0` take part in the count.
(
    df.where(df["`reviews.rating`"] >= 0)
    .groupBy("`reviews.rating`")
    .count()
    .sort("count", ascending=False)
    .show()
)

MapReduce

In [None]:
# Sample web-server access log lines.
# Layout of each line: <client IP> - - [<timestamp>] <HTTP method> <requested page>
logs = [
    "192.168.1.10 - - [10/May/2025:13:00] GET /index.html",
    "172.16.0.5 - - [10/May/2025:13:01] GET /contact.html",
    "192.168.1.10 - - [10/May/2025:13:02] GET /products.html",
    "10.0.0.1 - - [10/May/2025:13:02] GET /index.html",
    "192.168.1.10 - - [10/May/2025:13:03] GET /about.html",
]

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session (an existing one is reused, otherwise one is created)
spark = (
    SparkSession.builder
    .appName("IP Visit Count")
    .getOrCreate()
)

# Create an RDD (Resilient Distributed Dataset) from the list of log lines
rdd = spark.sparkContext.parallelize(logs)

# Map step: the IP is the first whitespace-separated field; emit (IP, 1)
ip_counts = rdd.map(lambda line: (line.split()[0], 1))

# Reduce step: sum all the 1s that share the same IP
result = ip_counts.reduceByKey(lambda a, b: a + b)

# Bring the (small) result back to the driver and print it
for ip, count in result.collect():
    print(f"{ip} visited {count} times")

# The session is intentionally left running: calling spark.stop() here would
# invalidate the DataFrames created earlier in the notebook and make any
# re-run of an earlier cell fail until a new session is built.


In [None]:
# A lambda is an anonymous function expression; here it is bound to the name
# `add` to show the same syntax that is passed to reduceByKey in other cells.
add = lambda x, y: x + y
total = add(2, 3)
print(total)

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this example
spark = (
    SparkSession.builder
    .appName("RDD Example")
    .getOrCreate()
)

# (key, value) pairs; "apple" appears twice
fruit_pairs = [("apple", 1), ("banana", 1), ("apple", 1)]
rdd = spark.sparkContext.parallelize(fruit_pairs)
# Sum the values of all pairs that share a key
result = rdd.reduceByKey(lambda left, right: left + right)
print(result.collect())


# *Word Count with MapReduce in PySpark:*

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for the word-count example
spark = (
    SparkSession.builder
    .appName("Big Data Word Count with MapReduce")
    .getOrCreate()
)

# A handful of sentences standing in for a large text dataset
text = [
    "Big data is transforming the world",
    "Apache Spark is fast and powerful",
    "Big data requires scalable tools",
    "Spark is designed for big data processing",
]

# One RDD element per sentence
rdd = spark.sparkContext.parallelize(text)

# MapReduce pipeline
# 1) lower-case every sentence and break it into words
words = rdd.flatMap(lambda sentence: sentence.lower().split())

# 2) pair every word with the count 1
word_pairs = words.map(lambda token: (token, 1))

# 3) add up the counts of identical words
word_counts = word_pairs.reduceByKey(lambda left, right: left + right)

# Fetch the totals to the driver and print one "word: count" line each
for word, count in word_counts.collect():
    print(f"{word}: {count}")


# *Map Reduce analysis Big Data*

In [None]:
import kagglehub

# Dataset handle on Kaggle: "<owner>/<dataset-name>"
dataset_handle = "datafiniti/consumer-reviews-of-amazon-products"

# Download the latest version and remember where the files were placed
path = kagglehub.dataset_download(dataset_handle)

print("Path to dataset files:", path)

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull  # not used in this cell; kept so the name stays importable

# How many of the most frequent words to print. The review corpus has a large
# vocabulary, so printing every distinct word would flood the notebook output.
TOP_N_WORDS = 50

# Read the CSV data from the location returned by kagglehub (`path`, set by the
# download cell) rather than a hard-coded /kaggle/input directory.
df = spark.read.csv(path, header=True, inferSchema=True)
# Keep only the review text (renamed to "review") and drop rows where it is null
df_clean = df.select(col("`reviews.text`").alias("review")).filter(col("review").isNotNull())
# RDD of plain review strings (df_clean has the single column "review")
rdd = df_clean.rdd.map(lambda row: row["review"])

# MapReduce steps:
# Step 1: Split each review into lower-cased words
words = rdd.flatMap(lambda line: line.lower().split())

# Step 2: Map each word to (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# Step 3: Reduce by key (sum counts for each word)
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# Display only the TOP_N_WORDS most frequent words. takeOrdered brings back
# just that many pairs to the driver instead of collecting the whole vocabulary;
# negating the count orders the pairs from most to least frequent.
for word, count in word_counts.takeOrdered(TOP_N_WORDS, key=lambda pair: -pair[1]):
    print(f"{word}: {count}")