# Housing Market

### Introduction:

This time we will create our own dataset with fictional numbers to describe a house market. As we are going to create random data don't try to reason of the numbers.

### Step 1. Import the necessary libraries

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


spark = SparkSession.builder\
                    .appName('housingmarket')\
                    .getOrCreate()


25/06/17 13:22:17 WARN Utils: Your hostname, kevin-llanos-Type1ProductConfigId resolves to a loopback address: 127.0.1.1; using 192.168.1.92 instead (on interface wlo1)
25/06/17 13:22:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/17 13:22:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Step 2. Create 3 differents Series, each of length 100, as follows: 
1. The first a random number from 1 to 4 
2. The second a random number from 1 to 3
3. The third a random number from 10,000 to 30,000

## 🐍 **Solución Pyspark** 

In [2]:
import random
import numpy as np


numbers1 = np.random.randint(1, 4, 100)

numbers2 = np.random.randint(1, 3, 100)

numbers3 = np.random.randint(10000, 30000, 100)

In [3]:
numbers_1 = [random.randint(1, 4) for i in range(1, 101)]

numbers_2 = [random.randint(1, 3) for i in range(1, 101)]

numbers_3 = [random.randint(1, 10000) for i in range(1, 101)]

In [4]:
s1 = pd.Series(numbers1)

s2 = pd.Series(numbers2)

s3 = pd.Series(numbers3)

### Step 3. Let's create a DataFrame by joinning the Series by column

## 🐍 **Solución Pyspark** 

In [5]:
data = list(zip(numbers_1, numbers_2, numbers_3))


schema = T.StructType([
    T.StructField('col1', T.IntegerType()),
    T.StructField('col2', T.IntegerType()),
    T.StructField('col3', T.IntegerType()),

])
market = spark.createDataFrame(data, schema=schema)
market.show()

                                                                                

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   3|   1|6886|
|   2|   2|3464|
|   2|   1|6892|
|   1|   1|8116|
|   3|   2|9020|
|   3|   1|8603|
|   2|   1|4728|
|   3|   3|3983|
|   1|   1|9692|
|   2|   3|2456|
|   4|   1|4567|
|   3|   1|6709|
|   3|   1| 819|
|   2|   3|3763|
|   2|   2|4144|
|   2|   3|6247|
|   2|   1|  92|
|   2|   2|2617|
|   3|   3|9737|
|   1|   3|3511|
+----+----+----+
only showing top 20 rows



In [6]:
df = pd.concat([s1, s2, s3], axis=1)

df.columns= ['col1', 'col2', 'col3']

df

Unnamed: 0,col1,col2,col3
0,2,2,27766
1,1,1,15025
2,1,1,20619
3,2,2,26153
4,3,2,19868
...,...,...,...
95,1,2,18657
96,1,1,28471
97,3,2,14000
98,2,2,13928


### Step 4. Change the name of the columns to bedrs, bathrs, price_sqr_meter

## 🐍 **Solución Pyspark** 

In [7]:

columns = market.columns

new_cols = ['beds', 'baths', 'price_sqr_meter']

for i, col in enumerate(columns):
    
    print(f'Renombrando {col} por  {new_cols[i]}')
    
    market = market.withColumnRenamed(col,  new_cols[i])
    

Renombrando col1 por  beds
Renombrando col2 por  baths
Renombrando col3 por  price_sqr_meter


In [8]:
market.show()

+----+-----+---------------+
|beds|baths|price_sqr_meter|
+----+-----+---------------+
|   3|    1|           6886|
|   2|    2|           3464|
|   2|    1|           6892|
|   1|    1|           8116|
|   3|    2|           9020|
|   3|    1|           8603|
|   2|    1|           4728|
|   3|    3|           3983|
|   1|    1|           9692|
|   2|    3|           2456|
|   4|    1|           4567|
|   3|    1|           6709|
|   3|    1|            819|
|   2|    3|           3763|
|   2|    2|           4144|
|   2|    3|           6247|
|   2|    1|             92|
|   2|    2|           2617|
|   3|    3|           9737|
|   1|    3|           3511|
+----+-----+---------------+
only showing top 20 rows



In [9]:
df.columns  = ['beds', 'baths', 'price_sqr_meter']

df

Unnamed: 0,beds,baths,price_sqr_meter
0,2,2,27766
1,1,1,15025
2,1,1,20619
3,2,2,26153
4,3,2,19868
...,...,...,...
95,1,2,18657
96,1,1,28471
97,3,2,14000
98,2,2,13928


25/06/17 13:22:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Step 5. Create a one column DataFrame with the values of the 3 Series and assign it to 'bigcolumn'

## 🐍 **Solución Pyspark** 

In [32]:
data = numbers_1 + numbers_2 +numbers_3

data = [tuple([i]) for i in data]

In [34]:

schema = T.StructType([
    T.StructField('bigcolumn', T.IntegerType())
])

big_data = spark.createDataFrame(data, schema=schema)

big_data.show()

+---------+
|bigcolumn|
+---------+
|        3|
|        2|
|        2|
|        1|
|        3|
|        3|
|        2|
|        3|
|        1|
|        2|
|        4|
|        3|
|        3|
|        2|
|        2|
|        2|
|        2|
|        2|
|        3|
|        1|
+---------+
only showing top 20 rows



In [45]:
df_ = pd.DataFrame(pd.concat([s1, s2, s3], axis=0), columns=['bigcolumn'])
df_

Unnamed: 0,bigcolumn
0,2
1,1
2,1
3,2
4,3
...,...
95,18657
96,28471
97,14000
98,13928


### Step 6. Oops, it seems it is going only until index 99. Is it true?

## 🐍 **Solución Pyspark** 

Como pysprk no utiliza indices no hay problema en cabmbio en pandas si

### Step 7. Reindex the DataFrame so it goes from 0 to 299

## 🐍 **Solución Pyspark** 

In [46]:
df_.reset_index(drop=True)

Unnamed: 0,bigcolumn
0,2
1,1
2,1
3,2
4,3
...,...
295,18657
296,28471
297,14000
298,13928
