## 将HDFS里的房源字段数据映射成数字，封装成数据集

In [46]:
from pyspark.sql.types import *
import json
# 获得清洗数据集
def get_results(row):
    """Flatten one raw listing row into a dict of cleaned feature values.

    Each field of ``row`` is passed through its dedicated ``get_*`` encoder.
    The list-valued results for ``tag``, ``floor``, ``facility`` and
    ``coordinate`` are serialised with ``json.dumps``.

    NOTE(review): ``rent_way`` and ``house_type`` are returned as plain lists
    (no ``json.dumps``), unlike the other list-valued fields -- confirm that
    this is intended by whoever consumes the dict.

    :param row: record exposing the raw listing attributes read below.
    :return: dict keyed by output column name.
    """
    cleaned = dict(
        raw_key=row.raw_key,
        # Rent, in yuan per month; fractional part is truncated.
        price=int(float(row.price)),
        # Payment terms, e.g. "deposit one month, pay three".
        pay_way=get_pay_way(row.pay_way),
        # Listing tags as a JSON-encoded 0/1 vector.
        tag=json.dumps(get_tags(row.tag)),
        # Rental mode.
        rent_way=get_rent_way(row.rent_way),
        # Layout (rooms / halls / bathrooms).
        house_type=get_house_type(row.house_type),
        # Floor area.
        size=get_size(row.size),
        # Facing direction.
        orientation=get_orientation(row.orientation),
        # Floor information, JSON-encoded.
        floor=json.dumps(get_floor(row.floor)),
        # Decoration level.
        decorate_type=get_decorate_type(row.decorate_type),
        # Facilities as a JSON-encoded 0/1 vector.
        facility=json.dumps(get_facility(row.facility)),
        traffic=get_traffic(row.traffic),
        # Both the district code and the coordinate come from row.address.
        address=get_address(row.address),
        coordinate=json.dumps(get_coordinate(row.address)),
    )
    return cleaned

    
def get_pay_way(pay_way):
    """Translate a payment-terms label into its integer code.

    The code of a label is its position in ``labels`` below (the empty
    string is code 0).

    :param pay_way: payment-terms label exactly as it appears in the raw data.
    :return: integer code in the range 0..13.
    :raises KeyError: if the label is not one of the known values.
    """
    labels = (
        '',
        '面议',
        '押一付一',
        '押一付二',
        '押一付三',
        '押一付半年',
        '押一付一年',
        '押二付一',
        '押二付二',
        '押二付三',
        '押三付一',
        '押三付三',
        '半年付',
        '年付',
    )
    code_by_label = {label: code for code, label in enumerate(labels)}
    return code_by_label[pay_way]


def get_tags(tags):
    """Encode a listing's tags as a 32-slot 0/1 vector.

    A slot is set to 1 when at least one tag *contains* the slot's keyword
    as a substring. Because matching is by substring, one tag can set several
    slots: for example '紧邻地铁' sets both slot 7 ('紧邻地铁') and slot 17
    ('邻地铁'). The keywords '女生合租' and '是女生' share slot 15.

    Missing input is tolerated: ``None`` or an empty list yields an all-zero
    vector, and ``None`` / empty elements inside the list are skipped.

    :param tags: iterable of tag strings, or ``None``.
    :return: list of 32 ints, each 0 or 1.
    """
    tags_map = {
    '是一家人' : 0,
    '不吸烟' : 1,
    '随时看房' : 2,
    '独卫' : 3,
    '繁华地段' : 4,
    '精装修' : 5,
    '已传房本' : 6,
    '紧邻地铁' : 7,
    '家电齐全' : 8,
    '电梯房' : 9,
    '押一付一' : 10,
    '南北通透' : 11,
    '朝南' : 12,
    '不养宠物' : 13,
    '免中介费' : 14,
    '女生合租' : 15,
    '是女生': 15,
    '全装全配' : 16,
    '邻地铁' : 17,
    '公区消毒'  : 18,
    '入口检疫'  : 19,
    '租户稳定' : 20,
    '首次出租' : 21,
    '独立阳台' : 22,
    '普通装修' : 23,
    '拎包入住' : 24,
    '半年起租' : 25,
    '采光好' : 26,
    '低价出租' : 27,
    '配套齐全' : 28,
    '作息正常' : 29,
    '男生合租' : 30,
    '一年起租' : 31
    }
    res = [0] * 32
    # The source column is nullable, so there may be nothing to encode.
    if not tags:
        return res
    # Drop null/empty elements once so the substring test below cannot fail.
    present = [item for item in tags if item]
    for keyword, slot in tags_map.items():
        if any(keyword in item for item in present):
            res[slot] = 1
    return res


