<a href="https://colab.research.google.com/github/PabloPoti/RepositorioEdem20-21/blob/main/00_Introduction_to_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prerequisites

Installing Spark

---



In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip -q install findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

Start a Spark session and print its version


---


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing built-ins such as sum() and max()

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.2.0'

Creating a tunnel to the Spark UI

**To check the Spark UI, open the URL printed by the next cell (https://######/jobs/); the SQL tab is under /SQL/.**


In [5]:
from google.colab.output import eval_js
print(eval_js("google.colab.kernel.proxyPort(4040)") + "jobs/")

https://du5d0dgk6an-496ff2e9c6d22116-4040-colab.googleusercontent.com/jobs/


# Download Datasets

In [6]:
!mkdir -p /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/frankenstein.txt -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/el_quijote.txt -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/characters.csv -P /dataset
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/data/planets.csv -P /dataset
!ls /dataset

characters.csv	el_quijote.txt	frankenstein.txt  planets.csv


# RDD

---



## Example 1

In [7]:
textFile = spark.sparkContext.textFile("/dataset/frankenstein.txt")
textFile.first()

'FRANKENSTEIN'

In [10]:
textFile

/dataset/frankenstein.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


Creating parallelized collections

This is a fast way to create an RDD from an existing Python collection:

## Example 2

In [9]:
distData = spark.sparkContext.parallelize([25, 20, 15, 10, 5])
distData.reduce(lambda x, y: x + y)  # sum of all elements

75
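`reduce` is an action: Spark reduces each partition locally, then merges the per-partition results on the driver, which is why the function passed to `reduce` must be commutative and associative. The pure-Python sketch below (no Spark needed) mimics that two-stage evaluation; `partitioned_reduce` is a hypothetical helper, and its contiguous split into `num_partitions` chunks is an illustrative assumption, not how Spark actually assigns elements.

```python
from functools import reduce


def partitioned_reduce(data, func, num_partitions=2):
    """Mimic RDD.reduce: reduce each partition, then reduce the partial results."""
    if not data:
        raise ValueError("Can not reduce() an empty collection")
    # Split the list into contiguous chunks (illustrative; Spark decides its own split).
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    # Stage 1: reduce inside each partition (done on the executors in Spark).
    partials = [reduce(func, part) for part in partitions]
    # Stage 2: merge the per-partition results (done on the driver in Spark).
    return reduce(func, partials)


data = [25, 20, 15, 10, 5]
total = partitioned_reduce(data, lambda x, y: x + y)
diff_one = partitioned_reduce(data, lambda x, y: x - y, num_partitions=1)
diff_two = partitioned_reduce(data, lambda x, y: x - y, num_partitions=2)
print(total, diff_one, diff_two)  # 75 -25 -15
```

Addition gives 75 however the data is split, while subtraction (not associative) changes with the number of partitions.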

## Exercise 1
Count the number of lines in the file `el_quijote.txt`

---



In [13]:
textFile = spark.sparkContext.textFile("/dataset/el_quijote.txt")
textFile.count()

2186

## Exercise 2
Print the first line of the file `el_quijote.txt`

---



In [15]:
textFile.first()

'DON QUIJOTE DE LA MANCHA'

## Transformations and Actions in RDDs 

### Actions

### Example 3

In [None]:
print(textFile.count()) # Number of elements in RDD
print(textFile.first()) # First element in RDD
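Transformations (`map`, `filter`, `reduceByKey`, ...) are lazy: they only record how to build a new RDD, and nothing runs until an action such as `count()` or `first()` asks for a result. Python generators behave in a similar way, so the following is a pure-Python analogy (not Spark code) that uses a hypothetical call counter to show that no work happens until the pipeline is consumed.

```python
calls = 0


def to_upper(line):
    """Count every time the 'transformation' is actually executed."""
    global calls
    calls += 1
    return line.upper()


lines = ["don quijote", "de la mancha", ""]

# "Transformations": build a lazy pipeline; nothing is evaluated yet.
mapped = (to_upper(line) for line in lines)
non_empty = (line for line in mapped if line)
print(calls)  # 0

# "Action": asking for the first element triggers just enough work.
first = next(non_empty)
print(first, calls)  # DON QUIJOTE 1

# Another "action": counting the rest processes the remaining elements.
remaining = sum(1 for _ in non_empty)
print(remaining, calls)  # 1 3
```

One difference: a generator can be consumed only once, whereas an RDD recomputes its lineage for every action unless it is cached, which is why Example 4 calls `.cache()`.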

### Transformations

### Example 4

In [None]:
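# reduceByKey: count how many times each identical line appears
lines = spark.sparkContext.textFile("/dataset/frankenstein.txt")
pairs = lines.map(lambda s: (s, 1))  # key = the whole line
counts = pairs.reduceByKey(lambda a, b: a + b).cache()  # cached because later cells reuse it
print(counts.count())  # number of distinct lines
counts.collect()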
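Because `pairs` uses the whole line as its key, `reduceByKey` here counts how many times each identical line occurs (repeated lines, such as blank ones, collapse into a single key), not how often each word occurs. A pure-Python sketch of the same per-key merge, using a hypothetical `reduce_by_key` helper and a few made-up lines:

```python
def reduce_by_key(pairs, func):
    """Pure-Python equivalent of RDD.reduceByKey: merge the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


lines = ["FRANKENSTEIN", "", "Letter 1", "", "FRANKENSTEIN"]  # made-up sample
pairs = [(s, 1) for s in lines]  # same shape as lines.map(lambda s: (s, 1))
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)  # [('FRANKENSTEIN', 2), ('', 2), ('Letter 1', 1)]
```

The dict keeps first-seen order here; Spark makes no ordering guarantee for the output of `reduceByKey`.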

In [None]:
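# sortByKey: order the (line, count) pairs by key, ascending
sorted_counts = counts.sortByKey()  # avoid shadowing Python's built-in sorted()
sorted_counts.collect()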
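`sortByKey()` orders the pairs by key in ascending order (`sortByKey(False)` sorts descending). With string keys, PySpark compares them as Python strings, that is, by code point, so the empty line and upper-case lines come before lower-case ones. A pure-Python stand-in on a made-up list (`sort_by_key` is a hypothetical helper):

```python
def sort_by_key(pairs, ascending=True):
    """Pure-Python stand-in for RDD.sortByKey(ascending)."""
    return sorted(pairs, key=lambda kv: kv[0], reverse=not ascending)


pairs = [("the", 3), ("The", 2), ("", 5), ("apple", 1)]  # made-up sample
asc = sort_by_key(pairs)
desc = sort_by_key(pairs, ascending=False)
print(asc)   # [('', 5), ('The', 2), ('apple', 1), ('the', 3)]
print(desc)  # [('the', 3), ('apple', 1), ('The', 2), ('', 5)]
```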

### Example 5

In [None]:
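# Filter: keep the lines that contain the substring "the"
# (textFile currently points to el_quijote.txt, loaded in Exercise 1)
linesWithThe = textFile.filter(lambda line: "the" in line)
linesWithThe.count()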
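`"the" in line` is a case-sensitive substring test, so lines containing words such as "other" or "there" also pass, while "THE" does not. If whole-word matches are what you want (an assumption about the intent, not part of the original example), a word-boundary regular expression is one option. The pure-Python comparison below uses made-up lines:

```python
import re

lines = ["the end", "other things", "THE START", "go there"]  # made-up sample

# Substring match, as in the filter above.
substring = [line for line in lines if "the" in line]

# Whole-word, case-insensitive match using word boundaries.
pattern = re.compile(r"\bthe\b", re.IGNORECASE)
whole_word = [line for line in lines if pattern.search(line)]

print(substring)   # ['the end', 'other things', 'go there']
print(whole_word)  # ['the end', 'THE START']
```

In the notebook, the same predicate can be passed to the RDD as `textFile.filter(lambda line: pattern.search(line) is not None)`.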

### Exercise 3
Get the word count for the file `frankenstein.txt`

---

In [17]:
lines = spark.sparkContext.textFile("/dataset/frankenstein.txt")
pairs = lines.flatMap(lambda a: a.split(" ")).map(lambda a: (a, 1))
counts = pairs.reduceByKey(lambda a, b: a + b).cache()
counts.collect()

[('FRANKENSTEIN', 1),
 ('', 3488),
 ('The', 256),
 ('Mary', 1),
 ('Wollstonecraft', 1),
 ('(Godwin)', 1),
 ('Shelley', 1),
 ('Letter', 4),
 ('1', 2),
 ('St.', 5),
 ('17—', 7),
 ('TO', 2),
 ('hear', 15),
 ('no', 146),
 ('disaster', 2),
 ('of', 2634),
 ('an', 208),
 ('enterprise', 1),
 ('have', 356),
 ('regarded', 6),
 ('evil', 16),
 ('forebodings.', 1),
 ('yesterday,', 1),
 ('task', 12),
 ('is', 296),
 ('assure', 4),
 ('sister', 7),
 ('in', 1072),
 ('success', 6),
 ('am', 114),
 ('already', 32),
 ('far', 38),
 ('as', 477),
 ('walk', 4),
 ('streets', 4),
 ('feel', 40),
 ('cold', 21),
 ('upon', 123),
 ('braces', 1),
 ('fills', 4),
 ('Do', 12),
 ('understand', 13),
 ('this', 298),
 ('feeling?', 1),
 ('travelled', 5),
 ('towards', 93),
 ('foretaste', 1),
 ('icy', 3),
 ('promise,', 2),
 ('daydreams', 2),
 ('become', 32),
 ('more', 149),
 ('vivid.', 1),
 ('try', 4),
 ('pole', 2),
 ('frost', 4),
 ('desolation;', 3),
 ('ever', 40),
 ('presents', 3),
 ('imagination', 14),
 ('region', 4),
 ('beau
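The `('', 3488)` entry appears because `str.split(" ")` yields an empty string for every blank line and for each extra space between words. Calling `split()` with no argument splits on runs of whitespace and never returns empty tokens; lowercasing and stripping surrounding punctuation (an optional normalization assumed here, not part of the exercise) also merges entries such as `'The'`/`'the'`. A pure-Python comparison on a made-up sample, with hypothetical helper names:

```python
import string
from collections import Counter

lines = ["The  cold", "", "the cold, cold pole"]  # made-up sample


def words_split_space(line):
    """Tokenizer used in the cell above: split on single spaces only."""
    return line.split(" ")


def words_normalized(line):
    """Split on any whitespace, lowercase, and strip surrounding punctuation."""
    tokens = (word.strip(string.punctuation).lower() for word in line.split())
    return [token for token in tokens if token]


raw = Counter(word for line in lines for word in words_split_space(line))
normalized = Counter(word for line in lines for word in words_normalized(line))
print(raw)
print(normalized)
```

Since `words_normalized` takes one line and returns a list, it can be passed straight to `lines.flatMap(words_normalized)`.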

### Exercise 4
Get the 10 most frequent words that have more than 4 characters

---



In [19]:
lines = spark.sparkContext.textFile("/dataset/frankenstein.txt")

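(lines.flatMap(lambda line: line.split(" "))   # split lines into words
    .filter(lambda word: len(word) > 4)        # keep words longer than 4 characters
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)           # (word, count)
    .map(lambda pair: (pair[1], pair[0]))      # swap to (count, word)
    .sortByKey(False)                          # descending by count
    .take(10))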

[(540, 'which'),
 (187, 'could'),
 (177, 'would'),
 (174, 'their'),
 (152, 'should'),
 (130, 'these'),
 (122, 'before'),
 (107, 'might'),
 (105, 'myself'),
 (103, 'every')]
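Swapping to `(count, word)` and calling `sortByKey(False)` sorts the whole RDD only to keep 10 rows. PySpark's `takeOrdered(num, key=...)` action returns the `num` smallest elements and keeps only a bounded heap per partition, so `takeOrdered(10, key=lambda kv: -kv[1])` on the `(word, count)` pairs (before the swap) should give the same top 10 without a full sort. The pure-Python sketch below shows that per-partition-then-merge idea with `heapq.nlargest`; the split of these counts (taken from the output above) into two "partitions" is made up.

```python
import heapq
from operator import itemgetter

# (word, count) pairs from the output above, split into two hypothetical partitions.
partitions = [
    [("which", 540), ("their", 174), ("myself", 105)],
    [("could", 187), ("would", 177), ("every", 103)],
]
by_count = itemgetter(1)

# Stage 1: keep only the 3 largest elements of each partition.
per_partition = [heapq.nlargest(3, part, key=by_count) for part in partitions]

# Stage 2: merge the small per-partition lists and keep the global top 3.
top3 = heapq.nlargest(3, (kv for part in per_partition for kv in part), key=by_count)
print(top3)  # [('which', 540), ('could', 187), ('would', 177)]
```

Any element of the global top N must be in the top N of its own partition, so merging the small lists loses nothing.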

## Key/Value Pair RDD

---



### Example 6


---



In [None]:
charac_sw = spark.sparkContext.textFile("/dataset/characters.csv")
planets_sw = spark.sparkContext.textFile("/dataset/planets.csv")
charac_sw.take(10)

In [None]:
planets_sw.take(10)

In [None]:
from itertools import islice

# Drop the CSV header: with a single input file it is the first line of partition 0 only
charac_sw_noheader = charac_sw.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it)

planets_sw_noheader = planets_sw.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it)
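`mapPartitionsWithIndex` calls the function once per partition with the partition index and an iterator over its elements, so skipping one element only in partition 0 removes the header without scanning the whole RDD for it. The pure-Python sketch below simulates that call on a list of lists; `map_partitions_with_index` is a hypothetical helper, and the CSV lines and their column names are made up.

```python
from itertools import islice


def drop_header(idx, it):
    """Skip the first element of partition 0, where the CSV header lives."""
    return islice(it, 1, None) if idx == 0 else it


def map_partitions_with_index(partitions, func):
    """Pure-Python stand-in for RDD.mapPartitionsWithIndex on a list of partitions."""
    return [list(func(idx, iter(part))) for idx, part in enumerate(partitions)]


# Made-up CSV lines spread over two partitions; the header names are hypothetical.
partitions = [["name,homeworld", "Luke,Tatooine"], ["Leia,Alderaan"]]
result = map_partitions_with_index(partitions, drop_header)
print(result)  # [['Luke,Tatooine'], ['Leia,Alderaan']]
```

A simpler but full-scan alternative is `header = charac_sw.first()` followed by `charac_sw.filter(lambda line: line != header)`. For tabular work, `spark.read.csv(path, header=True)` returns a DataFrame that already handles the header.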

### Exercise 5
For each Star Wars character, get the population of their home planet

---
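This exercise is a key/value join: key both RDDs by planet name, then `join` them. `RDD.join` is an inner join that returns `(key, (left_value, right_value))` for every key present in both RDDs. The column layout of `characters.csv` and `planets.csv` is not shown in this notebook, so the pure-Python sketch below assumes hypothetical header-free rows `name,homeworld` and `planet,population`, and uses a hypothetical `join` helper that mirrors the RDD method.

```python
def join(left, right):
    """Pure-Python equivalent of RDD.join (inner join on the key)."""
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_index.get(key, [])]


# Hypothetical CSV rows; the real files may use different columns.
characters = ["Luke Skywalker,Tatooine", "Leia Organa,Alderaan", "Yoda,unknown"]
planets = ["Tatooine,200000", "Alderaan,2000000000"]

# Key both datasets by planet name: (planet, character) and (planet, population).
chars_by_planet = [(line.split(",")[1], line.split(",")[0]) for line in characters]
pop_by_planet = [(line.split(",")[0], line.split(",")[1]) for line in planets]

joined = join(chars_by_planet, pop_by_planet)
result = [(name, population) for _, (name, population) in joined]
print(result)  # [('Luke Skywalker', '200000'), ('Leia Organa', '2000000000')]
```

Characters whose planet has no match (here, Yoda) are dropped by the inner join. On the RDDs, the same shape would be two pair RDDs built with `map` on `charac_sw_noheader` and `planets_sw_noheader`, combined with `.join(...)`; Spark does not guarantee the order of the joined output.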
