# **Proyecto Final**

## **Creamos la sesion de spark**

______

In [4]:
# 📌 1️⃣ Eliminar cualquier instalación previa
!rm -rf /content/spark-*
!apt-get update > /dev/null
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# 📌 2️⃣ Descargar Spark 3.5.0 desde Apache (con verificación de descarga)
!curl -o /content/spark-3.5.0-bin-hadoop3.tgz https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# 📌 3️⃣ Verificar si el archivo se descargó correctamente
!ls -lh /content/spark-3.5.0-bin-hadoop3.tgz

# 📌 4️⃣ Extraer el archivo si es válido
!tar xf /content/spark-3.5.0-bin-hadoop3.tgz -C /content/

# 📌 5️⃣ Configurar variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["SPARK_HOME"], "bin")
os.environ["PYTHONPATH"] = os.path.join(os.environ["SPARK_HOME"], "python") + os.pathsep + os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.10.9.7-src.zip")

# 📌 6️⃣ Instalar PySpark
!pip install -q pyspark

# 📌 7️⃣ Iniciar la sesión de Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MiSparkApp") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("✅ Spark inicializado correctamente:", spark.version)

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  381M  100  381M    0     0   180k      0  0:36:01  0:36:01 --:--:--  206k
-rw-r--r-- 1 root root 382M Mar  5 09:48 /content/spark-3.5.0-bin-hadoop3.tgz
✅ Spark inicializado correctamente: 3.5.0


In [5]:
# importamos funciones para trabajo con dataframes
from pyspark.sql import functions as F


____
____


## **Cargamos los archivos limpios de la fase anterior**

__________

In [6]:
df_empresas= spark.read.parquet('/content/Empresas_limpio.parquet', inferSchema=True)
df_empresas.show(5)

+----------+---------+---------------+-------------+----------------------+----------+-------+
|empresa_id|   nombre|consumo_energia|emisiones_co2|certificacion_iso14001|    sector|   pais|
+----------+---------+---------------+-------------+----------------------+----------+-------+
| EMP000000|Empresa_0|        44404.7|       3217.3|                     0|   Energia| Canada|
| EMP000001|Empresa_1|       25576.11|      16135.7|                     1|  Finanzas|  China|
| EMP000002|Empresa_2|       29305.07|      2224.93|                     0|     Salud|  Japon|
| EMP000003|Empresa_3|       44600.25|      3764.32|                     1|Transporte|  China|
| EMP000004|Empresa_4|       46039.63|     11840.17|                     1|Tecnologia|Francia|
+----------+---------+---------------+-------------+----------------------+----------+-------+
only showing top 5 rows



In [7]:
df_proyectos= spark.read.parquet('/content/proyectos_limpio.parquet', inferSchema=True)
df_proyectos.show(5)

+-----------+----------+--------------------+-------------------+--------------+---------------+------------+
|proyecto_id|empresa_id|capacidad_generacion|reduccion_emisiones|costo_proyecto|estado_proyecto|tipo_energia|
+-----------+----------+--------------------+-------------------+--------------+---------------+------------+
| PROJ000000| EMP016520|             9131.06|            2270.88|         26.03|         Activo|       Solar|
| PROJ000001| EMP012923|             2164.32|             4886.1|        364.36|     Finalizado|       Solar|
| PROJ000002| EMP023136|             2374.24|              446.3|        190.17|  En desarrollo|     Biomasa|
| PROJ000003| EMP004436|             4065.11|            4465.83|         126.6|     Finalizado|  Geotermica|
| PROJ000004| EMP021681|             4581.51|            3280.46|        227.13|         Activo|      Eolica|
+-----------+----------+--------------------+-------------------+--------------+---------------+------------+
only showi

In [8]:
df_regulaciones= spark.read.parquet('/content/Regulaciones_limpio.parquet', inferSchema=True)
df_regulaciones.show(5)

+-------------+----------------+--------------------+----------------+--------+
|regulacion_id|limite_emisiones|subsidios_renovables|impuesto_carbono|    pais|
+-------------+----------------+--------------------+----------------+--------+
|    REG000000|         3012.63|                   1|           19.54|   India|
|    REG000001|         7325.54|                   1|           42.41|    EEUU|
|    REG000002|         9060.84|                   1|            6.15|Alemania|
|    REG000003|         7075.95|                   1|           44.65|  Espana|
|    REG000004|         1997.42|                   0|            8.75|Alemania|
+-------------+----------------+--------------------+----------------+--------+
only showing top 5 rows



