## 了解数据集

In [146]:
from pyspark.sql import SparkSession

In [147]:
# Initialise the SparkSession, then take its SparkContext for the RDD API
spark = (
    SparkSession.builder
    .master('local')
    .appName('California Housing')
    .config('spark.executor.memory', '1gb')
    .getOrCreate()
)
sc = spark.sparkContext

In [148]:
# Load the same CSV twice: `sk` is a DataFrame read with the first line used as the header,
# `lines` is an RDD of the raw text lines (the header line is still part of it).
sk = spark.read.csv('./housing/housing.csv', header=True)
lines = sc.textFile('./housing/housing.csv')

In [149]:
lines.take(2)

['longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity',
 '-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY']

In [150]:
# The first line of the file is the column header; keep only the lines that differ from it.
header = lines.first()
rdd = lines.filter(lambda row: row != header)

In [151]:
rdd.take(2)

['-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY',
 '-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY']

In [152]:
sk.take(2)

[Row(longitude='-122.23', latitude='37.88', housing_median_age='41.0', total_rooms='880.0', total_bedrooms='129.0', population='322.0', households='126.0', median_income='8.3252', median_house_value='452600.0', ocean_proximity='NEAR BAY'),
 Row(longitude='-122.22', latitude='37.86', housing_median_age='21.0', total_rooms='7099.0', total_bedrooms='1106.0', population='2401.0', households='1138.0', median_income='8.3014', median_house_value='358500.0', ocean_proximity='NEAR BAY')]

In [153]:
# Split each comma-separated line into a list of string fields.
# NOTE(review): `rdd` is rebound to its own transformation, so this cell is not idempotent —
# running it a second time would map `split` over lists instead of strings.
rdd = rdd.map(lambda line: line.split(','))

In [154]:
rdd.take(2)

[['-122.23',
  '37.88',
  '41.0',
  '880.0',
  '129.0',
  '322.0',
  '126.0',
  '8.3252',
  '452600.0',
  'NEAR BAY'],
 ['-122.22',
  '37.86',
  '21.0',
  '7099.0',
  '1106.0',
  '2401.0',
  '1138.0',
  '8.3014',
  '358500.0',
  'NEAR BAY']]

In [188]:
sk.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [187]:
# Read the data file into an RDD of text lines.
# NOTE(review): this rebinds `rdd`, replacing the RDD built from housing.csv earlier.
rdd = sc.textFile('./CaliforniaHousing/cal_housing.data')

# Read the file that defines each attribute of the data into an RDD.
# NOTE(review): this rebinds `header`, which earlier held the housing.csv header line.
header = sc.textFile('./CaliforniaHousing/cal_housing.domain')

In [189]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

In [190]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [193]:
# Split each comma-separated record into a list of string fields.
# NOTE(review): `rdd` is rebound to its own transformation, so this cell is not idempotent —
# running it a second time would map `split` over lists instead of strings.
rdd = rdd.map(lambda line: line.split(','))

In [156]:
from pyspark.sql import Row

In [194]:
# If a numpy warning shows up here, upgrading numpy resolves it: pip install --upgrade numpy
# Wrap every split record in a Row with named fields and turn the RDD into a DataFrame.
# The field values are still strings at this point; the numeric cast happens afterwards.
# NOTE(review): the df.show() output displays the columns in alphabetical order rather than
# in the keyword order used below, so refer to columns by name, not by position.
df = rdd.map(lambda line: Row(longitude=line[0],
                             latitude=line[1],
                             housingMedianAge=line[2],
                             totalRooms=line[3],
                             totalBedRooms=line[4],
                             population=line[5],
                             households=line[6],
                             medianIncome=line[7],
                             medianHouseValue=line[8])).toDF()

In [195]:
df.show(5)

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
+-----------+----------------+--

In [196]:
# NOTE(review): `describe` is referenced here without being called, so the cell displays the
# bound-method repr (which happens to list the column types) instead of summary statistics.
# If the schema is what is wanted, df.printSchema() states that intent directly.
df.describe

<bound method DataFrame.describe of DataFrame[households: string, housingMedianAge: string, latitude: string, longitude: string, medianHouseValue: string, medianIncome: string, population: string, totalBedRooms: string, totalRooms: string]>

In [197]:
# Use cast() to convert each of the listed columns to the requested type (float here).
def convertColumn(df, names, newType):
    """Return a DataFrame in which every column listed in `names` is cast to `newType`.

    Columns are replaced one at a time through withColumn(); columns that are
    not listed are carried over unchanged. With an empty `names` the input
    DataFrame is returned as is.
    """
    converted = df
    for column_name in names:
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    return converted

In [198]:
from pyspark.sql.types import FloatType
# All nine columns are listed because every one of them was loaded as a string.
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

df = convertColumn(df, columns, FloatType())

In [199]:
# NOTE(review): `describe` is referenced here without being called, so the cell displays the
# bound-method repr (which happens to list the column types) instead of summary statistics.
# If the schema is what is wanted, df.printSchema() states that intent directly.
df.describe

<bound method DataFrame.describe of DataFrame[households: float, housingMedianAge: float, latitude: float, longitude: float, medianHouseValue: float, medianIncome: float, population: float, totalBedRooms: float, totalRooms: float]>

In [200]:
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
+----------+----------------+--------+---------+----------------+------------+--

In [201]:
# Count how many rows there are for each housingMedianAge value, highest age first
(
    df.groupBy('housingMedianAge')
    .count()
    .sort('housingMedianAge', ascending=False)
    .show()
)

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



