<a href="https://colab.research.google.com/github/Isiumlord/ProjetoFinal-AcidentesTerrestres/blob/main/DataSet-VolumeDeTrafego-PySpark-SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1 - INSTALANDO O PYSPARK

!pip install pyspark



In [None]:
# 2 - IMPORTANDO O PYSPARK E A BIBLIOTECA DE FUNÇÕES DO PYSPARK

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

In [None]:
# 3 - AUTENTICAÇÃO DA GCP

from google.colab import auth
auth.authenticate_user()

In [None]:
# 3 - SELECIONANDO O PROJETO - IMPORTANDO DIRETO DA BUCKET

project_id = 'projeto-final-grupo03'
!gcloud config set project projeto-final-grupo03

Updated property [core/project].


In [None]:
# 4 - FAZENDO DOWLOAD DO DOCUMENTO QUE SERÁ UTILIZADO - !gsutil cp gs://NomeDaBucket/NomeDoDocumento /tmp/NomeDoDocumento.csv

!gsutil cp gs://notebooks_pandas_gp03/pedagio_tratado.csv /tmp/pedagio_tratado.csv

Copying gs://notebooks_pandas_gp03/pedagio_tratado.csv...
- [1 files][  4.0 MiB/  4.0 MiB]                                                
Operation completed over 1 objects/4.0 MiB.                                      


In [None]:
#CONSULTAR PARA VER SE A TRANSFERÊNCIA FUNCIONOU

#!cat /tmp/pedagio_tratado.csv

In [None]:
# 5 - CONFIGURANDO A SPARKSESSION

spark = (SparkSession.builder
        .master("local")                   
        .appName("aprendendo-dataframes") 
        .config("spark.ui.port", "4050")
        .getOrCreate()) 

In [None]:
# CONSULTAR SE O SPARK ESTÁ INSTALADO

spark

In [None]:
# 6 - ESTRUTURA DO DATAFRAME

df = (spark
       .read
       .format("csv")
       .option("header", "true")
       .option("inferschema", "true")
       .option("delimiter", ",")
       .load("/tmp/pedagio_tratado.csv")
)

In [None]:
# 7 - VISUALIZANDO SCHEMA DO DATAFRAME E O TYPES DOS DADOS (STRING, INT, FLOAT)

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
#VISUALIZANDO O DATAFRAME

df.show()

+---+--------------+----------+-------+---------------+------------+------+---+
|_c0|Concessionaria|      Data|Sentido|Tipo_de_veiculo|Volume_total|    BR| UF|
+---+--------------+----------+-------+---------------+------------+------+---+
|  0|     NOVADUTRA|2019-01-01|    Sul|        Passeio|    640429.0|BR-116| SP|
|  1|     NOVADUTRA|2019-01-01|  Norte|        Passeio|    787419.0|BR-116| SP|
|  2|     NOVADUTRA|2019-02-01|    Sul|        Passeio|    511718.0|BR-116| SP|
|  3|     NOVADUTRA|2019-02-01|  Norte|        Passeio|    638796.0|BR-116| SP|
|  4|     NOVADUTRA|2019-03-01|    Sul|        Passeio|    579422.0|BR-116| SP|
|  5|     NOVADUTRA|2019-03-01|  Norte|        Passeio|    719487.0|BR-116| SP|
|  6|     NOVADUTRA|2019-04-01|    Sul|        Passeio|    574518.0|BR-116| SP|
|  7|     NOVADUTRA|2019-04-01|  Norte|        Passeio|    710563.0|BR-116| SP|
|  8|     NOVADUTRA|2019-05-01|    Sul|        Passeio|    574429.0|BR-116| SP|
|  9|     NOVADUTRA|2019-05-01|  Norte| 

In [None]:
# 8 - MUDAR O TIPO DE DATA PARA DATE COM  CAST

df = df.withColumn("Data", df.Data.cast("date"))

In [None]:
# 9 - VISUALIZANDO SCHEMA DO DATAFRAME PARA VER SE A MUDANÇA DA COLUNA "Data" DEU CERTO

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
df.show()

