# Basic understanding of PySpark

### Installing the latest pyspark version

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=20606f993c0d343171a5d9cb90c1c2357cd51c17f2b05bbf7da933291b9f777d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Importing the PySpark modules

- `SparkContext` is the _entry point_ to any <u>Spark functionality</u>. It represents the connection to a Spark cluster and can be used to create **RDDs** (Resilient Distributed Datasets) and broadcast variables on that cluster.

- `SparkSession` is the _entry point_ to the DataFrame and SQL functionality in Spark. It provides a way to interact with various data sources (like Parquet, Avro, JSON, etc.) in tabular form.

In [2]:
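try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    import math  # used later for math.sqrt
except ImportError as e:
    # PySpark is not importable yet, most likely because the kernel was started before the install
    print(e)
    print('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')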

1. `SparkContext.getOrCreate()` returns the existing SparkContext or creates a new one if none exists; the configuration passed in is only used when a new context is created. The `SparkConf().setMaster("local[*]")` part specifies that Spark should run in **local mode** using all <u>available CPU cores ([*])</u>. In local mode, Spark runs on a single machine, which is useful for development and testing.
2. `SparkSession.builder.getOrCreate()` follows the same pattern as `SparkContext.getOrCreate()`. It returns the existing SparkSession or creates a new one if none exists. The `SparkSession` _**is the unified entry point for reading data, executing SQL queries, and performing other Spark operations using the DataFrame API.**_

In [3]:
# Create a SparkContext if it doesn't exist, using a local master URL
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Create a SparkSession if it doesn't exist
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/28 12:11:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Creating an RDD (Resilient Distributed Dataset) using the `parallelize` method of the SparkContext (`sc`). Here is what this line does:

   - `range(100)`: This creates a Python `range` object that yields the numbers from 0 to 99.

   - `sc.parallelize(range(100))`: This takes that sequence and distributes it across the Spark cluster, creating an RDD. The RDD is a fault-tolerant collection of elements that can be processed in parallel.

So, the `rdd` variable now represents a distributed collection of the numbers from 0 to 99. The elements are split across partitions, each holding a slice of the data, and Spark can process these partitions in parallel.
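To make the idea of partitions concrete, here is a plain-Python sketch of cutting a sequence into contiguous slices. The helper `split_into_partitions` is hypothetical and only illustrates the concept; it is not Spark's implementation. On a real RDD, `rdd.getNumPartitions()` and `rdd.glom().collect()` show the actual layout.

```python
def split_into_partitions(data, num_partitions):
    """Hypothetical helper: cut a sequence into contiguous, near-equal slices."""
    items = list(data)
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

# Assume 4 partitions, e.g. one per CPU core in local mode
partitions = split_into_partitions(range(100), 4)
sizes = [len(p) for p in partitions]
print(sizes)              # [25, 25, 25, 25]
print(partitions[1][:3])  # [25, 26, 27]
```

Each slice can be processed independently, which is what lets Spark run work on the partitions in parallel.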

In [4]:
rdd = sc.parallelize(range(100))

The `take` action in Spark is used to retrieve the **first N elements** from an RDD or DataFrame. 

In [5]:
rdd.take(10)  # returns the first 10 elements as a list; the RDD holds the integers 0 to 99


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Functional Programming

A Python function that decides whether a value is greater than 50 (**True**) or not (**False**).

In [6]:
def gt50(i):
    return i > 50

print(gt50(4))
print(gt50(51))

False
True


Writing the same kind of function as a lambda

This line of code defines a lambda function and assigns it to the name `gt25`. The lambda takes a single argument `i` and returns a _Boolean value (True or False)_. Note that the threshold differs from `gt50`: this lambda checks whether the input `i` is greater than 25.

So, you can interpret this as follows: **`gt25` is a function that, when given a value `i`, returns True if `i` is greater than 25 and False otherwise.**
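As a small illustration, a `def` function and a lambda with the same body are interchangeable wherever a function is expected. The sketch below uses Python's built-in `filter` rather than Spark, and the names `gt25_def` and `gt25_lambda` are chosen here for illustration only.

```python
def gt25_def(i):
    return i > 25

gt25_lambda = lambda i: i > 25

values = [4, 25, 26, 51]

# Both predicates keep exactly the values greater than 25
kept_def = list(filter(gt25_def, values))
kept_lambda = list(filter(gt25_lambda, values))

print(kept_def)                 # [26, 51]
print(kept_def == kept_lambda)  # True
```

An RDD's `filter` takes a predicate in the same way, which is why either form can be passed to it.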

In [7]:
gt25 = lambda i: i > 25

print(gt25(4))
print(gt25(51))

False
True


### Shuffling the RDD

`l = list(range(100))`: Creates a list containing numbers from 0 to 99. This is a simple way to generate a list with sequential integers.

`shuffle(l)`: Uses the shuffle function from the random module to randomly shuffle the elements of the list l. This step is intended to randomize the order of the numbers in the list.

`sc.parallelize(l)`: Creates an RDD named rdd from the shuffled list l. The parallelize method of the SparkContext (sc) is used to distribute the elements of the list across the nodes of the Spark cluster. This RDD can then be used for parallel processing using Spark.

So, after running this code, the _rdd_ variable holds a distributed representation of the shuffled list, and the elements are distributed across the nodes of the Spark cluster. 

In [8]:
from random import shuffle

# Create a list containing numbers from 0 to 99
l = list(range(100))

# Shuffle the list randomly
shuffle(l)

# Create an RDD (Resilient Distributed Dataset) from the shuffled list
rdd = sc.parallelize(l)


Keeping only the values from our list that are greater than 50 by applying the `gt50` function through `filter`, which retains the elements for which the function returns `True`.
**Note that calling `collect` returns all elements to the Apache Spark driver. This is not a good idea for big data, because `collect` can be resource-intensive when the RDD is large. Prefer `take(n)`, or `collect` only a small `sample`, instead.**

In [9]:
# rdd.filter(gt50).collect() would also work here, but take(n) follows the best practice noted above
rdd.filter(gt50).take(10)

[89, 73, 69, 91, 81, 94, 79, 71, 85, 52]

`sample(10, 0.1)`: This is another transformation, the `sample` transformation. It creates a random sample of the elements in the RDD. Its signature is `sample(withReplacement, fraction, seed=None)`, so the arguments here mean:

- `10`: Passed as `withReplacement`. This is a flag, not a sample size. A truthy value such as `10` is treated as sampling with replacement, so writing `True` or `False` explicitly is clearer.
- `0.1`: The sampling fraction. Without replacement, it is the probability that each element is included (10% here); with replacement, it is the expected number of times each element is chosen. The size of the sample is therefore not fixed and varies from run to run.

Note that the result of this operation doesn't directly give you a collection of elements; instead, it returns a new RDD that represents the sampled subset of the filtered data.
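To show why a sampling fraction does not fix the sample size, here is a plain-Python sketch of fraction-based sampling without replacement. The helper `bernoulli_sample` is hypothetical and only mimics the idea; Spark's own sampler is implemented differently and will not reproduce these exact draws.

```python
import random

def bernoulli_sample(data, fraction, seed=None):
    """Hypothetical helper: keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

data = list(range(51, 100))  # the values greater than 50

sample_a = bernoulli_sample(data, 0.1, seed=1)
sample_b = bernoulli_sample(data, 0.1, seed=2)

# The two sizes may differ: only the expected size (about 10% of 49) is controlled
print(len(sample_a), len(sample_b))
```

Passing a seed makes a run repeatable, which is the role of the optional `seed` argument of `sample` as well.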

In [10]:
sample = rdd.filter(gt50).sample(10, 0.1).take(10)
sample

[53]

In [11]:
# sample() on its own is a lazy transformation, so this returns an RDD rather than data
rdd.filter(gt50).sample(10,0.1)

PythonRDD[6] at RDD at PythonRDD.scala:53

In [12]:
rdd.filter(lambda i: i > 50).take(10)  # the same filter, written with a lambda function

[89, 73, 69, 91, 81, 94, 79, 71, 85, 52]

Let’s consider the same list of integers. Now we want to compute the sum for elements in that list which are greater than 50 but less than 75. Please implement the missing parts. 

`rdd.filter(lambda x: $$).filter(lambda x: $$).$$()`

In [13]:
rdd.filter(lambda x: x > 50 and x < 75).take(10)  # both conditions in a single filter; take(10) previews the matching elements

[73, 69, 71, 52, 70, 63, 60, 53, 58, 62]
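The exercise asks for the sum rather than a preview. On the RDD, the filled-in template would read `rdd.filter(lambda x: x > 50).filter(lambda x: x < 75).sum()`. The sketch below mirrors that chain with Python's built-in `filter` and `sum`, assuming the same values 0 to 99, so the expected result can be checked without Spark.

```python
values = range(100)

# Two chained filters, mirroring the two filter calls in the template
above_50 = filter(lambda x: x > 50, values)
below_75 = filter(lambda x: x < 75, above_50)

# The final step aggregates the surviving elements (51 through 74)
total = sum(below_75)
print(total)  # 1500
```

The order in which the list was shuffled does not matter here, because a sum is independent of element order.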

## Working with the statistical parameters

### 1. First statistical moment (mean) and the median

In [14]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print(e)
    
# Create a SparkContext if it doesn't exist, using a local master URL
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Create a SparkSession if it doesn't exist
spark = SparkSession.builder.getOrCreate()


rdd = sc.parallelize(range(100))

### Mean is calculated by using this formula
$$
\bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_i
$$


In [15]:
total = rdd.sum()  # sum of all elements in the RDD (named total to avoid shadowing the built-in sum)
n = rdd.count()  # number of elements in the RDD
mean = total / float(n)  # x-bar
mean

49.5

### The median can be calculated in the following manner
With the values sorted in ascending order and indexed from 1, for odd $n$:
$$
\tilde{x} = x_{\frac{n+1}{2}}
$$

For even $n$:
$$
\tilde{x} = \frac{x_{\frac{n}{2}} + x_{\frac{n}{2}+1}}{2}
$$

In [16]:
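# Sort the values, then key each one by its 0-based position so it can be looked up
sortedRDD = rdd.sortBy(lambda x: x).zipWithIndex().map(lambda item: (item[1], item[0]))
n = sortedRDD.count()
if n % 2 == 1:
    # odd n: the single middle element
    index = (n - 1) // 2
    print(sortedRDD.lookup(index)[0])
else:
    # even n: the average of the two middle elements
    index1 = n // 2
    index2 = n // 2 - 1
    value1 = sortedRDD.lookup(index1)[0]
    value2 = sortedRDD.lookup(index2)[0]
    print((value1 + value2) / 2.0)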

                                                                                

49.5
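The index arithmetic is easy to get wrong, because the formulas count from 1 while the lookup keys count from 0. A plain-Python sketch of the same logic, with `median_by_index` as a hypothetical helper name, can be cross-checked against `statistics.median` from the standard library.

```python
import statistics

def median_by_index(values):
    """Hypothetical helper: median via 0-based positions in the sorted data."""
    ordered = sorted(values)
    n = len(ordered)
    if n % 2 == 1:
        return ordered[(n - 1) // 2]  # single middle element
    return (ordered[n // 2 - 1] + ordered[n // 2]) / 2.0  # two middle elements

even_case = median_by_index(range(100))  # n = 100
odd_case = median_by_index(range(101))   # n = 101

print(even_case)  # 49.5
print(odd_case)   # 50
print(even_case == statistics.median(range(100)))  # True
```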


## Now let's add outliers to the data and see the difference

In [17]:
rdd = sc.parallelize([101,102,10000,1023] + list(range(100)))

In [18]:
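# Sort the values, then key each one by its 0-based position so it can be looked up
sortedRDD = rdd.sortBy(lambda x: x).zipWithIndex().map(lambda item: (item[1], item[0]))
n = sortedRDD.count()
if n % 2 == 1:
    # odd n: the single middle element
    index = (n - 1) // 2
    print(sortedRDD.lookup(index)[0])
else:
    # even n: the average of the two middle elements
    index1 = n // 2
    index2 = n // 2 - 1
    value1 = sortedRDD.lookup(index1)[0]
    value2 = sortedRDD.lookup(index2)[0]
    print((value1 + value2) / 2.0)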

51.5


In [19]:
total = rdd.sum()  # sum of all elements in the RDD (named total to avoid shadowing the built-in sum)
n = rdd.count()  # number of elements in the RDD
mean = total / float(n)  # x-bar
mean

155.53846153846155

## NOTE:
With outliers present, the mean and the median diverge: the mean jumps from 49.5 to about 155.5, while the median only moves from 49.5 to 51.5.

**That's why the median is preferred when the data contains outliers: it is resilient to them.**
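The comparison can be reproduced on the driver with Python's `statistics` module, using the same four outliers as above. This is a cross-check in plain Python, not a replacement for the RDD computation.

```python
import statistics

clean = list(range(100))
with_outliers = [101, 102, 10000, 1023] + clean

mean_clean = statistics.mean(clean)
median_clean = statistics.median(clean)
mean_outliers = statistics.mean(with_outliers)
median_outliers = statistics.median(with_outliers)

# The mean shifts by more than 100, the median by only 2
print(mean_outliers - mean_clean)
print(median_outliers - median_clean)  # 2.0
```

Four extreme values out of 104 are enough to triple the mean, while the median only moves to the next pair of middle values.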

### Standard Deviation

The formula for the standard deviation ($\sigma$) of a dataset is given by:
$$
\sigma = \sqrt{\frac{\sum_{i=1}^{n}(x_i - \bar{x})^2}{n}}
$$
Here:

- $\sigma$ represents the standard deviation.
- $n$ is the number of observations in the dataset.
- $x_i$ represents each individual observation in the dataset.
- $\bar{x}$ is the mean of the dataset.

In [20]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print(e)
    
# Create a SparkContext if it doesn't exist, using a local master URL
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Create a SparkSession if it doesn't exist
spark = SparkSession.builder.getOrCreate()


rdd = sc.parallelize(range(100))

If we want to apply a function to every element of an RDD, we should use `map`.

In [21]:
mean = rdd.mean()
n = rdd.count()
standard_deviation = math.sqrt(rdd.map(lambda x: pow((x - mean), 2)).sum()/n)
standard_deviation

28.86607004772212
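Because the formula divides by $n$, this is the population standard deviation. The plain-Python sketch below repeats the calculation and compares it with `statistics.pstdev` (divides by $n$) and `statistics.stdev` (divides by $n-1$). On an RDD, the built-in `rdd.stdev()` and `rdd.sampleStdev()` make the same distinction.

```python
import math
import statistics

data = range(100)
n = len(data)
mean = sum(data) / n

# Same steps as the RDD version: square each deviation, sum, divide by n, take the root
manual = math.sqrt(sum((x - mean) ** 2 for x in data) / n)

population = statistics.pstdev(data)  # divides by n
sample = statistics.stdev(data)       # divides by n - 1, so slightly larger

print(manual)
print(population)
print(sample)
```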

## Skewness

The formula for skewness ($\gamma_1$) is typically expressed as follows:

$$
\gamma_1 = \frac{\frac{1}{n} \sum_{i=1}^{n}(x_i - \bar{x})^3}{\left(\frac{1}{n} \sum_{i=1}^{n}(x_i - \bar{x})^2\right)^{3/2}}
$$
Here:

- $\gamma_1$ represents skewness.
- $n$ is the number of observations.
- $x_i$ represents each individual observation.
- $\bar{x}$ is the mean of the dataset.



In [22]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    print(e)
    
# Create a SparkContext if it doesn't exist, using a local master URL
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Create a SparkSession if it doesn't exist
spark = SparkSession.builder.getOrCreate()

In [23]:
rdd = sc.parallelize(range(100))
mean = rdd.mean()
n = float(rdd.count())
standard_deviation = math.sqrt(rdd.map(lambda x: pow((x - mean), 2)).sum()/n)

skewness = (rdd.map(lambda x: pow((x-mean),3)/pow(standard_deviation,3)).sum())/n
skewness

1.4210854715202004e-16

In [24]:
## right-skew the data by adding large positive outliers
rdd = sc.parallelize([101,102,10000,1023,121728] + list(range(100)))
mean = rdd.mean()
n = float(rdd.count())
standard_deviation = math.sqrt(rdd.map(lambda x: pow((x - mean), 2)).sum()/n)

skewness = (rdd.map(lambda x: pow((x-mean),3)/pow(standard_deviation,3)).sum())/n
skewness

10.00200794849941

In [25]:
## left-skew the data by adding large negative outliers
rdd = sc.parallelize([-101,-102,-10000,-1023,-121728] + list(range(100)))
mean = rdd.mean()
n = float(rdd.count())
standard_deviation = math.sqrt(rdd.map(lambda x: pow((x - mean), 2)).sum()/n)

skewness = (rdd.map(lambda x: pow((x-mean),3)/pow(standard_deviation,3)).sum())/n
skewness

-10.00010218410623
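The three results can be sanity-checked with a plain-Python version of the skewness formula. The function name `skewness` is chosen here for illustration, and the data sets are the ones used in the cells above.

```python
def skewness(values):
    """Third central moment divided by the second central moment to the power 3/2."""
    data = list(values)
    n = len(data)
    mean = sum(data) / n
    m2 = sum((x - mean) ** 2 for x in data) / n  # population variance
    m3 = sum((x - mean) ** 3 for x in data) / n  # third central moment
    return m3 / m2 ** 1.5

symmetric = skewness(range(100))
right = skewness([101, 102, 10000, 1023, 121728] + list(range(100)))
left = skewness([-101, -102, -10000, -1023, -121728] + list(range(100)))

print(symmetric)  # effectively zero for symmetric data
print(right)      # positive: long tail to the right
print(left)       # negative: long tail to the left
```

The sign of the skewness follows the side on which the extreme values sit, which matches the positive and negative results above.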