# Getting Started

In [None]:
# Set the PySpark environment variables.
# These must be set before the SparkSession below is created.
# NOTE(review): the two PYSPARK_DRIVER_* variables presumably only matter when
# the driver is started through the `pyspark` launcher script — confirm.
import os
import sys

# Assumes Spark is installed at this path — adjust for your machine.
os.environ['SPARK_HOME'] = "/usr/local/spark-3.5.0-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# Run the Python workers with the same interpreter as this kernel. A bare
# 'python' is resolved via PATH and may be a different Python version than
# the driver, which makes PySpark fail with a driver/worker version mismatch.
os.environ['PYSPARK_PYTHON'] = sys.executable

## Interacting with the System 

Before Spark 2.0, SparkContext was the preferred entry point for interacting with the system. It allows connecting to the cluster and enables a user to create RDDs, accumulators, and broadcast variables, as well as access system services. SparkContext also enables access to SQLContext and HiveContext, which provide additional functionality for working with structured and semi-structured data.

SparkSession was introduced in Spark 2.0 and quickly became the preferred entry point for programming with DataFrames and Datasets, which are higher-level abstractions than RDDs. SparkSession internally creates a SparkContext object, which can be accessed through the sparkContext attribute. Therefore, you can still use SparkContext methods and features through SparkSession. SparkSession also provides a unified interface to access various data sources and formats, such as Parquet, ORC, JSON, CSV, JDBC, and Hive. SparkSession also integrates with popular Spark libraries, such as Structured Streaming or MLlib. In the following, we mainly use the SparkSession interface.

In [None]:
# Create (or reuse, via getOrCreate) a SparkSession with 2g executor memory
# and 4 shuffle partitions.
from pyspark.sql import SparkSession

session_builder = (
    SparkSession.builder
    .appName("bchwtz-bdda-session")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "4")
)
ss = session_builder.getOrCreate()

In [None]:
# Explicitly access the SparkContext that the SparkSession created internally
sc = ss.sparkContext

In [None]:
# Display the SparkSession object as this cell's output
ss

In [None]:
# Display the SparkContext object as this cell's output
sc

# Resilient Distributed Datasets ([Docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html))

Apache Spark has its architectural foundation in the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines, that is maintained in a fault-tolerant way. The DataFrame API was released as an abstraction on top of the RDD, followed by the Dataset API. In Spark 1.x, the RDD was the primary application programming interface (API), but as of Spark 2.x use of the Dataset API is encouraged even though the RDD API is not deprecated. The RDD technology still underlies the Dataset API.

Spark and its RDDs were developed in 2012 in response to limitations in the MapReduce cluster computing paradigm, which forces a particular linear dataflow structure on distributed programs: MapReduce programs read input data from disk, map a function across the data, reduce the results of the map, and store reduction results on disk. Spark's RDDs function as a working set for distributed programs that offers a (deliberately) restricted form of distributed shared memory.

In [None]:
# Create a first RDD by distributing a local list of the integers 1..5
numbers = list(range(1, 6))
numbers_rdd = ss.sparkContext.parallelize(numbers)

In [None]:
# Collect action: Retrieve all elements of the RDD into a local list on the driver
numbers_rdd.collect()

In [None]:
# Create an RDD from a list of (name, age) tuples
data = [("Homer", 38), ("Marge", 32), ("Bart", 12)]
data_rdd = ss.sparkContext.parallelize(data)

In [None]:
# Collect action: Retrieve all elements of the RDD into a local list on the driver
data_rdd.collect()

In [None]:
# Show how many partitions the RDD is split into
data_rdd.getNumPartitions()

## Actions (RDDs)

In [None]:
# Count action: Count the number of elements in the RDD (result is returned to the driver)
data_rdd.count()

In [None]:
# First action: Retrieve the first element of the RDD
data_rdd.first()

In [None]:
# Take action: Retrieve the first n elements of the RDD (here n = 2)
data_rdd.take(2)

In [None]:
# Foreach action: run a function on every element of the RDD.
# Q: Where is the output? A: the print runs on the workers, so it goes to the
# workers' stdout, not to the driver (and therefore not to this cell).
def f(x):
    """Print a single RDD element (executed on a worker)."""
    print(x)


data_rdd.foreach(f)

In [None]:
# Print each element of the RDD in the driver: iterate the elements locally
# so the output appears in this notebook cell.
for element in data_rdd.toLocalIterator():
    print(element)

## Transformations (RDDs)

In [None]:
# Map transformation: Convert name to uppercase, keep the age unchanged.
# map is lazy; collect() triggers the computation and displays the result.
mapped_rdd = data_rdd.map(lambda x: (x[0].upper(), x[1]))
mapped_rdd.collect()

In [None]:
# SortBy transformation: Sort the RDD by age (second tuple element) in ascending order.
# collect() triggers the computation and displays the sorted tuples.
sorted_rdd = data_rdd.sortBy(lambda x: x[1], ascending=True)
sorted_rdd.collect()

