Configuração do Spark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
!tar xf spark-3.0.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

Introdução ao Spark

In [3]:
# Criando uma sessão no Spark
# Este comando cria uma nova sessão no spark.
spark = SparkSession.builder.getOrCreate()


# Criando um dataFrame no Spark
from datetime import datetime, date # Biblioteca para trabalhar com datas no python
import pandas as pd
from pyspark.sql import Row

df_rdd = spark.createDataFrame([
 Row(a=1, b=2, c='Ceará', d = date(2010,1,2), e = datetime(2018,1,1,12,0)),  # Cria uma linha com as colunas a,b,c,d,e no dataframe
 Row(a=1, b=4, c='Paraiba', d = date(2021,1,2), e = datetime(2013,1,1,12,0)),  # Cria uma linha com as colunas a,b,c,d,e no dataframe
 Row(a=2, b=4, c='Pernambuco', d = date(2013,1,2), e = datetime(2019,1,1,12,0)),  # Cria uma linha com as colunas a,b,c,d,e no dataframe
])
# O fato de você criar um dataframe dentro do spark significa dizer que ele já é um Rdd

In [None]:
df_rdd.show() # Exibe o dataframe

+---+---+----------+----------+-------------------+
|  a|  b|         c|         d|                  e|
+---+---+----------+----------+-------------------+
|  1|  2|     Ceará|2010-01-02|2018-01-01 12:00:00|
|  1|  4|   Paraiba|2021-01-02|2013-01-01 12:00:00|
|  2|  4|Pernambuco|2013-01-02|2019-01-01 12:00:00|
+---+---+----------+----------+-------------------+



Criando um spark dataframe a partir de um pandas dataframe

In [4]:
pandas_df = pd.DataFrame({'a':[1,2,3],
                          'b':[23,1,5],
                          'c':['string1', 'string2', 'string3'],
                          'd':[date(2013,1,2),date(2013,1,23),date(2012,3,2)],
                          'e':[datetime(2019,1,1,12,0),datetime(2019,1,1,12,0),datetime(2019,1,1,12,0)]})

pandas_df

Unnamed: 0,a,b,c,d,e
0,1,23,string1,2013-01-02,2019-01-01 12:00:00
1,2,1,string2,2013-01-23,2019-01-01 12:00:00
2,3,5,string3,2012-03-02,2019-01-01 12:00:00


In [5]:
# Convertendo o pandas df para um spark df
sp_df_convertido = spark.createDataFrame(pandas_df)

sp_df_convertido.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1| 23|string1|2013-01-02|2019-01-01 12:00:00|
|  2|  1|string2|2013-01-23|2019-01-01 12:00:00|
|  3|  5|string3|2012-03-02|2019-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [6]:
# Realizando a leitura do arquivo municipios_do_Brasil.csv
# A leitura é feita pelo pandas e depois é realizada a conversão para o spark

# Nos próximos exemplos faremos direto o parse

# municipios_df = pd.read_csv('municipios_do_Brasil.csv', sep=';')
# rdd_municipios = spark.createDataFrame(municipios_df)

rdd_municipios = spark.createDataFrame(pd.read_csv('municipios_do_Brasil.csv', error_bad_lines=False))

rdd_municipios.show()

b'Skipping line 4320: expected 5 fields, saw 7\nSkipping line 5105: expected 5 fields, saw 6\n'


+---+--------------------+-------------------+------------------+-----+
| uf|              cidade|                lat|               lng| cond|
+---+--------------------+-------------------+------------------+-----+
| AC|          Acrelândia|          -9.825808|        -66.897166|false|
| AC|        Assis Brasil|        -10.9409203|       -69.5672108|false|
| AC|           Brasiléia|-11.001276400000002|       -68.7487974|false|
| AC|              Bujari|         -9.8309656|       -67.9520886|false|
| AC|            Capixaba|-10.572968300000001|-67.67608940000001|false|
| AC|     Cruzeiro do Sul| -7.630795599999999|-72.67038690000001|false|
| AC|      Epitaciolândia|        -11.0289439|       -68.7411519|false|
| AC|               Feijó|         -8.1639128|       -70.3541781|false|
| AC|              Jordão|         -9.4338167|       -71.8843997|false|
| AC|         Mâncio Lima| -7.613777499999999|       -72.8964167|false|
| AC|       Manoel Urbano| -8.838942800000002|-69.26012920000001

In [7]:
type(rdd_municipios)