## 预处理

In [202]:
# Use withColumn() to divide every median house value by 100000
# NOTE(review): `df` is rebound to the rescaled frame, so re-running this cell divides again.
from pyspark.sql.functions import col
df = df.withColumn('medianHouseValue', col('medianHouseValue')/100000)

In [203]:
df.show(5)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|           3.413|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|           3.422|      3.8462|     565.0|        280.0|    1627.0|
+----------+----------------+--------+---------+----------------+------------+--

添加三个新列：
- 每个家庭的平均房间数：roomsPerHousehold  
- 每个家庭的平均人数：populationPerHousehold  
- 卧室在总房间的占比：bedroomsPerRoom

In [204]:
# Derive three ratio features from the raw per-row counts.
df = (
    df
    .withColumn("roomsPerHousehold", col("totalRooms") / col("households"))
    .withColumn("populationPerHousehold", col("population") / col("households"))
    .withColumn("bedroomsPerRoom", col("totalBedRooms") / col("totalRooms"))
)

In [205]:
df.show(3)

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+-----------------+----------------------+-------------------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|6.984126984126984|    2.5555555555555554|0.14659090909090908|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|6.238137082601054|     2.109841827768014|0.15579659106916466|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0|8.2881

In [206]:
# Keep only the columns of interest.
# medianHouseValue is listed first so that it ends up as the first field of every row;
# longitude, latitude, housingMedianAge and totalRooms are dropped here.
df = df.select("medianHouseValue",
               "totalBedRooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")

In [207]:
df.show(3)

+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome|roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252|6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014|6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574|8.288135593220339|    2.8022598870056497|0.12951601908657123|
+----------------+-------------+----------+----------+------------+-----------------+----------------------+-------------------+
only showing top 3 rows



In [208]:
# Convert the DataFrame to an RDD, map every row to a pair of (house value, DenseVector of
# the remaining fields), then convert the result back into a DataFrame.
# x[0] is medianHouseValue because it is the first column named in the preceding select().
from pyspark.ml.linalg import DenseVector

input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df_ = spark.createDataFrame(input_data, ["label", "features"])

In [209]:
df_.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
+-----+--------------------+
only showing top 5 rows



In [210]:
df.take(2)

[Row(medianHouseValue=4.526, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908),
 Row(medianHouseValue=3.585, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, roomsPerHousehold=6.238137082601054, populationPerHousehold=2.109841827768014, bedroomsPerRoom=0.15579659106916466)]

In [211]:
from pyspark.ml.feature import StandardScaler

# Fit a StandardScaler on the "features" vectors and write the scaled vectors to a new
# "features_scaled" column. No withMean/withStd arguments are passed, so the scaler's
# default settings apply.
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df_)
scaled_df = scaler.transform(df_)

In [212]:
# NOTE: NaN values were seen here and the cause was not understood.
# NOTE(review): the two rows displayed by this cell contain no NaN — TODO confirm whether
# the issue still occurs.
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows



In [213]:
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

## 创建模型

In [214]:
# Randomly split the scaled data with weights 0.8 / 0.2 (train / test), using a fixed seed.
train_data, test_data = scaled_df.randomSplit([.8, .2], seed=123)

In [215]:
train_data.show(2)

+-------+--------------------+--------------------+
|  label|            features|     features_scaled|
+-------+--------------------+--------------------+
|0.14999|[28.0,18.0,8.0,0....|[0.06646917315187...|
|  0.175|[168.0,259.0,138....|[0.39881503891126...|
+-------+--------------------+--------------------+
only showing top 2 rows



In [216]:
df.count(), train_data.count(), test_data.count()

(20640, 16462, 4178)

In [233]:
# Build a linear regression model.
# It is fitted on the "features_scaled" column with "label" as the target; regParam and
# elasticNetParam configure the regularisation and maxIter caps the number of iterations.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features_scaled', labelCol="label",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)

## 模型评估

In [234]:
# Score the held-out rows with the fitted model; `predicted` gains a "prediction" column.
predicted = linearModel.transform(test_data)
# Plain-value RDDs of the predictions and of the true labels.
predictions = predicted.select('prediction').rdd.map(lambda x: x[0])
labels = predicted.select('label').rdd.map(lambda x: x[0])
# Collect the (prediction, label) pairs from a single projection of `predicted` rather than
# zipping the two RDDs above: RDD.zip() requires both sides to have the same number of
# partitions and the same number of elements per partition, and it evaluates the scoring
# pipeline once per side. Selecting both columns together keeps each pair on one row.
predictionAndLabel = (
    predicted.select('prediction', 'label')
    .rdd.map(lambda row: (row[0], row[1]))
    .collect()
)

In [235]:
predicted.show(5)

+-------+--------------------+--------------------+------------------+
|  label|            features|     features_scaled|        prediction|
+-------+--------------------+--------------------+------------------+
|0.14999|[73.0,85.0,38.0,1...|[0.17329463000310...|1.4548841678815545|
|0.14999|[239.0,490.0,164....|[0.56736187083209...|1.5771240310074937|
|0.14999|[267.0,628.0,225....|[0.63383104398397...| 2.159579166671764|
|  0.225|[1743.0,6835.0,14...|[4.13770602870438...| 1.747920398004267|
|  0.266|[309.0,808.0,294....|[0.73353480371179...|1.6331379289701429|
+-------+--------------------+--------------------+------------------+
only showing top 5 rows



In [238]:
predictionAndLabel[900: 902]

[(1.777861230736911, 1.114), (1.8660698813830552, 1.115)]