## **EJEMPLO PRACTICO DE UNA ACTIVIDAD COMUN EN DATA ENGINEER**

- **OBJETO DE ESTUDIO: EMPLEO EN ARGENTINA RELACIONADO A LA DISTRIBUCION DE GENERO POR PROVINCIA, DEPARTAMENTO Y ACTIVIDAD**

**Se pretenden aplicar conceptos relacionados con Datawarehouse para comprender y familizarizar dichas taréas.**

  - **Si bien por la naturaleza de la información es más viable optar por un modelo relacional no referido a un Datawarehouse para aprovechar la estructura, se hace con fines prácticos.**

In [1]:
# PYSPARK
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType, DecimalType
from pyspark.sql.functions import col, when, regexp_extract, concat, lit, mean, round, asc
from pyspark.sql.functions import year, month, when
from pyspark.sql.functions import col, when

In [122]:
# PANDAS
import pandas as pd

In [2]:
# SPARK SESSION
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Employment Analysis").getOrCreate()

In [3]:
spark

## **DATA DISTRIBUCION**

- provincia_id: Número entero (integer)Código de la provincia 

- proporcion_mujeres: Número decimal (number)Proporción de mujeres en el establecimiento productivo 

- lat Número decimal: (number)Latitud redondeada del establecimiento 

- sucursal: Número entero (integer)Indicador único por sucursal de cada cuit. El par cuit-sucursal conforma un       
establecimiento. 

- empleo: Texto (string)Cantidad agrupada de empleo del establecimiento  

- in_departamentos: Número entero (integer)Código del departamento 
  
- anio: Número entero (integer)Año al que refiere la información del establecimiento en cuestión.
 
- lon: Número decimal (number)Longitud redondeada del establecimiento 

- quintil: Número entero (integer)Quintil de exportaciones de bienes en el que se ubica la empresa según el nivel de exportaciones del año en cuestión. 
  
- cuit: Texto (string)Anonimización del CUIT de la empresa a la que pertenece el establecimiento. 
  
- clae6: Número entero (integer)Actividad de la empresa a nivel de seis dígitos (CLAE6) 

In [3]:
data_distribucion = spark.read.format('CSV').option("headers","true").load("distribucion_establecimientos_productivos_sexo (1).csv")

In [6]:
data_distribucion.show(5)

In [4]:
data_distribucion_schema = StructType([
    StructField("cuit", StringType(), True),
    StructField("sucursal", IntegerType(), True),
    StructField("anio", DateType(), True),
    StructField("lat", DecimalType(), True),
    StructField("lon", DecimalType(), True),
    StructField("clae6", IntegerType(), True),
    StructField("in_departamento", IntegerType(), True),
    StructField("provincia_id", IntegerType(), True),
    StructField("quintil", IntegerType(), True),
    StructField("empleo", StringType(), True),
    StructField("proporcion_mujeres", DecimalType(), True)
])

In [5]:
data_distribucion_df = spark.read.schema(data_distribucion_schema).format("csv").option("header","true").load("C:/Dropbox/ApacheSpark/Datasets_empleo/distribucion_establecimientos_productivos_sexo (1).csv")
data_distribucion_df.show(5)  

+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+
|          cuit|sucursal|      anio|lat|lon| clae6|in_departamento|provincia_id|quintil|empleo|proporcion_mujeres|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+
|84X20AZ402006P|       1|2021-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|
|84X20AZ402006P|       1|2022-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|
|41X8684801PW69|       1|2021-01-01|-32|-69| 13019|          70070|          70|      0|a. 1-9|                 0|
|06X74P20120870|       1|2021-01-01|-35|-58|141201|           2105|           2|      0|a. 1-9|                 0|
|06X74P20120870|       1|2022-01-01|-35|-58|141201|           2105|           2|      0|a. 1-9|                 0|
+--------------+--------+----------+---+---+------+---------------+------------+

## **DATOS POR DEPARTAMENTO Y ACTIVIDAD**

- in_departamentos: Número entero (integer)Código de departamento 

- departamento: Texto (string)Nombre de departamento 

- provincia_id: Número entero (integer)Código de provincia 

- provincia: Texto (string)Nombre de provincia 

- clae6: Número entero (integer)Código de actividad a nivel de seis dígitos (CLAE6)

- clae2: Número entero (integer)Código de actividad a nivel de dos dígitos (CLAE2)

- letra: Texto (string)Código de actividad a nivel de letra (CLAE Letra)

- Empleo: Número entero (integer)Cantidad de trabajadores formales empleados en el departamento para la actividad correspondiente 

- Establecimientos: Número entero (integer)Cantidad de establecimientos con trabajadores formales en el departamento para la actividad correspondiente 

In [123]:
data_dep_act = spark.read.format('CSV').option("headers","true").load("Datos_por_departamento_y_actividad.csv")

In [6]:
data_dep_act_schema = StructType([
    StructField("anio", DateType(), True),
    StructField("in_departamento", IntegerType(), True),
    StructField("departamento", StringType(), True),
    StructField("provincia_id", IntegerType(), True),
    StructField("provincia", StringType(), True),
    StructField("clae6", IntegerType(), True),
    StructField("clae2", IntegerType(), True),
    StructField("letra", StringType(), True),
    StructField("Empleo", IntegerType(), True),
    StructField("Establecimientos", IntegerType(), True),
    StructField("empresas_exportadoras", IntegerType(), True)
])

In [7]:
data_dep_act_df = spark.read.schema(data_dep_act_schema).format("csv").option("header","true").load("Datos_por_departamento_y_actividad.csv")
data_dep_act_df.show(5)  

+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra|Empleo|Establecimientos|empresas_exportadoras|
+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|    33|               9|                    4|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|   407|              73|                    2|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11119|    1|    A|  1193|             343|                    8|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11121|    1|    A|    85|              29|                    0|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11129|    1|    A|    78|        

## **DATA ACTIVIDADES**

- clae6: Número entero (integer)Código de actividad a seis dígitos (CLAE6) 

- clae2: Número entero (integer)Código de actividad a dos dígitos (CLAE2)

- letra: Texto (string)Código de actividad a nivel de letra (CLAE Letra) 

- clae6_desc: Texto (string)Descriptor del clae6 (UTF-8) 

- clae2_desc: Texto (string)Descriptor del clae2 (UTF-8) 

- letra_desc: Texto (string)Descriptor de la letra (UTF-8) 

In [126]:
data_actividades = spark.read.format('CSV').option("headers","true").load("actividades_establecimientos.csv")

In [8]:
data_actividades_schema = StructType([
    StructField("clae6", IntegerType(), True),
    StructField("clae2", IntegerType(), True),
    StructField('letra', StringType(), True),
    StructField("clae6_desc", StringType(), True),
    StructField("clae2_desc", StringType(), True),
    StructField("letra_desc", StringType(), True),
])

In [9]:
data_actividades_df = spark.read.schema(data_actividades_schema).format("csv").option("header","true").load("actividades_establecimientos.csv")
data_actividades_df.show(5)

+-----+-----+-----+--------------------+--------------------+--------------------+
|clae6|clae2|letra|          clae6_desc|          clae2_desc|          letra_desc|
+-----+-----+-----+--------------------+--------------------+--------------------+
|14211|    1|    A|Cría de ganado eq...|Agricultura, gana...|AGRICULTURA, GANA...|
|11331|    1|    A|Cultivo de hortal...|Agricultura, gana...|AGRICULTURA, GANA...|
|14410|    1|    A|Cría de ganado ov...|Agricultura, gana...|AGRICULTURA, GANA...|
|11211|    1|    A|     Cultivo de soja|Agricultura, gana...|AGRICULTURA, GANA...|
|17010|    1|    A|Caza y repoblació...|Agricultura, gana...|AGRICULTURA, GANA...|
+-----+-----+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



## **DATA DISTRIBUCION GENERO**

- in_departamentos: Número entero (integer)ID del departamento según nomenclatura de INDEC 
  
- departamento: Texto (string)Nombre de departamento 
  
- provincia_id: Número entero (integer)ID de la provincia según nomenclatura de INDEC 
  
- provincia: Texto (string)Nombre de la provincia 
  
- clae6: Número entero (integer)Código de actividad a seis dígitos del CLAE 
  
- letra: Texto (string)Código de actividad a nivel de letra del CLAE 
  
- genero: Texto (string)Indica el sexo biológico de los trabajadores de la fila correspondiente 
  
- Empleo: Número decimal (number)Indica la cantidad de puestos de trabajo para el nivel de desagregación deseado 
  
- Establecimiento: Número decimal (number)Cantidad de establecimientos para los cruces solicitados 

In [129]:
data_genero = spark.read.format('CSV').option("headers","true").load("Datos_por_departamento_actividad_y_sexo.csv")

In [10]:
data_genero_schema = StructType([
    StructField("anio", DateType(), True),
    StructField("in_departamento", IntegerType(), True),
    StructField('departamento', StringType(), True),
    StructField("provincia_id", IntegerType(), True),
    StructField("provincia", StringType(), True),
    StructField("clae6", IntegerType(), True),
    StructField("clae2", IntegerType(), True),
    StructField("letra", StringType(), True),
    StructField('genero', StringType(), True),
    StructField('Empleo', IntegerType(), True),
    StructField('Establecimientos', IntegerType(), True),
    StructField('empresas_exportadoras', IntegerType(), True),
])

In [11]:
data_genero_df = spark.read.schema(data_genero_schema).format("csv").option("header","true").load("Datos_por_departamento_actividad_y_sexo.csv")
data_genero_df.show(5)

+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra| genero|Empleo|Establecimientos|empresas_exportadoras|
+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Varones|    29|               5|                    4|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Mujeres|     4|               4|                    4|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|Varones|   295|              45|                    2|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|Mujeres|   112|              28|                    2|
|2021-01-01|           2007|    Comuna 1|

## **DATA SALARIO MEDIO POR ACTIVIDAD CLAE6**

- Fecha: Campo fecha

