In [2]:
#instalando o pyspark, será necessário para enviar do pandas para o spark
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=eed39cc4a9430be6b93731e7c3383ba2e7ef5b985bfdd0a3b336bcc73a5ef4da
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [64]:
# para verificar podemos listar os arquivos na pasta usando a biblioteca os
import os
import pandas as pd
from pyspark.sql import SparkSession #cria uma seção/ambiente para utilizarmos spark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType

#para salvar no google drive, deve-se importar a biblioteca
from google.colab import drive
import csv
import pyspark.sql.functions as F #para substituir valores e tipo de dados
import pyspark


In [65]:
# Sets a name for the application, which will be shown in the Spark web UI.
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.builder.appName.html
spark = SparkSession.builder.appName("Aula 1").getOrCreate()

In [66]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [67]:
id_arquivo = '2023_Passagem.csv'

In [68]:
#Variável de diretório
diret = f'/content/drive/MyDrive/Bootcamp_eng_dados/Mod1/Viagens/{id_arquivo}'

In [69]:
os.listdir("/content/drive/MyDrive/Bootcamp_eng_dados/Mod1/Viagens")

['2023_Viagem.csv',
 '2023_Passagem.csv',
 '2023_Trecho.csv',
 '2023_Pagamento.csv']

#Leitura do arquivo da pasta

In [70]:
#pre-definindo o schema usando StructType e StructField
gastos_schema = StructType(fields=[StructField("Identificador do processo de viagem", IntegerType()),
                                    StructField("Número da Proposta (PCDP)", StringType()),
                                    StructField("Meio de transporte", StringType()),
                                    StructField("País - Origem ida", StringType()),
                                    StructField("UF - Origem ida", StringType()),
                                    StructField("Cidade - Origem ida", StringType()),
                                    StructField("País - Destino ida", StringType()),
                                   StructField("UF - Destino ida", StringType()),
                                    StructField("Cidade - Destino ida", StringType()),
                                    StructField("País - Origem volta", StringType()),
                                   StructField("UF - Origem volta", StringType()),
                                    StructField("Cidade - Origem volta", StringType()),
                                    StructField("Pais - Destino volta", StringType()),
                                   StructField("UF - Destino volta", StringType()),
                                    StructField("Cidade - Destino volta", StringType()),
                                    StructField("Valor da passagem", StringType()),#DoubleType()),
                                    StructField("Taxa de serviço", StringType()),#DoubleType()),
                                    StructField("Data da emissão/compra",StringType()), #DateType()),#only accept format yyyy-MM-dd.
                                   StructField("Hora da emissão/compra", TimestampType())
                                    ]
                           )

In [73]:
# usando StructType
df2 = spark.read.option("header", "true")\
.schema(gastos_schema)\
.option("delimiter", ";")\
.option("encoding", "iso-8859-1")\
.csv(diret)

In [72]:
# OU usando inferSchema
df2 = spark.read.option("header", "true")\
.option('inferSchema', 'true')\
.option("delimiter", ";")\
.option("encoding", "iso-8859-1")\
.csv(diret)


In [74]:
#mostra as colunas
df2.show(5)

+-----------------------------------+-------------------------+------------------+-----------------+----------------+-------------------+--------------------+----------------+--------------------+--------------------+-----------------+---------------------+--------------------+------------------+----------------------+-----------------+---------------+----------------------+----------------------+
|Identificador do processo de viagem|Número da Proposta (PCDP)|Meio de transporte|País - Origem ida| UF - Origem ida|Cidade - Origem ida|  País - Destino ida|UF - Destino ida|Cidade - Destino ida| País - Origem volta|UF - Origem volta|Cidade - Origem volta|Pais - Destino volta|UF - Destino volta|Cidade - Destino volta|Valor da passagem|Taxa de serviço|Data da emissão/compra|Hora da emissão/compra|
+-----------------------------------+-------------------------+------------------+-----------------+----------------+-------------------+--------------------+----------------+--------------------+--

In [76]:
# caso precise converter a data
df2.select(col("Data da emissão/compra"),
    to_date(col("Data da emissão/compra"), "dd/MM/yyyy").alias("to_date")).show()
    #date_format(col("Data da emissão/compra"), "dd/MM/yyyy").alias("to_date")).show()

+----------------------+----------+
|Data da emissão/compra|   to_date|
+----------------------+----------+
|            04/11/2022|2022-11-04|
|            15/09/2022|2022-09-15|
|            15/09/2022|2022-09-15|
|            15/09/2022|2022-09-15|
|            15/09/2022|2022-09-15|
|            06/12/2022|2022-12-06|
|            06/12/2022|2022-12-06|
|            06/12/2022|2022-12-06|
|            23/11/2022|2022-11-23|
|            23/11/2022|2022-11-23|
|            21/09/2022|2022-09-21|
|            21/09/2022|2022-09-21|
|            26/01/2023|2023-01-26|
|            26/01/2023|2023-01-26|
|            21/09/2022|2022-09-21|
|            21/10/2022|2022-10-21|
|            21/10/2022|2022-10-21|
|            28/02/2023|2023-02-28|
|            28/02/2023|2023-02-28|
|            26/09/2022|2022-09-26|
+----------------------+----------+
only showing top 20 rows



In [77]:
df2 = df2.withColumn("Data da emissão/compra", to_date("Data da emissão/compra", "dd/MM/yyyy"))

