In [None]:
from functools import reduce

class Person:
  # Construtor da Classe
  def __init__(self, name, age):
    self.name = name
    self.age = age

  # Função para formatação
  def __str__(self):
    return f'Name: {self.name} Age: {self.age}' 


# Criando uma lista de Pessoas
pessoas = [Person("Gustavo Guimarães", 21), Person("Scarlos", 19), Person("Pedro Pobre", 19)]

for p in pessoas:
  print(p)


# Somando a idade de todas as pessoas
MapPerson = list(map(lambda x: x.age,pessoas ))
SumAge = reduce(lambda x,y: x + y, MapPerson)
print(f"Soma da idade de todas as pessoas: {SumAge}")

# Incrementar 1 a idade de todas as pessoas
pessoas = list(map(lambda p: Person(p.name, p.age + 1), pessoas))
print()
print("Somando 1 em todas as idades de todos as Pessoas")
for p in pessoas:
  print(p)

# Retorna somente a pessoa com idade maior que 20 anos
maiores = list(filter(lambda p: p.age > 20, pessoas))
print()
print("Retornando somente a pessoa com idade maior que 20 anos")
for p in maiores:
  print(p)


**COMEÇANDO A UTILIZAR O SPARK**

In [None]:
!pip install pyspark

In [None]:
# importar bibliotecas
import pyspark
from pyspark.sql import SparkSession

In [None]:
#Criação de uma sessão Spark(caso ela não exista) ou recuperação de uma 
#sessão existente com o mesmo nome
spark = SparkSession.builder.appName("Word Count").master('local[*]').getOrCreate()

In [None]:
# criação do contexto

sc = spark.sparkContext

In [None]:
# Criação de um RDD que carrega o arquivo SeinField

rdd = sc.textFile("Seinfeld.txt")

In [None]:
# Vizualizar um rdd

rdd.collect()

In [None]:
lista = rdd.collect()

In [None]:
lista[30]

'GEORGE: (smiling) So, you know, what, what happened?'

In [None]:
# Transformar cada linha (i.e., cada item da lista) em um conjunto de palavras
# A partir do slipt do texto em espaço

palavras = rdd.flatMap(lambda x: x.split(" "))

# Mostrar os primeiros N itens
palavras.take(30)

['INT.',
 'COMEDY',
 'CLUB',
 '–',
 'NIGHT',
 '(Jerry',
 'is',
 'on',
 'stage,',
 'performing.)',
 'JERRY:',
 'Do',
 'you',
 'know',
 'what',
 'this',
 'is',
 'all',
 'about?',
 'Do',
 'you',
 'know,',
 'why',
 "we're",
 'here?',
 'To',
 'be',
 'out,',
 'this',
 'is']

In [None]:
palavras = rdd.flatMap(lambda x:x.lower().replace(".", "").replace("(", "").replace(")", "").split(" "))
palavras.take(30)

['int',
 'comedy',
 'club',
 '–',
 'night',
 'jerry',
 'is',
 'on',
 'stage,',
 'performing',
 'jerry:',
 'do',
 'you',
 'know',
 'what',
 'this',
 'is',
 'all',
 'about?',
 'do',
 'you',
 'know,',
 'why',
 "we're",
 'here?',
 'to',
 'be',
 'out,',
 'this',
 'is']

In [None]:
# Consultar a quantidade de elementos do RDD

palavras.count()

723895

In [None]:
# Remover itens vazios

palavras = palavras.filter(lambda x:x != "")
palavras.count()

722738

In [None]:
# Contar o número de ocorrencias de cada palavra

contagem = palavras.countByValue()

In [None]:
# Imprimir as palavras e contagem de maneira ordenada