pyspark.sql.dataframe.DataFrame

In [None]:
rdd_municipios.show(1) # exibe a quantidade de linhas que você informar

+---+----------+---------+----------+-----+
| uf|    cidade|      lat|       lng| cond|
+---+----------+---------+----------+-----+
| AC|Acrelândia|-9.825808|-66.897166|false|
+---+----------+---------+----------+-----+
only showing top 1 row



In [None]:
rdd_municipios.show(2, vertical=True) # Muda a formatação da exibição da apresentação

-RECORD 0--------------
 uf     | AC           
 cidade | Acrelândia   
 lat    | -9.825808    
 lng    | -66.897166   
 cond   | false        
-RECORD 1--------------
 uf     | AC           
 cidade | Assis Brasil 
 lat    | -10.9409203  
 lng    | -69.5672108  
 cond   | false        
only showing top 2 rows



In [None]:
rdd_municipios.columns # Exibe apenas as colunas do df

['uf', 'cidade', 'lat', 'lng', 'cond']

In [None]:
rdd_municipios.select('lat', 'lng').describe().show() # describe das colunas, pega as estatísticas das colunas

+-------+-------------------+------------------+
|summary|                lat|               lng|
+-------+-------------------+------------------+
|  count|               5561|              5561|
|   mean|-127.00429727887501|-291.8346017817909|
| stddev| 1550.2988359047768|3380.0524683381946|
|    min|           -29345.0|          -51835.0|
|    max|             3843.0|            54.107|
+-------+-------------------+------------------+



Convertendo um df do spark para um df do pandas

In [None]:
df_pandas_municipio = rdd_municipios.toPandas()
df_pandas_municipio

Unnamed: 0,uf,cidade,lat,lng,cond
0,AC,Acrelândia,-9.825808,-66.897166,False
1,AC,Assis Brasil,-10.940920,-69.567211,False
2,AC,Brasiléia,-11.001276,-68.748797,False
3,AC,Bujari,-9.830966,-67.952089,False
4,AC,Capixaba,-10.572968,-67.676089,False
...,...,...,...,...,...
5556,TO,Talismã,-12.794879,-49.089582,False
5557,TO,Tocantínia,-9.563198,-48.374104,False
5558,TO,Tocantinópolis,-6.324474,-47.422400,False
5559,TO,Tupirama,-8.971676,-48.188324,False


Trabalhando com colunas no spark

In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper


# Criando uma nova coluna no rdd do spark
# Ele irá pegar a coluna cidade e tranformar todos os registros dele em uppercase criando assim uma nova coluna
rdd_municipios.withColumn('cidade_maiuscula', upper(rdd_municipios.cidade)).show()

# Lembre-se o rdd é imutável, assim nesse caso esse método estará criando um novo rdd e não incluindo no que já existe

+---+--------------------+-------------------+------------------+-----+--------------------+
| uf|              cidade|                lat|               lng| cond|    cidade_maiuscula|
+---+--------------------+-------------------+------------------+-----+--------------------+
| AC|          Acrelândia|          -9.825808|        -66.897166|false|          ACRELÂNDIA|
| AC|        Assis Brasil|        -10.9409203|       -69.5672108|false|        ASSIS BRASIL|
| AC|           Brasiléia|-11.001276400000002|       -68.7487974|false|           BRASILÉIA|
| AC|              Bujari|         -9.8309656|       -67.9520886|false|              BUJARI|
| AC|            Capixaba|-10.572968300000001|-67.67608940000001|false|            CAPIXABA|
| AC|     Cruzeiro do Sul| -7.630795599999999|-72.67038690000001|false|     CRUZEIRO DO SUL|
| AC|      Epitaciolândia|        -11.0289439|       -68.7411519|false|      EPITACIOLÂNDIA|
| AC|               Feijó|         -8.1639128|       -70.3541781|false

In [None]:
# Realizando filtros no DF
rdd_municipios.filter(rdd_municipios.uf == 'CE').show()

