### Importamos las librerias necesarias para realizar nuestro ETL:

In [106]:
from pyspark.sql.functions import concat_ws, col, sum, lower, regexp_extract, substring, split, expr, trim, udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

### Creacion de tabla `processed_files` para mantener el registro de los archivos ya procesados.

In [2]:
spark.sql("CREATE TABLE IF NOT EXISTS processed_files_metadata_google (file_name STRING) USING DELTA")

### Obtenemos la lista de archivos contenidos en el storage. En este caso ADLS.

In [3]:
# Crear una instancia de SparkSession
spark = SparkSession.builder.getOrCreate()

# Especificar el nombre del contenedor y directorio
container_name = "datumtech"
directory_path = "/GoogleMaps/metadata-sitios"

# Obtener la lista de carpetas
adls_files = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) \
    .listStatus(spark._jvm.org.apache.hadoop.fs.Path(f"abfss://{container_name}.blob.core.windows.net/{directory_path}"))

### Creamos un variable `new_files` que contiene los archivos que no estan en ADLS.

In [4]:
processed_files = spark.sql("SELECT file_name FROM processed_files_metadata_google").toPandas()["file_name"].tolist()

new_files = [file.getPath().getName() for file in adls_files if file.getPath().getName() not in processed_files]


### Podemos ver que archivos ya estan procesados en la tabla `processed_files_metadata_google`

In [5]:
spark.sql("SELECT * FROM processed_files_metadata_google").show()

### Podemos ver que archivos no estan procesados aun.

In [108]:
for file in new_files:
    print(file)

### Creamos la funcion `etl` que realiza todo el proceso y devuelve el dataframe en formato parquet a la tabla silver corrrespondiente a los datos ya procesados y la funcion para cambiar los nombres de los estados por sus abreviaciones. Tambien definimos variables necesarias para hacer filtrados mas adelante.

In [109]:
# Variable con categorias a filtrar
category = [
    'burger', 'burgers', 'hamburger', 'hamburgers' 'hot dog', 'steakhouse', 'lunch', 'motel', 'patisserie', 'pizza', 'deli', 'diner', 'dinner', 'icecream', 'ice cream', 'hotel', 'hotels', 'seafood','cookie', 'crab house', 'cupcake', 'chocolate', 'churreria', 'cocktail', 'cocktails', 'coffee', 'coffees' 'tea', 'restaurant', 'restaurats', 'chesse', 'charcuterie', 'cafe', 'cafes', 'BBQ', 'bagle', 'bakery' 'bakerys', 'bar', 'bars', 'bar & grill', 'barbacue', 'beer' 'bistro', 'pasteleria', 'pastelerias', 'breakfast', 'brunch', 'buffet', 'burrito', 'cafeteria', 'cafeterias', 'cake', 'cakes', 'food', 'wine', 'wineries',
    'lunch'
    ]

In [110]:
# Variable con estados y sus abreviaciones
us_states = [
    'Alabama', 'AL', 'Alaska', 'AK', 'Arizona', 'AZ', 'Arkansas', 'AR', 'California', 'CA', 'Colorado', 'CO', 'Connecticut', 'CT', 'Delaware', 'DE', 'Florida', 'FL', 'Georgia', 'GA', 'Hawaii', 'HI',
    'Idaho', 'ID', 'Illinois', 'IL', 'Indiana', 'IN', 'Iowa', 'IA', 'Kansas', 'KS', 'Kentucky', 'KY', 'Louisiana', 'LA', 'Maine', 'ME', 'Maryland', 'MD', 'Massachusetts', 'MA', 'Michigan', 'MI', 
    'Minnesota', 'MN', 'Mississippi', 'MS', 'Missouri', 'MO', 'Montana', 'MT', 'Nebraska', 'NE', 'Nevada', 'NV', 'New Hampshire', 'NH', 'New Jersey', 'NJ', 'New Mexico', 'NM', 'New York', 'NY', 
    'North Carolina', 'NC',  'North Dakota', 'ND', 'Ohio', 'OH', 'Oklahoma', 'OK', 'Oregon', 'OR', 'Pennsylvania', 'PA', 'Rhode Island', 'RI', 'South Carolina', 'SC', 'South Dakota', 'SD', 
    'Tennessee', 'TN', 'Texas', 'TX', 'Utah', 'UT', 'Vermont', 'VT', 'Virginia', 'VA', 'Washington', 'WA', 'West Virginia', 'WV', 'Wisconsin', 'WI', 'Wyoming', 'WY'
    ]

