# PySpark Coding Exercises
This notebook contains a series of challenging exercises on PySpark. Each exercise is designed to test your understanding of different PySpark concepts.

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=3cc50c37b4e30026a0dbb414172146d2614b7441d19e0171acbe2ca8c015226c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Exercise 1: Word Count
Given a text file, count the frequency of each word in the file. Ignore case sensitivity and punctuations.

In [2]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Sample text data
sample_text = [
    'Hello world',
    'I am learning PySpark',
    'This is a sample',
    'PySpark is fun'
]

# Create an RDD from sample_text list
text_file = sc.parallelize(sample_text)

# Perform word count
word_counts = text_file.flatMap(lambda line: line.split(' ')) \
                     .map(lambda word: word.lower().strip('.,!?"')) \
                     .filter(lambda word: word != '') \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a, b: a + b)

# Show the word counts
result = word_counts.collect()
print(result)


[('world', 1), ('i', 1), ('am', 1), ('learning', 1), ('this', 1), ('is', 2), ('hello', 1), ('pyspark', 2), ('a', 1), ('sample', 1), ('fun', 1)]


## Exercise 2: Finding Prime Numbers
Given a list of numbers, find all the prime numbers in the list.

In [3]:
def is_prime(n):
    if n <= 1:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

# Sample list of numbers
numbers = sc.parallelize(range(1, 101))

# Find prime numbers
prime_numbers = numbers.filter(is_prime)

# Show the prime numbers
prime_numbers.collect()

[2,
 3,
 5,
 7,
 11,
 13,
 17,
 19,
 23,
 29,
 31,
 37,
 41,
 43,
 47,
 53,
 59,
 61,
 67,
 71,
 73,
 79,
 83,
 89,
 97]

## Exercise 3: Top N Frequent Words
Given a text file, find the top N most frequent words in the file. Ignore case sensitivity and punctuations.

In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Function to find top N frequent words
def top_n_frequent_words(text_rdd, N):
    word_counts = text_rdd.flatMap(lambda line: line.split(' ')) \
                          .map(lambda word: word.lower().strip('.,!?"')) \
                          .filter(lambda word: word != '') \
                          .map(lambda word: (word, 1)) \
                          .reduceByKey(lambda a, b: a + b)
    return word_counts.takeOrdered(N, key=lambda x: -x[1])

# Sample text data
sample_text = [
    'Hello world',
    'I am learning PySpark',
    'This is a sample',
    'PySpark is fun'
]

# Create an RDD from the sample_text list
text_rdd = sc.parallelize(sample_text)

# Find top 5 frequent words
top_5_words = top_n_frequent_words(text_rdd, 5)

# Show the top 5 frequent words
print(top_5_words)


[('is', 2), ('pyspark', 2), ('world', 1), ('i', 1), ('am', 1)]


## Exercise 4: Inverted Index
Given a set of text files, create an inverted index. An inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.

In [5]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Sample text data for each file
sample_text1 = ['Hello world', 'I love PySpark']
sample_text2 = ['PySpark is great', 'Hello everyone']
sample_text3 = ['Great learning', 'Hello PySpark']

# Create RDDs from sample text data
text_file1 = sc.parallelize(sample_text1)
text_file2 = sc.parallelize(sample_text2)
text_file3 = sc.parallelize(sample_text3)

# Assign IDs to text files
text_file1 = text_file1.map(lambda line: ('file1', line))
text_file2 = text_file2.map(lambda line: ('file2', line))
text_file3 = text_file3.map(lambda line: ('file3', line))

# Combine all text files
all_text_files = text_file1.union(text_file2).union(text_file3)

# Create inverted index
inverted_index = all_text_files.flatMap(lambda x: [(word.lower().strip('.,!'), x[0]) for word in x[1].split()]) \
                                .groupByKey() \
                                .mapValues(list)

# Show the inverted index
result = inverted_index.collect()
print(result)


[('love', ['file1']), ('learning', ['file3']), ('hello', ['file1', 'file2', 'file3']), ('pyspark', ['file1', 'file2', 'file3']), ('everyone', ['file2']), ('world', ['file1']), ('i', ['file1']), ('is', ['file2']), ('great', ['file2', 'file3'])]


## Exercise 5: PageRank Algorithm
Implement the PageRank algorithm to rank a set of webpages based on their importance.

In [6]:
# Sample data representing the links between webpages
links = sc.parallelize([(1, [2, 3]), (2, [3]), (3, [1]), (4, [1, 2, 3])])
ranks = links.map(lambda x: (x[0], 1.0))

# Number of iterations
num_iterations = 10

# PageRank algorithm
for i in range(num_iterations):
    contributions = links.join(ranks).flatMap(lambda x: [(dest, x[1][1] / len(x[1][0])) for dest in x[1][0]])
    ranks = contributions.reduceByKey(lambda a, b: a + b).mapValues(lambda rank: 0.15 + 0.85 * rank)

# Show the PageRank of each webpage
ranks.collect()

