In [1]:
import pyspark

# NOTE: despite the name, `sc` holds a SparkSession (not a SparkContext); the
# rest of the notebook uses it via `sc.read`.
sc = pyspark.sql.SparkSession.builder.getOrCreate()

## Levanto los archivos

In [2]:
# Input locations: every CSV below is read from DIR.
DIR = 'datos/'
FILE_COORDENADAS = 'DistancesCoordenadasUTM.csv'
FILE_CALIBRACION = 'DatosCalibracion.csv'
# One file per receiver; list order fixes the antenna index i used in 'recep_{i}'.
FILE_RECEPTORES = [
    'DatosRC1.csv',
    'DatosRC2.csv',
    'DatosD1.csv',
    'DatosD2.csv',
]

def rdd_from_file(filename):
    """Read ``filename`` as CSV (first line is the header, column types
    inferred by Spark) and return its rows as an RDD of ``Row`` objects."""
    frame = sc.read.csv(filename, header=True, inferSchema=True)
    return frame.rdd

# Load every input CSV as an RDD of Rows (one RDD per receiver file).
coord_rdd = rdd_from_file(DIR + FILE_COORDENADAS)
cal_rdd = rdd_from_file(DIR + FILE_CALIBRACION)
recep_rdd = list(map(lambda name: rdd_from_file(DIR + name), FILE_RECEPTORES))

## Utils

In [3]:
def apply_all(list_rdd, f):
    """Apply ``f`` to every element of ``list_rdd`` and return the results,
    in the same order, as a new list."""
    return [f(item) for item in list_rdd]

def remove_no_receptions(l):
    """Return a list with only the strictly positive entries of ``l``.

    Zero (or negative) entries are dropped; presumably 0 stands for
    "no reception" in a power series.
    """
    return [e for e in l if e > 0]
 

## Filtro los datos de calibración vacíos

In [4]:
# Drop calibration rows without a date: literal 'NA' values and empty cells
# (read by Spark as null). Null dates would otherwise crash the regex match and
# the string concatenation done on 'Fecha' in later cells.
cal_rdd = cal_rdd.filter(lambda x: x['Fecha'] is not None and x['Fecha'] != 'NA')

## Filtro datos de receptores inválidos (hay filas inválidas porque el header se repite dentro de los archivos)

In [5]:
# Drop receptor rows whose 'Date' is null (per the notes above, these come from
# the header line being repeated inside the files).
recep_rdd = [recep.filter(lambda row: row['Date'] is not None) for recep in recep_rdd]


## Corrijo fechas ambiguas (m/d/Y → d/m/Y)

In [6]:
import re
from datetime import datetime, timedelta

# Dates whose month and day parts both start with a non-zero digit
# (e.g. '3/15/2018') are treated as m/d/Y and rewritten as d/m/Y.
# NOTE(review): assumes zero-padded dates in the raw files are already d/m/Y — confirm.
pattern = re.compile(r'[1-9][0-9]*/[1-9][0-9]*/.*')


def fix_date_format(row, field):
    """Return a dict copy of ``row`` with ``row[field]`` normalised to d/m/Y.

    If the value matches ``pattern`` it is parsed as ``%m/%d/%Y`` and written
    back as ``%d/%m/%Y`` (zero-padded). Otherwise it is copied unchanged.

    Args:
        row: a Spark ``Row`` (anything with ``asDict()``) or a plain dict.
            It is not modified.
        field: name of the date column; its value must be a string.

    Returns:
        A new dict with the same keys as ``row``.

    Raises:
        ValueError: if a matching value is not a valid m/d/Y date. For example,
            '15/12/2018' matches the pattern, but 15 is not a month.
    """
    # Accept dicts too (same convention as add_timestamp); copy so the input is untouched.
    row_dict = dict(row) if isinstance(row, dict) else row.asDict()
    date = row[field]
    format_from = '%m/%d/%Y'
    format_to = '%d/%m/%Y'
    if pattern.match(date):
        row_dict[field] = datetime.strptime(date, format_from).strftime(format_to)
    return row_dict

# Normalise the date column of the calibration and of every receptor RDD to d/m/Y.
cal_rdd = cal_rdd.map(lambda row: fix_date_format(row, 'Fecha'))
recep_rdd = [recep.map(lambda row: fix_date_format(row, 'Date')) for recep in recep_rdd]


## Agrego timestamp

In [7]:
def add_timestamp(row, date_field, time_field, suffix = '', fmt='%d/%m/%Y %H:%M:%S'):
    """Return a dict copy of ``row`` with a parsed datetime under ``'timestamp' + suffix``.

    ``row[date_field]`` and ``row[time_field]`` are joined with a single space
    and parsed with ``fmt``. By default ``fmt`` is day/month/year followed by
    H:M:S.

    Args:
        row: a plain dict or a Spark ``Row`` (anything with ``asDict()``).
            It is not modified.
        date_field: key of the date string.
        time_field: key of the time string.
        suffix: appended to ``'timestamp'`` to build the output key.
        fmt: ``datetime.strptime`` format for ``"<date> <time>"``.

    Returns:
        A new dict containing the original fields plus the timestamp.

    Raises:
        ValueError: if the combined string does not match ``fmt``.
    """
    # Copy dicts as well, so a caller's dict is never mutated behind its back.
    row_dict = dict(row) if isinstance(row, dict) else row.asDict()
    row_dict['timestamp' + suffix] = datetime.strptime(row[date_field] + ' ' + row[time_field], fmt)
    return row_dict
    
