# Utilities notebook

## Setting up functions

In [75]:
import os
import warnings

import numpy as np
import pandas as pd
import requests
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession

from exceptions import *


# Tell PySpark which Python executable name to use
os.environ["PYSPARK_PYTHON"] = "python"

# Spark session configured with the DataStax Cassandra connector against the local node
spark = (
    SparkSession.builder
    .appName('SparkCassandraApp')
    .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1')
    .config('spark.cassandra.connection.host', 'localhost')
    .config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions')
    .config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog')
    .config('spark.cassandra.connection.port', '9042')
    .getOrCreate()
)

# Plain driver session on the same node, used for raw CQL statements
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()
session.set_keyspace('compulsory')

In [2]:
# Set up request
def get_access_token(client_id="erlend.risvik@gmail.com:fishclient",
                     secret_path=r'..\..\..\IND320\No_sync\fish_api'):
    """Function to get access token from Barentswatch API

    Sends a client-credentials request to the Barentswatch token endpoint,
    using the client secret stored in a local file.

    Parameters:
    -----------
    client_id : str, optional
        Client id sent with the request. Defaults to the id this notebook
        has always used.
    secret_path : str, optional
        Path to the file that contains the client secret. Defaults to the
        location this notebook has always read from.

    Returns:
    --------
    str: Access token

    Raises:
    -------
    requests.HTTPError
        If the token endpoint answers with an error status.
    """
    url = "https://id.barentswatch.no/connect/token"

    # Read the secret inside a context manager so the file handle is closed,
    # and drop a trailing line break so it is not sent as part of the secret.
    with open(secret_path, 'r') as secret_file:
        secret_key = secret_file.read().rstrip('\r\n')

    # Set up request to get access token
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
        "client_id": client_id,
        "scope": "api",
        "client_secret": secret_key,
        "grant_type": "client_credentials"
    }

    response = requests.post(url, headers=headers, data=data)
    # Surface the HTTP error itself rather than a KeyError on 'access_token'
    response.raise_for_status()
    return response.json()['access_token']

def convert_to_spark(df):
    """Build a Spark dataframe from a pandas dataframe.

    Parameters:
    -----------
    df : pandas dataframe
        Dataframe to convert

    Returns:
    --------
    spark dataframe created by the notebook's `spark` session
    """
    spark_frame = spark.createDataFrame(df)
    return spark_frame

def write_to_cassandra(df, table_name):
    """Append a pandas dataframe to a table in the `compulsory` keyspace.

    The frame is first converted with `convert_to_spark` and then written
    through the Spark Cassandra data source in append mode.

    Parameters:
    -----------
    df : pandas dataframe
        Dataframe to write
    table_name : str
        Name of table to write to
    """
    spark_frame = convert_to_spark(df)

    writer = spark_frame.write.format("org.apache.spark.sql.cassandra")
    writer = writer.options(table=table_name, keyspace="compulsory")
    writer = writer.mode("append")
    writer.save()

def check_exist_fish(year):
    """Check whether `fish_data_full` already holds rows for a given year.

    As a side effect the Cassandra table is (re)registered as the Spark
    temp view `fish_data_full`.

    Parameters:
    -----------
    year : int
        Year of data

    Returns:
    --------
    bool: True if data exists, False if not
    """
    fish_table = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table = 'fish_data_full', keyspace="compulsory")
        .load()
    )
    fish_table.createOrReplaceTempView('fish_data_full')

    query = f"SELECT count(*) FROM fish_data_full WHERE year = {year}"
    row_count = spark.sql(query).collect()[0][0]
    return row_count >= 1

def check_exist_lice(locality, year):
    """Check whether `lice_data_full` already holds rows for a locality and year.

    As a side effect the Cassandra table is (re)registered as the Spark
    temp view `lice_data_full`.

    Parameters:
    -----------
    locality : int
        Locality number
    year : int
        Year of data

    Returns:
    --------
    bool: True if data exists, False if not
    """
    lice_table = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table = 'lice_data_full', keyspace="compulsory")
        .load()
    )
    lice_table.createOrReplaceTempView('lice_data_full')

    query = f"SELECT count(*) FROM lice_data_full WHERE year = {year} AND localityno = {locality}"
    row_count = spark.sql(query).collect()[0][0]
    return row_count >= 1

