In [None]:
from utils import init_spark, median_age_class, north_south_latitude, west_east_longitude
from pyspark.sql.functions import col, stddev, min as spark_min, max as spark_max
from pyspark.sql.types import FloatType

### Diretórios

In [None]:
# Diretório dos dados de entrada, não inclusos no git.
relative_ca_h_t_dir = '../raw_data/california_housing_train.csv'

# Diretório para salvar dados processados, não inclusos no git.
target_processed_data_dir = '../processed_data'
processed_ca_housing_dir = f'{target_processed_data_dir}/ca_housing/ca_housing.parquet'

views_dir = '../views'
ca_housing_query_dir = f'{views_dir}/ca_analysis.sql'

### Inicialização do PySpark

In [None]:
spark = init_spark()

In [None]:
processed_ca_housing = raw_ca_housing = spark.read.options(header = True, delimiter=',').csv(relative_ca_h_t_dir)

In [None]:
raw_ca_housing.show(5)

### 1 - Exploração

##### 1.1 - Coluna com maior desvio padrão

In [None]:
std_devs = raw_ca_housing.select([stddev(column) for column in raw_ca_housing.columns]).collect()
std_devs = [column for column in std_devs[0]]

greatest_std_dev_value = max(std_devs)
greatest_std_dev_column = raw_ca_housing.columns[std_devs.index(greatest_std_dev_value)]

print(f'Greatest standard deviation column: {greatest_std_dev_column}: {greatest_std_dev_value}')

##### 1.2 Valor mínimo e máximo

In [None]:
column_agg = raw_ca_housing.select(spark_min(greatest_std_dev_column), spark_max(greatest_std_dev_column)).collect()
[greatest_std_dev_max, greatest_std_dev_min] = [i for i in column_agg[0]]

print(f'{greatest_std_dev_column}\nMin value: {greatest_std_dev_min}\nMax value: {greatest_std_dev_max}')

### 2 - Trabalhando com colunas

#### 2.1 -  Criar coluna hma_cat, baseada na coluna housing_median_age

In [None]:
processed_ca_housing = processed_ca_housing.withColumn('hma_cat', median_age_class(col('housing_median_age').cast(FloatType())))
processed_ca_housing.select('housing_median_age', 'hma_cat').show(5)

#### 2.2 - Criar coluna c_ns (e c_ol)
PS: A regra para longitude foi alterada para retornar `oeste` ou `leste`, uma vez que é latitude que determina a região relativa de norte e sul.

In [None]:
processed_ca_housing = processed_ca_housing.withColumn('c_ns', north_south_latitude(col('latitude').cast(FloatType())))

processed_ca_housing = processed_ca_housing.withColumn('c_ol', west_east_longitude(col('longitude').cast(FloatType())))
processed_ca_housing.select('longitude', 'c_ol', 'latitude', 'c_ns').show(5)

#### 2.3 - Renomar as colunas

In [None]:
processed_ca_housing = processed_ca_housing\
    .withColumnRenamed('hma_cat', 'age')\
    .withColumnRenamed('c_ns', 'california_ns_region')\
    .withColumnRenamed('c_ol', 'california_ol_region')

#### Gravando o resultado em parquet

In [None]:
processed_ca_housing.select('age',
    'california_ns_region',
    'california_ol_region',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_house_value')\
    .write.format('parquet').mode('overwrite').save(processed_ca_housing_dir)

### 3 - Agregações

In [None]:
ca_housing = spark.read.parquet(processed_ca_housing_dir)

ca_housing.createOrReplaceTempView('ca_housing')

#### 3.1 - Criação da análise: Soma de população e média dos valores médios das casas agrupados por idade e região.
PS: O desafio pede ordenação decrescente por `median_house_value`, porém, por se tratar de uma query com agregações, não há como incluir colunas que não servem de agrupamento ou agregação. Nesse caso, foi interpretado que a ordenação descrescente é por `m_median_house_value`.

In [None]:
ca_housing_query = open(ca_housing_query_dir).read()

try:
    ca_housing_view = spark.sql(ca_housing_query)
    ca_housing_view.show(10, truncate = False)
except Exception as e:
    print(e)