# Mount Google Drive at /content/drive (only available inside Google Colab).
# NOTE(review): no visible cell reads from /content/drive — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

# **Prática 1 - Configuração do ambiente**

In [13]:
# Install the headless OpenJDK 8 package; stdout is discarded to keep the cell output short.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [14]:
!wget -q https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

In [15]:
# Unpack the Spark tarball into the current working directory.
# SPARK_HOME is later set to /content/spark-3.4.1-bin-hadoop3, so this is expected to run from /content.
!tar xf spark-3.4.1-bin-hadoop3.tgz

In [17]:
!pip install -q findspark

In [16]:
import os

# Export JAVA_HOME for this kernel and every process it launches.
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64"})

In [18]:
# Export SPARK_HOME, pointing at the directory the Spark tarball was unpacked into.
os.environ.update({"SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3"})

# **Prática 2 - Começando a trabalhar com o Spark**

Testando se o pacote "findspark" foi instalado corretamente.

Importação de dados.

In [34]:
import findspark

# findspark.init() locates the Spark installation (SPARK_HOME was set earlier) and
# makes `pyspark` importable, so it has to run before the pyspark import that follows.
findspark.init()

from pyspark.sql import SparkSession

In [35]:
# Start (or reuse) a Spark session that runs locally; "local[*]" uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [36]:
# Smoke test: a one-row, one-column query shows that the session can execute SQL.
consulta = "select 'spark' as hello"
df = spark.sql(consulta)
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [37]:
# Load the Colab sample CSV: the first line supplies the column names and the
# column types are inferred from the data.
dataset = spark.read.csv(
    '/content/sample_data/california_housing_test.csv',
    header=True,
    inferSchema=True,
)

In [38]:
# Print the inferred column names, types and nullability as a tree.
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [39]:
# Called without an argument, head() returns only the first row, as a Row object.
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [40]:
# Shut down the Spark session used by this section.
spark.stop()

# **Prática 2.1 - Convertendo Pandas DataFrame para Spark DataFrame**

In [41]:
# Same setup as at the start of "Prática 2", repeated here — presumably so that
# this section can be run on its own.
import findspark

findspark.init()

from pyspark.sql import SparkSession

In [42]:
# The previous session was stopped above, so this builds a new local session
# ("local[*]" = every available core).
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [43]:
import pandas as pd

In [44]:
import numpy as np

In [45]:
# Parameters of the normal distribution the sample is drawn from.
media = 0
desvio_padrao = 0.1

# A seeded generator makes the 100 draws identical on every run, so the
# notebook reproduces the same data after "Restart & Run All".
gerador = np.random.default_rng(42)

# One-column pandas frame holding the 100 samples (default integer column label).
pd_temporario = pd.DataFrame(gerador.normal(media, desvio_padrao, 100))

# Convert the pandas frame into a Spark DataFrame using the active session.
spark_temporario = spark.createDataFrame(pd_temporario)

In [46]:
# No temporary view has been registered yet, so the catalog lists no tables.
print(spark.catalog.listTables())

[]


In [47]:
# Register the DataFrame in the session catalog as a temporary view named "nova_tabela_temporaria".
spark_temporario.createOrReplaceTempView("nova_tabela_temporaria")

In [48]:
# The catalog now reports the view as a TEMPORARY table.
print(spark.catalog.listTables())

[Table(name='nova_tabela_temporaria', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [49]:
# Shut down the Spark session used by this section.
spark.stop()

# Práticas com MapReduce

Exemplo 1

In [19]:
# Make pyspark importable again for this section.
# NOTE(review): SparkSession is imported here, but the visible cells of this
# example only use SparkContext — confirm whether the import is still needed.
import findspark

findspark.init()

from pyspark.sql import SparkSession

In [20]:
import numpy as np

In [21]:
from pyspark import SparkContext

# getOrCreate() returns the already-running context when there is one, so
# re-running this cell no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
spark_contexto = SparkContext.getOrCreate()

In [22]:
# Input values 10, 20, 30, 40, 50 (the stop value 60 is exclusive).
vetor = np.arange(10, 60, 10)

In [23]:
# Turn the NumPy array into an RDD on the Spark context.
paralelo = spark_contexto.parallelize(vetor)

In [24]:
# Map step: apply x -> x² + x to every element of the RDD.
mapa = paralelo.map(lambda valor: valor**2 + valor)

In [25]:
# Bring the mapped values back to the notebook as a Python list.
mapa.collect()

[110, 420, 930, 1640, 2550]

Exemplo 2

In [29]:
# Words to be counted; repeated entries are intentional.
palavras = ["distribuída", "distribuída", "spark", "RDD", "spark", "spark"]
paralelo = spark_contexto.parallelize(palavras)

In [30]:
def funcao_lambda(x):
    """Map an element to the pair ``(x, 1)`` so occurrences can be summed per key."""
    return (x, 1)

In [31]:
from operator import add

# Word count: pair every word with 1, add up the ones per word, then
# collect the (word, total) pairs into a Python list.
mapa = (
    paralelo
    .map(funcao_lambda)
    .reduceByKey(add)
    .collect()
)

In [32]:
# Print one "word: count" line per (word, count) pair.
for palavra, contagem in mapa:
  print("{}: {}".format(palavra, contagem))

distribuída: 2
spark: 3
RDD: 1


In [33]:
# Shut down the Spark context used by the two examples.
spark_contexto.stop()

Atividade

In [50]:
# Make pyspark importable again for the activity.
# NOTE(review): SparkSession is imported here, but the visible cells of the
# activity only use SparkContext — confirm whether the import is still needed.
import findspark

findspark.init()

from pyspark.sql import SparkSession

In [51]:
import numpy as np

In [52]:
from pyspark import SparkContext

# getOrCreate() returns the already-running context when there is one, so
# re-running this cell no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [65]:
# Input values 10, 20, 30, 40, 50 (the stop value 60 is exclusive).
vetor = np.arange(10, 60, 10)

In [66]:
# Turn the NumPy array into an RDD on the Spark context.
paralelo = sc.parallelize(vetor)

In [67]:
# Map step: apply the polynomial x -> x⁴ - 10x² + 3 to every element of the RDD.
mapa = paralelo.map(lambda valor: valor**4 - 10 * valor**2 + 3)

In [68]:
# Bring the mapped values back to the notebook as a Python list.
mapa.collect()

[9003, 156003, 801003, 2544003, 6225003]

In [69]:
# Shut down the Spark context used by the activity.
sc.stop()

# Prática 2 - Práticas com MapReduce

In [71]:
from pyspark import SparkContext

# getOrCreate() returns the already-running context when there is one, so
# re-running this cell no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [72]:
# Sample input; note that the value 3 appears twice.
lista = [1, 2, 3, 4, 5, 3]

In [73]:
# Turn the Python list into an RDD on the Spark context.
lista_rdd = sc.parallelize(lista)

In [74]:
# Number of elements in the RDD; the duplicated 3 is counted twice (6 in total).
lista_rdd.count()

6

In [75]:
def par_ordenado(numero):
    """Return the ordered pair ``(numero, numero * 10)``."""
    return (numero, numero * 10)

In [76]:
# flatMap flattens every returned pair, so the result is a single flat list of numbers.
lista_rdd.flatMap(par_ordenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

In [77]:
# map keeps every returned pair intact: one tuple per input element.
lista_rdd.map(par_ordenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]

In [78]:
# Shut down the Spark context used by this section.
sc.stop()