# PySPARQL SELECT example
### Query your SPARQL endpoint and manage the results inside Apache Spark

In [1]:
from PySPARQL.Wrapper import PySPARQLWrapper
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session used by every later cell; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("PySPARQL example")
    .getOrCreate()
)

### Use the DBpedia SPARQL endpoint

In [3]:
# URL of the SPARQL endpoint that the wrapper below is pointed at.
sparql_endpoint = "https://dbpedia.org/sparql"

In [4]:
# Build the PySPARQL wrapper from the Spark session and the endpoint URL;
# later cells run their SPARQL queries through it.
wrapper = PySPARQLWrapper(spark, sparql_endpoint)

### Retrieve all the European countries with their capitals

In [5]:
# SPARQL SELECT returning, for every dbo:Country whose dct:subject is
# dbc:Countries_in_Europe, the English label of the country and the English
# label of its dbo:capital (restricted to resources typed dbo:City).
#
# Every prefix used in the query body is declared explicitly, including dct:
# and dbc:, so the query is valid SPARQL on its own and does not depend on
# namespaces that a particular endpoint happens to predefine.
query = """
PREFIX dbr:    <http://dbpedia.org/resource/>
PREFIX dbo:    <http://dbpedia.org/ontology/>
PREFIX dbc:    <http://dbpedia.org/resource/Category:>
PREFIX dct:    <http://purl.org/dc/terms/>
PREFIX rdf:    <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs:   <http://www.w3.org/2000/01/rdf-schema#>

SELECT DISTINCT ?cityLabel ?countryLabel
WHERE {
    ?city a dbo:City ;
        rdfs:label ?cityLabel .
    ?country a dbo:Country ;
        rdfs:label ?countryLabel ;
        dbo:capital ?city ;
        dct:subject dbc:Countries_in_Europe .
    FILTER (
        LANG(?cityLabel) = "en" &&
        LANG(?countryLabel) = "en"
    )
}
"""

In [6]:
# Run the SELECT query through the wrapper. The returned object exposes the
# rows as a Spark DataFrame through its `dataFrame` attribute (used next).
result = wrapper.query(query)

In [7]:
# Take the Spark DataFrame out of the query result and display its rows.
countriesDF = result.dataFrame
countriesDF.show()

+------------------+--------------------+
|         cityLabel|        countryLabel|
+------------------+--------------------+
|             Minsk|             Belarus|
|          Sarajevo|Bosnia and Herzeg...|
|             Sofia|            Bulgaria|
|            Zagreb|             Croatia|
|           Nicosia|              Cyprus|
|            Prague|      Czech Republic|
|            Berlin|             Germany|
|          Budapest|             Hungary|
|              Rome|               Italy|
|            Astana|          Kazakhstan|
|          Pristina|              Kosovo|
|              Riga|              Latvia|
|           Vilnius|           Lithuania|
|          Valletta|               Malta|
|          Chișinău|             Moldova|
|         Podgorica|          Montenegro|
|         Bucharest|             Romania|
|City of San Marino|          San Marino|
|         Ljubljana|            Slovenia|
|            Madrid|               Spain|
+------------------+--------------------+

### Save the result as a Spark table

In [8]:
# Persist the query result as the table "countries".
# mode("overwrite") replaces a table left behind by an earlier run; with the
# default save mode the write raises an error once the table exists, so the
# notebook would fail on a second Restart Kernel -> Run All.
countriesDF.write.mode("overwrite").saveAsTable("countries")

### Read the data back using SparkSQL

In [9]:
# Read the saved table back with Spark SQL and display it. The statement has
# no ORDER BY, so the rows may appear in a different order than in the earlier
# display of countriesDF (as the captured outputs show).
sqlDF = spark.sql("SELECT * FROM countries")
sqlDF.show()

+------------------+--------------------+
|         cityLabel|        countryLabel|
+------------------+--------------------+
|          Valletta|               Malta|
|          Chișinău|             Moldova|
|         Podgorica|          Montenegro|
|         Bucharest|             Romania|
|City of San Marino|          San Marino|
|         Ljubljana|            Slovenia|
|            Madrid|               Spain|
|              Kiev|             Ukraine|
|           Tallinn|             Estonia|
|           Yerevan|             Armenia|
|            Vienna|             Austria|
|            Skopje|Republic of Maced...|
|            Ankara|              Turkey|
|             Minsk|             Belarus|
|          Sarajevo|Bosnia and Herzeg...|
|             Sofia|            Bulgaria|
|            Zagreb|             Croatia|
|           Nicosia|              Cyprus|
|            Prague|      Czech Republic|
|            Berlin|             Germany|
+------------------+--------------------+