def get_one_week_fish_data(year, week, access_token):
    """Fetch one week of fish health data from the Barentswatch API.

    Parameters:
    -----------
    year : int
        Year of data
    week : int
        Week of data
    access_token : str
        Access token from Barentswatch API

    Returns:
    --------
    json: decoded JSON body of the response
    """
    # Weekly endpoint: .../locality/{year}/{week}
    endpoint = f"https://www.barentswatch.no/bwapi/v1/geodata/fishhealth/locality/{year}/{week}"
    auth_header = {"Authorization": "Bearer "+ access_token}

    response = requests.get(endpoint, headers = auth_header)
    payload = response.json()
    return payload

def get_one_year_fish_data(year, access_token):
    """Fetch one year of weekly fish data from the Barentswatch API and store it.

    The function is a no-op when `check_exist_fish` reports that rows for
    `year` are already in the database, so repeated calls do not append the
    same year twice. Otherwise weeks 1-52 are fetched one by one, tagged with
    `year` and `week`, and appended to the `fish_data_full` table.

    Parameters:
    -----------
    year : int
        Year of data
    access_token : str
        Access token from Barentswatch API

    Returns:
    --------
    None. The data is written to Cassandra; nothing is returned. A failed
    write is reported as a warning instead of being raised.
    """
    if check_exist_fish(year):
        return None

    # Weeks 1-52.
    # NOTE(review): years that have a week 53 would miss that week - confirm
    # how the API numbers weeks before extending the range.
    weekly_frames = []
    for week in np.arange(1, 53):
        localities = get_one_week_fish_data(year = year, week = week, access_token = access_token)["localities"]
        week_frame = pd.DataFrame(localities)
        week_frame["year"] = year
        week_frame["week"] = week
        weekly_frames.append(week_frame)

    # Concatenate once; growing a frame inside the loop copies it every iteration
    df = pd.concat(weekly_frames, ignore_index=True)
    df.columns = df.columns.str.lower()

    try:
        write_to_cassandra(df = df, table_name = "fish_data_full")
    except Exception as error:
        # Writing stays best-effort, but the failure is no longer silent
        warnings.warn(f"Could not write fish data for {year} to Cassandra: {error!r}", stacklevel=2)
    return None

def get_one_week_lice_data(localty, year, week, access_token):
    """Fetch one week of lice count data for one locality from the Barentswatch API.

    Parameters:
    -----------
    localty : int
        Locality number
    year : int
        Year of data
    week : int
        Week of data
    access_token : str
        Access token from Barentswatch API

    Returns:
    --------
    json: decoded JSON body of the response
    """
    # Per-locality endpoint: .../locality/{locality}/{year}/{week}
    endpoint = f'https://www.barentswatch.no/bwapi/v1/geodata/fishhealth/locality/{localty}/{year}/{week}'
    auth_header = {"Authorization": "Bearer "+ access_token}

    response = requests.get(endpoint, headers=auth_header)
    payload = response.json()
    return payload

def get_one_year_lice_data(locality, year, access_token):
    """
    Fetch one year of weekly lice count data for one locality and store it.

    The function is a no-op when `check_exist_lice` reports that rows for
    this locality and year are already in the database. Otherwise weeks 1-52
    are fetched one by one, reduced to one row each, tagged with `year` and
    `week`, and appended to the `lice_data_full` table.

    Parameters:
    -----------
    locality : int
        Locality number
    year : int
        Year of data
    access_token : str
        Access token from Barentswatch API

    Returns:
    --------
    None. The data is written to Cassandra; nothing is returned. A failed
    write is reported as a warning instead of being raised.
    """

    if check_exist_lice(locality, year):
        return None

    # Columns that contain purely None and nested dictionaries; they are left out
    excluded_columns = {"bathTreatments", "cleanerFish", "inFeedTreatments",
                        "mechanicalRemoval", "timeSinceLastChitinSynthesisInhibitorTreatment"}

    # Weeks 1-52.
    # NOTE(review): years that have a week 53 would miss that week - confirm
    # how the API numbers weeks before extending the range.
    weekly_frames = []
    for week in np.arange(1, 53):
        locality_week = get_one_week_lice_data(localty = locality, year = year, week = week, access_token = access_token)["localityWeek"]
        # Wrap every value in a list so each key becomes one column of a
        # one-row frame; the API response itself is left unmodified.
        week_frame = pd.DataFrame(
            {key: [value] for key, value in locality_week.items() if key not in excluded_columns}
        )
        week_frame["year"] = year
        week_frame["week"] = week
        weekly_frames.append(week_frame)

    # Concatenate once; growing a frame inside the loop copies it every iteration
    df = pd.concat(weekly_frames, ignore_index=True)
    # Lowercase column names
    df.columns = df.columns.str.lower()

    try:
        write_to_cassandra(df = df, table_name = "lice_data_full")
    except Exception as error:
        # Writing stays best-effort, but the failure is no longer silent
        warnings.warn(
            f"Could not write lice data for locality {locality}, year {year} to Cassandra: {error!r}",
            stacklevel=2,
        )
    return None

