<a href="https://colab.research.google.com/github/trefftzc/cis677/blob/main/Introduction_to_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# An introduction to PySpark

Taken from https://sparkbyexamples.com/pyspark-tutorial/

Installing:

In [11]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

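# Build the session; local[2] runs Spark locally with two worker threads
spark = SparkSession \
       .builder \
       .master("local[2]") \
       .appName("Our First Spark Example") \
       .getOrCreate()

# Reuse the SparkContext that the session already owns. Only one context
# may exist per JVM, so SparkContext.getOrCreate(conf) would return this
# one and ignore a separately built SparkConf.
sc = spark.sparkContext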

# RDD Resilient Distributed Dataset

A PySpark RDD (Resilient Distributed Dataset) is Spark's fundamental data structure: a fault-tolerant, immutable, distributed collection of objects. Because an RDD cannot be changed once created, every transformation returns a new RDD. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

RDD Creation
To create an RDD, you first need a SparkSession, which is the entry point to a PySpark application. A SparkSession is obtained through SparkSession.builder (ending with getOrCreate()) or by calling newSession() on an existing session.

A SparkSession internally creates a SparkContext and exposes it as its sparkContext attribute. You can create multiple SparkSession objects, but only one SparkContext can exist per JVM. To create a new SparkContext, first stop the existing one with stop().

Let's create an RDD out of a list with 1023 entries, the integers 1 through 1023:

In [12]:
nums = list(range(1, 1024))
len(nums)

nums_rdd = sc.parallelize(nums)

Let's look at the first 5 entries:

In [13]:
nums_rdd.take(5)

[1, 2, 3, 4, 5]

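Let's solve the partition problem: given a list of positive integers, decide whether it can be split into two subsets with equal sums.
Let's start by defining a function that evaluates one candidate partition. A candidate is encoded as an integer: if bit i is set, element i goes into the second subset, otherwise into the first. The function returns the candidate itself when both sums match and 0 otherwise, so with 10 elements the integers 1 through 1023 cover every split in which the second subset is non-empty.
The array for the instance of the problem is hardcoded here.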

In [14]:
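def evaluatePartition(value):
    # Problem instance: the 10 numbers to split into two equal-sum subsets
    array = [1, 1, 1, 1, 1, 1, 1, 1, 1, 9]
    n = len(array)
    sum0s = 0  # sum of the elements whose bit in value is 0
    sum1s = 0  # sum of the elements whose bit in value is 1
    mask = 1
    for i in range(n):
        if (mask & value) != 0:
            sum1s += array[i]
        else:
            sum0s += array[i]
        mask *= 2  # move on to the next bit

    # Return the candidate itself when it is a valid partition, else 0
    return value if sum0s == sum1s else 0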

In [15]:
print(evaluatePartition(1))

0


Now apply the function evaluatePartition to nums_rdd with map and store the results in a new RDD. map is a lazy transformation, so nothing is computed until an action such as reduce runs:

In [16]:
result_rdd = nums_rdd.map(evaluatePartition)

Finally, check whether any entry is different from 0. Every non-solution was mapped to 0, so reducing with max returns a solution if one exists:

In [17]:
# The maximum is nonzero exactly when some candidate is a valid partition
sol = result_rdd.reduce(max)
if sol == 0:
  print("This instance of the partition problem does not have a solution")
else:
  print("This instance of the partition problem does have a solution.")
print(sol)

This instance of the partition problem does have a solution.
512
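
To see what the answer 512 means, the same search can be sketched locally with Python builtins and the winning bitmask decoded back into the two subsets. The helpers `split_by_mask` and `evaluate` are hypothetical names introduced for this illustration; `map` and `functools.reduce` stand in for the RDD operations and run on a single machine.

```python
from functools import reduce

ARRAY = [1, 1, 1, 1, 1, 1, 1, 1, 1, 9]  # same instance as evaluatePartition


def split_by_mask(values, mask):
    """Element i goes to the second subset when bit i of mask is set."""
    subset0, subset1 = [], []
    for i, v in enumerate(values):
        if (mask >> i) & 1:
            subset1.append(v)
        else:
            subset0.append(v)
    return subset0, subset1


def evaluate(mask):
    """Return mask if it splits ARRAY into equal sums, otherwise 0."""
    subset0, subset1 = split_by_mask(ARRAY, mask)
    return mask if sum(subset0) == sum(subset1) else 0


# Local stand-ins for nums_rdd.map(...) and result_rdd.reduce(max)
results = map(evaluate, range(1, 1024))
best = reduce(max, results)

print(best)
print(split_by_mask(ARRAY, best))
```

This prints 512 and then `([1, 1, 1, 1, 1, 1, 1, 1, 1], [9])`: only bit 9 is set, so the 9 stands alone against the nine 1s. The complementary mask 511 describes the same split, and max simply picks the larger of the two.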


# Creating a DataFrame

In [5]:
# Create DataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

A DataFrame is a table whose columns have names and data types; use df.printSchema() to print the schema of the DataFrame.

To display the DataFrame, use df.show(), which shows the first 20 rows by default.

In [6]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



# PySpark SQL

PySpark SQL is a module in Spark that provides a higher-level abstraction for working with structured data and lets you query it with SQL.

PySpark SQL enables you to write SQL queries against structured data using standard SQL syntax and semantics, so users who already know SQL can move to Spark for data processing tasks easily.

First, create a temporary table or view on the DataFrame. Once the view is created, it can be queried anywhere in the SparkSession using sql().

These tables and views are scoped to the SparkSession that created them. Once the SparkSession is terminated, either by closing the Spark application or ending the Spark session explicitly, the temporary views are removed from memory.

To run SQL queries, use the sql() method of the SparkSession object. Note that this method returns a DataFrame.



In [7]:
# Create temporary table
df.createOrReplaceTempView("PERSON_DATA")

# Run SQL query
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



Use a GROUP BY clause to run aggregate queries.

In [8]:
# Using groupby
groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+



# Machine Learning MLlib
PySpark MLlib is Apache Spark’s scalable machine learning library, offering a suite of algorithms and tools for building, training, and deploying machine learning models. It provides implementations of popular algorithms for classification, regression, clustering, collaborative filtering, and more.

MLlib is designed for distributed computing, allowing it to handle large-scale datasets across clusters efficiently. It offers both high-level APIs for ease of use and low-level APIs for fine-grained control over model training and evaluation.

MLlib seamlessly integrates with Spark’s ecosystem, enabling end-to-end machine learning workflows, including data preprocessing, feature engineering, model training, and deployment in production environments.

Here’s a simple example using Spark MLlib in Python to train a linear regression model:


In [9]:
# Import
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample training data
data = [(1.0, 2.0), (2.0, 3.0), (3.0, 4.0), (4.0, 5.0), (5.0, 6.0)]
df = spark.createDataFrame(data, ["features", "label"])

# Define a feature vector assembler
assembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")

# Transform the DataFrame with the feature vector assembler
df = assembler.transform(df)

# Create a LinearRegression model
lr = LinearRegression(featuresCol="features_vec", labelCol="label")

# Fit the model to the training data
model = lr.fit(df)

# Print the coefficients and intercept of the model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Stop the SparkSession
spark.stop()

Coefficients: [1.0000000000000004]
Intercept: 0.9999999999999986
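
The fitted values can be checked by hand, since the training data lies exactly on the line label = features + 1. The following plain-Python sketch applies the closed-form least-squares formulas for a single feature; it does not use Spark and is not how MLlib computes the fit internally.

```python
# Same five training points as in the MLlib example
xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [2.0, 3.0, 4.0, 5.0, 6.0]

n = len(xs)
mean_x = sum(xs) / n
mean_y = sum(ys) / n

# slope = covariance(x, y) / variance(x); intercept follows from the means
slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / sum(
    (x - mean_x) ** 2 for x in xs
)
intercept = mean_y - slope * mean_x

print(slope, intercept)
```

Both values come out as 1.0, which matches the MLlib output up to floating-point rounding.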
