In [1]:
import os
import requests
from pyspark.sql import SparkSession, functions as F

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.parquet('../../数据/ticket_cleaned.parquet')
df.show()
df.count()

+--------+-------------------+----------------+----------+--------+
|班次代码|           发车时间|      乘车站名称|到达站名称|座位类型|
+--------+-------------------+----------------+----------+--------+
|  KS1057|2020-05-07 09:00:00|苏州北广场汽车站|  常熟南站|       1|
|  PT2056|2020-05-07 09:15:00|          浏河站|美兰湖刷卡|       1|
|  KS3197|2020-05-07 10:50:00|        苏州南站|  常熟南站|       1|
|  GT1001|2020-05-07 17:40:00|          沙溪站|    太仓站|       1|
|  GT1001|2020-05-07 17:40:00|          沙溪站|    太仓站|       1|
|  KT3117|2020-05-07 09:30:00|          朝阳站|  嘉定刷卡|       1|
|  KS3414|2020-05-07 09:30:00|        苏州南站|    连云港|       1|
|  KC1241|2020-05-07 09:30:00|        常熟南站|苏州火车站|       1|
|  KC1241|2020-05-07 09:30:00|        常熟南站|苏州火车站|       1|
|  KC1241|2020-05-07 09:30:00|        常熟南站|苏州火车站|       1|
|  KC1717|2020-05-07 09:45:00|        常熟南站|  上海总站|       1|
|  KK3070|2020-05-07 13:50:00|        昆山南站|      扬州|       1|
|  ZS7530|2020-05-07 10:55:00|苏州北广场汽车站|  昆山周庄|       1|
|  PK0165|2020-05-07 11:01:00|        昆山南

67531286

In [4]:
stations = df.select('乘车站名称').union(df.select('到达站名称')).withColumnRenamed('乘车站名称', '车站名称')
stations = stations.distinct()
stations.show()
stations.count()

+------------------+
|          车站名称|
+------------------+
|    仪陇马鞍汽车站|
|        剑阁(下寺)|
|      太原汽车总站|
|象州汽车客运服务站|
|    松原公路客运站|
|              黄石|
|        芷江汽车站|
|        澄城汽车站|
|        泰宁汽车站|
|            锦州站|
|    和田东郊客运站|
|          万达广场|
|  建州汽车城公交站|
|白城市中心客运总站|
|              阜阳|
|              官桥|
|              铜陵|
|          常州南站|
|            岳阳村|
|            十里牌|
+------------------+
only showing top 20 rows



51729

In [5]:
def get_province(address):
    # 提取省份
    parameters = {'address': address, 'key': '7a73a533b706fbe049d6314987d4ecb9'}
    base = 'http://restapi.amap.com/v3/geocode/geo'
    response = requests.get(base, parameters)
    answer = response.json()
    try:
        if not answer['geocodes'] or not answer['geocodes'][0]['province']:
            province = 'N'
        else:
            province = answer['geocodes'][0]['province']
    except KeyError:
        province = 'N'
    return province

get_province = F.udf(get_province, 'string')

In [6]:
sections = {
    '华北': ['北京市', '天津市', '河北省', '山西省', '内蒙古自治区'],
    '东北': ['黑龙江省', '吉林省', '辽宁省'],
    '华东': ['上海市', '江苏省', '浙江省', '安徽省', '江西省', '山东省', '福建省', '台湾省'],
    '华中': ['河南省', '湖北省', '湖南省'],
    '华南': ['广东省', '广西壮族自治区', '海南省', '香港特别行政区', '澳门特别行政区'],
    '西南': ['重庆市', '四川省', '贵州省', '云南省', '西藏自治区'],
    '西北': ['陕西省', '甘肃省', '青海省', '宁夏回族自治区', '新疆维吾尔自治区']
}

inv_sections = {province: section for section, provinces in sections.items() for province in provinces}

def get_section(province):
    # 提取行政地理分区
    try:
        return inv_sections[province]
    except KeyError:
        return 'N'

get_section = F.udf(get_section, 'string')

In [7]:
def get_location(address):
    # 提取经纬度
    parameters = {'address': address, 'key': '7a73a533b706fbe049d6314987d4ecb9'}
    base = 'http://restapi.amap.com/v3/geocode/geo'
    response = requests.get(base, parameters)
    answer = response.json()
    try:
        if not answer['geocodes'] or not answer['geocodes'][0]['location']:
            location = 'N'
        else:
            location = answer['geocodes'][0]['location']
    except KeyError:
        location = 'N'
    return location

get_location = F.udf(get_location, 'string')

In [8]:
def get_longitude(location):
    # 提取经度
    if location == 'N':
        return -1.0
    longitude = list(map(float, location.split(',')))[0]
    return longitude

def get_latitude(location):
    # 提取纬度
    if location == 'N':
        return -1.0
    latitude = list(map(float, location.split(',')))[1]
    return latitude

get_longitude = F.udf(get_longitude, 'float')
get_latitude = F.udf(get_latitude, 'float')

In [9]:
stations = stations.withColumn('车站省份', get_province('车站名称'))

In [10]:
stations = stations.withColumn('车站行政地理分区', get_section('车站省份'))

In [11]:
stations = stations.withColumn('车站经度', get_longitude(get_location('车站名称')))
stations = stations.withColumn('车站纬度', get_latitude(get_location('车站名称')))

In [12]:
stations.show(1000)

+--------------------+----------------+----------------+----------+---------+
|            车站名称|        车站省份|车站行政地理分区|  车站经度| 车站纬度|
+--------------------+----------------+----------------+----------+---------+
|      仪陇马鞍汽车站|          四川省|            西南|106.623764|31.466978|
|          剑阁(下寺)|          四川省|            西南|105.490776| 32.28276|
|        太原汽车总站|          山西省|            华北| 112.57815|37.859325|
|  象州汽车客运服务站|  广西壮族自治区|            华南|109.689545|23.961946|
|      松原公路客运站|          吉林省|            东北| 124.83036|45.132942|
|                黄石|          湖北省|            华中| 115.03852|30.199652|
|          芷江汽车站|          湖南省|            华中|109.691216|27.432411|
|          澄城汽车站|          陕西省|            西北| 109.93181|35.181355|
|          泰宁汽车站|          福建省|            华东| 117.18047| 26.89569|
|              锦州站|          辽宁省|            东北|  121.1409|41.124466|
|      和田东郊客运站|新疆维吾尔自治区|            西北|  79.94694| 37.10699|
|            万达广场|          福建省|            华东|118.55810

In [13]:
stations.write.parquet('../../数据/stations.parquet')

In [14]:
spark.stop()