In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setAppName("Application name")
sc = SparkContext(conf=conf)

## Difference between map and flatMap
- **`map`**: Applies a function to each element and returns exactly one output element per input element, so if the function returns a list, the result is a collection of lists.
- **`flatMap`**: Applies a function that returns an iterable (for example, a list) for each element, then flattens all of those iterables into a single collection.
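The difference can be modeled in plain Python without Spark. `map` behaves like a list comprehension (one output per input), while `flatMap` behaves like chaining the per-element results together. This is a sketch of the semantics only; the helpers `rdd_like_map` and `rdd_like_flat_map` are hypothetical and not part of PySpark.

```python
from itertools import chain
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def rdd_like_map(data: Iterable[T], f: Callable[[T], U]) -> List[U]:
    """Hypothetical helper: one output element per input element (like RDD.map)."""
    return [f(x) for x in data]


def rdd_like_flat_map(data: Iterable[T], f: Callable[[T], Iterable[U]]) -> List[U]:
    """Hypothetical helper: f returns an iterable per element, and the results are concatenated (like RDD.flatMap)."""
    return list(chain.from_iterable(f(x) for x in data))


names = ["Mohammad Al-Hennawi", "Rafat Hammad", "Cristiano ronaldo"]
print(rdd_like_map(names, str.split))
# [['Mohammad', 'Al-Hennawi'], ['Rafat', 'Hammad'], ['Cristiano', 'ronaldo']]
print(rdd_like_flat_map(names, str.split))
# ['Mohammad', 'Al-Hennawi', 'Rafat', 'Hammad', 'Cristiano', 'ronaldo']
```

Note that `map` keeps three elements (one per name) while `flatMap` produces six (one per word).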

In [3]:
names = [
    "Mohammad Al-Hennawi",
    "Rafat Hammad",
    "Cristiano ronaldo"
]
# distribute the Python list as an RDD (one RDD for each demo below)
rdd1 = sc.parallelize(names)
rdd2 = sc.parallelize(names)

In [4]:
map_list = rdd1.map(lambda x: x.split()).collect() # list of lists: one list of words per name
flatmap_list = rdd2.flatMap(lambda x: x.split()).collect() # flat list of all words

In [5]:
print('map', map_list)
print('flatMap', flatmap_list)

map [['Mohammad', 'Al-Hennawi'], ['Rafat', 'Hammad'], ['Cristiano', 'ronaldo']]
flatMap ['Mohammad', 'Al-Hennawi', 'Rafat', 'Hammad', 'Cristiano', 'ronaldo']


In [7]:
rdd1.saveAsTextFile('output')

### Output Folder:
![Output Folder](images/ss1.png) 

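The output folder contains 12 part files because, when no `numSlices` is given, `sc.parallelize` splits the data into `sc.defaultParallelism` partitions, and in local mode (`local[*]`) that equals the number of available cores (12 threads on my PC). The number of part files will therefore differ from one machine to another.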
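To see why the part-file count follows the partition count rather than the data size, here is a simplified plain-Python model of splitting a list into `numSlices` contiguous chunks. It is an illustration under assumptions, not PySpark's implementation (PySpark batches and serializes the data before slicing, so exact boundaries can differ), and `slice_evenly` is a hypothetical helper. In the notebook itself, `rdd1.getNumPartitions()` and `sc.defaultParallelism` report the real numbers, and `sc.parallelize(names, numSlices=3)` would fix the count regardless of the machine.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def slice_evenly(data: Sequence[T], num_slices: int) -> List[List[T]]:
    """Hypothetical helper: split data into num_slices contiguous chunks.

    Simplified model only; chunk sizes differ by at most one element.
    """
    n = len(data)
    return [list(data[i * n // num_slices:(i + 1) * n // num_slices]) for i in range(num_slices)]


names = ["Mohammad Al-Hennawi", "Rafat Hammad", "Cristiano ronaldo"]
slices = slice_evenly(names, 12)  # 12 = assumed default parallelism on a 12-thread machine
print(len(slices))                          # 12 -> one part file per partition
print(sum(1 for s in slices if not s))      # 9 -> in this model most partitions hold no data
```

With only three elements spread over 12 partitions, the model leaves nine partitions empty; each partition still becomes its own part file when saved.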

### Word Count example

In [8]:
file = sc.textFile("data/input_01.txt", 4) # minPartitions=4: ask for at least 4 partitions
print(file) # textFile is lazy: this prints the RDD description, not the file contents

data/input_01.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0


In [9]:
lines = file.flatMap(lambda line: line.split()) # one element per word, despite the variable name
print(lines) # lazy transformation

PythonRDD[12] at RDD at PythonRDD.scala:53


In [10]:
word = lines.map(lambda word: (word, 1)) # (word, 1) pairs
print(word) # lazy transformation

PythonRDD[13] at RDD at PythonRDD.scala:53


In [11]:
counts = word.reduceByKey(lambda a, b: a + b)
print(counts) # lazy transformation

PythonRDD[18] at RDD at PythonRDD.scala:53
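Every `print` above shows an RDD description rather than data because transformations are lazy. Python generators behave similarly, which gives a rough analogy: the pipeline below records when each input line is actually processed. This is an analogy only, not how Spark works internally; `read_lines` is a hypothetical helper and `sample_lines` is made-up input standing in for `data/input_01.txt`.

```python
from typing import Iterable, Iterator, List, Tuple

calls: List[str] = []  # records when each line is actually processed


def read_lines(lines: Iterable[str]) -> Iterator[str]:
    """Hypothetical lazy 'source': the body runs only when a consumer asks for items."""
    for line in lines:
        calls.append(line)  # side effect shows when work really happens
        yield line


sample_lines = ["the quick fox", "the lazy dog"]  # hypothetical input

# "Transformations": build generator pipelines; no line is processed yet.
source = read_lines(sample_lines)
words = (w for line in source for w in line.split())  # like flatMap
pairs = ((w, 1) for w in words)                       # like map
print(len(calls))  # 0 -> nothing has run yet

# "Action": consuming the pipeline triggers all the work at once.
result: List[Tuple[str, int]] = list(pairs)
print(len(calls))   # 2 -> both lines were read
print(result[:3])   # [('the', 1), ('quick', 1), ('fox', 1)]
```

One difference: a generator can be consumed only once, whereas Spark re-runs an RDD's lineage on every action unless the RDD is cached with `cache()` or `persist()`.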


In [12]:
counts_list = counts.collect() # action
print(counts_list[:6])

# this cell takes the longest because collect() is the first action: it runs the whole pipeline (read, split, map, and the shuffle for reduceByKey)



In [13]:
total_count = counts.map(lambda x: x[1]).reduce(lambda a, b: a + b) # reduce is the action here; the result is the total number of words
print(total_count)

906
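The word-count pipeline, including the final sum, can be checked on a small input in plain Python. The hypothetical helper `reduce_by_key` groups values by key and folds each group with the given function, which models what `reduceByKey(lambda a, b: a + b)` computes; summing the counts then gives the total number of words, as `total_count` does (906 for `data/input_01.txt`). The sample text is made up.

```python
from collections import defaultdict
from functools import reduce
from typing import Callable, Dict, Iterable, List, Tuple, TypeVar

K = TypeVar("K")
V = TypeVar("V")


def reduce_by_key(pairs: Iterable[Tuple[K, V]], func: Callable[[V, V], V]) -> Dict[K, V]:
    """Hypothetical model of RDD.reduceByKey: group values by key, then fold each group with func."""
    groups: Dict[K, List[V]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


sample_text = ["to be or not", "to be"]  # hypothetical input lines
sample_words = [w for line in sample_text for w in line.split()]                    # flatMap
sample_counts = reduce_by_key(((w, 1) for w in sample_words), lambda a, b: a + b)   # map + reduceByKey
sample_total = sum(sample_counts.values())                                          # map(x[1]) + reduce

print(sample_counts)       # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
print(sample_total)        # 6, equal to the number of words
print(len(sample_counts))  # 4 distinct words, the analogue of counts.count()
```

Spark additionally merges values inside each partition before the shuffle (a map-side combine), which is one reason the function passed to `reduceByKey` should be associative and commutative.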


In [14]:
counts.saveAsTextFile('counts')

### Counts Folder:
![Counts Folder](images/ss2.png) 

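The folder contains four part files because `textFile` was called with `minPartitions=4`, and `reduceByKey` keeps the number of partitions of its parent RDD by default (when `spark.default.parallelism` is not set). Note that `minPartitions` is a lower bound, so a larger input file could produce more parts.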
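Which of the four part files a word ends up in is decided by hash partitioning: a key goes to partition `hash(key) % numPartitions`. PySpark's default partition function is `portable_hash`; the sketch below substitutes `zlib.crc32` as a stable stand-in, so the exact file assignments will differ from Spark's, but the property that matters is the same: every occurrence of a key lands in the same partition. The helpers `partition_for` and `hash_partition` are hypothetical.

```python
import zlib
from typing import Dict, List, Tuple

NUM_PARTITIONS = 4  # matches minPartitions=4 in the notebook


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in hash (crc32), NOT Spark's portable_hash; only the scheme is the same.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def hash_partition(pairs: List[Tuple[str, int]]) -> Dict[int, List[Tuple[str, int]]]:
    """Hypothetical model: route each (key, value) pair to its key's partition."""
    parts: Dict[int, List[Tuple[str, int]]] = {i: [] for i in range(NUM_PARTITIONS)}
    for key, value in pairs:
        parts[partition_for(key)].append((key, value))
    return parts


sample_pairs = [("the", 1), ("to", 1), ("the", 1), ("and", 1)]  # hypothetical (word, 1) pairs
sample_parts = hash_partition(sample_pairs)
for i, contents in sample_parts.items():
    print(f"part-{i:05d}: {contents}")  # one line per output part file
```

Because both `("the", 1)` pairs are routed to the same partition, the final count for `"the"` can be computed without looking at any other partition.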

## Common Triggers (Actions) in PySpark

In PySpark, actions are operations that trigger the actual execution of the computations on the dataset. Below are some commonly used actions:

- **`collect()`**: Returns all the elements of the dataset as a list to the driver.
- **`count()`**: Returns the number of elements in the dataset.
- **`first()`**: Returns the first element of the dataset.
- **`take(n)`**: Returns the first `n` elements of the dataset.
- **`takeOrdered(n, key=None)`**: Returns the first `n` elements ordered by a specified key.
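- **`reduce(f)`**: Aggregates the elements of the dataset using a commutative and associative binary function. (Its per-key counterpart, `reduceByKey()`, is a transformation, not an action: it returns a new RDD and triggers nothing.)
- **`foreach(f)`**: Applies a function to each element on the executors, typically for side effects; nothing is returned to the driver.
- **`saveAsTextFile(path)`**: Writes the dataset as a directory of text files (one part file per partition) at the specified path.
- **`saveAsPickleFile(path)`**: Saves the dataset as a SequenceFile of pickled Python objects. RDDs have no Parquet writer; Parquet output goes through the DataFrame API (for example, `df.write.parquet(path)`).
- **`countByKey()`**: Returns a dictionary mapping each key to the number of elements with that key.
- **`aggregate(zeroValue, seqOp, combOp)`**: Folds the elements of each partition with `seqOp`, starting from `zeroValue`, then merges the per-partition results with `combOp`.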
- **`isEmpty()`**: Returns `True` if the dataset is empty.
- **`max()`**: Returns the maximum value in the dataset.
- **`min()`**: Returns the minimum value in the dataset.
- **`sum()`**: Returns the sum of all elements in the dataset.
- **`mean()`**: Returns the mean (average) value of the dataset.
- **`stdev()`**: Returns the standard deviation of the dataset.

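These actions trigger the execution of all pending transformations. Most of them return a result to the driver program; `foreach()` and the `save*` methods instead perform side effects or write output from the executors.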
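`aggregate()` is the least obvious action in the list, so here is a plain-Python model of its semantics: `seqOp` folds each partition starting from `zeroValue`, then `combOp` merges the per-partition results, again starting from `zeroValue`. The example computes a (sum, count) pair to get a mean; `aggregate_model` is a hypothetical helper and the partition layout is made up. In PySpark the corresponding call would be `rdd.aggregate((0, 0), seq_op, comb_op)`.

```python
from functools import reduce
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def aggregate_model(partitions: List[List[T]], zero: U,
                    seq_op: Callable[[U, T], U], comb_op: Callable[[U, U], U]) -> U:
    """Hypothetical model of RDD.aggregate: fold each partition with seq_op, then merge with comb_op."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)


def seq_op(acc: Tuple[int, int], x: int) -> Tuple[int, int]:
    return (acc[0] + x, acc[1] + 1)  # add the value, bump the count


def comb_op(a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
    return (a[0] + b[0], a[1] + b[1])  # merge two partial (sum, count) pairs


demo_partitions = [[1, 2, 3], [4, 5], [], [6]]  # hypothetical layout, including an empty partition
agg_sum, agg_count = aggregate_model(demo_partitions, (0, 0), seq_op, comb_op)
print(agg_sum, agg_count, agg_sum / agg_count)  # 21 6 3.5
```

Because `zeroValue` is used once per partition and once more in the merge, it should be the identity for the operation (here `(0, 0)`); otherwise the result would depend on the number of partitions.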

In [15]:
print(counts.count()) # prints number of distinct words

470


In [16]:
print(counts.takeOrdered(5, key=lambda x: -x[1])) # take the 5 most frequent words (negating the count sorts in descending order)

[('the', 46), ('*', 30), ('to', 18), ('and', 18), ('in', 18)]
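Negating the count in `key` works because `takeOrdered` returns the *smallest* elements under the key. Below is a plain-Python model of computing such a top-n on partitioned data: take the n smallest candidates per partition with `heapq.nsmallest`, then merge those candidates. `take_ordered_model` is a hypothetical helper and the per-partition counts are invented; only the technique is the point.

```python
import heapq
from typing import Callable, List, Tuple

Pair = Tuple[str, int]


def take_ordered_model(partitions: List[List[Pair]], n: int,
                       key: Callable[[Pair], int]) -> List[Pair]:
    """Hypothetical model: per-partition top-n, then a merge of the candidates."""
    # Each partition only contributes its own n best candidates.
    candidates = [x for part in partitions for x in heapq.nsmallest(n, part, key=key)]
    # The global top-n must be among those candidates.
    return heapq.nsmallest(n, candidates, key=key)


count_partitions = [
    [("the", 46), ("fox", 2), ("in", 18)],
    [("*", 30), ("dog", 1)],
    [("to", 18), ("and", 18), ("a", 9)],
]  # hypothetical per-partition word counts
print(take_ordered_model(count_partitions, 2, key=lambda x: -x[1]))  # [('the', 46), ('*', 30)]
```

Only `n` elements per partition travel to the driver, which is why `takeOrdered` stays cheap even when the RDD is large, as long as `n` is small.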


# Example of Lineage Graph: Count of Bad Lines in a Log File

A log file records the actions and events that happen on a website or in a database. I run a website called **[PLogic](https://www.linkedin.com/posts/mohammad-al-hennawi-9856592b9_throughout-my-college-journey-in-data-activity-7253457852055379968-j0qz?utm_source=share&utm_medium=member_desktop)** that generates real log files, so this example uses one of them. Below is a simple lineage graph:

<img src="images/ss3_1.png" alt="LOGS">
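The lineage graph can be expressed as a small data structure: every RDD remembers the operation that produced it and its parent RDDs, which is what lets Spark recompute lost partitions. The `Node` class below is a hypothetical sketch of that idea for this log example, not Spark's internal representation; in PySpark the real lineage can be inspected with `bad_rdd.toDebugString()`.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical lineage node: an operation name plus the nodes it was derived from."""
    name: str
    op: str
    parents: List["Node"] = field(default_factory=list)

    def lineage(self, depth: int = 0) -> List[str]:
        # Walk back toward the source, one indented line per ancestor.
        lines = ["  " * depth + f"{self.name} <- {self.op}"]
        for parent in self.parents:
            lines.extend(parent.lineage(depth + 1))
        return lines


file_node = Node("log_file", "textFile")
lower_node = Node("log_lower", "map(lower)", [file_node])
error_node = Node("error_rdd", "filter('error')", [lower_node])
warning_node = Node("warning_rdd", "filter('warning')", [lower_node])
bad_node = Node("bad_rdd", "union", [warning_node, error_node])

print("\n".join(bad_node.lineage()))
```

The shared parent `log_lower` is printed under both branches because this sketch walks the graph as a tree; the lineage itself is a DAG in which both filters read from the same RDD.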


In [17]:
log_file = sc.textFile("data/PlogicLogs.txt")

In [18]:
log_lower = log_file.map(lambda line: line.lower()) # convert all letters to lower case

In [19]:
# keep only the lines that contain 'error' (bad lines)
error_rdd = log_lower.filter(lambda line: "error" in line)
# keep only the lines that contain 'warning' (bad lines)
warning_rdd = log_lower.filter(lambda line: "warning" in line)
# combine the two RDDs into one (union keeps duplicates, so a line with both words is counted twice)
bad_rdd = warning_rdd.union(error_rdd)

In [20]:
print(bad_rdd.count())

5
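`union` simply concatenates the two RDDs and does not remove duplicates, so a line that contains both "error" and "warning" is counted twice in `bad_rdd.count()`. The plain-Python model below, with made-up log lines, shows the effect. In PySpark, a single filter such as `log_lower.filter(lambda line: "error" in line or "warning" in line)` would count each bad line once.

```python
from typing import List

log_lines: List[str] = [  # hypothetical, already lower-cased log lines
    "info: page served",
    "warning: slow query",
    "error: db timeout",
    "warning: retry after error",  # contains both words
]

error_lines = [line for line in log_lines if "error" in line]      # like filter
warning_lines = [line for line in log_lines if "warning" in line]  # like filter
union_lines = warning_lines + error_lines                           # like union: no deduplication

single_pass = [line for line in log_lines if "error" in line or "warning" in line]

print(len(union_lines))  # 4 -> the mixed line is counted twice
print(len(single_pass))  # 3 -> each bad line is counted once
```

`distinct()` would also remove the double count, but it would additionally merge genuinely repeated log lines, so the single filter is the more faithful fix.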


In [21]:
print(bad_rdd.take(3))