def get_rent_way(rent_way):
    """Encode a rental-mode description as ``[lease, room, gender]`` codes.

    Each of the three slots has its own keyword table. A slot takes the code
    of the last keyword (in table order) found as a substring of
    ``rent_way``, or 0 when none is found.

    Raw values seen in the data include:
        '整租', '合租 - 主卧', '合租 - 次卧', '合租 - 其他',
        '合租 - 男女不限', '合租 - 主卧 - 限男生', '合租 - 主卧 - 限女生',
        '合租 - 主卧 - 男女不限', '合租 - 次卧 - 限男生',
        '合租 - 次卧 - 限女生', '合租 - 次卧 - 男女不限',
        '合租 - 其他 - 限女生', '合租 - 其他 - 男女不限',
        '主卧(合租)', '次卧(合租)', '单间(合租)', '床位(合租)',
        '隔断间(合租)'

    :param rent_way: rental-mode description string.
    :return: list of three ints.
    """
    lease_codes = {'未知': 0, '合租': 1, '整租': 2}
    room_codes = {
        '未知': 0,
        '其他': 0,
        '主卧': 1,
        '次卧': 2,
        '隔断间': 3,
        '床位': 4,
        '单间': 6,
    }
    # '男女不限' contains all three non-zero keywords; '不限' is listed last
    # so that it is the one that sticks.
    gender_codes = {'未知': 0, '男': 1, '女': 2, '不限': 3}

    encoded = []
    for table in (lease_codes, room_codes, gender_codes):
        code = 0
        for keyword, value in table.items():
            if rent_way.find(keyword) != -1:
                code = value
        encoded.append(code)
    return encoded


def get_house_type(house_type):
    """Parse a layout string such as '2室1厅1卫' into ``[rooms, halls, baths]``.

    Whitespace is ignored. The string must contain exactly one '室' and start
    with the room count; the '厅' (hall) and '卫' (bathroom) parts are
    optional and parsed independently. Any component that is absent is
    reported as -1, and input that cannot be parsed at all (``None``, empty,
    no '室', several '室', no leading number) yields ``[-1, -1, -1]``.

    :param house_type: layout description, or ``None``.
    :return: list of three ints; -1 marks an unknown component.
    """
    import re

    unknown = [-1, -1, -1]
    if not house_type:
        return unknown
    # Remove every kind of whitespace, not only plain spaces.
    compact = ''.join(house_type.split())
    # Anything other than exactly one '室' is treated as unparseable.
    if compact.count('室') != 1:
        return unknown
    parsed = re.match(r'(\d+)室(?:(\d+)厅)?(?:(\d+)卫)?', compact)
    if parsed is None:
        return unknown
    return [-1 if part is None else int(part) for part in parsed.groups()]
  

def get_size(size):
    """Convert an area string such as '79平米' into a float.

    :param size: area text; any '平米' unit marker is removed before parsing.
    :return: the numeric area as a float.
    :raises ValueError: if what remains is not a valid number.
    """
    numeric_text = size.replace('平米', '')
    return float(numeric_text)


def get_orientation(orientation):
    """Translate a facing-direction label into its integer code.

    Directions are numbered 1..10 in the order listed below; the
    "no information" placeholders all map to 0.

    :param orientation: direction label exactly as found in the raw data.
    :return: integer code in the range 0..10.
    :raises KeyError: if the label is not recognised.
    """
    directions = ('东', '南', '西', '北', '东南', '东北', '西南', '西北', '东西', '南北')
    codes = {name: number for number, name in enumerate(directions, start=1)}
    for placeholder in ('暂无信息', '暂无', '不限'):
        codes[placeholder] = 0
    return codes[orientation]