def clean_table(table_name):
    """Remove all rows from a table by issuing a TRUNCATE statement.

    The statement is executed on the notebook's Cassandra `session`.

    Parameters:
    -----------
    table_name : str
        Name of table to clean
    """
    statement = f"TRUNCATE {table_name}"
    session.execute(statement)


def get_df(table_name):
    """Load a whole Cassandra table into a pandas dataframe sorted by week.

    As a side effect the table is (re)registered as a Spark temp view named
    after the table.

    Parameters:
    -----------
    table_name : str
        Name of the table in the `compulsory` keyspace

    Returns:
    --------
    pandas dataframe sorted by the `week` column; for `fish_data_full` the
    `lat` and `lon` columns are cast to float64
    """
    reader = spark.read.format("org.apache.spark.sql.cassandra")
    reader = reader.options(table=table_name, keyspace="compulsory")
    reader.load().createOrReplaceTempView(table_name)

    frame = spark.sql(f"select * from {table_name}").toPandas()
    frame = frame.sort_values(by=['week'])

    # quick fix to fix datatype.
    if table_name == 'fish_data_full':
        for coordinate in ('lat', 'lon'):
            frame[coordinate] = frame[coordinate].astype(np.float64)

    return frame

# Request a token when this cell runs; the fetch calls further down pass it on
access_token = get_access_token()

In [None]:
# Destructive: clean_table issues a TRUNCATE, so this empties the lice table
clean_table('lice_data_full')

## Testing the functions

### Fish data

In [71]:
# Start from an empty table: truncate it, then list it to confirm it has no rows
table_name = 'fish_data_full'
clean_table(table_name)

# (Re)register the Cassandra table as a Spark temp view so it can be queried with SQL
(spark.read.format("org.apache.spark.sql.cassandra")
 .options(table=table_name, keyspace="compulsory")
 .load()
 .createOrReplaceTempView(table_name))

# Pull the table into pandas and preview it
fish_data = spark.sql(f"select * from {table_name}").toPandas()
fish_data.head()

Unnamed: 0,localityweekid,avgadultfemalelice,hascleanerfishdeployed,hasila,hasmechanicalremoval,haspd,hasreportedlice,hassalmonoids,hassubstancetreatments,infilteredselection,...,isonland,isslaughterholdingcage,lat,localityno,lon,municipality,municipalityno,name,week,year


In [72]:
# Next, we fetch data for 2015. We call it twice on purpose: the second call
# should return early because check_exist_fish finds the rows written by the first.
get_one_year_fish_data(year = 2015, access_token = access_token)
get_one_year_fish_data(year = 2015, access_token = access_token)

In [73]:
# Then we print the table again to see if it has been updated (only once).
# The temp view is registered again before querying.
(spark.read.format("org.apache.spark.sql.cassandra")
 .options(table=table_name, keyspace="compulsory")
 .load()
 .createOrReplaceTempView(table_name))

# The last expression displays the frame; its row count is what we check
fish_data = spark.sql(f"select * from {table_name}").toPandas()
fish_data