for p, c in sorted(contagem.items()):
  print(f"{p}: {c}")

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
terrycloth: 1
terse: 1
tes,: 1
tesh?: 1
test: 63
test!: 1
test,: 5
test-drive: 1
test-taste: 1
test;: 2
test?: 11
test?!: 1
test]: 2
tested: 8
testicles: 1
testify: 2
testify!: 1
testikov: 14
testikov's: 3
testikov,: 5
testikov:: 24
testimonial: 1
testimony: 6
testimony?: 1
testing: 4
tests: 6
tests,: 1
testy: 3
testy,: 1
tetanus: 1
texan: 1
texas: 3
texas,: 1
texas?: 1
textbook: 3
texts: 1
texture: 2
texture,: 1
texturesand: 1
tgif-type: 1
th: 3
th'aid: 1
th-ri-rigatoni: 1
th-right?: 1
th-th-th-that's: 1
th-th-that--: 1
th-th-there's: 1
th-that's: 2
th-there's: 1
tha: 1
tha's: 1
tha'ts: 1
thai: 3
thai,: 1
thailand: 1
than: 301
than,: 1
thane: 1
thank: 347
thank-you: 1
thanked: 5
thankful : 1
thankgiving: 1
thanking: 4
thanks: 265
thanks!: 5
thanks,: 31
thanksgiving: 8
thanksgivings,: 1
thanksoh!: 1
that: 5398
that!: 77
that!": 3
that!': 1
that": 4
that"?: 1
that': 2
that'd: 10
that'll: 25
that'll,: 1
that's: 2167

In [None]:
# Contagem do número de Aeroportos dos EUA

rdd = sc.textFile("Airports.csv")
rdd.take(5)

['icao_code,iata_code,name,city,country,lat_deg,lat_min,lat_sec,lat_dir,lon_deg,lon_min,lon_sec,lon_dir,altitude,lat_decimal,lon_decimal,id',
 'AYGA,GKA,GOROKA,GOROKA,PAPUA NEW GUINEA,6,4,54,S,145,23,30,E,1610,-6.082,145.392,1',
 'AYLA,LAE,N/A,LAE,PAPUA NEW GUINEA,0,0,0,U,0,0,0,U,0,0,0,2',
 'AYMD,MAG,MADANG,MADANG,PAPUA NEW GUINEA,5,12,25,S,145,47,19,E,7,-5.207,145.789,3',
 'AYMH,HGU,MOUNT HAGEN,MOUNT HAGEN,PAPUA NEW GUINEA,5,49,34,S,144,17,46,E,1643,-5.826,144.296,4']

In [None]:
# Ignorar o Cabeçalho

rdd = rdd.filter(lambda x:x.split(",")[0]!="icao_code")
rdd.take(5)

['AYGA,GKA,GOROKA,GOROKA,PAPUA NEW GUINEA,6,4,54,S,145,23,30,E,1610,-6.082,145.392,1',
 'AYLA,LAE,N/A,LAE,PAPUA NEW GUINEA,0,0,0,U,0,0,0,U,0,0,0,2',
 'AYMD,MAG,MADANG,MADANG,PAPUA NEW GUINEA,5,12,25,S,145,47,19,E,7,-5.207,145.789,3',
 'AYMH,HGU,MOUNT HAGEN,MOUNT HAGEN,PAPUA NEW GUINEA,5,49,34,S,144,17,46,E,1643,-5.826,144.296,4',
 'AYNZ,LAE,NADZAB,NADZAB,PAPUA NEW GUINEA,6,34,11,S,146,43,34,E,73,-6.57,146.726,5']

In [None]:
# Selecionar as linhas que possuem USA nas colunas

rdd_usa = rdd.filter(lambda x:x.split(",")[4] == "USA")
rdd_usa.take(5)

['KABI,ABI,ABILENE RGNL,ABILENE,USA,32,24,40,N,99,40,54,W,546,32.411,-99.682,3381',
 'KABQ,ABQ,N/A,ALBUQUERQUE,USA,0,0,0,U,0,0,0,U,0,0,0,3382',
 'KACK,ACK,NANTUCKET MEM,NANTUCKET,USA,41,15,10,N,70,3,36,W,15,41.253,-70.06,3383',
 'KACT,ACT,WACO RGNL,WACO,USA,31,36,40,N,97,13,49,W,158,31.611,-97.23,3384',
 'KACY,ACY,ATLANTIC CITY INTERNATIONAL,ATLANTIC CITY,USA,39,27,27,N,74,34,37,W,23,39.458,-74.577,3385']

In [None]:
# Contagem da quantidade de aeroporto dos USA

rdd_usa.count()

552

In [None]:
# Salvar o novo RDD em um arquivo

rdd_usa.saveAsTextFile("RDD_USA.txt")

In [None]:
# Salvar a saída em um arquivo único

rdd_usa.coalesce(1).saveAsTextFile('RDD_USA_unique.txt')