In [1]:
import json
from datetime import date, timedelta
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import lpad, concat, col, lit, to_timestamp, udf, create_map
from pyspark.sql.types import IntegerType
from itertools import chain
import pandas as pd
import os
 

In [2]:
#rename columns
def renameDataframe(df):
    """Translate the Danish source column names to their English equivalents.

    Only the four columns listed in ``translations`` are renamed; every other
    column keeps its name. Renames are applied in the listed order.
    """
    translations = {
        "Time-Kvarter": "Time",
        "MålerVærdier": "MeterValue",
        "MålerEgenskab": "MeterType",
        "MålerArtBeskrivelse": "MeterDescribe",
    }
    renamed = df
    for danish_name, english_name in translations.items():
        renamed = renamed.withColumnRenamed(danish_name, english_name)
    return renamed

#preprocess dataframe
def processDataframe(df):
    """Type-cast and enrich a raw monthly meter frame.

    Works on the original Danish column names, so it must run before
    ``renameDataframe``. Steps:
      * ``MålerVærdier`` is cast to float;
      * ``Time-Kvarter`` is left-padded with '0' to 4 characters (HHmm);
      * ``Datetime`` = ``Dato`` + '-' + padded ``Time-Kvarter``, parsed with
        the pattern ``yyyy-MM-dd-HHmm``;
      * ``FullAdresses`` = ``InstallationAdresse`` + ' ' + ``InstallationPostNr``;
      * ``Dato`` is parsed as a timestamp with the pattern ``yyyy-MM-dd``;
      * the Dataset, InstallationAdresse, InstallationPostNr and MeterRegister
        columns are dropped.
    """
    value_as_float = df["MålerVærdier"].cast("float")
    padded_quarter = lpad(df['Time-Kvarter'], 4, '0')
    # col(...) is a by-name reference resolved against the frame withColumn is
    # called on, so 'Time-Kvarter' here picks up the already padded column.
    raw_stamp = concat(col('Dato'), lit('-'), col('Time-Kvarter'))
    full_address = concat(col('InstallationAdresse'), lit(' '), col('InstallationPostNr'))

    enriched = (
        df.withColumn("MålerVærdier", value_as_float)
        .withColumn("Time-Kvarter", padded_quarter)
        .withColumn('Datetime', raw_stamp)
        .withColumn('FullAdresses', full_address)
        .withColumn("Datetime", to_timestamp("Datetime", "yyyy-MM-dd-HHmm"))
        .withColumn("Dato", to_timestamp("Dato", "yyyy-MM-dd"))
    )
    return enriched.drop("Dataset", "InstallationAdresse", "InstallationPostNr", "MeterRegister")

In [14]:
#init spark app
# With master "local" the executor runs inside the driver JVM, so
# `spark.executor.memory` alone does not enlarge the usable heap;
# `spark.driver.memory` does, provided it is set before the JVM is launched
# (i.e. on the first getOrCreate() in this kernel -- an already running
# session is returned unchanged).
SPARK_MEMORY = "4g"
spark = (
    SparkSession.builder
    .master("local")
    .appName("Tref")
    .config("spark.driver.memory", SPARK_MEMORY)
    .config("spark.executor.memory", SPARK_MEMORY)
    .getOrCreate()
)


In [20]:
# clean parquet and store as csv
# Only `*.parquet` entries are processed: the CSVs produced below are written
# into the same directory, so listing everything would feed them back into
# spark.read.parquet on a re-run (and would also pick up stray files).
# Sorting makes the processing order deterministic.
path = 'data/2020/monthly/'
PARQUET_SUFFIX = '.parquet'
files = sorted(name for name in os.listdir(path) if name.endswith(PARQUET_SUFFIX))

# Installation IDs to keep (used with `isin` on InstallationsID below).
with open('data/address500.json') as f:
    addressList = json.load(f)

for file in files:
    print(file)
    df = spark.read.parquet(os.path.join(path, file))
    df = processDataframe(df)
    df = renameDataframe(df)
    df = df.filter(col('InstallationsID').isin(addressList))
    df = df.toPandas()
    # Replace literal commas inside addresses with spaces.
    df['FullAdresses'] = df['FullAdresses'].str.replace(',', ' ', regex=False)
    # Output name = last 6 characters before '.parquet' (presumably 'YYYYMM' -- TODO confirm).
    month = file[:-len(PARQUET_SUFFIX)][-6:]
    df.to_csv(os.path.join(path, month + '.csv'))

In [21]:
#from csv
# Load the intermediate extracts data_1.csv .. data_4.csv into a dict keyed 1..4.
d = {i: pd.read_csv('data_' + str(i) + '.csv') for i in range(1, 5)}

In [22]:
# Stack the four extracts in key order, sort by 'hour' and save the 2020 table.
df = (
    pd.concat([d[k] for k in range(1, 5)])
    .sort_values(by=['hour'])
)
df.to_csv('data2020.csv')

In [39]:
# Combine the 2019 and 2020 tables into a single 'hour'-sorted export.
df19, df20 = (pd.read_csv(name) for name in ('data2019.csv', 'data2020.csv'))
df = pd.concat([df19, df20]).sort_values(by=['hour'])
df.to_csv('dataAll.csv')

In [32]:
# Reload the freshly combined table and the reference export (dataFull.csv) for comparison.
dfnew, dfold = pd.read_csv('dataAll.csv'), pd.read_csv('dataFull.csv')

In [33]:
# Row count of the combined table (dataAll.csv).
dfnew.shape[0]

17448

