In [None]:
!pip install findspark pyspark



In [None]:
import findspark
findspark.init()
findspark.find()
from pyspark import SparkConf, SparkContext
import re

conf = SparkConf().setAppName("TDE 3 - Spark Implementation").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)

rdd = sc.textFile("CarFuelandEmissions2000_2013.csv")

header = rdd.first()
rdd = rdd.filter(lambda x: x != header)

def split_line(line):
    return re.split(r",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", line)

In [None]:
# Emissão total de monoxido de carbono
emissao_total = rdd \
    .map(split_line) \
    .filter(lambda cols: len(cols) > 20 and cols[20] != '') \
    .map(lambda cols: float(cols[20])) \
    .sum()

print(f"Emissão total de CO2: {emissao_total}")

Emissão total de CO2: 16950839.601


In [None]:
# Média de barulho dos carros manuais
ruido_manual = rdd \
    .map(split_line) \
    .filter(lambda cols: len(cols) > 17 and cols[8] == "Manual" and cols[17] != '') \
    .map(lambda cols: float(cols[17]))

media_ruido_manual = ruido_manual.sum() / ruido_manual.count()
print(f"Média de ruído para carros manuais: {media_ruido_manual}")

Média de ruído para carros manuais: 72.42211577964541


In [None]:
# Contagem de modelos dos carros
contagem_modelo_carro = rdd \
    .map(split_line) \
    .map(lambda cols: (cols[3].replace("\"", ""), 1)) \
    .reduceByKey(lambda x, y: x + y)

print("Contagem de cada modelo de carro (limit 5):")
contagem_modelo_carro.take(5)

Contagem de cada modelo de carro (limit 5):


[('156 Range', 7),
 ('Spider/GTV', 12),
 ('Aston Martin', 15),
 ('Audi A6 (Saloon, Avant, SE & Quattro)', 15),
 ('Audi A8 (Sport, Quattro & Quattro Sport)', 4)]

In [None]:
# Media Km/L combinado (Cidade e Rodovia)
consumo_combustivel = rdd \
    .map(split_line) \
    .filter(lambda cols: len(cols) > 13 and cols[13] != '') \
    .map(lambda cols: float(cols[13]))

media_consumo = consumo_combustivel.sum() / consumo_combustivel.count()
print(f"Média de consumo de combustível (Combined Metric): {media_consumo}")

Média de consumo de combustível (Combined Metric): 7.72196290436003


In [None]:
# Carro mais econômico por tipo de combustível
maior_distancia = rdd.map(split_line) \
    .filter(lambda cols: len(cols) > 13 and cols[13] != "") \
    .map(lambda cols: (cols[10].replace("\"", ""), (cols[3].replace("\"", ""), float(cols[13] if cols[13] != "" else 0)))) \
    .reduceByKey(lambda x, y: x if x[1] > y[1] else y)

print("Maior distância percorrida por tipo de combustível (limit 5):")
maior_distancia.take(5)

Maior distância percorrida por tipo de combustível (limit 5):


[('Diesel', ('B-Class, Model Year 2013', 44.0)),
 ('LPG', ('Shogun Sport', 16.8)),
 ('Petrol Hybrid', ('LS MY2011', 9.3)),
 ('CNG', ('S80 Model Year 2002', 10.5)),
 ('LPG / Petrol', ('Zafira, MY2005', 11.4))]

In [None]:
# Média de emissão de CO2 por fabricante
avg_emissao_fabricante = rdd.map(split_line) \
    .filter(lambda cols: len(cols) > 13 and cols[2] != "") \
    .map(lambda cols: (cols[2].replace("\"", ""), (float(cols[18] if cols[18] != "" else 0), 1))) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / x[1])

print("Média de emissão de CO2 por fabricante (limit 5):")
avg_emissao_fabricante.take(5)

Média de emissão de CO2 por fabricante (limit 5):


[('Alfa Romeo', 207.60645161290321),
 ('Aston Martin Lagonda', 371.6190476190476),
 ('Citroen', 160.11764705882354),
 ('Daewoo Cars', 216.93589743589743),
 ('Daihatsu', 158.36094674556213)]

