<a href="https://colab.research.google.com/github/FranziskoB/Challenge-Data-Science-Alura-2ed/blob/main/Semana_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Desafio Data Science Alura

A imobiliária InsightPlaces, situada na cidade do Rio de Janeiro, está enfrentando dificuldades para alugar e vender imóveis. Em uma pesquisa de como empresas semelhantes operam no mercado, a InsightPlaces percebeu que esse problema pode estar relacionado aos valores dos imóveis e às recomendações realizadas em seu site.

Dentro desse contexto, como podemos definir de forma eficiente os preços dos imóveis lidando com grandes volumes de dados? É importante recomendar imóveis utilizando outro critério? O que precisa ser feito?

Você faz parte do time de Ciência de Dados e Big Data da InsightPlaces e ficou responsável por auxiliar no processo de análise de dados dos imóveis, que estão localizados em alguns bairros da cidade do Rio de Janeiro.

Esse projeto tem algumas etapas como: ler e fazer o tratamento do histórico dos preços de imóveis no Rio de Janeiro, construir um modelo de regressão para precificar imóveis e, por último, criar um recomendador de imóveis. Para cada uma dessas etapas, vamos utilizar a ferramenta PySpark, que oferece uma melhor performance ao trabalharmos com grandes volumes de dados.

##Semana 1

Na semana 1 vamos fazer parte do trabalho de pessoas engenheiras de dados.

O time de engenharia de dados da InsightPlaces disponibilizou uma base de dados no estado bruto para trabalharmos. Durante a semana 1, ficamos responsáveis por fazer transformações mais iniciais nesses dados, de forma que eles fiquem mais estruturados. Utilizamos os recursos do PySpark para conseguirmos explorar essa base de dados, realizar transformações em algumas colunas e também aplicar filtros específicos para obtermos apenas as informações que interessam para nossa análise.

No final dessa semana, salvamos os dados transformados no formato parquet para podermos utilizá-los na semana 2.

### Configurando o Spark no Google Colab

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

### Carregando dos dados

#### Acessando o Spark UI (Google Colab)n

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


#### Carregamento de dados

Descrição de cada variável do dataset
*   id ->	Código de identificação do anúncio no sistema da InsightPlaces
*   tipo_unidade ->	Tipo de imóvel (apartamento, casa e outros)
*   tipo_uso ->	Tipo de uso do imóvel (residencial ou comercial)
*   area_total ->	Área total do imóvel (construção e terreno)
*   area_util ->	Área construída do imóvel
*   quartos ->	Quantidade de quartos do imóvel
*   suites ->	Quantidade de suítes do imóvel
*   banheiros ->	Quantidade de banheiros do imóvel
*   vaga ->	Quantidade de vagas de garagem do imóvel
*   caracteristicas ->	Listagem de características do imóvel
*   andar ->	Número do andar do imóvel
*   endereco ->	Informações sobre o endereço do imóvel
*   valores ->	Informações sobre valores de venda e locação dos imóveis




In [None]:
import zipfile
import requests
from io import BytesIO
import os

In [None]:
os.makedirs("./dados",exist_ok=True) # Criando uma pasta no google colab

In [None]:
path = 'https://github.com/FranziskoB/Challenge-Data-Science-Alura-2ed/raw/main/dados/semana-1.zip' # Local do arquivo zip
filebytes = BytesIO(
    requests.get(path).content
)
myzip = zipfile.ZipFile(filebytes)
myzip.extractall("./dados") #Salvando os dados na pasta criada

In [None]:
dados = spark.read.load("/content/dados/dataset_bruto.json", format="json") # Importando os dados

In [None]:
dados.count() # Número de registros carregados

89083

### Visualiando e tratando os dados

Os primeiro 5 valores


In [None]:
dados.limit(5).toPandas()