Unnamed: 0,localityweekid,avgadultfemalelice,hascleanerfishdeployed,hasila,hasmechanicalremoval,haspd,hasreportedlice,hassalmonoids,hassubstancetreatments,infilteredselection,...,isonland,isslaughterholdingcage,lat,localityno,lon,municipality,municipalityno,name,week,year
0,764221,,False,False,False,False,False,False,False,True,...,False,False,60.471451,18496,6.764933,Ullensvang,1231,Kaland,10,2015
1,88187,0.32,False,False,False,True,True,True,False,True,...,False,False,60.407501,10338,6.356717,Kvam,1238,Djupevik,20,2015
2,339587,,False,False,False,False,False,True,False,True,...,False,False,61.212399,12158,7.093267,Sogndal,1420,Skjersnes,24,2015
3,43161,0.00,False,False,False,False,True,True,False,True,...,False,False,59.832218,28096,5.984267,Kvinnherad,1224,Slåttenes,39,2015
4,872461,,False,False,False,False,False,False,False,True,...,False,False,63.954182,10248,10.085183,Åfjord,1630,Eidskjæra Nø,51,2015
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
88659,5943,,False,False,False,False,False,True,True,True,...,False,False,61.297916,32317,4.660450,Askvoll,1428,Guriøyna,4,2015
88660,31804,,False,False,False,True,False,True,False,True,...,False,False,63.822449,33557,8.740750,Frøya,1620,Langskjæra II,1,2015
88661,799676,,False,False,False,False,False,False,False,True,...,False,False,62.928532,28776,7.580517,Eide,1551,Krekvikbogen,27,2015
88662,300508,,False,False,False,False,False,True,False,True,...,False,False,63.462101,31257,7.856233,Smøla,1573,Hjortholman 2,47,2015


88k rows — that checks out!

### Lice data

In [74]:
# Start from an empty table: truncate it, then list it to confirm it has no rows
table_name = 'lice_data_full'
clean_table(table_name)

# (Re)register the Cassandra table as a Spark temp view so it can be queried with SQL
(spark.read.format("org.apache.spark.sql.cassandra")
 .options(table=table_name, keyspace="compulsory")
 .load()
 .createOrReplaceTempView(table_name))

# Pull the table into pandas and preview it
lice_data = spark.sql(f"select * from {table_name}").toPandas()
lice_data.head()

Unnamed: 0,id,avgadultfemalelice,avgmobilelice,avgstationarylice,hasbathtreatment,hascleanerfishdeployed,hasinfeedtreatment,hasmechanicalremoval,hasreportedlice,hassalmonoids,isfallow,isslaughterholdingcage,localityno,seatemperature,week,year


In [75]:
# Next, we fetch data for 2017 and locality 24175. We call it twice on purpose: the second
# call should return early because check_exist_lice finds the rows written by the first.

get_one_year_lice_data(locality = 24175, year = 2017, access_token = access_token)
get_one_year_lice_data(locality = 24175, year = 2017, access_token = access_token)

In [76]:
# Then we print the table again to see if it has been updated (only once).
# The temp view is registered again before querying.
(spark.read.format("org.apache.spark.sql.cassandra")
 .options(table=table_name, keyspace="compulsory")
 .load()
 .createOrReplaceTempView(table_name))

# The last expression displays the frame; its row count is what we check
lice_data = spark.sql(f"select * from {table_name}").toPandas()
lice_data

Unnamed: 0,id,avgadultfemalelice,avgmobilelice,avgstationarylice,hasbathtreatment,hascleanerfishdeployed,hasinfeedtreatment,hasmechanicalremoval,hasreportedlice,hassalmonoids,isfallow,isslaughterholdingcage,localityno,seatemperature,week,year
0,495742,0.01,0.03,0.01,False,False,False,False,True,True,False,False,24175,10.6,27,2017
1,477446,0.01,0.07,0.05,False,False,False,False,True,True,False,False,24175,3.9,18,2017
2,461843,0.0,0.04,0.03,False,False,False,False,True,True,False,False,24175,4.3,10,2017
3,472303,0.02,0.05,0.01,False,False,False,False,True,True,False,False,24175,3.6,15,2017
4,537072,,,,False,False,False,False,False,True,False,False,24175,5.2,49,2017
5,527788,0.46,0.55,0.27,False,False,False,False,True,True,False,False,24175,8.13,44,2017
6,470589,0.02,0.07,0.19,False,False,False,False,True,True,False,False,24175,3.6,14,2017
7,455120,0.02,0.03,0.06,False,False,False,False,True,True,False,False,24175,4.3,7,2017
8,523717,0.28,0.22,0.19,True,False,False,False,True,True,False,False,24175,9.57,42,2017
9,278917,0.0,0.03,0.04,False,False,False,False,True,True,False,False,24175,5.1,1,2017


