In [3]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("limpieza") \
    .getOrCreate()

# Receptivo
Contiene los datos de personas que ingresaron al país.

Primero vamos a leer el archivo CSV que tenemos con los datos de Receptivo y creamos un dataframe llamado df_receptivo.

In [4]:
df_receptivo = spark.read.csv('../../obligatorio/lnd/receptivo', header=True, sep=',',inferSchema=True)

                                                                                

Vamos a visualizar los tipos de datos y columnas:

In [5]:
df_receptivo.printSchema()

root
 |-- IdIngresos: integer (nullable = true)
 |-- Lugar Ingreso: string (nullable = true)
 |-- IdTranspIngreso: integer (nullable = true)
 |-- Transporte Internacional de Ingreso: string (nullable = true)
 |-- FechaIngreso: date (nullable = true)
 |-- IdFecIng: integer (nullable = true)
 |-- FechaEgreso: date (nullable = true)
 |-- IdFecEgr: integer (nullable = true)
 |-- IdNacionalidad: integer (nullable = true)
 |-- Pais: string (nullable = true)
 |-- IdResidencia: integer (nullable = true)
 |-- Residencia: string (nullable = true)
 |-- IdMotivo: integer (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- IdOcupacion: integer (nullable = true)
 |-- Ocupacion: string (nullable = true)
 |-- IsEstudio: integer (nullable = true)
 |-- Estudio: string (nullable = true)
 |-- IdDestinoLocalidad: integer (nullable = true)
 |-- Localidad: string (nullable = true)
 |-- IdDepartamentoDestino: integer (nullable = true)
 |-- Departamento: string (nullable = true)
 |-- IdOtroDepartament

### Descripción de las Columnas

- "Lugar Ingreso" es por donde el entrevistado ingresó y tiene su propio ID (IdIngresos).
- "Transporte Internacional de Ingreso" es el medio por el cual ingresó y tiene su propio ID (IdTranspIngreso).
- "FechaIngreso" corresponde a la fecha de ingreso con su ID (IdFecIng).
- "FechaEgreso" corresponde a la fecha de salida con su ID (IdFecEgr).
- "Pais" es la nacionalidad de quien ingresó y tiene su propio ID (IdNacionalidad).
- "Residencia" es el lugar de residencia del visitante, junto con su ID (IdResidencia).
- "Motivo" es el motivo de ingreso, y cuenta con su propio ID (IdMotivo).
- "Ocupacion" corresponde a la profesión del turista y tiene un ID (IdOcupacion).
- "Estudio" es el nivel de estudios del visitante, que tiene su propio ID (IsEstudio).
- "Localidad" es el principal destino de la visita con su ID (IdDestinoLocalidad).
- "Departamento" corresponde al departamento de destino principal con su ID (IdDepartamentoDestino).
- "Otro Departamento" es un posible segundo destino y tiene su ID (IdOtroDepartamento).
- "Otra Localidad" corresponde a una segunda localidad visitada con su ID (IdOtraLocalidad).
- "Alojamiento" es el tipo de alojamiento utilizado con su ID (IdAlojamiento).
- "TransporteLocal" corresponde al transporte utilizado durante la estadía y tiene un ID (IdTranspLocal).
- "Lugar Egreso" es por donde se egresó y cuenta con su ID (IdEgresos).
- "Transporte Internacional de Egreso" corresponde al transporte utilizado para salir del país con su ID (IdTranspEgreso).
- "Destino" es la zona de destino principal con su ID (IdDestino).
- "Estadia" representa los días que permaneció en el país.
- "Gente" es la cantidad de personas del grupo encuestado.
- "GastoTotal", "GastoAlojamiento", "GastoAlimentacion", "GastoTransporte", "GastoCultural", "GastoTours", "GastoCompras" y "GastoOtros" corresponde a todos los tipos de gastos posibles, expresado en dólares americanos.
- "Coef" es el coeficiente de expansión del gasto.
- "CoefTot" es el coeficiente de expansión de las personas.

A continuación, eliminamos las columnas innecesarias, en particular los Id de cada columna y los coeficientes de expansión del gasto y de expansión de personas.

In [6]:
df_receptivo = df_receptivo.drop(
    "IdIngresos", 
    "IdTranspIngreso", 
    "IdFecIng", 
    "IdFecEgr", 
    "IdNacionalidad", 
    "IdResidencia", 
    "IdMotivo", 
    "IdOcupacion", 
    "IsEstudio", 
    "IdDestinoLocalidad", 
    "IdDepartamentoDestino",
    "IdOtroDepartamento",
    "IdOtraLocalidad",
    "IdAlojamiento", 
    "IdTranspLocal", 
    "IdEgresos",
    "IdTranspEgreso",
    "IdDestino",
    "Coef",
    "CoefTot",
);

df_receptivo.printSchema()

root
 |-- Lugar Ingreso: string (nullable = true)
 |-- Transporte Internacional de Ingreso: string (nullable = true)
 |-- FechaIngreso: date (nullable = true)
 |-- FechaEgreso: date (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Residencia: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Ocupacion: string (nullable = true)
 |-- Estudio: string (nullable = true)
 |-- Localidad: string (nullable = true)
 |-- Departamento: string (nullable = true)
 |-- Otro Departamento: string (nullable = true)
 |-- Otra Localidad: string (nullable = true)
 |-- Alojamiento: string (nullable = true)
 |-- TransporteLocal: string (nullable = true)
 |-- Lugar Egreso: string (nullable = true)
 |-- Transporte Internacional de Egreso: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Estadia: integer (nullable = true)
 |-- Gente: integer (nullable = true)
 |-- GastoTotal: double (nullable = true)
 |-- GastoAlojamiento: double (nullable = true)
 |-- GastoAliment

Ahora pasamos a normalizar los nombres de los lugares por donde ingresaron y egresaron los encuestados para que queden igual a las otras columnas de la tabla. Esto se debe a que en "Lugar Ingreso" y "Lugar Egreso" los nombres de los lugares tenían tildes, pero en el resto de las columnas no. Se considera tanto mayúsculas como minúsculas.

In [1]:
from pyspark.sql import functions as F

# Reemplazar tildes en la columna "Lugar Ingreso"
df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "Á", "A")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "É", "E")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "Í", "I")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "Ó", "O")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "Ú", "U")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "á", "a")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "é", "e")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "í", "i")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "ó", "o")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Ingreso",
    F.regexp_replace("Lugar Ingreso", "ú", "u")
)

