In [110]:
from pyspark import SparkContext,HiveContext
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql import functions as F
import os
import subprocess
import pandas as pd
from functools import reduce
from datetime import date

In [78]:
# HDFS directory that holds the per-year, per-pollutant CSV files.
ruta = "/aire"

In [79]:
# List the CSV files stored under `ruta` on HDFS. The command is passed as an
# argument list (no shell), so the path is never reinterpreted by /bin/sh.
cmd = ['hdfs', 'dfs', '-ls', ruta]
salida = subprocess.check_output(cmd).decode('utf8')
# `hdfs dfs -ls` prints a "Found N items" header and then one entry per line
# whose last whitespace-separated field is the path; keep only `*.csv` entries
# and turn them into fully qualified hdfs:// URIs, sorted by name.
lst = sorted(
    f'hdfs://{linea.split()[-1]}'
    for linea in salida.splitlines()
    if linea.strip() and linea.split()[-1].endswith('.csv')
)
lst

['hdfs:///aire/2010CO.csv',
 'hdfs:///aire/2010NO.csv',
 'hdfs:///aire/2010NO2.csv',
 'hdfs:///aire/2010NOX.csv',
 'hdfs:///aire/2010O3.csv',
 'hdfs:///aire/2010PM10.csv',
 'hdfs:///aire/2010PM25.csv',
 'hdfs:///aire/2010SO2.csv',
 'hdfs:///aire/2011CO.csv',
 'hdfs:///aire/2011NO.csv',
 'hdfs:///aire/2011NO2.csv',
 'hdfs:///aire/2011NOX.csv',
 'hdfs:///aire/2011O3.csv',
 'hdfs:///aire/2011PM10.csv',
 'hdfs:///aire/2011PM25.csv',
 'hdfs:///aire/2011PMCO.csv',
 'hdfs:///aire/2011SO2.csv',
 'hdfs:///aire/2012CO.csv',
 'hdfs:///aire/2012NO.csv',
 'hdfs:///aire/2012NO2.csv',
 'hdfs:///aire/2012NOX.csv',
 'hdfs:///aire/2012O3.csv',
 'hdfs:///aire/2012PM10.csv',
 'hdfs:///aire/2012PM25.csv',
 'hdfs:///aire/2012PMCO.csv',
 'hdfs:///aire/2012SO2.csv',
 'hdfs:///aire/2013CO.csv',
 'hdfs:///aire/2013NO.csv',
 'hdfs:///aire/2013NO2.csv',
 'hdfs:///aire/2013NOX.csv',
 'hdfs:///aire/2013O3.csv',
 'hdfs:///aire/2013PM10.csv',
 'hdfs:///aire/2013PM25.csv',
 'hdfs:///aire/2013PMCO.csv',
 'hdfs:///aire/

In [80]:
# Reuse the active Spark session if one exists, otherwise start one named "bigdatita".
sparkSession = (
    SparkSession.builder
    .appName("bigdatita")
    .getOrCreate()
)

In [81]:
# Read every listed CSV into one DataFrame; the first row of each file is the
# header and no type inference is done (all columns stay strings).
df = sparkSession.read.csv(lst, header=True, inferSchema=False)

In [82]:
# Every column arrives as a string because the CSVs are read with inferSchema=False.
df.printSchema()

root
 |-- FECHA: string (nullable = true)
 |-- HORA: string (nullable = true)
 |-- ESTACION: string (nullable = true)
 |-- VALOR: string (nullable = true)
 |-- CONTAMINANTE: string (nullable = true)



In [83]:
# Convert the raw string columns to proper types using the helpers in F:
# FECHA -> timestamp, HORA -> int, VALOR -> double.
df = (
    df.withColumn('FECHA', F.to_timestamp(F.col('FECHA')))
      .withColumn('HORA', F.col('HORA').cast('int'))
      .withColumn('VALOR', F.col('VALOR').cast('double'))
)

In [84]:
# Check the casts: FECHA should now be a timestamp, HORA an integer and VALOR a double.
df.printSchema()

root
 |-- FECHA: timestamp (nullable = true)
 |-- HORA: integer (nullable = true)
 |-- ESTACION: string (nullable = true)
 |-- VALOR: double (nullable = true)
 |-- CONTAMINANTE: string (nullable = true)



In [85]:
# Build a catalogue that maps each (FECHA, HORA) pair to a consecutive integer
# id in chronological order, so the main DataFrame can carry one simple key.
# It is converted to pandas because that makes the numbering easy.
catfh = df.select('FECHA', 'HORA').drop_duplicates().toPandas()
catfh = catfh.sort_values(by=['FECHA', 'HORA']).reset_index(drop=True)
catfh['id'] = catfh.index + 1
# Plain Python ints (not numpy.int64) so they can be fed to F.lit / range later.
anclai, anclaf = int(catfh['id'].min()), int(catfh['id'].max())
# Use the session created in this notebook; the name `spark` only exists when
# the kernel was launched through the pyspark shell.
# NOTE(review): FECHA makes a Spark -> pandas -> Spark round trip here. Depending
# on time-zone settings (session time zone vs. driver local time, DST changes)
# the timestamps may not come back identical, and the inner join on FECHA would
# then silently drop rows — worth comparing df's row count before/after the join.
catfh = sparkSession.createDataFrame(catfh)

In [86]:
# Attach the catalogue id to every measurement by matching on date and hour.
df = df.join(catfh, on=['FECHA', 'HORA'], how='inner')

In [87]:
# FECHA and HORA are now encoded by `id`, so drop them one after the other.
df = df.drop('FECHA').drop('HORA')

In [88]:
# FECHA/HORA have been replaced by the `id` column coming from the catalogue join.
df.printSchema()

root
 |-- ESTACION: string (nullable = true)
 |-- VALOR: double (nullable = true)
 |-- CONTAMINANTE: string (nullable = true)
 |-- id: long (nullable = true)



In [89]:
# vobs: number of past ids each sample aggregates (history length).
# vdes: how many ids beyond the anchor must still exist (prediction horizon).
vobs = 1000
vdes = 1
# Derive the anchor range from the catalogue's id bounds instead of from the
# current anclai/anclaf, so re-running this cell does not keep shifting it.
# The first anchor needs vobs ids of history; the last needs vdes ids after it.
idmin, idmax = catfh.agg(F.min('id'), F.max('id')).first()
anclai, anclaf = idmin + vobs - 1, idmax - vdes
anclai, anclaf

(1000, 97847)

In [91]:
# Peek at the first five rows of the re-keyed DataFrame.
df.show(n=5)

+--------+-----+------------+---+
|ESTACION|VALOR|CONTAMINANTE| id|
+--------+-----+------------+---+
|     VAL|  0.0|         SO2| 76|
|     SUR|  0.0|         SO2| 76|
|     TAC|  2.0|         SO2| 76|
|     FAC|  2.0|         SO2| 76|
|     LLA| null|         SO2| 76|
+--------+-----+------------+---+
only showing top 5 rows



In [107]:
# Goal: for each station and pollutant, predict the value some steps after each
# anchor id (the original note says "one day later"; with one id per FECHA/HORA
# pair and vdes = 1 that is presumably one hour — TODO confirm) from the
# preceding vobs (1000) observations. To that end we build several aggregate
# feature columns over trailing windows.
def ing(df, k, ancla, contaminantes=None):
    """Aggregate VALOR over the trailing window of ``k`` ids that ends at ``ancla``.

    Rows with ``ancla - k + 1 <= id <= ancla`` are grouped by ESTACION and
    pivoted on CONTAMINANTE. For each pollutant the minimum, maximum, mean and
    standard deviation (``F.stddev``) of VALOR are computed, aliased
    ``x_<stat>_<k>`` (Spark prefixes each with the pivot value, e.g.
    ``SO2_x_media_100``).

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain the columns ``id``, ``ESTACION``, ``CONTAMINANTE`` and ``VALOR``.
    k : int
        Window length in ids.
    ancla : int
        Last id (inclusive) of the window; also written to the ``ancla`` column.
    contaminantes : list, optional
        Pivot values. When omitted they are taken from the whole of ``df``
        (not just the window), sorted as Spark's implicit pivot sorts them,
        so every call yields the same set of columns. Pass a precomputed list
        to avoid that extra Spark job on each call.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ESTACION present in the window, plus the ``ancla`` column.
    """
    # numpy integers (e.g. ids coming from a pandas frame) are not accepted as
    # Spark literals by every PySpark version.
    ancla = int(ancla)
    if contaminantes is None:
        # A window that lacks some pollutant would otherwise get fewer pivot
        # columns than its neighbours, which breaks stacking anchors later.
        contaminantes = [
            fila[0]
            for fila in df.select('CONTAMINANTE').distinct().orderBy('CONTAMINANTE').collect()
        ]
    ventana = df.filter(F.col('id').between(ancla - k + 1, ancla))
    estadisticos = [
        fn(F.col('VALOR')).alias(f'x_{nombre}_{k}')
        for fn, nombre in (
            (F.min, 'minimo'),
            (F.max, 'maximo'),
            (F.mean, 'media'),
            (F.stddev, 'desv'),
        )
    ]
    return (
        ventana.groupBy('ESTACION')
        .pivot('CONTAMINANTE', contaminantes)
        .agg(*estadisticos)
        .withColumn('ancla', F.lit(ancla))
    )

In [111]:
# Join keys shared by every per-window feature frame: station + anchor id.
um = [
    'ESTACION',
    'ancla',
]

In [None]:
%%time
step = 100
aux = reduce(lambda x,y:x.join(y,um,'outer'),map(lambda k:ing(df,k,1000),range(step,vobs+step,step)))

In [None]:
# Columns for one anchor: ESTACION, ancla, then one column per pollutant x statistic x window size.
aux.printSchema()

In [None]:
# Full feature matrix: one row per (ESTACION, ancla) for every anchor in
# [anclai, anclaf], with the aggregates for every window size side by side.
# NOTE(review): this stacks (anclaf - anclai + 1) anchors x len(ventanas)
# windows (~96.8k x 10 with the anchor range printed above) into a single
# logical plan, which Spark is unlikely to analyse in reasonable time/memory.
# Consider window functions over `id`, or processing anchors in batches and
# checkpointing/persisting each batch.
ventanas = range(step, vobs + step, step)


def atributos_ancla(ancla):
    """Outer-join the trailing-window aggregates of every size for one anchor."""
    return reduce(lambda x, y: x.join(y, um, 'outer'),
                  (ing(df, k, ancla) for k in ventanas))


# unionByName matches columns by name: the pollutant set differs across years
# (e.g. PMCO only appears from 2011), so a positional union could silently
# stack one pollutant's features under another's column.
X = reduce(lambda x, y: x.unionByName(y),
           map(atributos_ancla, range(anclai, anclaf + 1)))