In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 16 not upgraded.


In [2]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf

In [3]:
# Build (or reuse) the SparkSession with the Spark UI on port 4050.
# getOrCreate() makes this cell safe to re-run; constructing SparkContext(...) directly
# raises ValueError ("Cannot run multiple SparkContexts at once") when one already exists.
conf = SparkConf().set("spark.ui.port", "4050")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [4]:
# Connection tested successfully

# Feature engineering

In [5]:
from __future__ import print_function

In [6]:
# Mount Google Drive; the dataset path read below lives under this mount point.
from google.colab import drive
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
import sys

In [8]:
import warnings
warnings.filterwarnings('ignore')

## Выбор релевантных признаков

In [9]:
# Load the dataset: the CSV has a header row, and Spark infers the column types.
# NOTE(review): `true` is not used in the visible cells; this line looks like an accidental
# editor auto-import (it adds a sqlalchemy dependency) — confirm and drop it.
from sqlalchemy.sql.expression import true
input_path = '/content/drive/MyDrive/Spark_training/california_housing_train.csv'
df = spark.read.options(header=True, inferSchema=True).csv(input_path)

In [10]:
# Dataset size: row count (runs a Spark job) and number of columns.
n_rows = df.count()
n_cols = len(df.columns)
print(f'rows: {n_rows}, columns: {n_cols}')

rows: 17000, columns: 9


In [11]:
# Print each column's name, type (inferred when the CSV was read) and nullability as a tree.
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
from pandas._libs.hashtable import value_count
def value_counts(df, col_name, n=20):
    """Display how many rows share each value of a column, most frequent first.

    Args:
        df: Spark DataFrame to aggregate.
        col_name: name of the column to group by.
        n: number of rows to display (20 matches the ``DataFrame.show`` default).

    Returns:
        None; the counts are printed via ``DataFrame.show``.
    """
    # PySpark's method is camelCase `groupBy`; `.count()` adds a column literally named
    # 'count', so sorting by that name needs no `col(...)` helper.
    counts = df.groupBy(col_name).count().orderBy('count', ascending=False)
    counts.show(n)

In [13]:
# Preview the first 20 rows of the raw dataset (long cell values are truncated).
df.show(n=20, truncate=True)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [14]:
# Columns kept for further work; total_bedrooms, median_income and median_house_value
# are not selected.
important_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'population',
    'households',
]

In [15]:
# Build a dataset restricted to the selected feature columns.
df_1 = df.select(*important_cols)

In [16]:
# Preview the first 20 rows of the reduced dataset.
df_1.show(n=20, truncate=True)

+---------+--------+------------------+-----------+----------+----------+
|longitude|latitude|housing_median_age|total_rooms|population|households|
+---------+--------+------------------+-----------+----------+----------+
|  -114.31|   34.19|              15.0|     5612.0|    1015.0|     472.0|
|  -114.47|    34.4|              19.0|     7650.0|    1129.0|     463.0|
|  -114.56|   33.69|              17.0|      720.0|     333.0|     117.0|
|  -114.57|   33.64|              14.0|     1501.0|     515.0|     226.0|
|  -114.57|   33.57|              20.0|     1454.0|     624.0|     262.0|
|  -114.58|   33.63|              29.0|     1387.0|     671.0|     239.0|
|  -114.58|   33.61|              25.0|     2907.0|    1841.0|     633.0|
|  -114.59|   34.83|              41.0|      812.0|     375.0|     158.0|
|  -114.59|   33.61|              34.0|     4789.0|    3134.0|    1056.0|
|   -114.6|   34.83|              46.0|     1497.0|     787.0|     271.0|
|   -114.6|   33.62|              16.0

In [17]:
# Register df_1 as a temporary view so the following cells can query it with Spark SQL.
df_1.createOrReplaceTempView(name='df_1')

In [18]:
# For every row, attach cnt_houses: the number of rows sharing its housing_median_age.
# A window with only PARTITION BY counts the whole partition. Adding ORDER BY inside OVER()
# would switch to a running (cumulative) frame; it happened to give the same counts only
# because every row in a partition ties on that key.
# The outer ORDER BY makes the sort by age explicit instead of a side effect of the
# window's shuffle.
df_grouped_by_median_age = spark.sql("""
select longitude, latitude, housing_median_age, total_rooms, population, households,
       count(*) over (partition by housing_median_age) as cnt_houses
from df_1
order by housing_median_age
""")

In [19]:
df_grouped_by_median_age.show()

+---------+--------+------------------+-----------+----------+----------+----------+
|longitude|latitude|housing_median_age|total_rooms|population|households|cnt_houses|
+---------+--------+------------------+-----------+----------+----------+----------+
|  -120.93|   37.65|               1.0|     2254.0|     402.0|     112.0|         2|
|   -122.0|   38.23|               1.0|     2062.0|     872.0|     268.0|         2|
|   -115.8|   33.26|               2.0|       96.0|      30.0|      16.0|        49|
|  -116.89|   33.86|               2.0|     6900.0|    1950.0|     980.0|        49|
|  -117.15|    33.7|               2.0|     6305.0|    2489.0|    1152.0|        49|
|  -117.16|   33.57|               2.0|    20391.0|    7132.0|    2716.0|        49|
|  -117.17|   34.12|               2.0|     3867.0|    1275.0|     433.0|        49|
|  -117.17|   33.66|               2.0|     7401.0|    2826.0|     839.0|        49|
|  -117.21|   33.82|               2.0|     4198.0|    1943.0|   

In [22]:
# Expose the per-age counts to Spark SQL under the view name queried in the next cell.
df_grouped_by_median_age.createOrReplaceTempView(name='df_grouped_by_median_age_view')

In [27]:
# Add the avg_total_rooms feature: mean total_rooms over rows with the same
# housing_median_age, rounded to 2 decimals.
df_3 = spark.sql(
    """
SELECT *,
       ROUND(AVG(total_rooms) OVER (PARTITION BY housing_median_age), 2) AS avg_total_rooms
FROM df_grouped_by_median_age_view
"""
)

In [29]:
# Preview the first 20 rows including the new cnt_houses and avg_total_rooms columns.
df_3.show(n=20, truncate=True)

+---------+--------+------------------+-----------+----------+----------+----------+---------------+
|longitude|latitude|housing_median_age|total_rooms|population|households|cnt_houses|avg_total_rooms|
+---------+--------+------------------+-----------+----------+----------+----------+---------------+
|  -120.93|   37.65|               1.0|     2254.0|     402.0|     112.0|         2|         2158.0|
|   -122.0|   38.23|               1.0|     2062.0|     872.0|     268.0|         2|         2158.0|
|   -115.8|   33.26|               2.0|       96.0|      30.0|      16.0|        49|         5237.1|
|  -116.89|   33.86|               2.0|     6900.0|    1950.0|     980.0|        49|         5237.1|
|  -117.15|    33.7|               2.0|     6305.0|    2489.0|    1152.0|        49|         5237.1|
|  -117.16|   33.57|               2.0|    20391.0|    7132.0|    2716.0|        49|         5237.1|
|  -117.17|   34.12|               2.0|     3867.0|    1275.0|     433.0|        49|       