[(1, 1.2446686281209396), (2, 0.6822141376476143), (3, 1.2699916385721675)]

## Exercise 6: K-means Clustering
Implement the K-means clustering algorithm to cluster a set of points into K clusters.

In [7]:
import random
from math import sqrt

# Function to calculate Euclidean distance
def euclidean_distance(x, y):
    return sqrt(sum((a - b) ** 2 for a, b in zip(x, y)))

# Function to assign points to the nearest centroid
def assign_to_centroid(point, centroids):
    distances = [euclidean_distance(point, centroid) for centroid in centroids]
    return distances.index(min(distances))

# Sample data points
points = sc.parallelize([(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10)])

# Initialize centroids randomly
K = 3
centroids = points.takeSample(False, K)

# Number of iterations
num_iterations = 10

# K-means algorithm
for i in range(num_iterations):
    cluster_assignments = points.map(lambda point: (assign_to_centroid(point, centroids), point))
    new_centroids = cluster_assignments.groupByKey().mapValues(lambda points: tuple(sum(x) / len(points) for x in zip(*points)))
    centroids = new_centroids.collectAsMap().values()

# Show the final centroids
centroids

dict_values([(2.5, 2.5), (6.0, 6.0), (9.0, 9.0)])

## Exercise 7: Collaborative Filtering
Implement a simple collaborative filtering algorithm to make movie recommendations based on user ratings.

In [None]:
# Sample data representing the links between webpages
links = sc.parallelize([(1, [2, 3]), (2, [3]), (3, [1]), (4, [1, 2, 3])])
ranks = links.map(lambda x: (x[0], 1.0))

# Number of iterations
num_iterations = 10

# PageRank algorithm
for i in range(num_iterations):
    contributions = links.join(ranks).flatMap(lambda x: [(dest, x[1][1] / len(x[1][0])) for dest in x[1][0]])
    ranks = contributions.reduceByKey(lambda a, b: a + b).mapValues(lambda rank: 0.15 + 0.85 * rank)

# Show the PageRank of each webpage
ranks.collect()

## Exercise 6: K-means Clustering
Implement the K-means clustering algorithm to cluster a set of points into K clusters.

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Sample data representing user ratings (user_id, movie_id, rating)
ratings = sc.parallelize([(1, 101, 5), (1, 102, 4), (1, 103, 2),
                          (2, 101, 3), (2, 102, 4), (2, 104, 5),
                          (3, 101, 4), (3, 103, 3), (3, 104, 4), (3, 105, 5)])

# Calculate average rating for each movie
avg_ratings = ratings.map(lambda x: (x[1], (x[2], 1))) \
                     .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                     .mapValues(lambda x: x[0] / x[1])

# Function to make recommendations
def recommend_movies(user_id, num_recommendations):
    user_ratings_rdd = ratings.filter(lambda x: x[0] == user_id).map(lambda x: x[1])
    user_ratings_list = user_ratings_rdd.collect()
    potential_movies = avg_ratings.filter(lambda x: x[0] not in user_ratings_list)
    return potential_movies.takeOrdered(num_recommendations, key=lambda x: -x[1])

# Recommend 3 movies for user 1
recommendation_result = recommend_movies(1, 3)
print(recommendation_result)


In [None]:
import random
from math import sqrt

# Function to calculate the distance between two points
def distance(a, b):
    return sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)

# Function to find the closest centroid for a given point
def closest_centroid(point, centroids):
    return min(range(len(centroids)), key=lambda i: distance(point, centroids[i]))

# Sample data points
points = sc.parallelize([(random.uniform(0, 10), random.uniform(0, 10)) for _ in range(100)])

# Initial centroids
initial_centroids = points.takeSample(False, 3)
centroids = sc.broadcast(initial_centroids)

# Number of iterations
num_iterations = 10

# K-means algorithm
for i in range(num_iterations):
    closest = points.map(lambda point: (closest_centroid(point, centroids.value), (point, 1)))
    new_centroids = closest.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1]), a[1] + b[1]))
    new_centroids = new_centroids.mapValues(lambda x: (x[0][0] / x[1], x[0][1] / x[1])).collectAsMap()
    centroids = sc.broadcast(new_centroids)

# Show the final centroids
centroids.value

## Exercise 7: Collaborative Filtering
Implement a simple collaborative filtering algorithm to make movie recommendations.

## Exercise 8: Anomaly Detection
Implement an anomaly detection algorithm to identify outliers in a dataset.

In [None]:
# Sample data points
data_points = sc.parallelize([random.gauss(0, 1) for _ in range(100)] + [10, -10, 15, -15])

# Calculate mean and standard deviation
mean = data_points.mean()
stddev = data_points.stdev()

# Anomaly detection algorithm
threshold = 3 * stddev
anomalies = data_points.filter(lambda x: abs(x - mean) > threshold)

# Show anomalies
anomalies.collect()