- clae6: Id de actividad

- w_mean: salario medio de ese año y actividad

In [132]:
data_sal_med = spark.read.format('CSV').option("headers","True").load("salario_medio_clae6.csv")

In [12]:
data_sal_med_schema = StructType([
    StructField("fecha", DateType(), True),
    StructField("clae6", IntegerType(), True),
    StructField('w_mean', IntegerType(), True),
])

In [13]:
data_sal_med_df = spark.read.schema(data_sal_med_schema).format("csv").option("header","true").load("salario_medio_clae6.csv")

In [14]:
data_sal_med_df.show(5)

+----------+-----+------+
|     fecha|clae6|w_mean|
+----------+-----+------+
|2007-01-01|11111|  1214|
|2007-01-01|11112|  1430|
|2007-01-01|11119|  1196|
|2007-01-01|11121|  1428|
|2007-01-01|11129|  1077|
+----------+-----+------+
only showing top 5 rows



## **LIMPIEZA Y TRANSFORMACION DE DATOS**

**DATA DISTRIBUCION**

In [15]:
# Agregamos la columnas de año y mes.
data_distribucion_df = data_distribucion_df.withColumn('Año', year(col('anio')))
data_distribucion_df = data_distribucion_df.withColumn('Mes', month(col('anio')))

In [16]:
# Organizamos la jerarquia de empleo en tres columnas
data_distribucion_df = data_distribucion_df.withColumn(
    'Min_empleados',
    regexp_extract(col('empleo'),r'(\d+)', 1)
)

data_distribucion_df = data_distribucion_df.withColumn(
    'Max_empleados',
    regexp_extract(col('empleo'), r'(\d+)-(\d+)', 2)
)

In [17]:
data_distribucion_df = data_distribucion_df.withColumn(
    'Mujeres',
    when((col('proporcion_mujeres') == 0), False)
    .otherwise(True)
)

In [18]:
data_distribucion_df = data_distribucion_df.withColumn(
    'Nivel_Empleo',
    regexp_extract(col('empleo'),r'[a-zA-Z0-9]', 0)
)

In [19]:
data_distribucion_df.show()

+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|          cuit|sucursal|      anio|lat|lon| clae6|in_departamento|provincia_id|quintil|empleo|proporcion_mujeres| Año|Mes|Min_empleados|Max_empleados|Mujeres|Nivel_Empleo|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|84X20AZ402006P|       1|2021-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|2021|  1|            1|            9|  false|           a|
|84X20AZ402006P|       1|2022-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|2022|  1|            1|            9|  false|           a|
|41X8684801PW69|       1|2021-01-01|-32|-69| 13019|          70070|          70|      0|a. 1-9|                 0|2021|  1|            

**DATA POR DEPARTAMENTO Y ACTIVIDAD**

In [85]:
pd.read_csv("Datos_por_departamento_y_actividad.csv", sep=',').isnull().any()

anio                     False
in_departamentos         False
departamento             False
provincia_id             False
provincia                False
clae6                    False
clae2                    False
letra                    False
Empleo                   False
Establecimientos         False
empresas_exportadoras    False
dtype: bool

In [20]:
data_dep_act_df = data_dep_act_df.withColumn(
    'Año',
    year(col('anio'))
)

data_dep_act_df = data_dep_act_df.withColumn(
    'Mes',
    month(col('anio'))
)

data_dep_act_df.show()

+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+----+---+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra|Empleo|Establecimientos|empresas_exportadoras| Año|Mes|
+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+----+---+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|    33|               9|                    4|2021|  1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|   407|              73|                    2|2021|  1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11119|    1|    A|  1193|             343|                    8|2021|  1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11121|    1|    A|    85|              29|                    0|2021|  1|
|2021-01-01|           2007|    Co

**DATA ACTIVIDADES**

In [21]:
data_actividades_df.show()

+-----+-----+-----+--------------------+--------------------+--------------------+
|clae6|clae2|letra|          clae6_desc|          clae2_desc|          letra_desc|
+-----+-----+-----+--------------------+--------------------+--------------------+
|14211|    1|    A|Cría de ganado eq...|Agricultura, gana...|AGRICULTURA, GANA...|
|11331|    1|    A|Cultivo de hortal...|Agricultura, gana...|AGRICULTURA, GANA...|
|14410|    1|    A|Cría de ganado ov...|Agricultura, gana...|AGRICULTURA, GANA...|
|11211|    1|    A|     Cultivo de soja|Agricultura, gana...|AGRICULTURA, GANA...|
|17010|    1|    A|Caza y repoblació...|Agricultura, gana...|AGRICULTURA, GANA...|
|11400|    1|    A|   Cultivo de tabaco|Agricultura, gana...|AGRICULTURA, GANA...|
|12121|    1|    A|Cultivo de uva de...|Agricultura, gana...|AGRICULTURA, GANA...|
|14720|    1|    A|Producción de pel...|Agricultura, gana...|AGRICULTURA, GANA...|
|12110|    1|    A|Cultivo de vid pa...|Agricultura, gana...|AGRICULTURA, GANA...|
|145

In [22]:
# GENERAMOS ID UNICO PARA PODER RELACIONAR LAS TABLAS
data_actividades_df = data_actividades_df.withColumn(
    'id_act',
    concat(col('letra'), lit('_'), col('clae6'))
)

**DATA GENERO**

In [23]:
data_genero_df = data_genero_df.withColumn(
    'Sexo',
    when((col('genero') == 'Varones'), 1)
    .otherwise(0)
)

data_genero_df.show(5)

+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+----+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra| genero|Empleo|Establecimientos|empresas_exportadoras|Sexo|
+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+----+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Varones|    29|               5|                    4|   1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Mujeres|     4|               4|                    4|   0|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|Varones|   295|              45|                    2|   1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|Mujeres|   112|              28|                    2|   0|
|2021-

In [24]:
data_genero_df = data_genero_df.withColumn(
    'id_act',
    concat(col('letra'), lit('_'), col('clae6'))
)

In [25]:
data_genero_df = data_genero_df.withColumn('Año', year(col('anio')))
data_genero_df = data_genero_df.withColumn('Mes', month(col('anio')))

**DATA SALARIO MEDIO**

In [26]:
data_sal_med_df = data_sal_med_df.withColumn('Año', year(col('fecha')))
data_sal_med_df = data_sal_med_df.withColumn('Mes', month(col('fecha')))

## **ANÁLISIS DE DATOS**

**Primero debemos entender que las jerarquías para la distribución de empleo y actividades con diversificación de género femenino se basan en provincia, departamento y sucursal.**

- **El ID de la sucursal es 'cuit' en la tabla DATA DISTRIBUCIÓN.**

- **El ID del departamento es 'in_departamento' en la tabla DATA DEP ACT, la cual comparte ID con la tabla DATA DISTRIBUCIÓN.**

- **El ID del departamento en la tabla DATA GÉNERO es 'in_departamento', al igual que en la tabla anterior. De esta tabla también podremos extraer el género masculino y femenino por agrupación.**

- **También, ambas tablas poseen un ID en común que es 'provincia_id'.**

- **Ahora, para agrupar por clase de actividad, las tres tablas disponen de 'clae6' como ID, siendo la última tabla DATA ACTIVIDADES.**

- **Por último, podemos obtener una serie histórica de los salarios promedio por actividad en el lapso de los años 2007-2023 con la clave de la actividad 'clae6'.**

In [32]:
# Claves primarias de las 5 tablas
data_distribucion_df.columns

['cuit',
 'sucursal',
 'anio',
 'lat',
 'lon',
 'clae6',
 'in_departamento',
 'provincia_id',
 'quintil',
 'empleo',
 'proporcion_mujeres',
 'Año',
 'Mes',
 'Min_empleados',
 'Max_empleados',
 'Mujeres',
 'Nivel_Empleo']

In [28]:
data_dep_act_df.columns

['anio',
 'in_departamento',
 'departamento',
 'provincia_id',
 'provincia',
 'clae6',
 'clae2',
 'letra',
 'Empleo',
 'Establecimientos',
 'empresas_exportadoras',
 'Año',
 'Mes']

In [29]:
data_genero_df.columns

['anio',
 'in_departamento',
 'departamento',
 'provincia_id',
 'provincia',
 'clae6',
 'clae2',
 'letra',
 'genero',
 'Empleo',
 'Establecimientos',
 'empresas_exportadoras',
 'Sexo',
 'id_act',
 'Año',
 'Mes']

In [30]:
data_actividades_df.columns

['clae6', 'clae2', 'letra', 'clae6_desc', 'clae2_desc', 'letra_desc', 'id_act']

In [31]:
data_sal_med_df.columns

['fecha', 'clae6', 'w_mean', 'Año', 'Mes']

## **INFORMACION RELEVANTE A RECOLECTAR CON SPARK SQL**

**PASOS A SEGUIR PARA LA CONSULTA POR MODELO RELACIONAL**

**NORMALIZAR LAS 5 TABLAS**

**REALIZAR LAS SIGUIENTES CONSULTAS SQL C/ SPARK:**

**A NIVEL PROVINCIA:**
- **DISTRITOS Y SUCURSALES, ACTIVIDAD Y LETRA, CANTIDAD DE EMPLEADOS, DISTRIBUCIÓN DE GÉNERO Y PROPORCIÓN DE MUJERES.**
  
**A NIVEL DISTRITO:**
- **SUCURSALES, ACTIVIDAD Y LETRA, CANTIDAD DE EMPLEADOS, DISTRIBUCIÓN DE GÉNERO Y PROPORCIÓN DE MUJERES.**
  
**A NIVEL SUCURSALES:**
- **ACTIVIDAD Y LETRA, NIVEL DE EMPLEO Y CANTIDAD DE EMPLEADOS, DE ELLO SABER PROPORCIÓN DE MUJERES, DISTRIBUCIÓN DE GÉNERO.**
  
**A NIVEL ACTIVIDAD:**
- **SALARIO MEDIO EN FUNCIÓN DE AÑOS POR ACTIVIDAD.**

