In [1]:
# findspark locates the local Spark installation and puts PySpark on sys.path,
# so this must run before the `import pyspark` in the next cell.
import findspark
findspark.init()

In [2]:
import pyspark
import folium
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext , functions
from folium import plugins


In [3]:
# Start (or reuse) a Spark session running locally with six worker threads.
spark = (
    SparkSession.builder
    .appName('ReadFromParquet')
    .master('local[6]')
    .getOrCreate()
)

In [4]:
# Echo the session object so the notebook renders its summary.
spark

In [5]:
# Keep handles to the underlying SparkContext and to a SQLContext; the cells
# below read parquet files and run SQL through `sqlContext`.
# NOTE(review): SQLContext is a legacy entry point — SparkSession exposes
# spark.read / spark.sql / spark.conf.set directly; consider migrating.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Data Features

## osmWay

In [6]:
# Ask Spark to treat parquet binary columns as strings, then load the ways
# table, keep only the columns used below, and register it for SQL as "osmWay".
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
osmWay = (
    sqlContext.read
    .parquet("../data/20190531-hungary.osm.pbf.way.parquet")
    .select('id', 'tags', 'nodes')
)
osmWay.createOrReplaceTempView("osmWay")




In [7]:
# Print the column tree of the ways table (id, tags, nodes).
osmWay.printSchema()

root
 |-- id: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- nodes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: integer (nullable = true)
 |    |    |-- nodeId: long (nullable = true)



In [8]:
# Peek at the first way as a Row object, with tags and node list untruncated.
osmWay.head()

Row(id=3175810, tags=[Row(key='highway', value='residential'), Row(key='name', value='Honvéd utca')], nodes=[Row(index=0, nodeId=15231786), Row(index=1, nodeId=1310148452), Row(index=2, nodeId=1310021025), Row(index=3, nodeId=1310020985), Row(index=4, nodeId=5766745506), Row(index=5, nodeId=1310020988), Row(index=6, nodeId=1310021030), Row(index=7, nodeId=1310021038), Row(index=8, nodeId=1310021044), Row(index=9, nodeId=1309949162), Row(index=10, nodeId=1309949099), Row(index=11, nodeId=1282709263), Row(index=12, nodeId=5766742402), Row(index=13, nodeId=1234760030)])

In [9]:
# Tabular preview of the first five ways; long cells are truncated.
osmWay.show(5)

