In [1]:
# using custom conda environment so I can include shapely for the coordinate -> country look-up
!which python

/home/isaacj/.conda/envs/2020-12-01T16.32.19_isaacj/bin/python


In [2]:
!conda install -c conda-forge shapely 

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 4.8.4
  latest version: 4.9.2

Please update conda by running

    $ conda update -n base conda



# All requested packages already installed.



In [4]:
# zip up conda environment to ship to workers
# see: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Jupyter#Launching_as_SparkSession_in_a_Python_Notebook
!cd '/home/isaacj/.conda/envs/2020-12-01T16.32.19_isaacj'; zip -qur ~/spark_venv.zip .

In [5]:
!ls -lht spark_venv.zip

-rw-r--r-- 1 isaacj wikidev 747M Dec 22 15:08 spark_venv.zip


In [6]:
import csv
import json
import os
import sys

import pandas as pd
from shapely.geometry import shape, Point

import findspark
findspark.init('/usr/lib/spark2')

from pyspark.sql import SparkSession

In [7]:
# setup Spark environment
# this is the equivalent of a "large" YARN config
os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'

spark = (
    SparkSession.builder
    .appName('Pyspark notebook (isaacj -- spatial)')
    .master('yarn')
    .config(
        'spark.driver.extraJavaOptions',
        ' '.join('-D{}={}'.format(k, v) for k, v in {
            'http.proxyHost': 'webproxy.eqiad.wmnet',
            'http.proxyPort': '8080',
            'https.proxyHost': 'webproxy.eqiad.wmnet',
            'https.proxyPort': '8080',
        }.items()))
    .config("spark.driver.memory", "4g")
    .config('spark.dynamicAllocation.maxExecutors', 128)
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", 4)
    .config("spark.sql.shuffle.partitions", 512)
    .getOrCreate()
)
spark

## Load in supporting data files from Github

In [8]:
data_dir = './data'
region_geoms = ('ne_10m_admin_0_map_units.geojson',
                'https://github.com/geohci/wiki-region-groundtruth/raw/main/resources/ne_10m_admin_0_map_units.geojson')
properties_tsv = ('country_properties.tsv',
                  'https://github.com/geohci/wiki-region-groundtruth/raw/main/resources/country_properties.tsv')
aggregation_tsv = ('country_aggregation.tsv',
                   'https://github.com/geohci/wiki-region-groundtruth/raw/main/resources/country_aggregation.tsv')
region_tsv = ('base_regions_qids.tsv',
              'https://github.com/geohci/wiki-region-groundtruth/raw/main/resources/base_regions_qids.tsv')

In [9]:
!rm -R {data_dir}
!mkdir -p {data_dir}
# have to string commands together or `cd` doesn't apply to later commands
!cd {data_dir}; wget -q {region_geoms[1]} {properties_tsv[1]} {aggregation_tsv[1]} {region_tsv[1]}; ls -lht

total 25M
-rw-r--r-- 1 isaacj wikidev 4.6K Dec 23 15:06 base_regions_qids.tsv
-rw-r--r-- 1 isaacj wikidev 1.8K Dec 23 15:06 country_aggregation.tsv
-rw-r--r-- 1 isaacj wikidev  265 Dec 23 15:06 country_properties.tsv
-rw-r--r-- 1 isaacj wikidev  25M Dec 23 15:06 ne_10m_admin_0_map_units.geojson


In [10]:
def get_region_properties(properties_tsv):
    """List of properties used for directly linking Wikidata items to regions.

    e.g., P19: place of birth
    These are compiled based on knowledge of Wikidata and Marc Miquel's excellent work:
    https://github.com/marcmiquel/WDO/blob/e482a2df2b41d389945f3b82179b8b7ca338b8d5/src_data/wikipedia_diversity.py
    """
    expected_header = ['Property', 'Label']
    region_properties = []
    with open(properties_tsv, 'r') as fin:
        tsvreader = csv.reader(fin, delimiter='\t')
        assert next(tsvreader) == expected_header
        for line in tsvreader:
            property = line[0]
            label = line[1]
            region_properties.append((property, label))
    return region_properties