**A NIVEL SALARIO:**
- **Análisis de la serie histórica**


In [33]:
data_distribucion_df.show(2)

+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|          cuit|sucursal|      anio|lat|lon| clae6|in_departamento|provincia_id|quintil|empleo|proporcion_mujeres| Año|Mes|Min_empleados|Max_empleados|Mujeres|Nivel_Empleo|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|84X20AZ402006P|       1|2021-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|2021|  1|            1|            9|  false|           a|
|84X20AZ402006P|       1|2022-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|2022|  1|            1|            9|  false|           a|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+------------

In [34]:
data_genero_df.show(2)

+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+----+-------+----+---+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra| genero|Empleo|Establecimientos|empresas_exportadoras|Sexo| id_act| Año|Mes|
+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+----+-------+----+---+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Varones|    29|               5|                    4|   1|A_11111|2021|  1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|Mujeres|     4|               4|                    4|   0|A_11111|2021|  1|
+----------+---------------+------------+------------+---------+-----+-----+-----+-------+------+----------------+---------------------+----+-------+----+---+
only showing top 2 rows



In [35]:
data_dep_act_df.show(2)

+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+----+---+
|      anio|in_departamento|departamento|provincia_id|provincia|clae6|clae2|letra|Empleo|Establecimientos|empresas_exportadoras| Año|Mes|
+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+----+---+
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11111|    1|    A|    33|               9|                    4|2021|  1|
|2021-01-01|           2007|    Comuna 1|           2|     CABA|11112|    1|    A|   407|              73|                    2|2021|  1|
+----------+---------------+------------+------------+---------+-----+-----+-----+------+----------------+---------------------+----+---+
only showing top 2 rows



In [36]:
data_actividades_df.show(2)

+-----+-----+-----+--------------------+--------------------+--------------------+-------+
|clae6|clae2|letra|          clae6_desc|          clae2_desc|          letra_desc| id_act|
+-----+-----+-----+--------------------+--------------------+--------------------+-------+
|14211|    1|    A|Cría de ganado eq...|Agricultura, gana...|AGRICULTURA, GANA...|A_14211|
|11331|    1|    A|Cultivo de hortal...|Agricultura, gana...|AGRICULTURA, GANA...|A_11331|
+-----+-----+-----+--------------------+--------------------+--------------------+-------+
only showing top 2 rows



In [37]:
data_sal_med_df.show(2)

+----------+-----+------+----+---+
|     fecha|clae6|w_mean| Año|Mes|
+----------+-----+------+----+---+
|2007-01-01|11111|  1214|2007|  1|
|2007-01-01|11112|  1430|2007|  1|
+----------+-----+------+----+---+
only showing top 2 rows



**NORMALIZAMOS DEPARTAMENTO Y PROVINCIA EN LAS TABLAS CORRESPONDIENTES**

In [38]:
data_dep_id = data_genero_df.select('in_departamento', 'departamento').distinct().collect()

In [39]:
schema = StructType([
  StructField('id_departamento', IntegerType(), True),
  StructField('departamento', StringType(), True),
])

data_dep_df = spark.createDataFrame(data_dep_id, schema)

In [40]:
data_dep_df.show()

+---------------+------------------+
|id_departamento|      departamento|
+---------------+------------------+
|          18112|     Monte Caseros|
|           6833|      Tres Arroyos|
|          14161|    Tercero Arriba|
|          18175|             Sauce|
|          14168|           Totoral|
|          18063|       General Paz|
|          26021|         Escalante|
|          10091|       Santa María|
|          26063|          Mártires|
|           6686|             Rojas|
|           6392|  General Villegas|
|           6063|          Balcarce|
|           6231|          Daireaux|
|           6525|        Marcos Paz|
|          14042|General San Martín|
|           6413|             Junín|
|          22112|         O'Higgins|
|           6672|             Rauch|
|          18007|       Bella Vista|
|           2077|         Comuna 11|
+---------------+------------------+
only showing top 20 rows



In [41]:
data_prov = data_genero_df.select('provincia_id', 'provincia').distinct().collect()
schema = StructType([
  StructField('provincia_id', IntegerType(), True),
  StructField('provincia', StringType(), True),
])

data_prov_df = spark.createDataFrame(data_prov, schema)

In [42]:
data_dep_act_df = data_dep_act_df.drop('departamento', 'provincia')
data_genero_df = data_genero_df.drop('departamento', 'provincia')

In [52]:
data_distribucion_df.show(1)

+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|          cuit|sucursal|      anio|lat|lon| clae6|in_departamento|provincia_id|quintil|empleo|proporcion_mujeres| Año|Mes|Min_empleados|Max_empleados|Mujeres|Nivel_Empleo|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
|84X20AZ402006P|       1|2021-01-01|-32|-60|475230|          30077|          30|      0|a. 1-9|                 0|2021|  1|            1|            9|  false|           a|
+--------------+--------+----------+---+---+------+---------------+------------+-------+------+------------------+----+---+-------------+-------------+-------+------------+
only showing top 1 row



In [53]:
data_dep_act_df.show(1)

+----------+---------------+------------+-----+-----+-----+------+----------------+---------------------+----+---+
|      anio|in_departamento|provincia_id|clae6|clae2|letra|Empleo|Establecimientos|empresas_exportadoras| Año|Mes|
+----------+---------------+------------+-----+-----+-----+------+----------------+---------------------+----+---+
|2021-01-01|           2007|           2|11111|    1|    A|    33|               9|                    4|2021|  1|
+----------+---------------+------------+-----+-----+-----+------+----------------+---------------------+----+---+
only showing top 1 row



In [54]:
data_genero_df.show(5)

+----------+---------------+------------+-----+-----+-----+-------+------+----------------+---------------------+----+-------+----+---+
|      anio|in_departamento|provincia_id|clae6|clae2|letra| genero|Empleo|Establecimientos|empresas_exportadoras|Sexo| id_act| Año|Mes|
+----------+---------------+------------+-----+-----+-----+-------+------+----------------+---------------------+----+-------+----+---+
|2021-01-01|           2007|           2|11111|    1|    A|Varones|    29|               5|                    4|   1|A_11111|2021|  1|
|2021-01-01|           2007|           2|11111|    1|    A|Mujeres|     4|               4|                    4|   0|A_11111|2021|  1|
|2021-01-01|           2007|           2|11112|    1|    A|Varones|   295|              45|                    2|   1|A_11112|2021|  1|
|2021-01-01|           2007|           2|11112|    1|    A|Mujeres|   112|              28|                    2|   0|A_11112|2021|  1|
|2021-01-01|           2007|           2|11119| 

In [55]:
data_actividades_df.show(20)

+-----+-----+-----+--------------------+--------------------+--------------------+-------+
|clae6|clae2|letra|          clae6_desc|          clae2_desc|          letra_desc| id_act|
+-----+-----+-----+--------------------+--------------------+--------------------+-------+
|14211|    1|    A|Cría de ganado eq...|Agricultura, gana...|AGRICULTURA, GANA...|A_14211|
|11331|    1|    A|Cultivo de hortal...|Agricultura, gana...|AGRICULTURA, GANA...|A_11331|
|14410|    1|    A|Cría de ganado ov...|Agricultura, gana...|AGRICULTURA, GANA...|A_14410|
|11211|    1|    A|     Cultivo de soja|Agricultura, gana...|AGRICULTURA, GANA...|A_11211|
|17010|    1|    A|Caza y repoblació...|Agricultura, gana...|AGRICULTURA, GANA...|A_17010|
|11400|    1|    A|   Cultivo de tabaco|Agricultura, gana...|AGRICULTURA, GANA...|A_11400|
|12121|    1|    A|Cultivo de uva de...|Agricultura, gana...|AGRICULTURA, GANA...|A_12121|
|14720|    1|    A|Producción de pel...|Agricultura, gana...|AGRICULTURA, GANA...|A_14720|

In [56]:
data_dep_df.show()

+---------------+------------------+
|id_departamento|      departamento|
+---------------+------------------+
|          18112|     Monte Caseros|
|           6833|      Tres Arroyos|
|          14161|    Tercero Arriba|
|          18175|             Sauce|
|          14168|           Totoral|
|          18063|       General Paz|
|          26021|         Escalante|
|          10091|       Santa María|
|          26063|          Mártires|
|           6686|             Rojas|
|           6392|  General Villegas|
|           6063|          Balcarce|
|           6231|          Daireaux|
|           6525|        Marcos Paz|
|          14042|General San Martín|
|           6413|             Junín|
|          22112|         O'Higgins|
|           6672|             Rauch|
|          18007|       Bella Vista|
|           2077|         Comuna 11|
+---------------+------------------+
only showing top 20 rows



In [57]:
data_prov_df.show()

+------------+----------------+
|provincia_id|       provincia|
+------------+----------------+
|           2|            CABA|
|          18|      Corrientes|
|          26|          Chubut|
|          14|         Cordoba|
|          10|       Catamarca|
|          22|           Chaco|
|           6|    Buenos Aires|
|          30|      Entre Rios|
|          82|        Santa Fe|
|          42|        La Pampa|
|          58|         Neuquen|
|          38|           Jujuy|
|          78|      Santa Cruz|
|          94|Tierra Del Fuego|
|          54|        Misiones|
|          50|         Mendoza|
|          66|           Salta|
|          70|        San Juan|
|          90|         Tucuman|
|          46|        La Rioja|
+------------+----------------+
only showing top 20 rows



In [58]:
data_sal_med_df.show()