In [111]:
# Diccionario de correspondencias de estados
state_mapping = {
    'Alabama': 'AL', 'Alaska': 'AK', 'Arizona': 'AZ', 'Arkansas': 'AR', 'California': 'CA', 'Colorado': 'CO', 'Connecticut': 'CT', 'Delaware': 'DE', 'Florida': 'FL', 'Georgia': 'GA', 'Hawaii': 'HI',
    'Idaho': 'ID', 'Illinois': 'IL', 'Indiana': 'IN', 'Iowa': 'IA', 'Kansas': 'KS', 'Kentucky': 'KY', 'Louisiana': 'LA', 'Maine': 'ME', 'Maryland': 'MD', 'Massachusetts': 'MA', 'Michigan': 'MI',
    'Minnesota': 'MN', 'Mississippi': 'MS', 'Missouri': 'MO', 'Montana': 'MT', 'Nebraska': 'NE', 'Nevada': 'NV', 'New Hampshire': 'NH', 'New Jersey': 'NJ', 'New Mexico': 'NM', 'New York': 'NY',
    'North Carolina': 'NC', 'North Dakota': 'ND', 'Ohio': 'OH', 'Oklahoma': 'OK', 'Oregon': 'OR', 'Pennsylvania': 'PA', 'Rhode Island': 'RI', 'South Carolina': 'SC', 'South Dakota': 'SD',
    'Tennessee': 'TN', 'Texas': 'TX', 'Utah': 'UT', 'Vermont': 'VT', 'Virginia': 'VA', 'Washington': 'WA', 'West Virginia': 'WV', 'Wisconsin': 'WI', 'Wyoming': 'WY'
}

In [112]:
# Función para mapear los nombres de los estados a las abreviaciones
def map_state_name_to_abbr(state_name):
    if state_name in state_mapping:
        return state_mapping[state_name]
    else:
        return state_name

In [125]:

def etl(file):
    
    # Definimos las rutas:
    path_raw=f"abfss://datumtech@datumlake.dfs.core.windows.net/GoogleMaps/metadata-sitios/{file}"
    path_bronze = f"abfss://datumtech@datumlake.dfs.core.windows.net/bronze/GoogleMapsbronze/metadata-sitios-bronze/{file}-bronze"
    path_silver = f"abfss://datumtech@datumlake.dfs.core.windows.net/silver/GoogleMapssilver/metadata-sitios-silver/{file}-silver"

    # Cargamos el archivo desde ADLS. Nos quedamos solo con las columnas consideradas para el proyecto:
    df_raw = spark.read.format("json").load(path_raw).select('address', 'avg_rating', 'category', 'gmap_id', 'latitude', 'longitude', 'name', 'num_of_reviews', 'url')
     
    # Guardamos el DataFrame df_raw en la tabla bronze correspondiente a los datos en crudo o poco procesados en Azure Data Lake con un formato parquet ideal para manejar altos volumenes de datos.
    df_raw.write.format("parquet").save(path_bronze)
        
    # Cargamos el archivo desde la tabla bronze en Azure Data Lake en un DataFrame.
    df_metadata = spark.read.format("parquet").load(path_bronze)
        
    # Utilizamos la función concat_ws para obtener los valores del array de la columna 'category' concatenados en el mismo registro
    df_metadata = df_metadata.withColumn('values_concatenados', concat_ws(', ', df_metadata.category))  

    # Eliminamos la columna "category" del DataFrame
    df_metadata = df_metadata.drop("category")

    # Renombra la columna "values_concatenados" a "nueva_columna"
    df_metadata = df_metadata.withColumnRenamed("values_concatenados", "category")    

    # Eliminar los duplicados
    df_metadata = df_metadata.dropDuplicates()

    # Rellenamos valores vacíos o nulos en las columnas 'address', 'name' y 'url'. A pesar de no tener nulos en url, dejamos planteado el codigo para usar el notebook en jobs posteriores
    # Eliminamos los registros donde 'avg_rating', 'gmap_id', 'latitude', 'longitude', 'num_of_reviews' y 'category' son nulos o vacios.
    df_metadata = df_metadata.fillna('Unknown', subset=['address', 'name'])
    df_metadata = df_metadata.na.drop(subset=['avg_rating', 'gmap_id', 'latitude', 'longitude', 'num_of_reviews', 'category']) 
    
    # Filtramos de la columna category las que sean necesarias para el proyecto.
    filtro = expr("lower(category)").rlike(r"\b(" + "|".join(category) + r")\b")
    df_metadata = df_metadata.filter(filtro)

    # Creamos la columna 'state' extrayendo el estado de la columna 'address'
    state_regex = '|'.join(['\\b' + state + '\\b' for state in us_states])
    df_metadata = df_metadata.withColumn('state', when(col('address').rlike(state_regex), regexp_extract(col('address'), state_regex, 0)).otherwise('Unknown'))

    # Registrar la función UDF y aplicamos la transformación a la columna 'state_abbr'
    map_state_name_to_abbr_udf = udf(map_state_name_to_abbr, StringType()) 
    df_metadata = df_metadata.withColumn('state', map_state_name_to_abbr_udf(col('state')))

    # Guardamos el DataFrame df_metadata en la tabla silver correspondiente a los datos procesados en Azure Data Lake.
    return df_metadata.write.format("parquet").save(path_silver)
    

### Iteramos sobre cada archivo sin procesar.

In [126]:
for file in new_files:
    etl(file.rstrip("/"))

### Agregamos a la tabla `processed_files_metadata_google` los archivos ya procesados.

In [115]:
new_files_df = spark.createDataFrame([(file,) for file in new_files], ["file_name"])
new_files_df.write.format("delta").mode("append").saveAsTable("processed_files_metadata_google")

### Podemos verificar que, efectivamente esten registrados los archivos ya procesados.

In [117]:
spark.sql("SELECT * FROM processed_files_metadata_google").show()

### Para comprobar que todo se haya ejecutado de manera correcta, podemos traer cualquier archivo de la tabla silver y hacer algunas verificaciones.

In [118]:
# Funcion para cargar el archivo desde la tabla correspondiente al estado de los datos en Azure Data Lake en un DataFrame.
def load_from(file, level):
    path= f"abfss://datumtech@datumlake.dfs.core.windows.net/{level}/GoogleMaps{level}/metadata-sitios-{level}/{file}-{level}"
    df = spark.read.format("parquet").load(path)
    return df

In [127]:
# Cambiando ...new_files[<valor de la fila en la tabla>][1]... podras cargar algun archivo de los ya procesados de la tabla silver.
df = load_from(new_files[0].rstrip("/"), "silver")

In [129]:
df.show(5)

### Podemos verificar si hay nulos en algun archivo en la tabla silver.

In [130]:
# Funcion para el conteo de nulos del dataframe.
def null_counts (df):
    counts = df.select([sum(col(c).isNull().cast("integer")).alias(c) for c in df.columns])
    return counts.show()

In [131]:
# Vemos que columnas poseen nulos y en que cantidad.
nulls = null_counts(df)

In [132]:
df.dtypes