In [1]:
# Find Spark on my local computer
import findspark
findspark.init()

In [2]:
# Import libraries
import configparser
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

In [3]:
# Create SparkConf with native microsoft jdbc driver for sql-server
configure = SparkConf().setAppName("Artskart")\
                       .setMaster("local[*]")\
                       .set("spark.driver.extraClassPath","sqljdbc_9.2/enu/mssql-jdbc-9.2.1.jre8.jar")

In [4]:
# Create SparkContext and connect to our application
sc = SparkContext(conf = configure)

In [5]:
# Create spark session
spark = SparkSession \
    .builder \
    .getOrCreate()

In [6]:
# Validate Spark session
spark

In [9]:
jdbcHostname=<jdbcHostname>
jdbcPort=<jdbcPort>
jdbcDatabase=<jdbcDatabase>
jdbcUsername=<jdbcUsername>
jdbcPassword=<jdbcPassword>


jdbcUrl = "jdbc:sqlserver://{0}:{1};Database={2};".format(jdbcHostname, jdbcPort, jdbcDatabase)

connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

##### this error might occur when connecting with native JDBC driver
`com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host HOSTNAME, port 1433 has failed.
Error: "Connection refused: connect. Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port.
Make sure that TCP connections to the port are not blocked by a firewall.`

please read further: https://kb.sos-berlin.com/pages/viewpage.action?pageId=17499564

In [10]:
pushdown_query = "(select * from View_ShapeExportRedlist) redlist_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

