# Primeiros Passos com Spark e Python (PySpark) - Parte 01

O [Apache Spark](https://spark.apache.org/) é uma ferramenta Big Data que tem o objetivo de processar grandes conjuntos de dados de forma paralela e distribuída. Ela estende o modelo de programação MapReduce popularizado pelo Apache Hadoop, facilitando bastante o desenvolvimento de aplicações de processamento de grandes volumes de dados. Além do modelo de programação estendido, o Spark também apresenta uma performance muito superior ao Hadoop, chegando em alguns casos a apresentar uma performance quase 100x maior. Existem API's para as principais linguagem disponíveis no mercado, com destaque para PYTHON, JAVA, R, JULIA, SCALA.

Outra grande vantagem do Spark, é que todos os componentes funcionam integrados na própria ferramenta, como o Spark Streaming, o Spark SQL e o GraphX, diferentemente do Hadoop, onde é necessário utilizar ferramentas que se integram a ele, mas que são distribuídas separadamente, como o Apache Hive.

O Spark tem diversos componentes para diferentes tipos de processamentos, todos construídos sobre o Spark Core, que é o componente que disponibiliza as funções básicas para o processamento como as funções map, reduce, filter e collect:
  
- O Spark Streaming, que possibilita o processamento de fluxos em tempo real;
- O GraphX, que realiza o processamento sobre grafos;
- O SparkSQL para a utilização de SQL na realização de consultas e processamento sobre os dados no Spark;
- A MLlib, que é a biblioteca de aprendizado de máquina, com deferentes algoritmos para as mais diversas atividades, como clustering.

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import functions as sql_f

In [0]:
# Criando o objeto SparkSession.
spark = SparkSession.builder.appName('PySpark_For_Beginners').getOrCreate()

In [0]:
spark

In [0]:
# Link para acesso ao dataset.
DATASET_URL = 'https://raw.githubusercontent.com/alexcamargos/DataScienceBootcampIGTI/main/Modulo_2/total_frota_2022.csv'

In [0]:
# Informando ao Spark onde esta o arquivo para download.
spark.sparkContext.addFile(DATASET_URL)

# Carregando o dataset. Nosso arquivo CSV tem cabeçalho definido e usa encoding Windows-1252.
# encoding=Windows-1252 - Define o encoding dos dados.
# inferSchema=True - Tenta determinar o schema a partir dos dados.
df = spark.read.option('header', True) \
               .option('encoding', 'Windows-1252') \
               .option('inferSchema', 'True') \
               .csv(f"file://{SparkFiles.get('total_frota_2022.csv')}")

In [0]:
df.show()

+---+-------+--------------------+-------------+--------+-----------+-----------------+-------------------+----------+--------+--------------------+--------------+--------+-----------+---------+--------------------+-----+-----+-----+--------------+--------+-----------+----------+--------------+---------------+-------------+--------------+----------+
|_c0|  Placa|              Modelo|        Marca|AnoFabr.|Combustível|           Chassi|                Cor|  Potência|Cilindro|           Aquisição|   Conservação|Situação|Combustivel|  Renavam|                Tipo|Motor|Caixa|Local|Loc.Manutenção|Km Atual|Data Compra|Data Venda|Nota de compra|Valor de compra|Nota de venda|Valor de venda|Nº Apólice|
+---+-------+--------------------+-------------+--------+-----------+-----------------+-------------------+----------+--------+--------------------+--------------+--------+-----------+---------+--------------------+-----+-----+-----+--------------+--------+-----------+----------+--------------+-

In [0]:
# Visualizando as colunas do dataframe.
df.columns

Out[7]: ['_c0',
 'Placa',
 'Modelo',
 'Marca',
 'AnoFabr.',
 'Combustível',
 'Chassi',
 'Cor',
 'Potência',
 'Cilindro',
 'Aquisição',
 'Conservação',
 'Situação',
 'Combustivel',
 'Renavam',
 'Tipo',
 'Motor',
 'Caixa',
 'Local',
 'Loc.Manutenção',
 'Km Atual',
 'Data Compra',
 'Data Venda',
 'Nota de compra',
 'Valor de compra',
 'Nota de venda',
 'Valor de venda',
 'Nº Apólice']

In [0]:
# Visualizando o schema dos dados em formato árvore.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Placa: string (nullable = true)
 |-- Modelo: string (nullable = true)
 |-- Marca: string (nullable = true)
 |-- AnoFabr.: integer (nullable = true)
 |-- Combustível: string (nullable = true)
 |-- Chassi: string (nullable = true)
 |-- Cor: string (nullable = true)
 |-- Potência: string (nullable = true)
 |-- Cilindro: integer (nullable = true)
 |-- Aquisição: string (nullable = true)
 |-- Conservação: string (nullable = true)
 |-- Situação: string (nullable = true)
 |-- Combustivel: string (nullable = true)
 |-- Renavam: integer (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- Motor: string (nullable = true)
 |-- Caixa: string (nullable = true)
 |-- Local: string (nullable = true)
 |-- Loc.Manutenção: string (nullable = true)
 |-- Km Atual: integer (nullable = true)
 |-- Data Compra: string (nullable = true)
 |-- Data Venda: string (nullable = true)
 |-- Nota de compra: integer (nullable = true)
 |-- Valor de compra: string (nullab

In [0]:
# Visualizando o tamanho do datafame.
print(f'Nosso dataframe tem {df.count()} linhas e {len(df.columns)} colunas.')

Nosso dataframe tem 273 linhas e 28 colunas.


In [0]:
# Visualizando os cinco primeiros items da dataframe.
df.head(3)

Out[10]: [Row(_c0=1, Placa='GMF1396', Modelo='CAMINHAO TANQUE', Marca='VOLKSWAGEN', AnoFabr.=1995, Combustível='Diesel', Chassi='9BWXTACM3SDB90133', Cor='BRANCA', Potência='134', Cilindro=6, Aquisição='COMPRA', Conservação='Regular', Situação='Ativa', Combustivel='Diesel', Renavam=650997018, Tipo='CAR/CAMINHAO TANQUE', Motor=None, Caixa=None, Local=None, Loc.Manutenção=None, Km Atual=35329, Data Compra='12/11/1995', Data Venda=None, Nota de compra=0, Valor de compra='79785,2', Nota de venda=0, Valor de venda=0, Nº Apólice=None),
 Row(_c0=2, Placa='GMF1564', Modelo='PARATI CLI', Marca='VOLKSWAGEN', AnoFabr.=1996, Combustível='Gasolina', Chassi='9BWZZZ379TT163437', Cor='BRANCA', Potência='081 CV-1.6', Cilindro=4, Aquisição='DOACAO TERMO 02/2012', Conservação='Anti-econômico', Situação='Ativa', Combustivel='Gasolina', Renavam=660911655, Tipo='PAS/AUTOMOVEL', Motor=None, Caixa=None, Local=None, Loc.Manutenção=None, Km Atual=233615, Data Compra='11/09/2012', Data Venda=None, Nota de compra=

In [0]:
# Visualizando os cinco últimos itens do dataframe.
df.tail(3)

Out[11]: [Row(_c0=None, Placa='TOTAL 267', Modelo=None, Marca=None, AnoFabr.=None, Combustível=None, Chassi=None, Cor=None, Potência=None, Cilindro=None, Aquisição=None, Conservação=None, Situação=None, Combustivel=None, Renavam=None, Tipo=None, Motor=None, Caixa=None, Local=None, Loc.Manutenção=None, Km Atual=None, Data Compra=None, Data Venda=None, Nota de compra=None, Valor de compra=None, Nota de venda=None, Valor de venda=None, Nº Apólice=None),
 Row(_c0=None, Placa='RELATÓRIO DE VEÍCULOS DA FROTA', Modelo=None, Marca=None, AnoFabr.=None, Combustível=None, Chassi=None, Cor=None, Potência=None, Cilindro=None, Aquisição=None, Conservação=None, Situação=None, Combustivel=None, Renavam=None, Tipo=None, Motor=None, Caixa=None, Local=None, Loc.Manutenção=None, Km Atual=None, Data Compra=None, Data Venda=None, Nota de compra=None, Valor de compra=None, Nota de venda=None, Valor de venda=None, Nº Apólice=None),
 Row(_c0=None, Placa='Da depart. :   até :', Modelo=None, Marca=None, AnoFabr.

In [0]:
# Eliminando colunas do dataframe.
df = df.drop('_c0', 'Nota de venda', 'Valor de venda', 'Loc.Manutenção')
df.columns

Out[12]: ['Placa',
 'Modelo',
 'Marca',
 'AnoFabr.',
 'Combustível',
 'Chassi',
 'Cor',
 'Potência',
 'Cilindro',
 'Aquisição',
 'Conservação',
 'Situação',
 'Combustivel',
 'Renavam',
 'Tipo',
 'Motor',
 'Caixa',
 'Local',
 'Km Atual',
 'Data Compra',
 'Data Venda',
 'Nota de compra',
 'Valor de compra',
 'Nº Apólice']

In [0]:
sql_f.col('Data Compra')

Out[13]: Column<'Data Compra'>

In [0]:
# Renomeando a coluna AnoFabr. para AnoFabricao.
df = df.withColumnRenamed('AnoFabr.', 'AnoFabricao') \
       .withColumnRenamed('Nº Apólice', 'NumApolice') \
       .withColumnRenamed('Km Atual', 'KmAtual')

In [0]:
# Renomeando colunas em lote.
columns_names = ['Data Compra','Data Venda','Nota de compra', 'Valor de compra']
new_columns_names = [column.title().replace(' ', '').replace('De', '') for column in columns_names]
columns_names_mapping = dict(zip(columns_names, new_columns_names))

df = df.select([sql_f.col(column).alias(columns_names_mapping.get(column, column)) for column in df.columns])

In [0]:
sql_f.to_date(sql_f.col('DataCompra'), 'dd-mm-yyyy').alias('Data')

Out[16]: Column<'to_date(DataCompra, dd-mm-yyyy) AS Data'>

In [0]:
# Transformando as colunas Data Compra e Data Venda em objetos do tipo data.
df = df.withColumn('DataCompra', sql_f.to_date(sql_f.col('DataCompra'), 'dd/mm/yyyy').alias('DataCompra'))
df = df.withColumn('DataVenda', sql_f.to_date(sql_f.col('DataVenda'), 'dd/mm/yyyy').alias('DataVenda'))

In [0]:
# Transformando a coluna Valor de compra em float.
df = df.withColumn('ValorCompra', df['ValorCompra'].cast('double'))

In [0]:
df.printSchema()

root
 |-- Placa: string (nullable = true)
 |-- Modelo: string (nullable = true)
 |-- Marca: string (nullable = true)
 |-- AnoFabricao: integer (nullable = true)
 |-- Combustível: string (nullable = true)
 |-- Chassi: string (nullable = true)
 |-- Cor: string (nullable = true)
 |-- Potência: string (nullable = true)
 |-- Cilindro: integer (nullable = true)
 |-- Aquisição: string (nullable = true)
 |-- Conservação: string (nullable = true)
 |-- Situação: string (nullable = true)
 |-- Combustivel: string (nullable = true)
 |-- Renavam: integer (nullable = true)
 |-- Tipo: string (nullable = true)
 |-- Motor: string (nullable = true)
 |-- Caixa: string (nullable = true)
 |-- Local: string (nullable = true)
 |-- KmAtual: integer (nullable = true)
 |-- DataCompra: date (nullable = true)
 |-- DataVenda: date (nullable = true)
 |-- NotaCompra: integer (nullable = true)
 |-- ValorCompra: double (nullable = true)
 |-- NumApolice: string (nullable = true)



## Operações de Seleção e Filtragem

In [0]:
# Selecionando apenas uma coluna do dataframe.
df.select('Placa').show()

+-------+
|  Placa|
+-------+
|GMF1396|
|GMF1564|
|GMF1641|
|GMF1750|
|GMF1756|
|GMF2408|
|GMF2414|
|GMF2445|
|GMF2581|
|GMF2673|
|GMF2794|
|GMF2862|
|GMF3059|
|GMF3118|
|GMF3242|
|GMF3245|
|GMF3249|
|GMF3252|
|GMF3253|
|GMF3254|
+-------+
only showing top 20 rows



In [0]:
# Selecionando várias colunas ao mesmo tempo.
df.select(['Placa', 'Modelo', 'Marca', 'ValorCompra']).show()

+-------+--------------------+-------------+------------+
|  Placa|              Modelo|        Marca| ValorCompra|
+-------+--------------------+-------------+------------+
|GMF1396|     CAMINHAO TANQUE|   VOLKSWAGEN|        null|
|GMF1564|          PARATI CLI|   VOLKSWAGEN|     18290.0|
|GMF1641|          GOL 1.6 MI|   VOLKSWAGEN|        null|
|GMF1750|        VAN SPRINTER|MERCEDES BENZ|        null|
|GMF1756|        VAN SPRINTER|MERCEDES BENZ|        null|
|GMF2408|      CAMINHAO 608 D|MERCEDES BENZ|        null|
|GMF2414|     CAMINHAO  L1314|MERCEDES BENZ|        null|
|GMF2445|         ONIBUS 1314|MERCEDES BENZ|        null|
|GMF2581|       KOMBI PICK-UP|   VOLKSWAGEN|        null|
|GMF2673|    MICRO-ONIBUS 708|MERCEDES BENZ|        null|
|GMF2794|        CAMINHAO 608|MERCEDES BENZ| 3.7018287E7|
|GMF2862|     CAMINHAO  L1113|MERCEDES BENZ|1.19980655E8|
|GMF3059|      STRADA WORKING|         FIAT|        null|
|GMF3118|M. POLO VIAGGIO G...|MERCEDES BENZ|     98850.0|
|GMF3242|PICK-

In [0]:
# Filtrando apenas os veículos da marca VOLKSWAGEN.
df.filter(df['Marca'] == 'FORD').select(['Placa',
                                         'Modelo',
                                         'Marca',
                                         'DataCompra',
                                         'ValorCompra']).show()

+-------+--------------------+-----+----------+-----------+
|  Placa|              Modelo|Marca|DataCompra|ValorCompra|
+-------+--------------------+-----+----------+-----------+
|GMF3242|PICK-UP COURIER 1...| FORD|2000-01-12|       null|
|GMF5754|          FIESTA 1.6| FORD|2009-01-02|       null|
|GMF5755|          FIESTA 1.6| FORD|2009-01-02|       null|
|GMF5756|          FIESTA 1.6| FORD|2009-01-02|       null|
|GMF5757|          FIESTA 1.6| FORD|2009-01-03|       null|
|GMF5758|          FIESTA 1.6| FORD|2009-01-02|       null|
|GMF5759|          FIESTA 1.6| FORD|2009-01-02|       null|
|GMF6494|           RANGER XL| FORD|2010-01-12|    84900.0|
|GMF6583|          FIESTA 1.6| FORD|2011-01-16|    35900.0|
|GMF6584|          FIESTA 1.6| FORD|2011-01-16|    35900.0|
|GXP5136|         PICK-UP F75| FORD|1983-01-29|  2298960.0|
|HCG1E96|PICK-UP COURIER 1...| FORD|2012-01-15|        0.0|
|HNX0284|  CAMINHAO BAU 815 N| FORD|2012-01-15|   145500.0|
|NXY0160|VAN TRANSIT 350 L...| FORD|2017

In [0]:
# Filtrando os veículos da marca VOLKSWAGEN ou CHEVROLET.
df.filter((df['Marca'] == 'FORD') | 
          (df['Marca'] == 'CHEVROLET')).select(['Placa',
                                                'Modelo',
                                                'Marca',
                                                'DataCompra',
                                                'ValorCompra']).show()

+-------+--------------------+---------+----------+-----------+
|  Placa|              Modelo|    Marca|DataCompra|ValorCompra|
+-------+--------------------+---------+----------+-----------+
|GMF3242|PICK-UP COURIER 1...|     FORD|2000-01-12|       null|
|GMF3245|   PICK-UP S10 2.2 S|CHEVROLET|2000-01-15|    24699.0|
|GMF3254|   PICK-UP S10 2.2 S|CHEVROLET|2000-01-15|    24699.0|
|GMF3257|   PICK-UP S10 2.2 S|CHEVROLET|2000-01-15|    24699.0|
|GMF3259|PICK-UP GM/S10 2.8 S|CHEVROLET|2000-01-15|    35795.0|
|GMF3260|       GM/CORSA WIND|CHEVROLET|2000-01-15|    19563.0|
|GMF3262|       GM/CORSA WIND|CHEVROLET|2000-01-15|    19563.0|
|GMF3273|   PICK-UP S10 2.2 S|CHEVROLET|2000-01-15|    24699.0|
|GMF3276|       GM/CORSA WIND|CHEVROLET|2000-01-15|    19563.0|
|GMF3554|GM/S10 2.8 D CABI...|CHEVROLET|2012-01-24|    62063.0|
|GMF4073|               ASTRA|CHEVROLET|2016-01-05|       null|
|GMF4884|ASTRA SEDAN ELEGANCE|CHEVROLET|2006-01-28|    55500.0|
|GMF4885|ASTRA SEDAN ELEGANCE|CHEVROLET|

In [0]:
# Filtrando os veículos com mais de 20 anos de fabricação.
df.filter(f'AnoFabricao < {2022 - 20}').select(['Placa',
                                                'Marca',
                                                'Modelo',
                                                'Conservação',
                                                'AnoFabricao',
                                                'DataCompra',
                                                'KmAtual',
                                                'ValorCompra']).show()

+-------+-------------+--------------------+--------------+-----------+----------+-------+------------+
|  Placa|        Marca|              Modelo|   Conservação|AnoFabricao|DataCompra|KmAtual| ValorCompra|
+-------+-------------+--------------------+--------------+-----------+----------+-------+------------+
|GMF1396|   VOLKSWAGEN|     CAMINHAO TANQUE|       Regular|       1995|1995-01-12|  35329|        null|
|GMF1564|   VOLKSWAGEN|          PARATI CLI|Anti-econômico|       1996|2012-01-11| 233615|     18290.0|
|GMF1641|   VOLKSWAGEN|          GOL 1.6 MI|Anti-econômico|       1997|2013-01-17|  57315|        null|
|GMF1750|MERCEDES BENZ|        VAN SPRINTER|Anti-econômico|       1997|1997-01-04| 359394|        null|
|GMF1756|MERCEDES BENZ|        VAN SPRINTER|       Regular|       1997|1997-01-30| 365290|        null|
|GMF2408|MERCEDES BENZ|      CAMINHAO 608 D|       Regular|       1986|1986-01-29| 183347|        null|
|GMF2414|MERCEDES BENZ|     CAMINHAO  L1314|       Regular|     

In [0]:
# Filtrando os veículos com mais de 20 anos de fabricação e estado de conservação Anti-econômico.
df.filter((df['AnoFabricao'] <= 2000) &
          (df['Conservação'] == 'Anti-econômico')).select(['Placa',
                                                           'Marca',
                                                           'Modelo',
                                                           'Conservação',
                                                           'AnoFabricao',
                                                           'DataCompra',
                                                           'KmAtual',
                                                           'ValorCompra']).show()

+-------+-------------+------------------+--------------+-----------+----------+-------+-----------+
|  Placa|        Marca|            Modelo|   Conservação|AnoFabricao|DataCompra|KmAtual|ValorCompra|
+-------+-------------+------------------+--------------+-----------+----------+-------+-----------+
|GMF1564|   VOLKSWAGEN|        PARATI CLI|Anti-econômico|       1996|2012-01-11| 233615|    18290.0|
|GMF1641|   VOLKSWAGEN|        GOL 1.6 MI|Anti-econômico|       1997|2013-01-17|  57315|       null|
|GMF1750|MERCEDES BENZ|      VAN SPRINTER|Anti-econômico|       1997|1997-01-04| 359394|       null|
|GMF2445|MERCEDES BENZ|       ONIBUS 1314|Anti-econômico|       1987|1987-01-20|  32885|       null|
|GMF2581|   VOLKSWAGEN|     KOMBI PICK-UP|Anti-econômico|       1986|1986-01-29|  79715|       null|
|GMF2673|MERCEDES BENZ|  MICRO-ONIBUS 708|Anti-econômico|       1987|1987-01-06|  50841|       null|
|GMF3257|    CHEVROLET| PICK-UP S10 2.2 S|Anti-econômico|       2000|2000-01-15| 118168|   

In [0]:
# Visualizando todos os veículos que não são movidos a Diesel.
df.filter(~(df['Combustível'] == 'Diesel')).select(['Placa',
                                                    'Marca',
                                                    'Modelo',
                                                    'ValorCompra']).show()

+-------+----------+--------------------+-----------+
|  Placa|     Marca|              Modelo|ValorCompra|
+-------+----------+--------------------+-----------+
|GMF1564|VOLKSWAGEN|          PARATI CLI|    18290.0|
|GMF1641|VOLKSWAGEN|          GOL 1.6 MI|       null|
|GMF2581|VOLKSWAGEN|       KOMBI PICK-UP|       null|
|GMF3059|      FIAT|      STRADA WORKING|       null|
|GMF3242|      FORD|PICK-UP COURIER 1...|       null|
|GMF3245| CHEVROLET|   PICK-UP S10 2.2 S|    24699.0|
|GMF3249|VOLKSWAGEN|    KOMBI - STANDARD|       null|
|GMF3252|VOLKSWAGEN|    KOMBI - STANDARD|       null|
|GMF3254| CHEVROLET|   PICK-UP S10 2.2 S|    24699.0|
|GMF3257| CHEVROLET|   PICK-UP S10 2.2 S|    24699.0|
|GMF3260| CHEVROLET|       GM/CORSA WIND|    19563.0|
|GMF3262| CHEVROLET|       GM/CORSA WIND|    19563.0|
|GMF3273| CHEVROLET|   PICK-UP S10 2.2 S|    24699.0|
|GMF3276| CHEVROLET|       GM/CORSA WIND|    19563.0|
|GMF3314|VOLKSWAGEN|    KOMBI - STANDARD|       null|
|GMF3523|   REBOQUE|        