+---+-----------------+-------------------+-------------------+-----+
| uf|           cidade|                lat|                lng| cond|
+---+-----------------+-------------------+-------------------+-----+
| CE|          Abaiara| -7.345878999999999|          -39.04159|false|
| CE|          Acarape|           -4.22083|         -38.705532|false|
| CE|           Acaraú|          -2.887689|         -40.118306|false|
| CE|         Acopiara|          -6.089114|         -39.448042|false|
| CE|           Aiuaba|         -6.5737801|        -40.1269838|false|
| CE|       Alcântaras|          -3.585369|         -40.547867|false|
| CE|        Altaneira|          -6.988047|         -39.748055|false|
| CE|       Alto Santo|          -5.508941|         -38.274318|false|
| CE|         Amontada|-3.3601660000000004|-39.828815999999996|false|
| CE|Antonina do Norte|          -6.769193|          -39.98697|false|
| CE|         Apuiarés|          -3.945057|          -39.43593|false|
| CE|          Aquir

In [None]:
# Realizando agrupamento de Uf e calculando a média de longitude e latitude.
rdd_municipios.groupBy('uf').avg().show()

+---+-------------------+-------------------+
| uf|           avg(lat)|           avg(lng)|
+---+-------------------+-------------------+
| SC| -27.14853438501707|  -50.6527320686007|
| RO| -6.103667727835822| -44.46219914432836|
| PI| -5.906995162424108| -37.30347918883929|
| AM| -2.905133511032257| -48.44973783870968|
| GO| -90.78883155813004| -48.87683853373987|
| TO| -8.100870098102193| -40.28681757985401|
| MT|-10.531815782978729| -42.78672166879435|
| SP| -201.1889953021582|-478.05384353708087|
| ES| -19.89423848717949| -40.82512100000001|
| PB| -5.221764408071748| -186.6731234367713|
| RS| -88.14581654117175|-52.439766316606054|
| MS|-17.520789366666666|-45.248810126410255|
| AL|  -7.73761957647059|-29.657702503921566|
| MG| -470.7169663895661|   -964.60788139742|
| PA|-2.4628557321678324|-383.67296369020977|
| BA|-10.668882180239805|  -33.3799907658753|
| SE|-10.619186786799999| -37.27485500573334|
| PE|  -49.0512529936378| -27.54312291329731|
| CE|-3.9544738636956502| -245.232

Salvando dados com o Spark

In [None]:
# Trabalhando com SQL

#Converte o rdd numa tabela
rdd_municipios.createOrReplaceTempView('tableA')

# Executa uma instrução sql na tabela que você criou
print(spark.sql('select count(*) from tableA').show())

print(spark.sql('select * from tableA').show())

print(spark.sql('select distinct(uf) from tableA').show())

+--------+
|count(1)|
+--------+
|    5561|
+--------+

None
+---+--------------------+-------------------+------------------+-----+
| uf|              cidade|                lat|               lng| cond|
+---+--------------------+-------------------+------------------+-----+
| AC|          Acrelândia|          -9.825808|        -66.897166|false|
| AC|        Assis Brasil|        -10.9409203|       -69.5672108|false|
| AC|           Brasiléia|-11.001276400000002|       -68.7487974|false|
| AC|              Bujari|         -9.8309656|       -67.9520886|false|
| AC|            Capixaba|-10.572968300000001|-67.67608940000001|false|
| AC|     Cruzeiro do Sul| -7.630795599999999|-72.67038690000001|false|
| AC|      Epitaciolândia|        -11.0289439|       -68.7411519|false|
| AC|               Feijó|         -8.1639128|       -70.3541781|false|
| AC|              Jordão|         -9.4338167|       -71.8843997|false|
| AC|         Mâncio Lima| -7.613777499999999|       -72.8964167|false|
| A

Exercício da aula
1 - Montar uma lista dos municípios por estado
2 - Fazer uma contagem de municípios por estado

In [None]:
rdd_municipios.select(['uf', 'cidade']).orderBy('uf').show()

+---+--------------------+
| uf|              cidade|
+---+--------------------+
| AC|    Senador Guiomard|
| AC|Marechal Thaumaturgo|
| AC|      Sena Madureira|
| AC|      Epitaciolândia|
| AC|       Manoel Urbano|
| AC|        Porto Walter|
| AC| Santa Rosa do Purus|
| AC|           Brasiléia|
| AC|     Cruzeiro do Sul|
| AC|              Jordão|
| AC|         Mâncio Lima|
| AC|   Plácido de Castro|
| AC|          Porto Acre|
| AC|          Rio Branco|
| AC|     Rodrigues Alves|
| AC|          Acrelândia|
| AC|        Assis Brasil|
| AC|              Bujari|
| AC|            Capixaba|
| AC|               Feijó|
+---+--------------------+
only showing top 20 rows



In [None]:
rdd_municipios.groupBy('uf').count().show()

