<a href="https://colab.research.google.com/github/squadOito/soulcodead2/blob/joseaureliok%2Fnotebook/notebooks/notebook_iea_ponto_recarga_pyspark_bruto.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Projeto Final**
Escola: SoulCode Academy

Curso: Bootcamp Analista de Dados - Martech - AD2

**Equipe 08**

**Alunos: Adriano Kim, José Aurelio, Marcos Paulo, Paulo Vitorino, Renato e Wesley**

Professores: Douglas Ribeiro, Franciane Rodrigues e Jonathas Carneiro

## Preparação de Ambiente
Instalações e importações das bibliotecas necessárias para o processo de ETL.

In [None]:
# Instalaçao Bibliotecas
!pip install gcsfs -q
!pip install pandera -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/201.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━[0m [32m143.4/201.6 kB[0m [31m4.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m201.6/201.6 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Importando Bibliotecas
import os
import pandas as pd
import numpy as np
import pandera as pa
import datetime as dt
from google.cloud import storage
from google.colab import drive
from google.colab import auth
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from oauth2client.client import GoogleCredentials

In [None]:
# Ignorando alguns alertas desnecessários
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Configuração da quantidade de colunas para aparecer em um DataFrame
pd.set_option('display.max_columns',100)

### Configuração PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -N -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark.sql.functions import regexp_replace
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Para deixar a visualição das tabelas mais amigável

import pyspark.sql.functions as F
from pyspark.sql.types import *

spark

### Compartilhamento Chave GDrive

In [None]:
# Cria compartilhamento com Google Drive
drive.mount('/content/drive', force_remount=True)

# Arquivo a ser acessado na pasta compartilhada

target = 'projeto-final-ad2-e8-ae566c3a2c2b.json'

# Caminho completo da pasta compartilhada
folder = '/content/drive/MyDrive/Classroom/AD2 - Analista de Dados/ProjetoFinal'

# Acesso ao arquivo no colab
serviceAccount = os.path.join(folder, target)

Mounted at /content/drive


## Extração
A primeira etapa da ETL é a extração dos dados de sua fonte original. Dependendo do tipo de dados e da fonte, você pode precisar de diferentes ferramentas e técnicas para extrair os dados.

In [None]:
# Conexão com a conta do Google Cloud
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [None]:
# Conexão com a bucket do Google Cloud
client = storage.Client()
bucket = client.get_bucket('projeto-final-ad2-e8') # nome do bucket

In [None]:
# Caminho do arquivo
path = 'gs://projeto-final-ad2-e8/dados/brutos/excel/iea_ponto_recarga_bruto.xlsx'

Read Pyspark

In [None]:
# função de leitura no Spark
def spark_read(path):
  try:

    try:
      # leitura csv com pandas
      read = pd.read_excel(path)
      # conversão em df pandas em df pyspark
      df = spark.createDataFrame(read)
      # êxito da função retornando o dataframe
      return df

    except:
      # leitura csv com pandas
      read = pd.read_excel(path)

      # tratamento de tipo de dado do df pandas para conversão em df pyspark
      for n in read.columns:
        if read[f'{n}'].dtype == object:
          read[f'{n}'] = read[f'{n}'].astype(str)
        elif read[f'{n}'].dtype == bool:
          read[f'{n}'] = read[f'{n}'].astype(bool)
        else:
          read[f'{n}'] = read[f'{n}'].astype(np.double)

      # conversão df pandas em df pyspark
      df = spark.createDataFrame(read)
      # êxito da função retornando o dataframe
      return df

  except:
    # indicação de erro na leitura do dataframe
    print(f'Falha na leitura do DataFrame: {path}!')

In [None]:
df = spark_read(path)

### Pré Análise

In [None]:
# Visualização geral
df.show(100)

+---------+----------+------------------+----+--------------------+----+---------------+-----------------+
|   region|  category|         parameter|mode|          powertrain|year|           unit|            value|
+---------+----------+------------------+----+--------------------+----+---------------+-----------------+
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2017|charging points|               40|
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2017|charging points|              440|
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2018|charging points|               61|
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2018|charging points|              670|
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2019|charging points|              250|
|Australia|Historical|EV charging points|  EV|Publicly availabl...|2019|charging points|             1700|
|Australia|Historical|EV charging poi

In [None]:
# Tamanho total de (linhas , colunas)
describe = df.describe()
describe

summary,region,category,parameter,mode,powertrain,year,unit,value
count,604,604,604,604,604,604.0,604,604.0
mean,,,,,,2017.546357615894,,570364247108334.5
stddev,,,,,,3.126864813215356,,2275968481157552.0
min,Australia,Historical,EV charging points,EV,Publicly availabl...,2010.0,charging points,1.0
max,United Kingdom,Historical,EV charging points,EV,Publicly availabl...,2022.0,charging points,1.00000001490116e+16


In [None]:
# Todos os tipos de dados presentes
tiposDados = df.dtypes
tiposDados

[('region', 'string'),
 ('category', 'string'),
 ('parameter', 'string'),
 ('mode', 'string'),
 ('powertrain', 'string'),
 ('year', 'bigint'),
 ('unit', 'string'),
 ('value', 'bigint')]

### Limpeza de Dados
Remover dados duplicados, corrigir erros de digitação, tratar dados inconsistentes etc.

Valores distintos

In [None]:
# Verificar valores distintos por colunas
lista_distintos = [F.approx_count_distinct(col).alias(col) for col in df.columns]
df_distintos = df.agg(*lista_distintos)
display(df_distintos)

region,category,parameter,mode,powertrain,year,unit,value
33,1,1,1,2,13,1,231


In [None]:
df_contagem_ptrain = df.groupby('powertrain').count().orderBy(F.col('count').desc())
df_contagem_ptrain.show(truncate=False)

+-----------------------+-----+
|powertrain             |count|
+-----------------------+-----+
|Publicly available slow|304  |
|Publicly available fast|300  |
+-----------------------+-----+



In [None]:
df_contagem_region = df.groupby('region').count().orderBy(F.col('count').desc())
df_contagem_region.show(100)

+--------------+-----+
|        region|count|
+--------------+-----+
|        Norway|   25|
|         Chile|   24|
|      Portugal|   24|
|       Austria|   24|
|   Netherlands|   24|
|United Kingdom|   23|
|       Germany|   22|
|         Italy|   22|
|   Switzerland|   22|
|        Canada|   22|
|         Japan|   22|
|        France|   22|
|       Denmark|   22|
|         Spain|   21|
|           USA|   21|
|       Belgium|   20|
|       Finland|   20|
|        Greece|   19|
|         China|   18|
|        Poland|   18|
|       Turkiye|   17|
|       Iceland|   17|
|        Sweden|   16|
|        Mexico|   15|
|        Israel|   14|
|         Korea|   14|
|         India|   13|
|      Thailand|   12|
|     Australia|   12|
|  South Africa|   12|
|   New Zealand|   11|
|        Brazil|   10|
|     Indonesia|    6|
+--------------+-----+



In [None]:
df_contagem_year = df.groupby('year').count().orderBy(F.col('count').desc())
df_contagem_year

year,count
2022,66
2021,65
2020,65
2018,64
2019,64
2017,61
2016,52
2015,47
2014,42
2013,35


In [None]:
df_contagem_value = df.groupby('value').count().where((F.col('value')> 1000000))
df_contagem_value

value,count
10000000149011600,32
4900000095367430,5


Valores Nulos

In [None]:
# Verificar se há valores nulos/ausentes
lista_nulos = [F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]
df_nulos = df.agg(*lista_nulos)
display(df_nulos)

region,category,parameter,mode,powertrain,year,unit,value
0,0,0,0,0,0,0,0


### Normalização de Dados
Colocar os dados em um formato padronizado para facilitar a análise.

Renomeando

In [None]:
df_br = df

In [None]:
# Renomeando registros do dataframe

traducao = {
    'Publicly available slow': 'LENTO',
    'Publicly available fast': 'RAPIDO',
    "Sweden": "SUECIA",
    "Germany": "ALEMANHA",
    "France": "FRANCA",
    "Greece": "GRECIA",
    "Belgium": "BELGICA",
    "Finland": "FINLANDIA",
    "China": "CHINA",
    "India": "INDIA",
    "Chile": "CHILE",
    "Italy": "ITALIA",
    "Norway": "NORUEGA",
    "Spain": "ESPANHA",
    "Denmark": "DINAMARCA",
    "Thailand": "TAILANDIA",
    "Iceland": "ISLANDIA",
    "Israel": "ISRAEL",
    "USA": "ESTADOS UNIDOS",
    "Mexico": "MEXICO",
    "Indonesia": "INDONESIA",
    "Switzerland": "SUIÇA",
    "Turkiye": "TURQUIA",
    "Canada": "CANADA",
    "Brazil": "BRASIL",
    "Japan": "JAPAO",
    "New Zealand": "NOVA ZELANDIA",
    "Poland": "POLONIA",
    "Portugal": "PORTUGAL",
    "Australia": "AUSTRALIA",
    "Austria": "AUSTRIA",
    "Korea": "COREIA",
    "South Africa": "AFRICA DO SUL",
    "United Kingdom": "REINO UNIDO",
    "Netherlands": "PAISES BAIXOS",
    "Historical": "HISTORICO",
    "charging points": "PONTOS DE CARREGAMENTO",
    "EV charging points": "PONTOS DE CARREGAMENTO VEICULAR",
    "EV": "VEICULOS ELETRICOS"
}

for i in df_br.columns:
    df_br = df_br.withColumn(i, F.coalesce(*(F.when(F.col(i) == value, F.lit(traducao[value])) for value in traducao.keys()), F.col(i)))

In [None]:
df_br.show(100, truncate = False)

+---------+---------+-------------------------------+------------------+----------+----+----------------------+-----------------+
|region   |category |parameter                      |mode              |powertrain|year|unit                  |value            |
+---------+---------+-------------------------------+------------------+----------+----+----------------------+-----------------+
|AUSTRALIA|HISTORICO|PONTOS DE CARREGAMENTO VEICULAR|VEICULOS ELETRICOS|RAPIDO    |2017|PONTOS DE CARREGAMENTO|40               |
|AUSTRALIA|HISTORICO|PONTOS DE CARREGAMENTO VEICULAR|VEICULOS ELETRICOS|LENTO     |2017|PONTOS DE CARREGAMENTO|440              |
|AUSTRALIA|HISTORICO|PONTOS DE CARREGAMENTO VEICULAR|VEICULOS ELETRICOS|RAPIDO    |2018|PONTOS DE CARREGAMENTO|61               |
|AUSTRALIA|HISTORICO|PONTOS DE CARREGAMENTO VEICULAR|VEICULOS ELETRICOS|LENTO     |2018|PONTOS DE CARREGAMENTO|670              |
|AUSTRALIA|HISTORICO|PONTOS DE CARREGAMENTO VEICULAR|VEICULOS ELETRICOS|RAPIDO    |2019|PO

In [None]:
# tradução das colunas do dataframe
df_br = df_br.withColumnRenamed("region", "pais")\
             .withColumnRenamed("category", "categoria")\
             .withColumnRenamed("parameter", "parametro")\
             .withColumnRenamed("mode", "veiculo")\
             .withColumnRenamed("powertrain", "carregamento")\
             .withColumnRenamed("unit", "unidade")\
             .withColumn("valor", F.col("value").cast("long"))\
             .withColumn("ano", F.col("year").cast("int"))

In [None]:
df_br.dtypes

[('pais', 'string'),
 ('categoria', 'string'),
 ('parametro', 'string'),
 ('veiculo', 'string'),
 ('carregamento', 'string'),
 ('year', 'string'),
 ('unidade', 'string'),
 ('value', 'string'),
 ('valor', 'bigint'),
 ('ano', 'int')]

In [None]:
df_br = df_br.drop('year', 'value')

In [None]:
df_br.show(100)

+---------+---------+--------------------+------------------+------------+--------------------+-----------------+----+
|     pais|categoria|           parametro|           veiculo|carregamento|             unidade|            valor| ano|
+---------+---------+--------------------+------------------+------------+--------------------+-----------------+----+
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICULOS ELETRICOS|      RAPIDO|PONTOS DE CARREGA...|               40|2017|
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICULOS ELETRICOS|       LENTO|PONTOS DE CARREGA...|              440|2017|
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICULOS ELETRICOS|      RAPIDO|PONTOS DE CARREGA...|               61|2018|
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICULOS ELETRICOS|       LENTO|PONTOS DE CARREGA...|              670|2018|
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICULOS ELETRICOS|      RAPIDO|PONTOS DE CARREGA...|              250|2019|
|AUSTRALIA|HISTORICO|PONTOS DE CARREGA...|VEICUL

# **Qualidade e integridade dos dados**

### **Validação da estrutura dos dados (schema)**

In [None]:
# Definindo o esquema de validação e o examinando
schemaBR = StructType([ StructField("pais", StringType()),
                      StructField("categoria", StringType()),
                      StructField("parametro", StringType()),
                      StructField("veiculo", StringType()),
                      StructField("carregamento", StringType()),
                      StructField("unidade", StringType()),
                      StructField("valor", LongType()),
                      StructField("ano", IntegerType())
                      ])


In [None]:
# estabelecendo laço for para quantificar valores nulos de cada coluna
nl = [F.sum(
            F.when(
                    F.col(c).\
                    isNull(), 1)\
            .otherwise(0))\
            .alias(c)\
      for c in df_br.columns]
# somando valores nulos encontrados para cada coluna, obtidos através de dicionários e adicionados em listas
nulos = sum(
            list(
                 (
                    (df_br.agg(*nl)
                    .collect()[0])
                    .asDict())
                    .values())
                    )

In [None]:
# validando Schema e valores nulos do dataframe tratado
if df_br.schema == schemaBR and nulos == 0:
  print( f'VALIDADEO!')
else:
  print( f'Não validado!')

VALIDADO!


## **Colunas Persistidas**

================================================================================
String
================================================================================
* pais
* categoria
* parametro
* veiculo
* carregamento
* unidade

================================================================================
BigInt
================================================================================
* valor

================================================================================
Int
================================================================================
* ano


## Copia de segurança do tratamento - Backup

In [None]:
# Backup para analise
df_tratado = df_br

## Load
Depois que os dados são extraídos e transformados adequadamente, eles estarão prontos para as análise, mas antes disso eles precisam ser carregados em um local de armazenamento adequado. Podendo ser um banco de dados SQL ou NoSQL, um sistema de armazenamento em nuvem, e para o caso de disponibilizar o projeto publicamente é ideal que ele seja colocar em uma pasta de datasets, diferenciando o arquivo bruto e o tratado.

In [None]:
# convertendo em um dataframe Pandas
gcs = df_br.toPandas()

# carregamento no GCS
gcs.to_csv('gs://projeto-final-ad2-e8/dados/analise/Analise_dataset_iea_ponto_recarga_pyspark_bruto.csv', index=False)

# encerrando o SparkSession
spark.stop()