In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,explode
from pyspark.sql import SQLContext
import pyspark.sql.functions as sqlf
from pyspark.sql.functions import col
from pyspark.sql.functions import collect_list, struct, sort_array, broadcast
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import *
import pyspark.pandas as pd
import geopandas as gp
import numpy as np
from pyspark.sql import functions as F


In [4]:
# prevent string show as binary byte
#!spark-shell --conf spark.sql.parquet.binaryAsString=true

spark.conf.set("spark.sql.parquet.binaryAsString","true")

In [5]:
%%time
# load the node parquet file into dataframe
parquet_path = ['/greater-london-latest.osm.pbf.node.parquet', '/centro-latest.osm.pbf.node.parquet', \
                '/ile-de-france-latest.osm.pbf.node.parquet', '/ontario-latest.osm.pbf.node.parquet']
london_node_df = sqlContext.read.parquet(parquet_path[0])
rome_node_df = sqlContext.read.parquet(parquet_path[1])
paris_node_df = sqlContext.read.parquet(parquet_path[2])
toronto_node_df = sqlContext.read.parquet(parquet_path[3])

                                                                                

CPU times: user 8.61 ms, sys: 0 ns, total: 8.61 ms
Wall time: 4.81 s


<h3>Part.1 Test How Partition Impact Large Scale DF Performance<h3>

In [6]:
# test how partition could impact performance in large scale data
london_node_df_test = london_node_df.withColumn("partition_id", lit(1))
rome_node_df_test = rome_node_df.withColumn("partition_id", lit(2))
paris_node_df_test = paris_node_df.withColumn("partition_id", lit(3))
toronto_node_df_test = toronto_node_df.withColumn("partition_id", lit(4))

In [34]:
# as file size goes up, the partitions goes up
print("london: ", london_node_df.rdd.getNumPartitions()) 
print("rome: ", rome_node_df.rdd.getNumPartitions()) 
print("paris: ", paris_node_df.rdd.getNumPartitions()) 
print("toronto: ", toronto_node_df.rdd.getNumPartitions()) 
london_node_df.groupBy(F.spark_partition_id()).count().show()
rome_node_df.groupBy(F.spark_partition_id()).count().show()
paris_node_df.groupBy(F.spark_partition_id()).count().show()
toronto_node_df.groupBy(F.spark_partition_id()).count().show()

london:  2
rome:  7
paris:  5
toronto:  17
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|5740100|
|                   1|3426086|
+--------------------+-------+



                                                                                

+--------------------+--------+
|SPARK_PARTITION_ID()|   count|
+--------------------+--------+
|                   0| 5950100|
|                   1| 5200100|
|                   2| 6130100|
|                   3| 6420100|
|                   4|11374504|
|                   5| 6130319|
+--------------------+--------+



                                                                                

+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|6000100|
|                   1|6140100|
|                   2|5720100|
|                   3|5980100|
|                   4|4887681|
+--------------------+-------+





+--------------------+--------+
|SPARK_PARTITION_ID()|   count|
+--------------------+--------+
|                   0| 6010100|
|                   1| 4380100|
|                   2| 6000100|
|                   3|11240200|
|                   4| 6550100|
|                   5| 6670100|
|                   6| 6540100|
|                   7| 6670100|
|                   8| 6680100|
|                   9| 6530100|
|                  10| 6540100|
|                  11| 4058022|
|                  12| 6270100|
|                  13|13190200|
|                  14| 6520100|
|                  15| 5760100|
|                  16| 2550609|
+--------------------+--------+



                                                                                

In [35]:
# combine the original partition by city and df
combined_node_df = london_node_df.union(rome_node_df).union(paris_node_df).union(toronto_node_df)
combined_node_df.show(5)
print("combined partitions: ", combined_node_df.rdd.getNumPartitions()) 
combined_node_df.groupBy(F.spark_partition_id()).count().show()

