# Instalação do PySpark e da biblioteca necessária para conexão com a Google Cloud Platform:

In [None]:
pip install gcsfs

In [None]:
pip install pyspark

# Importação das bibliotecas e funções necessárias para análise:

*Obs.: Foi importada a biblioteca Pandas para fazer a atribuição posterior dos dados ao dataframe.*

In [122]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id
import os
from google.cloud import storage

# Inicia a conexão com a GCP:

In [None]:
serviceaccount = '/content/chave_acesso_edlaine'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceaccount

# Iniciar sessão Spark:

In [None]:
spark = SparkSession.builder\
      .master('local')\
      .appName('ProjetoEngDaods')\
      .config('spark.ui.port','4050')\
      .getOrCreate()
spark

# Atribui os dados do dataset ao dataframe que será trabalhado:

In [72]:
dfspark = pd.read_csv('gs://projeto-final-grupo09/saida_dados/ProjetoFinalPandas02(csv)')
df = spark.createDataFrame(dfspark)

# Visualização do dataframe:

In [76]:
df.show(10)

+----------+--------------------+----------+----------------+-------------+------+---------+---------------------+------------------+-------+-----------------+
|Unnamed: 0|       identificacao| criado_em|tipo_propriedade|nome_do_local|estado|    preco|area_construcao_em_m2|      preco_por_m2|quartos|area_total_por_m2|
+----------+--------------------+----------+----------------+-------------+------+---------+---------------------+------------------+-------+-----------------+
|         0|47c47b29f4b5d901e...|2018-01-24|            casa|        Bahia| Bahia|577083.67|                135.0| 4296.296296296297|    2.0|              NaN|
|         1|0916e4dea826443b2...|2018-01-22|            casa|        Bahia| Bahia|248742.97|                114.0|2192.9824561403507|    2.0|              NaN|
|         2|e206166a672764f56...|2018-01-09|            loja|         Pará|  Pará|      NaN|                  NaN|               NaN|    NaN|              NaN|
|         3|6986e2ddcb8c24d9a...|2018-01

# Verifica se há diferença entre as colunas em questão e quantos são diferentes:

In [77]:
df.select('estado').filter(df.nome_do_local != df.estado).count()

413290

# Verifica os valores iguais entre as colunas em questão:

In [78]:
df.select('estado').filter(df.nome_do_local == df.estado).count()

4394

# Renomeia a coluna para melhor entendimento e dropa a "antiga" com visualização do resultado:

In [79]:
df = df.select('*',F.col('criado_em').alias('data_criacao')).drop('criado_em')
df = df.select('*',F.col('nome_do_local').alias('local_imovel')).drop('nome_do_local')
df = df.select('*',F.col('Unnamed: 0').alias('Index')).drop('Unnamed: 0')
df = df.select('*',F.col('identificacao').alias('ID')).drop('identificacao')
df = df.select('*',F.col('preco_por_m2').alias('preco_m2')).drop('preco_por_m2')
df.show(10)

+----------------+------+---------+---------------------+-------+-----------------+------------+------------+-----+--------------------+------------------+
|tipo_propriedade|estado|    preco|area_construcao_em_m2|quartos|area_total_por_m2|data_criacao|local_imovel|Index|                  ID|          preco_m2|
+----------------+------+---------+---------------------+-------+-----------------+------------+------------+-----+--------------------+------------------+
|            casa| Bahia|577083.67|                135.0|    2.0|              NaN|  2018-01-24|       Bahia|    0|47c47b29f4b5d901e...| 4296.296296296297|
|            casa| Bahia|248742.97|                114.0|    2.0|              NaN|  2018-01-22|       Bahia|    1|0916e4dea826443b2...|2192.9824561403507|
|            loja|  Pará|      NaN|                  NaN|    NaN|              NaN|  2018-01-09|        Pará|    2|e206166a672764f56...|               NaN|
|     apartamento|  Pará|650960.32|                  NaN|    1.0

# Visualização das ocorrências dos tipos de propriedades e seu número de ocorrências presentes no dataframe:



In [81]:
df.groupBy('tipo_propriedade').count().show(10)

