<a href="https://colab.research.google.com/github/alexbdla/spark/blob/main/spark_basic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 - Configurações:


Essa parte de configurações deverá ser carregada sempre antes de executarmos os demais códigos.

In [1]:
# Instalando o Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Instalando Apache Spark com Hadoop:
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

Descompactando o arquivo baixado na célula anterior:

In [3]:
!tar xf spark-3.2.4-bin-hadoop3.2.tgz

### Instalando FindSpark e PySpark:

- __PySpark__ -> interface do Python para o Apache Spark (API para trabalhar com Python)
- __FindSpark__ -> adiciona o PySpark ao sys.path (caminho do sistema) em tempo de execução

In [4]:
!pip install -q findspark

In [5]:
!pip install -q pyspark


### Configurando as variáveis de ambiente:

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

### FindSpark
FindSpark é uma biblioteca Python que permite aos usuários conectar seu programa Python ao Apache Spark. Ele é projetado para ajudar a configurar as variáveis de ambiente necessárias e inicializar o SparkContext. O FindSpark pode ser instalado usando o pip, um gerenciador de pacotes Python. Ele é particularmente útil quando se trabalha com o Spark em um ambiente de desenvolvimento local ou em um notebook Jupyter. Com o FindSpark, os usuários podem escrever aplicativos Spark em Python, incluindo PySpark e SparkSQL. Além disso, o FindSpark também ajuda a configurar a integração com outras bibliotecas populares de ciência de dados em Python, como Pandas, Numpy e Matplotlib.

In [7]:
import findspark

# Iniciando o pacote:
findspark.init();

# 2 - Começando a trabalhar com o Spark

## Importação de dados

In [8]:
from pyspark import SparkContext
spark_contexto = SparkContext() 
print(spark_contexto)           
print(spark_contexto.version)

<SparkContext master=local[*] appName=pyspark-shell>
3.2.4


In [9]:
from pyspark.sql import SparkSession

# Criar/obter uma seção do Spark na máquina local, também cria o contexto
# spark = SparkSession.builder.master("local[*]").getOrCreate(); 

spark = SparkSession.builder.getOrCreate();
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fcc00ffac10>


In [10]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True);

In [11]:
dataset.printSchema();

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
# Primeira linha desse conjunto de dados:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [13]:
# Quantidade de linhas:
dataset.count()

3000

In [14]:
# Criando uma tabela temporária com os dados do Dataset:
dataset.createOrReplaceTempView("tabela_temporaria")

In [15]:
# Imprimindo as tabelas no catálogo:
print(spark.catalog.listTables())

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [16]:
# Consulta SQL (3 registros com os dados referentes às colunas de latitude e longitude):
query = "FROM tabela_temporaria SELECT longitude, latitude LIMIT 3"
saida = spark.sql(query)
saida.show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+



## Convertendo Spark SQL para Pandas:

In [17]:
# Faz uma consulta SQL na tabela temporária para encontrar o valor máximo da coluna "total_rooms" na "tabela_temporaria" e renomeia esse valor máximo para "maximo_quartos":
query1 = "SELECT MAX(total_rooms) as maximo_quartos FROM tabela_temporaria"

In [18]:
# Executa a consulta SQL no Spark, obtendo um DataFrame do Spark:
q_maximo_quartos = spark.sql(query1)
# Converte o resultado para um DataFrame do Pandas:
pd_maximo_quartos = q_maximo_quartos.toPandas()
# Converte o valor do DataFrame para um inteiro:
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0, 'maximo_quartos'])
# Imprime o resultado da consulta:
print('A quantidade máxima de quartos é: {}'.format(qtd_maximo_quartos))

A quantidade máxima de quartos é: 30450


In [19]:
# Localizando o bloco residencial com o maior nº de quartos:
query2 = "SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms = " + str(qtd_maximo_quartos)
localizacao_maximo_quartos = spark.sql(query2)
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()
print(pd_localizacao_maximo_quartos.head())

   longitude  latitude
0     -117.2     33.58


## Processo de conversão:

In [20]:
import numpy as np
import pandas as pd
media = 0 
desvio_padrao = 0.1
# Cria um DataFrame do Pandas com 100 valores, no formato de uma distribuição normal com média 0 e desvio padrão 0.1:
pd_temporario = pd.DataFrame(np.random.normal(media, desvio_padrao, 100))
# Transforma o DataFrame do Pandas em um DataFrame do Spark:
spark_temporario = spark.createDataFrame(pd_temporario)
print(spark.catalog.listTables())

  for column, series in pdf.iteritems():


[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [21]:
spark_temporario.createOrReplaceTempView("nova_tabela_temporaria")
print(spark.catalog.listTables())

[Table(name='nova_tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [22]:
# Fechando a seção do Spark: 
spark.stop()

# 3 - MapReduce:

MapReduce é um modelo de programação usado para processamento distribuído de grandes conjuntos de dados em um cluster de computadores. Ele foi criado pelo Google e é amplamente usado no processamento de grandes conjuntos de dados em vários ambientes, incluindo Hadoop, Spark e outros sistemas de processamento de Big Data.


---



O MapReduce funciona em duas fases principais: a fase de mapeamento (map) e a fase de redução (reduce). Na fase de mapeamento, os dados são divididos em partes menores e independentes, que são processadas em paralelo por diferentes nós do cluster. Em seguida, os resultados são combinados e ordenados por chave. Na fase de redução, os dados são processados novamente e os resultados são consolidados.

O MapReduce é eficiente para lidar com grandes conjuntos de dados, pois o processamento pode ser distribuído em vários nós do cluster, o que permite que os dados sejam processados ​​em paralelo. Ele é usado em várias aplicações de Big Data, incluindo análise de dados, mineração de dados, aprendizado de máquina e processamento de linguagem natural, entre outras.

### Exemplo 1:

In [23]:
import numpy as np

In [24]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [25]:
vetor = np.array([10, 20, 30, 40, 50])

In [26]:
paralelo = spark_contexto.parallelize(vetor)

In [27]:
print(paralelo)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


In [28]:
mapa = paralelo.map(lambda x : x ** 2 + x)

In [29]:
mapa.collect()

[110, 420, 930, 1640, 2550]

### Exemplo 2:

In [30]:
paralelo = spark_contexto.parallelize(["distribuida", "distribuida", "spark", "rdd", "spark", "spark"])

In [31]:
funcao_lambda = lambda x : (x, 1)

In [32]:
from operator import add
mapa = paralelo.map(funcao_lambda).reduceByKey(add).collect()

In [33]:
for(w, c) in mapa:
  print("{}: {}".format(w, c))

distribuida: 2
spark: 3
rdd: 1


In [34]:
spark_contexto.stop()

### Exemplo 3:

In [35]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [36]:
vetor = np.array([10, 20, 30, 40, 50])

In [37]:
paralelo = spark_contexto.parallelize(vetor)

In [38]:
from operator import add

In [39]:
mapa = paralelo.map(lambda x : x ** 4 - 10 * x ** 2 + 3).reduce(lambda a, b: a + b)
mapa

9735015

In [40]:
spark_contexto.stop()

### Exemplo 4:

In [41]:
from pyspark import SparkContext

sc = SparkContext('local', 'MapReduce')

In [42]:
lista = [1, 2, 3, 4, 5, 3]
lista_rdd = sc.parallelize(lista)

In [43]:
lista_rdd.count()

6

In [44]:
par_ordenado = lambda numero : (numero, numero * 10)

`map` -> função que permite aplicar uma transformação em cada elemento do array resultando em um novo array.

In [45]:
lista_rdd.map(par_ordenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]

`flatMap` ->  função flatMap realiza um map de uma função sobre uma coleção de dados, porém achatando o resultado final em um nível, isto é, retornando um array de uma dimensão apenas.

In [46]:
lista_rdd.flatMap(par_ordenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

In [47]:
sc.stop()

### Exemplo 5:

In [48]:
from pyspark import SparkContext

sc = SparkContext('local', 'MapReduce')

In [49]:
lista = [1, 2, 3, 4, 5, 3]
rdd = sc.parallelize(lista)

In [50]:
rdd.map(lambda x : (x, x ** 2)).collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (3, 9)]

In [51]:
sc.stop()