<a href="https://colab.research.google.com/github/suriarasai/BEAD2024/blob/main/colab/03_Apache_Spark_Core_RDD_Operations_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Apache PySpark
In this demo, we will see how to run PySpark in a Google Colaboratory notebook. We will also perform some basic exploratory data analysis tasks that are common in data science problems.



## PySpark Install

The first step is to install PySpark. The next step is to install the findspark library, which locates a Spark installation and makes it importable from Python.

*Note: the --ignore-installed flag tells pip to install the package even if a version is already present, instead of reusing the existing installation.*


In [None]:
# install pyspark using pip
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark


## Spark Session

We import the SparkSession class from PySpark's pyspark.sql module. In PySpark, a SparkSession is the unified entry point for reading data, configuring the application, and accessing Spark's services.

Here's a breakdown of what the Spark Session does:

1. Unified Entry Point: It is the central point of access to Spark functionality, which makes Spark simpler and more intuitive to use for development.
2. Data Reading and Writing: We use the SparkSession to read data from various sources (such as HDFS, S3, JDBC, and Hive) and to write data to various sinks.
3. Configuration Management: It lets us set and inspect the runtime configuration parameters of the Spark application, for example through spark.conf.
4. Creating DataFrames: The SparkSession provides methods such as createDataFrame() to create DataFrames, the core structured data abstraction in Spark. (Datasets, the typed variant, are available only in Scala and Java.)
5. Execution of SQL Queries: We can run SQL queries with spark.sql(), especially when working with structured data.
6. Access to Spark Services: It wraps the underlying SparkContext, available as spark.sparkContext, which is the entry point for the RDD API used throughout this demo.

In PySpark, a SparkSession is created with the SparkSession.builder pattern, which ends with a call to getOrCreate(). Here's an example:

In [None]:
from pyspark.sql import SparkSession
# master("local") runs Spark in a single thread on this machine; "local[*]" would use all cores
spark = SparkSession.builder.master("local").appName("My App").getOrCreate()

In [None]:
spark.sparkContext

## Line Count
To count the number of lines in a text file:

In [None]:
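from pyspark.sql import SparkSession
someFile = "somefile.txt"
# a relative path is resolved against the current working directory (/content in Colab)
# getOrCreate() returns the existing session, so the new appName is not applied
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# spark.read.text() creates a DataFrame with one row per line of the file
print(spark.read.text(someFile).count())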


5


## Mounting Google Drive
Mount your Google Drive in Colab so that Spark can read files stored there.

In [None]:
# To read data from a file, first upload it to your Google Drive, then mount the drive in Colab
from google.colab import drive
# force_remount=True remounts the drive even if it is already mounted
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
someFile = "/content/drive/MyDrive/customer.csv"
# the file sits in the top-level "My Drive" folder of the mounted Google Drive
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# reading a CSV as plain text counts every line, including the header row
print(spark.read.text(someFile).count())


## Data Creation

### Create a simple RDD with Spark
In the examples below, we will see how to create simple data structures using Spark Core commands.

#### Example 1: From RDD
To create an RDD using a SparkSession in PySpark, you first initialize a SparkSession and then use its SparkContext to create the RDD. Here's a simple example where we create an RDD from a tuple of numbers:

In [None]:
# Create an RDD from a tuple of numbers
numbers = (1, 2, 3, 4, 5)
# parallelize() distributes a local Python collection across the cluster as an RDD
numbers_rdd = spark.sparkContext.parallelize(numbers)
# collect() is an action that returns all elements to the driver as a Python list
print("using collect() function", numbers_rdd.collect())



using collect() function [1, 2, 3, 4, 5]


In [None]:
# Alternatively, convert the collected list into another structure (here, a tuple) for printing
print("using pipeline of functions")
print(tuple(spark.sparkContext.parallelize(numbers).collect()))


using pipeline of functions
(1, 2, 3, 4, 5)


In [None]:
# read out.txt (in the current working directory) into a DataFrame with one row per line
outfile = spark.read.text("out.txt")

Now let us count the lines in out.txt:

In [None]:
print(outfile.count())

28


#### Example 2: From Local File

To create an RDD in PySpark from a CSV file such as "customers.csv", you use a SparkSession to read the CSV into a DataFrame and then convert the DataFrame to an RDD. Here's a step-by-step example:

1. Initialize a SparkSession. (Done)
2. Read the "customers.csv" file into a DataFrame.
3. Convert the DataFrame to an RDD.
4. Perform a simple action on the RDD, like counting the number of records.

Here's the code snippet for this process:

In [None]:
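# header=True uses the first line as column names, inferSchema=True detects column types,
# and .rdd converts the resulting DataFrame into an RDD of Row objects
customers = spark.read.csv("customers.csv", header=True, inferSchema=True).rdd

# Perform a simple action: count the number of records
record_count = customers.count()
print(f"Number of records: {record_count}")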


Number of records: 100


## Actions and Transformations

In PySpark, operations on RDDs fall into two categories: transformations and actions. Transformations create a new RDD from an existing one and are evaluated lazily, while actions trigger the computation and return a value to the driver program. Below are simple examples of both.

### Transformations

#### Map
Applies a function to each element and returns a new RDD.


In [None]:
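rdd = spark.sparkContext.parallelize((1, 2, 3, 4, 5))
# Python's built-in map(function, iterable) runs on one machine and is limited by its memory
# rdd.map(function) runs in parallel across the cluster and scales to very large datasets
# map() is a transformation: it returns a new RDD without computing anything yet
# collect() is an action: it computes the result and returns it to the driver as a list
print(tuple(rdd.map(lambda x: x * x).collect()))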

(1, 4, 9, 16, 25)


#### Filter
Returns a new RDD containing only the elements that satisfy a condition.

In [None]:
print(rdd.filter(lambda x: x % 2 == 0).collect())  # Keeps even numbers


[2, 4]


#### FlatMap
Similar to map, but each input item can be mapped to 0 or more output items.

In [None]:
words = spark.sparkContext.parallelize(["hello world", "hi", "hello mars", "hello jupiter", "hello saturn"])
print(words.flatMap(lambda x: x.split(" ")).collect())




#### Distinct
Returns a new RDD containing distinct elements from the original RDD.

In [None]:
print(words.flatMap(lambda x: x.split(" ")).distinct().collect())

In [None]:
tuples = spark.sparkContext.parallelize((1, 1, 2, 3, 3, 4))
print(tuples.distinct().collect())


[1, 2, 3, 4]


### Actions
#### Collect
Returns all the elements of the RDD to the driver program as a Python list.

In [None]:
print(tuples.distinct().collect())

[1, 2, 3, 4]


#### Count
Returns the number of elements in the RDD.

In [None]:
print(tuples.count())

6


#### Take
Returns a list with the first n elements of the RDD.

In [None]:
first_three = tuples.take(3)
print(first_three)


[1, 1, 2]


In [None]:
customers.take(2)

[Row(Index=1, Customer Id='DD37Cf93aecA6Dc', First Name='Sheryl', Last Name='Baxter', Company='Rasmussen Group', City='East Leonard', Country='Chile', Phone 1='229.077.5154', Phone 2='397.884.0519x718', Email='zunigavanessa@smith.info', Subscription Date=datetime.date(2020, 8, 24), Website='http://www.stephenson.com/'),
 Row(Index=2, Customer Id='1Ef7b82A4CAAD10', First Name='Preston', Last Name='Lozano', Company='Vega-Gentry', City='East Jimmychester', Country='Djibouti', Phone 1='5153435776', Phone 2='686-620-1820x944', Email='vmata@colon.com', Subscription Date=datetime.date(2021, 4, 23), Website='http://www.hobbs.com/')]

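#### Reduce
Aggregates the elements of the RDD using a binary function, which should be commutative and associative so that the result does not depend on how the data is partitioned.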

In [None]:
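# reduce() combines elements pairwise; naming the result "total" avoids shadowing Python's built-in sum()
total = tuples.reduce(lambda a, b: a + b)
print(total)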


14
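To see why the function should be commutative and associative, here is a pure-Python simulation (no Spark needed) of how RDD.reduce proceeds: each partition is reduced locally, then the partial results are reduced on the driver. The partition layouts are hypothetical; addition gives 14 either way, while subtraction changes with the split.

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    """Simulate RDD.reduce: reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


one_partition = [[1, 1, 2, 3, 3, 4]]
three_partitions = [[1, 1], [2, 3], [3, 4]]

# Addition gives the same answer however the data is split
print(partitioned_reduce(one_partition, add), partitioned_reduce(three_partitions, add))  # 14 14

# Subtraction does not: the result depends on the partitioning
print(partitioned_reduce(one_partition, subtract), partitioned_reduce(three_partitions, subtract))  # -12 2
```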


These examples illustrate basic PySpark operations for manipulating and analyzing large datasets efficiently. To run them, make sure a SparkSession (spark) is initialized; its SparkContext is available as spark.sparkContext.

### How to pretty print in PySpark?

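PySpark has no built-in pretty printer for RDDs, but combining take() with a loop that prints one element per line gives a similar effect. Prefer take() over collect() here, so that only a small number of records is brought to the driver.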

In [None]:
# %pprint is an IPython magic that toggles IPython's pretty printing of results on and off
%pprint

Pretty printing has been turned OFF


In [None]:
print("First five records of customer data set", customers.take(5))
print("Not so pretty....")
print("Now let us pretty print:")
# To pretty print, you need to iterate
for element in customers.take(10):
    print(element)

First five records of customer data set [Row(Index=1, Customer Id='DD37Cf93aecA6Dc', First Name='Sheryl', Last Name='Baxter', Company='Rasmussen Group', City='East Leonard', Country='Chile', Phone 1='229.077.5154', Phone 2='397.884.0519x718', Email='zunigavanessa@smith.info', Subscription Date=datetime.date(2020, 8, 24), Website='http://www.stephenson.com/'), Row(Index=2, Customer Id='1Ef7b82A4CAAD10', First Name='Preston', Last Name='Lozano', Company='Vega-Gentry', City='East Jimmychester', Country='Djibouti', Phone 1='5153435776', Phone 2='686-620-1820x944', Email='vmata@colon.com', Subscription Date=datetime.date(2021, 4, 23), Website='http://www.hobbs.com/'), Row(Index=3, Customer Id='6F94879bDAfE5a6', First Name='Roy', Last Name='Berry', Company='Murillo-Perry', City='Isabelborough', Country='Antigua and Barbuda', Phone 1='+1-539-402-0259', Phone 2='(496)978-3969x58947', Email='beckycarr@hogan.com', Subscription Date=datetime.date(2020, 3, 25), Website='http://www.lawrence.com/'),

### Key Operations

Key-based operations work on pair RDDs, that is, RDDs whose elements are (key, value) tuples. Common examples are groupByKey, reduceByKey, and sortByKey. Let us look at how they work.

#### groupByKey
This operation groups the values for each key in the RDD into a single sequence.
#### reduceByKey
This operation merges the values for each key using an associative reduce function.
#### sortByKey
This operation sorts the RDD by key (in ascending order by default).

Let us put together an example to compare and contrast them:


In [None]:
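rdd = spark.sparkContext.parallelize([(3, 6), (1, 2), (3, 4)])
# groupByKey: collect all values for each key into an iterable
grouped = rdd.groupByKey()
for key, values in grouped.collect():
    print(f"{key}: {tuple(values)}")
# reduceByKey: merge the values for each key with the given function (here, addition)
reduced = rdd.reduceByKey(lambda a, b: a + b)
print(reduced.collect())
# sortByKey: order the pairs by key
sorted_rdd = rdd.sortByKey()
print(sorted_rdd.collect())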

3: (6, 4)
1: (2,)
[(3, 10), (1, 2)]
[(1, 2), (3, 6), (3, 4)]


Please note that these operations are transformations and require an action like collect to retrieve the data. Also, keep in mind that groupByKey can cause a lot of data shuffling over the network, and it's generally more efficient to use reduceByKey where possible because it combines output values locally before sending data over the network.

### Sampling

The sample() transformation draws a random fraction of the data from an RDD, with or without replacement. Here's how to use it:

#### Sampling without replacement


In [None]:
# Now, use the sample function to take a random sample of about 10% of the customers without replacement
sampled_customers_rdd = customers.sample(False, 0.1)

# Collect the results
sampled_customers = sampled_customers_rdd.collect()

# Print the sampled list of customers
for customer in sampled_customers:
    print(customer)

Row(Index=1, Customer Id='DD37Cf93aecA6Dc', First Name='Sheryl', Last Name='Baxter', Company='Rasmussen Group', City='East Leonard', Country='Chile', Phone 1='229.077.5154', Phone 2='397.884.0519x718', Email='zunigavanessa@smith.info', Subscription Date=datetime.date(2020, 8, 24), Website='http://www.stephenson.com/')
Row(Index=2, Customer Id='1Ef7b82A4CAAD10', First Name='Preston', Last Name='Lozano', Company='Vega-Gentry', City='East Jimmychester', Country='Djibouti', Phone 1='5153435776', Phone 2='686-620-1820x944', Email='vmata@colon.com', Subscription Date=datetime.date(2021, 4, 23), Website='http://www.hobbs.com/')
Row(Index=14, Customer Id='A08A8aF8BE9FaD4', First Name='Kristine', Last Name='Cox', Company='Carpenter-Cook', City='Jodyberg', Country='Sri Lanka', Phone 1='786-284-3358x62152', Phone 2='+1-315-627-1796x8074', Email='holdenmiranda@clarke.com', Subscription Date=datetime.date(2021, 2, 8), Website='https://www.brandt.com/')
Row(Index=15, Customer Id='6fEaA1b7cab7B6C', F

In the sample method:

1. The first argument is withReplacement. Set it to False for sampling without replacement, meaning a particular customer can be chosen only once.
2. The second argument is the fraction of the data to sample, which is 0.1 in this case, meaning approximately 10% of the data.

This outputs a random sample of the records in the customers RDD. The collect() action is used here for demonstration purposes; use it with caution on large datasets, because it brings all the sampled data to the driver node.

#### Sampling with replacement

The following example shows how to use sample() with replacement. This means an element can be included in the sample multiple times.

In [None]:
# Now, use the sample function to take a random sample of about 1% of the customers with replacement
sampled_customers_rdd = customers.sample(True, 0.01)

# Collect the results
sampled_customers = sampled_customers_rdd.collect()

# Print the sampled list of customers
for customer in sampled_customers:
    print(customer)

Row(Index=20, Customer Id='0F60FF3DdCd7aB0', First Name='Joanna', Last Name='Kirk', Company='Mays-Mccormick', City='Jamesshire', Country='French Polynesia', Phone 1='(266)131-7001x711', Phone 2='(283)312-5579x11543', Email='tuckerangie@salazar.net', Subscription Date=datetime.date(2021, 9, 24), Website='https://www.camacho.net/')
Row(Index=70, Customer Id='CC68FD1D3Bbbf22', First Name='Riley', Last Name='Good', Company='Wade PLC', City='Erikaville', Country='Canada', Phone 1='6977745822', Phone 2='855-436-7641', Email='alex06@galloway.com', Subscription Date=datetime.date(2020, 2, 3), Website='http://conway.org/')


These examples return a list of customer records (Row objects) sampled from the original RDD. The actual records in the sample will vary each time you run the code because of the randomness of the sampling process.
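The randomness comes from how each element is drawn. PySpark's RDD sampler keeps each element independently with probability fraction when sampling without replacement (Bernoulli sampling), and emits each element a Poisson-distributed number of times with mean fraction when sampling with replacement. The pure-Python sketch below mimics both schemes as a simplified illustration, not Spark's actual code; the helper names and the 100 stand-in IDs are hypothetical.

```python
import math
import random


def poisson_draw(rng, mean):
    """Knuth's method: count uniform draws until their product falls to exp(-mean) or below."""
    limit = math.exp(-mean)
    count = 0
    product = rng.random()
    while product > limit:
        count += 1
        product *= rng.random()
    return count


def bernoulli_sample(items, fraction, seed):
    """Without replacement: keep each item independently with probability fraction."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


def poisson_sample(items, fraction, seed):
    """With replacement: emit each item a Poisson(fraction) number of times."""
    rng = random.Random(seed)
    out = []
    for x in items:
        out.extend([x] * poisson_draw(rng, fraction))
    return out


customer_ids = list(range(1, 101))  # hypothetical stand-in for the 100 customer records
without = bernoulli_sample(customer_ids, 0.1, seed=1)
with_repl = poisson_sample(customer_ids, 0.5, seed=1)
print(len(without), len(set(without)) == len(without))  # no duplicates without replacement
print(len(with_repl), len(set(with_repl)))  # duplicates are possible with replacement
```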

### More Transformations
In PySpark, you can perform various RDD operations such as union, join, and cartesian (cross) to combine data in different ways. Here are simple examples for each:
#### Union
The union operation combines two RDDs into a new RDD that contains the elements of both; duplicates are kept.

In [None]:
# Create two RDDs
rdd1 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])
rdd2 = spark.sparkContext.parallelize([("Charlie", 3), ("David", 4)])

# Perform the union operation
union_rdd = rdd1.union(rdd2)

# Collect and print the results
print(union_rdd.collect())



[('Alice', 1), ('Bob', 2), ('Charlie', 3), ('David', 4)]


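#### Join
The join operation performs an inner join of two pair RDDs on their keys: for every key present in both, it returns (key, (value_from_first, value_from_second)).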

In [None]:

# Create two RDDs with common keys
rdd3 = spark.sparkContext.parallelize([("Alice", "Apple"), ("Bob", "Banana")])
rdd4 = spark.sparkContext.parallelize([("Alice", 1), ("Bob", 2)])

# Perform the join operation
join_rdd = rdd3.join(rdd4)

# Collect and print the results
print(join_rdd.collect())


[('Alice', ('Apple', 1)), ('Bob', ('Banana', 2))]


#### Cross or Cartesian
The cartesian operation returns all possible pairs of (a, b) where a is in the first RDD and b is in the second RDD.

In [None]:
# Create two RDDs
rdd5 = spark.sparkContext.parallelize([1, 2])
rdd6 = spark.sparkContext.parallelize(["a", "b"])

# Perform the cartesian operation
cross_rdd = rdd5.cartesian(rdd6)

# Collect and print the results
print(cross_rdd.collect())

[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


Please note that the cartesian operation can be very expensive in terms of computation and memory usage, especially with large datasets, because it forms all possible combinations of elements between the two RDDs.

### More Actions

#### Save
Saving an RDD in PySpark can be done in a variety of formats. Common formats include text files (saveAsTextFile), sequence files, and other file-based data sources. We will look at how to save an RDD containing customer data after the NoSQL lecture.

End of Demo

Thank you for listening patiently. 🙏🌞