In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  when,col, split, substring, coalesce,regexp_replace

# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("PySparkESG")
    .getOrCreate()
)

# BRONZE LAYER

### Data extraction from csv files

In [2]:
# Raw country table; the first CSV line supplies the column names and the result is cached.
countries = spark.read.csv("ESG_Data/ESGCountry.csv", header=True).cache()

In [3]:
# Raw country/series table; the first CSV line supplies the column names and the result is cached.
countries_series = spark.read.csv("ESG_Data/ESGCountry-series.csv", header=True).cache()

In [4]:
# Raw ESG table (one column per year, see the unpivot in the silver layer); header row gives the column names.
esg_data = spark.read.csv("ESG_Data/ESGCSV.csv", header=True).cache()

In [5]:
# Raw footnote table; the first CSV line supplies the column names and the result is cached.
footnote = spark.read.csv("ESG_Data/ESGfootnote.csv", header=True).cache()

In [6]:
# Raw series (indicator) table; the first CSV line supplies the column names and the result is cached.
series = spark.read.csv("ESG_Data/ESGSeries.csv", header=True).cache()

In [7]:
# Raw series/time table; the first CSV line supplies the column names and the result is cached.
series_time = spark.read.csv("ESG_Data/ESGSeries-time.csv", header=True).cache()

### Ingestion into bronze

##### Bronze Countries 

In [8]:
# Keep only the four country attributes used downstream.
countries = countries.select("Country Code", "Table Name", "Region", "Income Group")

# Bronze copy whose column names have spaces and hyphens removed
# (e.g. "Country Code" -> "CountryCode").
bronze_countries = (
    countries
    .select(*[
        col(name).alias(name.replace(" ", "").replace("-", ""))
        for name in countries.columns
    ])
    .cache()
)

##### Bronze Countries Series

In [9]:
# Keep the two key columns and the free-text description.
countries_series = countries_series.select("CountryCode", "SeriesCode", "DESCRIPTION")

# Bronze copy: strip spaces/hyphens from the column names, then give the
# description column a table-specific name.
bronze_countries_series = (
    countries_series
    .select(*[
        col(name).alias(name.replace(" ", "").replace("-", ""))
        for name in countries_series.columns
    ])
    .withColumnRenamed("DESCRIPTION", "CountrySerieDescription")
    .cache()
)

##### Bronze Footnote

In [10]:
# Bronze footnotes: the two key columns plus the year, exposed as "FootNoteYear".
bronze_footnote = footnote.select(
    "CountryCode",
    "SeriesCode",
    col("Year").alias("FootNoteYear"),
).cache()

##### Bronze Esg Data

In [11]:
# The indicator name is not carried in the ESG table; only its code is kept.
esg_data = esg_data.drop("Indicator Name")

# Bronze copy whose column names have spaces and hyphens removed
# (e.g. "Indicator Code" -> "IndicatorCode").
bronze_esg = (
    esg_data
    .select(*[
        col(name).alias(name.replace(" ", "").replace("-", ""))
        for name in esg_data.columns
    ])
    .cache()
)

##### Bronze Series

In [12]:
# Keep only the descriptive attributes of each series (indicator).
series = series.select(
    "Series Code",
    "Indicator Name",
    "Topic",
    "Source",
    "Long definition",
)

# Bronze copy whose column names have spaces and hyphens removed
# (e.g. "Long definition" -> "Longdefinition").
bronze_series = (
    series
    .select(*[
        col(name).alias(name.replace(" ", "").replace("-", ""))
        for name in series.columns
    ])
    .cache()
)

##### Bronze series time

In [13]:
# Keep the series key, the year and the free-text description.
series_time = series_time.select("SeriesCode", "Year", "DESCRIPTION")

# Bronze copy: strip spaces/hyphens from the column names, then give the
# description and year columns table-specific names.
bronze_series_time = (
    series_time
    .select(*[
        col(name).alias(name.replace(" ", "").replace("-", ""))
        for name in series_time.columns
    ])
    .withColumnRenamed("DESCRIPTION", "SeriesTimesDescription")
    .withColumnRenamed("Year", "SeriesTimeYear")
    .cache()
)


# SILVER LAYER

##### Silver Countries and Country Series

