In [None]:
import pandas as pd
import numpy as np
from shapely.geometry import Point, Polygon
from geopy.distance import geodesic
import geohash2
pd.options.mode.chained_assignment = None

In [None]:
def preprocess_dataframe_celdas(df, poligono=None, operador=None):
    """
    Preprocesa un DataFrame de celdas.
    Pasos:
    1. Cambiar latitud por longitud en caso de que los datos estén mal geolocalizados
    2. Filtrar por polígono si así se desea, incluyendo una ruta como argumento en la función (opcional)
    3. Filtrar por operador, incluyendo el str del operador como argumento (opcional)
    4. Calcular el CGI de las celdas con la columna cell_id_non_encrypted

    Argumentos:
    df (pandas.DataFrame): El DataFrame inicial con las celdas.
    poligono (str): Ruta al archivo WKT que contiene el polígono (opcional).
    operador (str): Operador con el que se desea filtrar las celdas (opcional).
    
    Returns:
    df (pandas.DataFrame): El DataFrame de las celdas preprocesado.
    """
    df = df.rename(columns={'latitude': 'Latitud', 'longitude': 'Longitud'})
    
    if poligono is not None:
        with open(poligono, "r") as f:
            contenido_wkt = f.read()
        poligono_shape = Polygon([(float(x), float(y)) for x, y in [pair.split() for pair in contenido_wkt[10:-2].split(", ")]])
        
        df['Poligono'] = df.apply(lambda row: poligono_shape.contains(Point(row['Longitud'], row['Latitud'])), axis=1)
        df = df[df['Poligono']]
        df.drop(columns=['Poligono'], inplace=True)

    if operador is not None:
        if operador == 'Movistar':
            df = df[df['cell_id_non_encrypted'].str.split('-').str[1] == '7']
        if operador == 'Orange':
            df = df[df['cell_id_non_encrypted'].str.split('-').str[1] == '3']
        if operador == 'Vodafone':
            df = df[df['cell_id_non_encrypted'].str.split('-').str[1] == '1']

    df['CGI'] = df['cell_id_non_encrypted'].str.split('-').apply(lambda x: int(x[2]) * 256 + int(x[3]))
    
    return df

In [None]:
def preprocess_dataframe_drive_test(df,df_celdas, poligono=None):
    """
    Preprocesa un DataFrame de drive test.
    Pasos:
    1. Cambiar latitud por longitud en caso de que los datos estén mal geolocalizados
    2. Filtrar por polígono si así se desea, incluyendo una ruta como argumento en la función (opcional)
    3. Asignar CGI a cada medida en función del DataFrame de las celdas
    4. Asignar Banda de Frecuencia a cada medida
    5. Calcular franja horaria, día y geohash de la conexión

    Argumentos:
    df (pandas.DataFrame): El DataFrame inicial que contiene las medidas del drive test.
    df_celdas (pandas.DataFrame): El DataFrame preprocesado que contiene las celdas.
    poligono (str): Ruta al archivo WKT que contiene el polígono (opcional).
    
    Returns:
    df (pandas.DataFrame): El DataFrame de las medidas drive test preprocesado.
    """

    df.rename(columns={'Latitud': 'Longitud', 'Longitud': 'Latitud'}, inplace=True)
    
    if poligono is not None:

        with open(poligono, "r") as f:
            contenido_wkt = f.read()
        poligono_shape = Polygon([(float(x), float(y)) for x, y in [pair.split() for pair in contenido_wkt[10:-2].split(", ")]])
        

        df['Poligono'] = df.apply(lambda row: poligono_shape.contains(Point(row['Longitud'], row['Latitud'])), axis=1)
        df = df[df['Poligono']]
        df.drop(columns=['Poligono'], inplace=True)
        
    for index, row in df.iterrows():
        coordenadas_conexion = (row['Latitud'], row['Longitud'])
        celdas_filtradas = df_celdas[(df_celdas['pci'] == row['PCI']) & (df_celdas['earfcn'] == row['EARFCN'])]
        distancias = []
        for _, celda in celdas_filtradas.iterrows():
            coordenadas_celda = (celda['Latitud'], celda['Longitud'])
            distancia = geodesic(coordenadas_conexion, coordenadas_celda).meters
            distancias.append((distancia, celda['CGI']))
        if distancias:
            distancia_minima, eNodeB = min(distancias, key=lambda x: x[0])
            df.at[index, 'CGI'] = eNodeB
        else:
            df.at[index, 'CGI'] = None
            
    df = df.dropna(subset=['CGI'])
    df['CGI'] = df['CGI'].astype(int)
    
    df['BAND_FREQ'] = None 

    for index, row in df.iterrows():
        cgi = row['CGI']
        matching_cell = df_celdas[df_celdas['CGI'] == cgi]
        if not matching_cell.empty:
            band_freq = matching_cell.iloc[0]['cell_frequency_band']
            df.at[index, 'BAND_FREQ'] = band_freq
    
    df['Fecha'] = pd.to_datetime(df['Fecha'])
    df['Franja_horaria'] = pd.cut(df['Fecha'].dt.hour, bins=range(0, 25, 1), labels=range(1, 25), right=False)
    
    df['Dia'] = df['Fecha'].dt.dayofweek
    df = df[~df['Fecha'].dt.dayofweek.isin([5, 6])]
    
    df['Geohash'] = df.apply(lambda row: geohash2.encode(row['Latitud'], row['Longitud'], precision=8), axis=1)
            
    return df