+-----+-------+-------------+---------+---+--------+--------------------+------------------+--------------------+
|   id|version|    timestamp|changeset|uid|user_sid|                tags|          latitude|           longitude|
+-----+-------+-------------+---------+---+--------+--------------------+------------------+--------------------+
|78112|      3|1539614984000|        0|  0|        |                  []|51.526976000000005|-0.14579240000000002|
|99878|      9|1607700514000|        0|  0|        |[{access, permiss...| 51.52435800000001|          -0.1529847|
|99879|      5|1596536339000|        0|  0|        |                  []|        51.5248246|          -0.1532934|
|99880|      6|1635619078000|        0|  0|        |[{crossing, unmar...|        51.5250847|          -0.1535802|
|99882|      1|1117060261000|        0|  0|        |                  []|51.525650000000006|          -0.1541197|
+-----+-------+-------------+---------+---+--------+--------------------+---------------



+--------------------+--------+
|SPARK_PARTITION_ID()|   count|
+--------------------+--------+
|                   0| 5740100|
|                   1| 3426086|
|                   2| 5950100|
|                   3| 5200100|
|                   4| 6130100|
|                   5| 6420100|
|                   6|11374504|
|                   7| 6130319|
|                   9| 6000100|
|                  10| 6140100|
|                  11| 5720100|
|                  12| 5980100|
|                  13| 4887681|
|                  14| 6010100|
|                  15| 4380100|
|                  16| 6000100|
|                  17|11240200|
|                  18| 6550100|
|                  19| 6670100|
|                  20| 6540100|
+--------------------+--------+
only showing top 20 rows



                                                                                

In [37]:
spark.conf.set("spark.sql.adaptive.enabled",False)

In [38]:
# Test repartition evenly, because repartition store in memory, need to reload from HDFS
combined_node_df_test = london_node_df_test.union(rome_node_df_test).union(paris_node_df_test).union(toronto_node_df_test)
combined_node_df_test = combined_node_df_test.repartition(31)
combined_node_df_test.write.mode("overwrite").parquet("/combined_filtered_parquet_test_large")
combined_node_df_test = sqlContext.read.parquet("/combined_filtered_parquet_test_large")
print("combined repartitioned: ", combined_node_df_test.rdd.getNumPartitions()) 
combined_node_df_test.groupBy(F.spark_partition_id()).count().show()

                                                                                

combined repartitioned:  47


                                                                                

+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                  31|3239144|
|                  34|3239142|
|                  28|4550100|
|                  26|4550100|
|                  27|4550100|
|                  44|3239146|
|                  12|4550100|
|                  22|4550100|
|                   1|4550100|
|                  13|4550100|
|                   6|4550100|
|                  16|4550100|
|                   3|4550100|
|                  20|4550100|
|                  40|3239143|
|                   5|4550100|
|                  19|4550100|
|                  41|3239144|
|                  15|4550100|
|                  43|3239141|
+--------------------+-------+
only showing top 20 rows



In [39]:
# to limit the city range by coordinate pairs from  {cityname: [lon, lat, lon, lat]} which is pair of left bottom and right top boundaries
# also serves to test how partition perform under same task
city_tag = {'london': [-0.483,51.385,0.233,51.609], 'toronto': [-79.69,43.602,-79.101,43.847], 'paris': [2.062,48.725,2.549,48.967], 'rome': [12.212,41.785,12.702,41.997]}

# if the location is in the city bounderis, then return the city name
def label_city(lat, lon):
    for k,v in city_tag.items():
        if lat > v[1] and lat < v[3] and lon < v[2] and lon > v[0]: 
            return k
    return ''

city_udf = udf(label_city, StringType())
city_name = ['london', 'toronto', 'paris', 'rome']

In [40]:
%%time
# filter each location by given bounderies and give a city label on repartitioned combined df
partition_test1 = combined_node_df_test.withColumn('City', city_udf('latitude', 'longitude'))
partition_test1 = partition_test1.filter(partition_test1.City.isin(city_name))
partition_test1.show(1)

[Stage 75:>                                                         (0 + 1) / 1]

+----------+-------+-------------+---------+---+--------+----+----------+---------+------------+------+
|        id|version|    timestamp|changeset|uid|user_sid|tags|  latitude|longitude|partition_id|  City|
+----------+-------+-------------+---------+---+--------+----+----------+---------+------------+------+
|8365819648|      1|1611709599000|        0|  0|        |  []|51.5148706|0.1952346|           1|london|
+----------+-------+-------------+---------+---+--------+----+----------+---------+------------+------+
only showing top 1 row

CPU times: user 10.4 ms, sys: 365 µs, total: 10.7 ms
Wall time: 1.14 s


                                                                                

In [41]:
%%time
# filter each location by given bounderies and give a city label on orinial combined df
partition_test2 = combined_node_df.withColumn('City', city_udf('latitude', 'longitude'))
partition_test2 = partition_test2.filter(partition_test2.City.isin(city_name))
partition_test2.show(1)

+-----+-------+-------------+---------+---+--------+----+------------------+--------------------+------+
|   id|version|    timestamp|changeset|uid|user_sid|tags|          latitude|           longitude|  City|
+-----+-------+-------------+---------+---+--------+----+------------------+--------------------+------+
|78112|      3|1539614984000|        0|  0|        |  []|51.526976000000005|-0.14579240000000002|london|
+-----+-------+-------------+---------+---+--------+----+------------------+--------------------+------+
only showing top 1 row

CPU times: user 0 ns, sys: 4.78 ms, total: 4.78 ms
Wall time: 794 ms


<h3>Part.2 Preprocessing The Data<h3>

In [6]:
# check the scale of data
combined_node_df.printSchema()
n = combined_node_df.count()
print("all city: ", n)

root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- changeset: long (nullable = true)
 |-- uid: integer (nullable = true)
 |-- user_sid: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)





all city:  191259821


                                                                                

In [7]:
%%time
# filter out the location where has tags of tourism
combined_node_df_new = combined_node_df.filter(array_contains(combined_node_df.tags['key'], 'tourism'))
combined_node_df_new.show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------+-------+-------------+---------+---+--------+--------------------+------------------+--------------------+
|      id|version|    timestamp|changeset|uid|user_sid|                tags|          latitude|           longitude|
+--------+-------+-------------+---------+---+--------+--------------------+------------------+--------------------+
|  388826|      4|1597920001000|        0|  0|        |[{board_type, wil...|51.530421000000004|          -0.1558246|
|16960853|     10|1611135318000|        0|  0|        |[{amenity, founta...|51.409926500000005|          -0.3362121|
|23723987|      5|1603822099000|        0|  0|        |[{information, gu...|51.636484100000004|           -0.151569|
|23724048|      5|1603822099000|        0|  0|        |[{information, ma...|        51.6362223|-0.15106850000000002|
|25507023|     14|1686861832000|        0|  0|        |[{alt_name, Spenc...|51.504848900000006|           -0.126149|
|25508657|     11|1693655599000|        0|  0|        |[{artwork

                                                                                

In [8]:
# number of data that is tourism related
n = combined_node_df_new.count()
print("tourism spot: ", n)



tourism spot:  55382


                                                                                

In [9]:
# reduce the column
combined_node_df_filtered = combined_node_df_new.select(col("id"), col("latitude"), col("longitude"), col("tags"))
combined_node_df_filtered.show(20, False)

+--------+------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
# extract the name and tourism tag out of other tags.
def get_info(tags_string):
    info = ["", ""]
    for k, v in tags_string:
        if k == 'name':
            info[0] = v
        if k == 'tourism':
            info[1] = v
    return info

In [11]:
%%time

# get the name and tourism info out of tags
udf_get_info = udf(lambda x: get_info(x), ArrayType(StringType(), True))
combined_node_df_filtered2 = combined_node_df_filtered.withColumn("name", udf_get_info("tags")[0]).withColumn("tourism_type", udf_get_info("tags")[1]).drop("tags")
combined_node_df_filtered2.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------+------------------+--------------------+--------------------+------------+
|      id|          latitude|           longitude|                name|tourism_type|
+--------+------------------+--------------------+--------------------+------------+
|  388826|51.530421000000004|          -0.1558246|                    | information|
|16960853|51.409926500000005|          -0.3362121|      Diana Fountain|     artwork|
|23723987|51.636484100000004|           -0.151569|                    | information|
|23724048|        51.6362223|-0.15106850000000002|       Oak Hill Park| information|
|25507023|51.504848900000006|           -0.126149|     Spencer Compton|     artwork|
|25508657|51.522035100000004|-0.12267080000000001|                    |     artwork|
|25508658|        51.5217357|          -0.1224809|                    |     artwork|
|25947730|        51.4975442|          -0.4558534|             Novotel|       hotel|
|26559743|        51.3952047|          -0.1452455|               

                                                                                

In [12]:
# refine the quality of tourism spot, since many are low quality tourism tag
tourism_list = ["aquarium", "attraction", "gallery", "cathedral", "castle", "historical building", "monument", "museum", "theme_park", "zoo", "viewpoint"]
combined_node_df_filtered3 = combined_node_df_filtered2.filter(combined_node_df_filtered2['tourism_type'].isin(tourism_list))
combined_node_df_filtered3.show()
n = combined_node_df_filtered3.count()
print("filtered tourism spots", n)

                                                                                

+---------+------------------+--------------------+--------------------+------------+
|       id|          latitude|           longitude|                name|tourism_type|
+---------+------------------+--------------------+--------------------+------------+
| 26559743|        51.3952047|          -0.1452455|                    |  attraction|
| 27408513|        51.5108862|           -0.307107|Pitzhanger Manor ...|      museum|
| 29269886|        51.5228224|-0.15477680000000002|     Madame Tussauds|      museum|
| 31452361|        51.4697472|-0.07007730000000001| The Sassoon Gallery|     gallery|
| 37124475|        51.3650663|-0.16684290000000002|Carshalton Water ...|      museum|
|243762168|        51.4765054|          -0.0046057|                    |   viewpoint|
|248617998|        51.5010897|          -0.0404578|          Stave Hill|   viewpoint|
|252602371|51.506387800000006|-0.08826410000000001|London Bridge Exp...|  attraction|
|253801699|        51.5777464|           -0.482696|   



filtered tourism spots 8435


                                                                                

In [13]:
# to limit the city range by coordinate pairs from  {cityname: [lon, lat, lon, lat]} which is pair of left bottom and right top boundaries
city_tag = {'london': [-0.483,51.385,0.233,51.609], 'toronto': [-79.69,43.602,-79.101,43.847], 'paris': [2.062,48.725,2.549,48.967], 'rome': [12.212,41.785,12.702,41.997]}

# if the location is in the city bounderis, then return the city name
def label_city(lat, lon):
    for k,v in city_tag.items():
        if lat > v[1] and lat < v[3] and lon < v[2] and lon > v[0]: 
            return k
    return ''

city_udf = udf(label_city, StringType())

In [14]:
# give each location that are within the given bounderies a city label
city_name = ['london', 'toronto', 'paris', 'rome']
df = combined_node_df_filtered3.withColumn('City', city_udf('latitude', 'longitude'))
df = df.filter(df.City.isin(city_name))
df.show()

[Stage 18:>                                                         (0 + 1) / 1]

+---------+------------------+--------------------+--------------------+------------+------+
|       id|          latitude|           longitude|                name|tourism_type|  City|
+---------+------------------+--------------------+--------------------+------------+------+
| 26559743|        51.3952047|          -0.1452455|                    |  attraction|london|
| 27408513|        51.5108862|           -0.307107|Pitzhanger Manor ...|      museum|london|
| 29269886|        51.5228224|-0.15477680000000002|     Madame Tussauds|      museum|london|
| 31452361|        51.4697472|-0.07007730000000001| The Sassoon Gallery|     gallery|london|
|243762168|        51.4765054|          -0.0046057|                    |   viewpoint|london|
|248617998|        51.5010897|          -0.0404578|          Stave Hill|   viewpoint|london|
|252602371|51.506387800000006|-0.08826410000000001|London Bridge Exp...|  attraction|london|
|253801699|        51.5777464|           -0.482696|                   

                                                                                

In [16]:
# Test partition by closer location could accelerate the clustering speed during ML
# check https://stackoverflow.com/questions/40416357/difference-between-df-repartition-and-dataframewriter-partitionby/42780452#42780452
df.write.partitionBy("City").mode("overwrite").parquet("/combined_filtered_parquet_bycity")

                                                                                

In [29]:
# Test partition by closer location could accelerate the clustering speed during ML
df.write.mode("overwrite").parquet("/combined_filtered_parquet_random")

                                                                                