# Reemplazar tildes en la columna "Lugar Egreso"
df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "Á", "A")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "É", "E")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "Í", "I")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "Ó", "O")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "Ú", "U")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "á", "a")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "é", "e")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "í", "i")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "ó", "o")
)

df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.regexp_replace("Lugar Egreso", "ú", "u")
)

NameError: name 'df_receptivo' is not defined

A continuación, en las columnas de "Ocupación", "Estudio" y "Localidad" los datos que aparecen como "Otros", "Sin Datos" o como "null" los juntamos para que todos queden con en valor "Otros". 
También se modifica "Ocupacion" para agrupar aquellos que tenían asignado "Otra situación Inactividad" y "Desocupado" a que quede únicamente como "Desocupado". 

In [9]:
df_receptivo = df_receptivo.withColumn(
    "Ocupacion",
    F.when(F.col("Ocupacion").isin("Otros", "Sin Datos", "Desconocido / Sin Datos"), "Otros")
    .otherwise(F.col("Ocupacion"))
)

df_receptivo = df_receptivo.withColumn(
    "Ocupacion",
    F.when(F.col("Ocupacion").isin("Otra situacion Inactividad", "Desocupado"), "Desocupado")
    .otherwise(F.col("Ocupacion"))
)

In [10]:
df_receptivo = df_receptivo.withColumn(
    "Estudio",
    F.when(F.col("Estudio").isin("Sin Datos", "Otros"), "Otros")
    .when(F.col("Estudio").isNull(), "Otros") 
    .otherwise(F.col("Estudio"))
)

Se encontró un error de redacción en las columnas de "Localidad" y de "Otra Localidad" en cuanto a cómo estaba escrito "Jaureguiberry" y se los modificó para corregirlo.

In [11]:
df_receptivo = df_receptivo.withColumn(
    "Localidad",
    F.when(F.col("Localidad").isin("Sin Datos", "Otros"), "Otros")
    .when(F.col("Localidad").isNull(), "Otros") 
    .when(F.col("Localidad").isin("Jaugueriberry"), "Jaureguiberry")
    .otherwise(F.col("Localidad"))
)

In [12]:
df_receptivo = df_receptivo.withColumn(
    "Otra Localidad",
    F.when(F.col("Otra Localidad").isin("Jaugueriberry"), "Jaureguiberry")
    .otherwise(F.col("Otra Localidad"))
)

En la columna "Motivo" se renombro algunos de los valores para que queden igual redactados a como están escritos en la columna de "Motivo" en Emisivo.

In [13]:
df_receptivo = df_receptivo.withColumn(
    "Motivo",
    F.when(F.col("Motivo") == "Trabajo / Profesional", "Trabajo")
    .when(F.col("Motivo") == "Ocio y vacaciones", "Ocio / Vacaciones")
    .when(F.col("Motivo") == "Visita a familiares y amigos", "Visita familiares / amigos")
    .when(F.col("Motivo") == "Salud / wellness", "Salud")
    .when(F.col("Motivo") == "Religioso", "Religion")
    .otherwise(F.col("Motivo"))
)



+--------------------------+-----+
|Motivo                    |count|
+--------------------------+-----+
|Trabajo                   |6397 |
|Salud                     |277  |
|Visita familiares / amigos|15282|
|Transito                  |3111 |
|Religion                  |193  |
|Compras                   |249  |
|Estudios                  |292  |
|Ocio / Vacaciones         |36248|
|Segunda Residencia        |2161 |
|Otros                     |1022 |
|MICE                      |101  |
+--------------------------+-----+



                                                                                