DataFrame[FID: int, ProxyId: string, NodeDatabaseID: int, InstitutionCode: string, CollectionCode: string, CatalogNumber: string, VitNavn: string, BasisOfRecord: string, Kingdom: string, Phylum: string, Class: string, Order: string, Family: string, Genus: string, Species: string, Subspecies: string, Author: string, IdentifiedBy: string, YearIdentified: int, MonthIdentified: int, DayIdentified: int, TypeStatus: string, CollectorNumber: string, FieldNumber: string, Collector: string, YearCollected: int, MonthCollected: int, DayCollected: int, ContinentOcean: string, Country: string, StateProvince: string, CountyOrg: string, Locality: string, CountyID: string, MunicipalityID: string, County: string, MuniName: string, Longitude: double, Latitude: double, CoordinatePrecision: int, BoundingBox: string, MinElevation: int, MaxElevation: int, MinDepth: string, MaxDepth: string, Sex: string, PreparationType: string, IndividualCount: string, PreviousCatalogNumber: string, RelationshipType: string

In [7]:
# Set output path to local working directory
output = "<path_name>"

In [8]:
# Set filename
filename = "redlist.csv"

In [13]:
# Save a copy on my local working directory
# Repartition by the number of cores availiable on my machine; 8

df.repartition(8).write.save(output + filename, format="csv", header=True) # row count: 4,384,290

In [9]:
# Read file back into RAM with Spark
redlist = spark.read.csv(output + filename, header=True)

In [12]:
redlist.head()

Row(FID='41954328', ProxyId='urn:uuid:43688d2a-f87b-4031-9e20-c6e0ca85840a', NodeDatabaseID='1010', InstitutionCode='NOF', CollectionCode='so2-birds', CatalogNumber='24208745', VitNavn='Delichon urbicum', BasisOfRecord='humanobservation', Kingdom='Animalia', Phylum='Chordata', Class='Aves', Order='Passeriformes', Family='Hirundinidae', Genus='Delichon', Species='Delichon urbicum', Subspecies=None, Author='(Linnaeus, 1758)', IdentifiedBy=None, YearIdentified='0', MonthIdentified='0', DayIdentified='0', TypeStatus=None, CollectorNumber=None, FieldNumber=None, Collector='Frank Holmen', YearCollected='2020', MonthCollected='5', DayCollected='19', ContinentOcean=None, Country='Norway', StateProvince=None, CountyOrg=None, Locality='Møssevoll, Vessøyjordene, Grimstad, Ag', CountyID='42', MunicipalityID='4202', County='Agder', MuniName='Grimstad', Longitude='8.694183', Latitude='58.388015', CoordinatePrecision='116', BoundingBox=None, MinElevation=None, MaxElevation=None, MinDepth=None, MaxDep

In [14]:
redlist.printSchema()

root
 |-- FID: string (nullable = true)
 |-- ProxyId: string (nullable = true)
 |-- NodeDatabaseID: string (nullable = true)
 |-- InstitutionCode: string (nullable = true)
 |-- CollectionCode: string (nullable = true)
 |-- CatalogNumber: string (nullable = true)
 |-- VitNavn: string (nullable = true)
 |-- BasisOfRecord: string (nullable = true)
 |-- Kingdom: string (nullable = true)
 |-- Phylum: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Order: string (nullable = true)
 |-- Family: string (nullable = true)
 |-- Genus: string (nullable = true)
 |-- Species: string (nullable = true)
 |-- Subspecies: string (nullable = true)
 |-- Author: string (nullable = true)
 |-- IdentifiedBy: string (nullable = true)
 |-- YearIdentified: string (nullable = true)
 |-- MonthIdentified: string (nullable = true)
 |-- DayIdentified: string (nullable = true)
 |-- TypeStatus: string (nullable = true)
 |-- CollectorNumber: string (nullable = true)
 |-- FieldNumber: string (nullable = 

In [17]:
# Check if we have any duplicate rows
duplicates = redlist.count() - redlist.distinct().count()
print(duplicates)

0


In [21]:
redlist.select(["County", "MuniName", "Country", "NorskNavn"]).take(5)

[Row(County='Agder', MuniName='Grimstad', Country='Norway', NorskNavn='taksvale'),
 Row(County='Trøndelag', MuniName='Levanger', Country='Norway', NorskNavn='stær'),
 Row(County='Trøndelag', MuniName='Ørland', Country='Norway', NorskNavn='storspove'),
 Row(County='Rogaland', MuniName='Stavanger', Country='Norway', NorskNavn='stær'),
 Row(County='Agder', MuniName='Farsund', Country='Norway', NorskNavn='sivspurv')]

In [None]:
# Register the DataFrame as a SQL temporary view
redlist.createOrReplaceTempView("redlist")

#### Data exploration

#### Questions to ask: 
- How many observations of redlisted species do we have in our dataset
- Which redlisted specie were the most observed
- Which redlisted specie were the least observed
- How are the observations skeewed
- Do we have any duplicates rows
- Are there any corrupt rows

In [16]:
# How many observations of redlisted species do we have?
redlist.count()

4384290

In [22]:
# Which redlisted specieS are the most observed ones?
species_observed = spark.sql("""
    SELECT SPECIES,
           COUNT(*) AS CNT
    FROM REDLIST
    GROUP BY SPECIES
    ORDER BY CNT DESC
    limit 5
""").show()

+--------------------+------+
|             SPECIES|   CNT|
+--------------------+------+
|         Larus canus|364333|
|Somateria mollissima|300448|
|    Sturnus vulgaris|266324|
| Emberiza citrinella|253517|
|Chroicocephalus r...|189492|
+--------------------+------+



In [21]:
# Count distinct redlisted species observed
num_of_species = spark.sql("""
    SELECT COUNT(DISTINCT SPECIES) AS NUM_OF_SPECIES
    FROM REDLIST
""").show()

+--------------+
|NUM_OF_SPECIES|
+--------------+
|          4075|
+--------------+



In [27]:
# Check how the observations are distributed on "MonthIdentified"
dist_identified = spark.sql("""

    SELECT MONTHIDENTIFIED,
           COUNT(*) AS CNT
    FROM REDLIST
    GROUP BY MONTHIDENTIFIED
    ORDER BY MONTHIDENTIFIED ASC
    
""").show()

+---------------+-------+
|MONTHIDENTIFIED|    CNT|
+---------------+-------+
|              0|4215033|
|              1|  23978|
|             10|  15870|
|             11|   5216|
|             12|   3809|
|              2|   3400|
|              3|   2485|
|              4|   4090|
|              5|   9104|
|              6|  20445|
|              7|  23163|
|              8|  29187|
|              9|  28510|
+---------------+-------+



In [33]:
# Check how the observations are distributed on "MonthCollected"
dist_identified = spark.sql("""
    -- Check how data is distributed on "MonthCollected"
    SELECT MONTHCOLLECTED,
           COUNT(*) AS CNT
    FROM REDLIST
    GROUP BY MONTHCOLLECTED
    ORDER BY MONTHCOLLECTED ASC
    
""").show()

+--------------+------+
|MONTHCOLLECTED|   CNT|
+--------------+------+
|             1|271527|
|             2|182156|
|             3|307859|
|             4|544599|
|             5|710578|
|             6|532953|
|             7|408897|
|             8|460916|
|             9|347744|
|            10|302214|
|            11|182210|
|            12|132637|
+--------------+------+



In [17]:
# Copy our local file to S3 using AWS CLI
!aws s3 cp output+filename s3a://<bucket-name> --recursive