In [78]:
df2.printSchema()

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Número da Proposta (PCDP): string (nullable = true)
 |-- Meio de transporte: string (nullable = true)
 |-- País - Origem ida: string (nullable = true)
 |-- UF - Origem ida: string (nullable = true)
 |-- Cidade - Origem ida: string (nullable = true)
 |-- País - Destino ida: string (nullable = true)
 |-- UF - Destino ida: string (nullable = true)
 |-- Cidade - Destino ida: string (nullable = true)
 |-- País - Origem volta: string (nullable = true)
 |-- UF - Origem volta: string (nullable = true)
 |-- Cidade - Origem volta: string (nullable = true)
 |-- Pais - Destino volta: string (nullable = true)
 |-- UF - Destino volta: string (nullable = true)
 |-- Cidade - Destino volta: string (nullable = true)
 |-- Valor da passagem: string (nullable = true)
 |-- Taxa de serviço: string (nullable = true)
 |-- Data da emissão/compra: date (nullable = true)
 |-- Hora da emissão/compra: timestamp (nullable = true)



In [79]:
df2 = df2.select([F.col(x).alias(x.replace(" - "," ").replace(" ","_").lower()) for x in df2.columns])

In [80]:
#mostra as colunas
df2.show(5)

+-----------------------------------+-------------------------+------------------+---------------+----------------+-----------------+--------------------+--------------+------------------+--------------------+---------------+-------------------+------------------+----------------+--------------------+-----------------+---------------+----------------------+----------------------+
|identificador_do_processo_de_viagem|número_da_proposta_(pcdp)|meio_de_transporte|país_origem_ida|   uf_origem_ida|cidade_origem_ida|    país_destino_ida|uf_destino_ida|cidade_destino_ida|   país_origem_volta|uf_origem_volta|cidade_origem_volta|pais_destino_volta|uf_destino_volta|cidade_destino_volta|valor_da_passagem|taxa_de_serviço|data_da_emissão/compra|hora_da_emissão/compra|
+-----------------------------------+-------------------------+------------------+---------------+----------------+-----------------+--------------------+--------------+------------------+--------------------+---------------+---------

In [81]:
df2.printSchema()

root
 |-- identificador_do_processo_de_viagem: integer (nullable = true)
 |-- número_da_proposta_(pcdp): string (nullable = true)
 |-- meio_de_transporte: string (nullable = true)
 |-- país_origem_ida: string (nullable = true)
 |-- uf_origem_ida: string (nullable = true)
 |-- cidade_origem_ida: string (nullable = true)
 |-- país_destino_ida: string (nullable = true)
 |-- uf_destino_ida: string (nullable = true)
 |-- cidade_destino_ida: string (nullable = true)
 |-- país_origem_volta: string (nullable = true)
 |-- uf_origem_volta: string (nullable = true)
 |-- cidade_origem_volta: string (nullable = true)
 |-- pais_destino_volta: string (nullable = true)
 |-- uf_destino_volta: string (nullable = true)
 |-- cidade_destino_volta: string (nullable = true)
 |-- valor_da_passagem: string (nullable = true)
 |-- taxa_de_serviço: string (nullable = true)
 |-- data_da_emissão/compra: date (nullable = true)
 |-- hora_da_emissão/compra: timestamp (nullable = true)



In [82]:
df2 = df2.withColumn("valor_da_passagem", F.expr("cast(replace(valor_da_passagem, ',','.') as double)"))\
.withColumn("taxa_de_serviço", F.expr("cast(replace(`taxa_de_serviço`, ',','.') as double)"))



df2.printSchema()

root
 |-- identificador_do_processo_de_viagem: integer (nullable = true)
 |-- número_da_proposta_(pcdp): string (nullable = true)
 |-- meio_de_transporte: string (nullable = true)
 |-- país_origem_ida: string (nullable = true)
 |-- uf_origem_ida: string (nullable = true)
 |-- cidade_origem_ida: string (nullable = true)
 |-- país_destino_ida: string (nullable = true)
 |-- uf_destino_ida: string (nullable = true)
 |-- cidade_destino_ida: string (nullable = true)
 |-- país_origem_volta: string (nullable = true)
 |-- uf_origem_volta: string (nullable = true)
 |-- cidade_origem_volta: string (nullable = true)
 |-- pais_destino_volta: string (nullable = true)
 |-- uf_destino_volta: string (nullable = true)
 |-- cidade_destino_volta: string (nullable = true)
 |-- valor_da_passagem: double (nullable = true)
 |-- taxa_de_serviço: double (nullable = true)
 |-- data_da_emissão/compra: date (nullable = true)
 |-- hora_da_emissão/compra: timestamp (nullable = true)



In [83]:
df2.describe().show()


+-------+-----------------------------------+-------------------------+------------------+---------------+-------------+-----------------+----------------+--------------+------------------+-----------------+---------------+-------------------+------------------+----------------+--------------------+------------------+------------------+
|summary|identificador_do_processo_de_viagem|número_da_proposta_(pcdp)|meio_de_transporte|país_origem_ida|uf_origem_ida|cidade_origem_ida|país_destino_ida|uf_destino_ida|cidade_destino_ida|país_origem_volta|uf_origem_volta|cidade_origem_volta|pais_destino_volta|uf_destino_volta|cidade_destino_volta| valor_da_passagem|   taxa_de_serviço|
+-------+-----------------------------------+-------------------------+------------------+---------------+-------------+-----------------+----------------+--------------+------------------+-----------------+---------------+-------------------+------------------+----------------+--------------------+------------------+---

In [84]:
#somente as colunas quantitativas
df2[['valor_da_passagem','taxa_de_serviço']].describe().show()

+-------+------------------+------------------+
|summary| valor_da_passagem|   taxa_de_serviço|
+-------+------------------+------------------+
|  count|            196332|            196332|
|   mean|1832.7642894180713|10.517705111749974|
| stddev| 2400.985037158453| 298.5524850835766|
|    min|               0.0|               0.0|
|    max|          79234.41|          76896.94|
+-------+------------------+------------------+



In [85]:
type(df2)

pyspark.sql.dataframe.DataFrame