## Exercise 9: Sentiment Analysis
Implement a simple sentiment analysis algorithm to classify the sentiment of sentences in a text file.

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Sample sentiment words
positive_words = sc.parallelize(['good', 'great', 'awesome', 'excellent', 'positive'])
negative_words = sc.parallelize(['bad', 'terrible', 'awful', 'poor', 'negative'])

# Sample text data
sample_sentences = ['This is a good day',
                    'I had a terrible experience',
                    'What an awesome event',
                    'This is neither good nor bad']

# Create an RDD from the sample_sentences list
text_file = sc.parallelize(sample_sentences)

# Collect positive and negative words to Python lists (you only need to do this once)
positive_list = positive_words.collect()
negative_list = negative_words.collect()

# Sentiment analysis algorithm
sentiments = text_file.map(lambda line: line.lower()) \
                      .map(lambda line: (line, 1 if any(word in line for word in positive_list) else -1 if any(word in line for word in negative_list) else 0))

# Show sentiments
result = sentiments.collect()
print(result)


## Exercise 10: Linear Regression
Implement a simple linear regression algorithm to model the relationship between two variables.

In [None]:
# Sample data points (x, y)
data_points = sc.parallelize([(1, 2), (2, 4), (3, 3), (4, 5), (5, 4)])

# Calculate the mean of x and y
mean_x = data_points.map(lambda x: x[0]).mean()
mean_y = data_points.map(lambda x: x[1]).mean()

# Calculate the slope (b1) and intercept (b0) of the line
b1 = data_points.map(lambda x: (x[0] - mean_x) * (x[1] - mean_y)).sum() / data_points.map(lambda x: (x[0] - mean_x) ** 2).sum()
b0 = mean_y - b1 * mean_x

# Linear regression algorithm
predictions = data_points.map(lambda x: (x[0], b0 + b1 * x[0]))

# Show predictions
predictions.collect()

## Exercise 11: Support Vector Classification (SVC)
Implement a simple Support Vector Classification algorithm to classify data points into two classes.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LinearSVC
from pyspark.ml.linalg import Vectors

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Python Spark LinearSVC example") \
    .config("spark.some.config.option", "config-value") \
    .getOrCreate()

# Sample data points
data = [(Vectors.dense([0.0, 1.0]), 1.0),
        (Vectors.dense([1.0, 0.0]), 0.0),
        (Vectors.dense([-1.0, -1.0]), 0.0),
        (Vectors.dense([1.0, 1.0]), 1.0)]

# Create a DataFrame
df = spark.createDataFrame(data, ["features", "label"])

# Initialize the LinearSVC model
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(df)

# Make predictions
predictions = lsvcModel.transform(df)

# Show predictions
predictions.show()


## Exercise 12: Support Vector Regression (SVR)
Implement a simple Support Vector Regression algorithm to model the relationship between two variables.

In [None]:
from pyspark.ml.regression import LinearRegression

# Sample data points
data = [(Vectors.dense([0.0]), 1.0),
        (Vectors.dense([1.0]), 2.0),
        (Vectors.dense([2.0]), 1.0),
        (Vectors.dense([3.0]), 3.0)]
df = spark.createDataFrame(data, ['features', 'label'])

# Initialize the LinearRegression model
lr = LinearRegression(maxIter=10, regParam=0.1)

# Fit the model
lrModel = lr.fit(df)

# Make predictions
predictions = lrModel.transform(df)

# Show predictions
predictions.show()

## Exercise 13: Decision Tree Classification
Implement a simple Decision Tree Classification algorithm to classify data points into multiple classes.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Sample data points
data = [(Vectors.dense([0.0, 1.0]), 1.0),
        (Vectors.dense([1.0, 0.0]), 0.0),
        (Vectors.dense([-1.0, -1.0]), 0.0),
        (Vectors.dense([1.0, 1.0]), 1.0)]
df = spark.createDataFrame(data, ['features', 'label'])

# Initialize the DecisionTreeClassifier model
dt = DecisionTreeClassifier()

# Fit the model
dtModel = dt.fit(df)

# Make predictions
predictions = dtModel.transform(df)

# Show predictions
predictions.show()

## Exercise 14: Naive Bayes Classification
Implement a simple Naive Bayes Classification algorithm to classify text documents.

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import HashingTF, Tokenizer

# Sample text data
data = [(0, 'good movie'),
        (0, 'excellent film'),
        (1, 'bad acting'),
        (1, 'terrible plot')]
df = spark.createDataFrame(data, ['label', 'text'])

# Tokenize the text
tokenizer = Tokenizer(inputCol='text', outputCol='words')
wordsData = tokenizer.transform(df)

# Hash the words
hashingTF = HashingTF(inputCol='words', outputCol='features')
featurizedData = hashingTF.transform(wordsData)

# Initialize the NaiveBayes model
nb = NaiveBayes()

# Fit the model
nbModel = nb.fit(featurizedData)

# Make predictions
predictions = nbModel.transform(featurizedData)

# Show predictions
predictions.show()

## Exercise 15: Gradient Boosting
Implement a simple Gradient Boosting algorithm to improve the performance of a weak learner.