+---+--------------+----------+-------+---------------+------------+------+---+
|_c0|Concessionaria|      Data|Sentido|Tipo_de_veiculo|Volume_total|    BR| UF|
+---+--------------+----------+-------+---------------+------------+------+---+
|  0|     NOVADUTRA|2019-01-01|    Sul|        Passeio|    640429.0|BR-116| SP|
|  1|     NOVADUTRA|2019-01-01|  Norte|        Passeio|    787419.0|BR-116| SP|
|  2|     NOVADUTRA|2019-02-01|    Sul|        Passeio|    511718.0|BR-116| SP|
|  3|     NOVADUTRA|2019-02-01|  Norte|        Passeio|    638796.0|BR-116| SP|
|  4|     NOVADUTRA|2019-03-01|    Sul|        Passeio|    579422.0|BR-116| SP|
|  5|     NOVADUTRA|2019-03-01|  Norte|        Passeio|    719487.0|BR-116| SP|
|  6|     NOVADUTRA|2019-04-01|    Sul|        Passeio|    574518.0|BR-116| SP|
|  7|     NOVADUTRA|2019-04-01|  Norte|        Passeio|    710563.0|BR-116| SP|
|  8|     NOVADUTRA|2019-05-01|    Sul|        Passeio|    574429.0|BR-116| SP|
|  9|     NOVADUTRA|2019-05-01|  Norte| 

In [None]:
# 10 - RENOMEAR COLUNA COMO ID

df = df.withColumnRenamed('_c0', 'ID')

In [None]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
# 11 - FILTRO PARA IDENTIFICAR COLUNAS COM DADOS NULOS RELEVANTES OU NÃO

#TEM DADOS NULOS

df.filter(F.isnull("BR")).show()
df.filter(F.isnull("Volume_Total")).show()
df.filter(F.isnull("UF")).show()

#NÃO TEM DADOS NULOS

df.filter(F.isnull("ID")).show()
df.filter(F.isnull("Concessionaria")).show()
df.filter(F.isnull("Data")).show()
df.filter(F.isnull("Tipo_de_Veiculo")).show()

+-----+--------------+----------+-------+---------------+------------+----+----+
|   ID|Concessionaria|      Data|Sentido|Tipo_de_veiculo|Volume_total|  BR|  UF|
+-----+--------------+----------+-------+---------------+------------+----+----+
|11004|RODOVIA DO AÇO|2019-01-01|    Sul|        Passeio|     45447.0|null|null|
|11005|RODOVIA DO AÇO|2019-01-01|  Norte|        Passeio|     37274.0|null|null|
|11006|RODOVIA DO AÇO|2019-02-01|    Sul|        Passeio|     27600.0|null|null|
|11007|RODOVIA DO AÇO|2019-02-01|  Norte|        Passeio|     27451.0|null|null|
|11008|RODOVIA DO AÇO|2019-03-01|    Sul|        Passeio|     33070.0|null|null|
|11009|RODOVIA DO AÇO|2019-03-01|  Norte|        Passeio|     32882.0|null|null|
|11010|RODOVIA DO AÇO|2019-04-01|    Sul|        Passeio|     32347.0|null|null|
|11011|RODOVIA DO AÇO|2019-04-01|  Norte|        Passeio|     32413.0|null|null|
|11012|RODOVIA DO AÇO|2019-05-01|    Sul|        Passeio|     29652.0|null|null|
|11013|RODOVIA DO AÇO|2019-0

In [None]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
# 12 - VISUALIZANDO O QUE TEM NA COLUNA "Volume_Total"

df.select("UF").distinct().collect()

[Row(UF='SC'),
 Row(UF='GO'),
 Row(UF=None),
 Row(UF='MT'),
 Row(UF='SP'),
 Row(UF='ES'),
 Row(UF='RS'),
 Row(UF='MS'),
 Row(UF='MG'),
 Row(UF='BA'),
 Row(UF='RJ'),
 Row(UF='PR')]