def get_floor(floor):
    """Encode a floor description as a three-element list.

    * Empty / ``None`` input gives ``[0, 0, 0]``.
    * Input starting with '共' (e.g. '共2层') gives ``[N, 0, 0]`` where N is
      the number left after stripping '共' and '层' from both ends.
    * Anything else is split on single spaces: slot 0 is set to 1, slot 1 is
      the code of the first token (高层=1, 中层=2, 低层=3, 地下=4) and, when a
      second token exists, slot 2 is that token with '层' removed, as an int.

    :param floor: floor description string, or ``None``.
    :return: list of three ints.
    :raises KeyError: if the first token is not a known level label.
    :raises ValueError: if a numeric part cannot be converted.
    """
    if not floor:
        return [0, 0, 0]

    if floor[0] == '共':
        return [int(floor.strip('共层')), 0, 0]

    level_codes = {'高层': 1, '中层': 2, '低层': 3, '地下': 4}
    tokens = floor.split(' ')
    encoded = [1, level_codes[tokens[0]], 0]
    if len(tokens) >= 2:
        encoded[2] = int(tokens[1].replace('层', ''))
    return encoded


def get_decorate_type(decorate_type):
    """Translate a decoration-level label into its integer code.

    Several spellings share one code, and ``None`` is accepted and treated
    like the "unknown" labels (code 0).

    :param decorate_type: decoration label, or ``None``.
    :return: integer code in the range 0..5.
    :raises KeyError: if the label is not recognised.
    """
    labels_by_code = (
        (0, ('未知', '暂无资料', '不限', None)),
        (1, ('毛坯',)),
        (2, ('简单装修', '简装修')),
        (3, ('中等装修', '中装修')),
        (4, ('精装修',)),
        (5, ('豪华装修',)),
    )
    lookup = {}
    for code, labels in labels_by_code:
        for label in labels:
            lookup[label] = code
    return lookup[decorate_type]

def get_facility(facility):
    """Encode the facilities of a listing as a 21-slot 0/1 vector.

    Slot ``i`` is 1 when the ``i``-th name in ``slot_names`` is a member of
    ``facility`` (tested with ``in``), otherwise 0. Empty or ``None`` input
    yields all zeros.

    :param facility: collection of facility names, or ``None``.
    :return: list of 21 ints, each 0 or 1.
    """
    slot_names = (
        '卫生间',
        '独立卫生间',
        '冰箱',
        '宽带',
        '可做饭',
        '衣柜',
        '露台',
        '阁楼',
        '暖气',
        '洗衣机',
        '游泳池',
        '车位',
        '微波炉',
        '沙发',
        '床',
        '阳光房',
        '空调',
        '热水器',
        '电视',
        '阳台',
        '电梯',
    )
    if not facility:
        return [0 for _ in slot_names]
    return [1 if name in facility else 0 for name in slot_names]


def get_traffic(traffic):
    """Extract the trailing distance (an integer) from a transit description.

    Example: '距离地铁5号线南延伸段金海湖站站880米' -> 880.

    '米' characters are stripped from both ends, then the run of ASCII digits
    at the end of what remains is read as the distance. When the input is
    empty / ``None`` or does not end in digits, 3000 is returned: listings
    with no nearby metro are given 3000 m because anything beyond 2000 m is
    not considered a metro-adjacent home.

    :param traffic: transit description string, or ``None``.
    :return: distance as an int, or 3000 when unavailable.
    """
    if not traffic:
        return 3000
    text = traffic.strip('米')
    # Whatever rstrip removes is exactly the trailing run of ASCII digits.
    head = text.rstrip('0123456789')
    digits = text[len(head):]
    if not digits:
        return 3000
    return int(digits)

def get_address(address):
    """Return the integer code of the district named in an address record.

    Districts are numbered 1..17 in the order listed below.

    :param address: mapping-like object with a 'district' entry.
    :return: integer district code.
    :raises KeyError: if the district is not in the list.
    """
    districts = (
        '黄浦',
        '嘉定',
        '普陀',
        '上海周边',
        '浦东',
        '徐汇',
        '静安',
        '松江',
        '青浦',
        '杨浦',
        '虹口',
        '崇明',
        '奉贤',
        '长宁',
        '金山',
        '宝山',
        '闵行',
    )
    code_by_district = {name: code for code, name in enumerate(districts, start=1)}
    return code_by_district[address['district']]
def get_coordinate(address):
    """Parse the comma-separated coordinate of an address into floats.

    :param address: mapping-like object with a 'coordinate' entry.
    :return: list of floats, or ``None`` when the coordinate is empty/missing.
    :raises ValueError: if a component is not a valid number.
    """
    raw = address['coordinate']
    if not raw:
        return None
    return list(map(float, raw.split(',')))

