<a href="https://colab.research.google.com/github/OseiasBeu/SparkFormation/blob/main/SparkFormation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install Java (headless OpenJDK 8) quietly, discarding the command's stdout
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
# The os module gives access to the process environment
import os

# Export the Java and Spark home directories in a single call.
# NOTE(review): the Spark path assumes the archive was extracted under /content
# (the usual Colab working directory) — confirm when running elsewhere.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
    }
)

In [4]:
# instalando a findspark
!pip install -q findspark

In [5]:
# Import findspark
import findspark

# Initialise findspark so that the pyspark package can be imported in the cells below
findspark.init()

In [6]:
# Entry point required to start a Spark session
from pyspark.sql import SparkSession

# Create (or reuse) a local session with Hive support enabled.
# NOTE: despite its name, `sc` holds a SparkSession, not a SparkContext;
# the context itself is reachable as `sc.sparkContext`.
sc = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.executor.memory", "2g")
    .config('spark.driver.memory', '1g')
    .config('spark.driver.cores', '4')
    .enableHiveSupport()
    .getOrCreate()
)

# Display the session to confirm it was created
sc

In [7]:
# Wildcard-import the column/transformation functions and the pyspark.sql classes.
# NOTE(review): `from pyspark.sql.functions import *` brings in names such as
# `sum`, `min` and `max`, which shadow the Python builtins of the same name —
# consider `from pyspark.sql import functions as F` once all usages are known.
from pyspark.sql.functions import *
from pyspark.sql import *

# Build a SQLContext on top of the existing session. The instance is not bound
# to a name, so it is only shown as the cell's output.
SQLContext(sparkContext=sc.sparkContext, sparkSession=sc)

<pyspark.sql.context.SQLContext at 0x7fc90573d610>

In [8]:
# Display the SparkContext that backs the session
sc.sparkContext

# RDDs

In [9]:
# Distribute the integers 1 through 10 as an RDD
numeros = sc.sparkContext.parallelize(list(range(1, 11)))

In [10]:
# Fetch the first five elements of the RDD
numeros.take(5)

[1, 2, 3, 4, 5]

In [11]:
# Fetch the five largest elements, in descending order
numeros.top(5)

[10, 9, 8, 7, 6]

In [12]:
# Bring every element of the RDD back to the driver as a Python list
numeros.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## Funções

In [13]:
# Number of elements in the RDD
numeros.count()

10

In [14]:
# Arithmetic mean of the elements
numeros.mean()

5.5

In [15]:
# Largest element
numeros.max()

10

In [16]:
# Smallest element
numeros.min()

1

In [17]:
# Population standard deviation of the elements (divides by N, not N - 1)
numeros.stdev()

2.8722813232690143

In [18]:
# Sum of all elements
numeros.sum()

55

## Filtro

In [19]:
# Keep only the elements greater than 2 (lazy transformation; nothing runs yet)
filtro = numeros.filter(lambda n: n > 2)

In [20]:
# Materialise the filtered RDD on the driver
filtro.collect()

[3, 4, 5, 6, 7, 8, 9, 10]

In [21]:
# Sample with replacement at an expected rate of 0.5 per element; the fixed
# seed makes the draw repeatable.
amostra = numeros.sample(withReplacement=True, fraction=0.5, seed=1)

In [22]:
# Materialise the sample; with replacement, elements may appear more than once
amostra.collect()

[2, 3, 4, 5, 8, 10, 10, 10, 10]

In [23]:
# Double every element (lazy transformation producing a new RDD)
mapa = numeros.map(lambda n: n * 2)

In [24]:
# Materialise the doubled values on the driver
mapa.collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

In [25]:
# The source RDD is unchanged: transformations return new RDDs
numeros.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## Operações de conjunto (união, interseção, subtração e produto cartesiano)

In [26]:
# Second RDD holding the integers 6 through 10
numeros2 = sc.sparkContext.parallelize(list(range(6, 11)))

In [27]:
# Concatenate both RDDs; duplicates are kept
uniao = numeros.union(numeros2)

In [28]:
# Materialise the union on the driver
uniao.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10]

In [29]:
# Elements present in both RDDs
interseccao = numeros.intersection(numeros2)

In [30]:
# Materialise the intersection; the result order is not the input order
interseccao.collect()

[8, 9, 6, 10, 7]