## I/O (RDDs)

In [None]:
# Save action: Save the RDD as text into the "simpsons" output directory,
# using the string representation of each element.
# saveAsTextFile refuses to write into an existing output directory, so
# remove the output of a previous run first to keep this cell re-runnable.
# NOTE(review): assumes the output path is on the local filesystem
# (as in local mode) — on HDFS/S3 delete the path with the matching tool.
import shutil

shutil.rmtree("simpsons", ignore_errors=True)
data_rdd.saveAsTextFile("simpsons")

In [None]:
# Read: Create an RDD from the text file(s) written above (textFile itself is lazy;
# collect() is the action). Each element is a line of text, i.e. the string form
# of the original tuple, not the tuple itself.
imported_rdd = ss.sparkContext.textFile("simpsons")
imported_rdd.collect()

## Obligatory Word Count (RDDs)

In [None]:
# Create an RDD of text lines and count how often each word occurs.
rdd = ss.sparkContext.textFile("./data/data.txt")
# split() without an argument splits on any run of whitespace and drops empty
# strings; split(" ") would emit "" for blank lines and repeated spaces and
# count it as a "word".
result_rdd = (
    rdd.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False)
)
# Show the 10 most frequent words with their counts
result_rdd.take(10)

# DataFrames (DFs)

In [None]:
%%bash 
# Preview the first 10 lines of the raw CSV file
head -10 ./data/products.csv

In [None]:
# Read CSV file into DataFrame, using the first line as column names.
# Without a schema or inferSchema, all columns are read as strings.
df = ss.read.csv("./data/products.csv", header=True)

In [None]:
# Display schema of DataFrame (column names and types)
df.printSchema()

In [None]:
# Display content of DataFrame (first 5 rows)
df.show(5)

## CSV / Read Data: Define Schema (DFs)

In [None]:
# import necessary types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [None]:
# Define the schema explicitly: each field is (column name, data type, nullable)
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
    ]
)

In [None]:
# Read CSV file into DataFrame with the schema defined above, so the column
# types come from `schema` instead of defaulting to strings
df = ss.read.csv("./data/products.csv", header=True, schema=schema)

In [None]:
# Display schema of DataFrame (types now follow the explicit schema)
df.printSchema()

In [None]:
# Display content of DataFrame (first 5 rows)
df.show(5)

## CSV / Read Data: Infer a schema (DFs)

In [None]:
# Read CSV file into DataFrame with inferSchema: Spark derives the column
# types from the data instead of using an explicit schema
df = ss.read.csv("./data/products.csv", header=True, inferSchema=True)

In [None]:
# Display schema of DataFrame (inferred types)
df.printSchema()

In [None]:
# Display content of DataFrame (first 5 rows)
df.show(5)

## JSON / Read Data

In [None]:
%%bash
# Preview the first 10 lines of the single-line JSON file
head -10 data/products_singleline.json

In [None]:
# Read JSON (Single Line): the default reader mode, one JSON record per line
df = ss.read.json("./data/products_singleline.json")

In [None]:
%%bash
# Preview the first 10 lines of the multi-line JSON file
head -10 data/products_multiline.json

In [None]:
# Read JSON (Multi Line): multiLine=True lets a single record span several lines
df = ss.read.json("./data/products_multiline.json", multiLine=True)

In [None]:
# Display schema of DataFrame (read from the multi-line JSON file)
df.printSchema()

In [None]:
# Display content of DataFrame (first 5 rows)
df.show(5)

## Parquet / Read Data

In [None]:
# Write the DataFrame into a Parquet file (a directory of Parquet part-files).
# mode("overwrite") replaces the output of a previous run; the default save
# mode raises an error when the path already exists, which breaks re-runs.
df.write.mode("overwrite").parquet("./data/products.parquet")

In [None]:
# Display schema of DataFrame (this is still the DataFrame read from JSON above;
# the Parquet output is only written, not read back, in this section)
df.printSchema()

In [None]:
# Display content of DataFrame (first 5 rows of the DataFrame read from JSON)
df.show(5)

## Working with DFs

In [None]:
%%bash
# Preview the first 10 lines of the synthetic data file
head -10 data/stocks.txt

In [None]:
# Load the synthetic data into a DataFrame (comma-separated with a header line).
# NOTE(review): the following cells use the columns id, name, category, quantity
# and price — confirm that stocks.txt actually has this header.
df = ss.read.csv("./data/stocks.txt", header=True, inferSchema=True)

In [None]:
# Display schema of DataFrame (types inferred from the data)
df.printSchema()

In [None]:
# Show the initial DataFrame (first 10 rows)
df.show(10)

## Selecting Columns

In [None]:
# Select specific columns (projection); returns a new DataFrame
selected_columns = df.select("id", "name", "price")
selected_columns.show(10)

## Filtering Rows