___
____

## **Tabla Beneficios Proyectos Energeticos**

In [13]:
# Vamos a calular la primera columna, inversion total sostenible:

df_costo_proyectos= df_proyectos.groupBy('empresa_id').agg(F.round(F.sum('costo_proyecto'),2).alias('costo_total_proyectos'))
df_costo_proyectos.sort('empresa_id').show(5)

+----------+---------------------+
|empresa_id|costo_total_proyectos|
+----------+---------------------+
| EMP000000|               676.32|
| EMP000001|                 4.58|
| EMP000004|               249.27|
| EMP000005|               282.32|
| EMP000007|               697.71|
+----------+---------------------+
only showing top 5 rows



In [12]:
df_proyectos.filter(F.col('empresa_id')== 'EMP000000').show()

+-----------+----------+--------------------+-------------------+--------------+---------------+--------------+
|proyecto_id|empresa_id|capacidad_generacion|reduccion_emisiones|costo_proyecto|estado_proyecto|  tipo_energia|
+-----------+----------+--------------------+-------------------+--------------+---------------+--------------+
| PROJ002488| EMP000000|             9150.33|             224.96|        301.21|         Activo|Hidroelectrica|
| PROJ020806| EMP000000|             1962.66|             606.28|        375.11|  En desarrollo|        Eolica|
+-----------+----------+--------------------+-------------------+--------------+---------------+--------------+



In [24]:
df_inversion_total= df_empresas.join(df_costo_proyectos, on= 'empresa_id', how= 'inner').drop('certificacion_iso14001', 'sector','consumo_energia')
df_inversion_total= df_inversion_total.withColumn('inversion_total_sostenible', F.round(F.col('emisiones_co2')*F.col('costo_total_proyectos'),2))
df_inversion_total.sort('empresa_id').show(5)

+----------+---------+-------------+-------+---------------------+--------------------------+
|empresa_id|   nombre|emisiones_co2|   pais|costo_total_proyectos|inversion_total_sostenible|
+----------+---------+-------------+-------+---------------------+--------------------------+
| EMP000000|Empresa_0|       3217.3| Canada|               676.32|                2175924.34|
| EMP000001|Empresa_1|      16135.7|  China|                 4.58|                  73901.51|
| EMP000004|Empresa_4|     11840.17|Francia|               249.27|                2951399.18|
| EMP000005|Empresa_5|      5665.54| Canada|               282.32|                1599495.25|
| EMP000007|Empresa_7|       9487.0| Brasil|               697.71|                6619174.77|
+----------+---------+-------------+-------+---------------------+--------------------------+
only showing top 5 rows



#### Observaciones:

- Agrupamos por empresa para tener la suma de costos de los distintos proyectos. Comprobamos con el proyecto EMP000000 lo que se esta realizando.

- Para calular la inversion total realizada por empresa, multiplicamos las emisiones totales de las empresas por su costo total asociado, que fue la suma de costos de sus proyectos.

- De la misma manera que para la tabla anterior tiene sentido unicamente dejar las empresas que tiene proyectos sostenibles, ya que seran las que tengan costos asociados, subsidios asociados a los proyectos y a las que se les puede por lo tanto sacar balances de sostenibilidad. Para la primera tabla dejamos como salida igualmente, un listado de esas empresas que no presentan proyectos sostenibles, como complemento informativo.

In [42]:
# Pasamos a calcular los subsidios recibidos por empresa.

df_subsidios= df_proyectos.join(df_empresas.select('empresa_id','nombre','pais'), on= 'empresa_id', how= 'left')
df_subsidios= df_subsidios.join(df_regulaciones, on= 'pais', how= 'left')
df_subsidios = df_subsidios.withColumn('subsidios_efectivos', F.col('costo_proyecto')*F.col('subsidios_renovables'))