In [31]:
# Elements of numeros that do not appear in numeros2
subtrai = numeros.subtract(numeros2)

In [32]:
# Materialise the difference; the result order is not the input order
subtrai.collect()

[4, 1, 5, 2, 3]

In [33]:
# Cartesian product: every (a, b) pair with a from numeros and b from numeros2
cartesiano = numeros.cartesian(numeros2)

In [34]:
# Materialise all 10 x 5 = 50 pairs on the driver
cartesiano.collect()

[(1, 6),
 (1, 7),
 (2, 6),
 (2, 7),
 (3, 6),
 (3, 7),
 (4, 6),
 (4, 7),
 (5, 6),
 (5, 7),
 (1, 8),
 (1, 9),
 (2, 8),
 (2, 9),
 (3, 8),
 (3, 9),
 (4, 8),
 (4, 9),
 (5, 8),
 (5, 9),
 (1, 10),
 (2, 10),
 (3, 10),
 (4, 10),
 (5, 10),
 (6, 6),
 (6, 7),
 (7, 6),
 (7, 7),
 (8, 6),
 (8, 7),
 (9, 6),
 (9, 7),
 (10, 6),
 (10, 7),
 (6, 8),
 (6, 9),
 (7, 8),
 (7, 9),
 (8, 8),
 (8, 9),
 (9, 8),
 (9, 9),
 (10, 8),
 (10, 9),
 (6, 10),
 (7, 10),
 (8, 10),
 (9, 10),
 (10, 10)]

In [35]:
# Count how many times each distinct pair occurs; returned as a dict on the driver
cartesiano.countByValue()

defaultdict(int,
            {(1, 6): 1,
             (1, 7): 1,
             (2, 6): 1,
             (2, 7): 1,
             (3, 6): 1,
             (3, 7): 1,
             (4, 6): 1,
             (4, 7): 1,
             (5, 6): 1,
             (5, 7): 1,
             (1, 8): 1,
             (1, 9): 1,
             (2, 8): 1,
             (2, 9): 1,
             (3, 8): 1,
             (3, 9): 1,
             (4, 8): 1,
             (4, 9): 1,
             (5, 8): 1,
             (5, 9): 1,
             (1, 10): 1,
             (2, 10): 1,
             (3, 10): 1,
             (4, 10): 1,
             (5, 10): 1,
             (6, 6): 1,
             (6, 7): 1,
             (7, 6): 1,
             (7, 7): 1,
             (8, 6): 1,
             (8, 7): 1,
             (9, 6): 1,
             (9, 7): 1,
             (10, 6): 1,
             (10, 7): 1,
             (6, 8): 1,
             (6, 9): 1,
             (7, 8): 1,
             (7, 9): 1,
             (8, 8): 1,
             (8,

## Compras

In [37]:
# Pair RDD of (key, value) tuples used by the key/value examples below
compras = sc.sparkContext.parallelize(
    [
        (1, 200),
        (2, 300),
        (3, 120),
        (4, 250),
        (5, 78),
    ]
)

In [39]:
# Extract only the keys (first element of each pair) and show them
chaves = compras.keys()
chaves.collect()

[1, 2, 3, 4, 5]

In [40]:
# Extract only the values (second element of each pair) and show them
valores = compras.values()
valores.collect()

[200, 300, 120, 250, 78]

In [41]:
# Count the number of pairs per key; returned as a dict on the driver
compras.countByKey()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})

In [42]:
# Add 1 to every value while leaving the keys untouched, then show the result
soma = compras.mapValues(lambda valor: valor + 1)
soma.collect()

[(1, 201), (2, 301), (3, 121), (4, 251), (5, 79)]

In [43]:
# The original pair RDD is unchanged by mapValues
compras.collect()

[(1, 200), (2, 300), (3, 120), (4, 250), (5, 78)]

In [44]:
# Second pair RDD, sharing keys 1 and 2 with compras
debitos = sc.sparkContext.parallelize(
    [
        (1, 20),
        (2, 300),
    ]
)

In [45]:
# Inner join on key: yields (key, (compras_value, debitos_value)) for keys present in both
resultado = compras.join(debitos)
resultado.collect()

[(1, (200, 20)), (2, (300, 300))]

In [46]:
# Keep the pairs of compras whose key has no entry in debitos
semdebito = compras.subtractByKey(debitos)
semdebito.collect()

[(4, 250), (5, 78), (3, 120)]