In [None]:
from pyspark.ml.classification import GBTClassifier

# Sample data points
data = [(Vectors.dense([0.0, 1.0]), 1.0),
        (Vectors.dense([1.0, 0.0]), 0.0),
        (Vectors.dense([-1.0, -1.0]), 0.0),
        (Vectors.dense([1.0, 1.0]), 1.0)]
df = spark.createDataFrame(data, ['features', 'label'])

# Initialize the GBTClassifier model
gbt = GBTClassifier(maxIter=10)

# Fit the model
gbtModel = gbt.fit(df)

# Make predictions
predictions = gbtModel.transform(df)

# Show predictions
predictions.show()

## Exercise 16: Logistic Regression
Implement a simple Logistic Regression algorithm to classify data points into two classes.

In [None]:
from pyspark.ml.classification import LogisticRegression

# Sample data points
data = [(Vectors.dense([0.0, 1.0]), 1.0),
        (Vectors.dense([1.0, 0.0]), 0.0),
        (Vectors.dense([-1.0, -1.0]), 0.0),
        (Vectors.dense([1.0, 1.0]), 1.0)]
df = spark.createDataFrame(data, ['features', 'label'])

# Initialize the LogisticRegression model
lr = LogisticRegression(maxIter=10, regParam=0.1)

# Fit the model
lrModel = lr.fit(df)

# Make predictions
predictions = lrModel.transform(df)

# Show predictions
predictions.show()

In [None]:
# Note: PySpark doesn't have a built-in Hierarchical Clustering algorithm, but you can implement it using RDDs.
# Here's a simple example using scipy for the actual clustering part.
import numpy as np
from scipy.cluster.hierarchy import linkage, dendrogram
import matplotlib.pyplot as plt

# Sample data points
data = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [11, 12], [12, 13], [13, 14], [14, 15]])

# Perform hierarchical clustering
Z = linkage(data, 'ward')

# Plot dendrogram
plt.figure(figsize=(10, 6))
dendrogram(Z)
plt.show()

## Exercise 18: K-Nearest Neighbors (KNN)
Implement a simple K-Nearest Neighbors algorithm to classify a new data point based on its nearest neighbors.

In [None]:
# Note: PySpark doesn't have a built-in K-Nearest Neighbors algorithm, but you can implement it using RDDs.
# Here's a simple example using scikit-learn for the actual KNN part.
from sklearn.neighbors import KNeighborsClassifier

# Sample data points and labels
X = [[0, 1], [1, 0], [-1, -1], [1, 1]]
y = [1, 0, 0, 1]

# Initialize the KNN classifier
knn = KNeighborsClassifier(n_neighbors=3)

# Fit the model
knn.fit(X, y)

# Make a prediction
prediction = knn.predict([[0.5, 0.5]])
print('Prediction:', prediction)

## Exercise 19: Random Forest
Implement a simple Random Forest algorithm to classify data points into multiple classes.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Sample data points
data = [(Vectors.dense([0.0, 1.0]), 1.0),
        (Vectors.dense([1.0, 0.0]), 0.0),
        (Vectors.dense([-1.0, -1.0]), 0.0),
        (Vectors.dense([1.0, 1.0]), 1.0)]
df = spark.createDataFrame(data, ['features', 'label'])

# Initialize the RandomForestClassifier model
rf = RandomForestClassifier(numTrees=10)

# Fit the model
rfModel = rf.fit(df)

# Make predictions
predictions = rfModel.transform(df)

# Show predictions
predictions.show()

## Exercise 20: AdaBoost
Implement a simple AdaBoost algorithm to improve the performance of a weak learner.

In [None]:
# Note: PySpark doesn't have a built-in AdaBoost algorithm, but you can implement it using RDDs.
# Here's a simple example using scikit-learn for the actual AdaBoost part.
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

# Sample data points and labels
X = [[0, 1], [1, 0], [-1, -1], [1, 1]]
y = [1, 0, 0, 1]

# Initialize the weak learner
dt = DecisionTreeClassifier(max_depth=1)

# Initialize the AdaBoost classifier
ab = AdaBoostClassifier(base_estimator=dt, n_estimators=10)

# Fit the model
ab.fit(X, y)

# Make a prediction
prediction = ab.predict([[0.5, 0.5]])
print('Prediction:', prediction)

## Exercise 21: Principal Component Analysis (PCA)
Implement Principal Component Analysis to reduce the dimensionality of a dataset.

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Sample data points
data = [(Vectors.dense([1.0, 2.0, 3.0]),),
        (Vectors.dense([4.0, 5.0, 6.0]),),
        (Vectors.dense([7.0, 8.0, 9.0]),)]
df = spark.createDataFrame(data, ['features'])

# Initialize the PCA model
pca = PCA(k=2, inputCol='features', outputCol='pca_features')

# Fit the model
pca_model = pca.fit(df)

# Transform the data
result = pca_model.transform(df)

# Show the transformed data
result.show()