+----------+-----+------+----+---+
|     fecha|clae6|w_mean| Año|Mes|
+----------+-----+------+----+---+
|2007-01-01|11111|  1214|2007|  1|
|2007-01-01|11112|  1430|2007|  1|
|2007-01-01|11119|  1196|2007|  1|
|2007-01-01|11121|  1428|2007|  1|
|2007-01-01|11129|  1077|2007|  1|
|2007-01-01|11130|  1061|2007|  1|
|2007-01-01|11211|  1149|2007|  1|
|2007-01-01|11291|  1005|2007|  1|
|2007-01-01|11299|  1033|2007|  1|
|2007-01-01|11310|   765|2007|  1|
|2007-01-01|11321|   667|2007|  1|
|2007-01-01|11329|   794|2007|  1|
|2007-01-01|11331|   821|2007|  1|
|2007-01-01|11341|   914|2007|  1|
|2007-01-01|11342|  1019|2007|  1|
|2007-01-01|11400|   645|2007|  1|
|2007-01-01|11501|   411|2007|  1|
|2007-01-01|11509|  1106|2007|  1|
|2007-01-01|11911|   838|2007|  1|
|2007-01-01|11912|   969|2007|  1|
+----------+-----+------+----+---+
only showing top 20 rows



**CREAMOS LAS TABLAS PARA GENERAR LAS CONSULTAS CORRESPONDIENTES**

In [59]:
data_distribucion_df.createOrReplaceTempView("data_distribucion")
data_genero_df.createOrReplaceTempView("data_genero")
data_dep_act_df.createOrReplaceTempView("data_dep_act")
data_actividades_df.createOrReplaceTempView("data_actividades")
data_prov_df.createOrReplaceTempView('data_provincias')
data_dep_df.createOrReplaceTempView('data_departamentos')
data_sal_med_df.createOrReplaceTempView('data_salarios')

**CONSULTAS SQL PYSPARK**

- A NIVEL PROVINCIA:
    - A - CUANTOS DEPARTAMENTOS Y SUCURSALES TIENE CADA PROVINCIA.
    - B - CANTIDAD DE EMPLEADOS DE CADA PROVINCIA.
    - C - DE LA CANTIDAD DE EMPLEADOS DE CADA PROVINCIA, SABER LA PROPORCION DE MUJERES.
    - D - DISTRIBUCION DE GENERO POR CADA PROVINCIA.
    - E - ACTIVIDAD PROMEDIO Y LETRA PROMEDIO DE CADA PROVINCIA.

**PROVINCIA**

In [60]:
# CUANTOS DEPARTAMENTOS Y SUCURSALES TIENE CADA PROVINCIA
dep_suc_prov = spark.sql("""
SELECT (p.provincia) AS Provincias, COUNT(DISTINCT dis.in_departamento) AS Departamentos, COUNT(DISTINCT dis.cuit) AS Sucursales
FROM data_distribucion AS dis
JOIN data_provincias AS p
ON dis.provincia_id = p.provincia_id
GROUP BY Provincias
ORDER BY Sucursales DESC""")

dep_suc_prov.show(24)

+-------------------+-------------+----------+
|         Provincias|Departamentos|Sucursales|
+-------------------+-------------+----------+
|       Buenos Aires|          135|    194800|
|               CABA|           15|    142104|
|            Cordoba|           26|     58266|
|           Santa Fe|           19|     57146|
|            Mendoza|           18|     25715|
|         Entre Rios|           17|     18379|
|            Tucuman|           17|     12857|
|          Rio Negro|           13|     11761|
|              Salta|           23|     10836|
|           Misiones|           17|     10777|
|            Neuquen|           16|     10513|
|              Chaco|           25|      9651|
|         Corrientes|           25|      9640|
|             Chubut|           15|      9434|
|           San Juan|           19|      7823|
|           La Pampa|           22|      7289|
|Santiago Del Estero|           27|      6293|
|           San Luis|            9|      5893|
|            

In [61]:
# CANTIDAD DE EMPLEADOS POR CADA PROVINCIA Y DISTINCION DE GENERO
emp_prov = spark.sql("""
SELECT (p.provincia) AS Provincias,
        SUM(CASE WHEN gen.genero = 'Varones' THEN 1 ELSE 0 END) AS Cantidad_Hombres,
        SUM(CASE WHEN gen.genero = 'Mujeres' THEN 1 ELSE 0 END) AS Cantidad_Mujeres,
        COUNT(*) AS Cantidad_Empleados
FROM data_genero AS gen
JOIN data_provincias AS p
ON gen.provincia_id = p.provincia_id
GROUP BY Provincias
ORDER BY Cantidad_Empleados DESC
""")                 

emp_prov.show(24)

+-------------------+----------------+----------------+------------------+
|         Provincias|Cantidad_Hombres|Cantidad_Mujeres|Cantidad_Empleados|
+-------------------+----------------+----------------+------------------+
|       Buenos Aires|           61469|           50271|            111740|
|               CABA|           16797|           14629|             31426|
|            Cordoba|           12774|           10530|             23304|
|           Santa Fe|           11387|            9440|             20827|
|            Mendoza|            8949|            6889|             15838|
|         Entre Rios|            7122|            5225|             12347|
|            Tucuman|            4817|            3551|              8368|
|           Misiones|            4802|            3509|              8311|
|         Corrientes|            4909|            3194|              8103|
|              Chaco|            4633|            2920|              7553|
|              Salta|    

In [62]:
# DISTRIBUCION DE MUJERES
dist_mujeres = spark.sql("""
SELECT p.provincia AS Provincias, SUM(proporcion_mujeres) AS Proporcion_mujeres
FROM data_distribucion AS DIS
JOIN data_provincias AS p
ON dis.provincia_id = p.provincia_id
GROUP BY Provincias
ORDER BY Proporcion_mujeres DESC""")

dist_mujeres.show()

+-------------------+------------------+
|         Provincias|Proporcion_mujeres|
+-------------------+------------------+
|       Buenos Aires|            169071|
|               CABA|            121327|
|            Cordoba|             47294|
|           Santa Fe|             44806|
|            Mendoza|             17745|
|         Entre Rios|             13458|
|          Rio Negro|             10465|
|            Neuquen|              9792|
|            Tucuman|              9438|
|             Chubut|              8518|
|              Salta|              8242|
|           Misiones|              7110|
|              Chaco|              5897|
|         Corrientes|              5773|
|           La Pampa|              5494|
|           San Juan|              5397|
|           San Luis|              4625|
|Santiago Del Estero|              4464|
|         Santa Cruz|              4236|
|              Jujuy|              4207|
+-------------------+------------------+
only showing top

In [63]:
# CANTIDAD DE EMPLEADOS POR PROVINCIA EN DATA_DEP_ACT
emp_prov_2 = spark.sql("""
SELECT p.provincia AS Provincias, SUM(dis2.Empleo) AS Cantidad_Empleados
FROM data_dep_act AS dis2
JOIN data_provincias AS p
ON dis2.provincia_id = p.provincia_id
GROUP BY Provincias
ORDER BY Cantidad_Empleados DESC""")

emp_prov_2.show()

+-------------------+------------------+
|         Provincias|Cantidad_Empleados|
+-------------------+------------------+
|       Buenos Aires|           4115444|
|               CABA|           3137197|
|           Santa Fe|           1064197|
|            Cordoba|           1050882|
|            Mendoza|            495777|
|            Tucuman|            359074|
|         Entre Rios|            278325|
|            Neuquen|            243015|
|              Salta|            235378|
|          Rio Negro|            225777|
|           Misiones|            216948|
|             Chubut|            196317|
|              Chaco|            170500|
|           San Juan|            168703|
|         Corrientes|            159982|
|              Jujuy|            122316|
|         Santa Cruz|            116971|
|           San Luis|            112037|
|Santiago Del Estero|            107381|
|           La Pampa|             78907|
+-------------------+------------------+
only showing top

In [64]:
# ACTIVIDAD Y LETRA PROMEDIO POR CADA PROVINCIA
act_prov = spark.sql("""
WITH Actividad_conteo AS (
    SELECT 
        p.provincia AS Provincias,
        gen.clae6,
        COUNT(gen.clae6) AS Actividad_conteo
    FROM 
        data_genero AS gen                    
    JOIN 
        data_provincias AS p 
    ON 
        gen.provincia_id = p.provincia_id
    GROUP BY 
        p.provincia, gen.clae6
),
Max_Actividad_Provincia AS (
    SELECT 
        Provincias, 
        clae6, 
        Actividad_conteo,
        ROW_NUMBER() OVER (PARTITION BY Provincias ORDER BY Actividad_conteo DESC) AS ac
    FROM 
        Actividad_conteo
)
SELECT 
    map.Provincias, 
    map.clae6,
    map.Actividad_conteo,
    act.clae6_desc AS Actividad
FROM 
    Max_Actividad_Provincia AS map
JOIN 
    data_actividades as act
ON 
    map.clae6 = act.clae6
WHERE 
    map.ac = 1
ORDER BY
    map.Actividad_conteo DESC
""")

act_prov.show(truncate=False)

+-------------------+------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Provincias         |clae6 |Actividad_conteo|Actividad                                                                                                                                                                       |
+-------------------+------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Buenos Aires       |473000|534             |Venta al por menor de combustible para vehículos automotores y motocicletas (Incluye la venta al por menor de productos lubricantes y refrigerantes)                            |
|Cordoba            |351320|102             |Distribución de energía eléctrica                              

In [65]:
# LETRA PROMEDIO POR PROVINCIA
let_prov = spark.sql("""
WITH Letra_conteo AS (
    SELECT
        p.provincia AS Provincias,
        gen.id_act,
        COUNT(gen.id_act) AS Letra_conteo
    FROM
        data_genero AS gen
    JOIN 
        data_provincias AS p
    ON
        gen.provincia_id = p.provincia_id
    GROUP BY
        p.provincia, gen.id_act
),
Max_Letra_Provincia AS (
    SELECT 
        Provincias,
        id_act,
        Letra_conteo,
        ROW_NUMBER () OVER (PARTITION BY Provincias ORDER BY Letra_conteo DESC) AS lc
    FROM
        Letra_conteo
)
SELECT 
    mlp.Provincias,
    mlp.id_act,
    mlp.Letra_conteo,
    act.letra_desc AS Actividad_Letra
FROM
    Max_Letra_Provincia AS mlp
JOIN
    data_actividades AS act
ON
    mlp.id_act = act.id_act
WHERE 
    mlp.lc = 1
ORDER BY 
   mlp.Letra_conteo DESC
""")