In [31]:
# Row count of the reference export (dataFull.csv). In the saved run this is
# 17520 vs 17448 for dataAll.csv -- a 72-row gap worth investigating.
dfold.shape[0]

17520

In [1]:
import pandas as pd
import logging
import requests
import json
import psycopg2
from datetime import datetime
from dateutil import tz

class DMIRetriever:
    """Fetches observation data from a DMI HTTP endpoint into pandas.

    The API key is read from a local text file at construction time and sent
    as the ``api-key`` query parameter on every request.
    """

    def __init__(self, path, url):
        """
        :param path: path to a text file containing the DMI API key.
        :param url: observation endpoint URL to query.
        """
        self.keyDMI = self.initKeyDMI(path)
        self.urlDMI = url

    def initKeyDMI(self, filePath):
        """Read the API key from ``filePath``.

        Surrounding whitespace -- notably the trailing newline most editors
        append -- is stripped; otherwise it would be sent as part of the key.
        """
        with open(filePath, 'r') as f:
            return f.read().strip()

    def getWeatherData(self, startDate, endDate, stationId="06123", field=None,
                       limit='100000000', timeout=60) -> pd.DataFrame:
        """
        Get weather data from DMI.

        :param startDate: range start as a 'YYYY-MM-DD' string.
        :param endDate: range end as a 'YYYY-MM-DD' string.
        :param stationId: station filter; omitted from the query when falsy.
        :param field: sent as ``parameterId``; omitted when falsy.
        :param limit: maximum number of records to request.
        :param timeout: seconds passed to ``requests.get`` so a stalled
            connection cannot block forever.
        :returns: DataFrame with a single ``temp`` column (named ``temp``
            whatever ``field`` is) indexed by ``datetime``, sorted ascending.
        :raises SystemExit: on any ``requests`` transport error.
        :raises ValueError: if the server answers with a non-200 status.
        """
        query = self._generateDMIQuery(
            field=field, startDate=startDate, endDate=endDate,
            stationId=stationId, limit=limit)
        payload = self._requestDMIData(query, timeout)
        return self._cleanDMIData(pd.DataFrame(payload))

    def _requestDMIData(self, query, timeout=60):
        """Send ``query`` to the configured endpoint and return the decoded JSON body.

        :raises SystemExit: on any ``requests`` transport error (kept for
            compatibility with existing callers).
        :raises ValueError: if the status code is not 200.
        """
        try:
            r = requests.get(self.urlDMI, params=query, timeout=timeout)
            logging.debug(r)
        except requests.exceptions.RequestException as e:
            raise SystemExit(e) from e

        if r.status_code != 200:
            logging.debug(r.status_code)
            raise ValueError(str(r))
        return r.json()

    def _cleanDMIData(self, df):
        """Turn the raw observation frame into a ``temp`` frame indexed by time.

        ``timeObserved`` is interpreted as microseconds since the epoch and
        converted to naive timestamps. The input frame is not modified.
        """
        if df.empty:
            # An empty result would otherwise fail with KeyError on 'timeObserved'.
            return pd.DataFrame(columns=['temp'],
                                index=pd.DatetimeIndex([], name='datetime'))

        df = df.assign(time=pd.to_datetime(df['timeObserved'], unit='us', utc=False))
        df = df.drop(['_id', 'timeCreated', 'timeObserved',
                      'stationId', 'parameterId'], axis=1)
        # NOTE(review): the rename is positional -- it assumes exactly one
        # observation-value column remains, followed by 'time'. Re-check if the
        # API payload gains fields.
        df.columns = ['temp', 'datetime']

        return df.set_index('datetime').sort_index(ascending=True)

    def _generateDMIQuery(self, startDate, endDate, stationId, field, limit) -> dict:
        """
        Generate a DMI query dict.

        ``from``/``to`` are the dates as strings of integer microseconds since
        the epoch. ``parameterId`` and ``stationId`` are only included when truthy.
        """
        def toEpochMicros(day):
            # Integer division of the nanosecond value is exact; multiplying by
            # the float 10**-3 can round away digits for arbitrary timestamps.
            return str(pd.Timestamp(datetime.strptime(day, '%Y-%m-%d')).value // 1000)

        query = {
            'api-key': self.keyDMI,
            'from': toEpochMicros(startDate),
            'to': toEpochMicros(endDate),
            'limit': limit
        }
        if field:
            query['parameterId'] = field
        if stationId:
            query['stationId'] = stationId
        return query

In [7]:
urlDMI = 'https://dmigw.govcloud.dk/metObs/v1/observation'
dmiRetriever = DMIRetriever(path='apikey.txt', url=urlDMI)

# Fetch the 'temp_mean_past1h' parameter for station 06123 from 2019-01-01 to 2021-01-01.
dfTemp = dmiRetriever.getWeatherData(
    startDate='2019-01-01',
    endDate='2021-01-01',
    stationId="06123",
    field='temp_mean_past1h',
    limit='100000000000000',
)


In [8]:
# Persist the fetched temperature frame (index included) to temp.csv.
dfTemp.to_csv(path_or_buf='temp.csv')

In [9]:
# Preview the fetched temperature frame (pandas truncates the middle rows in the display).
dfTemp

Unnamed: 0_level_0,temp
datetime,Unnamed: 1_level_1
2019-01-01 00:00:00,7.7
2019-01-01 01:00:00,7.9
2019-01-01 02:00:00,7.9
2019-01-01 03:00:00,7.9
2019-01-01 04:00:00,8.0
...,...
2020-12-31 19:00:00,3.3
2020-12-31 20:00:00,2.4
2020-12-31 21:00:00,1.2
2020-12-31 22:00:00,3.5