In [None]:
# Filter rows based on a condition: keep only rows with quantity > 20,
# then count the matching rows (count() is the action that triggers the job)
filtered_data = df.filter(df.quantity > 20)
filtered_data.count()

In [None]:
# Display the filtered rows
filtered_data.show()

## Joining

In [None]:
# Join with another DataFrame: an inner join on "id" keeps only rows whose
# id appears in both DataFrames.
# - orderBy("id") before limit(10) makes the selected ids deterministic;
#   limit alone is not guaranteed to pick the same rows on every run.
# - Renaming "category" avoids two identically named "category" columns in
#   the join result, which would make later references to it ambiguous.
df2 = (
    df.select("id", "category")
    .orderBy("id")
    .limit(10)
    .withColumnRenamed("category", "category_df2")
)
joined_data = df.join(df2, "id", "inner")
joined_data.show()

## Sorting

In [None]:
# Sort by a column (ascending order by default)
sorted_data = df.orderBy("price")
sorted_data.show(10)

In [None]:
# Sort descending by price, breaking ties by descending id
from pyspark.sql.functions import col, desc
sorted_data = df.orderBy(desc("price"), desc("id"))
sorted_data.show(10)

## Unique Rows

In [None]:
# Get distinct product categories (each category appears once in the result)
distinct_rows = df.select("category").distinct()
print("Distinct Product Categories:")
distinct_rows.show()

## Remove Columns

In [None]:
# Drop columns: returns a new DataFrame without quantity and category
dropped_columns = df.drop("quantity", "category")
dropped_columns.show(10)

## Add Calculated Columns

In [None]:
# Add a new calculated column: revenue = quantity * price for each row
df_with_new_column = df.withColumn("revenue", df.quantity * df.price)
df_with_new_column.show(10)

## Rename Columns

In [None]:
# Rename a column with withColumnRenamed ("price" -> "product_price").
# This returns a new DataFrame; df itself keeps the original column name.
df_with_alias = df.withColumnRenamed("price", "product_price")
df_with_alias.show(10)

# Regression

In [None]:
# Imports for the linear regression example
from pyspark import SparkFiles
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# NOTE(review): getOrCreate() returns the already-active session (`ss` from the
# top of the notebook) if there is one, so `spark` and `ss` then refer to the
# same session and the appName below presumably does not take effect.
spark = (
    SparkSession.builder
    .appName("Linear Regression with PySpark MLlib")
    .getOrCreate()
)

In [None]:
# Download the Boston Housing dataset and register it with SparkFiles
url = "https://raw.githubusercontent.com/selva86/datasets/master/BostonHousing.csv"
spark.sparkContext.addFile(url)

# SparkFiles.get returns the local path of the file added above.
# NOTE(review): this path is local to the driver; fine in local mode, but a
# cluster deployment presumably needs the file on shared storage — confirm.
data = spark.read.csv(SparkFiles.get("BostonHousing.csv"), header=True, inferSchema=True)
data.show(5)

In [None]:
# Combine the 13 predictor columns into a single vector column "features",
# the input format expected by Spark ML estimators; "medv" is the target.
assembler = VectorAssembler(
    inputCols=["crim", "zn", "indus", "chas", "nox", "rm", "age", "dis", "rad", "tax", "ptratio", "b", "lstat"],
    outputCol="features")

# Keep the raw `data` untouched. Reassigning the transformed result to `data`
# would make a re-run of this cell fail, because VectorAssembler rejects an
# input DataFrame that already contains its output column "features".
assembled_data = assembler.transform(data)
final_data = assembled_data.select("features", "medv")

# 80/20 train/test split with a fixed seed for a repeatable split
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit a linear regression of medv on the assembled features, using the training split only;
# predictions will be written to the column "predicted_medv"
lr = LinearRegression(featuresCol="features", labelCol="medv", predictionCol="predicted_medv")
lr_model = lr.fit(train_data)

In [None]:
# Predict on the held-out test split
predictions = lr_model.transform(test_data)

# Root mean squared error between actual and predicted medv (lower is better)
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="predicted_medv", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

# Coefficient of determination on the same predictions
evaluator_r2 = RegressionEvaluator(labelCol="medv", predictionCol="predicted_medv", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

In [None]:
# Inspect the fitted model: one coefficient per assembled feature, plus the intercept
coefficients = lr_model.coefficients
intercept = lr_model.intercept

print("Coefficients: ", coefficients)
print("Intercept: {:.3f}".format(intercept))

In [None]:
# Save the model. write().overwrite() replaces a model saved by an earlier
# run; plain save() raises an error when the path already exists.
lr_model.write().overwrite().save("lr_model")

# Load the model back from disk
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load("lr_model")

# Closing the Session

In [None]:
# Stop the SparkSession (and its SparkContext).
# NOTE(review): `spark` from the regression section presumably refers to the
# same session (getOrCreate), so it is stopped as well — confirm.
ss.stop()