Checks out!

## Weather data

In [3]:
import ast

# Read the Frost credentials file with a context manager so the handle is closed.
# The content is parsed as a Python literal and must provide a "client_id" key.
with open("../../NO_SYNC/weather_api", 'r') as secret_file:
    SECRET_INFO = secret_file.read().replace('\n', '')
SECRET_ID = ast.literal_eval(SECRET_INFO)["client_id"]

In [4]:
# NOTE(review): this repeats the Spark setup from the first code cell with the same
# settings; presumably kept so the weather section can be run on its own - confirm.
os.environ["PYSPARK_PYTHON"] = "python"

spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()

# Register the fish table as a temp view so it can be queried with Spark SQL
(spark.read.format("org.apache.spark.sql.cassandra")
 .options(table="fish_data_full", keyspace="compulsory")
 .load()
 .createOrReplaceTempView("fish_data_full"))

In [5]:
# Load both Cassandra tables into pandas through get_df
lice_data = get_df('lice_data_full')
fish_data = get_df('fish_data_full')

In [6]:
# Count the stored lice rows per locality number
lice_data["localityno"].value_counts()

15462    52
23695    52
15375    52
30817    52
Name: localityno, dtype: int64

In [77]:
def get_cords(df, localityno):
    """Function to get the coordinates of a locality from a dataframe

    Parameters:
    -----------
    df : pandas dataframe
        Dataframe to get coordinates from; needs the columns
        `localityno`, `lat` and `lon`
    localityno : int
        Locality number to get coordinates from

    Returns:
    --------
    tuple: (lat, lon) as floats

    Raises:
    -------
    ValueError
        If the dataframe has no row for `localityno`.
    """
    matches = df.loc[df["localityno"] == localityno, ["lat", "lon"]]
    if matches.empty:
        raise ValueError(f"No rows found for localityno {localityno}")

    # there are multiple rows with the same localityno, so we take the first one.
    # Scalars are read from that row because float() on a Series is deprecated.
    first_row = matches.iloc[0]
    return float(first_row["lat"]), float(first_row["lon"])

def get_nearest_stations(lat, lon, max_count=20):
    """Function to get the nearest weather stations from frost.met.no

    Parameters:
    -----------
    lat : float
        Latitude
    lon : float
        Longitude
    max_count : int, optional
        Value sent as `nearestmaxcount`. Defaults to 20.

    Returns:
    --------
    list: list of (source id, distance) tuples, in the order returned by the API

    Raises:
    -------
    FetchDataError
        If the request does not come back with status code 200.
    """
    # Set up parameters
    endpoint = 'https://frost.met.no/sources/v0.jsonld'
    parameters = {
    "geometry" : f"nearest(POINT({lon} {lat}))",
    "nearestmaxcount": max_count,
    }

    # Issue an HTTP GET request
    r = requests.get(endpoint, parameters, auth=(SECRET_ID,''))

    # Check the status before decoding: an error response that is not JSON
    # would otherwise fail in r.json() and never reach FetchDataError.
    if r.status_code != 200:
        raise FetchDataError(f"Request failed with status code {r.status_code}")

    # extract the source id and distance of every station as a tuple
    return [(source['id'], source['distance']) for source in r.json()['data']]
    