FILTROS E CONSULTAS

In [None]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
# 13 - FILTRO PARA IDENTIFICAR A QUANTIDADE ESPECIFICA DE PRAÇAS POR UF

df.groupBy(F.col("UF")).count().show()

+----+-----+
|  UF|count|
+----+-----+
|  SC| 3360|
|  GO| 4704|
|  MT| 6048|
|  SP| 8153|
|  ES| 3360|
|  RS| 2400|
|  MS| 6048|
|  MG|14112|
|  BA| 4032|
|  RJ| 7442|
|  PR| 1920|
|null| 7600|
+----+-----+



In [None]:
# 14 - FILTRO PARA IDENTIFICAR A QUANTIDADE ESPECIFICA DE PRAÇAS POR "Concessionaria"

df.groupBy(F.col("Concessionaria")).count().show()

+--------------------+-----+
|      Concessionaria|count|
+--------------------+-----+
|      RODOVIA DO AÇO| 1440|
|             VIA SUL| 3024|
|AUTOPISTA FLUMINENSE| 2210|
|              CONCER| 1440|
|AUTOPISTA LITORAL...| 2400|
|              ECOSUL| 2400|
|AUTOPISTA FERNÃO ...| 3840|
|AUTOPISTA REGIS B...| 2880|
|                 CRT| 1728|
|  ECOVIAS DO CERRADO|  112|
|AUTOPISTA PLANALT...| 2400|
|             VIA 040| 7392|
|     TRANSBRASILIANA| 1920|
|           NOVADUTRA| 4745|
|           VIA BAHIA| 4032|
|            CONCEBRA| 7392|
|                 CRO| 6048|
|               MSVIA| 6048|
|            ECOPONTE|  336|
|ECO101 CONCESSION...| 3360|
+--------------------+-----+
only showing top 20 rows



In [None]:
# 15 - FILTRO PARA IDENTIFICAR A QUANTIDADE ESPECIFICA DE PRAÇAS POR "Tipo_de_Veiculo"

df.groupBy(F.col("Tipo_de_Veiculo")).count().show()

+---------------+-----+
|Tipo_de_Veiculo|count|
+---------------+-----+
|      Comercial|46259|
|        Passeio|17190|
|           Moto| 5730|
+---------------+-----+



In [None]:
# 16 - MÉDIA DE "Volume_Total" DE CARROS TRAFEGANDO EM TODAS AS VIAS

df.select(F.round(F.avg("Volume_Total"),2).alias('Média_de_Trafego')).show()


+----------------+
|Média_de_Trafego|
+----------------+
|        18195.97|
+----------------+



In [None]:
# MÉDIA DE CARROS POR ESTADO - grpdf = joined_df.groupBy(temp1.datestamp).max('diff').select(func.col("max(diff)").alias("maxDiff"))

df.groupBy(F.col("UF")).avg("Volume_Total").select(F.col("UF"), F.round(F.col("avg(Volume_Total)"), 2).alias('Média_de_Trafego_UF')).show()

+----+-------------------+
|  UF|Média_de_Trafego_UF|
+----+-------------------+
|  SC|           35944.48|
|  GO|           13257.98|
|  MT|            8621.33|
|  SP|           38173.38|
|  ES|           14133.68|
|  RS|            9006.12|
|  MS|            5480.09|
|  MG|           12702.54|
|  BA|           19021.42|
|  RJ|           24902.26|
|  PR|           29663.79|
|null|           14077.46|
+----+-------------------+



In [None]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



UTILIZANDO FUNÇÃO WINDOW

In [None]:
# 18 - UTILIZANDO FUNÇÃO PARA OTIMIZAR A PESQUISA DE QUAL O GRUPO DE CARROS MAIS PASSA PELAS RODOVIAS EM DETERMINADO ESTADO , ORGANIZANDO POR "UF" >> "Volume_Total" >> "BR"

