In [1]:
#instalação do PySpark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=f83d19981244f6476251f25990d2e917a420e3201b097fd5089a402eff5c85ea
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
#importação das bibliotecas
import pyspark #biblioteca principal
from pyspark.sql import SparkSession # biblioteca para a criação de uma sessão Spark

In [6]:
# criação de uma sessão spark
spark = SparkSession.builder.appName("Word Count").master('local[*]').getOrCreate()

# criação do SparkContext (conexão entre o Driver e Workers)
sc = spark.sparkContext

In [7]:
rdd = sc.textFile("domcasmurro.txt")

In [10]:
rdd.take(10)

['DOM CASMURRO',
 '',
 'POR',
 '',
 'MACHADO DE ASSIS',
 '',
 'I',
 '',
 'Do titulo.',
 '']

In [13]:
# transformar cada linha (ou seja, cada item da "lista"), em um conjunto de palavras
palavras = rdd.flatMap(lambda x: x.lower().replace(".","").replace(",","").split(" "))
palavras.take(20)

['dom',
 'casmurro',
 '',
 'por',
 '',
 'machado',
 'de',
 'assis',
 '',
 'i',
 '',
 'do',
 'titulo',
 '',
 'uma',
 'noite',
 'destas',
 'vindo',
 'da',
 'cidade']

In [14]:
# remoção de vazios
palavras = palavras.filter(lambda x: x!="")
palavras.take(10)

['dom',
 'casmurro',
 'por',
 'machado',
 'de',
 'assis',
 'i',
 'do',
 'titulo',
 'uma']

In [19]:
# contagem da qtde de ocorrencias utilizando o countByValue
contagem = palavras.countByValue()
sorted(contagem.items())

[('(1851)', 1),
 ('(2+2=4)', 1),
 ('(ao', 1),
 ('(assim', 1),
 ('(bastou-me', 1),
 ('(cap', 1),
 ('(continuava', 1),
 ('(disse-me', 1),
 ('(dizia', 1),
 ('(donde', 1),
 ('(e', 4),
 ('(era', 1),
 ('(estivera', 1),
 ('(ingenua', 1),
 ('(já', 1),
 ('(não', 1),
 ('(perdoai', 1),
 ('(salvo', 1),
 ('(sem', 1),
 ('(tinha', 1),
 ('(tudo', 1),
 ('(velha', 1),
 ('(é', 2),
 ('**--mas', 1),
 ('**--se', 1),
 ('**--uma', 1),
 ('**enfim', 1),
 ('--', 2),
 ('--_in', 1),
 ('--a', 15),
 ('--abaixo', 2),
 ('--acabemos', 1),
 ('--acho', 3),
 ('--adeus', 1),
 ('--agora', 5),
 ('--aguente', 1),
 ('--ah!', 3),
 ('--ainda', 3),
 ('--alembra', 1),
 ('--amanhã', 2),
 ('--anda', 3),
 ('--apanhar', 1),
 ('--aqui', 1),
 ('--as', 1),
 ('--até', 2),
 ('--beata!', 1),
 ('--bem', 4),
 ('--bem;', 1),
 ('--bom', 2),
 ('--candelaria', 1),
 ('--capitú', 2),
 ('--capitú!', 3),
 ('--certíssima!', 1),
 ('--cesar!', 1),
 ('--chamo-me', 1),
 ('--chorou', 1),
 ('--cocadinha', 1),
 ('--coitado', 2),
 ('--com', 1),
 ('--como', 5)

In [20]:
# contagem de ocorrencias utilizando chave e valor
palavras_mapeadas = palavras.map(lambda word: (word, 1))
palavras_mapeadas.take(5)

[('dom', 1), ('casmurro', 1), ('por', 1), ('machado', 1), ('de', 1)]

In [22]:
contagem = palavras_mapeadas.reduceByKey(lambda x,y: x+y)
contagem.take(5)

[('machado', 1), ('assis', 1), ('i', 1), ('do', 617), ('uma', 430)]

In [None]:
contagem_sorted = contagem.sortByKey()
contagem_sorted.take(10)

In [28]:
# salvando em arquivo
contagem_sorted.coalesce(1).saveAsTextFile("wordcount.txt")

##AIRPORTS

In [29]:
# carregando o arquivo Airports.csv
rdd_airports = sc.textFile("Airports.csv")
rdd_airports.take(5)

['icao_code,iata_code,name,city,country,lat_deg,lat_min,lat_sec,lat_dir,lon_deg,lon_min,lon_sec,lon_dir,altitude,lat_decimal,lon_decimal,id',
 'AYGA,GKA,GOROKA,GOROKA,PAPUA NEW GUINEA,6,4,54,S,145,23,30,E,1610,-6.082,145.392,1',
 'AYLA,LAE,N/A,LAE,PAPUA NEW GUINEA,0,0,0,U,0,0,0,U,0,0,0,2',
 'AYMD,MAG,MADANG,MADANG,PAPUA NEW GUINEA,5,12,25,S,145,47,19,E,7,-5.207,145.789,3',
 'AYMH,HGU,MOUNT HAGEN,MOUNT HAGEN,PAPUA NEW GUINEA,5,49,34,S,144,17,46,E,1643,-5.826,144.296,4']

In [30]:
# selecionar as linhas que possuem USA na coluna country
rdd_usa = rdd_airports.filter(lambda x: x.split(",")[4] == "USA")
rdd_usa.take(10)

['KABI,ABI,ABILENE RGNL,ABILENE,USA,32,24,40,N,99,40,54,W,546,32.411,-99.682,3381',
 'KABQ,ABQ,N/A,ALBUQUERQUE,USA,0,0,0,U,0,0,0,U,0,0,0,3382',
 'KACK,ACK,NANTUCKET MEM,NANTUCKET,USA,41,15,10,N,70,3,36,W,15,41.253,-70.06,3383',
 'KACT,ACT,WACO RGNL,WACO,USA,31,36,40,N,97,13,49,W,158,31.611,-97.23,3384',
 'KACY,ACY,ATLANTIC CITY INTERNATIONAL,ATLANTIC CITY,USA,39,27,27,N,74,34,37,W,23,39.458,-74.577,3385',
 'KADM,ADM,ARDMORE MUNI,ARDMORE,USA,34,18,11,N,97,1,10,W,233,34.303,-97.019,3386',
 'KADW,ADW,ANDREWS AFB,CAMP SPRINGS,USA,38,48,38,N,76,52,1,W,86,38.811,-76.867,3387',
 'KAEX,AEX,ALEXANDRIA INTERNATIONAL,ALEXANDRIA,USA,31,19,38,N,92,32,54,W,28,31.327,-92.548,3388',
 'KAFB,N/A,N/A,SHEPPARD,USA,0,0,0,U,0,0,0,U,0,0,0,3389',
 'KAGS,AGS,AUGUSTA RGNL AT BUSH FLD,BUSH FIELD,USA,33,22,11,N,81,57,52,W,44,33.37,-81.964,3390']

In [31]:
# qtde de aeroportos que estão nos USA
rdd_usa.count()

552

In [32]:
# salvar a resposta em um único arquivo
rdd_usa.coalesce(1).saveAsTextFile("airport_usa.txt")