In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

def concatenation(csv,city):
    spark = SparkSession.builder.getOrCreate()

    dataframe = spark.read\
        .option('header',True)\
        .option('escape','"')\
        .option('inferSchema', True)\
        .csv("./data/{}".format(csv))
    
    dataframe = dataframe.filter(dataframe.備註 != 'the note').withColumn('city',lit(city))
    
    return dataframe
    #dataframe.printSchema()

In [2]:
Taipei_df = concatenation('A_lvr_land_A.csv','台北市')
Taichung_df = concatenation('B_lvr_land_A.csv','台中市')
Kaohsiung_df = concatenation('E_lvr_land_A.csv','高雄市')
NewTaipei_df = concatenation('F_lvr_land_A.csv','新北市')
Taoyuan_df = concatenation('H_lvr_land_A.csv','桃園市')

In [3]:
Taipei_df.printSchema()

root
 |-- 鄉鎮市區: string (nullable = true)
 |-- 交易標的: string (nullable = true)
 |-- 土地位置建物門牌: string (nullable = true)
 |-- 土地移轉總面積平方公尺: string (nullable = true)
 |-- 都市土地使用分區: string (nullable = true)
 |-- 非都市土地使用分區: string (nullable = true)
 |-- 非都市土地使用編定: string (nullable = true)
 |-- 交易年月日: string (nullable = true)
 |-- 交易筆棟數: string (nullable = true)
 |-- 移轉層次: string (nullable = true)
 |-- 總樓層數: string (nullable = true)
 |-- 建物型態: string (nullable = true)
 |-- 主要用途: string (nullable = true)
 |-- 主要建材: string (nullable = true)
 |-- 建築完成年月: string (nullable = true)
 |-- 建物移轉總面積平方公尺: string (nullable = true)
 |-- 建物現況格局-房: string (nullable = true)
 |-- 建物現況格局-廳: string (nullable = true)
 |-- 建物現況格局-衛: string (nullable = true)
 |-- 建物現況格局-隔間: string (nullable = true)
 |-- 有無管理組織: string (nullable = true)
 |-- 總價元: string (nullable = true)
 |-- 單價元平方公尺: string (nullable = true)
 |-- 車位類別: string (nullable = true)
 |-- 車位移轉總面積(平方公尺): string (nullable = true)
 |-- 車位總價元: string (nullable 

In [4]:
Taipei_df.select(['鄉鎮市區','交易標的','city']).limit(10).show()

+--------+--------------------+------+
|鄉鎮市區|            交易標的|  city|
+--------+--------------------+------+
|  文山區|房地(土地+建物)+車位|台北市|
|  文山區|     房地(土地+建物)|台北市|
|  文山區|     房地(土地+建物)|台北市|
|  中正區|     房地(土地+建物)|台北市|
|  文山區|                車位|台北市|
|  文山區|     房地(土地+建物)|台北市|
|  萬華區|     房地(土地+建物)|台北市|
|  萬華區|房地(土地+建物)+車位|台北市|
|  內湖區|房地(土地+建物)+車位|台北市|
|  內湖區|     房地(土地+建物)|台北市|
+--------+--------------------+------+



In [5]:
from functools import reduce
from pyspark.sql import DataFrame

dfs = [Taipei_df,Taichung_df,Kaohsiung_df,NewTaipei_df,Taoyuan_df]
combine_df = reduce(DataFrame.unionAll, dfs)

In [6]:
combine_df.select(['鄉鎮市區', '總價元','city']).where(combine_df.city=='新北市').limit(10).collect()

[Row(鄉鎮市區='板橋區', 總價元='19000000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='14250000', city='新北市'),
 Row(鄉鎮市區='土城區', 總價元='82100000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='3505800', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='14900000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='8050000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='11530000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='7450000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='15990000', city='新北市'),
 Row(鄉鎮市區='板橋區', 總價元='10190000', city='新北市')]

## 合併數據並篩選

In [12]:
import pyspark.sql.functions as f

selection = combine_df.where(combine_df.主要用途=='住家用').filter(combine_df.建物型態.contains('住宅大樓')).filter((f.length(combine_df.總樓層數)>=3)&(combine_df.總樓層數!="十一層")&(combine_df.總樓層數!="十二層"))
selection.show()

+--------+--------------------+----------------------------------------+----------------------+----------------+------------------+------------------+----------+---------------+----------+--------+--------------------------+--------+----------------+------------+----------------------+---------------+---------------+---------------+-----------------+------------+---------+--------------+--------+------------------------+----------+-------------------------------------+-------------------+----------+------------+--------+----+--------+------+
|鄉鎮市區|            交易標的|                        土地位置建物門牌|土地移轉總面積平方公尺|都市土地使用分區|非都市土地使用分區|非都市土地使用編定|交易年月日|     交易筆棟數|  移轉層次|總樓層數|                  建物型態|主要用途|        主要建材|建築完成年月|建物移轉總面積平方公尺|建物現況格局-房|建物現況格局-廳|建物現況格局-衛|建物現況格局-隔間|有無管理組織|   總價元|單價元平方公尺|車位類別|車位移轉總面積(平方公尺)|車位總價元|                                 備註|               編號|主建物面積|附屬建物面積|陽台面積|電梯|移轉編號|  city|
+--------+--------------------+----------------------------------------+----------------------

In [13]:
from pyspark.sql.functions import udf
def date_text(date):
    str_date = str(date)
    trans = "{year}-{month}-{day}".format(year=int(str_date[:3])+1911,month=str_date[3:5],day=str_date[5:7])
    return trans
udf_func = udf(date_text) 

In [14]:
select_city = selection.withColumn("time_slots",
                                   f.struct(*[udf_func(selection.交易年月日).alias('date'),
                                              f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')])
                                              .alias('events')])).select(["city","time_slots"]).distinct()
#select_city.printSchema()
select_city.limit(10).collect()

[Row(city='台北市', time_slots=Row(date='2019-01-04', events=Row(district='大安區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2018-12-22', events=Row(district='松山區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2018-12-24', events=Row(district='中山區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2019-01-27', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2019-02-17', events=Row(district='文山區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2019-01-18', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2019-03-21', events=Row(district='中正區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2018-06-22', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)'))),
 Row(city='台北市', time_slots=Row(date='2019-02-11', events=Row(district='信義區', bu

In [23]:
select_Taipei = selection\
    .withColumn('date',udf_func(selection.交易年月日))\
    .withColumn('events',f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')]))\
    .select(['date','events'])\
    .sort(f.col('date').desc())\
    .where(selection.city=="台北市")
select_Taichung = selection\
    .withColumn('date',udf_func(selection.交易年月日))\
    .withColumn('events',f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')]))\
    .select(['date','events'])\
    .sort(f.col('date').desc())\
    .where(selection.city=="台中市")
select_Kaohsiung = selection\
    .withColumn('date',udf_func(selection.交易年月日))\
    .withColumn('events',f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')]))\
    .select(['date','events'])\
    .sort(f.col('date').desc())\
    .where(selection.city=="高雄市")
select_NewTaipei = selection\
    .withColumn('date',udf_func(selection.交易年月日))\
    .withColumn('events',f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')]))\
    .select(['date','events'])\
    .sort(f.col('date').desc())\
    .where(selection.city=="新北市")
select_Taoyuan = selection\
    .withColumn('date',udf_func(selection.交易年月日))\
    .withColumn('events',f.struct(*[f.lit(selection.鄉鎮市區).alias('district'), f.lit(selection.建物型態).alias('building_state')]))\
    .select(['date','events'])\
    .sort(f.col('date').desc())\
    .where(selection.city=="桃園市")

select_Taipei.limit(10).collect()

[Row(date='2019-04-22', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-22', events=Row(district='文山區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-22', events=Row(district='文山區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-22', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-22', events=Row(district='文山區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-22', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-19', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-18', events=Row(district='南港區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-18', events=Row(district='萬華區', building_state='住宅大樓(11層含以上有電梯)')),
 Row(date='2019-04-16', events=Row(district='內湖區', building_state='住宅大樓(11層含以上有電梯)'))]

In [54]:
df_json = select_Taipei.toJSON()

with open('./test.json','w') as j:
    rows = ''
    for row in df_json.collect():
        rows += row + ', '
    rows = rows[:-2]
    j.write('{"city": "台北市", "time_slots": ['+rows+']}')

In [55]:
select_Taipei_split = select_Taipei.randomSplit([0.5,0.5],seed=1)
select_Taoyuan_split = select_Taoyuan.randomSplit([0.5,0.5],seed=1)
select_Taichung_split = select_Taichung.randomSplit([0.5,0.5],seed=1)
select_NewTaipei_split = select_NewTaipei.randomSplit([0.5,0.5],seed=1)
select_Kaohsiung_split = select_Kaohsiung.randomSplit([0.5,0.5],seed=1)

In [59]:
def write_json(version):
    taipei = select_Taipei_split[version].toJSON()
    taoyuan = select_Taoyuan_split[version].toJSON()
    newtaipei = select_NewTaipei_split[version].toJSON()
    taichung = select_Taichung_split[version].toJSON()
    kaohsiung = select_Kaohsiung_split[version].toJSON()
    
    with open("./result-part{}.json".format(version+1),'w') as j:
        rows=''
        for row in taipei.collect():
            rows += row + ', '
        rows = rows[:-2]
        j.write('{"city": "台北市", "time_slots": ['+rows+']}\n')
        
        rows=''
        for row in taoyuan.collect():
            rows += row + ', '
        rows = rows[:-2]
        j.write('{"city": "桃園市", "time_slots": ['+rows+']}\n')
        
        rows=''
        for row in newtaipei.collect():
            rows += row + ', '
        rows = rows[:-2]
        j.write('{"city": "新北市", "time_slots": ['+rows+']}\n')
        
        rows=''
        for row in taichung.collect():
            rows += row + ', '
        rows = rows[:-2]
        j.write('{"city": "台中市", "time_slots": ['+rows+']}\n')
        rows=''
        for row in kaohsiung.collect():
            rows += row + ', '
        rows = rows[:-2]
        j.write('{"city": "高雄市", "time_slots": ['+rows+']}')

In [60]:
write_json(0)
write_json(1)