In [1]:
spark

In [79]:
import pandas as pd
import numpy as np
from numpy import random
import pyspark.pandas as ps
import os
import json
import itertools
from operator import itemgetter
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

In [74]:
# 設定路徑，一次讀取5個CSV檔案
path = ["/home/hadoop/work_1/data/a_lvr_land_a.csv",
        "/home/hadoop/work_1/data/b_lvr_land_a.csv",
        "/home/hadoop/work_1/data/e_lvr_land_a.csv",
        "/home/hadoop/work_1/data/f_lvr_land_a.csv",
        "/home/hadoop/work_1/data/h_lvr_land_a.csv"]

# 讀取檔案，印出Schema
df = spark.read.options(header=True).csv(path)
df.printSchema()

root
 |-- district: string (nullable = true)
 |-- date: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- style: string (nullable = true)
 |-- use: string (nullable = true)
 |-- city: string (nullable = true)



In [75]:
# 確認檔案的row & column
print((df.count(), len(df.columns)))

(41021, 6)


In [76]:
# 確認前10比檔案
df.show(10)

+--------+----------+-----+--------------------------+--------------+------+
|district|      date|floor|                     style|           use|  city|
+--------+----------+-----+--------------------------+--------------+------+
|  板橋區|2018-11-17|    9|    華廈(10層含以下有電梯)|        住家用|新北市|
|  板橋區|2018-12-14|   26|住宅大樓(11層含以上有電梯)|        住家用|新北市|
|  土城區|2018-12-24|   18|           套房(1房1廳1衛)|        住家用|新北市|
|  土城區|2019-01-11|   12|住宅大樓(11層含以上有電梯)|        住家用|新北市|
|  板橋區|2019-01-11|    4|     公寓(5樓含以下無電梯)|        住家用|新北市|
|  板橋區|2018-06-15|    4|     公寓(5樓含以下無電梯)|        住家用|新北市|
|  板橋區|2018-12-15|   40|住宅大樓(11層含以上有電梯)|        住家用|新北市|
|  板橋區|2016-08-13|   19|住宅大樓(11層含以上有電梯)|見其他登記事項|新北市|
|  板橋區|2016-07-28|   19|住宅大樓(11層含以上有電梯)|見其他登記事項|新北市|
|  板橋區|2016-07-14|   19|住宅大樓(11層含以上有電梯)|見其他登記事項|新北市|
+--------+----------+-----+--------------------------+--------------+------+
only showing top 10 rows



In [77]:
# 選取條件並轉回Pandas DataFrame
df_pandas = df.filter((df['use'] == '住家用') & (df['style'].like('%住宅大樓%')) & (df['floor'] > 13)).orderBy('date', ascending=False).toPandas()

In [78]:
# DataFrame to JSON
df_json = df_pandas.to_json(orient="split")
parsed = json.loads(df_json)

In [20]:
# 修改JSON的格式
result = []
for i in parsed['data']:
    result.append({"city":i[5],"time_slots":[{"date":i[1],"events":[{"district":i[0],"style":i[3]}]}]})

print(result)

[{'city': '新北市', 'time_slots': [{'date': '2019-05-01', 'events': [{'district': '淡水區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '高雄市', 'time_slots': [{'date': '2019-04-30', 'events': [{'district': '鼓山區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '高雄市', 'time_slots': [{'date': '2019-04-30', 'events': [{'district': '左營區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '高雄市', 'time_slots': [{'date': '2019-04-30', 'events': [{'district': '三民區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '新北市', 'time_slots': [{'date': '2019-04-29', 'events': [{'district': '汐止區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '高雄市', 'time_slots': [{'date': '2019-04-29', 'events': [{'district': '鼓山區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '高雄市', 'time_slots': [{'date': '2019-04-29', 'events': [{'district': '岡山區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '桃園市', 'time_slots': [{'date': '2019-04-29', 'events': [{'district': '中壢區', 'style': '住宅大樓(11層含以上有電梯)'}]}]}, {'city': '新北市', 'time_slots': [{'date': '2019-04-26', '

In [None]:
# random 兩組數字
def subset(alist, idxs):
    sub_list = []
    for idx in idxs:
        sub_list.append(alist[idx])
    return sub_list

def split_list(alist, group_num=2, shuffle=True, retain_left=False):
    index = list(range(len(alist)))
    
    # 打亂列表
    if shuffle:
        random.shuffle(index)
    
    # 每個子列表含有的數量
    elem_num = len(alist) // group_num
    sub_lists = {}    
    
    # 取出每個子列表，存入{}
    for idx in range(group_num):
        start, end = idx*elem_num, (idx+1)*elem_num
        sub_lists['set'+str(idx)] = subset(alist, index[start:end])
    
    # 將列表中剩餘的分為一組
    if retain_left and group_num * elem_num != len(index):
        sub_lists['set'+str(idx+1)] = subset(alist, index[end:])
        
    return sub_lists

In [52]:
# 將結果分為兩組
random_1 = split_list(range(6484), group_num=2, retain_left=False)['set0']
random_2 = split_list(range(6484), group_num=2, retain_left=False)['set1']

In [61]:
# 將random結果排序
random_1 = sorted(random_1)
random_2 = sorted(random_2)

In [65]:
result_1 = []
result_2 = []
for i in random_1:
    result_1.append(result[i])
    
for j in random_2:
    result_2.append(result[j])

In [66]:
json_part1 = json.dumps(result_1, ensure_ascii=False)
json_part2 = json.dumps(result_2, ensure_ascii=False)

In [48]:
# 輸出json檔案
with open("result-part1.json", "w") as f:
    json.dump(json_part1, f, ensure_ascii=False)
with open("result-part2.json", "w") as f:
    json.dump(json_part2, f, ensure_ascii=False)

---