let_prov.show(truncate=False)

+-------------------+--------+------------+----------------------------------------------------------------------------------------+
|Provincias         |id_act  |Letra_conteo|Actividad_Letra                                                                         |
+-------------------+--------+------------+----------------------------------------------------------------------------------------+
|Buenos Aires       |G_473000|534         |COMERCIO AL POR MAYOR Y AL POR MENOR; REPARACIÓN DE VEHÍCULOS AUTOMOTORES Y MOTOCICLETAS|
|Cordoba            |D_351320|102         |SUMINISTRO DE ELECTRICIDAD, GAS, VAPOR Y AIRE ACONDICIONADO                             |
|Corrientes         |A_14113 |94          |AGRICULTURA, GANADERÍA, CAZA, SILVICULTURA Y PESCA                                      |
|Chaco              |G_477310|91          |COMERCIO AL POR MAYOR Y AL POR MENOR; REPARACIÓN DE VEHÍCULOS AUTOMOTORES Y MOTOCICLETAS|
|Santiago Del Estero|G_473000|85          |COMERCIO AL POR MAYOR Y AL

**DEPARTAMENTO**

- A NIVEL DEPARTAMENTO:
    - A - CUANTAS SUCURSALES TIENE CADA DEPARTAMENTO.
    - B - CANTIDAD DE EMPLEADOS DE CADA DEPARTAMENTO.
    - C - DE LA CANTIDAD DE EMPLEADOS SABER LA PROPORCION DE MUJERES.
    - D - DISTRIBUCION DE GENERO POR CADA DEPARTAMENTO.
    - E - ACTIVIDAD PROMEDIO Y LETRA PROMEDIO DE CADA DEPARTAMENTO.


In [66]:
# CANTIDAD DE SUCURSALES POR DEPARTAMENTO
dep_suc = spark.sql("""
SELECT
    COUNT(DISTINCT dis.cuit) AS Sucursales,
    d.departamento AS Departamentos
FROM data_distribucion AS dis
JOIN
    data_departamentos AS d
ON
    dis.in_departamento = d.id_departamento
GROUP BY Departamentos
ORDER BY Sucursales DESC""")

In [67]:
dep_suc.show()

+----------+------------------+
|Sucursales|     Departamentos|
+----------+------------------+
|     60991|           Capital|
|     35255|          Comuna 1|
|     23300|           Rosario|
|     16072|         Comuna 14|
|     14611|         Comuna 13|
|     14162|General Pueyrredón|
|     12054|        La Matanza|
|     11992|          Comuna 2|
|     10661|          Comuna 3|
|     10171|General San Martín|
|      9701|         Comuna 15|
|      9478|          La Plata|
|      7714|          Comuna 6|
|      7587|       Confluencia|
|      7492|         Comuna 12|
|      7489|        San Isidro|
|      7487|     Vicente López|
|      7158|          Comuna 7|
|      7131|        La Capital|
|      7108|          Comuna 5|
+----------+------------------+
only showing top 20 rows



In [71]:
# CANTIDAD DE EMPLEADOS, DISTRIBUCION H/M Y PROPORCION MUJERES
dep_dist_gen = spark.sql("""
WITH Cantidad_hombres AS (
    SELECT
        gen.in_departamento AS id_dep,
        SUM(CASE WHEN gen.genero = 'Varones' THEN 1 ELSE 0 END) AS Cantidad_hombres
    FROM 
        data_genero AS gen
    JOIN
        data_departamentos AS d
    ON
        gen.in_departamento = d.id_departamento
    GROUP BY
        id_dep
    ORDER BY 
        Cantidad_hombres DESC
),
                         
Cantidad_mujeres AS (
    SELECT
        gen.in_departamento AS id_dep,            
        SUM(CASE WHEN gen.genero = 'Mujeres' THEN 1 ELSE 0 END) AS Cantidad_mujeres
    FROM
        data_genero AS gen
    JOIN
        data_departamentos AS d
    ON 
        gen.in_departamento = d.id_departamento
    GROUP BY
        id_dep
    ORDER BY
        Cantidad_mujeres DESC
)                            
SELECT
    d.departamento AS Departamentos,
    SUM(gen.Empleo) AS Empleados,
    h.Cantidad_hombres,
    m.Cantidad_mujeres,
    ROUND((h.Cantidad_hombres / (h.Cantidad_hombres + m.Cantidad_mujeres)) * 100, 2) AS Proporcion_hombres,
    ROUND((m.Cantidad_mujeres / (h.Cantidad_hombres + m.Cantidad_mujeres)) * 100, 2) AS Proporcion_mujeres
FROM
    data_genero AS gen
JOIN 
    data_departamentos AS d
ON
    gen.in_departamento = d.id_departamento              
JOIN
    Cantidad_hombres AS h
ON
    d.id_departamento = h.id_dep
JOIN 
    Cantidad_mujeres AS m
ON
    h.id_dep = m.id_dep       
GROUP BY 
        Departamentos, h.Cantidad_hombres, m.Cantidad_mujeres
ORDER BY 
        Empleados DESC""")

In [None]:
dep_dist_gen.show()

In [76]:
# ACTIVIDAD MAS REALIZADA POR DEPARTAMENTO
max_act_dep = spark.sql("""
WITH Actividad_conteo AS (
SELECT
    d.departamento AS Departamentos,
    gen.clae6,
    COUNT(gen.clae6) AS Actividad_conteo
FROM 
    data_genero AS gen
JOIN
    data_departamentos AS d
ON  
    gen.in_departamento = d.id_departamento
GROUP BY
    Departamentos, gen.clae6
ORDER BY
    Actividad_conteo DESC
),

Max_Actividad_Dep AS (
SELECT
    Departamentos,
    clae6,
    Actividad_conteo,
    ROW_NUMBER () OVER (PARTITION BY Departamentos ORDER BY Actividad_conteo DESC) AS ac
FROM
    Actividad_conteo

)
SELECT
    mad.Departamentos,
    mad.clae6,
    mad.Actividad_conteo,
    act.clae6_desc AS Descripcion
FROM
    Max_Actividad_Dep AS mad
JOIN
    data_actividades AS act
ON
    mad.clae6 = act.clae6
WHERE
     mad.ac = 1
ORDER BY
    mad.Actividad_conteo DESC                   
""")

In [77]:
max_act_dep.show()

+--------------------+------+----------------+--------------------+
|       Departamentos| clae6|Actividad_conteo|         Descripcion|
+--------------------+------+----------------+--------------------+
|             Capital|475300|              44|Venta al por meno...|
|          25 de Mayo|471120|              20|Venta al por meno...|
|          9 de Julio|471120|              18|Venta al por meno...|
|           Rivadavia|473000|              17|Venta al por meno...|
|          San Martín|477310|              17|Venta al por meno...|
|          Avellaneda|471130|              12|Venta al por meno...|
|           Chacabuco|641930|              12|Servicios de la b...|
|               Colón|107121|              12|Elaboración indus...|
|      General Alvear|473000|              12|Venta al por meno...|
|    General Belgrano|641930|              12|Servicios de la b...|
|  General San Martín|477310|              12|Venta al por meno...|
|               Junín|829900|              12|Se

In [78]:
# LETRA PROMEDIO POR DEPARTAMENTO
max_let_dep = spark.sql("""
WITH Letra_conteo AS (
SELECT
    d.departamento AS Departamentos,
    gen.id_act,
    COUNT(gen.id_act) AS Letra_conteo
FROM
    data_genero AS gen
JOIN
    data_departamentos AS d
ON 
    gen.in_departamento = d.id_departamento
GROUP BY
    Departamentos, gen.id_act
ORDER BY 
    Letra_conteo DESC
),
                    
Max_Letra_Dep AS (
SELECT
    Departamentos,
    id_act,
    Letra_conteo,
    ROW_NUMBER() OVER(PARTITION BY Departamentos ORDER BY Letra_conteo DESC) as lc
FROM
    Letra_conteo
)
SELECT
    mld.Departamentos,
    mld.id_act,
    mld.Letra_conteo,
    act.letra_desc
FROM
    Max_Letra_Dep AS mld
JOIN
    data_actividades AS act
ON
    mld.id_act = act.id_act
WHERE
    mld.lc = 1
ORDER BY
    mld.Letra_conteo DESC
    """)

In [79]:
max_let_dep.show(truncate=False)

+-----------------------------+--------+------------+----------------------------------------------------------------------------------------+
|Departamentos                |id_act  |Letra_conteo|letra_desc                                                                              |
+-----------------------------+--------+------------+----------------------------------------------------------------------------------------+
|Capital                      |S_941200|44          |SERVICIOS DE ASOCIACIONES Y SERVICIOS PERSONALES                                        |
|25 de Mayo                   |G_471120|20          |COMERCIO AL POR MAYOR Y AL POR MENOR; REPARACIÓN DE VEHÍCULOS AUTOMOTORES Y MOTOCICLETAS|
|9 de Julio                   |G_471120|18          |COMERCIO AL POR MAYOR Y AL POR MENOR; REPARACIÓN DE VEHÍCULOS AUTOMOTORES Y MOTOCICLETAS|
|Rivadavia                    |G_473000|17          |COMERCIO AL POR MAYOR Y AL POR MENOR; REPARACIÓN DE VEHÍCULOS AUTOMOTORES Y MOTOCICLETAS|

- A NIVEL SUCURSALES:
    - A - CANTIDAD DE EMPLEADOS DE CADA SUCURSAL
    - B - DE ESA CANTIDAD PROPORCION DE MUJERES
    - C - DISTRIBUCION DE GENERO POR SUCURSALES
    - D - ACTIVIDAD Y NIVEL DE EMPLEO DE CADA SCURSAL

**SUCURSALES**