In [14]:
# Silver countries: de-duplicated rows, explicit placeholders for missing
# region / income group, and a CountryName with commas and periods removed.
# The projection keeps the original column order:
# CountryCode, CountryName (was TableName), Region, IncomeGroup.
SilverCountry = (
    bronze_countries
    .dropDuplicates()
    .select(
        col("CountryCode"),
        regexp_replace(col("TableName"), "[,\\.]", "").alias("CountryName"),
        when(col("Region").isNull(), "UnknownRegion")
        .otherwise(col("Region"))
        .alias("Region"),
        when(col("IncomeGroup").isNull(), "Unknown Income Group")
        .otherwise(col("IncomeGroup"))
        .alias("IncomeGroup"),
    )
)


In [15]:
# Silver country/series notes: keep only the notes whose country appears in
# the ESG fact table, then drop exact duplicate rows.
#
# A left-semi join is used because bronze_esg is needed only as an existence
# filter. The previous inner join multiplied every note by the number of ESG
# rows of its country before dropDuplicates() collapsed them again; the
# resulting rows and columns are the same, without the intermediate blow-up.
SilverCountrySeries = (
    bronze_countries_series
    .join(bronze_esg, on="CountryCode", how="left_semi")
    .dropDuplicates()
).cache()

##### Silver Footnote

In [16]:
# Silver footnotes: de-duplicated rows with FootNoteYear reduced to the four
# characters starting at (1-based) position 3.
# NOTE(review): presumably this strips a two-character prefix such as "YR"
# from the raw value — confirm against the CSV.
SilverFootNote = (
    bronze_footnote
    .dropDuplicates()
    .withColumn("FootNoteYear", substring(col("FootNoteYear"), 3, 4))
    .cache()
)

##### Silver Series and Series Time

In [17]:
# Silver series: de-duplicated rows, with Topic split on ": " into
# MainCategory (text before the first separator) and SubCategory (the
# second piece, NULL when Topic contains no separator).
SilverSeries = (
    bronze_series
    .dropDuplicates()
    .withColumn("MainCategory", split(col("Topic"), ": ").getItem(0))
    .withColumn("SubCategory", split(col("Topic"), ": ").getItem(1))
    .cache()
)

In [18]:
# Silver series/time notes: the bronze table with exact duplicate rows removed.
SilverSeriesTime = bronze_series_time.dropDuplicates().cache()

##### Esg Data

In [19]:
# Year columns of the wide ESG table that are unpivoted into rows.
ESG_FIRST_YEAR = 1990
ESG_LAST_YEAR = 2023
_esg_years = range(ESG_FIRST_YEAR, ESG_LAST_YEAR + 1)

# stack(n, 'label_1', `col_1`, ..., 'label_n', `col_n`) emits one
# (Year, Value) row per listed year column.
_unpivot_years_expr = (
    f"stack({len(_esg_years)}, "
    + ", ".join(f"'{year}', `{year}`" for year in _esg_years)
    + ") as (Year, Value)"
)

# Silver ESG facts in long format: one row per (CountryCode, IndicatorCode,
# EsgYear) with a numeric Value (NULL when the raw cell is empty or not
# numeric). Resulting columns: CountryCode, IndicatorCode, Value, EsgYear.
SilverEsgData = (
    bronze_esg
    # Keep only indicators described in the series table. A left-semi join
    # filters without adding series columns and without duplicating fact rows
    # if a series code were ever repeated in bronze_series.
    .join(
        bronze_series,
        bronze_esg["IndicatorCode"] == bronze_series["SeriesCode"],
        how="left_semi",
    )
    .selectExpr("`CountryCode`", "`IndicatorCode`", _unpivot_years_expr)
    .withColumn("EsgYear", col("Year").cast("int"))
    .drop("Year")
    # Cast straight to double. The previous float -> "Unknown Value" string ->
    # double chain turned the placeholder back into NULL anyway and truncated
    # every value to single precision on the way.
    .withColumn("Value", col("Value").cast("double"))
)

#SilverEsgData.show()


# GOLD LAYER

### In this section I build a dimensional model (one fact table plus dimensions) for this use case to organize the data better

In [20]:
# Print the schema (column names, types, nullability) of the silver ESG table.
SilverEsgData.printSchema()

