In [2]:
import apache_beam as beam
import json
import polars as pl
import googlemaps
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [3]:
# Path of the raw input file; the pipeline cell feeds it to cargar_archivo_json
archivo_entrada = './raw-data/1.json'
# Path of the Parquet file written by the last pipeline step
archivo_salida = './clean-data/x.parquet'

In [4]:
# Configure the Google Maps API key: it is read from a local text file
# (surrounding whitespace stripped) rather than written in the notebook
with open('./APK/APK.txt', 'r') as file:
    api_key = file.read().strip()

# Module-level client; obtener_geo uses it for reverse geocoding
gmaps = googlemaps.Client(key=api_key)

In [9]:
# Load a line-delimited JSON file and convert it into a Polars DataFrame
def cargar_archivo_json(archivo):
    """Read a file with one JSON object per line into a Polars DataFrame.

    Args:
        archivo: Path of the text file to read. Every non-blank line must be
            a complete JSON document.

    Returns:
        The result of ``pl.DataFrame`` applied to the list of parsed objects,
        in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # JSON text is UTF-8; being explicit avoids decoding it with the platform
    # default encoding, which is locale-dependent (notably on Windows).
    with open(archivo, 'r', encoding='utf-8') as file:
        # Blank lines (for example an extra newline at the end of the file)
        # hold no record and would make json.loads raise, so skip them.
        data_list = [json.loads(line) for line in file if line.strip()]

    # data_list now holds every JSON object found in the file
    return pl.DataFrame(data_list)

In [6]:
# Cleaning step applied to the Polars DataFrame of places
def aplicar_transformaciones(dft):
    """Reduce the places frame to one row per hotel-like ``gmap_id``.

    Steps, in order: drop the columns that are not kept, explode the
    ``category`` column, keep the rows whose category matches the pattern
    ``(?i)hotel``, and keep the first row for each ``gmap_id``.

    Args:
        dft: DataFrame containing at least the dropped columns plus
            ``category`` and ``gmap_id``.

    Returns:
        The filtered, de-duplicated DataFrame.
    """
    columnas_descartadas = [
        'address',
        'description',
        'price',
        'hours',
        'MISC',
        'state',
        'relative_results',
    ]
    sin_columnas = dft.drop(columnas_descartadas)

    # One row per (place, category) pair so each category can be tested alone
    por_categoria = sin_columnas.explode('category')

    es_hotel = por_categoria['category'].str.contains('(?i)hotel')
    solo_hoteles = por_categoria.filter(es_hotel)

    # A place with several matching categories appears once per match;
    # collapse those back to a single row per gmap_id
    return solo_hoteles.unique(subset=['gmap_id'], keep='first')

In [7]:
# Reverse-geocode each row's coordinates into county, city, state and country

# Address-component type -> output field, in the priority order used when a
# single component carries more than one of these types.
_TIPO_A_CAMPO = (
    ('locality', 'city'),
    ('administrative_area_level_2', 'county'),
    ('administrative_area_level_1', 'state'),
    ('country', 'country'),
)


def _extraer_componentes(resultado):
    """Pick county, city, state and country from a reverse-geocode response.

    Only the first result of the response is inspected. Each address
    component fills at most one field, and a field keeps the first non-empty
    value found for it.

    Args:
        resultado: Sequence of result dicts, each with an
            ``'address_components'`` list whose items have ``'types'`` and
            ``'long_name'``; may be empty or ``None``.

    Returns:
        Dict with keys ``'county'``, ``'city'``, ``'state'`` and
        ``'country'``; a value is ``None`` when no component provided it.
    """
    campos = dict.fromkeys(('county', 'city', 'state', 'country'))
    if not resultado:
        return campos

    for component in resultado[0]['address_components']:
        for tipo, campo in _TIPO_A_CAMPO:
            if tipo in component['types'] and not campos[campo]:
                campos[campo] = component['long_name']
                break
        # Stop as soon as every field is filled. The check runs after every
        # component, not only after components that matched nothing.
        if all(campos.values()):
            break
    return campos


def obtener_geo(df2):
    """Add ``County``, ``City``, ``State`` and ``Country`` columns to ``df2``.

    One reverse-geocoding request is made through the module-level ``gmaps``
    client for every row that has both coordinates.

    Args:
        df2: DataFrame with ``latitude`` and ``longitude`` columns.

    Returns:
        ``df2`` with the four new columns; a cell is ``None`` when the
        coordinates are missing or the response does not contain that
        component.
    """
    counties, cities, states, countries = [], [], [], []

    for lat, lon in zip(df2['latitude'], df2['longitude']):
        if lat is None or lon is None:
            # Nothing to look up; skipping the request keeps one incomplete
            # row from aborting the whole batch.
            resultado = None
        else:
            resultado = gmaps.reverse_geocode((lat, lon))

        campos = _extraer_componentes(resultado)
        counties.append(campos['county'])
        cities.append(campos['city'])
        states.append(campos['state'])
        countries.append(campos['country'])

    return df2.with_columns(
        County=pl.Series(counties),
        City=pl.Series(cities),
        State=pl.Series(states),
        Country=pl.Series(countries),
    )

In [10]:
with beam.Pipeline() as pipeline:
    # Start from the input path and turn it into a PCollection holding the
    # Polars DataFrame returned by cargar_archivo_json
    datos_pcollection = (
        pipeline
        | 'Cargar archivo json' >> beam.Create([archivo_entrada])
        | 'Convertir a DataFrame de Polars' >> beam.Map(cargar_archivo_json)
    )

    # Apply the Polars cleaning transformations to the loaded DataFrame
    datos_transformados_pcollection = (
        datos_pcollection
        | 'Aplicar transformaciones' >> beam.Map(aplicar_transformaciones)
    )
    # Geographic enrichment step, currently disabled: while it stays
    # commented out, obtener_geo is not applied and no geocoding requests are made
    #datos_geograficos_pcollection = (
    #    datos_transformados_pcollection
    #    | 'Obtener datos geograficos' >> beam.Map(obtener_geo)
    #)
    # Save the result: each transformed DataFrame is written to archivo_salida as Parquet
    datos_transformados_pcollection | 'Guardar datos transformados' >> beam.Map(lambda df: df.write_parquet(archivo_salida))

ERROR:apache_beam.runners.common:could not create a new DataFrame: series "Service options" has length 2 while series "Accessibility" has length 1 [while running '[10]: Convertir a DataFrame de Polars']
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "c:\Program Files\Python311\Lib\site-packages\apache_beam\transforms\core.py", line 1963, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "C:\Users\User\AppData\Local\Temp\ipykernel_12480\3786972356.py", line 6, in cargar_archivo_json
    df = pl.DataFrame(x)
         ^^^^^^^^^^^^^^^
  File "C:\Users\User\AppData\Roaming\Python\Python311\site-packages\polars\dataframe\frame.py", line 369, in __init__
    self._df = dict_to_pydf(
               ^^^^^^^^^^^^^
  File "C:\Users\User\AppData\Roaming\Python\Pytho

<class 'str'>
{"name": "Porter Pharmacy", "address": "Porter Pharmacy, 129 N Second St, Cochran, GA 31014", "gmap_id": "0x88f16e41928ff687:0x883dad4fd048e8f8", "description": null, "latitude": 32.3883, "longitude": -83.3571, "category": ["Pharmacy"], "avg_rating": 4.9, "num_of_reviews": 16, "price": null, "hours": [["Friday", "8AM\u20136PM"], ["Saturday", "8AM\u201312PM"], ["Sunday", "Closed"], ["Monday", "8AM\u20136PM"], ["Tuesday", "8AM\u20136PM"], ["Wednesday", "8AM\u201312PM"], ["Thursday", "8AM\u20136PM"]], "MISC": {"Service options": ["In-store shopping", "Same-day delivery"], "Health & safety": ["Mask required", "Staff required to disinfect surfaces between visits"], "Accessibility": ["Wheelchair accessible entrance"], "Planning": ["Quick visit"]}, "state": "Open \u22c5 Closes 6PM", "relative_results": ["0x88f16e41929435cf:0x5b2532a2885e9ef6", "0x88f16c32716531c1:0x5f19bdaa5044e4fa", "0x88f16e6e3f4a21df:0xcf495da9bb4d89ea"], "url": "https://www.google.com/maps/place//data=!4m2!3

ShapeError: could not create a new DataFrame: series "Service options" has length 2 while series "Accessibility" has length 1 [while running '[10]: Convertir a DataFrame de Polars']