# Calibration rows get a start and an end timestamp; receptions get a single one.
cal_rdd = (
    cal_rdd
    .map(lambda row: add_timestamp(row, 'Fecha', 'Inicio', '_inicio'))
    .map(lambda row: add_timestamp(row, 'Fecha', 'Fin', '_fin'))
)
recep_rdd = [recep.map(lambda row: add_timestamp(row, 'Date', 'Time')) for recep in recep_rdd]



## Saco recepciones de pájaros (tags que no están en la calibración)

In [8]:
# Tags used in the calibration. A set keeps the membership test below O(1)
# for every reception row (it is captured by the filter closure).
cal_tags = set(cal_rdd.map(lambda x: int(x['Tag'])).distinct().collect())
# Keep only receptions of calibration tags; presumably everything else is
# other (e.g. bird) tags, per the section title.
recep_rdd = apply_all(recep_rdd, lambda recep: recep.filter(lambda x: x['Tag ID'] in cal_tags))

## Agrego las recepciones de las antenas por cada período de emisión

In [9]:
# Collect every receiver's filtered readings to the driver as a list of lists.
# add_recep reads this module-level name from inside its Spark tasks.
recibidos_by = [recep.collect() for recep in recep_rdd]


In [42]:
def normalize_seconds(recep, to_sec):
    """Snap a reception's timestamp onto the 5-second grid whose phase is ``to_sec``.

    Args:
        recep: sequence whose item 0 is a power value and item 1 a ``datetime``.
        to_sec: reference second; only ``to_sec % 5`` matters.

    Returns:
        ``(new_timestamp, power)``. The timestamp is moved by the smallest
        number of whole seconds (0, 1 or 2) that makes its ``second`` field
        congruent to ``to_sec`` modulo 5. Note that the pair order is swapped
        with respect to ``recep``.
    """
    power = recep[0]
    timestamp = recep[1]
    target = to_sec % 5
    gap = abs(timestamp.second % 5 - target)
    shift = timedelta(seconds=min(gap, 5 - gap))
    # Go backwards when that lands on the target residue, otherwise forwards.
    if (timestamp.second - shift.seconds) % 5 == target:
        return (timestamp - shift, power)
    return (timestamp + shift, power)

def add_recep(calibr, n_emisiones=24):
    """Attach to a calibration emission the power each antenna received per 5 s slot.

    For every antenna ``i`` (position in ``recibidos_by``), this sets
    ``calibr['recep_{i}']`` to a list of ``n_emisiones`` powers, one per
    5-second slot. A slot with no reception holds 0, and an antenna that
    received nothing from the tag gets ``n_emisiones`` zeros.

    Only receptions with ``'Tag ID' == int(calibr['Tag'])`` and
    ``timestamp_inicio <= timestamp < timestamp_fin`` are considered. They are
    snapped with ``normalize_seconds`` onto the 5-second phase of the first of
    them. The first slot is the first instant at or after ``timestamp_inicio``
    with that same phase. Receptions that do not land on a slot are ignored.

    Args:
        calibr: dict with at least 'Tag', 'timestamp_inicio' and
            'timestamp_fin'. It is modified in place.
        n_emisiones: number of slots per antenna (24 by default).

    Returns:
        The same ``calibr`` dict.

    Raises:
        AssertionError: if two receptions of one antenna snap onto the same
            instant (one of them would otherwise be silently lost).
    """
    tag = int(calibr['Tag'])
    inicio = calibr['timestamp_inicio']
    fin = calibr['timestamp_fin']
    slot_step = timedelta(seconds=5)

    for i, recibidos in enumerate(recibidos_by):
        all_recepcions = [
            (x['Power'], x['timestamp'])
            for x in recibidos
            if tag == x['Tag ID'] and inicio <= x['timestamp'] < fin
        ]

        if not all_recepcions:
            # Store explicit zeros instead of leaving the key unset, so every
            # emission carries all recep_* keys with list values.
            calibr['recep_{}'.format(i)] = [0] * n_emisiones
            continue

        first_recep_sec = all_recepcions[0][1].second
        recep_time_to_power = dict(
            normalize_seconds(recep, first_recep_sec) for recep in all_recepcions
        )
        assert len(recep_time_to_power) == len(all_recepcions), \
            'two receptions snapped onto the same slot'

        # Start the slot grid with the same phase (mod 5 s) as the snapped
        # receptions, whatever the seconds of `inicio` are. An offset of just
        # `first_recep_sec % 5` only lines up when inicio.second % 5 == 0.
        first_slot = inicio + timedelta(seconds=(first_recep_sec - inicio.second) % 5)
        calibr['recep_{}'.format(i)] = [
            recep_time_to_power.get(first_slot + k * slot_step, 0)
            for k in range(n_emisiones)
        ]

    return calibr