def get_aggregation_logic(aggregates_tsv):
    """Mapping of QIDs -> regions not directly associated with them.

    e.g., Sahrawi Arab Democratic Republic (Q40362) -> Western Sahara (Q6250)
    """
    expected_header = ['Aggregation', 'From', 'QID To', 'QID From']
    aggregation = {}
    with open(aggregates_tsv, 'r') as fin:
        tsvreader = csv.reader(fin, delimiter='\t')
        assert next(tsvreader) == expected_header
        for line in tsvreader:
            try:
                qid_to = line[2]
                qid_from = line[3]
            except Exception:
                print("Skipped:", line)
            if qid_from:
                aggregation[qid_from] = qid_to
    return aggregation

def get_region_data(region_qids_tsv, region_geoms_geojson, aggregation_tsv):
    # load in canonical mapping of QID -> region name for labeling
    qid_to_region = {}
    with open(region_qids_tsv, 'r') as fin:
        tsvreader = csv.reader(fin, delimiter='\t')
        assert next(tsvreader) == ['Region', 'QID']
        for line in tsvreader:
            region = line[0]
            qid = line[1]
            qid_to_region[qid] = region
    print("\nLoaded {0} QID-region pairs for matching against Wikidata -- e.g., Q31: {1}".format(
        len(qid_to_region), qid_to_region['Q31']))
    # load in additional QIDs that should be mapped to a more canonical region name
    aggregation = get_aggregation_logic(aggregation_tsv)
    for qid_from in aggregation:
        qid_to = aggregation[qid_from]
        if qid_to in qid_to_region:
            qid_to_region[qid_from] = qid_to_region[qid_to]  
        else:
            print("-- Skipping aggregation for {0} to {1}".format(qid_from, qid_to))
    print("Now {0} QID-region pairs after adding aggregations -- e.g., Q40362: {1}".format(
        len(qid_to_region), qid_to_region['Q40362']))

    # load in geometries for the regions identified via Wikidata
    with open(region_geoms_geojson, 'r') as fin:
        regions = json.load(fin)['features']
    region_shapes = {}
    skipped = []
    for c in regions:
        qid = c['properties']['WIKIDATAID']
        if qid in qid_to_region:
            region_shapes[qid] = shape(c['geometry'])
        else:
            skipped.append('{0} ({1})'.format(c['properties']['NAME'], qid))
    print("\nLoaded {0} region geometries. Skipped {1}: {2}".format(
        len(region_shapes), len(skipped), skipped))
    
    # check alignment between QID list and region geometries
    in_common = 0
    for qid in qid_to_region:
        if qid in region_shapes:
            in_common += 1
        else:
            alt_found = False
            for qid_alt in qid_to_region:
                if qid != qid_alt and qid_to_region[qid] == qid_to_region[qid_alt]:
                    alt_found = True
            if not alt_found:
                print('Prop-only: {0} ({1})'.format(qid_to_region[qid], qid))
    print("{0} QIDs in common between prop-values and geometries.".format(in_common))
    return region_shapes, qid_to_region

In [11]:
# load in data
# I skip a few regions that have coordinates -- see https://github.com/geohci/wiki-region-groundtruth/blob/main/resources/REGIONS.md
# And a few regions are only present as Wikidata properties and don't have coordinates
# Abkhazia / South Ossetia just aren't in the geographic data
# the Kingdoms are agglomerations of regions but if I didn't include them, certain
# Wikidata items that e.g., link to UK but not England specifically would be missed
region_properties = get_region_properties(os.path.join(data_dir, properties_tsv[0]))
region_shapes, qid_to_region = get_region_data(os.path.join(data_dir, region_tsv[0]),
                                               os.path.join(data_dir, region_geoms[0]),
                                               os.path.join(data_dir, aggregation_tsv[0]))



Loaded 260 QID-region pairs for matching against Wikidata -- e.g., Q31: Belgium
Now 295 QID-region pairs after adding aggregations -- e.g., Q40362: Western Sahara