## Exercise 22: Latent Dirichlet Allocation (LDA)
Implement Latent Dirichlet Allocation to discover topics in a collection of text documents.

In [None]:
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import Vectors

# Sample data points
data = [(Vectors.dense([1.0, 2.0, 6.0, 0.0, 2.0]),),
        (Vectors.dense([1.0, 3.0, 0.0, 1.0, 3.0]),),
        (Vectors.dense([1.0, 4.0, 1.0, 0.0, 4.0]),),
        (Vectors.dense([1.0, 2.0, 0.0, 5.0, 5.0]),),
        (Vectors.dense([1.0, 1.0, 1.0, 4.0, 4.0]),)]
df = spark.createDataFrame(data, ['features'])

# Initialize the LDA model
lda = LDA(k=2, maxIter=10)

# Fit the model
lda_model = lda.fit(df)

# Describe topics
topics = lda_model.describeTopics(3)
topics.show()

## Exercise 23: K-Means Clustering
Implement K-Means Clustering to partition a dataset into clusters.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Sample data points
data = [(Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),),
        (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ['features'])

# Initialize the KMeans model
kmeans = KMeans().setK(2).setSeed(1)

# Fit the model
kmeans_model = kmeans.fit(df)

# Make predictions
predictions = kmeans_model.transform(df)

# Show the cluster centers
centers = kmeans_model.clusterCenters()
print('Cluster Centers: ', centers)

## Exercise 24: Gaussian Mixture Model (GMM)
Implement Gaussian Mixture Model to fit a mixture of Gaussian distributions to a dataset.

In [None]:
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.linalg import Vectors

# Sample data points
data = [(Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),),
        (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ['features'])

# Initialize the GaussianMixture model
gmm = GaussianMixture().setK(2).setSeed(1)

# Fit the model
gmm_model = gmm.fit(df)

# Make predictions
predictions = gmm_model.transform(df)

# Show the Gaussian distributions
gaussians = gmm_model.gaussiansDF
gaussians.show()

## Exercise 25: Isotonic Regression
Implement Isotonic Regression to fit a non-decreasing function to a dataset.

In [None]:
from pyspark.ml.regression import IsotonicRegression