def get_daily_data(df, localityno, year):
    """Function to get daily weather data from frost.met.no

    The stations returned by `get_nearest_stations` are tried in order; the
    first one whose response can be turned into a complete daily table wins.

    Parameters:
    -----------
    df : pandas dataframe
        Dataframe to get coordinates from
    localityno : int
        Locality number
    year : int
        Year of data

    Returns:
    --------
    df3: pandas dataframe with the columns date, temperature, humidity,
        wind_speed, precipitation and distance (distance of the station used)

    Raises:
    -------
    NoDataError
        If there are no stations, if the last station fails, or if a station
        with a distance above 50 fails.
    """

    lat, lon = get_cords(df = df, localityno = localityno)
    stations = get_nearest_stations(lat, lon)

    endpoint = 'https://frost.met.no/observations/v0.jsonld'
    last_idx = len(stations) - 1

    for idx, (source_id, distance) in enumerate(stations):
        parameters = {
            'sources': source_id,
            'elements': 'sum(precipitation_amount P1D), mean(air_temperature P1D), mean(wind_speed P1D), mean(relative_humidity P1D)',
            'referencetime': f"{year}-01-01/{year}-12-31",
            'levels' : 'default',
            'timeoffsets': 'default'
        }

        # Issue an HTTP GET request
        r = requests.get(endpoint, parameters, auth=(SECRET_ID,''))
        # Extract JSON data
        payload = r.json()

        try:
            # One frame per reference time, concatenated once after the loop
            frames = []
            for entry in payload['data']:
                observations = pd.DataFrame(entry['observations'])
                observations['referenceTime'] = entry['referenceTime']
                observations['sourceId'] = entry['sourceId']
                frames.append(observations)
            long_frame = pd.concat(frames, ignore_index=True)

            columns = ['sourceId','referenceTime','elementId','value','unit','timeOffset']
            daily = long_frame[columns].copy()
            daily['referenceTime'] = pd.to_datetime(daily['referenceTime']).dt.strftime('%Y-%m-%d')

            df3 = daily.pivot(index='referenceTime', columns='elementId', values='value').reset_index()
            # NOTE(review): the rename relies on the pivoted element columns coming out as
            # air_temperature, relative_humidity, wind_speed, precipitation_amount - confirm.
            # A station that lacks one of the four elements fails here and is skipped.
            df3.columns = ['date', 'temperature', 'humidity', 'wind_speed', 'precipitation']
        except Exception:
            # Any failure to build the table counts as "this station has no usable data".
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
            if idx == last_idx or distance > 50:
                raise NoDataError("No data available")
            continue

        # add the distance as a column
        df3['distance'] = distance
        return df3

    # Only reached when the station list is empty
    raise NoDataError("No data available")
    
# Try the function on locality 23695 for 2015 and preview the first rows
weather_data = get_daily_data(df = fish_data, localityno = 23695, year = 2015)
weather_data.head()

Unnamed: 0,date,temperature,humidity,wind_speed,precipitation,distance
0,2015-01-01,1.3,92.0,12.8,0.5,18.074864
1,2015-01-02,-2.4,98.0,3.4,7.9,18.074864
2,2015-01-03,-4.6,97.0,1.8,34.1,18.074864
3,2015-01-04,-5.4,97.0,2.1,52.5,18.074864
4,2015-01-05,-0.7,98.0,6.4,8.5,18.074864


