<a href="https://colab.research.google.com/github/FelipPalermo/DataEng_SC23/blob/main/Preco_Consumidor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# *Instalação de bibliotecas necessárias*

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 52 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 78.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=4c7a55a5395a63b66b3479f10ad6867bf4bea5d1e1d7d63ee6cb8017bfbae8d9
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
!pip install gcsfs

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting gcsfs
  Downloading gcsfs-2022.8.2-py2.py3-none-any.whl (25 kB)
Collecting fsspec==2022.8.2
  Downloading fsspec-2022.8.2-py3-none-any.whl (140 kB)
[K     |████████████████████████████████| 140 kB 14.9 MB/s 
Installing collected packages: fsspec, gcsfs
  Attempting uninstall: fsspec
    Found existing installation: fsspec 2022.8.1
    Uninstalling fsspec-2022.8.1:
      Successfully uninstalled fsspec-2022.8.1
Successfully installed fsspec-2022.8.2 gcsfs-2022.8.2


# *Importação das Bibliotecas*



In [None]:
# Bibliotecas para o pandas 
import pandas as pd 

# Bibliotecas para acesso ao bucket do GCP
from google.cloud import storage
import os

In [None]:
# Bibliotecas para configuração do PySpark
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# *Iniciando a SparkSession*

In [None]:
spark = (SparkSession.builder
          .master('local')
          .appName('E8-Precos_DoConsumidor.json')
          .config('spark.ui.port', '4050')
          .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
          .getOrCreate()
         )

# *Extração dos dados via Bucket do GCP*

In [None]:
# Configuração da chave de segurança
serviceAccount = '/content/drive/MyDrive/DATASET/lunar-reef-355417-ecfe16be5ae7.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [None]:
# Códigos de acesso ao bucket para criar um DataFrame a partir do dataset
client = storage.Client()

# Criando uma variável para receber o nome da Bucket
bucket = client.get_bucket('bucket_e8_pf')

# Escolhendo o arquivo dentro da Bucket
bucket.blob('Precos_DoConsumidor.json')

# Criando uma variável para receber o caminho do arquivo
path = ('gs://bucket_e8_pf/Precos_DoConsumidor.json')

In [None]:
df_pandas = pd.read_json(path)

# Pandas

In [None]:
df_pandas

Unnamed: 0,ano,mes,indice,variacao_mensal,variacao_trimestral,variacao_semestral,variacao_anual,variacao_doze_meses
0,1979,3,5.576500e-09,,,,,
1,1979,4,5.768900e-09,3.45,,,,
2,1979,5,5.870400e-09,1.76,,,,
3,1979,6,6.046500e-09,3.00,8.43,,,
4,1979,7,6.370500e-09,5.36,10.43,,,
...,...,...,...,...,...,...,...,...
516,2022,3,6.546800e+03,1.71,3.42,6.26,3.42,11.73
517,2022,4,6.614890e+03,1.04,3.80,6.14,4.49,12.47
518,2022,5,6.644660e+03,0.45,3.23,5.73,4.96,11.90
519,2022,6,6.685860e+03,0.62,2.12,5.61,5.61,11.92


# *Pré-Análise*

In [None]:
df_pandas.dtypes

ano                      int64
mes                      int64
indice                 float64
variacao_mensal        float64
variacao_trimestral    float64
variacao_semestral     float64
variacao_anual         float64
variacao_doze_meses    float64
dtype: object

In [None]:
df_pandas

Unnamed: 0,ano,mes,indice,variacao_mensal,variacao_trimestral,variacao_semestral,variacao_anual,variacao_doze_meses
0,1979,3,5.576500e-09,,,,,
1,1979,4,5.768900e-09,3.45,,,,
2,1979,5,5.870400e-09,1.76,,,,
3,1979,6,6.046500e-09,3.00,8.43,,,
4,1979,7,6.370500e-09,5.36,10.43,,,
...,...,...,...,...,...,...,...,...
516,2022,3,6.546800e+03,1.71,3.42,6.26,3.42,11.73
517,2022,4,6.614890e+03,1.04,3.80,6.14,4.49,12.47
518,2022,5,6.644660e+03,0.45,3.23,5.73,4.96,11.90
519,2022,6,6.685860e+03,0.62,2.12,5.61,5.61,11.92


Colunas que possuem números nulos - NaN (Colunas: variacao_mensal,variacao_trimestral,	variacao_semestral,	variacao_anual e variacao_doze_meses)

In [None]:
# Drop de colunas com números nulos e sem utilização para as analises realizadas
df_pandas.drop(['variacao_mensal','variacao_trimestral','variacao_semestral','variacao_doze_meses'],axis=1 ,inplace=True)

In [None]:
# Backup 
df1 = df_pandas.copy()

In [None]:
df_pandas.isna().sum()

ano                0
mes                0
indice             0
variacao_anual    10
dtype: int64

In [None]:
df_pandas.head(12)

Unnamed: 0,ano,mes,indice,variacao_anual
0,1979,3,5.5765e-09,
1,1979,4,5.7689e-09,
2,1979,5,5.8704e-09,
3,1979,6,6.0465e-09,
4,1979,7,6.3705e-09,
5,1979,8,6.739e-09,
6,1979,9,7.1843e-09,
7,1979,10,7.5481e-09,
8,1979,11,8.0089e-09,
9,1979,12,8.3698e-09,


In [None]:
df_pandas.tail(10)

Unnamed: 0,ano,mes,indice,variacao_anual
511,2021,10,6232.36,8.45
512,2021,11,6284.71,9.36
513,2021,12,6330.59,10.16
514,2022,1,6373.0,0.67
515,2022,2,6436.73,1.68
516,2022,3,6546.8,3.42
517,2022,4,6614.89,4.49
518,2022,5,6644.66,4.96
519,2022,6,6685.86,5.61
520,2022,7,6645.74,4.98


# *Tratamento dos dados com Pandas*

In [None]:
# Criando uma cópia do DataFrame original para trabalhar com Pandas
df1 = df_pandas.copy()

In [None]:
# Verificar e insert de dados 
df_pandas.loc[10]

ano               1.980000e+03
mes               1.000000e+00
indice            8.919000e-09
variacao_anual    6.560000e+00
Name: 10, dtype: float64

In [None]:
pd.unique(df_pandas['ano'])

array([1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989,
       1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000,
       2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011,
       2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022])

Drop de linhas que interferem na análise realiza.




In [None]:
df_pandas.drop([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129], axis=0, inplace=True)

In [None]:
df_pandas

Unnamed: 0,ano,mes,indice,variacao_anual
130,1990,1,0.005366,68.19
131,1990,2,0.009337,192.63
132,1990,3,0.017010,433.12
133,1990,4,0.019505,511.33
134,1990,5,0.020931,556.02
...,...,...,...,...
516,2022,3,6546.800000,3.42
517,2022,4,6614.890000,4.49
518,2022,5,6644.660000,4.96
519,2022,6,6685.860000,5.61


# *Plotagem*

# *Tratamento dos dados com PySpark*

In [None]:
def schemaPySpark(df):
  # Definindo o schema
  mySchema = StructType([StructField("ano", IntegerType(), True),
        StructField("mes", IntegerType(), True),
        StructField("indice", StringType(), True),
        StructField("variacao_mensal ", StringType(), True),
        StructField("variacao_trimestral ", StringType(), True),
        StructField("variacao_anual", StringType(), True),
        StructField("variacao_doze_meses", StringType(), True)])
   
  # Convertendo para PySpark
  df_pyspark = spark.createDataFrame(df, schema = mySchema)

In [None]:
spark

In [None]:
  df_pyspark = spark.createDataFrame(df_pandas)

In [None]:
df_pyspark.show(500)

+----+---+----------------+--------------+
| ano|mes|          indice|variacao_anual|
+----+---+----------------+--------------+
|1990|  1| 0.0053663052799|         68.19|
|1990|  2| 0.0093368346371|        192.63|
|1990|  3| 0.0170098454104|        433.12|
|1990|  4| 0.0195051897442|        511.33|
|1990|  5| 0.0209310190666|        556.02|
|1990|  6| 0.0233673896742|        632.38|
|1990|  7| 0.0263163543373|         724.8|
|1990|  8| 0.0295216863099|        825.26|
|1990|  9| 0.0337314788076|        957.21|
|1990| 10| 0.0385989312553|       1109.76|
|1990| 11| 0.0451298705166|       1314.45|
|1990| 12| 0.0537677278248|       1585.18|
|1991|  1|  0.065032066804|         20.95|
|1991|  2| 0.0781675227116|         45.38|
|1991|  3| 0.0873833112608|         62.52|
|1991|  4| 0.0917600043057|         70.66|
|1991|  5| 0.0978895252777|         82.06|
|1991|  6| 0.1084925212048|        101.78|
|1991|  7| 0.1216656145218|        126.28|
|1991|  8| 0.1406671295351|        161.62|
|1991|  9| 

# *Filtros - Insights relevantes para o contexto*

In [None]:
# Indice de variação nas últimas 3 decadas 
n1990 = df_pyspark.filter(df_pyspark.ano == '1990').count()
n2000 = df_pyspark.filter(df_pyspark.ano == '2000').count()
n2010 = df_pyspark.filter(df_pyspark.ano == '2010').count()
n2020 = df_pyspark.filter(df_pyspark.ano == '2020').count()

df_decadas = pd.DataFrame({'ANO': ['1990', '2000', '2010', '2020'],
                                  'Ocorrencias': [n1990, n2000, n2010, n2020]})

In [None]:
df_pyspark.where(F.col('ANO') == 2012).show()

+----+---+-------+--------------+
| ano|mes| indice|variacao_anual|
+----+---+-------+--------------+
|2012|  1|3516.11|          0.51|
|2012|  2|3529.82|           0.9|
|2012|  3|3536.17|          1.08|
|2012|  4|3558.81|          1.73|
|2012|  5|3578.38|          2.29|
|2012|  6|3587.67|          2.56|
|2012|  7| 3603.1|           3.0|
|2012|  8|3619.31|          3.46|
|2012|  9|3642.12|          4.11|
|2012| 10|3667.97|          4.85|
|2012| 11|3687.78|          5.42|
|2012| 12|3715.07|           6.2|
+----+---+-------+--------------+



In [None]:
# Indice de variação nos periodos festivos de final de ano 
n11 = df_pyspark.filter(df_pyspark.mes == '11').count()
n12 = df_pyspark.filter(df_pyspark.mes == '12').count()
n01 = df_pyspark.filter(df_pyspark.mes == '01').count()

df_trimestral = pd.DataFrame({'mes': ['11', '12', '01'],
                                  'Ocorrencias': [n11, n12, n01]})

In [None]:
df_pyspark.where(F.col('mes') == 11).show()

+----+---+----------------+--------------+
| ano|mes|          indice|variacao_anual|
+----+---+----------------+--------------+
|1990| 11| 0.0451298705166|       1314.45|
|1991| 11| 0.2490682456026|        363.23|
|1992| 11| 3.0755946831675|        894.64|
|1993| 11|72.6058212060671|       1779.85|
|1994| 11|         1012.11|        912.11|
|1995| 11|         1235.21|          20.0|
|1996| 11|         1365.54|          8.76|
|1997| 11|         1421.41|          3.75|
|1998| 11|         1458.93|          2.06|
|1999| 11|         1576.88|          7.63|
|2000| 11|         1663.16|           4.7|
|2001| 11|         1816.76|          8.64|
|2002| 11|         2044.76|         11.72|
|2003| 11|         2305.58|          9.79|
|2004| 11|         2439.21|          5.23|
|2005| 11|         2574.05|          4.63|
|2006| 11|         2640.68|          2.18|
|2007| 11|         2767.19|          4.15|
|2008| 11|         2966.51|          6.17|
|2009| 11|         3090.08|          3.86|
+----+---+-

In [None]:
df_pandas.to_json('Preco_consumidor_tratado.json')

In [None]:
df_pandas.to_csv('PrecoConsumidorTratado.csv')