# Sample data points
data = [(1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (4.0, 4.0), (5.0, 5.0), (6.0, 6.0), (7.0, 7.0)]
df = spark.createDataFrame(data, ['feature', 'label'])

# Initialize the IsotonicRegression model
ir = IsotonicRegression(featuresCol='feature', labelCol='label')

# Fit the model
ir_model = ir.fit(df)

# Make predictions
predictions = ir_model.transform(df)

# Show the predictions
predictions.show()

# Deep Learning Exercises using PySpark

## Exercise 26: Feedforward Neural Network
Implement a simple feedforward neural network for classification.

In [None]:
# Note: PySpark doesn't have built-in support for deep learning, but you can use libraries like Elephas or TensorFlowOnSpark.
# Here's a simple example using Keras and Elephas.
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
from elephas.ml_model import ElephasEstimator

# Define a Keras model
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

# Initialize optimizer
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(loss='categorical_crossentropy', optimizer=sgd)

# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd.get_config())
estimator.set_mode('synchronous')
estimator.set_loss('categorical_crossentropy')
estimator.set_metrics(['acc'])

# You can now use this estimator as a Spark ML Estimator

## Exercise 27: Convolutional Neural Network (CNN)
Implement a simple Convolutional Neural Network for image classification.

In [None]:
# Note: PySpark doesn't have built-in support for deep learning, but you can use libraries like Elephas or TensorFlowOnSpark.
# Here's a simple example using Keras and Elephas.
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from elephas.ml_model import ElephasEstimator

# Define a Keras model
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config('adam')
estimator.set_mode('synchronous')
estimator.set_loss('categorical_crossentropy')
estimator.set_metrics(['acc'])

# You can now use this estimator as a Spark ML Estimator

## Exercise 29: Long Short-Term Memory (LSTM)
Implement a simple Long Short-Term Memory network for sequence prediction.

In [None]:
# Note: PySpark doesn't have built-in support for deep learning, but you can use libraries like Elephas or TensorFlowOnSpark.
# Here's a simple example using Keras and Elephas.
from keras.models import Sequential
from keras.layers import LSTM, Dense
from elephas.ml_model import ElephasEstimator

# Define a Keras model
model = Sequential()
model.add(LSTM(32, input_shape=(10, 64)))
model.add(Dense(10, activation='softmax'))

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config('adam')
estimator.set_mode('synchronous')
estimator.set_loss('categorical_crossentropy')
estimator.set_metrics(['acc'])

# You can now use this estimator as a Spark ML Estimator

## Exercise 30: TensorFlowOnSpark: Basic Classification
Implement a basic classification model using TensorFlowOnSpark.

In [None]:
# Note: TensorFlowOnSpark (TFoS) allows distributed TensorFlow execution on Spark clusters.
# Here's a simple example of a basic classification model using TensorFlowOnSpark.
from tensorflow import keras
from tensorflow.keras import layers
from tensorflowonspark import TFCluster
import tensorflow as tf
import argparse

# Define TensorFlow model
def model_fn(features, labels, mode):
    input_layer = layers.Input(shape=(784,))
    hidden_layer = layers.Dense(128, activation='relu')(input_layer)
    output_layer = layers.Dense(10, activation='softmax')(hidden_layer)
    model = keras.Model(inputs=input_layer, outputs=output_layer)

    loss = tf.losses.sparse_categorical_crossentropy(labels, output_layer)
    train_op = tf.train.AdamOptimizer().minimize(loss)

    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', type=int, default=100)
parser.add_argument('--epochs', type=int, default=1)
args = parser.parse_args()

# Initialize TensorFlowOnSpark
cluster = TFCluster.run(sc, model_fn, args, num_executors=4, num_ps=1, tensorboard=True, input_mode=TFCluster.InputMode.SPARK)

# Train the model
cluster.train(train_data, args.epochs)

# Shutdown the cluster
cluster.shutdown()

## Exercise 31: TensorFlowOnSpark: Image Classification
Implement an image classification model using TensorFlowOnSpark.

In [None]:
# Note: TensorFlowOnSpark (TFoS) allows distributed TensorFlow execution on Spark clusters.
# Here's a simple example of a text classification model using TensorFlowOnSpark.
from tensorflow import keras
from tensorflow.keras import layers
from tensorflowonspark import TFCluster
import tensorflow as tf
import argparse

# Define TensorFlow model
def model_fn(features, labels, mode):
    input_layer = layers.Input(shape=(100,))
    embedding = layers.Embedding(input_dim=5000, output_dim=128)(input_layer)
    lstm = layers.LSTM(128)(embedding)
    output_layer = layers.Dense(1, activation='sigmoid')(lstm)
    model = keras.Model(inputs=input_layer, outputs=output_layer)

    loss = tf.losses.binary_crossentropy(labels, output_layer)
    train_op = tf.train.AdamOptimizer().minimize(loss)

    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', type=int, default=100)
parser.add_argument('--epochs', type=int, default=1)
args = parser.parse_args()

# Initialize TensorFlowOnSpark
cluster = TFCluster.run(sc, model_fn, args, num_executors=4, num_ps=1, tensorboard=True, input_mode=TFCluster.InputMode.SPARK)

# Train the model
cluster.train(train_data, args.epochs)

# Shutdown the cluster
cluster.shutdown()

## Exercise 33: TensorFlowOnSpark: Autoencoder
Implement an autoencoder model using TensorFlowOnSpark.

In [None]:
# Note: TensorFlowOnSpark (TFoS) allows distributed TensorFlow execution on Spark clusters.
# Here's a simple example of an autoencoder model using TensorFlowOnSpark.
from tensorflow import keras
from tensorflow.keras import layers
from tensorflowonspark import TFCluster
import tensorflow as tf
import argparse

# Define TensorFlow model
def model_fn(features, labels, mode):
    input_layer = layers.Input(shape=(784,))
    encoded = layers.Dense(128, activation='relu')(input_layer)
    encoded = layers.Dense(64, activation='relu')(encoded)
    decoded = layers.Dense(128, activation='relu')(encoded)
    output_layer = layers.Dense(784, activation='sigmoid')(decoded)
    model = keras.Model(inputs=input_layer, outputs=output_layer)

    loss = tf.losses.mean_squared_error(labels, output_layer)
    train_op = tf.train.AdamOptimizer().minimize(loss)

    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', type=int, default=100)
parser.add_argument('--epochs', type=int, default=1)
args = parser.parse_args()

# Initialize TensorFlowOnSpark
cluster = TFCluster.run(sc, model_fn, args, num_executors=4, num_ps=1, tensorboard=True, input_mode=TFCluster.InputMode.SPARK)

# Train the model
cluster.train(train_data, args.epochs)

# Shutdown the cluster
cluster.shutdown()

## Exercise 34: TensorFlowOnSpark: Generative Adversarial Network (GAN)
Implement a Generative Adversarial Network using TensorFlowOnSpark.

In [None]:
# Note: TensorFlowOnSpark (TFoS) allows distributed TensorFlow execution on Spark clusters.
# Here's a simple example of a Generative Adversarial Network (GAN) using TensorFlowOnSpark.
from tensorflow import keras
from tensorflow.keras import layers
from tensorflowonspark import TFCluster
import tensorflow as tf
import argparse

# Define TensorFlow model
def model_fn(features, labels, mode):
    # Generator
    generator_input = layers.Input(shape=(100,))
    x = layers.Dense(128, activation='relu')(generator_input)
    x = layers.Dense(784, activation='sigmoid')(x)
    generator = keras.Model(generator_input, x)

    # Discriminator
    discriminator_input = layers.Input(shape=(784,))
    x = layers.Dense(128, activation='relu')(discriminator_input)
    x = layers.Dense(1, activation='sigmoid')(x)
    discriminator = keras.Model(discriminator_input, x)

    # GAN
    gan_input = layers.Input(shape=(100,))
    gan_output = discriminator(generator(gan_input))
    gan = keras.Model(gan_input, gan_output)

    # Compile models
    discriminator.compile(optimizer='adam', loss='binary_crossentropy')
    discriminator.trainable = False
    gan.compile(optimizer='adam', loss='binary_crossentropy')

    # Training logic here...

    return None  # This is a placeholder. Actual implementation will involve custom training loops.

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--batch_size', type=int, default=100)
parser.add_argument('--epochs', type=int, default=1)
args = parser.parse_args()

# Initialize TensorFlowOnSpark
cluster = TFCluster.run(sc, model_fn, args, num_executors=4, num_ps=1, tensorboard=True, input_mode=TFCluster.InputMode.SPARK)

# Train the model
cluster.train(train_data, args.epochs)

# Shutdown the cluster
cluster.shutdown()

## Simple Coding Exercises using PySpark
These exercises will cover basic programming concepts like loops, if-else statements, functions, and object-oriented programming (OOP) in PySpark.

### Exercise 35: Using Loops in PySpark
Write a PySpark code snippet that uses a loop to filter out even numbers from an RDD.

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Use a loop to filter out even numbers
for i in range(2, 21, 2):
    rdd = rdd.filter(lambda x: x != i)

# Collect and print the RDD
print(rdd.collect())

### Exercise 36: Using If-Else Statements in PySpark
Write a PySpark code snippet that uses an if-else statement to filter numbers greater than 10 from an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Use an if-else statement to filter numbers greater than 10
filtered_rdd = rdd.filter(lambda x: x > 10 if x else None)

# Collect and print the RDD
print(filtered_rdd.collect())

### Exercise 37: Using Functions in PySpark
Write a PySpark code snippet that uses a function to square the numbers in an RDD.

In [None]:
# Define a function to square a number
def square(x):
    return x * x

# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Use the function to square the numbers in the RDD
squared_rdd = rdd.map(square)

# Collect and print the RDD
print(squared_rdd.collect())

### Exercise 38: Using Object-Oriented Programming (OOP) in PySpark
Write a PySpark code snippet that uses a class to encapsulate the logic for filtering even numbers from an RDD.

In [None]:
# Define a class to encapsulate the logic for filtering even numbers
class EvenNumberFilter:
    def __init__(self, rdd):
        self.rdd = rdd

    def filter_even_numbers(self):
        return self.rdd.filter(lambda x: x % 2 == 0)

# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Create an instance of the class
even_filter = EvenNumberFilter(rdd)

# Use the class method to filter even numbers
filtered_rdd = even_filter.filter_even_numbers()

# Collect and print the RDD
print(filtered_rdd.collect())

### Exercise 39: Using Nested Loops in PySpark
Write a PySpark code snippet that uses nested loops to create an RDD of tuples containing all pairs of numbers from two different RDDs.

In [None]:
# Create two RDDs
rdd1 = sc.parallelize(range(1, 4))
rdd2 = sc.parallelize(range(4, 7))

# Use nested loops to create an RDD of tuples containing all pairs of numbers from the two RDDs
pairs = []
for num1 in rdd1.collect():
    for num2 in rdd2.collect():
        pairs.append((num1, num2))

# Create an RDD from the list of pairs
pair_rdd = sc.parallelize(pairs)

# Collect and print the RDD
print(pair_rdd.collect())

### Exercise 40: Using If-Elif-Else Statements in PySpark
Write a PySpark code snippet that uses an if-elif-else statement to categorize numbers in an RDD into 'small', 'medium', or 'large'.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Use an if-elif-else statement to categorize numbers
categorized_rdd = rdd.map(lambda x: 'small' if x <= 7 else ('medium' if x <= 14 else 'large'))

# Collect and print the RDD
print(categorized_rdd.collect())

In [None]:
# Define a recursive function to calculate factorial
def factorial(n):
    return 1 if n == 0 else n * factorial(n-1)

# Create an RDD
rdd = sc.parallelize(range(1, 6))

# Use the recursive function to calculate the factorial of numbers in the RDD
factorial_rdd = rdd.map(factorial)

# Collect and print the RDD
print(factorial_rdd.collect())

### Exercise 42: Using Class Inheritance in PySpark
Write a PySpark code snippet that uses class inheritance to create a specialized filter class that filters out numbers less than 5 from an RDD.

In [None]:
# Define a base class for filtering numbers
class NumberFilter:
    def __init__(self, rdd):
        self.rdd = rdd

    def filter_numbers(self):
        return self.rdd

# Define a specialized filter class that inherits from the base class
class SpecializedFilter(NumberFilter):
    def filter_numbers(self):
        return self.rdd.filter(lambda x: x >= 5)

# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Create an instance of the specialized filter class
special_filter = SpecializedFilter(rdd)

# Use the class method to filter numbers less than 5
filtered_rdd = special_filter.filter_numbers()

# Collect and print the RDD
print(filtered_rdd.collect())

### Exercise 43: Using Nested If-Else Statements in PySpark
Write a PySpark code snippet that uses nested if-else statements to categorize numbers in an RDD into 'small', 'medium', 'large', or 'extra large'.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Use nested if-else statements to categorize numbers
categorized_rdd = rdd.map(lambda x: 'small' if x <= 5 else ('medium' if x <= 10 else ('large' if x <= 15 else 'extra large')))

# Collect and print the RDD
print(categorized_rdd.collect())

### Exercise 44: Using For-Else Loop in PySpark
Write a PySpark code snippet that uses a for-else loop to find the first number that is divisible by 7 in an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Use a for-else loop to find the first number that is divisible by 7
for num in rdd.collect():
    if num % 7 == 0:
        print(f'The first number divisible by 7 is {num}')
        break
else:
    print('No number is divisible by 7')

### Exercise 45: Using Function Overloading in PySpark
Write a PySpark code snippet that uses function overloading to create two versions of a function: one that squares a number and another that cubes a number.

In [None]:
# Define a function to square or cube a number
def power(x, n=2):
    return x ** n

# Create an RDD
rdd = sc.parallelize(range(1, 6))

# Use the function to square the numbers in the RDD
squared_rdd = rdd.map(lambda x: power(x))

# Use the function to cube the numbers in the RDD
cubed_rdd = rdd.map(lambda x: power(x, 3))

# Collect and print the RDDs
print('Squared RDD:', squared_rdd.collect())
print('Cubed RDD:', cubed_rdd.collect())

### Exercise 46: Using While Loop in PySpark
Write a PySpark code snippet that uses a while loop to find the sum of numbers in an RDD until the sum becomes greater than 50.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 21))