En la columna de "Alojamiento" se corrigió un error de redacción en "Apart Hotel". También se cambió los valores que dijeran "Sin Datos" para que queden agrupadas con el valor "Otros" en las columnas de "Alojamiento", "TransporteLocal", "Lugar Egreso" y "Transporte Internacional de Egreso".

In [14]:
df_receptivo = df_receptivo.withColumn(
    "Alojamiento",
    F.when(F.col("Alojamiento") == "Appart Hotel", "Apart Hotel")
    .when(F.col("Alojamiento") == "Sin Datos", "Otros")
    .otherwise(F.col("Alojamiento"))
)

In [15]:
df_receptivo = df_receptivo.withColumn(
    "TransporteLocal",
    F.when(F.col("TransporteLocal") == "Sin Datos", "Otros")
    .otherwise(F.col("TransporteLocal"))
)

In [16]:
df_receptivo = df_receptivo.withColumn(
    "Lugar Egreso",
    F.when(F.col("Lugar Egreso") == "Sin Datos", "Otros")
    .otherwise(F.col("Lugar Egreso"))
)

In [17]:
df_receptivo = df_receptivo.withColumn(
    "Transporte Internacional de Egreso",
    F.when(F.col("Transporte Internacional de Egreso") == "Sin Datos", "Otros")
    .otherwise(F.col("Transporte Internacional de Egreso"))
)

A continuación, se trabajará con las columnas de gastos. El objetivo principal es verificar que los gastos registrados en estas columnas coincidan con el valor indicado como el total de gastos (en GastoTotal).

Primero, se calcula la suma de todos los tipos de gastos (alojamiento, transporte, etc.) y luego se compara con el valor registrado en la columna GastoTotal. Si la suma de los gastos calculada resulta menor que el valor de GastoTotal, se registra la diferencia en la columna GastosOtros. Por el contrario, si la suma de los gastos es mayor que el valor de GastoTotal, se actualiza esta última columna con el valor obtenido en la suma.

In [18]:
# Crear una nueva columna que calcule la suma de los otros gastos
df_receptivo = df_receptivo.withColumn(
    "SumaGastos",
    F.col("GastoAlojamiento") + F.col("GastoAlimentacion") + F.col("GastoTransporte") + F.col("GastoCultural") + F.col("GastoTours") + F.col("GastoCompras") + F.col("GastoOtros")
)

# Cuando GastoTotal es mayor que SumaGastos, agregar la diferencia a GastoOtros
df_receptivo = df_receptivo.withColumn(
    "GastoOtros",
    F.when(F.col("GastoTotal") > F.col("SumaGastos"), F.col("GastoOtros") + (F.col("GastoTotal") - F.col("SumaGastos")))
    .otherwise(F.col("GastoOtros"))
)

# Cuando SumaGastos es mayor que GastoTotal, actualizar GastoTotal con el valor de SumaGastos
df_receptivo = df_receptivo.withColumn(
    "GastoTotal",
    F.when(F.col("SumaGastos") > F.col("GastoTotal"), F.col("SumaGastos"))
    .otherwise(F.col("GastoTotal"))
)

df_receptivo = df_receptivo.drop("SumaGastos");

Por último, renombramos las columnas para facilitar el uso de las mismas para cuando se trabaje con Hive.

In [19]:
column_mapping = {
    "Lugar Ingreso": "lugar_ingreso",
    "Transporte Internacional de Ingreso": "transporte_internacional_de_ingreso",
    "FechaIngreso": "fecha_ingreso",
    "FechaEgreso": "fecha_egreso",
    "Pais": "pais",
    "Residencia": "residencia",
    "Motivo": "motivo",
    "Ocupacion": "ocupacion",
    "Estudio": "estudio",
    "Localidad": "localidad",
    "Departamento": "departamento",
    "Otro Departamento": "otro_departamento",
    "Otra Localidad": "otra_localidad",
    "Alojamiento": "alojamiento",
    "TransporteLocal": "transporte_local",
    "Lugar Egreso": "lugar_egreso",
    "Transporte Internacional de Egreso": "transporte_internacional_de_egreso",
    "Destino": "destino",
    "Estadia": "estadia",
    "Gente": "gente",
    "GastoTotal": "gasto_total",
    "GastoAlojamiento": "gasto_alojamiento",
    "GastoAlimentacion": "gasto_alimentacion",
    "GastoTransporte": "gasto_transporte",
    "GastoCultural": "gasto_cultural",
    "GastoTours": "gasto_tours",
    "GastoCompras": "gasto_compras",
    "GastoOtros": "gasto_otros"
}

for old_name, new_name in column_mapping.items():
    df_receptivo = df_receptivo.withColumnRenamed(old_name, new_name)

In [20]:
df_receptivo.printSchema()