Unnamed: 0,anuncio,imagens,usuario
0,"(0, [], [16], [0], [], (Centro, 20061003, Rio ...","[(39d6282a-71f3-47bc-94aa-909351ecd881, https:...","(9d44563d-3405-4e84-9381-35b7cf40a9a4, Frank)"
1,"(0, [], [14], [0], [], (Centro, 20051040, Rio ...","[(23d2b3ab-45b0-47f2-a27f-ffbe066c2c38, https:...","(36245be7-70fe-40cd-84d4-70a57dc8f976, Caroline)"
2,"(0, [1026], [1026], [0], [], (Maria da Graça, ...","[(1da65baa-368b-490f-bef7-1834b9a428c1, https:...","(9dc415d8-1397-4d8d-8425-b8310fad309d, Oliver)"
3,"(0, [120], [120], [0], [Portão eletrônico, Con...","[(79b542c6-49b4-4f06-80b5-ef4fcdf98f76, https:...","(9911a2df-f299-4a73-a384-281417abaf69, Matthew)"
4,"(0, [3], [3], [0], [], (São Cristóvão, 2093167...","[(e2bc497b-6510-4731-8942-97b3d3cdab5e, https:...","(240a7aab-12e5-4069-9a2c-9dca4c4f9c68, Jude)"


Os dados cheragam muito desorganizados 

In [None]:
anuncio=dados.select("anuncio") # selecionando apenas a coluna anuncio

Selecionado os dados estão dentro da coluna anuncio

In [None]:
anuncio.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

Passando os dados para colunas

In [None]:
df= anuncio.select('*',"anuncio.id" ,"anuncio.tipo_anuncio","anuncio.tipo_unidade" ,"anuncio.tipo_uso" ,"anuncio.area_total" ,
                   "anuncio.area_util" ,"anuncio.quartos", "anuncio.suites", "anuncio.banheiros", "anuncio.vaga",
                   "anuncio.caracteristicas", "anuncio.andar", "anuncio.endereco", "anuncio.valores")
df=df.drop("anuncio")
df.show()