# Initialize sum and counter
sum_numbers = 0
counter = 0

# Use a while loop to find the sum of numbers until the sum becomes greater than 50
while sum_numbers <= 50:
    sum_numbers += rdd.collect()[counter]
    counter += 1

# Print the sum and the number of elements summed
print(f'Sum of first {counter} numbers is {sum_numbers}')

### Exercise 47: Using Function Composition in PySpark
Write a PySpark code snippet that uses function composition to apply multiple transformations to an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Define functions for transformations
def square(x):
    return x ** 2

def add_one(x):
    return x + 1

# Use function composition to apply multiple transformations
transformed_rdd = rdd.map(square).map(add_one)

# Collect and print the RDD
print(transformed_rdd.collect())

### Exercise 48: Using Function as a Parameter in PySpark
Write a PySpark code snippet that uses a function as a parameter to another function to apply transformations to an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Define a function that takes another function as a parameter
def apply_transformation(rdd, func):
    return rdd.map(func)

# Define a function to square a number
def square(x):
    return x ** 2

# Use the function that takes another function as a parameter
transformed_rdd = apply_transformation(rdd, square)

# Collect and print the RDD
print(transformed_rdd.collect())

### Exercise 49: Using Nested Functions in PySpark
Write a PySpark code snippet that uses nested functions to apply a sequence of transformations to an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Define a function with nested functions
def apply_nested_transformations(rdd):
    def square(x):
        return x ** 2
    def add_one(x):
        return x + 1
    return rdd.map(square).map(add_one)

