In [151]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

In [152]:
# Estabilish connection and load data into memory

spark = (SparkSession.builder
        .appName("Map")
        .getOrCreate())

# Get data
data = spark.sparkContext.textFile("/opt/homebrew/Cellar/apache-spark/3.3.1/README.md")

## Transformation

### map()

In [153]:
# Use map()
#For every line in the doc, create a tuple containing its content and lenght (content, lenght)
mapFile = data.map(lambda line : (line, len(line)))

# Print tuples
mapFile.foreach(print)

('# Apache Spark', 14)
('', 0)
('Spark is a unified analytics engine for large-scale data processing. It provides', 80)
('high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 75)
('supports general computation graphs for data analysis. It also supports a', 73)
('rich set of higher-level tools including Spark SQL for SQL and DataFrames,', 74)
('pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing,', 98)
('and Structured Streaming for stream processing.', 47)
('', 0)
('<https://spark.apache.org/>', 27)
('', 0)
('[![GitHub Action Build](https://github.com/apache/spark/actions/workflows/build_and_test.yml/badge.svg?branch=master&event=push)](https://github.com/apache/spark/actions/workflows/build_and_test.yml?query=branch%3Amaster+event%3Apush)', 234)
('[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwa

### flatMap()

In [154]:
# Use flatMap()
# Split line into words
flatFile = data.flatMap(lambda line : line.split())

# Print words
flatFile.foreach(print)

#```python

>>>
spark.range(1000
Apache*
1000
*
1000).count()
```
##

Example
Programs
SparkSpark

also
comes
with
several
sample
programs
in
the
`examples`
Spark
is
a
unified
analytics
engine
for
large-scale
directory.data
processing.
It

provides
high-level
APIs
in
Scala,
ToJava,

runPython,

oneand

R,of
and

an
optimizedthem,
engine
that

supportsuse
general
computation
graphs
for
data

analysis.
It
also
supports`./bin/run-example
<class>
[params]`.
For
example:

```bash
./bin/run-examplea
rich
set
of
higher-level
tools
including
Spark
SQL
for
SQL
and
DataFrames,
pandas
API
on
Spark
for
pandas
workloads,
MLlib
for
SparkPi
```
will
run
the
Pi
example
locally.
You
can
set

themachine

learning,
GraphX
for
graph
processing,
and
Structured
Streaming
for
stream
processing.
<https://spark.apache.org/>
[![GitHub
Action
Build](https://github.com/apache/spark/actions/workflows/build_and_test.yml/badge.svg?branch=master&event=push)](https://github.com/apache/spark/actions/workflows/build_and

### filter(func)

In [155]:
# Use filter()
# Only words starts with "a"
filterFile = flatFile.filter(lambda word : word.startswith("a"))

# Print words
# foreach() is a action that starts the execution
filterFile.foreach(print)

a
analytics
and
and
an
analysis.
also
a
and
and
a
and
a
available
at
an
also
also
a
a
and
also
an
abbreviated
are
a
also
a
and
against
at
and
a
and
an


### reduceByKey(func)

In [156]:
list = ["um", "um", "dois", "dois", "três"]

rdd = spark.sparkContext.parallelize(list) #transform list in RDD

# map list into tuple and use reduceByKey to count frequency of every word
rdd2 = rdd.map(lambda x: (x, 1)).reduceByKey(lambda a,b : a+b)

rdd2.foreach(print)

('um', 2)
('três', 1)
('dois', 2)


### sortByKey(func)

In [157]:
# map list into tuple, use reduceByKey to count frequency of every word and sort by key
rdd2 = rdd.map(lambda x: (x, 1)).reduceByKey(lambda a,b : a+b).sortByKey("asc")

rdd2.foreach(print)

('um', 2)
('dois', 2)
('três', 1)


### union(rdd)

In [158]:
list2 = ["um", "quatro", "cinco"]

rdd2 = spark.sparkContext.parallelize(list2)

rddUnion = rdd.union(rdd2)

rddUnion.foreach(print)

dois
um
um
três
dois
um
cinco
quatro


### intersection(rdd)

In [159]:
rddIntersection = rdd.intersection(rdd2)
rddIntersection.foreach(print)

um


### distinct(rdd)

In [160]:
rddDistinct = rdd.distinct()
rddDistinct.foreach(print)

três
um
dois


### join(rdd)

In [161]:
list = [("Pedro", 38), ("Maria", 42), ("João", 12)]
list2 = [("Pedro", "BH"), ("Maria", "DF")]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)

rddJoin = rdd.join(rdd2)

rddJoin.foreach(print)

('Pedro', (38, 'BH'))
('Maria', (42, 'DF'))


## Action

### foreach(func)

In [162]:
list = [("Pedro", 38), ("Maria", 42), ("João", 12)]
list2 = [("Pedro", "BH"), ("Maria", "DF")]

rdd = spark.sparkContext.parallelize(list)
rdd2 = spark.sparkContext.parallelize(list2)

rddJoin = rdd.join(rdd2)
#rddJoin.foreach(print)

### collect()

In [163]:
print(rddJoin.collect())

[('Maria', (42, 'DF')), ('Pedro', (38, 'BH'))]


### count()

In [164]:
print(rddJoin.count())

2


### take(n)

In [165]:
# rddUnion.foreach(print)
rddUnion.take(4)

['um', 'um', 'dois', 'dois']

### top(k)

In [166]:
# rddUnion.foreach(print)
rddUnion.top(4)

['um', 'um', 'um', 'três']

### countByValue()

In [167]:
rddUnion.countByValue()

defaultdict(int, {'um': 3, 'dois': 2, 'três': 1, 'quatro': 1, 'cinco': 1})

### reduce(func)

In [168]:
rddUnion.reduce(lambda a,b : a + ' ' + b)

'um um dois dois três um quatro cinco'

### saveAsTextFile(path)

In [172]:
rddUnion.saveAsTextFile('/Users/deborabastos/Documents/5.Bootcamp DS/Módulo2/bootcamp_code/')

Py4JJavaError: An error occurred while calling o3054.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/deborabastos/Documents/5.Bootcamp DS/Módulo2/bootcamp_code already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1599)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1599)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1585)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1585)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