+-------+--------------------+--------------------+
|     id|                tags|               nodes|
+-------+--------------------+--------------------+
|3175810|[[highway, reside...|[[0, 15231786], [...|
|3175943|[[highway, reside...|[[0, 15234255], [...|
|3175983|[[highway, reside...|[[0, 15232076], [...|
|3192356|[[highway, second...|[[0, 1259548666],...|
|3212111|[[highway, second...|[[0, 15475952], [...|
+-------+--------------------+--------------------+
only showing top 5 rows



In [10]:
# Total number of ways in the extract.
osmWay.count()

2509434

## osmNode

In [11]:
# Load the nodes table, keep only tags and coordinates, and register it for
# SQL as "osmNode".
osmNode = (
    sqlContext.read
    .parquet("../data/20190531-hungary.osm.pbf.node.parquet")
    .select('tags', 'latitude', 'longitude')
)
osmNode.createOrReplaceTempView("osmNode")

In [12]:
# Print the column tree of the nodes table (tags, latitude, longitude).
osmNode.printSchema()

root
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [13]:
# Peek at the first node as a Row object.
osmNode.head()

Row(tags=[], latitude=47.5135549, longitude=19.047102000000002)

In [14]:
# Tabular preview of the first five nodes.
osmNode.show(5)

+----+------------------+------------------+
|tags|          latitude|         longitude|
+----+------------------+------------------+
|  []|        47.5135549|19.047102000000002|
|  []|        47.5135409|19.058013900000002|
|  []|        47.5146097|19.058831700000002|
|  []|        47.5166629|        19.0613926|
|  []|47.514618000000006|        19.0435418|
+----+------------------+------------------+
only showing top 5 rows



In [15]:
# Total number of nodes in the extract.
osmNode.count()

17916477

In [16]:
# Show nodes whose first tag is amenity=taxi, together with their full tag list.
# NOTE(review): tags[0] only inspects the first tag, so a taxi stand whose
# amenity tag is not first in the array would be missed — confirm tag ordering.
sqlContext.sql(
    "SELECT tags,latitude, longitude FROM osmNode "
    "WHERE tags[0].key='amenity' AND tags[0].value='taxi'"
).show()

+--------------------+------------------+------------------+
|                tags|          latitude|         longitude|
+--------------------+------------------+------------------+
|[[amenity, taxi],...|        47.5470381|         19.028375|
|   [[amenity, taxi]]|        47.4982064|        19.0704767|
|   [[amenity, taxi]]|47.496360800000005|19.070999500000003|
|   [[amenity, taxi]]|47.507562500000006|        19.0729668|
|   [[amenity, taxi]]|47.499501300000006|        19.0646431|
|   [[amenity, taxi]]|47.782211200000006|        19.1324766|
|   [[amenity, taxi]]|        47.5420646|        19.1225384|
|   [[amenity, taxi]]|        48.2466887|        20.6166425|
|   [[amenity, taxi]]|46.353023300000004|        17.7953511|
|   [[amenity, taxi]]|46.075580200000005|        18.2048794|
|[[amenity, taxi],...|47.511096200000004|19.080156300000002|
|   [[amenity, taxi]]|        47.9000818|        20.3767527|
|   [[amenity, taxi]]|46.370169600000004|18.149269800000003|
|   [[amenity, taxi]]|47

## osmRelation

In [17]:
# Load the relations table, keep id, tags and members, and register it for
# SQL as "osmRelation".
osmRelation = (
    sqlContext.read
    .parquet("../data/20190531-hungary.osm.pbf.relation.parquet")
    .select('id', 'tags', 'members')
)
osmRelation.createOrReplaceTempView("osmRelation")

In [18]:
# Print the column tree of the relations table (id, tags, members).
osmRelation.printSchema()

root
 |-- id: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- role: string (nullable = true)
 |    |    |-- type: string (nullable = true)



In [19]:
# Peek at the first relation as a Row object, with tags and members untruncated.
osmRelation.head()

Row(id=11772, tags=[Row(key='addr:city', value='Budapest'), Row(key='addr:housenumber', value='9'), Row(key='addr:postcode', value='1051'), Row(key='addr:street', value='Széchenyi István tér'), Row(key='building', value='yes'), Row(key='club', value='cinema'), Row(key='email', value='info@titkarsag.mta.hu'), Row(key='name', value='Magyar Tudományos Akadémia'), Row(key='name:de', value='Ungarische Akademie der Wissenschaften'), Row(key='name:en', value='Hungarian Academy of Sciences'), Row(key='office', value='government'), Row(key='phone', value='+36 1 4116100'), Row(key='short_name', value='MTA'), Row(key='tourism', value='attraction'), Row(key='type', value='multipolygon'), Row(key='website', value='https://mta.hu/'), Row(key='wikidata', value='Q265058'), Row(key='wikipedia', value='hu:Magyar Tudományos Akadémia')], members=[Row(id=24026306, role='inner', type='Way'), Row(id=24321932, role='outer', type='Way'), Row(id=241012663, role='outer', type='Way'), Row(id=241012664, role='oute

In [20]:
# Tabular preview of the first five relations; long cells are truncated.
osmRelation.show(5)

+-----+--------------------+--------------------+
|   id|                tags|             members|
+-----+--------------------+--------------------+
|11772|[[addr:city, Buda...|[[24026306, inner...|
|11832|[[building, yes],...|[[24035233, outer...|
|11898|[[building, offic...|[[24345565, inner...|
|12697|[[addr:city, Buda...|[[24260435, outer...|
|12939|[[addr:conscripti...|[[24320909, outer...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [21]:
# Total number of relations in the extract.
osmRelation.count()

12983

# Data Exploration

## Distinct Amenity Types Tagged on Ways

In [22]:
# Count the distinct amenity values among ways whose first tag is an amenity.
# NOTE(review): tags[0] only inspects the first tag of each way — confirm that
# amenity is reliably first, otherwise some ways are not counted.
sqlContext.sql("SELECT tags[0].value FROM osmWay WHERE tags[0].key='amenity'").distinct().count()

115

In [23]:
# List the distinct amenity values found in the first tag of ways; collect()
# brings every resulting Row back to the driver.
sqlContext.sql("SELECT tags[0].value FROM osmWay WHERE tags[0].key='amenity'").distinct().collect()

[Row(tags[0].value='bench'),
 Row(tags[0].value='marketplace'),
 Row(tags[0].value='arts_centre'),
 Row(tags[0].value='porters_cubicle'),
 Row(tags[0].value='bicycle_parking'),
 Row(tags[0].value='college'),
 Row(tags[0].value='veterinary'),
 Row(tags[0].value='university'),
 Row(tags[0].value='monastery'),
 Row(tags[0].value='nightclub'),
 Row(tags[0].value='animal_breeding'),
 Row(tags[0].value='boat_storage'),
 Row(tags[0].value='smoking_area'),
 Row(tags[0].value='crypt'),
 Row(tags[0].value='shop'),
 Row(tags[0].value='funeral_home'),
 Row(tags[0].value='car_rental'),
 Row(tags[0].value='courthouse'),
 Row(tags[0].value='prison'),
 Row(tags[0].value='library'),
 Row(tags[0].value='game_feeding'),
 Row(tags[0].value='shelter'),
 Row(tags[0].value='waste_transfer_station'),
 Row(tags[0].value='dentist'),
 Row(tags[0].value='kitchen'),
 Row(tags[0].value='drinking_water'),
 Row(tags[0].value='social_centre'),
 Row(tags[0].value='mortuary'),
 Row(tags[0].value='bar'),
 Row(tags[0].val

## Number of Taxi Stops and the Location of Each

In [24]:
# Preview the coordinates of nodes whose first tag is amenity=taxi.
# DataFrame.show() only prints and returns None, so its result is deliberately
# not bound to a name; `taxiLocation` is defined by the toPandas() cell below.
sqlContext.sql("SELECT latitude,longitude FROM osmNode \
WHERE tags[0].key='amenity' AND tags[0].value='taxi'").show()

+------------------+------------------+
|          latitude|         longitude|
+------------------+------------------+
|        47.5470381|         19.028375|
|        47.4982064|        19.0704767|
|47.496360800000005|19.070999500000003|
|47.507562500000006|        19.0729668|
|47.499501300000006|        19.0646431|
|47.782211200000006|        19.1324766|
|        47.5420646|        19.1225384|
|        48.2466887|        20.6166425|
|46.353023300000004|        17.7953511|
|46.075580200000005|        18.2048794|
|47.511096200000004|19.080156300000002|
|        47.9000818|        20.3767527|
|46.370169600000004|18.149269800000003|
|47.681249900000005|17.636871000000003|
|        47.4943144|        19.0383152|
|        47.5058865|19.038680300000003|
|        46.1891935|        18.2660945|
|        47.1891838|         18.409127|
|48.100897100000005|20.730863000000003|
|        45.9904286|        18.6876803|
+------------------+------------------+
only showing top 20 rows



In [25]:
# Pull the coordinates of nodes whose first tag is amenity=taxi into a pandas
# DataFrame on the driver, for plotting.
taxiLocation = sqlContext.sql(
    "SELECT latitude,longitude FROM osmNode "
    "WHERE tags[0].key='amenity' AND tags[0].value='taxi'"
).toPandas()

In [26]:
# Wrap the taxi coordinates in a pandas DataFrame and display it.
# NOTE(review): `taxiLocation` already comes from toPandas(), so this wrapper
# looks redundant — confirm before simplifying.
taxidf = pd.DataFrame(data=taxiLocation)
taxidf

Unnamed: 0,latitude,longitude
0,47.547038,19.028375
1,47.498206,19.070477
2,47.496361,19.071000
3,47.507563,19.072967
4,47.499501,19.064643
5,47.782211,19.132477
6,47.542065,19.122538
7,48.246689,20.616643
8,46.353023,17.795351
9,46.075580,18.204879


In [27]:
# Create the base map centred on the first taxi stand, zoomed out to level 7.
taxiMap = folium.Map(
    location=[taxiLocation['latitude'][0], taxiLocation['longitude'][0]],
    zoom_start=7,
)

In [28]:
# Add one circle marker per taxi stand to the map, then render the map inline.
# Note: re-running this cell adds the markers again to the same `taxiMap`.
for stop in taxidf.itertuples(index=False):
    folium.CircleMarker(
        [stop.latitude, stop.longitude],
        radius=15,
        # Pass plain text: the popup previously received a pandas Series, which
        # is not popup text and at best showed the Series' multi-line repr.
        popup="{:.6f}, {:.6f}".format(stop.latitude, stop.longitude),
        fill_color="#e4d93d",  # yellow fill
    ).add_to(taxiMap)
taxiMap