root
 |-- CountryCode: string (nullable = true)
 |-- IndicatorCode: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- EsgYear: integer (nullable = true)


Renewable energy consumption (% of total final energy consumption)

In [21]:
# Fact table: the silver ESG rows with the indicator key renamed to
# "SeriesCode" so it matches the key column name of the dimensions.
FactESG = (
    SilverEsgData
    .withColumnRenamed("IndicatorCode", "SeriesCode")
    .cache()
)



Country Dimension

In [22]:
# Country dimension: every column of the silver country table, cached.
DimCountry = (
    SilverCountry
    .select("*")
    .cache()
)


Indicator Dimension

In [23]:
# Indicator dimension: descriptive attributes of each series, keyed by SeriesCode.
DimIndicator = SilverSeries.select(
    "SeriesCode",
    "IndicatorName",
    "Topic",
    "MainCategory",
    "SubCategory",
    "Source",
    "Longdefinition",
).cache()

In [24]:
#DimIndicator.select("IndicatorName").distinct().show(truncate=False)

Countries Series Dimension

In [25]:
# Country/series dimension: the note attached to a (CountryCode, SeriesCode) pair.
DimCountrySeries = SilverCountrySeries.select(
    "CountryCode",
    "SeriesCode",
    "CountrySerieDescription",
).cache()

In [26]:
def _esg_report(row_filter):
    """Join the fact table to its dimensions and keep rows matching ``row_filter``.

    Args:
        row_filter: Boolean Column expression evaluated on the joined rows
            (fact columns plus indicator, country and country/series columns).

    Returns:
        DataFrame with one row per matching fact, ordered as: CountryCode,
        SeriesCode, the remaining fact columns, the indicator attributes, the
        country attributes, and CountrySerieDescription.
    """
    return (
        FactESG
        .join(DimIndicator, on="SeriesCode", how="inner")
        .join(DimCountry, on="CountryCode", how="inner")
        # The note belongs to a (country, series) pair, so both keys are
        # needed. Joining on CountryCode alone repeated every fact once per
        # note of that country and attached notes of unrelated series.
        # A left join keeps facts that have no note
        # (CountrySerieDescription is NULL for them).
        .join(DimCountrySeries, on=["CountryCode", "SeriesCode"], how="left")
        .where(row_filter)
    )


# Renewable-energy share for the selected countries.
# NOTE(review): the selection includes Morocco although the variable name says
# "Europe"; the name is kept because later cells reference it.
RenewableEnergyEurope = _esg_report(
    (col("MainCategory") == "Environment")
    & (col("IndicatorName") == "Renewable energy consumption (% of total final energy consumption)")
    & col("CountryName").isin("Morocco", "Netherlands", "France")
)

# NOTE(review): despite its name this frame holds the "Control of Corruption"
# governance indicator for the Middle East & North Africa region; the name is
# kept for compatibility.
RenewableEnergyMena = _esg_report(
    (col("Region") == "Middle East & North Africa")
    & (col("MainCategory") == "Governance")
    & (col("IndicatorName") == "Control of Corruption: Estimate")
)


In [27]:
# Collect the Spark result to the driver as a pandas DataFrame for Power BI.
# The name is reused by the cells below, so the conversion is guarded: once the
# variable already holds a pandas DataFrame (which has no toPandas attribute),
# re-running this cell is a no-op instead of raising AttributeError.
if hasattr(RenewableEnergyEurope, "toPandas"):
    RenewableEnergyEurope = RenewableEnergyEurope.toPandas()

In [28]:
from powerbiclient import Report,QuickVisualize,get_dataset_config, models
from powerbiclient.authentication import DeviceCodeLoginAuthentication,InteractiveLoginAuthentication
import pandas as pd


In [None]:
# Authentication object handed to QuickVisualize below.
auth = InteractiveLoginAuthentication()

In [None]:
# Build a Power BI quick-visualize report from the pandas DataFrame.
PBI_visualize = QuickVisualize(
    get_dataset_config(RenewableEnergyEurope),
    auth=auth,
)

In [None]:
# Last expression of the cell: the notebook renders the QuickVisualize object as the cell output.
PBI_visualize