+--------------------+------------+------------+-----------+----------+---------+-------+------+---------+----+--------------------+-----+--------------------+--------------------+
|                  id|tipo_anuncio|tipo_unidade|   tipo_uso|area_total|area_util|quartos|suites|banheiros|vaga|     caracteristicas|andar|            endereco|             valores|
+--------------------+------------+------------+-----------+----------+---------+-------+------+---------+----+--------------------+-----+--------------------+--------------------+
|47d553e0-79f2-4a4...|       Usado|      Outros|  Comercial|        []|     [16]|    [0]|   [0]|      [0]| [1]|                  []|    0|{Centro, 20061003...|[{260, 107, Venda...|
|b6ffbae1-17f6-487...|       Usado|      Outros|  Comercial|        []|     [14]|    [0]|    []|      [0]| [0]|                  []|    0|{Centro, 20051040...|[{260, 107, Venda...|
|1fb030a5-9e3e-4a1...|       Usado|      Outros|  Comercial|    [1026]|   [1026]|    [0]|    []

Convetendo colunas númericas para inteiro 

In [None]:
from pyspark.sql.types import IntegerType
df=df.withColumn('area_total', df['area_total'][0].cast(IntegerType()))
df=df.withColumn('area_util', df['area_util'][0].cast(IntegerType()))
df=df.withColumn('quartos', df['quartos'][0].cast(IntegerType()))
df=df.withColumn('suites', df['suites'][0].cast(IntegerType()))
df=df.withColumn('banheiros', df['banheiros'][0].cast(IntegerType()))
df=df.withColumn('vaga', df['vaga'][0].cast(IntegerType()))
df=df.withColumn('andar', df['andar'].cast(IntegerType()))
df.show()
df.printSchema()

+--------------------+------------+------------+-----------+----------+---------+-------+------+---------+----+--------------------+-----+--------------------+--------------------+
|                  id|tipo_anuncio|tipo_unidade|   tipo_uso|area_total|area_util|quartos|suites|banheiros|vaga|     caracteristicas|andar|            endereco|             valores|
+--------------------+------------+------------+-----------+----------+---------+-------+------+---------+----+--------------------+-----+--------------------+--------------------+
|47d553e0-79f2-4a4...|       Usado|      Outros|  Comercial|      null|       16|      0|     0|        0|   1|                  []|    0|{Centro, 20061003...|[{260, 107, Venda...|
|b6ffbae1-17f6-487...|       Usado|      Outros|  Comercial|      null|       14|      0|  null|        0|   0|                  []|    0|{Centro, 20051040...|[{260, 107, Venda...|
|1fb030a5-9e3e-4a1...|       Usado|      Outros|  Comercial|      1026|     1026|      0|  null

### Filtrando as linhas de interesse

Filtrando os anúncios com as seguintes restrições
*   tipo_uso: Residencial;
*   tipo_unidade: Apartamento;
*   tipo_anuncio: Usado



Criando tabelas de frequências para analisar o impactos desses filtros aplicados

In [None]:
from pyspark.sql.functions import count
df.select('tipo_uso', 'id').groupBy('tipo_uso').agg(count("id").alias("frequencia")).orderBy('frequencia', ascending=True).show() 
df.select('tipo_unidade', 'id').groupBy('tipo_unidade').agg(count("id").alias("frequencia")).orderBy('frequencia', ascending=True).show() 
df.select('tipo_anuncio', 'id').groupBy('tipo_anuncio').agg(count("id").alias("frequencia")).orderBy('frequencia', ascending=True).show() 


+-----------+----------+
|   tipo_uso|frequencia|
+-----------+----------+
|  Comercial|      4542|
|Residencial|     84541|
+-----------+----------+

+------------+----------+
|tipo_unidade|frequencia|
+------------+----------+
|        Casa|     10319|
|      Outros|     11963|
| Apartamento|     66801|
+------------+----------+

+------------+----------+
|tipo_anuncio|frequencia|
+------------+----------+
|  Lançamento|       256|
|       Usado|     88827|
+------------+----------+



In [None]:
df_filted=df.where("tipo_unidade=='Apartamento' and tipo_uso=='Residencial' and tipo_anuncio=='Usado' ")

In [None]:
print('Foram filtrados',round(df_filted.count()/df.count()*100),'% dos dados') 

Foram filtrados 75 % dos dados


### Realizando mais alguns tratamentos de dados

A coluna valores contém variáveis tipo array com informações sobre valores de condominio, iptu, tipo e valor. Vamos separar em colunas: 

In [None]:
df_filted_2=df_filted.withColumn('valores', df['valores'][0])#Passando para um struct
df_filted_2= df_filted_2.select('*',"valores.tipo" ,"valores.iptu","valores.condominio" ,"valores.valor" )#Criando as colunas
df_filted_2=df_filted_2.withColumn('valor', df_filted_2['valor'].cast(IntegerType()))
df_filted_2=df_filted_2.withColumn('condominio', df_filted_2['condominio'].cast(IntegerType()))
df_filted_2=df_filted_2.withColumn('iptu', df_filted_2['iptu'].cast(IntegerType()))
df_filted_2=df_filted_2.drop("valores")
df_filted_2.printSchema()

root
 |-- id: string (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- andar: integer (nullable = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- tipo: string (nullable = true)


### Salvando os dados em formato parquet

In [None]:
df_filted_2.write.parquet(
    path='/content/dados/parquet',
    mode='overwrite'
)

In [None]:
dados_parquet = spark.read.parquet(
    '/content/dados/parquet'
)

In [None]:
dados_parquet.printSchema()

root
 |-- id: string (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- andar: integer (nullable = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- tipo: string (nullable = true)


In [None]:
df_filted_2.write.parquet(
    path='/content/drive/MyDrive/MF App/parquet',
    mode='overwrite'
)