In [1]:
# Prosessing libs
import pandas as pd
import os
import numpy as np
from fastparquet import ParquetFile
import glob

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as psf

# Visualisation libs
from matplotlib import pyplot as plt
import seaborn as sns
import plotly.express as px


In [2]:
# change memory allocation for pyarrow
# os.environ['PYARROW_MEMORY_POOL_MAX_CHUNKSIZE'] = '3000000000'
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# os.environ['PYSPARK_PYTHON'] = sys.executable

## Get the path of all files

In [3]:
os.getcwd()

'c:\\Users\\milan\\OneDrive - MUNI\\VŠ\\PhD\\Zahraniční stáž\\Work\\HeiGIT_notebooks\\analysis\\ai-assisted-osm-mapping-stats\\notebooks-contributions'

In [4]:
os.chdir("..")


In [5]:
home_dir = os.getcwd()
parquet_dir = os.path.join(home_dir, r"oshdb-contributions-parquet-data-test")
parquet_dir

parquet_dir = r"E:\\Rafael-data\\type=way\\state=latest"


In [6]:
files = glob.glob(f'{parquet_dir}\\**\\*.parquet', recursive=True)
files


['E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_0.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_1.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_10.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_11.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_2.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_3.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_4.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_5.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_6.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_7.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_8.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2017\\data_9.parquet',
 'E:\\\\Rafael-data\\\\type=way\\\\state=latest\\year=2018\\data_0.parquet

In [7]:
# os.chdir("..")
# home_dir = os.getcwd()
# parquet_dir_node = os.path.join(home_dir, r"oshdb-contributions-parquet-data\type=node")
# parquet_dir = os.path.join(home_dir, r"oshdb-contributions-parquet-data")
# duckdb_dir = os.path.join(home_dir, r"notebooks-contributions\duckdb")
# test_dir = os.path.join(home_dir, r"oshdb-contributions-parquet-data\type=node\year=2023")


In [8]:
# files = glob.glob(f'{parquet_dir}\\**\\*.parquet', recursive=True)
# files


In [9]:
# files_location_lst = []

# for (path_dir, folder_names, file_names) in os.walk(parquet_dir_node):
#     for file_name in file_names:
#         if file_name.endswith(".parquet"):
#             file_location = os.path.join(path_dir, file_name)
#             files_location_lst.append(file_location)
# files_location_lst


## Load the all data with Apache PySpark

In [10]:
spark = SparkSession.builder.appName("ReadParquetFiles").getOrCreate()

df = spark.read.parquet(*files)

df.show()


+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------+-----+----------+------+------------+------------+--------------+-------+--------------------+-------------+--------------+--------------------+------+----+
|contrib_id|valid_from|           valid_to|   osm_id|osm_version|changeset_id|changeset_timestamp|      hashtags|              editor|user_id|                tags|   tags_delta|primary_feature|building|road|is_area| area|area_delta|length|length_delta|contrib_type|country_iso_a3|country|            centroid|geometry_type|geometry_valid|            geometry| state|type|
+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------+-----+----------+------+------------+----

In [11]:
df.count()

707464699

In [12]:
# spark.stop()

In [13]:
# df.describe().show()

In [14]:
# df.na.drop(how="any",subset=["building_area"]).show()

### Data transformation, Data types conversion

In [15]:
# Cast the 'changeset_timestamp' column to a LongType
df = df.withColumn('changeset_timestamp', psf.col(
    'changeset_timestamp').cast('long'))

# Use the 'changeset_timestamp' column to create a new 'datetime' column
df = df.withColumn('datetime', psf.from_unixtime(
    psf.col('changeset_timestamp')/1000, 'yyyy-MM-dd HH:mm:ss.SSSSSS'))

# Show the resulting DataFrame
df.show()


+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------+-----+----------+------+------------+------------+--------------+-------+--------------------+-------------+--------------+--------------------+------+----+--------------------+
|contrib_id|valid_from|           valid_to|   osm_id|osm_version|changeset_id|changeset_timestamp|      hashtags|              editor|user_id|                tags|   tags_delta|primary_feature|building|road|is_area| area|area_delta|length|length_delta|contrib_type|country_iso_a3|country|            centroid|geometry_type|geometry_valid|            geometry| state|type|            datetime|
+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------

In [16]:
# Use the 'changeset_timestamp' column to create a new 'datetime' column
df = df.withColumn('y_m', psf.from_unixtime(
    psf.col('changeset_timestamp')/1000, 'yyyy-MM'))

# Create a new column called 'year' containing the year from the 'datetime' column
df = df.withColumn('year', psf.year('datetime'))



# Show the resulting DataFrame
df.show()


+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------+-----+----------+------+------------+------------+--------------+-------+--------------------+-------------+--------------+--------------------+------+----+--------------------+-------+----+
|contrib_id|valid_from|           valid_to|   osm_id|osm_version|changeset_id|changeset_timestamp|      hashtags|              editor|user_id|                tags|   tags_delta|primary_feature|building|road|is_area| area|area_delta|length|length_delta|contrib_type|country_iso_a3|country|            centroid|geometry_type|geometry_valid|            geometry| state|type|            datetime|    y_m|year|
+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+-----------

In [17]:
# Convert the 'tags' column to a string for filtering
df = df.withColumn("tags_str", df.tags.cast("string"))
df = df.withColumn("road_str", df.road.cast("string"))

In [18]:
str(df.columns)

"['contrib_id', 'valid_from', 'valid_to', 'osm_id', 'osm_version', 'changeset_id', 'changeset_timestamp', 'hashtags', 'editor', 'user_id', 'tags', 'tags_delta', 'primary_feature', 'building', 'road', 'is_area', 'area', 'area_delta', 'length', 'length_delta', 'contrib_type', 'country_iso_a3', 'country', 'centroid', 'geometry_type', 'geometry_valid', 'geometry', 'state', 'type', 'datetime', 'y_m', 'year', 'tags_str', 'road_str']"

In [19]:
df.count()

707464699

In [20]:
df.groupBy("road").count().show()

+----+---------+
|road|    count|
+----+---------+
|  -1|  3610999|
|null|614806132|
|   1| 24773460|
|   0| 64274108|
+----+---------+



In [21]:
df.groupBy("state").count().show()

+------+---------+
| state|    count|
+------+---------+
|latest|707464699|
+------+---------+



In [22]:
df = df.withColumn("hashtags_un", psf.concat_ws(",", psf.col("hashtags")))


In [23]:
df.show()

+----------+----------+-------------------+---------+-----------+------------+-------------------+--------------+--------------------+-------+--------------------+-------------+---------------+--------+----+-------+-----+----------+------+------------+------------+--------------+-------+--------------------+-------------+--------------+--------------------+------+----+--------------------+-------+----+--------------------+--------+------------+
|contrib_id|valid_from|           valid_to|   osm_id|osm_version|changeset_id|changeset_timestamp|      hashtags|              editor|user_id|                tags|   tags_delta|primary_feature|building|road|is_area| area|area_delta|length|length_delta|contrib_type|country_iso_a3|country|            centroid|geometry_type|geometry_valid|            geometry| state|type|            datetime|    y_m|year|            tags_str|road_str| hashtags_un|
+----------+----------+-------------------+---------+-----------+------------+-------------------+----

In [24]:
df.groupBy("building").count().show()

+--------+---------+
|building|    count|
+--------+---------+
|      -1|  3593851|
|    null|309961236|
|       1|278137282|
|       0|115772330|
+--------+---------+



In [25]:
df.columns

['contrib_id',
 'valid_from',
 'valid_to',
 'osm_id',
 'osm_version',
 'changeset_id',
 'changeset_timestamp',
 'hashtags',
 'editor',
 'user_id',
 'tags',
 'tags_delta',
 'primary_feature',
 'building',
 'road',
 'is_area',
 'area',
 'area_delta',
 'length',
 'length_delta',
 'contrib_type',
 'country_iso_a3',
 'country',
 'centroid',
 'geometry_type',
 'geometry_valid',
 'geometry',
 'state',
 'type',
 'datetime',
 'y_m',
 'year',
 'tags_str',
 'road_str',
 'hashtags_un']

In [26]:
df.count()

707464699

### Data wrangling – buildings
- select distinct osm_id
- remove all -1 and null values
- then use groupby (osm_id) and function last (time column)

In [27]:
df_blds = df.filter(df.building.contains(1) | df.building.contains(0))


In [28]:
df_blds = df_blds.groupBy("osm_id", "editor", "country",
                 "tags_str", "hashtags", "year").agg(psf.last("datetime").alias("datetime"))


In [29]:
# df_blds.show()

In [30]:
df_blds = df_blds.withColumn(
    "hashtags_un", psf.concat_ws(",", psf.col("hashtags")))


## AI building filter

### Filter 1
- column tag_str contains microsoft/BuildingFootprints or esri/Google_Africa_Buildings

In [31]:
df_blds = df_blds.filter(df.tags_str.contains("microsoft/BuildingFootprints") | df.tags_str.contains("esri/Google_Africa_Buildings"))


In [32]:
df_blds.groupBy("year").count().show()


+----+-------+
|year|  count|
+----+-------+
|2023|3272936|
|2022|7192211|
|2019|  10300|
|2020|2149154|
|2021|2964550|
+----+-------+



### Data wrangling – roads
- select distinct osm_id
- remove all -1
- then use groupby (osm_id) and function last (time column)

In [27]:
df_roads = df.filter(df.road.contains(1) | df.road.contains(0))

In [28]:
df_roads = df_roads.groupBy("osm_id", "editor", "length", "country",
                 "tags_str", "hashtags", "year").agg(psf.last("datetime").alias("datetime"))


In [29]:
df_roads.show()

+------+--------------------+------+-------+--------------------+--------+----+--------------------+
|osm_id|              editor|length|country|            tags_str|hashtags|year|            datetime|
+------+--------------------+------+-------+--------------------+--------+----+--------------------+
|   105| StreetComplete 45.0|   384|    DEU|{highway -> resid...|      []|2023|2023-01-08 15:24:...|
|   137| StreetComplete 50.2|   206|    GBR|{source -> Bing;s...|      []|2023|2023-01-24 22:33:...|
|   137| StreetComplete 50.2|   206|    GBR|{source -> Bing;s...|      []|2023|2023-01-24 21:31:...|
|   187|           iD 2.24.2|   172|    GBR|{highway -> terti...|      []|2023|2023-02-21 02:00:...|
|   188|           iD 2.24.2|   159|    GBR|{highway -> prima...|      []|2023|2023-02-17 15:04:...|
|   762| StreetComplete 50.2|  1091|    GBR|{highway -> prima...|      []|2023|2023-01-29 15:33:...|
|  1474|JOSM/1.5 (18622 e...|   162|    GBR|{highway -> resid...|      []|2023|2023-02-03 1

In [30]:
df_roads = df_roads.withColumn("hashtags_un", psf.concat_ws(",", psf.col("hashtags")))


In [31]:
df_roads.groupBy("hashtags_un").count().show()


+--------------------+-----+
|         hashtags_un|count|
+--------------------+-----+
|#MapComplete,#cyc...|   98|
|#AtlasChecks,#Kaa...|   50|
|          #UNmappers|  105|
|#MondayValidation...|   41|
|#TürkiyeEQ060223,...|   57|
|#TürkiyeEQ060223,...|   14|
|#hotosm-project-1...|  149|
|#BeninMappingTour...|   30|
|          #kaart-494|  137|
|#JurRiver,#REACH,...|   13|
|#Mapathon,#Mapper...|   29|
|#AKDN,#OMHAP,#Syr...|    6|
|#CycloneFreddy,#T...|   53|
|#mapbox_linters_p...|   16|
|   #Kaart,#Kaart2023|  389|
|                #map|   66|
|              #Molde|   10|
|#Air,#Buildings,#...|   51|
|#CycleMapSouthAme...|    1|
|#GIRS2023,#Türkiy...|   56|
+--------------------+-----+
only showing top 20 rows



In [32]:
df_roads.show()

+------+--------------------+------+-------+--------------------+--------+----+--------------------+-----------+
|osm_id|              editor|length|country|            tags_str|hashtags|year|            datetime|hashtags_un|
+------+--------------------+------+-------+--------------------+--------+----+--------------------+-----------+
|   105| StreetComplete 45.0|   384|    DEU|{highway -> resid...|      []|2023|2023-01-08 15:24:...|           |
|   137| StreetComplete 50.2|   206|    GBR|{source -> Bing;s...|      []|2023|2023-01-24 22:33:...|           |
|   137| StreetComplete 50.2|   206|    GBR|{source -> Bing;s...|      []|2023|2023-01-24 21:31:...|           |
|   187|           iD 2.24.2|   172|    GBR|{highway -> terti...|      []|2023|2023-02-21 02:00:...|           |
|   188|           iD 2.24.2|   159|    GBR|{highway -> prima...|      []|2023|2023-02-17 15:04:...|           |
|   762| StreetComplete 50.2|  1091|    GBR|{highway -> prima...|      []|2023|2023-01-29 15:33:

## AI road filter

### Filter 1
- column tags contains “#mapwithai” or “#nsroadimport” or "#MapWithAI"

In [33]:
df_roads_f1 = df_roads.filter(df_roads.tags_str.contains("mapwithai") | df_roads.tags_str.contains("MapWithAI") | df_roads.tags_str.contains("nsroadimport"))


In [34]:
df_roads_f1.groupBy("year").sum().show()


+----+------------+-----------+---------+
|year| sum(osm_id)|sum(length)|sum(year)|
+----+------------+-----------+---------+
|2023|221644006172|     278590|   418761|
+----+------------+-----------+---------+



### Filter 2
- column tags_str contains (#nsroadimport" or "mapwithai" or “MapWithAI") and "highway"
OR
- column hashtags_un contains (#nsroadimport" or "mapwithai") and it is highway (from column tags_str ???)

In [35]:
df_roads_f2 = df_roads.filter(((df_roads.tags_str.contains("mapwithai") | df_roads.tags_str.contains("MapWithAI") | df_roads.tags_str.contains("nsroadimport")) & (df_roads.tags_str.contains("highway")))
                              | (df_roads.hashtags_un.contains("mapwithai") | df_roads.hashtags_un.contains("nsroadimport") & (df_roads.tags_str.contains("highway"))))


In [36]:
df_roads_f2.groupBy("year").sum().show()


+----+---------------+-----------+---------+
|year|    sum(osm_id)|sum(length)|sum(year)|
+----+---------------+-----------+---------+
|2023|165903613475359|  243608349|367680250|
+----+---------------+-----------+---------+



### Filter 3
3_1 – column editor contains RapiD

3_2 – column editor contains RapiD or column tags_str/hashtags_un contains mapwithai

3_3 – column editor contains RapiD and column tags_str/hashtags_un contains mapwithai

#### F3_1

In [37]:
df_roads_f3_1 = df_roads.filter(df_roads.editor.contains("RapiD") )


In [38]:
df_roads_f3_1.groupBy("year").sum().show()


+----+---------------+-----------+---------+
|year|    sum(osm_id)|sum(length)|sum(year)|
+----+---------------+-----------+---------+
|2023|155386313768669|  194174637|389579225|
+----+---------------+-----------+---------+



#### F3_2

In [39]:
df_roads_f3_2 = df_roads.filter((df_roads.editor.contains("RapiD") | df_roads.tags_str.contains("mapwithai") | df_roads.hashtags_un.contains("mapwithai") | df_roads.tags_str.contains("MapWithAI") | df_roads.hashtags_un.contains("MapWithAI")))


In [40]:
df_roads_f3_2.groupBy("year").sum().show()


+----+---------------+-----------+---------+
|year|    sum(osm_id)|sum(length)|sum(year)|
+----+---------------+-----------+---------+
|2023|269224991627715|  371914843|655460092|
+----+---------------+-----------+---------+



#### F3_3

In [41]:
df_roads_f3_3 = df_roads.filter((df_roads.editor.contains("RapiD") & (df_roads.tags_str.contains("mapwithai") | df_roads.hashtags_un.contains("mapwithai") | df_roads.tags_str.contains("MapWithAI") | df_roads.hashtags_un.contains("MapWithAI"))))


In [42]:
df_roads_f3_3.groupBy("year").sum().show()


+----+--------------+-----------+---------+
|year|   sum(osm_id)|sum(length)|sum(year)|
+----+--------------+-----------+---------+
|2023|52019131287119|   65803042|101700256|
+----+--------------+-----------+---------+



## Convert to Pandas DataFrame - AI buildings, AI roads (this chapter is useless so far)

In [25]:
# Filter AI blds
df_blds = df.filter(df.tags_str.contains("microsoft/BuildingFootprints") | df.tags_str.contains("esri/Google_Africa_Buildings"))\
.groupby("country", "editor", "year", "tags_str")\
.agg(psf.sum("user_id"))\
.withColumnRenamed("sum(user_id)", "count_u_id")\
.sort(psf.col("count_u_id").desc())\
.toPandas()


In [26]:
df_blds

Unnamed: 0,country,editor,year,tags_str,count_u_id
0,TUR,RapiD 1.1.9-tm.1,2023,"{building -> yes, source -> microsoft/Building...",1862607831910
1,IND,RapiD 1.1.9,2023,"{building -> yes, source -> microsoft/Building...",697575610154
2,IDN,JOSM/1.5 (18583 en),2023,"{building -> yes, source -> microsoft/Building...",167757823842
3,USA,RapiD 1.1.9,2023,"{building -> yes, source -> microsoft/Building...",116856923771
4,SYR,RapiD 1.1.9-tm.1,2023,"{building -> yes, source -> microsoft/Building...",97647827025
...,...,...,...,...,...
24984,GBR,StreetComplete 50.2,2023,"{building -> house, source -> microsoft/Buildi...",7329
24985,GBR,StreetComplete 50.2,2023,"{building -> detached, source -> microsoft/Bui...",7329
24986,GBR,StreetComplete 50.2,2023,"{building -> apartments, source -> microsoft/B...",7329
24987,GBR,RapiD 1.1.9,2023,"{building -> yes, source -> microsoft/Building...",7329


In [32]:
df_blds.notna().all(axis=1)


0        True
1        True
2        True
3        True
4        True
         ... 
24984    True
24985    True
24986    True
24987    True
24988    True
Length: 24989, dtype: bool

In [None]:
# & (psf.col("road") == 1)


In [98]:
# Filter AI roads
df_roads = df_roads.filter(((df_roads.tags_str.contains("mapwithai") | df_roads.tags_str.contains("MapWithAI") | df_roads.tags_str.contains("#nsroadimport")) & (df_roads.tags_str.contains("highway")))
                           | (df_roads.hashtags_un.contains("#mapwithai") | df_roads.hashtags_un.contains("#nsroadimport")))\
    .groupby("country", "editor", "year", "tags_str", "hashtags_un", "length")\
    .agg(psf.sum("country"))\
    .withColumnRenamed("sum(country)", "count_country")\
    .sort(psf.col("count_country").desc())\
    .toPandas()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\milan\general_py_venv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\milan\general_py_venv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\milan\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
df_roads


Unnamed: 0,country,editor,year,tags_str,hashtags_un,length,count_osm_id
0,TZA,JOSM/1.5 (18646 en),2023,{},"#Tanzania,#mapwithai",0,3883303105856
1,TZA,JOSM/1.5 (18583 en),2023,{},"#Tanzania,#mapwithai",0,2204724899226
2,IND,RapiD 1.1.9-tm.1,2023,{},"#India,#mapwithai",0,1786006015589
3,IND,JOSM/1.5 (18427 en),2023,{},"#India,#mapwithai",0,1006969338360
4,TZA,JOSM/1.5 (18463 en),2023,{},"#Tanzania,#mapwithai",0,841542487332
...,...,...,...,...,...,...,...
82556,USA,JOSM/1.5 (18700 en),2023,"{highway -> residential, tiger:county -> Orang...","#buildingmapping,#mapwithai",329,13331490
82557,USA,JOSM/1.5 (18700 en),2023,"{highway -> residential, tiger:county -> Orang...","#buildingmapping,#mapwithai",340,13303745
82558,USA,JOSM/1.5 (18700 en),2023,"{highway -> residential, tiger:county -> Orang...","#buildingmapping,#mapwithai",399,13303745
82559,USA,JOSM/1.5 (18565 en),2023,"{highway -> tertiary, tiger:county -> Orange, ...","#buildingmapping,#mapwithai",697,13300336