In [80]:
# NIVEL DE EMPLEO POR SUCURSAL Y PROPORCION DE HOMBRES Y MUJERES
max_suc_emp = spark.sql("""
WITH Nivel_conteo AS (
SELECT
    dis.cuit AS Sucursales,
    dis.Nivel_Empleo,
    COUNT(dis.Nivel_Empleo) AS Nivel_conteo,                    
    SUM(CASE WHEN dis.Mujeres = 'false' THEN 1 ELSE 0 END) AS Cant_hombres,
    SUM(CASE WHEN dis.Mujeres = 'true' THEN 1 ELSE 0 END) AS Cant_mujeres
FROM
    data_distribucion AS dis
GROUP BY
    Sucursales, dis.Nivel_Empleo
ORDER BY
    Nivel_conteo DESC
),                  
                        
Max_Nivel_Suc AS (
SELECT
    Sucursales,
    Nivel_Empleo,
    Nivel_conteo,
    Cant_hombres,
    Cant_mujeres,
    ROW_NUMBER() OVER(PARTITION BY Sucursales ORDER BY Nivel_conteo DESC) AS nc
FROM 
    Nivel_conteo
)
SELECT
    mns.Sucursales,
    mns.Nivel_Empleo,
    mns.Nivel_conteo,
    mns.Cant_hombres,
    mns.Cant_mujeres
FROM
    Max_Nivel_Suc AS mns
WHERE
    mns.nc = 1
ORDER BY
    mns.Nivel_conteo DESC
""")

In [81]:
max_suc_emp.show()

+--------------+------------+------------+------------+------------+
|    Sucursales|Nivel_Empleo|Nivel_conteo|Cant_hombres|Cant_mujeres|
+--------------+------------+------------+------------+------------+
|P6X057488A0772|           a|        2108|        1588|         520|
|14A80622Z304P6|           a|        1223|         264|         959|
|72X148020A1072|           a|         977|         675|         302|
|44A52A14Z30138|           a|         966|         806|         160|
|92A78P14WUB0J4|           a|         756|         148|         608|
|2JA676016Z0234|           a|         753|         531|         222|
|66X7UB04730437|           a|         727|         581|         146|
|00A784J62Z0538|           a|         695|         183|         512|
|10X1010P0A0Z53|           a|         683|         339|         344|
| 35X50840A055Z|           a|         674|         272|         402|
|42A12AP58Z0W36|           a|         654|         617|          37|
|07A127A50Z0824|           a|     

***A REVISAR***

In [82]:
# ACTIVIDAD MAS REALIZADA POR SUCURSALES
max_act_suc = spark.sql("""
WITH Act_conteo AS (
SELECT
    dis.cuit AS Sucursales,
    dis.clae6,
    COUNT(dis.clae6) AS Act_conteo
FROM
    data_distribucion AS dis
GROUP BY
    Sucursales, dis.clae6
ORDER BY
    Act_conteo DESC
                        
),
                        
Max_Act_Cont AS (
SELECT
    Sucursales,
    clae6,
    Act_conteo,
    ROW_NUMBER() OVER(PARTITION BY Sucursales ORDER BY Act_conteo) AS acnt
FROM
    Act_conteo
)
        
SELECT
    mac.Sucursales,
    mac.clae6,
    mac.Act_conteo,
    act.clae6_desc
FROM 
    Max_Act_Cont AS mac
JOIN
    data_actividades AS act
ON
    mac.clae6 = act.clae6
WHERE
    acnt = 1
ORDER BY
    Act_conteo DESC
""")

In [83]:
max_act_suc.show()

+--------------+------+----------+--------------------+
|    Sucursales| clae6|Act_conteo|          clae6_desc|
+--------------+------+----------+--------------------+
|P6X057488A0772|530010|      2469|Servicio de corre...|
|72X148020A1072|780000|      1380|Obtención y dotac...|
|44A52A14Z30138|780000|      1315|Obtención y dotac...|
|10X1010P0A0Z53|641930|      1258|Servicios de la b...|
|14A80622Z304P6|942000|      1246|Servicios de sind...|
|66X7UB04730437|471110|      1239|Venta al por meno...|
|2JA676016Z0234|829900|       955|Servicios empresa...|
|92A545A7PZ0W65|611090|       921|Servicios de tele...|
|92A78P14WUB0J4|812010|       837|Servicios de limp...|
|00A784J62Z0538|812010|       801|Servicios de limp...|
| 35X50840A055Z|641930|       766|Servicios de la b...|
|42A12AP58Z0W36|801090|       739|Servicios de segu...|
|4JA768170Z0140|491110|       726|Servicio de trans...|
|P8A8Z607030462|471110|       712|Venta al por meno...|
|07A127A50Z0824|651310|       702|      Obras So

## **CONSULTAS ESPECIFICAS**

- **Top 5 provincias con más y menos empleados**

- **Top 5 provincias con más y menos mujeres**

- **Top 5 actividades más desarrolladas en el país**

- **Top 10 departamentos con menor cantidad de mujeres**

- **Top 15 sucursales con nivel de empleo más alto**

- **Actividad más desarrollada por provincia con más mujeres y lo mismo pero para los hombres**

In [84]:
# TOP PROVINCIAS CON MAS EMPLEADOS
prov_max_emp = spark.sql("""
SELECT
    p.provincia AS Provincias,
    SUM(gen.Empleo) AS Total_Empleados
FROM
    data_genero AS gen
JOIN 
    data_provincias AS p
ON 
    gen.provincia_id = p.provincia_id
GROUP BY
    Provincias
ORDER BY
    Total_Empleados DESC
LIMIT 10
""")

prov_max_emp.show()

+------------+---------------+
|  Provincias|Total_Empleados|
+------------+---------------+
|Buenos Aires|        4115444|
|        CABA|        3137197|
|    Santa Fe|        1064197|
|     Cordoba|        1050882|
|     Mendoza|         495777|
|     Tucuman|         359074|
|  Entre Rios|         278325|
|     Neuquen|         243015|
|       Salta|         235378|
|   Rio Negro|         225777|
+------------+---------------+



In [85]:
# TOP PROVINCIAS CON MENOS EMPLEADOS
prov_max_emp = spark.sql("""
SELECT
    p.provincia AS Provincias,
    SUM(gen.Empleo) AS Total_Empleados
FROM
    data_genero AS gen
JOIN 
    data_provincias AS p
ON 
    gen.provincia_id = p.provincia_id
GROUP BY
    Provincias
ORDER BY
    Total_Empleados ASC
LIMIT 10
""")

prov_max_emp.show()

+-------------------+---------------+
|         Provincias|Total_Empleados|
+-------------------+---------------+
|            Formosa|          51482|
|           La Rioja|          62237|
|          Catamarca|          63784|
|   Tierra Del Fuego|          72853|
|           La Pampa|          78907|
|Santiago Del Estero|         107381|
|           San Luis|         112037|
|         Santa Cruz|         116971|
|              Jujuy|         122316|
|         Corrientes|         159982|
+-------------------+---------------+



In [86]:
# TOP PROVINCIAS CON MAS MUJERES
max_m = spark.sql("""
SELECT
    p.provincia AS Provincias,
    SUM(CASE WHEN gen.Sexo = 1 THEN 1 ELSE 0 END) AS Cant_Mujeres
FROM
    data_genero AS gen
JOIN
    data_provincias AS p
ON  
    gen.provincia_id = p.provincia_id
GROUP BY
    Provincias
ORDER BY
    Cant_Mujeres DESC
""")

max_m.show(10)

+------------+------------+
|  Provincias|Cant_Mujeres|
+------------+------------+
|Buenos Aires|       61469|
|        CABA|       16797|
|     Cordoba|       12774|
|    Santa Fe|       11387|
|     Mendoza|        8949|
|  Entre Rios|        7122|
|  Corrientes|        4909|
|     Tucuman|        4817|
|    Misiones|        4802|
|       Chaco|        4633|
+------------+------------+
only showing top 10 rows



In [87]:
# TOP PROVINCIAS CON MENOS MUJERES
min_m = spark.sql("""
SELECT
    p.provincia AS Provincias,
    SUM(CASE WHEN gen.Sexo = 1 THEN 1 ELSE 0 END) AS Cant_Mujeres
FROM
    data_genero AS gen
JOIN
    data_provincias AS p
ON  
    gen.provincia_id = p.provincia_id
GROUP BY
    Provincias
ORDER BY
    Cant_Mujeres ASC
""")

min_m.show(10)

+----------------+------------+
|      Provincias|Cant_Mujeres|
+----------------+------------+
|Tierra Del Fuego|        1254|
|         Formosa|        1567|
|        La Rioja|        1622|
|      Santa Cruz|        2014|
|       Catamarca|        2071|
|        San Luis|        2572|
|           Jujuy|        2648|
|         Neuquen|        3300|
|          Chubut|        3419|
|        La Pampa|        3467|
+----------------+------------+
only showing top 10 rows



In [88]:
# TOP 10 ACTIVIDADES MAS DESARROLLADAS EN EL PAIS
max_act = spark.sql("""
WITH Act_conteo AS (
    SELECT
        gen.clae6,
        COUNT(gen.clae6) AS Act_conteo
FROM
    data_genero AS gen
GROUP BY
    gen.clae6
ORDER BY
    Act_conteo DESC
),

Max_Act AS (
    SELECT
        clae6,
        Act_conteo,
        ROW_NUMBER() OVER(PARTITION BY clae6 ORDER BY Act_conteo DESC) AS ma
FROM
    Act_conteo
)
                    
SELECT
    mact.clae6,
    mact.Act_conteo,
    act.clae6_desc
FROM
    Max_Act AS mact
JOIN
    data_actividades AS act
ON
    mact.clae6 = act.clae6
WHERE
    ma = 1
ORDER BY
    Act_conteo DESC""")

max_act.show(10, truncate=False)