In [72]:
def get_daily_data(df, localityno, year):
    """Function to get daily weather data from frost.met.no

    The stations returned by `get_nearest_stations` are tried in order; the
    first one whose response can be turned into a complete daily table wins.

    NOTE(review): a function with the same name is defined in an earlier cell
    (that one raises NoDataError instead of returning None). Whichever cell
    was executed last is the definition in effect - consider keeping one.

    Parameters:
    -----------
    df : pandas dataframe
        Dataframe to get coordinates from
    localityno : int
        Locality number
    year : int
        Year of data

    Returns:
    --------
    df3: pandas dataframe with the columns date, temperature, humidity,
        wind_speed, precipitation and distance (distance of the station used),
        or None if there are no stations, if the last station fails, or if a
        station with a distance above 50 fails.
    """

    lat, lon = get_cords(df = df, localityno = localityno)
    stations = get_nearest_stations(lat, lon)

    endpoint = 'https://frost.met.no/observations/v0.jsonld'
    last_idx = len(stations) - 1

    for idx, (source_id, distance) in enumerate(stations):
        parameters = {
            'sources': source_id,
            'elements': 'sum(precipitation_amount P1D), mean(air_temperature P1D), mean(wind_speed P1D), mean(relative_humidity P1D)',
            'referencetime': f"{year}-01-01/{year}-12-31",
            'levels' : 'default',
            'timeoffsets': 'default'
        }

        # Issue an HTTP GET request
        r = requests.get(endpoint, parameters, auth=(SECRET_ID,''))
        # Extract JSON data
        payload = r.json()

        try:
            # One frame per reference time, concatenated once after the loop
            frames = []
            for entry in payload['data']:
                observations = pd.DataFrame(entry['observations'])
                observations['referenceTime'] = entry['referenceTime']
                observations['sourceId'] = entry['sourceId']
                frames.append(observations)
            long_frame = pd.concat(frames, ignore_index=True)

            columns = ['sourceId','referenceTime','elementId','value','unit','timeOffset']
            daily = long_frame[columns].copy()
            daily['referenceTime'] = pd.to_datetime(daily['referenceTime']).dt.strftime('%Y-%m-%d')

            df3 = daily.pivot(index='referenceTime', columns='elementId', values='value').reset_index()
            # NOTE(review): the rename relies on the pivoted element columns coming out as
            # air_temperature, relative_humidity, wind_speed, precipitation_amount - confirm.
            # A station that lacks one of the four elements fails here and is skipped.
            df3.columns = ['date', 'temperature', 'humidity', 'wind_speed', 'precipitation']
        except Exception:
            # Any failure to build the table counts as "this station has no usable data".
            # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
            if idx == last_idx or distance > 50:
                return None
            continue

        # add the distance as a column
        df3['distance'] = distance
        return df3

    # Only reached when the station list is empty
    return None

# Try the function on locality 23695 for 2015 and preview the first rows
weather_data = get_daily_data(df = fish_data, localityno = 23695, year = 2015)
weather_data.head()


Unnamed: 0,date,temperature,humidity,wind_speed,precipitation,distance
0,2015-01-01,1.3,92.0,12.8,0.5,18.074864
1,2015-01-02,-2.4,98.0,3.4,7.9,18.074864
2,2015-01-03,-4.6,97.0,1.8,34.1,18.074864
3,2015-01-04,-5.4,97.0,2.1,52.5,18.074864
4,2015-01-05,-0.7,98.0,6.4,8.5,18.074864


In [74]:
def convert_to_weekly_data(weather_data):
    """Aggregate daily weather observations into one row per week number.

    Humidity, temperature and wind speed are averaged per week; precipitation
    is summed per week.

    The input frame is modified in place: its `date` column is converted to
    datetime and a `week` column (ISO week number of `date`) is added.

    NOTE(review): rows are grouped by week number only, so days that belong
    to a neighbouring ISO year but share a week number end up in the same
    group - confirm this is acceptable.

    Parameters:
    -----------
    weather_data : pandas dataframe
        Daily data with the columns date, temperature, humidity, wind_speed
        and precipitation

    Returns:
    --------
    pandas dataframe indexed by week with the columns humidity, temperature,
    wind_speed and precipitation
    """
    weather_data['date'] = pd.to_datetime(weather_data['date'])
    weather_data['week'] = weather_data['date'].dt.isocalendar().week

    by_week = weather_data.groupby('week')

    # weekly means for the averaged measurements
    weekly_means = by_week[['humidity', 'temperature', 'wind_speed']].mean()
    # weekly sum for precipitation
    weekly_totals = by_week[['precipitation']].sum()

    # join the two aggregates on the shared week index
    return weekly_means.merge(weekly_totals, left_index=True, right_index=True)

# Aggregate the daily frame fetched above; the last expression displays the weekly table
weekly_weather_data = convert_to_weekly_data(weather_data)
weekly_weather_data

Unnamed: 0_level_0,humidity,temperature,wind_speed,precipitation
week,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,96.0,-2.775,5.025,95.0
2,93.142857,-1.771429,7.042857,34.9
3,91.428571,-2.828571,8.614286,35.5
4,72.285714,-4.228571,6.7,6.6
5,82.0,-2.957143,5.942857,8.4
6,93.428571,-3.971429,4.657143,41.4
7,84.428571,-0.014286,8.614286,8.9
8,85.571429,-1.542857,11.385714,26.9
9,72.285714,-0.5,12.257143,4.8
10,86.571429,-1.385714,10.342857,24.4