df_subsidios.show(5)
df_subsidios.groupBy('empresa_id').count().filter(F.col('empresa_id')=='EMP000000').show()
df_regulaciones.groupBy('pais').count().filter(F.col('pais')=='Canada').show()

+-------+----------+-----------+--------------------+-------------------+--------------+---------------+------------+-------------+-------------+----------------+--------------------+----------------+-------------------+
|   pais|empresa_id|proyecto_id|capacidad_generacion|reduccion_emisiones|costo_proyecto|estado_proyecto|tipo_energia|       nombre|regulacion_id|limite_emisiones|subsidios_renovables|impuesto_carbono|subsidios_efectivos|
+-------+----------+-----------+--------------------+-------------------+--------------+---------------+------------+-------------+-------------+----------------+--------------------+----------------+-------------------+
|Francia| EMP016520| PROJ000000|             9131.06|            2270.88|         26.03|         Activo|       Solar|Empresa_16520|    REG029693|         4636.67|                   1|           20.05|              26.03|
|Francia| EMP016520| PROJ000000|             9131.06|            2270.88|         26.03|         Activo|       Solar

In [43]:
df_subsidios = df_subsidios.groupBy('empresa_id').agg(F.round(F.sum('subsidios_efectivos'),2).alias('subsidios_recibidos'))
df_subsidios.sort('empresa_id').show(5)

+----------+-------------------+
|empresa_id|subsidios_recibidos|
+----------+-------------------+
| EMP000000|          976606.08|
| EMP000001|             7053.2|
| EMP000004|          377893.32|
| EMP000005|          407670.08|
| EMP000007|         1008190.95|
+----------+-------------------+
only showing top 5 rows



#### Observaciones:

- Para calcular los subsidios recibidos, tenemos que ver cuales se hacen efectivos por proyecto, es decir un subsidio se paga( cuando tiene 1 el registro) o no se paga( cuando registro es 0), y se evalua por proyecto, es decir si es efectivo se recibe de subsidio ese costo del proyecto.

- Se tiene multiples subsidios asociados a multiples regulaciones, que van por país, por lo tanto necesitamos evaluar cada regulacion con su respectivo subsidio que aplica en ese pais para cada proyecto de una empresa que este en ese pais. Realizamos el cruce de esta manera.

- Comprobamos que lo hicimos correctamente, ya que para Canada se tiene 2936 registros, lo que quiere decir que la Empresa: EMP000000, ubicada en canada deberia tener el doble de esos registros en el cruce ya que cada una de esas regulaciones aplicara para cada uno de sus dos proyectos. Esto es asi.

- Como tenemos el subsidio unitario por proyecto para cada empresa, debemos agrupar el coste total del subsidio sumando todos estos subsidios unitarios por empresa

In [54]:
# Pasamos a calcular el tercer campo, los impuestos totales por empresa.

df_reduccion_emisiones= df_proyectos.groupBy('empresa_id').agg(F.round(F.sum('reduccion_emisiones'),2).alias('reduccion_emisiones_totales'))
df_reduccion_emisiones.sort('empresa_id').show(5)

df_emisiones_netas= df_inversion_total.join(df_reduccion_emisiones, on= 'empresa_id', how= 'inner').select('empresa_id','nombre',
                                                                                                           'emisiones_co2','pais','inversion_total_sostenible',
                                                                                                           'reduccion_emisiones_totales')
df_emisiones_netas= df_emisiones_netas.withColumn('emisiones_netas',F.round( F.col('emisiones_co2')-F.col('reduccion_emisiones_totales'),2))
df_emisiones_netas.sort('empresa_id').show(5)

+----------+---------------------------+
|empresa_id|reduccion_emisiones_totales|
+----------+---------------------------+
| EMP000000|                     831.24|
| EMP000001|                    1565.71|
| EMP000004|                    3963.83|
| EMP000005|                    3517.67|
| EMP000007|                    3262.14|
+----------+---------------------------+
only showing top 5 rows

+----------+---------+-------------+-------+--------------------------+---------------------------+---------------+
|empresa_id|   nombre|emisiones_co2|   pais|inversion_total_sostenible|reduccion_emisiones_totales|emisiones_netas|
+----------+---------+-------------+-------+--------------------------+---------------------------+---------------+
| EMP000000|Empresa_0|       3217.3| Canada|                2175924.34|                     831.24|        2386.06|
| EMP000001|Empresa_1|      16135.7|  China|                  73901.51|                    1565.71|       14569.99|
| EMP000004|Empresa_4|    