In [None]:
def preprocess_dataframe_crowdsourced(df, df_drive_test, poligono=None, operador=None):
    """
    Preprocesa un DataFrame de datos crowdsourced.
    Pasos:
    1. Calcular CGI de las medidas a partir de la columna cell_id_non_encrypted
    2. Renombrar varias columnas para hacerlo lo más semejante posible al DataFrame del drive test
    3. Filtrar por polígono si así se desea, incluyendo una ruta como argumento en la función (opcional)
    4. Filtrar por vehículo
    5. Filtrar por operador si así se desea, incluyendo el nombre del operador como argumento en la función (opcional)
    6. Calcular franja horaria y filtrar por las mismas que haya en el drive test
    7. Calcular día de la semana y eliminar fines de semana, asumiendo un drive test nunca se va a realizar esos días
    8. Calcular geohash de la conexión mediante su latitud y longitud
    
    Argumentos:
    df (pandas.DataFrame): El DataFrame inicial con las medidas crowdsourced.
    df_drive_test (pandas.DataFrame): El Dataframe procesado de las medidas del drive test.
    poligono (str): Ruta al archivo WKT que contiene el polígono (opcional).
    operador (str): Operador con el que se desea filtrar las celdas (opcional).
    
    Returns:
    df (pandas.DataFrame): El DataFrame de las medidas crowdsourced preprocesado.
    """

    df['CGI'] = df['cell_id_non_encrypted'].str.split('-').apply(lambda x: int(x[2]) * 256 + int(x[3]))
    
    df.rename(columns={'carrier':'Operador','hour': 'Hora', 'gps_latitude':'Latitud','gps_longitude':'Longitud','cell_frequency_band':'BAND_FREQ','timestamp_local':'Fecha','rsrp':'RSRP','rsrq':'RSRQ'}, inplace=True)
    
    if poligono is not None:

        with open(poligono, "r") as f:
            contenido_wkt = f.read()
        poligono_shape = Polygon([(float(x), float(y)) for x, y in [pair.split() for pair in contenido_wkt[10:-2].split(", ")]])
        
        df['Poligono'] = df.apply(lambda row: poligono_shape.contains(Point(row['Longitud'], row['Latitud'])), axis=1)
        df = df[df['Poligono']]
        df.drop(columns=['Poligono'], inplace=True)
    
    df = df[df['location_status'] == 'in_vehicle']
    
    if operador is not None:
        if operador == 'Movistar':
            df = df[df['Operador'] == 'MOVISTAR']
        if operador == 'Orange':
            df = df[df['Operador'] == 'ORANGE']
        if operador == 'Vodafone':
            df = df[df['Operador'] == 'VODAFONE']
    
    df['Fecha'] = pd.to_datetime(df['Fecha'])
    df['Franja_horaria'] = pd.cut(df['Fecha'].dt.hour, bins=range(0, 25, 1), labels=range(1, 25), right=False)
    
    franjas_únicas = df_drive_test['Franja_horaria'].unique()
    franjas_únicas = list(franjas_únicas)
    df = df[df['Franja_horaria'].isin(franjas_únicas)]
    
    df['Dia'] = df['Fecha'].dt.dayofweek
    df = df[~df['Fecha'].dt.dayofweek.isin([5, 6])]
    
    df['Geohash'] = df.apply(lambda row: geohash2.encode(row['Latitud'], row['Longitud'], precision=8), axis=1)
    
    return df