#### *********** Atenção: *********** 
Utilize Java JDK 11 e Apache Spark 2.4.7

*Caso receba mensagem de erro "name 'sc' is not defined", interrompa o pyspark e apague o diretório metastore_db no mesmo diretório onde está este Jupyter notebook*

Acesse http://localhost:4040 sempre que quiser acompanhar a execução dos jobs

# Transformações

In [1]:
# Criando uma lista em Python
lista1 = [124, 901, 652, 102, 397]

In [2]:
type(lista1)

list

In [3]:
# Carregando dados de uma coleção
lstRDD = sc.parallelize(lista1)

In [4]:
type(lstRDD)

pyspark.rdd.RDD

In [5]:
lstRDD.collect()

[124, 901, 652, 102, 397]

In [6]:
lstRDD.count()

5

In [7]:
# Carregando um arquivo e criando um RDD. 
autoDataRDD = sc.textFile("data/carros.csv")

In [8]:
type(autoDataRDD)

pyspark.rdd.RDD

In [9]:
# Operação de Ação. 
autoDataRDD.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [10]:
autoDataRDD.take(5)

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

In [11]:
# Cada ação gera um novo processo de computação dos dados. 
# Mas podemos persistir os dados em cache para que ele possa ser usado por outras ações, sem a necessidade 
# de nova computação.
autoDataRDD.cache()

data/carros.csv MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [12]:
for line in autoDataRDD.collect():
    print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389
honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399
nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499
dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189
dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295
toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338
dodge,gas,std,two,hatchback,fwd,four,68,5500,31,38,6377

In [13]:
# Map() e criação de um novo RDD - Transformação - Lazy Evaluation
tsvData = autoDataRDD.map(lambda x : x.replace(",","\t"))
tsvData.take(5)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348']

In [14]:
autoDataRDD.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [15]:
# Filter() e criação de um novo RDD - Transformação - Lazy Evaluation
toyotaData = autoDataRDD.filter(lambda x: "toyota" in x)

In [16]:
# Ação
toyotaData.count()

32

In [17]:
# Ação
toyotaData.take(20)

['toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338',
 'toyota,gas,std,four,hatchback,fwd,four,62,4800,31,38,6488',
 'toyota,gas,std,four,wagon,fwd,four,62,4800,31,37,6918',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,30,37,6938',
 'toyota,gas,std,four,hatchback,fwd,four,70,4800,30,37,7198',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,38,47,7738',
 'toyota,diesel,std,four,hatchback,fwd,four,56,4500,38,47,7788',
 'toyota,gas,std,four,wagon,4wd,four,62,4800,27,32,7898',
 'toyota,diesel,std,four,sedan,fwd,four,56,4500,34,36,7898',
 'toyota,gas,std,two,sedan,rwd,four,70,4800,29,34,8058',
 'toyota,gas,std,two,hatchback,rwd,four,70,4800,29,34,8238',
 'toyota,gas,std,four,hatchback,fwd,four,70,4800,28,34,8358',
 'toyota,gas,std,two,hardtop,rwd,four,116,4800,24,30,8449',
 'toyota,gas,std,four,wagon,4wd,four,62,4800,27,32,8778',
 'toyota,gas,std,four,sedan,fwd,four,92,4200,29,34,8948',
 'toyota,gas,std,four,sedan,fwd,four,70,

In [18]:
# Pode salvar o conjunto de dados, o RDD. 
# Nesse caso, o Spark solicita os dados ao processo Master e então gera um arquivo de saída.
savedRDD = open("data/carros_v2.csv","w")
savedRDD.write("\n".join(autoDataRDD.collect()))
savedRDD.close()

## Operações Set

In [19]:
# Set operations
palavras1 = sc.parallelize(["Big Data","Data Science","Analytics","Visualization"])
palavras2 = sc.parallelize(["Big Data","R","Python","Scala"])

In [20]:
# União
for unions in palavras1.union(palavras2).distinct().collect():
    print(unions)

Data Science
R
Analytics
Visualization
Scala
Big Data
Python


In [21]:
# Interseção
for intersects in palavras1.intersection(palavras2).collect():
    print(intersects)

Big Data


In [22]:
rdd01 = sc.parallelize(range(1,10))
rdd02 = sc.parallelize(range(10,21))
rdd01.union(rdd02).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [23]:
rdd01 = sc.parallelize(range(1,10))
rdd02 = sc.parallelize(range(5,15))
rdd01.intersection(rdd02).collect()

[5, 6, 7, 8, 9]

## Left/Right Outer Join

In [24]:
names1 = sc.parallelize(("banana", "uva", "laranja")).map(lambda a: (a, 1))
names2 = sc.parallelize(("laranja", "abacaxi", "manga")).map(lambda a: (a, 1))
names1.join(names2).collect()

[('laranja', (1, 1))]

In [25]:
names1.leftOuterJoin(names2).collect()

[('uva', (1, None)), ('banana', (1, None)), ('laranja', (1, 1))]

In [26]:
names1.rightOuterJoin(names2).collect()

[('manga', (None, 1)), ('laranja', (1, 1)), ('abacaxi', (None, 1))]

## Distinct

In [27]:
# Distinct
lista1 = [124, 901, 652, 102, 397, 124, 901, 652]
lstRDD = sc.parallelize(lista1)
for numbData in lstRDD.distinct().collect():
    print(numbData)

124
652
901
397
102


## Transformação e Limpeza

In [28]:
# RDD Original
autoDataRDD.collect()

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389',
 'honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399',
 'nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499',
 'dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572',
 'plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095',
 'mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189',
 'dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229',
 'plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229',
 'chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,31,3

In [29]:
# Transformação e Limpeza
def LimpaRDD(autoStr) :
    
    # Verifica a indexação 
    if isinstance(autoStr, int) :
        return autoStr
    
    # Separa cada índice pela vírgula (separador de colunas)
    attList = autoStr.split(",")
    
    # Converte o número de portas para um num
    if attList[3] == "two" :
        attList[3] = "2"
    elif attList[3] == "four":
        attList[3] = "4"
    
    # Convert o modelo do carro para uppercase
    attList[5] = attList[4].upper()
    return ",".join(attList)

In [30]:
# Transformação
RDD_limpo = autoDataRDD.map(LimpaRDD)

In [31]:
print(RDD_limpo)

PythonRDD[73] at RDD at PythonRDD.scala:53


In [32]:
# Ação
RDD_limpo.collect()

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,BODY,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,HATCHBACK,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,HATCHBACK,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,HATCHBACK,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,HATCHBACK,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5389',
 'honda,gas,std,2,hatchback,HATCHBACK,four,60,5500,38,42,5399',
 'nissan,gas,std,2,sedan,SEDAN,four,69,5200,31,37,5499',
 'dodge,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5572',
 'plymouth,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5572',
 'mazda,gas,std,2,hatchback,HATCHBACK,four,68,5000,31,38,6095',
 'mitsubishi,gas,std,2,hatchback,HATCHBACK,four,68,5500,31,38,6189',
 'dodge,gas,std,4,hatchback,HATCHBACK,four,68,5500,31,38,6229',
 'plymouth,gas,std,4,hatchback,HATCHBACK,four,68,5500,31,38,6229',
 'chevrolet,gas,std,2,hatchback,HATCHBACK,four,70,5400,38,43,6295',
 't

## Ações

In [33]:
# reduce() - soma de valores
lista2 = [124, 901, 652, 102, 397, 124, 901, 652]
lstRDD = sc.parallelize(lista2)
lstRDD.collect()
lstRDD.reduce(lambda x,y: x + y)

3853

In [34]:
# Encontrando a linha com menor número de caracteres
autoDataRDD.reduce(lambda x,y: x if len(x) < len(y) else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

In [35]:
# Criando uma função para redução
def getMPG(autoStr) :
    
    if isinstance(autoStr, int) :
        return autoStr
    
    attList = autoStr.split(",")
    
    if attList[9].isdigit() :
        return int(attList[9])
    else:
        return 0

In [36]:
# Encontrando a média de MPG para todos os carros
media_mpg = round(autoDataRDD.reduce(lambda x,y : getMPG(x) + getMPG(y)) / (autoDataRDD.count() -1), 2)
print(media_mpg)

25.15


In [38]:
times = sc.parallelize(("Flamengo", "Vasco", "Botafogo", "Fluminense", "Palmeiras", "Bahia"))
times.takeSample(True, 3)

['Fluminense', 'Bahia', 'Botafogo']

In [39]:
times = sc.parallelize(("Flamengo", "Vasco", "Botafogo", "Fluminense", "Palmeiras", "Bahia", "Bahia", "Flamengo"))
times.map(lambda k: (k,1)).countByKey().items()

dict_items([('Flamengo', 2), ('Vasco', 1), ('Botafogo', 1), ('Fluminense', 1), ('Palmeiras', 1), ('Bahia', 2)])

In [40]:
autoDataRDD.saveAsTextFile("data/autoDataRDD.txt")