root
 |-- lugar_ingreso: string (nullable = true)
 |-- transporte_internacional_de_ingreso: string (nullable = true)
 |-- fecha_ingreso: date (nullable = true)
 |-- fecha_egreso: date (nullable = true)
 |-- pais: string (nullable = true)
 |-- residencia: string (nullable = true)
 |-- motivo: string (nullable = true)
 |-- ocupacion: string (nullable = true)
 |-- estudio: string (nullable = true)
 |-- localidad: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- otro_departamento: string (nullable = true)
 |-- otra_localidad: string (nullable = true)
 |-- alojamiento: string (nullable = true)
 |-- transporte_local: string (nullable = true)
 |-- lugar_egreso: string (nullable = true)
 |-- transporte_internacional_de_egreso: string (nullable = true)
 |-- destino: string (nullable = true)
 |-- estadia: integer (nullable = true)
 |-- gente: integer (nullable = true)
 |-- gasto_total: double (nullable = true)
 |-- gasto_alojamiento: double (nullable = true)
 |-- gasto_a

# Emisivo
Contiene los datos de personas que egresaron al país.

Primero vamos a leer el archivo CSV que tenemos con los datos de Emisivo y creamos un dataframe llamado df_emisivo.

In [21]:
df_emisivo = spark.read.csv('../../obligatorio/lnd/emisivo', header=True, sep=',',inferSchema=True)

                                                                                

Asi podemos visualizar los tipos de datos y columnas:

In [22]:
df_emisivo.printSchema()

root
 |-- IdLugarSalida: integer (nullable = true)
 |-- Lugar Salida: string (nullable = true)
 |-- IdTranspSalidad: integer (nullable = true)
 |-- Transporte Internacional de Salida: string (nullable = true)
 |-- FechaSalida: date (nullable = true)
 |-- IdFecSalida: integer (nullable = true)
 |-- FechaEntrada: date (nullable = true)
 |-- IdFecEntrada: integer (nullable = true)
 |-- IdNacionalidad: integer (nullable = true)
 |-- Pais: string (nullable = true)
 |-- IdDeptoResidencia: integer (nullable = true)
 |-- Departamento: string (nullable = true)
 |-- IdMotivo: integer (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- IdOcupacion: integer (nullable = true)
 |-- Ocupacion: string (nullable = true)
 |-- IdNivelEstudio: integer (nullable = true)
 |-- Estudio: string (nullable = true)
 |-- IdDestino: integer (nullable = true)
 |-- Destino: string (nullable = true)
 |-- IdAlojamiento: integer (nullable = true)
 |-- Alojamiento: string (nullable = true)
 |-- IdLugarIngreso: i

### Descripción de las Columnas

- "Lugar Salida" es el lugar y tiene su propio ID (IdLugarSalida).
- "Transporte Internacional de Salida" es el medio de transporte internacional utilizado para la salida, con su propio ID (IdTranspSalidad).
- "FechaSalida" del viaje, con su propio ID (IdFecSalida).
- "FechaEntrada" al país, con su propio ID (IdFecEntrada).
- "Nacionalidad" junto con el código que representa la nacionalidad del turista (IdNacionalidad).
- "Pais" de destino del viaje.
- "Departamento" con su propio ID (IdDeptoResidencia).
- "Motivo" de ingreso al país, con su propio ID (IdMotivo).
- "Ocupacion" (o profesión) del turista, con su propio ID (IdOcupacion).
- "Estudio" es el nivel de estudio del turista, con su propio ID (IdNivelEstudio).
- "Destino" dentro del país con su propio ID (IdDestino).
- "Alojamiento" es el tipo de alojamiento utilizado, con su propio ID (IdAlojamiento).
- "Lugar Ingreso" es el lugar por donde se ingresó al país, con su propio ID (IdLugarIngreso).
- "Transporte Internacional de Ingreso" es el medio de transporte internacional utilizado para el ingreso, con su propio ID (IdTranspIngreso).
- "Transporte Local" es el medio de transporte local utilizado, con su propio ID (IdTranspLocal).
- "Estadia" es la cantidad de días de estadía.
- "Gente" es el número de integrantes del grupo encuestado.
- "GastoTotal", "GastoAlojamiento", "GastoAlimentacion", "GastoTransporteInternacional", "GastoTransporteLocal", "GastoCultural", "GastoTours", "GastoCompras" y "GastoResto" se refieren a los distintos tipos de gastos realizados, medido en dólares americanos corrientes.
- "Coef" es el Coeficiente Expansión Gasto.
- "CoefTot" es el Coeficiente Expansión Personas.

A continuación, eliminamos las columnas innecesarias, en particular los Id de cada columna y los coeficientes de expansión del gasto y de expansión de personas.

In [23]:
df_emisivo = df_emisivo.drop(
    "IdLugarSalida", 
    "IdTranspSalidad", 
    "IdFecSalida", 
    "IdFecEntrada", 
    "IdNacionalidad", 
    "IdDeptoResidencia", 
    "IdMotivo", 
    "IdOcupacion", 
    "IdNivelEstudio", 
    "IdDestino", 
    "IdAlojamiento", 
    "IdLugarIngreso", 
    "IdTranspIngreso", 
    "IdTranspLocal", 
    "Coef", 
    "CoefTot");

In [24]:
df_emisivo.printSchema()

root
 |-- Lugar Salida: string (nullable = true)
 |-- Transporte Internacional de Salida: string (nullable = true)
 |-- FechaSalida: date (nullable = true)
 |-- FechaEntrada: date (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Departamento: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Ocupacion: string (nullable = true)
 |-- Estudio: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Alojamiento: string (nullable = true)
 |-- Lugar Ingreso: string (nullable = true)
 |-- Transporte Internacional de Ingreso: string (nullable = true)
 |-- Trasporte Local: string (nullable = true)
 |-- Estadia: integer (nullable = true)
 |-- Gente: integer (nullable = true)
 |-- GastoTotal: double (nullable = true)
 |-- GastoAlojamiento: double (nullable = true)
 |-- GastoAlimentacion: double (nullable = true)
 |-- GastoTransporteInternac: double (nullable = true)
 |-- GatoTransporteLocal: double (nullable = true)
 |-- GastoCultural: double (nullable = t

Ahora pasamos a normalizar los nombres de los lugares por donde salieron los encuestados para que queden igual a las otras columnas de la tabla. Esto se debe a que en Lugar Salida los nombres de los lugares tenían tildes, pero en el resto de las columnas no. Se considera tanto mayúsculas como minúsculas.

In [36]:
# Reemplazar tildes en la columna "Lugar Salida"
df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "Á", "A")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "É", "E")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "Í", "I")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "Ó", "O")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "Ú", "U")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "á", "a")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "é", "e")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "í", "i")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "ó", "o")
)