+----------------+------+
|tipo_propriedade| count|
+----------------+------+
|     apartamento|273577|
|            casa|135213|
|            loja|  8894|
+----------------+------+



# Número de ocorrências por cada estado em ordem decrescente:

In [82]:
df.groupBy('estado').count().sort('count', ascending=False).show(25)

+-------------------+------+
|             estado| count|
+-------------------+------+
|          São Paulo|277460|
|  Rio Grande do Sul| 52112|
|     Rio de Janeiro| 37227|
|     Santa Catarina| 10954|
|       Minas Gerais|  5962|
|     Espírito Santo|  5768|
|             Paraná|  5609|
|            Paraíba|  4916|
|              Ceará|  4724|
|              Bahia|  3041|
|Rio Grande do Norte|  2206|
|         Pernambuco|  2127|
|              Goiás|  1731|
|          Tocantins|  1273|
|   Distrito Federal|   730|
|        Mato Grosso|   609|
|            Alagoas|   418|
|               Pará|   300|
|           Amazonas|   197|
|           Maranhão|   153|
| Mato Grosso do Sul|    83|
|           Rondônia|    44|
|              Amapá|    22|
|               Acre|    12|
|              Piauí|     6|
+-------------------+------+



# Visualização dos valores mínimos por estado:

In [88]:
df.groupBy('estado').min('preco').sort('estado').show(25)

+-------------------+----------+
|             estado|min(preco)|
+-------------------+----------+
|               Acre|  95140.34|
|            Alagoas|  23225.24|
|              Amapá|  58099.76|
|           Amazonas|  60088.62|
|              Bahia|  37311.44|
|              Ceará|  38372.16|
|   Distrito Federal|  30044.31|
|     Espírito Santo|   50489.7|
|              Goiás|  16212.63|
|           Maranhão|  37054.66|
|        Mato Grosso|   30293.8|
| Mato Grosso do Sul|  58568.02|
|       Minas Gerais|  16023.61|
|             Paraná|  29944.15|
|            Paraíba|  19158.25|
|               Pará|  35051.69|
|         Pernambuco|  35342.78|
|              Piauí| 120177.28|
|Rio Grande do Norte|  20029.53|
|  Rio Grande do Sul|  18026.59|
|     Rio de Janeiro|  23033.96|
|           Rondônia|  96832.91|
|     Santa Catarina|  60587.63|
|          São Paulo|       0.0|
|          Tocantins| 110152.49|
+-------------------+----------+



# Visualização do esquema do dataframe:

In [83]:
df.printSchema()

root
 |-- tipo_propriedade: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- preco: double (nullable = true)
 |-- area_construcao_em_m2: double (nullable = true)
 |-- quartos: double (nullable = true)
 |-- area_total_por_m2: double (nullable = true)
 |-- data_criacao: string (nullable = true)
 |-- local_imovel: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- ID: string (nullable = true)
 |-- preco_m2: double (nullable = true)



In [125]:
df.show()

+----------------+----------+-------------+---------------------+-------+-----------------+------------+------------+-----+--------------------+------------------+
|tipo_propriedade|    estado|        preco|area_construcao_em_m2|quartos|area_total_por_m2|data_criacao|local_imovel|Index|                  ID|          preco_m2|
+----------------+----------+-------------+---------------------+-------+-----------------+------------+------------+-----+--------------------+------------------+
|            casa|     Bahia|    577083.67|                135.0|    2.0|              NaN|  2018-01-24|       Bahia|    0|47c47b29f4b5d901e...| 4296.296296296297|
|            casa|     Bahia|    248742.97|                114.0|    2.0|              NaN|  2018-01-22|       Bahia|    1|0916e4dea826443b2...|2192.9824561403507|
|            loja|      Pará|          NaN|                  NaN|    NaN|              NaN|  2018-01-09|        Pará|    2|e206166a672764f56...|               NaN|
|     apartament

# Salva o dataframe tratado em formato CSV direto no Data Lake:

In [126]:
client = storage.Client()
bucket = client.get_bucket('projeto-final-grupo09')
     
bucket.blob('saida_dados/ProjetoFinalPySpark02.csv').upload_from_string(df.toPandas().to_csv(index=False), 'text/csv')