<a href="https://colab.research.google.com/github/Leandro-Braga/Estudo_de_Caso/blob/main/estudo_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
!pip install -q pyspark

[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[K     |████████████████████████████████| 198 kB 38.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [6]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop3.2'

In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

Lendo um dataset já exitente no spark

In [9]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv',inferSchema=True, header =True)

Exibir o dataset no spark de exemplo

In [10]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



Finalizar a seção do spark

In [11]:
spark.stop()

cluster Spark do PySpark e instância da classe SparkContext

In [12]:
from pyspark import SparkContext
spark_contexto = SparkContext() 
print(spark_contexto)           
print(spark_contexto.version)

<SparkContext master=local[*] appName=pyspark-shell>
3.1.2


criar uma seção no Spark (SparkSession)

In [13]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate() # Create my_spark
print(spark) # Print my_spark

<pyspark.sql.session.SparkSession object at 0x7fb4bddfb890>


In [14]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv',inferSchema=True, header =True)

In [15]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [16]:
dataset.count()

3000

In [17]:
dataset.createOrReplaceTempView('tabela_temporaria')
print(spark.catalog.listTables())

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [18]:
query = 'FROM tabela_temporaria SELECT longitude, latitude LIMIT 3'  
saida = spark.sql(query)  
saida.show() 

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+



Converter o DataFrame do Spark para o DataFrame do Pandas.

In [19]:
query1 = 'SELECT MAX(total_rooms) as maximo_quartos FROM tabela_temporaria'
q_maximo_quartos = spark.sql(query1)
pd_maximo_quartos = q_maximo_quartos.toPandas()
print('A quantidade máxima de quartos é: {}'.format(pd_maximo_quartos['maximo_quartos']))
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0,'maximo_quartos'])

A quantidade máxima de quartos é: 0    30450.0
Name: maximo_quartos, dtype: float64


Executar o SQL no Spark e obter o resultado no DataFrame do Spark.

In [25]:
query2 = f'SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms = {str(qtd_maximo_quartos)}'
localizacao_maximo_quartos = spark.sql(query2)
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()
print(pd_localizacao_maximo_quartos.head())

   longitude  latitude
0     -117.2     33.58


exemplo que converte um DataFrame do Pandas para um DataFrame no Spark

In [26]:
import pandas as pd
import numpy as np
media = 0
desvio_padrao=0.1 
pd_temporario = pd.DataFrame(np.random.normal(media,desvio_padrao,100))
spark_temporario = spark.createDataFrame(pd_temporario)
print(spark.catalog.listTables())
spark_temporario.createOrReplaceTempView('nova_tabela_temporaria')
print(spark.catalog.listTables())

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='nova_tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


finalizar o spark

In [28]:
spark.stop()

MapReduce

In [29]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [30]:
vetor = np.array([10, 20, 30, 40, 50])

Agora vamos criar um RDD por meio de um SparkContext com o seguinte código

In [31]:
paralelo = spark_contexto.parallelize(vetor)

In [32]:
print(paralelo)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


Essa saída indica que criamos o RDD com sucesso. Logo, o vetor já está no Spark e podemos aplicar o Mapeamento.

In [33]:
mapa = paralelo.map(lambda x : x**2+x)

lambda x : x**2+x é uma função lambda que corresponde à função matemática que queremos aplicar aos elementos da entrada de dados. O código paralelo.map faz o mapeamento da função para cada elemento da entrada.

In [34]:
mapa.collect()

[110, 420, 930, 1640, 2550]

Nesse exemplo, o objetivo é contar a frequência das palavras de uma lista. Então, o primeiro passo é entrar com uma lista de palavras, conforme o código abaixo:

In [35]:
paralelo = spark_contexto.parallelize(['distribuida', 'distribuida', 'spark', 'rdd', 'spark','spark'])

Com isso, a variável “paralelo” faz referência ao RDD. O próximo passo é implementar uma função lambda que simplesmente associa o número “1” a uma palavra.

In [36]:
funcao_lambda = lambda x:(x,1)

# a função recebe uma variável “x” e vai produzir o par (x, 1).

In [37]:
#  vamos aplicar o MapReduce
from operator import add
mapa = paralelo.map(funcao_lambda).reduceByKey(add).collect()

# Na primeira linha, importamos o operador “add” que será usado para somar as ocorrências das palavras mapeadas.
# Já na segunda linha, muitas coisas estão ocorrendo:
""" 1 Fazemos o mapeamento dos dados da variável “paralelo” para a função lambda, ou seja, para cada palavra criamos um par (palavra, 1).
    2 Em seguida, aplicamos a redução com a função “reduceKey”, que soma as ocorrências e as agrupa pela chave, no caso, pelas palavras.
    3 Na etapa final, dada pela função “collect”, fazemos a coleta dos dados em uma lista que chamamos de “mapa”."""

' 1 Fazemos o mapeamento dos dados da variável “paralelo” para a função lambda, ou seja, para cada palavra criamos um par (palavra, 1).\n    2 Em seguida, aplicamos a redução com a função “reduceKey”, que soma as ocorrências e as agrupa pela chave, no caso, pelas palavras.\n    3 Na etapa final, dada pela função “collect”, fazemos a coleta dos dados em uma lista que chamamos de “mapa”.'

Esse código percorre a lista “mapa” e imprime cada par formado pela palavra e sua respectiva ocorrência.

In [38]:
for (w, c) in mapa:
    print('{}: {}'.format(w, c))

distribuida: 2
spark: 3
rdd: 1


In [39]:
spark_contexto.stop()

desenvolver um exemplo que envolve as operações de transformações e de ações.

In [40]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [42]:
lista = [1, 2, 3, 4, 5, 3]
lista_rdd = spark_contexto.parallelize(lista)

In [43]:
#  contar os elementos do RDD.
lista_rdd.count()

6

vamos criar uma função lambda que recebe um número como parâmetro e retorna um par formador pelo número do parâmetro e pelo mesmo número multiplicado por 10

In [44]:
par_ordenado = lambda numero: (numero, numero*10)

Nesse próximo passo, vamos aplicar a transformação “flatMap” com a ação “collect” da função lambda para a “lista_rdd”:

In [45]:
lista_rdd.flatMap(par_ordenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

vamos aplicar a transformação “map” com a ação “collect” da função lambda para a “lista_rdd”:

In [46]:
lista_rdd.map(par_ordenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]

In [47]:
spark_contexto.stop()