df_emisivo = df_emisivo.withColumn(
    "Lugar Salida",
    F.regexp_replace("Lugar Salida", "ú", "u")
)

Se encontró un error de redacción en las columna de "Departamento" en cuanto a cómo estaba escrito "San Jose" y se los modificó para corregirlo.

In [27]:
df_emisivo = df_emisivo.withColumn(
    "departamento",
    F.when(F.col("departamento") == "Saqn Jose", "San Jose")
    .otherwise(F.col("departamento"))
)

A continuación, en las columnas de "Ocupación", "Estudio" y "Destino", "Trasporte Local" y "Alojamiento" los datos que aparecen como "Otros", "Sin Datos" o como "null" los juntamos para que todos queden con en valor "Otros". 
También se modifica "Ocupacion" para agrupar aquellos que tenían asignado "Otra situación Inactividad" y "Desocupado" a que quede únicamente como "Desocupado". 

In [29]:
df_emisivo = df_emisivo.withColumn(
    "Ocupacion",
    F.when(F.col("Ocupacion").isin("Otros", "Sin Datos", "Desconocido / Sin Datos"), "Otros")
    .otherwise(F.col("Ocupacion"))
)

df_emisivo = df_emisivo.withColumn(
    "Ocupacion",
    F.when(F.col("Ocupacion").isin("Desocupado", "Otra situacion Inactividad"), "Otros")
    .otherwise(F.col("Ocupacion"))
)

In [30]:
df_emisivo = df_emisivo.withColumn(
    "Estudio",
    F.when(F.col("Estudio").isin("Sin Datos", "Otros"), "Otros")
    .when(F.col("Estudio").isNull(), "Otros") 
    .otherwise(F.col("Estudio"))
)

In [31]:
df_emisivo = df_emisivo.withColumn(
    "Destino",
    F.when(F.col("Destino").isin("Sin Datos", "Otros"), "Otros")
    .otherwise(F.col("Destino"))
)

In [34]:
df_emisivo = df_emisivo.withColumn(
    "Trasporte Local",
    F.when(F.col("Trasporte Local").isin("Sin Datos", "Otros"), "Otros")
    .otherwise(F.col("Trasporte Local"))
)

In [32]:
df_emisivo = df_emisivo.withColumn(
    "Alojamiento",
    F.when(F.col("Alojamiento").isin("Sin Datos", "Otros"), "Otros")
    .otherwise(F.col("Alojamiento"))
)

Se encontró un error de redacción en las columnas de "Alojamiento" en cuanto a cómo estaba escrito "Apart Hotel" y se los modificó para corregirlo.

In [33]:
df_emisivo = df_emisivo.withColumn(
    "Alojamiento",
    F.when(F.col("Alojamiento") == "Appart Hotel", "Apart Hotel")
    .otherwise(F.col("Alojamiento"))
)

En la columna "Motivo" se renombro algunos de los valores para que queden igual redactados a como están escritos en la columna de "Motivo" en Receptivo.