+------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|clae6 |Act_conteo|clae6_desc                                                                                                                                                                      |
+------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|473000|1819      |Venta al por menor de combustible para vehículos automotores y motocicletas (Incluye la venta al por menor de productos lubricantes y refrigerantes)                            |
|477310|1725      |Venta al por menor de productos farmacéuticos y de herboristería                                                                                                                |
|471130|1688   

In [89]:
# TOP 10 DEPARTAMENTOS CON MENOR CANTIDAD DE MUJERES
dep_prop_mujeres = spark.sql("""
SELECT
    d.departamento AS Departamentos,
    SUM(CASE WHEN gen.Sexo = 1 THEN 1 ELSE 0 END) AS Cant_hombres,
    SUM(CASE WHEN gen.Sexo = 0 THEN 1 ELSE 0 END) AS Cant_mujeres,
    ROUND((Cant_mujeres / (Cant_hombres + Cant_mujeres)) * 100, 2) AS Proporcion_mujeres
FROM
    data_genero AS gen
JOIN
    data_departamentos AS d
ON
    gen.in_departamento = d.id_departamento
GROUP BY
    Departamentos
ORDER BY 
    Proporcion_mujeres ASC""")

dep_prop_mujeres.show(10)

+--------------------+------------+------------+------------------+
|       Departamentos|Cant_hombres|Cant_mujeres|Proporcion_mujeres|
+--------------------+------------+------------+------------------+
|General Juan Facu...|           2|           0|               0.0|
|               Ullum|          10|           0|               0.0|
|         Ramón Lista|           2|           0|               0.0|
|      Santa Victoria|           7|           0|               0.0|
|Ángel Vicente Peñ...|           2|           0|               0.0|
|    General Lamadrid|          11|           1|              8.33|
|        Valle Grande|          15|           2|             11.76|
|               Mitre|           7|           1|              12.5|
|  Juan Felipe Ibarra|          63|          10|              13.7|
|          Sobremonte|          37|           6|             13.95|
+--------------------+------------+------------+------------------+
only showing top 10 rows



In [90]:
# TOP 15 SUCURSALES CON NIVEL DE EMPLEO MAS ALTO
suc_max_nivel_emp = spark.sql("""
WITH Nivel_conteo AS (
    SELECT
        dis.cuit AS Sucursales,
        dis.Nivel_Empleo,
        COUNT(dis.Nivel_Empleo) AS Nivel_conteo
    FROM
        data_distribucion AS dis
    GROUP BY
        Sucursales, Nivel_Empleo
    ORDER BY
        Nivel_conteo DESC
),
                              
Max_Nivel_Cont AS (
    SELECT
        Sucursales,
        Nivel_Empleo,
        Nivel_conteo,
        ROW_NUMBER() OVER(PARTITION BY Sucursales ORDER BY Nivel_conteo DESC) AS nc
    FROM
        Nivel_conteo                    
)                            
SELECT
    mnc.Sucursales,
    mnc.Nivel_Empleo,
    mnc.Nivel_Conteo
FROM
    Max_Nivel_Cont AS mnc
WHERE
    nc = 1 AND mnc.Nivel_Empleo = "e"
ORDER BY
    mnc.Nivel_conteo DESC""")

In [91]:
suc_max_nivel_emp.show(15)

+--------------+------------+------------+
|    Sucursales|Nivel_Empleo|Nivel_Conteo|
+--------------+------------+------------+
|07X60867PZ0W40|           e|           8|
|50A50A744Z4074|           e|           7|
|02A26480AZ0531|           e|           6|
|25XA84PZW30739|           e|           6|
|68X60JPW0Z3937|           e|           6|
|97X404PW8Z0972|           e|           6|
|52A5P8466A0554|           e|           4|
|24X77PJZ830540|           e|           4|
|77X0P7780A0W52|           e|           4|
|21X2A4125Z0321|           e|           4|
|81A76Z6J03003P|           e|           4|
|54A0815P0A0858|           e|           4|
|P2AAZ257430573|           e|           4|
|24A841661Z0640|           e|           4|
|77X001884A0652|           e|           4|
+--------------+------------+------------+
only showing top 15 rows



In [92]:
# ACTIVIDAD MAS DESARROLLADA POR PROVINCIA CON MAS PUESTOS DE TRABAJO PARA MUJERES.
act_prov_m = spark.sql("""
WITH Actividad_mujeres AS (
    SELECT
        p.provincia AS Provincias,
        act.clae6_desc AS Actividad,
        SUM(gen.Empleo) AS Cant_Puestos_Mujeres
        FROM
            data_genero AS gen
        JOIN
            data_actividades AS act
        ON
            gen.clae6 = act.clae6
        JOIN
            data_provincias AS p
        ON
            gen.provincia_id = p.provincia_id

        WHERE
            gen.Genero = 'Mujeres'
                            
        GROUP BY 
                Provincias, Actividad
                            
        ORDER BY
                Cant_Puestos_Mujeres DESC
),
            
ranking_actividades AS (
    SELECT
        Provincias,
        Actividad,
        Cant_Puestos_Mujeres,
        RANK() OVER (PARTITION BY Provincias ORDER BY Cant_Puestos_Mujeres DESC) AS rank
    FROM
        Actividad_mujeres
)
                       
SELECT
    ra.Provincias, 
    ra.Actividad,
    ra.Cant_Puestos_Mujeres
FROM
    ranking_actividades AS ra
WHERE
    rank = 1
ORDER BY 
        ra.Cant_Puestos_Mujeres DESC""")

In [93]:
act_prov_m.show(truncate=False)

+-------------------+----------------------------------------------------------------------------------------------------------------------------+--------------------+
|Provincias         |Actividad                                                                                                                   |Cant_Puestos_Mujeres|
+-------------------+----------------------------------------------------------------------------------------------------------------------------+--------------------+
|Buenos Aires       |Enseñanza inicial, jardín de infantes y primaria                                                                            |221150              |
|CABA               |Enseñanza inicial, jardín de infantes y primaria                                                                            |60089               |
|Cordoba            |Enseñanza inicial, jardín de infantes y primaria                                                                            |32536         

In [94]:
# ACTIVIDAD MAS DESARROLLADA POR PROVINCIA CON MAS PUESTOS DE TRABAJO PARA HOMBRES.
act_prov_h = spark.sql("""
WITH Actividad_hombres AS (
    SELECT
        p.provincia AS Provincia,
        act.clae6_desc AS Actividad,
        SUM(gen.Empleo) AS Cant_Puestos_Hombres
    FROM
        data_genero AS gen
    JOIN
        data_actividades AS act
    ON
        gen.clae6 = act.clae6
    JOIN
        data_provincias AS p
    ON
        gen.provincia_id = p.provincia_id
    WHERE
        gen.Genero = 'Varones'
    GROUP BY
            Provincia, Actividad
),
    
ranking_actividades AS (
    SELECT
        Provincia,
        Actividad,
        Cant_Puestos_Hombres,
        RANK() OVER(PARTITION BY Provincia ORDER BY Cant_Puestos_Hombres DESC) AS rank
    FROM
        Actividad_hombres
)
SELECT
    ra.Provincia,
    ra.Actividad,
    ra.Cant_Puestos_Hombres
FROM 
    ranking_actividades AS ra
                       
WHERE rank = 1
                       
ORDER BY
        ra.Cant_Puestos_Hombres DESC""")

In [95]:
act_prov_h.show(24, truncate=False)

+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Provincia          |Actividad                                                                                                                                                                                                                                                                                                              |Cant_Puestos_Hombres|
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TAMBIEN ES POSIBLE CONVERTIR EL OBJETO SPARK DATAFRAME A PANDAS DATAFRAME PARA FAMILIARIZAR MAS FACIL CON LAS LIBRERIAS DE OBJETOS VISUALES.**

***DEBEMOS TENER EN CUENTA QUE SI EL VOLUMEN DE DATOS ES MUY GRANDE, ESTA TRANSFORMACION TIENDE A SER LENTA***

In [96]:
def spark_to_pandas(data_genero_df, data_distribucion_df, data_dep_df, data_prov_df, data_sal_med_df):
    genero_pandas = data_genero_df.toPandas()
    distribucion_pandas = data_distribucion_df.toPandas()
    departamentos_pandas = data_dep_df.toPandas()
    provincias_pandas = data_prov_df.toPandas()
    salarios_pandas = data_sal_med_df.toPandas()

    genero_pandas.to_csv('df_genero.csv')
    distribucion_pandas.to_csv('df_distribucion.csv')
    departamentos_pandas.to_csv('df_departamentos.csv')
    provincias_pandas.to_csv('df_provincias.csv')
    salarios_pandas.to_csv('df_salarios.csv')

    if genero_pandas and distribucion_pandas and departamentos_pandas and provincias_pandas and salarios_pandas:
        return True
    else:
        return False

#spark_to_pandas(data_genero_df, data_distribucion_df, data_dep_df, data_prov_df, data_sal_med_df)

## **SE PREPARA LA ESTRUCTURA EN UN ENTORNO DE PRUEBAS ANTES DE COMENZAR CON LA EJECUCION DEL PROGRAMA REAL**

**TRANSFORMACION DE DATOS**

