# Data Processing

In [None]:
import pandas as pd
import os
import json
from IPython.display import display
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
import datetime

# Data folders, relative to the notebook. Both keep a trailing slash because later
# cells build file paths from them by plain string concatenation.
# processing_path is not used in the cells visible here (presumably the output folder).
clean_path, processing_path = "../data/clean/", "../data/processing/"

In [None]:
# Initial Data
# Load the cleaned reference tables from clean_path and preview them.
_parameters = pd.read_json(os.path.join(clean_path, "parameters.json"))
_countries = pd.read_json(os.path.join(clean_path, "countries.json"))
_locations = pd.read_json(os.path.join(clean_path, "locations.json"))

display(_parameters)
display(_countries[_countries['name'] == 'Spain'])
display(_locations)

# Preview every file whose name contains 'measurements'. os.walk also descends
# into sub-folders, so the path must be built from dirpath (the folder the file
# was actually found in), not from clean_path.
for (dirpath, dirnames, filenames) in os.walk(clean_path):
    for filename in filenames:
        if 'measurements' in filename:
            display(pd.read_json(os.path.join(dirpath, filename)))

In [None]:
# Create Spark Context
# Build (or reuse) a local SparkSession that uses all available cores, and expose
# its SparkContext as `sc`. getOrCreate() makes re-running this cell safe;
# constructing pyspark.SparkContext directly fails when a context is already active.
spark = SparkSession.builder.master('local[*]').getOrCreate()
sc = spark.sparkContext

In [None]:
# Load every measurements JSON file under clean_path as a Spark RDD of Rows,
# keyed by the file name with '.json' removed.
measuraments = {}
for (dirpath, dirnames, filenames) in os.walk(clean_path):
    for filename in filenames:
        if 'measurements' in filename:
            # os.walk also visits sub-folders, so join with dirpath (not
            # clean_path) to get the path where the file actually lives.
            file_path = os.path.join(dirpath, filename)
            measuraments[filename.replace('.json', '')] = spark.read.json(file_path).rdd

# Fail with a clear message instead of an opaque IndexError below.
if not measuraments:
    raise FileNotFoundError(f"No measurements files found under {clean_path!r}")

# Preview the first record of the first dataset loaded (dicts keep insertion order).
measuraments[next(iter(measuraments))].first()

In [None]:
# Structure Data
# Turn each raw measurements RDD into a typed DataFrame (replacing it in
# measuraments), then build per-parameter averages per day (measuraments_d)
# and per month (measuraments_m).

from dateutil import parser
# Provides date_format. NOTE(review): kept as a star import so later cells keep
# the same names, but it also shadows Python builtins such as sum/min/max.
from pyspark.sql.functions import *

_MEASUREMENT_COLUMNS = ['date', 'value', 'parameter', 'unit', 'city', 'location']


def _measurement_to_tuple(row):
    """Flatten one raw measurement Row into a tuple ordered like _MEASUREMENT_COLUMNS."""
    return (parser.parse(row['date']['utc']), row['value'], row['parameter'],
            row['unit'], row['city'], row['location'])


def _window_avg(df, pattern):
    """Average `value` per (time_window, parameter), time_window being `date` formatted with *pattern*."""
    return (df.select('*', date_format('date', pattern).alias('time_window'))
              .groupby(['time_window', 'parameter'])
              .agg({'value': 'avg'})
              .orderBy('time_window'))


# These dicts were never initialised before, so the first assignment into
# them raised NameError.
measuraments_d = {}
measuraments_m = {}

# `key` is deliberately the loop variable: a later cell reads measuraments[key].
for key in measuraments:
    rdd = measuraments[key].map(_measurement_to_tuple)
    print(rdd.first())

    df = rdd.toDF(schema=_MEASUREMENT_COLUMNS)
    print(df.count())

    # Force `date` to a timestamp column regardless of what schema inference chose.
    df = df.select(df.date.cast('timestamp'), df.value, df.parameter,
                   df.unit, df.city, df.location)
    measuraments[key] = df

    measuraments_d[key] = _window_avg(df, 'yyyy-MM-dd')
    measuraments_m[key] = _window_avg(df, 'yyyy-MM')

# Reference for time-series aggregation with Spark DataFrames:
# https://eradiating.wordpress.com/2016/02/27/aggregating-time-series-with-spark-dataframe/

In [None]:
# Inspect the column types and row count of the DataFrame stored under the
# `key` left over from the Structure Data loop.
_last_df = measuraments[key]
print(_last_df.dtypes)
_last_df.count()

In [None]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    """Stack DataFrames row-wise, matching columns by position.

    Rows are not de-duplicated (SQL UNION ALL semantics), so every input must
    have the same columns in the same order.

    Raises:
        TypeError: if no DataFrame is passed.
    """
    if not dfs:
        raise TypeError("unionAll() needs at least one DataFrame")
    # DataFrame.union is the canonical name; DataFrame.unionAll is an alias for it.
    return reduce(DataFrame.union, dfs)


# Keep the combined DataFrame in `result`. Chaining .show() here would bind None,
# because show() only prints and returns nothing.
result = unionAll(*measuraments.values())
result.show()