In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=ee97b7fbf948447851adeded28e66cc0e4ee1178c4657c99d488e06dd87c2136
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark import SparkContext
import numpy as np

# Initialize SparkContext
sc = SparkContext("local", "RDD Tasks")

# Generating 100 random numbers in the range 0 to 10 using numpy randint function with the seed set to 10
np.random.seed(10)
random_numbers = np.random.randint(0, 11, 100)

# Create an RDD using the parallelize function with the data generated in the previous step
rdd = sc.parallelize(random_numbers)

# Calculate the frequency of each number (0 - 10) using the appropriate function of RDD
frequency_count = rdd.countByValue()
print(random_numbers,'\n')
# Print the frequency of each number
for number, count in sorted(frequency_count.items()):
    if count==1:
        print(f"{number} has {count} occurrences")
    else:
        print(f"{number} has {count} occurrences")
# Stop SparkContext
sc.stop()

[ 9  4  0  1  9  0  1 10  8  9  0 10  8  6  4  3  0  4  6  8 10  1  8  4
  1  3  6  5  3  9  6  9  1  9  4  2  6  7  8 10  8  9  2  0  6  7  8  1
  7  1  4 10  0  8  5  4  7  8  8  2  6  2  8  8  6  6  5 10  6  0  0  6
  9  1  8 10  9  1  2  8  9  9  5  0  2  7  3  0  4  2  0  3  3  1  2  5
  9  0 10  1] 

0 has 12 occurrences
1 has 11 occurrences
2 has 8 occurrences
3 has 6 occurrences
4 has 8 occurrences
5 has 5 occurrences
6 has 11 occurrences
7 has 5 occurrences
8 has 14 occurrences
9 has 12 occurrences
10 has 8 occurrences


In [13]:
from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext("local", "RDD Tasks")

# Load the text8 dataset into an RDD
text8_rdd = sc.textFile("text8")

# Split each line into words
words = text8_rdd.flatMap(lambda line: line.split(" "))

# Map each word to a key-value pair with the word as the key and 1 as the value
word_counts = words.map(lambda word: (word, 1))

# ReduceByKey to calculate the frequency of each word
word_frequencies = word_counts.reduceByKey(lambda a, b: a + b)

# Filter the RDD to get frequencies of words containing the letter 'a'
words_with_a = word_frequencies.filter(lambda x: 'a' in x[0])

# Print the frequencies of words containing the letter 'a'
for word, frequency in words_with_a.collect():
    if frequency==1:
        print(f"{word} occurs {frequency} time")
    else:
        print(f"{word} occurs {frequency} times")
# Stop SparkContext
sc.stop()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
amulet occurs 1 time
aldeburgh occurs 1 time
naze occurs 1 time
urals occurs 1 time
olbia occurs 1 time
massilia occurs 3 times
hatria occurs 2 times
cathrine occurs 1 time
ambers occurs 2 times
hymenea occurs 1 time
gedanite occurs 1 time
gedanum occurs 1 time
stantienite occurs 1 time
gutta occurs 1 time
percha occurs 1 time
krantzite occurs 1 time
allingite occurs 1 time
roumanite occurs 1 time
giaretta occurs 1 time
catania occurs 1 time
cesena occurs 1 time
fiat occurs 1 time
hukawng occurs 1 time
nangotaimaw occurs 1 time
mandalay occurs 1 time
schraufite occurs 1 time
carpathian occurs 1 time
ambrite occurs 1 time
coals occurs 1 time
copaline occurs 1 time
highgate occurs 1 time
chemawinite occurs 1 time
cedarite occurs 1 time
ammolite occurs 1 time
labdatriene occurs 1 time
villanueva occurs 1 time
richa occurs 1 time
arkivoc occurs 1 time
emporia occurs 2 times
gdansk occurs 1 time
amalaric occurs 7 times
amalari

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrame Tasks").getOrCreate()

# Define the schema for the iris JSON data
iris_schema = StructType([
    StructField("sepalLength", FloatType(), True),
    StructField("sepalWidth", FloatType(), True),
    StructField("petalLength", FloatType(), True),
    StructField("petalWidth", FloatType(), True),
    StructField("species", StringType(), True),
])

# Load the iris JSON data into a DataFrame with the defined schema
iris_df = spark.read.json("iris.json", schema=iris_schema)

# We calculate Pearson Correlation between petalLength and petalWidth
correlation_result = iris_df.stat.corr("petalLength", "petalWidth")

print(f"Pearson Correlation between petalLength and petalWidth: {correlation_result}")

# We Show columns sepalLength, sepalWidth, and species for rows with petalLength >= 1.4
df = iris_df.filter(col("petalLength") >= 1.4).select("sepalLength", "sepalWidth", "species")

print("Rows with petalLength >= 1.4:")
df.show()

# Stop Spark session
spark.stop()


Pearson Correlation between petalLength and petalWidth: 0.9626417198678409
Rows with petalLength >= 1.4:
+-----------+----------+-------+
|sepalLength|sepalWidth|species|
+-----------+----------+-------+
|        4.6|       3.1| setosa|
|        5.4|       3.9| setosa|
|        5.0|       3.4| setosa|
|        4.9|       3.1| setosa|
|        5.4|       3.7| setosa|
|        4.8|       3.4| setosa|
|        5.7|       4.4| setosa|
|        5.7|       3.8| setosa|
|        5.1|       3.8| setosa|
|        5.4|       3.4| setosa|
|        5.1|       3.7| setosa|
|        5.1|       3.3| setosa|
|        4.8|       3.4| setosa|
|        5.0|       3.0| setosa|
|        5.0|       3.4| setosa|
|        5.2|       3.5| setosa|
|        4.7|       3.2| setosa|
|        4.8|       3.1| setosa|
|        5.4|       3.4| setosa|
|        5.2|       4.1| setosa|
+-----------+----------+-------+
only showing top 20 rows