Loaded 280 region geometries. Skipped 15: ['Dhekelia (Q9206745)', 'UNDOF Zone (Q1428532)', 'Korean DMZ (south) (Q331990)', 'Korean DMZ (north) (Q331990)', 'USNB Guantanamo Bay (Q762570)', 'Cyprus U.N. Buffer Zone (Q116970)', 'Siachen Glacier (Q333946)', 'Akrotiri (Q9143535)', 'Paracel Is. (Q274388)', 'Coral Sea Is. (Q172216)', 'Spratly Is. (Q215664)', 'Clipperton I. (Q161258)', 'Bajo Nuevo Bank (Q1257783)', 'Serranilla Bank (Q1169008)', 'Scarborough Reef (Q628716)']
Prop-only: Abkhazia (Q23334)
Prop-only: Kingdom of Denmark (Q756617)
Prop-only: Kingdom of the Netherlands (Q29999)
Prop-only: South Ossetia (Q23427)
Prop-only: United Kingdom (Q145)
280 QIDs in common between prop-values and geometries.


In [12]:
# Create table with QID -> Region mapping for making data in end result more readable
spark.createDataFrame(pd.DataFrame(qid_to_region.items(), columns=['qid', 'region'])).createOrReplaceTempView('qid_to_region')
spark.sql("SELECT * FROM qid_to_region LIMIT 10").show(50, False)

+------+--------------+
|qid   |region        |
+------+--------------+
|Q23334|Abkhazia      |
|Q889  |Afghanistan   |
|Q5689 |Åland Islands |
|Q222  |Albania       |
|Q262  |Algeria       |
|Q16641|American Samoa|
|Q228  |Andorra       |
|Q916  |Angola        |
|Q25228|Anguilla      |
|Q51   |Antarctica    |
+------+--------------+



In [13]:
def pointInCountry(lon, lat):
    """Determine which region contains a lat-lon coordinate.
    
    Depends on shapely library and region_shapes object, which contains a dictionary
    mapping QIDs to shapely geometry objects.
    """
    pt = Point(lon, lat)
    for qid in region_shapes:
        if region_shapes[qid].contains(pt):
            return qid
    return "N/A"
    
spark.udf.register('pointInCountry', pointInCountry, 'String')

<function __main__.pointInCountry(lon, lat)>

In [22]:
# value info in wikidata entity table (https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Edits/Wikidata_entity)
# is a string as opposed to struct (because it has a variable schema)
# this UDF extracts the QID value (or null if doesn't exist)
def getWikidataValue(obj):
    try:
        d = json.loads(obj)
        return d.get('id')
    except Exception:
        return None
    
spark.udf.register('getWikidataValue', getWikidataValue, 'String')

# specific functions for getting lat and lon out of the P625 property in the Wikidata entity
def getLat(obj):
    try:
        d = json.loads(obj)
        return d.get('latitude')
    except Exception:
        return None
    
def getLon(obj):
    try:
        d = json.loads(obj)
        return d.get('longitude')
    except Exception:
        return None
    
spark.udf.register('getLat', getLat, 'Float')
spark.udf.register('getLon', getLon, 'Float')

<function __main__.getLon(obj)>

In [21]:
json.loads('{"latitude":41.0760632218953,"longitude":0.4121379044185,"altitude":null,"precision":1.0E-6,"globe":"http://www.wikidata.org/entity/Q2"}')

{'latitude': 41.0760632218953,
 'longitude': 0.4121379044185,
 'altitude': None,
 'precision': 1e-06,
 'globe': 'http://www.wikidata.org/entity/Q2'}

In [15]:
dump_snapshot = '2020-11'
wd_snapshot = '2020-12-07'
prop_list = tuple([p[0] for p in region_properties])
print(prop_list)
tablename = 'isaacj.qid_to_country_2020_12_07'

('P19', 'P17', 'P27', 'P495', 'P131', 'P1532', 'P3842', 'P361', 'P1269')


In [16]:
do_execute = True
create_table_query = """
    CREATE TABLE IF NOT EXISTS {0} (
        qid              STRING  COMMENT 'Wikidata ID of item with at least one Wikipedia sitelink -- e.g., Q42',
        property         STRING  COMMENT 'Wikidata property (e.g., P625 for coordinates) from which country was derived',
        country          STRING  COMMENT 'Region name'
    )
    """.format(tablename)