In [91]:
df_impuestos= df_emisiones_netas.join(df_regulaciones, on= 'pais', how= 'left').select('empresa_id','nombre','emisiones_netas',
                                                                                       'regulacion_id','limite_emisiones','impuesto_carbono')
df_impuestos.groupBy('empresa_id').count().filter(F.col('empresa_id')=='EMP000000').show()

+----------+-----+
|empresa_id|count|
+----------+-----+
| EMP000000| 2936|
+----------+-----+



In [94]:
df_calculo_impuestos= df_impuestos.withColumn('impuestos_efectivos', F.when(F.col('emisiones_netas')>F.col('limite_emisiones'),
                                                                    F.round((F.col('emisiones_netas')-F.col('limite_emisiones'))*
                                                                            F.col('impuesto_carbono'),2)).otherwise(0))

df_calculo_impuestos= df_calculo_impuestos.groupBy('empresa_id').agg(F.round(F.sum('impuestos_efectivos'),2).alias('impuestos_recibidos'))

df_calculo_impuestos.filter(F.col('empresa_id')=='EMP000000').show()

+----------+-------------------+
|empresa_id|impuestos_recibidos|
+----------+-------------------+
| EMP000000|          5164825.0|
+----------+-------------------+



#### Observaciones:

- Para poder calcular los impuestos que paga una empresa tenemos que calcular sus emisiones netas, ya que el pago del immpuesto va ligado a esto. Una empresa pagara en caso de que las emisiones netas sean mayores al limite de una regulacion. El monto sera esa diferencia por el precio del impuesto de esa regulacion. Las emisiones netas a su vez son la diferencia entre las emisiones totales de una empresa y la suma de todas las reducciones de emisiones que lograron con sus proyectos renovables.

- Calculamos primero la reduccion de emisiones por empresa agrupando en el dataframe de proyectos, nos traemos las emisiones totales cruzando con el df de inversiones totales ya que tiene las emisiones totales por empresa y ya habiamos quitado campos que no usaremos. Calculamos la resta teniendo los dos campos de interes.

- Cruzamos el dataframe de las emisiones netas con el de regulaciones, de la misma manera que hicimos para calcular los subsidios, Evaluamos por empresa EMP000000, que tenemos en este caso las 2936 regulaciones que tiene su pais y pasamos a evaluar cada emision neta contra el limite de emisiones y en los casos que aplique calcular el impuesto

In [102]:
# Antes de calcular los campos 4 y 5 podemos empezar a unir la tabla de salida ya que estos campos dependen de los calculados anteriormente.

df_inversion_X_subsidios= df_inversion_total.join(df_subsidios, on= 'empresa_id', how= 'inner').select('empresa_id',
                                                                                                       'nombre','inversion_total_sostenible','subsidios_recibidos')

df_inversion_X_subsidios.sort('empresa_id').show(5)

+----------+---------+--------------------------+-------------------+
|empresa_id|   nombre|inversion_total_sostenible|subsidios_recibidos|
+----------+---------+--------------------------+-------------------+
| EMP000000|Empresa_0|                2175924.34|          976606.08|
| EMP000001|Empresa_1|                  73901.51|             7053.2|
| EMP000004|Empresa_4|                2951399.18|          377893.32|
| EMP000005|Empresa_5|                1599495.25|          407670.08|
| EMP000007|Empresa_7|                6619174.77|         1008190.95|
+----------+---------+--------------------------+-------------------+
only showing top 5 rows



In [103]:
df_inversion_X_subsidios_X_impuestos= df_inversion_X_subsidios.join(df_calculo_impuestos, on= 'empresa_id', how= 'inner')
df_inversion_X_subsidios_X_impuestos.show(5)
df_inversion_X_subsidios_X_impuestos.groupBy().count().show()