In [None]:
# Imposto médio padrão e no primeiro ano por veículo
imposto_medio = rdd.map(split_line) \
    .filter(lambda cols: len(cols) > 29 and cols[3] != "") \
    .map(lambda cols: (cols[3].replace("\"", ""), \
      (float(cols[26] if cols[26] != "" else 0), \
      float(cols[27] if cols[27] != "" else 0), \
      float(cols[28] if cols[28] != "" else 0), \
      float(cols[29] if cols[29] != "" else 0), 1, 1, 1, 1))) \
    .reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4], x[5] + y[5], x[6] + y[6], x[7] + y[7])) \
    .mapValues(lambda x: (x[0] / x[4], x[1] / x[5], x[2] / x[6], x[3] / x[7])) \
    .sortBy(lambda x: x[1], ascending=False)

print("Custo médio de cada tipo de combustível (limit 5):")
imposto_medio.take(5)

Custo médio de cada tipo de combustível (limit 5):


[('V8 Vantage MY2011', (475.0, 261.25, 1030.0, 0.0)),
 ('Virage MY2012', (475.0, 261.25, 1030.0, 0.0)),
 ('Grand Sport Coupé Model Year 2012', (475.0, 261.25, 1030.0, 0.0)),
 ('Z06 Model Year 2012', (475.0, 261.25, 1030.0, 0.0)),
 ('FX Model Year 2009', (475.0, 261.25, 1030.0, 0.0))]

In [None]:
# Tipo de combustível e seu custo médio para 6000 e 12000 milhas
custo_combustivel = rdd.map(split_line) \
    .filter(lambda cols: len(cols) > 25 and cols[10] != "") \
    .map(lambda cols: (cols[10].replace("\"", ""),
     (float(cols[24] if cols[24] != "" else 0), float(cols[25] if cols[25] != "" else 0), 1, 1))) \
    .reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3])) \
    .mapValues(lambda x: (x[0] / x[2], x[1] / x[3]))

print("Custo médio de cada tipo de combustível (limit 5):")
custo_combustivel.take(5)

Custo médio de cada tipo de combustível (limit 5):


[('Diesel', (1118.1072178297404, 86.60656583018219)),
 ('LPG', (277.89115646258506, 238.69387755102042)),
 ('Petrol Hybrid', (1135.9236641221373, 2.7022900763358777)),
 ('CNG', (70.35483870967742, 0.0)),
 ('LPG / Petrol', (561.1538461538462, 0.0))]

In [None]:
# Maior quantidade de euro standard
maior_qtd_euro_standard = rdd.map(split_line) \
    .filter(lambda cols: len(cols) > 5 and cols[5] != "") \
    .map(lambda cols: (cols[5].replace("\"", ""), 1)) \
    .reduceByKey(lambda x,y: x + y) \
    .reduce(lambda x,y: x if x[1] > y[1] else y)

print("Maior quantidade de euro standard:")
maior_qtd_euro_standard

Maior quantidade de euro standard:


('4', 20269)

In [None]:
# Média de variação de emissão de CO2 por ano por marca
def map_data(line):
  colunas = split_line(line)
  year = int(colunas[1])
  manufacturer = colunas[2]
  co2 = float(colunas[18])
  return (manufacturer, year), co2

def calculate_avg_co2(x, y):
  count_x, sum_x = x
  count_y, sum_y = y
  return count_x + count_y, sum_x + sum_y

rdd_mapped = rdd.map(map_data).filter(lambda x: x is not None)
rdd_avg_co2 = rdd_mapped \
    .mapValues(lambda x: (1, x)) \
    .reduceByKey(calculate_avg_co2) \
    .mapValues(lambda x: x[1] / x[0])

rdd_manufacturer_year_co2 = rdd_avg_co2.map(lambda x: (x[0][0], (x[0][1], x[1])))

def calculate_co2_variation(values):
  values_sorted = sorted(values, key=lambda x: x[0])
  prev_co2 = 0
  variations = []
  for year, avg_co2 in values_sorted:
      if prev_co2 != 0:
          variations.append(prev_co2 - avg_co2)
      prev_co2 = avg_co2
  return variations

def calculate_avg_variation(variations):
  return sum(variations) / len(variations) if variations else 0

rdd_avg_variation = rdd_manufacturer_year_co2 \
    .groupByKey() \
    .mapValues(calculate_co2_variation) \
    .mapValues(calculate_avg_variation)

print("Média de variação de emissão de CO2 por ano por marca (limit 5):")
rdd_avg_variation.take(5)

Média de variação de emissão de CO2 por ano por marca (limit 5):


[('Alfa Romeo', 8.199095022624435),
 ('Aston Martin Lagonda', 12.258461538461535),
 ('Citroen', 5.543869437857443),
 ('Daewoo Cars', 9.793939393939393),
 ('Daihatsu', 1.574825174825176)]