if do_execute:
    print(create_table_query)
    spark.sql(create_table_query)


    CREATE TABLE IF NOT EXISTS isaacj.qid_to_country_2020_12_07 (
        qid              STRING  COMMENT 'Wikidata ID of item with at least one Wikipedia sitelink -- e.g., Q42',
        property         STRING  COMMENT 'Wikidata property (e.g., P625 for coordinates) from which country was derived',
        country          STRING  COMMENT 'Region name'
    )
    


In [24]:
"""
Desired full query is below that would grab country data from a set of ~10 properties
AND grab country data based on an item's coordinates
However...this query failed for all wikis, simiplewiki only, and even hausawiki (tiny)
so some work needs to be done.
The error message isn't particularly useful -- just says a worker failed the max number of tries.

I then split this query into its two component parts (property matching and coordinate geolocating)
and the former (property matching) works, which makes sense because it's quite simple.
The latter (coordinate geolocation) by itself triggers the error, so that's where the problem seems to be

Explanation of CTEs:
* relevant_wikis: get list of Wikipedia wiki_dbs (e.g., enwiki) so as to limit the
    the Wikidata items considered to just those with Wikipedia sitelinks
* relevant_qids: get set of Wikidata item IDs that have at least one Wikipedia sitelink
* exploded_statements: explode Wikidata entity data to one Wikidata claim per row
* lat_lon_coords: extract lat/lon values from claims to be geolocated
* geolocated: pass lat/lon values to UDF to identify which country they are in.
    This I believe is where all the heavy lifting is occurring...
* coordinate_countries: map country QIDs for geolocation to country names
* relevant_statements: get all Wikidata claims that might have a country value
* property_countries: extract any country values from these claims
* INSERT: union together coordinate countries and property countries into single table
"""
print_for_hive = False
do_execute = True

query = """
WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '{0}'
      AND hostname LIKE '%.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '{1}'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '{1}'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
geolocated AS (
    SELECT
      item_id,
      pointInCountry(lon, lat) AS country_qid
    FROM lat_lon_coords
),
coordinate_countries AS (
    SELECT
      item_id AS item_id,
      q.region AS country
    FROM geolocated g
    INNER JOIN qid_to_region q
      ON (g.country_qid = q.qid)
),
relevant_statements AS (
    SELECT
      item_id,
      claim.mainSnak.property AS property,
      getWikidataValue(claim.mainSnak.dataValue.value) as value
    FROM exploded_statements
    WHERE
      claim.mainSnak.property IN {2}
),
property_countries AS (
    SELECT
      item_id,
      property,
      q.region AS country
    FROM relevant_statements r
    INNER JOIN qid_to_region q
      ON (r.value = q.qid)
)
INSERT OVERWRITE TABLE {3}
SELECT
  item_id,
  'P625',
  country
FROM coordinate_countries
UNION ALL
SELECT
  item_id,
  property,
  country
FROM property_countries
""".format(dump_snapshot, wd_snapshot, prop_list, tablename)

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)


WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '2020-11'
      AND hostname = 'ha.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '2020-12-07'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '2020-12-07'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
geolocated AS (
    SELECT
      item_id,
      pointInCountry(lon, lat) AS country_qid
    FROM lat

Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:86)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 21.0 failed 4 times, most recent failure: Lost task 64.3 in stage 21.0 (TID 8946, an-worker1115.eqiad.wmnet, executor 508): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-13-8f01e34c45b3>", line 3, in pointInCountry
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 48, in __init__
    self._set_coords(*args)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 137, in _set_coords
    self._geom, self._ndim = geos_point_from_py(tuple(args))
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 214, in geos_point_from_py
    dx = c_double(coords[0])
TypeError: must be real number, not NoneType

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 28 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-13-8f01e34c45b3>", line 3, in pointInCountry
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 48, in __init__
    self._set_coords(*args)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 137, in _set_coords
    self._geom, self._ndim = geos_point_from_py(tuple(args))
  File "/var/lib/hadoop/data/c/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000570/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 214, in geos_point_from_py
    dx = c_double(coords[0])
TypeError: must be real number, not NoneType

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


In [25]:
# Properties only -- works just fine for Hausa Wikipedia (and I suspect all wikis)
print_for_hive = False
do_execute = True