In [37]:
df_emisivo = df_emisivo.withColumn(
    "Motivo",
    F.when(F.col("Motivo") == "Trabajo / Profesion", "Trabajo")
    .when(F.col("Motivo") == "Trabajo remunerado Destino", "Trabajo")
    .when(F.col("Motivo") == "Ocio, Recreo, Vacaciones", "Ocio / Vacaciones")
    .when(F.col("Motivo") == "Tratamiento Salud", "Salud")
    .when(F.col("Motivo") == "Religion y Peregrinaciones", "Religion")
    .when(F.col("Motivo") == "Deportivo", "Deporte")
    .otherwise(F.col("Motivo"))
)

Se renombro la columna "GastoResto" para que coincida con el mismo nombre que tiene en Receptivo, este siendo "GastoOtros".

A continuación, se trabajará con las columnas de gastos. El objetivo principal es verificar que los gastos registrados en estas columnas coincidan con el valor indicado como el total de gastos (en GastoTotal).

Primero, se calcula la suma de todos los tipos de gastos (alojamiento, transporte, etc.) y luego se compara con el valor registrado en la columna GastoTotal. Si la suma de los gastos calculada resulta menor que el valor de GastoTotal, se registra la diferencia en la columna GastosOtros. Por el contrario, si la suma de los gastos es mayor que el valor de GastoTotal, se actualiza esta última columna con el valor obtenido en la suma.

In [3]:
df_emisivo = df_emisivo.withColumnRenamed("GastoResto", "GastoOtros")
df_emisivo = df_emisivo.withColumnRenamed("GatoTransporteLocal", "GastoTransporteLocal")

# Crear una nueva columna que calcule la suma de los otros gastos
df_emisivo = df_emisivo.withColumn(
    "SumaGastos",
    F.col("GastoAlojamiento") + F.col("GastoAlimentacion") + F.col("GastoTransporteInternac") + F.col("GastoTransporteLocal") + F.col("GastoCultural") + F.col("GastoTours") + F.col("GastoCompras") + F.col("GastoOtros")
)

# Cuando GastoTotal es mayor que SumaGastos, agregar la diferencia a GastoOtros
df_emisivo = df_emisivo.withColumn(
    "GastoOtros",
    F.when(F.col("GastoTotal") > F.col("SumaGastos"), F.col("GastoOtros") + (F.col("GastoTotal") - F.col("SumaGastos")))
    .otherwise(F.col("GastoOtros"))
)

# Cuando SumaGastos es mayor que GastoTotal, actualizar GastoTotal con el valor de SumaGastos
df_emisivo = df_emisivo.withColumn(
    "GastoTotal",
    F.when(F.col("SumaGastos") > F.col("GastoTotal"), F.col("SumaGastos"))
    .otherwise(F.col("GastoTotal"))
)

df_emisivo = df_emisivo.drop("SumaGastos");

NameError: name 'df_emisivo' is not defined

Por último, renombramos las columnas para facilitar el uso de las mismas para cuando se trabaje con Hive.

In [38]:
column_mapping = {
    "Lugar Salida": "lugar_salida",
    "Transporte Internacional de Salida": "transporte_internacional_de_salida",
    "FechaSalida": "fecha_salida",
    "FechaEntrada": "fecha_entrada",
    "Pais": "pais",
    "departamento": "departamento",
    "Motivo": "motivo",
    "Ocupacion": "ocupacion",
    "Estudio": "estudio",
    "Destino": "destino",
    "Alojamiento": "alojamiento",
    "Lugar Ingreso": "lugar_ingreso",
    "Transporte Internacional de Ingreso": "transporte_internacional_de_ingreso",
    "Trasporte Local": "transporte_local",
    "Estadia": "estadia",
    "Gente": "gente",
    "GastoTotal": "gasto_total",
    "GastoAlojamiento": "gasto_alojamiento",
    "GastoAlimentacion": "gasto_alimentacion",
    "GastoTransporteInternac": "gasto_transporte_internacional",
    "GastoTransporteLocal": "gasto_transporte_local",
    "GastoCultural": "gasto_cultural",
    "GastoTours": "gasto_tours",
    "GastoCompras": "gasto_compras",
    "GastoOtros": "gasto_otros"
}

for old_name, new_name in column_mapping.items():
    df_emisivo = df_emisivo.withColumnRenamed(old_name, new_name)

In [39]:
df_emisivo.printSchema()

root
 |-- lugar_salida: string (nullable = true)
 |-- transporte_internacional_de_salida: string (nullable = true)
 |-- fecha_salida: date (nullable = true)
 |-- fecha_entrada: date (nullable = true)
 |-- pais: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- motivo: string (nullable = true)
 |-- ocupacion: string (nullable = true)
 |-- estudio: string (nullable = true)
 |-- destino: string (nullable = true)
 |-- alojamiento: string (nullable = true)
 |-- lugar_ingreso: string (nullable = true)
 |-- transporte_internacional_de_ingreso: string (nullable = true)
 |-- transporte_local: string (nullable = true)
 |-- estadia: integer (nullable = true)
 |-- gente: integer (nullable = true)
 |-- gasto_total: double (nullable = true)
 |-- gasto_alojamiento: double (nullable = true)
 |-- gasto_alimentacion: double (nullable = true)
 |-- gasto_transporte_internacional: double (nullable = true)
 |-- gasto_transporte_local: double (nullable = true)
 |-- gasto_cultural: dou