w0 = Window.partitionBy(F.col("UF")).orderBy("Volume_Total", "UF", "BR")
df.withColumn("row_number", F.row_number().over(w0)).show()

+-----+--------------+----------+-------+---------------+------------+----+----+----------+
|   ID|Concessionaria|      Data|Sentido|Tipo_de_veiculo|Volume_total|  BR|  UF|row_number|
+-----+--------------+----------+-------+---------------+------------+----+----+----------+
|27180|       VIA SUL|2019-01-01|  Norte|        Passeio|        null|null|null|         1|
|27181|       VIA SUL|2019-01-01|    Sul|        Passeio|        null|null|null|         2|
|27201|       VIA SUL|2019-11-01|    Sul|        Passeio|        null|null|null|         3|
|27203|       VIA SUL|2019-12-01|    Sul|        Passeio|        null|null|null|         4|
|27204|       VIA SUL|2019-01-01|  Norte|      Comercial|        null|null|null|         5|
|27205|       VIA SUL|2019-01-01|    Sul|      Comercial|        null|null|null|         6|
|27225|       VIA SUL|2019-11-01|    Sul|      Comercial|        null|null|null|         7|
|27227|       VIA SUL|2019-12-01|    Sul|      Comercial|        null|null|null|

In [None]:
# 19 - UTILIZANDO FUNÇÃO AGG PARA REALIZAR DOIS CALCULOS SOBRE O VOLUME DE CARROS PERCORRENDO AS RODOVIAS 

'''
A MÉDIA DAS RENDAS DOS CLIENTES
A SOMA TOTAL DAS RENDAS DOS CLIENTES
'''

(df.withColumn("row", F.row_number().over(w0))
 .withColumn("avg", F.avg(F.col("Volume_Total")).over(w0))
 .withColumn("sum", F.sum(F.col("Volume_Total")).over(w0))
  .where(F.col("row") == 1).select("row","UF", "BR", "avg", "sum").show()
)

+---+----+------+----+----+
|row|  UF|    BR| avg| sum|
+---+----+------+----+----+
|  1|null|  null|null|null|
|  1|  BA|BR-116|54.0|54.0|
|  1|  ES|BR-101|50.0|50.0|
|  1|  GO|BR-040|null|null|
|  1|  MG|BR-040|null|null|
|  1|  MS|BR-163|null|null|
|  1|  MT|BR-163|null|null|
|  1|  PR|BR-116| 0.0| 0.0|
|  1|  RJ|BR-101|null|null|
|  1|  RS|BR-116|null|null|
|  1|  SC|BR-116| 0.0| 0.0|
|  1|  SP|BR-116|null|null|
+---+----+------+----+----+



In [None]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Concessionaria: string (nullable = true)
 |-- Data: date (nullable = true)
 |-- Sentido: string (nullable = true)
 |-- Tipo_de_veiculo: string (nullable = true)
 |-- Volume_total: double (nullable = true)
 |-- BR: string (nullable = true)
 |-- UF: string (nullable = true)



In [None]:
#20 - SALVANDO O ARQUIVO NORMALIZADO NO BUCKET

(df.write.format("csv").option("header", "true")
                        .option("inferschema", "true")
                        .option("delimiter", ",")
                        .save("pedagio_pyspark"))
!gsutil cp -r pedagio_pyspark gs://notebooks_pyspark_sql_gp03

Copying file://pedagio_pyspark/._SUCCESS.crc [Content-Type=application/octet-stream]...
Copying file://pedagio_pyspark/.part-00000-732acd99-8e9c-476c-8f83-64ec9ad8fc34-c000.csv.crc [Content-Type=application/octet-stream]...
Copying file://pedagio_pyspark/part-00000-732acd99-8e9c-476c-8f83-64ec9ad8fc34-c000.csv [Content-Type=text/csv]...
Copying file://pedagio_pyspark/_SUCCESS [Content-Type=application/octet-stream]...
\ [4 files][  4.0 MiB/  4.0 MiB]                                                
Operation completed over 4 objects/4.0 MiB.                                      