+---+-----+
| uf|count|
+---+-----+
| SC|  293|
| RO|   67|
| PI|  224|
| AM|   62|
| GO|  246|
| TO|  137|
| MT|  141|
| SP|  644|
| ES|   78|
| PB|  223|
| RS|  495|
| MS|   78|
| AL|  102|
| MG|  853|
| PA|  143|
| BA|  417|
| SE|   75|
| PE|  185|
| CE|  184|
| RN|  167|
+---+-----+
only showing top 20 rows



Resolução do Professor

In [None]:
# Cria um novo contexto do spark e faz a leitura do arquivo pelo próprio spark
lines = spark.sparkContext.textFile('municipios_do_Brasil.csv')
lines

municipios_do_Brasil.csv MapPartitionsRDD[91] at textFile at NativeMethodAccessorImpl.java:0

In [None]:
def mapfun(x):
  return [x[0], x[1]]

# Aplica um mapeamento com lambda
# Ou seja, este método nos retorna um map
data = lines.map(lambda line:line.split(',')).map(mapfun)
data.sortByKey()
data.take(20)



[['uf', 'cidade'],
 ['AC', 'Acrelândia'],
 ['AC', 'Assis Brasil'],
 ['AC', 'Brasiléia'],
 ['AC', 'Bujari'],
 ['AC', 'Capixaba'],
 ['AC', 'Cruzeiro do Sul'],
 ['AC', 'Epitaciolândia'],
 ['AC', 'Feijó'],
 ['AC', 'Jordão'],
 ['AC', 'Mâncio Lima'],
 ['AC', 'Manoel Urbano'],
 ['AC', 'Marechal Thaumaturgo'],
 ['AC', 'Plácido de Castro'],
 ['AC', 'Porto Acre'],
 ['AC', 'Porto Walter'],
 ['AC', 'Rio Branco'],
 ['AC', 'Rodrigues Alves'],
 ['AC', 'Santa Rosa do Purus'],
 ['AC', 'Sena Madureira']]

In [None]:
# Mesma ideia do mapeiaObjetos
counts = data.groupByKey().map(lambda x:(x[0], list(x[1])))
print(counts.take(20))

count_municipio = counts.map(lambda tup:(tup[0], len(tup[1])))
count_municipio.take(20)

[('AC', ['Acrelândia', 'Assis Brasil', 'Brasiléia', 'Bujari', 'Capixaba', 'Cruzeiro do Sul', 'Epitaciolândia', 'Feijó', 'Jordão', 'Mâncio Lima', 'Manoel Urbano', 'Marechal Thaumaturgo', 'Plácido de Castro', 'Porto Acre', 'Porto Walter', 'Rio Branco', 'Rodrigues Alves', 'Santa Rosa do Purus', 'Sena Madureira', 'Senador Guiomard', 'Tarauacá', 'Xapuri']), ('AP', ['Amapá', 'Calçoene', 'Cutias', 'Ferreira Gomes', 'Itaubal', 'Laranjal do Jari', 'Macapá', 'Mazagão', 'Oiapoque', 'Pedra Branca do Amapari', 'Porto Grande', 'Pracuúba', 'Santana', 'Serra do Navio', 'Tartarugalzinho', 'Vitória do Jari']), ('BA', ['Abaíra', 'Abaré', 'Acajutiba', 'Adustina', 'Água Fria', 'Aiquara', 'Alagoinhas', 'Alcobaça', 'Almadina', 'Amargosa', 'Amélia Rodrigues', 'América Dourada', 'Anagé', 'Andaraí', 'Andorinha', 'Angical', 'Anguera', 'Antas', 'Antônio Cardoso', 'Antônio Gonçalves', 'Aporá', 'Apuarema', 'Araças', 'Aracatu', 'Araci', 'Aramari', 'Arataca', 'Aratuípe', 'Aurelino Leal', 'Baianópolis', 'Baixa Grande'

[('AC', 22),
 ('AP', 16),
 ('BA', 417),
 ('CE', 184),
 ('DF', 1),
 ('ES', 78),
 ('MG', 853),
 ('PE', 185),
 ('PR', 399),
 ('RJ', 92),
 ('RN', 167),
 ('SC', 293),
 ('SP', 645),
 ('TO', 137),
 ('uf', 1),
 ('AL', 102),
 ('AM', 62),
 ('GO', 246),
 ('MA', 217),
 ('MS', 78)]