# Cruceros
Contiene los datos de tráfico de cruceros.

Primero vamos a leer el archivo CSV que tenemos con los datos de Cruceros y creamos un dataframe llamado df_crucero.

In [40]:
df_crucero = spark.read.csv('../../obligatorio/lnd/cruceros', header=True, sep=',',inferSchema=True)

Asi podemos visualizar los tipos de datos y columnas:

In [41]:
df_crucero.printSchema()

root
 |-- IdCruceros: integer (nullable = true)
 |-- idNacionalidad: integer (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Fecha: date (nullable = true)
 |-- idfecha: integer (nullable = true)
 |-- idPuerto: integer (nullable = true)
 |-- Puerto: string (nullable = true)
 |-- TotalPersonas: integer (nullable = true)
 |-- CantidadVisitas: integer (nullable = true)
 |-- GastoTotal: double (nullable = true)
 |-- GastoTours: double (nullable = true)
 |-- GastoAlimentacion: double (nullable = true)
 |-- GastoTransporte: double (nullable = true)
 |-- GastoShopping: double (nullable = true)
 |-- GastoOtros: double (nullable = true)



### Descripción de las Columnas

- "IdCruceros" es el código identificatorio de los cruceros.
- "Pais" corresponde a la nacionalidad del entrevistado y tiene su propio ID (idNacionalidad).
- "Fecha" indica cuándo se realizó la encuesta y tiene un ID (idfecha).
- "Puerto" es el puerto de arribo del crucerista con su ID (idPuerto).
- "TotalPersonas" indica la cantidad de personas en cada grupo.
- "Cantidad Visitas" representa el número de visitas anteriores del crucerista.
- "GastoTotal", "GastoTours", "GastoAlimentacion", "GastoTransporte", "GastoShopping", "GastoOtros" corresponde a todos los tipos de gastos realizados y su valor en dólares americanos.

A continuación, en la columna de "CantidadVisitas" nos aseguramos que los valores que estaban en "null" queden en 0.

In [43]:
df_crucero = df_crucero.fillna({'CantidadVisitas': 0})


Eliminamos las columnas innecesarias, en particular los Id de cada columna.

In [45]:
df_crucero = df_crucero.drop("idNacionalidad", "idPuerto", "idFecha");

In [46]:
df_crucero.printSchema()

root
 |-- IdCruceros: integer (nullable = true)
 |-- Pais: string (nullable = true)
 |-- Fecha: date (nullable = true)
 |-- Puerto: string (nullable = true)
 |-- TotalPersonas: integer (nullable = true)
 |-- CantidadVisitas: integer (nullable = false)
 |-- GastoTotal: double (nullable = true)
 |-- GastoTours: double (nullable = true)
 |-- GastoAlimentacion: double (nullable = true)
 |-- GastoTransporte: double (nullable = true)
 |-- GastoShopping: double (nullable = true)
 |-- GastoOtros: double (nullable = true)



A continuación, se trabajará con las columnas de gastos. El objetivo principal es verificar que los gastos registrados en estas columnas coincidan con el valor indicado como el total de gastos (en GastoTotal).

Primero, se calcula la suma de todos los tipos de gastos (tours, transporte, etc.) y luego se compara con el valor registrado en la columna GastoTotal. Si la suma de los gastos calculada resulta menor que el valor de GastoTotal, se registra la diferencia en la columna GastosOtros. Por el contrario, si la suma de los gastos es mayor que el valor de GastoTotal, se actualiza esta última columna con el valor obtenido en la suma.

In [47]:
# Crear una nueva columna que calcule la suma de los otros gastos
df_crucero = df_crucero.withColumn(
    "SumaGastos",
    F.col("GastoTours") + F.col("GastoAlimentacion") + F.col("GastoTransporte") + F.col("GastoShopping") + F.col("GastoOtros")
)

# Cuando GastoTotal es mayor que SumaGastos, agregar la diferencia a GastoOtros
df_crucero = df_crucero.withColumn(
    "GastoOtros",
    F.when(F.col("GastoTotal") > F.col("SumaGastos"), F.col("GastoOtros") + (F.col("GastoTotal") - F.col("SumaGastos")))
    .otherwise(F.col("GastoOtros"))
)

# Cuando SumaGastos es mayor que GastoTotal, actualizar GastoTotal con el valor de SumaGastos
df_crucero = df_crucero.withColumn(
    "GastoTotal",
    F.when(F.col("SumaGastos") > F.col("GastoTotal"), F.col("SumaGastos"))
    .otherwise(F.col("GastoTotal"))
)

df_crucero = df_crucero.drop("SumaGastos");

Por último, renombramos las columnas para facilitar el uso de las mismas para cuando se trabaje con Hive.

In [49]:
column_mapping = {
    "IdCruceros": "id_cruceros",
    "Pais": "pais",
    "Fecha": "fecha",
    "Puerto": "puerto",
    "TotalPersonas": "total_personas",
    "CantidadVisitas": "cantidad_visitas",
    "GastoTotal": "gasto_total",
    "GastoTours": "gasto_tours",
    "GastoAlimentacion": "gasto_alimentacion",
    "GastoTransporte": "gasto_transporte",
    "GastoShopping": "gasto_compras",
    "GastoOtros": "gasto_otros"
}

for old_name, new_name in column_mapping.items():
    df_crucero = df_crucero.withColumnRenamed(old_name, new_name)

In [50]:
df_crucero.printSchema()

root
 |-- id_cruceros: integer (nullable = true)
 |-- pais: string (nullable = true)
 |-- fecha: date (nullable = true)
 |-- puerto: string (nullable = true)
 |-- total_personas: integer (nullable = true)
 |-- cantidad_visitas: integer (nullable = false)
 |-- gasto_total: double (nullable = true)
 |-- gasto_tours: double (nullable = true)
 |-- gasto_alimentacion: double (nullable = true)
 |-- gasto_transporte: double (nullable = true)
 |-- gasto_compras: double (nullable = true)
 |-- gasto_otros: double (nullable = true)



# Paises
Contiene los mappeos de paises a el nombre en inglés y su continente.

Primero vamos a leer el archivo CSV que tenemos con los datos de Paises y creamos un dataframe llamado df_paises.

In [51]:
df_paises = spark.read.csv('../../obligatorio/lnd/paises', header=True, sep=',',inferSchema=True)

Por último, renombramos las columnas para facilitar el uso de las mismas para cuando se trabaje con Hive.

In [52]:
column_mapping = {
    "Pais": "pais",
    "IdPais": "id_pais",
    "Continente": "continente",
    "PaisesIngles": "paises_ingles"
}

for old_name, new_name in column_mapping.items():
    df_paises = df_paises.withColumnRenamed(old_name, new_name)

In [53]:
df_paises.printSchema()

root
 |-- pais: string (nullable = true)
 |-- id_pais: integer (nullable = true)
 |-- continente: string (nullable = true)
 |-- paises_ingles: string (nullable = true)
 |-- Code: string (nullable = true)



## Guardar los data frames en rfn

Guardamos los datos en nuestra carpeta rfn, acá es donde se cambia que el separador pase de "," a "|".

In [54]:
df_receptivo.write \
    .mode("overwrite") \
    .option("delimiter", "|") \
    .option("header", "false") \
    .csv("/obligatorio/rfn/receptivo")

2024-12-07T21:59:30,197 WARN [Thread-4] org.apache.spark.sql.catalyst.util.package - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [55]:
df_emisivo.write \
    .mode("overwrite") \
    .option("delimiter", "|") \
    .option("header", "false") \
    .csv("/obligatorio/rfn/emisivo")

                                                                                

In [57]:
df_crucero.write \
    .mode("overwrite") \
    .option("delimiter", "|") \
    .option("header", "false") \
    .csv("/obligatorio/rfn/cruceros")

In [58]:
df_paises.write \
    .mode("overwrite") \
    .option("delimiter", "|") \
    .option("header", "false") \
    .csv("/obligatorio/rfn/paises")

## Mover los archivos de lnd a raw

A continuación, se mueven los archivos que estaban en lnd a raw.

In [66]:
hadoop_conf = spark._jsc.hadoopConfiguration()

In [67]:
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

In [57]:
ruta_origen_receptivo = "/obligatorio/lnd/receptivo"
ruta_destino_receptivo = "/obligatorio/raw/receptivo"

# Mover el archivo de lnd a raw
origen_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_origen_receptivo)
destino_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_destino_receptivo)

