In [1]:
# Bootstrap cell. findspark.init() is deliberately called before any pyspark
# import; presumably it is what makes the local Spark installation importable
# in this kernel (confirm against the findspark docs) -- do not reorder.
import findspark
findspark.init()
# NOTE(review): the two wildcard imports put every public name of both modules
# into the notebook namespace (the *Type classes and explode used in later
# cells come from here). pyspark.sql.functions is known to shadow builtins such
# as sum/min/max -- consider explicit imports once all cells are reviewed.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
# Reuse the already-running SparkSession if there is one, otherwise build a
# session with default settings.
spark = SparkSession.builder.getOrCreate()

In [2]:
# Explicit schema for the OSM node parquet file; every field is declared nullable.
# "tags" is an array of (key, value) string structs whose elements may be null.
node_schema = (
    StructType()
    .add("id", LongType(), True)
    .add("version", IntegerType(), True)
    .add("timestamp", LongType(), True)
    .add("changeset", LongType(), True)
    .add("uid", IntegerType(), True)
    .add("user_sid", BinaryType(), True)
    .add(
        "tags",
        ArrayType(
            StructType()
            .add("key", StringType(), True)
            .add("value", StringType(), True),
            True,
        ),
        True,
    )
    .add("latitude", DoubleType(), True)
    .add("longitude", DoubleType(), True)
)

In [3]:
# Local parquet file holding the Hungary OSM node extract.
node_parquet_path = "/Users/mayankbaid/Downloads/hungary_osm/20190430-hungary.osm.pbf.node.parquet"

# Read the file with the explicit node_schema instead of the schema stored in
# the parquet footer, then register the frame for Spark SQL under "nodes".
node_reader = spark.read.schema(node_schema)
node_df = node_reader.parquet(node_parquet_path)
node_df.createOrReplaceTempView("nodes")

In [4]:
# Print node_df's schema as a tree: column names, types and nullability,
# including the nested key/value struct inside the tags array.
node_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- changeset: long (nullable = true)
 |-- uid: integer (nullable = true)
 |-- user_sid: binary (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [5]:
# Flatten the tags array: one output row per (node id, tag) pair, with the
# tag struct exposed as the column "exp_tags".
exploded_nodes = node_df.select(
    col("id"),
    explode(col("tags")).alias("exp_tags"),
)
# Preview the first 20 rows without truncating long tag values.
exploded_nodes.show(n=20, truncate=False)

+-------+--------------------------+
|id     |exp_tags                  |
+-------+--------------------------+
|277309 |[highway, traffic_signals]|
|449380 |[name, Mörbisch am See]   |
|449380 |[traffic_sign, city_limit]|
|7916545|[created_by, JOSM]        |
|7916549|[created_by, JOSM]        |
|7916653|[created_by, JOSM]        |
|7916662|[crossing, uncontrolled]  |
|7916662|[crossing_ref, zebra]     |
|7916662|[highway, crossing]       |
|7916666|[created_by, JOSM]        |
|7916715|[created_by, JOSM]        |
|7916716|[created_by, JOSM]        |
|7916717|[created_by, JOSM]        |
|7918195|[created_by, JOSM]        |
|7918201|[created_by, JOSM]        |
|7918202|[created_by, JOSM]        |
|7918208|[created_by, JOSM]        |
|7918234|[created_by, JOSM]        |
|7918240|[created_by, JOSM]        |
|7918241|[created_by, JOSM]        |
+-------+--------------------------+
only showing top 20 rows



In [6]:
# List every distinct tag key in alphabetical order.
# The row limit given to show() is the size of the frame being displayed.
# The previous limit, exploded_nodes.count(), is the total number of tag rows:
# it is at least as large as (normally far larger than) the number of distinct
# keys and costs an extra, unrelated pass over the data.
# The distinct keys are cached so count() and show() share one computation.
distinct_tag_keys = exploded_nodes.select("exp_tags.key").distinct().cache()
distinct_tag_keys.orderBy("key").show(distinct_tag_keys.count(), truncate=False)

+---------------------------------------------+
|key                                          |
+---------------------------------------------+
|Csolnok                                      |
|FIXME                                        |
|GPS                                          |
|HU:hu-go:milestone                           |
|HU:hu-go:milestone_1                         |
|HU:hu-go:road                                |
|HU:hu-go:road_1                              |
|ISO3166-1                                    |
|ISO3166-2                                    |
|Icn                                          |
|Label                                        |
|Leiras                                       |
|Letrehozta                                   |
|Letrehozva                                   |
|MCC                                          |
|MNC                                          |
|Megjegyzes                                   |
|Modositotta                            

In [7]:
# Exploded tag rows whose tag key is exactly 'atm'.
atms = exploded_nodes.filter(col("exp_tags.key") == "atm")

In [8]:
# Number of exploded tag rows whose key is 'atm' (this triggers a Spark job).
atms.count()

857

In [9]:
# Exploded tag rows whose tag key is exactly 'utility'.
utilit = exploded_nodes.filter(col("exp_tags.key") == "utility")

In [11]:
# Display the 'utility' tag rows (all columns, up to 20 rows) without
# truncating the tag values.
utilit.show(n=20, truncate=False)

+---------+---------------------+
|id       |exp_tags             |
+---------+---------------------+
|874150243|[utility, wastewater]|
+---------+---------------------+



In [12]:
# Exploded tag rows whose tag key is exactly 'bar', followed by how many
# such rows exist.
bars = exploded_nodes.filter(col("exp_tags.key") == "bar")
bars.count()

1

In [13]:
# Frequency of every tag key, rarest first.
# The row limit given to show() is the number of groups being displayed.
# The previous limit, exploded_nodes.count(), is the total number of tag rows:
# it is unrelated to the grouped result, at least as large as it, and costs an
# extra pass over the data.
# The grouped counts are cached so count() and show() share one aggregation.
tag_key_counts = exploded_nodes.select("exp_tags.key").groupBy("key").count().cache()
tag_key_counts.orderBy("count").show(tag_key_counts.count(), truncate=False)

+---------------------------------------------+------+
|key                                          |count |
+---------------------------------------------+------+
|url:miserend.hu                              |1     |
|old:addr:housenumber                         |1     |
|men                                          |1     |
|name:haw                                     |1     |
|name:mt                                      |1     |
|computer:parts                               |1     |
|name:gn                                      |1     |
|source:emergency                             |1     |
|official_name:fr                             |1     |
|lunch:menu:subscription:description          |1     |
|name:alt                                     |1     |
|fuel:octane_92                               |1     |
|inscription:s                                |1     |
|sac_scale                                    |1     |
|crossing:activation                          |1     |
|name:dv  