In [7]:
from pyspark.sql import *
from pyspark.sql.types import *

In [9]:
rdd = sc.textFile('/sptest/cal_housing.data')

In [10]:
rdd.take(2)

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000', u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [11]:
rdd2 = rdd.map(lambda line: line.split(","))
rdd2.take(2)

[[u'-122.230000', u'37.880000', u'41.000000', u'880.000000', u'129.000000', u'322.000000', u'126.000000', u'8.325200', u'452600.000000'], [u'-122.220000', u'37.860000', u'21.000000', u'7099.000000', u'1106.000000', u'2401.000000', u'1138.000000', u'8.301400', u'358500.000000']]

In [30]:
from pyspark.sql import Row
df = rdd2.map(lambda line: Row(longitude=line[0],
                               latitude=line[1], 
                               housingMedianAge=line[2], 
                               totalRooms=line[3], 
                               totalBedRooms=line[4], 
                               population=line[5], 
                               households=line[6], 
                               medianIncome=line[7], 
                               medianHouseValue=line[8])).toDF()

In [13]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [17]:
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [31]:
from pyspark.sql.functions import col
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']
df = convertColumn(df, columns, FloatType())
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households"))
df = df.withColumn("populationPerHousehold", col("population")/col("households"))
df = df.withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))


df = df.select("medianHouseValue",
             "totalBedRooms",
             "population",
             "households",
             "medianIncome",
             "roomsPerHousehold",
             "populationPerHousehold",
             "bedroomsPerRoom")
df.show(5)

# df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|medianHouseValue|totalBedRooms|population|households|medianIncome| roomsPerHousehold|populationPerHousehold|    bedroomsPerRoom|
+----------------+-------------+----------+----------+------------+------------------+----------------------+-------------------+
|           4.526|        129.0|     322.0|     126.0|      8.3252| 6.984126984126984|    2.5555555555555554|0.14659090909090908|
|           3.585|       1106.0|    2401.0|    1138.0|      8.3014| 6.238137082601054|     2.109841827768014|0.15579659106916466|
|           3.521|        190.0|     496.0|     177.0|      7.2574| 8.288135593220339|    2.8022598870056497|0.12951601908657123|
|           3.413|        235.0|     558.0|     219.0|      5.6431|5.8173515981735155|     2.547945205479452|0.18445839874411302|
|           3.422|        280.0|     565.0|     259.0|      3.8462| 6.281853281853282|    

In [32]:
df.take(2)

[Row(medianHouseValue=4.526, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908), Row(medianHouseValue=3.585, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, roomsPerHousehold=6.238137082601054, populationPerHousehold=2.109841827768014, bedroomsPerRoom=0.15579659106916466)]

In [34]:
# 把 DataFrame 转换成 RDD，然后用 map() 函数把每个对象分成两部分：房价和一个包含其余属性的列表，然后.在转换回 DataFrame。
from pyspark.ml.linalg import DenseVector

input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df_ = spark.createDataFrame(input_data, ["label", "features"])

In [35]:
df_.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
+-----+--------------------+
only showing top 5 rows

In [36]:
df.take(2)

[Row(medianHouseValue=4.526, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908), Row(medianHouseValue=3.585, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, roomsPerHousehold=6.238137082601054, populationPerHousehold=2.109841827768014, bedroomsPerRoom=0.15579659106916466)]

In [37]:
from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df_)
scaled_df = scaler.transform(df_)

In [38]:
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949917...|
+-----+--------------------+--------------------+
only showing top 2 rows