+----------+---------+--------------------------+-------------------+-------------------+
|empresa_id|   nombre|inversion_total_sostenible|subsidios_recibidos|impuestos_recibidos|
+----------+---------+--------------------------+-------------------+-------------------+
| EMP000000|Empresa_0|                2175924.34|          976606.08|          5164825.0|
| EMP000001|Empresa_1|                  73901.51|             7053.2|     5.0900876665E8|
| EMP000004|Empresa_4|                2951399.18|          377893.32|     1.2334216064E8|
| EMP000005|Empresa_5|                1599495.25|          407670.08|          3565494.9|
| EMP000007|Empresa_7|                6619174.77|         1008190.95|      6.833883734E7|
+----------+---------+--------------------------+-------------------+-------------------+
only showing top 5 rows

+-----+
|count|
+-----+
|16033|
+-----+



In [109]:
df_beneficios_proyectos_energeticos= df_inversion_X_subsidios_X_impuestos.withColumn('ahorro_total',
                                                                                     F.round((F.col('subsidios_recibidos')+F.col('impuestos_recibidos')),2)).withColumn(
                                                                                         'balance_sostenibilidad',F.round((F.col('ahorro_total')-F.col('inversion_total_sostenible'
                                                                                     )),2))
df_beneficios_proyectos_energeticos.show(5)


+----------+---------+--------------------------+-------------------+-------------------+--------------+----------------------+
|empresa_id|   nombre|inversion_total_sostenible|subsidios_recibidos|impuestos_recibidos|  ahorro_total|balance_sostenibilidad|
+----------+---------+--------------------------+-------------------+-------------------+--------------+----------------------+
| EMP000000|Empresa_0|                2175924.34|          976606.08|          5164825.0|    6141431.08|            3965506.74|
| EMP000001|Empresa_1|                  73901.51|             7053.2|     5.0900876665E8|5.0901581985E8|        5.0894191834E8|
| EMP000004|Empresa_4|                2951399.18|          377893.32|     1.2334216064E8|1.2372005396E8|        1.2076865478E8|
| EMP000005|Empresa_5|                1599495.25|          407670.08|          3565494.9|    3973164.98|            2373669.73|
| EMP000007|Empresa_7|                6619174.77|         1008190.95|      6.833883734E7| 6.934702829E7|

In [111]:
df_beneficios_proyectos_energeticos= df_beneficios_proyectos_energeticos.withColumnRenamed('impuestos_recibidos','impuestos_total')
df_beneficios_proyectos_energeticos.show(5)

+----------+---------+--------------------------+-------------------+---------------+--------------+----------------------+
|empresa_id|   nombre|inversion_total_sostenible|subsidios_recibidos|impuestos_total|  ahorro_total|balance_sostenibilidad|
+----------+---------+--------------------------+-------------------+---------------+--------------+----------------------+
| EMP000000|Empresa_0|                2175924.34|          976606.08|      5164825.0|    6141431.08|            3965506.74|
| EMP000001|Empresa_1|                  73901.51|             7053.2| 5.0900876665E8|5.0901581985E8|        5.0894191834E8|
| EMP000004|Empresa_4|                2951399.18|          377893.32| 1.2334216064E8|1.2372005396E8|        1.2076865478E8|
| EMP000005|Empresa_5|                1599495.25|          407670.08|      3565494.9|    3973164.98|            2373669.73|
| EMP000007|Empresa_7|                6619174.77|         1008190.95|  6.833883734E7| 6.934702829E7|         6.272785352E7|
+-------

In [112]:
df_beneficios_proyectos_energeticos.write.csv('/content/df_beneficios_proyectos_energeticos.csv', header= True)

#### Observaciones

- Para los dos ultimos campos nos bastan operaciones entre los campos previamente calculados. Cruzamos los data frames donde tenemos la inversion total, los impuestos recibidos y los subsidios recibidos. De esta manera tendremos las 3 primeras variables juntas para los calculos posteriores. Lo  cruces se realizaron tipo inner, aunque pudo haber sido left o right, ya que se tienen exactamente las mismas empresas en los 3 Dataframes, aquellas con proyectos renovables.

- Teniendo todo en un solo dataframe, calculamos el ahorro total como impuestos mas subsidios, y el balance como ahorro menos inversion, de acuerdo a lo indicado.

- finalmente podemos exportar nuestra tabla de salida #2