In [97]:
class TransformData:
    def __init__(self):
        self.ruta_dist = "distribucion_establecimientos_productivos_sexo (1).csv"
        self.ruta_gen = "Datos_por_departamento_actividad_y_sexo.csv"
        self.ruta_act = "actividades_establecimientos.csv"
        self.ruta_salario = "salario_medio_clae6.csv"

    def transform_data_dist(self):
        data_distribucion_schema = StructType([
        StructField("cuit", StringType(), True),
        StructField("sucursal", IntegerType(), True),
        StructField("anio", DateType(), True),
        StructField("lat", DecimalType(), True),
        StructField("lon", DecimalType(), True),
        StructField("clae6", IntegerType(), True),
        StructField("in_departamento", IntegerType(), True),
        StructField("provincia_id", IntegerType(), True),
        StructField("quintil", IntegerType(), True),
        StructField("empleo", StringType(), True),
        StructField("proporcion_mujeres", DecimalType(), True)
        ])

        data_distribucion_df = spark.read.schema(data_distribucion_schema).format("csv").option("header","true").load(self.ruta_dist)

        data_distribucion_df = data_distribucion_df.withColumn('Año', year(col('anio')))
        data_distribucion_df = data_distribucion_df.withColumn('Mes', month(col('anio')))

        data_distribucion_df = data_distribucion_df.withColumn(
            'Min_empleados',
            regexp_extract(col('empleo'),r'(\d+)', 1)
        )

        data_distribucion_df = data_distribucion_df.withColumn(
            'Max_empleados',
            regexp_extract(col('empleo'), r'(\d+)-(\d+)', 2)
        )

        data_distribucion_df = data_distribucion_df.withColumn(
            'Mujeres',
            when((col('proporcion_mujeres') == 0), False)
            .otherwise(True)
        )

        data_distribucion_df = data_distribucion_df.withColumn(
            'Nivel_Empleo',
            regexp_extract(col('empleo'),r'[a-zA-Z0-9]', 0)
        )
        
        data_distribucion_df = data_distribucion_df.withColumn('Año', year(col('anio')))
        return data_distribucion_df
    
    def transform_data_gen(self):
        data_genero_schema = StructType([
        StructField("anio", DateType(), True),
        StructField("in_departamento", IntegerType(), True),
        StructField('departamento', StringType(), True),
        StructField("provincia_id", IntegerType(), True),
        StructField("provincia", StringType(), True),
        StructField("clae6", IntegerType(), True),
        StructField("clae2", IntegerType(), True),
        StructField("letra", StringType(), True),
        StructField('genero', StringType(), True),
        StructField('Empleo', IntegerType(), True),
        StructField('Establecimientos', IntegerType(), True),
        StructField('empresas_exportadoras', IntegerType(), True),
        ])

        data_genero_df = spark.read.schema(data_genero_schema).format("csv").option("header","true").load(self.ruta_gen)

        data_genero_df = data_genero_df.withColumn(
            'Sexo',
            when((col('genero') == 'Varones'), 1)
            .otherwise(0)
        )

        data_genero_df = data_genero_df.withColumn(
            'id_act',
            concat(col('letra'), lit('_'), col('clae6'))
        )

        data_genero_df = data_genero_df.withColumn('Año', year(col('anio')))
        data_genero_df = data_genero_df.withColumn('Mes', month(col('anio')))

        return data_genero_df
    

    def transform_data_act(self):
        data_actividades_schema = StructType([
            StructField("clae6", IntegerType(), True),
            StructField("clae2", IntegerType(), True),
            StructField('letra', StringType(), True),
            StructField("clae6_desc", StringType(), True),
            StructField("clae2_desc", StringType(), True),
            StructField("letra_desc", StringType(), True),
        ])

        data_actividades_df = spark.read.schema(data_actividades_schema).format("csv").option("header","true").load(self.ruta_act)

        data_actividades_df = data_actividades_df.withColumn(
            'id_act',
            concat(col('letra'), lit('_'), col('clae6'))
        )

        return data_actividades_df
    
    def transform_data_salario(self):
        data_sal_med_schema = StructType([
            StructField("fecha", DateType(), True),
            StructField("clae6", IntegerType(), True),
            StructField('w_mean', IntegerType(), True),
        ])

        data_sal_med_df = spark.read.schema(data_sal_med_schema).format("csv").option("header","true").load(self.ruta_salario)

        data_sal_med_df = data_sal_med_df.withColumn('Año', year(col('fecha')))
        data_sal_med_df = data_sal_med_df.withColumn('Mes', month(col('fecha')))
        data_sal_med_df = data_sal_med_df.filter((col('Año') >= 2021) & (col('Año') <= 2022))
        data_sal_med_df = data_sal_med_df.groupBy(['clae6', 'Año']).agg(round(mean('w_mean'),2).alias('w_mean')).orderBy(asc('Año'))

        return data_sal_med_df

    def transform_data_dep_prov(self, data_genero_df):
        # DATA DEPARTAMENTO
        data_dep_id = data_genero_df.select('in_departamento', 'departamento').distinct().collect()
        schema = StructType([
        StructField('in_departamento', IntegerType(), True),
        StructField('departamento', StringType(), True),
        ])

        data_dep_df = spark.createDataFrame(data_dep_id, schema)

        # DATA PROVINCIAS
        data_prov = data_genero_df.select('provincia_id', 'provincia').distinct().collect()
        schema = StructType([
        StructField('provincia_id', IntegerType(), True),
        StructField('provincia', StringType(), True),
        ])

        data_prov_df = spark.createDataFrame(data_prov, schema)

        # RETORNO DATA GENERO NORMALIZADO
        data_genero_df = data_genero_df.drop(*['departamento', 'provincia'])

        return data_dep_df, data_prov_df, data_genero_df

In [125]:
clase_transform = TransformData()
data_genero_df = clase_transform.transform_data_gen()
data_actividades_df = clase_transform.transform_data_act()
data_dep_df, data_prov_df, data_genero_df = clase_transform.transform_data_dep_prov(data_genero_df)
data_sal_med_df = clase_transform.transform_data_salario()

**CREACION DE TABLAS**

In [98]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

class CreateTablesDW:
    def __init__(self, data_genero_df, data_actividades_df, data_dep_df, data_prov_df, data_sal_med_df):
        self.data_genero_df = data_genero_df
        self.data_actividades_df = data_actividades_df
        self.data_dep_df = data_dep_df
        self.data_prov_df = data_prov_df
        self.data_sal_med_df = data_sal_med_df

    def create_fact_table(self):
        table_fact = self.data_genero_df.select(['provincia_id', 'in_departamento','clae6','genero','Empleo','Establecimientos','empresas_exportadoras','Año'])
        table_fact = table_fact.withColumnsRenamed({'provincia_id':'Provincia_ID', 'in_departamento':'Departamento_ID',
                                   'clae6':'ID_Clae6', 'genero':'Genero', 'empresas_exportadoras':'Empresas_Exportadoras', 'Año':'Fecha'})
        window_spec = Window.orderBy("Fecha")
        table_fact = table_fact.withColumn('ID_Gen', row_number().over(window_spec))
        table_fact = table_fact.select(['ID_Gen','Provincia_ID', 'Departamento_ID', 'ID_Clae6', 'Genero', 'Empleo', 'Establecimientos', 'Empresas_Exportadoras', 'Fecha'])
        return table_fact
    
    def create_dim_salario(self, dim_act):
        dim_salario = self.data_sal_med_df.select(['Año', 'clae6','w_mean'])
        # Definir una ventana de partición si es necesario
        window_spec = Window.orderBy("Año")
        dim_salario = dim_salario.withColumn('ID_Sal', row_number().over(window_spec))
        dim_salario = dim_salario.withColumn('ID_Sal_t', row_number().over(window_spec))
        dim_salario = dim_salario.withColumnsRenamed({'Año':'Fecha', 'clae6':'ID_Clae6', 'w_mean':'Salario_Mean'})
        dim_salario = dim_salario.select(['ID_Sal_t','ID_Sal', 'ID_Clae6','Salario_Mean', 'Fecha'])
        df_filtered_salarios = dim_salario.join(dim_act, on='ID_Clae6', how='inner')
        df_filtered_salarios = df_filtered_salarios.select('ID_Sal_t', 'ID_Clae6', 'Salario_Mean', 'Fecha')
        df_filtered_salarios = df_filtered_salarios.withColumnRenamed('ID_Sal_t', 'ID_Sal')

        return df_filtered_salarios
    
    def create_dim_prov(self):
        dim_prov = self.data_prov_df.withColumnsRenamed({'provincia_id':'Provincia_ID', 'provincia':'Provincia'})
        
        return dim_prov
    
    def create_dim_dep(self):
        dim_dep = self.data_dep_df.withColumnsRenamed({'in_departamento':'Departamento_ID','departamento':'Departamento'})
        
        return dim_dep
    
    def create_dim_act(self):
        dim_act = self.data_actividades_df.select(['clae6', 'clae6_desc'])
        dim_act = dim_act.withColumnsRenamed({'clae6':'ID_Clae6', 'clae6_desc':'Desc_clae6'})

        return dim_act

In [52]:
clase_tables = CreateTablesDW(data_genero_df, data_actividades_df, data_dep_df, data_prov_df, data_sal_med_df)
table_fact = clase_tables.create_fact_table()
dim_act = clase_tables.create_dim_act()
dim_salario = clase_tables.create_dim_salario(dim_act)
dim_prov = clase_tables.create_dim_prov()
dim_dep = clase_tables.create_dim_dep()

**CARGA DE DATOS**

In [116]:
class LoadDW:
    def __init__(self, dim_prov, dim_dep, dim_act, dim_salario, table_fact):
        self.dim_prov = dim_prov
        self.dim_act = dim_act
        self.dim_salario = dim_salario
        self.dim_dep = dim_dep
        self.table_fact = table_fact

    def load_dim_data(self):
        for datos, tabla in {self.dim_prov:'dim_provincias', self.dim_dep:'dim_departamentos', self.dim_act:'dim_actividad', self.dim_salario:'dim_salarios', self.table_fact:'fact_empleo'}.items():
            datos.write \
            .format("jdbc") \
            .option("url", "jdbc:mysql://HOST:PORT/DATABASENAME") \
            .option("driver", "com.mysql.cj.jdbc.Driver") \
            .option("dbtable", tabla) \
            .option("user", "USER") \
            .option("password", "PASSWORD") \
            .mode("append") \
            .save()



In [117]:
clase_load = LoadDW(dim_prov, dim_dep, dim_act, dim_salario, table_fact)
clase_load.load_dim_data()

**EN MI OPINION ESTA FORMA DE EJECUTAR LOS MODULOS POR PARTES ES UNA PRACTICA SEGURA Y EDUCATIVA. LO CUAL PERMITE DETECTAR ERRORES DE MANERA ANTICIPADA ANTES DEL DESARROLLO EN PRODUCCION.**