cal_rdd = cal_rdd.map(add_recep)


In [None]:
#add_recep(cal_rdd.filter(lambda x: x['Punto'] == 148).take(1)[0])

## Agrego datos de la 4ta antena a todas las emisiones del mismo punto.

In [43]:
# DataFrame view of the emissions; the schema is inferred from a random ~20% sample of rows.
cal_rdd_df = cal_rdd.toDF(schema=None, sampleRatio=0.2)

def fix_null_recep(row):
    """Return ``row`` as a dict, replacing a null ``recep_3`` with 24 zeros.

    Rows coming out of the left-outer join without 4th-antenna data have a
    null ``recep_3``; any other value is kept as is.
    """
    row_dict = row.asDict()
    if row_dict['recep_3'] is not None:
        return row_dict
    row_dict['recep_3'] = [0] * 24
    return row_dict

# For each point, take recep_3 from the rows where the 4th antenna received
# something (its largest value is non-zero) and join it onto every emission of
# that point. Points without such a row get a null recep_3, fixed by fix_null_recep.
# NOTE(review): a point with several such rows yields one output row per match.
join = cal_rdd_df.drop('recep_3').join(
    cal_rdd_df
    .filter('sort_array(recep_3, False)[0] <> 0')
    .select('Punto', 'recep_3'),
    'Punto',
    'left_outer',
)

cal_rdd = join.rdd.map(fix_null_recep)


## Saco emisiones exactamente iguales

In [44]:
# Group emissions sharing point, tag and the full reception series (the four
# lists concatenated, stringified to be hashable); keep one arbitrary row per group.
cal_rdd = (
    cal_rdd
    .groupBy(lambda row: (row['Punto'], row['Tag'], str(row['recep_0'] + row['recep_1'] + row['recep_2'] + row['recep_3'])))
    .map(lambda group: next(iter(group[1])))
)


## Saco emisiones recibidas solo por la 4ta antena si el punto ya tiene otra emisión

In [45]:
# Points with more than one emission whose recep_3 has a non-zero value.
points_with_4th_and_other_emission = (
    cal_rdd
    .filter(lambda row: any(row['recep_3']))
    .groupBy(lambda row: row['Punto'])
    .map(lambda group: (group[0], len(list(group[1]))))
    .filter(lambda pair: pair[1] > 1)
)
points_list = points_with_4th_and_other_emission.map(lambda pair: pair[0]).collect()
# For those points, drop emissions where antennas 0-2 received nothing.
cal_rdd = cal_rdd.filter(
    lambda row: row['Punto'] not in points_list or any(row['recep_0'] + row['recep_1'] + row['recep_2'])
)


## Agrego posición de los puntos

In [46]:
# Punto -> (X, Y) UTM coordinates, collected to the driver.
# Reuses coord_rdd (the same file, read from DIR + FILE_COORDENADAS) instead of
# re-reading it through a hard-coded path that ignores DIR.
coordenadas_UTM = coord_rdd
dict_coordenadas = coordenadas_UTM.map(lambda x: (x['Punto'], (x['X'], x['Y']))).collectAsMap()


def add_coord(row, coords=None):
    """Add the coordinates of ``row['Punto']`` to ``row`` as ``'x'`` and ``'y'``.

    Args:
        row: dict with a 'Punto' key. It is modified in place.
        coords: mapping Punto -> (X, Y). Defaults to the module-level
            ``dict_coordenadas``.

    Returns:
        The same ``row``.

    Raises:
        KeyError: if the point has no entry in ``coords``.
    """
    if coords is None:
        coords = dict_coordenadas
    coordenadas = coords[row['Punto']]
    row['x'] = coordenadas[0]
    row['y'] = coordenadas[1]
    return row

# Attach the point's coordinates to every emission.
cal_rdd = cal_rdd.map(lambda row: add_coord(row))

In [47]:
import json

def my_converter(o):
    """``default`` hook for ``json.dumps``: serialise datetimes as ``str(o)``.

    ``str`` of a datetime gives 'YYYY-MM-DD HH:MM:SS' (plus microseconds when
    non-zero).

    Raises:
        TypeError: for any other non-JSON type, as the json module expects.
            Returning None instead would silently write such values as null.
    """
    if isinstance(o, datetime):
        return str(o)
    raise TypeError('Object of type {} is not JSON serializable'.format(type(o).__name__))
    

# Write one JSON object per line as a Spark output directory under DIR.
# NOTE: saveAsTextFile fails if the directory already exists; remove it before re-running.
cal_rdd.map(lambda x: json.dumps(x, default=my_converter)).saveAsTextFile(DIR + 'points-recep-with-zeros.jsonlines')