query = """
WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '{0}'
      AND hostname = 'ha.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '{1}'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '{1}'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
relevant_statements AS (
    SELECT
      item_id,
      claim.mainSnak.property AS property,
      getWikidataValue(claim.mainSnak.dataValue.value) as value
    FROM exploded_statements
    WHERE
      claim.mainSnak.property IN {2}
),
property_countries AS (
    SELECT
      item_id,
      property,
      q.region AS country
    FROM relevant_statements r
    INNER JOIN qid_to_region q
      ON (r.value = q.qid)
)
INSERT OVERWRITE TABLE {3}
SELECT
  item_id,
  property,
  country
FROM property_countries
""".format(dump_snapshot, wd_snapshot, prop_list, tablename)

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)


WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '2020-11'
      AND hostname = 'ha.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '2020-12-07'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '2020-12-07'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
relevant_statements AS (
    SELECT
      item_id,
      claim.mainSnak.property AS property,
      

In [27]:
# Coordinate geolocation only -- fails even for tiny Hausa Wikipedia
print_for_hive = False
do_execute = True

query = """
WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '{0}'
      AND hostname = 'ha.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '{1}'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '{1}'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
geolocated AS (
    SELECT
      item_id,
      pointInCountry(lon, lat) AS country_qid
    FROM lat_lon_coords
),
coordinate_countries AS (
    SELECT
      item_id AS item_id,
      q.region AS country
    FROM geolocated g
    INNER JOIN qid_to_region q
      ON (g.country_qid = q.qid)
)
INSERT OVERWRITE TABLE {3}
SELECT
  item_id AS item_id,
  'P625' AS property,
  country AS country
FROM coordinate_countries
""".format(dump_snapshot, wd_snapshot, prop_list, tablename)

if print_for_hive:
    print(re.sub(' +', ' ', re.sub('\n', ' ', query)).strip())
else:
    print(query)

if do_execute:
    result = spark.sql(query)


WITH relevant_wikis AS (
    SELECT
      DISTINCT(dbname) AS wiki_db
    FROM wmf_raw.mediawiki_project_namespace_map
    WHERE
      snapshot = '2020-11'
      AND hostname = 'ha.wikipedia.org'
),
relevant_qids AS (
    SELECT
      DISTINCT(item_id) AS item_id
    FROM wmf.wikidata_item_page_link wd
    INNER JOIN relevant_wikis db
      ON (wd.wiki_db = db.wiki_db)
    WHERE
      snapshot = '2020-12-07'
      AND page_namespace = 0
),
exploded_statements AS (
    SELECT
      id AS item_id,
      explode(claims) AS claim
    FROM wmf.wikidata_entity w
    INNER JOIN relevant_qids q
      ON (w.id = q.item_id)
    WHERE
      w.snapshot = '2020-12-07'
),
lat_lon_coords AS (
    SELECT
      item_id,
      getLat(claim.mainSnak.dataValue.value) as lat,
      getLon(claim.mainSnak.dataValue.value) as lon
    FROM exploded_statements
    WHERE
      claim.mainSnak.property = 'P625'
),
geolocated AS (
    SELECT
      item_id,
      pointInCountry(lon, lat) AS country_qid
    FROM lat

Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:86)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 34.0 failed 4 times, most recent failure: Lost task 184.3 in stage 34.0 (TID 13853, an-worker1112.eqiad.wmnet, executor 784): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-13-8f01e34c45b3>", line 3, in pointInCountry
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 48, in __init__
    self._set_coords(*args)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 137, in _set_coords
    self._geom, self._ndim = geos_point_from_py(tuple(args))
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 215, in geos_point_from_py
    dy = c_double(coords[1])
TypeError: must be real number, not NoneType

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 28 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-13-8f01e34c45b3>", line 3, in pointInCountry
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 48, in __init__
    self._set_coords(*args)
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 137, in _set_coords
    self._geom, self._ndim = geos_point_from_py(tuple(args))
  File "/var/lib/hadoop/data/g/yarn/local/usercache/isaacj/appcache/application_1607972933093_50670/container_e33_1607972933093_50670_01_000868/venv/lib/python3.7/site-packages/shapely/geometry/point.py", line 215, in geos_point_from_py
    dy = c_double(coords[1])
TypeError: must be real number, not NoneType

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