# Use the function with nested functions to apply transformations
transformed_rdd = apply_nested_transformations(rdd)

# Collect and print the RDD
print(transformed_rdd.collect())

### Exercise 50: Using Anonymous Functions in PySpark
Write a PySpark code snippet that uses anonymous (lambda) functions to apply transformations to an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Use anonymous functions to apply transformations
transformed_rdd = rdd.map(lambda x: x ** 2).map(lambda x: x + 1)

# Collect and print the RDD
print(transformed_rdd.collect())

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Define a function that takes another function as a parameter
def apply_transformation(rdd, func):
    return rdd.map(func)

# Define a function for transformation
def square(x):
    return x ** 2

# Use the function as a parameter to apply transformation
transformed_rdd = apply_transformation(rdd, square)

# Collect and print the RDD
print(transformed_rdd.collect())

### Exercise 49: Using Function Chaining in PySpark
Write a PySpark code snippet that uses function chaining to apply multiple transformations to an RDD in a single line.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Define functions for transformations
def square(x):
    return x ** 2

def add_one(x):
    return x + 1

# Use function chaining to apply multiple transformations in a single line
transformed_rdd = rdd.map(square).map(add_one)

# Collect and print the RDD
print(transformed_rdd.collect())

### Exercise 50: Using Anonymous Functions in PySpark
Write a PySpark code snippet that uses an anonymous function (lambda function) to apply a transformation to an RDD.

In [None]:
# Create an RDD
rdd = sc.parallelize(range(1, 11))

# Use an anonymous function to square the numbers in the RDD
squared_rdd = rdd.map(lambda x: x ** 2)

# Collect and print the RDD
print(squared_rdd.collect())