In [1]:
import findspark
findspark.init()

import pyspark
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 pyspark-shell'



Set Spark and SQL context.

In [2]:
conf=pyspark.SparkConf()
conf.set('spark.mongodb.input.uri','mongodb://127.0.0.1/archeo.sites?readPreference=primaryPreferred')
conf.set('spark.mongodb.output.uri','mongodb://127.0.0.1/archeo.sites')
sc =pyspark.SparkContext(conf=conf)
sqlContext =pyspark.SQLContext(sc)

In [3]:
sc

In [3]:
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()

root
 |-- Chronologia: string (nullable = true)
 |-- Gmina: string (nullable = true)
 |-- Miejscowosc: string (nullable = true)
 |-- Nr_rejestru_zabytkow: string (nullable = true)
 |-- Nr_stanowiska: string (nullable = true)
 |-- Typ_stanowiska: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [4]:
df.show()

+--------------------+-------+-----------+--------------------+-------------+--------------------+--------------------+
|         Chronologia|  Gmina|Miejscowosc|Nr_rejestru_zabytkow|Nr_stanowiska|      Typ_stanowiska|                 _id|
+--------------------+-------+-----------+--------------------+-------------+--------------------+--------------------+
|neolit, okres wpł...|Baborów|     Babice|          A - 247/70|           16|               osada|[5ba16d7bfa9b9f35...|
|              neolit|Baborów|     Babice|          A - 901/90|           21|               osada|[5ba16d7bfa9b9f35...|
|neolit, średniowi...|Baborów|     Babice|          A - 378/73|           22|               osada|[5ba16d7bfa9b9f35...|
|neolit, kultura ł...|Baborów|     Babice|          A - 372/73|           23|                   ?|[5ba16d7bfa9b9f35...|
|neolit, kultura c...|Baborów|     Babice|          A - 428/76|           24|               osada|[5ba16d7bfa9b9f35...|
|neolit,  późny ok...|Baborów|     Babic

Pipeline aggregation

In [5]:
pipeline = "{'$match': {'Gmina': 'Kluczbork'}}"
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
df.head(5)

[Row(Chronologia='średniowiecze ( XIV w. )', Gmina='Kluczbork', Miejscowosc='Bąków', Nr_rejestru_zabytkow='A - 313/70', Nr_stanowiska=1, Typ_stanowiska='grodzisko', _id=Row(oid='5ba16d7bfa9b9f35083a82d2')),
 Row(Chronologia='średniowiecze', Gmina='Kluczbork', Miejscowosc='Bąków', Nr_rejestru_zabytkow='A - 312/70', Nr_stanowiska=2, Typ_stanowiska='grodzisko', _id=Row(oid='5ba16d7bfa9b9f35083a82d3')),
 Row(Chronologia='średniowiecze', Gmina='Kluczbork', Miejscowosc='Biadacz', Nr_rejestru_zabytkow='A - 384/74', Nr_stanowiska=5, Typ_stanowiska='grodzisko', _id=Row(oid='5ba16d7bfa9b9f35083a82d4')),
 Row(Chronologia='kultura łużycka, średniowiecze', Gmina='Kluczbork', Miejscowosc='Bogacica', Nr_rejestru_zabytkow='A - 1075/98', Nr_stanowiska=11, Typ_stanowiska='osada', _id=Row(oid='5ba16d7bfa9b9f35083a82d5')),
 Row(Chronologia='średniowiecze', Gmina='Kluczbork', Miejscowosc='Czaple Stare', Nr_rejestru_zabytkow='A - 510/79', Nr_stanowiska=3, Typ_stanowiska='osada', _id=Row(oid='5ba16d7bfa9b9f3

Filter

In [6]:
df.filter(df['Chronologia'] == 'średniowiecze').show()

+-------------+---------+------------+--------------------+-------------+--------------+--------------------+
|  Chronologia|    Gmina| Miejscowosc|Nr_rejestru_zabytkow|Nr_stanowiska|Typ_stanowiska|                 _id|
+-------------+---------+------------+--------------------+-------------+--------------+--------------------+
|średniowiecze|Kluczbork|       Bąków|          A - 312/70|            2|     grodzisko|[5ba16d7bfa9b9f35...|
|średniowiecze|Kluczbork|     Biadacz|          A - 384/74|            5|     grodzisko|[5ba16d7bfa9b9f35...|
|średniowiecze|Kluczbork|Czaple Stare|          A - 510/79|            3|         osada|[5ba16d7bfa9b9f35...|
+-------------+---------+------------+--------------------+-------------+--------------+--------------------+



SQL

In [7]:
df.createOrReplaceTempView("temp")
selected = sqlContext.sql("SELECT * FROM temp WHERE chronologia = 'średniowiecze'")
selected.show()

+-------------+---------+------------+--------------------+-------------+--------------+--------------------+
|  Chronologia|    Gmina| Miejscowosc|Nr_rejestru_zabytkow|Nr_stanowiska|Typ_stanowiska|                 _id|
+-------------+---------+------------+--------------------+-------------+--------------+--------------------+
|średniowiecze|Kluczbork|       Bąków|          A - 312/70|            2|     grodzisko|[5ba16d7bfa9b9f35...|
|średniowiecze|Kluczbork|     Biadacz|          A - 384/74|            5|     grodzisko|[5ba16d7bfa9b9f35...|
|średniowiecze|Kluczbork|Czaple Stare|          A - 510/79|            3|         osada|[5ba16d7bfa9b9f35...|
+-------------+---------+------------+--------------------+-------------+--------------+--------------------+



Output - parquet

In [8]:
df.write.parquet("output/test.parquet")