if fs.exists(destino_path):
    fs.delete(destino_path, True)

fs.rename(origen_path, destino_path)

True

In [58]:
ruta_origen_emisivo = "/obligatorio/lnd/emisivo"
ruta_destino_emisivo = "/obligatorio/raw/emisivo"

# Mover el archivo de lnd a raw
origen_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_origen_emisivo)
destino_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_destino_emisivo)

if fs.exists(destino_path):
    fs.delete(destino_path, True)

fs.rename(origen_path, destino_path)

True

In [59]:
ruta_origen_cruceros = "/obligatorio/lnd/cruceros"
ruta_destino_cruceros = "/obligatorio/raw/cruceros"

# Mover el archivo de lnd a raw
origen_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_origen_cruceros)
destino_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_destino_cruceros)

if fs.exists(destino_path):
    fs.delete(destino_path, True)

fs.rename(origen_path, destino_path)

True

In [68]:
ruta_origen_paises = "/obligatorio/lnd/paises"
ruta_destino_paises = "/obligatorio/raw/paises"

# Mover el archivo de lnd a raw
origen_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_origen_paises)
destino_path = spark._jvm.org.apache.hadoop.fs.Path(ruta_destino_paises)

if fs.exists(destino_path):
    fs.delete(destino_path, True)

fs.rename(origen_path, destino_path)

True