In [8]:
from pyspark.sql.types import *
# Location of the raw Shanghai listings, stored as parquet.
input_path = "/user/tawn/shanghai.house.parquet"
reader = spark.read.format("parquet")
df = reader.load(input_path)

# Schema of the cleaned output: one nullable column per key produced by
# get_results, in the order listed here.
schema = StructType([
    StructField(column, column_type(), True)
    for column, column_type in (
        ('raw_key', StringType),
        ('price', LongType),
        ('pay_way', LongType),
        ('tag', StringType),
        ('rent_way', StringType),
        ('house_type', StringType),
        ('size', DoubleType),
        ('orientation', LongType),
        ('floor', StringType),
        ('decorate_type', LongType),
        ('facility', StringType),
        ('traffic', LongType),
        ('address', LongType),
        ('coordinate', StringType),
    )
])

In [48]:
# For a quick trial run, a small sample can be used instead of the full input:
#   df_t = spark.createDataFrame(df.take(100))
# Clean every raw row, wrap each resulting dict in a Row, then apply the
# explicit output schema.
rdd = df.rdd.map(get_results).map(lambda fields: Row(**fields))
df_out = spark.createDataFrame(rdd, schema)

In [31]:
# Count the rows of the raw input DataFrame.
df.count()

73356

## 写入HDFS

In [20]:
# Write the cleaned data as CSV, replacing any previous output at this path.
# No header option is set here, so the default applies.
# NOTE(review): the path is relative; presumably it resolves against the
# default (HDFS) filesystem's user directory -- confirm.
df_out.write.csv('shanghai.house.csv', mode='overwrite')

## 写入本地

In [None]:
# Export to the local filesystem (file:// URI) as CSV with a header row,
# overwriting earlier output. repartition(1) collapses the data into a single
# partition first, so the output is written from one partition.
df_out.repartition(1).write.format("csv").mode("overwrite").option("header", True).save('file:///Users/tawn/Tawn/house_clean/shanghai.house.csv')

In [None]:
# df_out.write.csv('file:///Users/tawn/Tawn/house_clean/shanghai.house.csv', mode='overwrite')

In [131]:
# Show the schema of the raw input DataFrame.
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- community: string (nullable = true)
 |    |-- detail: string (nullable = true)
 |    |-- coordinate: string (nullable = true)
 |-- decorate_type: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- facility: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- floor: string (nullable = true)
 |-- house_type: string (nullable = true)
 |-- intro: string (nullable = true)
 |-- orientation: string (nullable = true)
 |-- pay_way: string (nullable = true)
 |-- pic: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- price: string (nullable = true)
 |-- raw_key: string (nullable = true)
 |-- rent_way: string (nullable = true)
 |-- size: string (nullable = true)
 |-- tag: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [27]:
# Show the schema of the cleaned output DataFrame.
df_out.printSchema()

root
 |-- raw_key: string (nullable = true)
 |-- price: long (nullable = true)
 |-- pay_way: long (nullable = true)
 |-- tag: string (nullable = true)
 |-- rent_way: string (nullable = true)
 |-- house_type: string (nullable = true)
 |-- size: double (nullable = true)
 |-- orientation: long (nullable = true)
 |-- floor: string (nullable = true)
 |-- decorate_type: long (nullable = true)
 |-- facility: string (nullable = true)
 |-- traffic: long (nullable = true)
 |-- address: long (nullable = true)
 |-- coordinate: string (nullable = true)



In [9]:
# Fetch the first cleaned row as a spot check.
df_out.take(1)

[Row(raw_key='ff3b7365d59ce7c4cf6aec97f3130224', price=4500, pay_way=2, tag='[null, null, null, null, 1, null, null, 1, null, null, null, null, null, null, null, null, 1, 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null]', rent_way='[2, null, null]', house_type='[2, 1, 1]', size=79.0, orientation=2, floor='[2, null, null]', decorate_type=4, facility='[1, null, 1, 1, 1, 1, null, null, 1, 1, null, null, 1, 1, 1, null, 1, 1, 1, 1, 1]', traffic=366, address=5, coordinate='[121.524348, 31.150716]')]