# Introduction

This project implements a simple ETL data pipeline that combines three sources: a CSV file with country data, randomly generated ESG data on 8 detergents and their CO2 emissions, and randomly generated sales data for those detergents in different countries.
The CSV file must first be stored as a table in the Databricks data catalog.
(Data source: https://www.kaggle.com/datasets/nelgiriyewithana/countries-of-the-world-2023)

The pipeline produces two results on the CO2 footprint caused by the detergents:
- CO2 footprint per article per country
- The company's CO2 footprint per country compared with each country's total CO2 emissions
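Written out, with sales orders $i$ having article $a_i$, country $c_i$ and quantity $q_i$, a per-article emission $e_a$ (the `co2emission` column) and a country's total emissions $E_c$ (the `Co2-Emissions` column), the two results can be sketched as follows. Note that the pipeline below reports $F_c$ next to $E_c$ rather than computing the ratio $s_c$, and that the ratio is only meaningful if both values share the same unit, which the randomly generated emission values do not define.

$$
\begin{aligned}
F_{a,c} &= \sum_{i \,:\, a_i = a,\; c_i = c} q_i \, e_a \\
F_c &= \sum_{a} F_{a,c} \\
s_c &= \frac{F_c}{E_c}
\end{aligned}
$$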

Other results could also be evaluated, such as the CO2 footprint broken down by month and year as well as by country and article. For demonstration purposes, only the two use cases above were implemented.

The detergents data and the sales data are related in a star schema: the detergents table is the dimension table and the sales table is the fact table. (https://www.databricks.com/blog/2022/05/20/five-simple-steps-for-implementing-a-star-schema-in-databricks-with-delta-lake.html)
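A minimal pandas sketch of the same star-schema relationship, on made-up toy rows: the fact table references the dimension table through `article`, and an inner join resolves each sale to its detergent's attributes. The pipeline itself performs this join in Spark; this only illustrates the shape of the data.

```python
import pandas as pd

# Dimension table: one row per detergent with descriptive attributes (toy values).
dim_detergent = pd.DataFrame({
    "detergent": ["Detergent_A", "Detergent_B"],
    "price": [10.0, 5.0],
    "co2emission": [0.5, 0.25],
})

# Fact table: one row per order, referencing the dimension by its key (toy values).
fact_sales = pd.DataFrame({
    "order_number": [0, 1, 2],
    "article": ["Detergent_A", "Detergent_B", "Detergent_A"],
    "quantity": [2, 4, 1],
    "country_abbr": ["DE", "DE", "FR"],
})

# Resolve the foreign key (article) against the dimension key (detergent).
sales_extended = fact_sales.merge(
    dim_detergent, left_on="article", right_on="detergent", how="inner"
).drop(columns="detergent")

print(sales_extended)
```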

Procedural programming was used because the code is structured, reusable, and logically encapsulated in functions. An object-oriented design would have made less sense, since many individual components work together and ultimately form a larger whole.

For production use, the following points should still be addressed:
- Put the core code in a Python repository, package it, and only call `main()` in Databricks. This way, changes are tracked and versioned, access to the repository can be restricted (not everyone can change production code directly in the notebook), and the code can be reused in other notebooks.
- Store the DB credentials in a secrets manager or key vault instead of in the code.
- Trigger the pipeline from an orchestrator (e.g. Azure Data Factory); serverless functions can also be used.
- As a follow-up, define the DevOps lifecycle:
  - Development in VS Code, with remote access to a test cluster in Databricks
  - Repositories in Azure DevOps (or GitHub) for versioning the artifacts
  - Production code linked to Databricks Repos
  - Notebooks triggered by Data Factory
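A sketch of what "only call `main()` in Databricks" could look like, assuming a hypothetical package module `detergent_pipeline/main.py`. The extract, transform and load steps are passed in as functions, so the packaged code can be tested without a cluster; the toy lambdas below stand in for the real Spark steps.

```python
# detergent_pipeline/main.py  (hypothetical module in a versioned repository)
from typing import Any, Callable


def main(
    extract: Callable[[], Any],
    transform: Callable[[Any], Any],
    load: Callable[[Any], None],
) -> Any:
    """Run one ETL pass (extract -> transform -> load) and return the transformed data."""
    raw = extract()
    result = transform(raw)
    load(result)
    return result


if __name__ == "__main__":
    # Toy stand-ins for the real Spark steps, so the module runs on its own.
    sink = []
    out = main(
        extract=lambda: [("Detergent_A", 2), ("Detergent_B", 3)],
        transform=lambda rows: sum(quantity for _, quantity in rows),
        load=sink.append,
    )
    print(out, sink)  # 5 [5]
```

In the notebook, the cell would then shrink to importing `main` from the installed package and passing in the Spark-based step functions.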


In [None]:
from pyspark.sql import SparkSession  # SparkSession entry point for programming Spark with the Dataset and DataFrame API 
from pyspark import SparkConf  # SparkConf to customize your Spark application configuration before initializing your SparkSession or SparkContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
import string

In [None]:
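def create_spark_session():
    # On Databricks a SparkSession already exists and getOrCreate() returns it.
    # The master URL is managed by the cluster, so it is not hard-coded here
    # (setMaster("local[*]") would only make sense for local runs).
    conf = SparkConf()
    conf.setAppName("Databricks Shell")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark

spark = create_spark_session()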

In [None]:
def load_world_data(spark):
    path_world_data = "dbfs:/user/hive/warehouse/world_data_2023"
    df_world_data = spark.read.format("delta").load(path_world_data)
    df_world_data_short = df_world_data.select("Country", "Abbreviation", "Co2-Emissions", "Population", "GDP")
    return df_world_data_short

df_world_data_short = load_world_data(spark)
display(df_world_data_short)

Country,Abbreviation,Co2-Emissions,Population,GDP
Afghanistan,AF,8672.0,38041754.0,"$19,101,353,833"
Albania,AL,4536.0,2854191.0,"$15,278,077,447"
Algeria,DZ,150006.0,43053054.0,"$169,988,236,398"
Andorra,AD,469.0,77142.0,"$3,154,057,987"
Angola,AO,34693.0,31825295.0,"$94,635,415,870"
Antigua and Barbuda,AG,557.0,97118.0,"$1,727,759,259"
Argentina,AR,201348.0,44938712.0,"$449,663,446,954"
Armenia,AM,5156.0,2957731.0,"$13,672,802,158"
Australia,AU,375908.0,25766605.0,"$1,392,680,589,329"
Austria,AT,61448.0,8877067.0,"$446,314,739,528"
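The `GDP` column arrives as a formatted string such as `"$19,101,353,833"`, so it cannot be summed or compared numerically as loaded. A minimal plain-Python sketch of the conversion follows; in Spark, the same cleanup could be expressed as `F.regexp_replace("GDP", r"[$,]", "").cast("double")`. The helper name `parse_currency` is hypothetical.

```python
from typing import Optional


def parse_currency(value: Optional[str]) -> Optional[float]:
    """Convert strings like '$19,101,353,833' to a float; return None for blanks."""
    if value is None:
        return None
    # Drop the currency symbol and thousands separators before converting.
    cleaned = value.strip().replace("$", "").replace(",", "")
    return float(cleaned) if cleaned else None


print(parse_currency("$19,101,353,833"))  # 19101353833.0
```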


In [None]:
def create_detergents_dataframe(spark, n_detergents=8):
    # Synthetic dimension table: one row per detergent with a random price (5 to 20)
    # and a random CO2 emission value per article (0 to 1, no defined unit).
    detergents = ["Detergent_" + x for x in string.ascii_uppercase[:n_detergents]]
    prices = np.random.uniform(5, 20, n_detergents)
    emissions = np.random.rand(n_detergents)
    df_detergents = pd.DataFrame({
        "detergent": detergents,
        "price": prices,
        "co2emission": emissions
    })
    return spark.createDataFrame(df_detergents)

df_detergents = create_detergents_dataframe(spark)
display(df_detergents)

detergent,price,co2emission
Detergent_A,17.420960280189746,0.9616346998255388
Detergent_B,12.398255223514354,0.6088777795126956
Detergent_C,15.38816300110926,0.1685132512428528
Detergent_D,12.196656683679128,0.1527578941910609
Detergent_E,18.31587830145941,0.0080433970912259
Detergent_F,18.15053750603291,0.6782694693879415
Detergent_G,10.615604054675238,0.0881403843837383
Detergent_H,19.604752483181542,0.0784055737922437
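Because `np.random` is not seeded, every run produces different detergents and therefore different results in the database. A sketch using NumPy's `Generator` with an optional seed makes runs reproducible; the name `make_detergents` and the seed value are assumptions, and the returned pandas DataFrame can be passed to `spark.createDataFrame` as before.

```python
import string

import numpy as np
import pandas as pd


def make_detergents(n_detergents=8, seed=None):
    """Return synthetic detergents as a pandas DataFrame; the same seed gives the same data."""
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        "detergent": ["Detergent_" + c for c in string.ascii_uppercase[:n_detergents]],
        "price": rng.uniform(5, 20, n_detergents),  # values in [5, 20)
        "co2emission": rng.random(n_detergents),    # values in [0, 1)
    })


a = make_detergents(seed=42)
b = make_detergents(seed=42)
print(a.equals(b))  # True
```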


In [None]:
def create_sales_dataframe(spark, country_codes, detergents, n_sales=2000):
    # Synthetic fact table: one row per order, referencing a detergent and a country.
    df_sales = pd.DataFrame({
        "order_number": range(n_sales),
        "article": np.random.choice(detergents, n_sales),
        "quantity": np.random.binomial(30, 0.1, n_sales) + 1,  # at least 1 unit per order
        "year": np.random.choice(range(2000, 2023), n_sales),  # 2000..2022
        "month": np.random.choice(range(1, 13), n_sales),      # 1..12 (range end is exclusive)
        "country_abbr": np.random.choice(country_codes, n_sales)
    })
    return spark.createDataFrame(df_sales)

# Unique, non-null country codes from the world data
country_codes = [
    row["Abbreviation"]
    for row in df_world_data_short.select("Abbreviation").where(F.col("Abbreviation").isNotNull()).distinct().collect()
]
detergent_names = [row["detergent"] for row in df_detergents.select("detergent").collect()]

df_sales = create_sales_dataframe(spark, country_codes, detergent_names)
display(df_sales)


order_number,article,quantity,year,month,country_abbr
0,Detergent_C,3,2010,11,PH
1,Detergent_E,3,2010,2,ZM
2,Detergent_A,2,2008,5,KH
3,Detergent_F,5,2001,8,MC
4,Detergent_H,6,2021,3,KN
5,Detergent_H,3,2008,2,NE
6,Detergent_H,4,2000,9,PG
7,Detergent_A,5,2009,11,CM
8,Detergent_G,5,2013,10,TZ
9,Detergent_F,3,2000,4,GQ
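Since the sales data is generated rather than loaded, a small data-quality check before aggregating can catch problems such as months outside 1 to 12 or country codes that will not join. A minimal pandas sketch follows (the function `check_sales` is hypothetical); for 2000 rows, converting with `toPandas()` is cheap, e.g. `check_sales(df_sales.toPandas(), df_detergents.toPandas()["detergent"], country_codes)`.

```python
import pandas as pd


def check_sales(df_sales: pd.DataFrame, valid_articles, valid_countries) -> list:
    """Return a list of human-readable data-quality problems (empty list means OK)."""
    problems = []
    if not df_sales["month"].between(1, 12).all():
        problems.append("month outside 1..12")
    if (df_sales["quantity"] < 1).any():
        problems.append("quantity below 1")
    unknown_articles = set(df_sales["article"]) - set(valid_articles)
    if unknown_articles:
        problems.append(f"unknown articles: {sorted(unknown_articles)}")
    unknown_countries = set(df_sales["country_abbr"]) - set(valid_countries)
    if unknown_countries:
        problems.append(f"unknown countries: {sorted(unknown_countries)}")
    return problems


# Toy sample with deliberate errors.
sample = pd.DataFrame({
    "article": ["Detergent_A", "Detergent_Z"],
    "quantity": [3, 0],
    "month": [12, 13],
    "country_abbr": ["DE", "DE"],
})
print(check_sales(sample, ["Detergent_A"], ["DE"]))
```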


In [None]:
# Enrich each sale with its detergent's price and CO2 emission (fact-dimension join)
df_sales_extended = (
    df_sales.join(df_detergents, df_sales.article == df_detergents.detergent, "inner")
    .drop("detergent")
    .withColumnRenamed("price", "price_per_article")
    .withColumnRenamed("co2emission", "co2emission_per_article")
)

# Total CO2 emission and total revenue (price * quantity) per article and country
df_grouped_article_country = df_sales_extended.groupBy("article", "country_abbr").agg(
    F.sum(F.col("co2emission_per_article") * F.col("quantity")).alias("Total_co2emission_per_article_per_country"),
    F.sum(F.col("price_per_article") * F.col("quantity")).alias("Total_price_per_article_per_country")
)

display(df_grouped_article_country)


article,country_abbr,Total_co2emission_per_article_per_country,Total_price_per_article_per_country
Detergent_G,PY,1.057684612604861,127.38724865610286
Detergent_E,MR,0.0241301912736777,54.94763490437823
Detergent_G,HU,0.7932634594536455,95.54043649207712
Detergent_C,BG,1.348106009942823,123.10530400887409
Detergent_A,CL,9.616346998255388,174.20960280189746
Detergent_A,US,11.539616397906464,209.05152336227692
Detergent_E,BO,0.0482603825473555,109.89526980875648
Detergent_E,GD,0.0563037796385814,128.21114811021587
Detergent_C,BH,2.3591855173999403,215.43428201552965
Detergent_H,NE,1.1760836068836569,294.07128724772315
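The aggregation multiplies each order's quantity by the per-article value and sums per `(article, country_abbr)` group, so `Total_price_per_article_per_country` is effectively revenue. A pandas cross-check on made-up rows, which can help verify the Spark logic by hand:

```python
import pandas as pd

# Toy rows shaped like df_sales_extended.
sales = pd.DataFrame({
    "article": ["Detergent_A", "Detergent_A", "Detergent_B"],
    "country_abbr": ["DE", "DE", "DE"],
    "quantity": [2, 3, 4],
    "price_per_article": [10.0, 10.0, 5.0],
    "co2emission_per_article": [0.5, 0.5, 0.25],
})

# Per order: quantity * per-article value; then sum per (article, country) group.
grouped = (
    sales.assign(
        co2=sales["quantity"] * sales["co2emission_per_article"],
        revenue=sales["quantity"] * sales["price_per_article"],
    )
    .groupby(["article", "country_abbr"], as_index=False)[["co2", "revenue"]]
    .sum()
)
print(grouped)
```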


In [None]:
# Company CO2 footprint per country, next to each country's total CO2 emissions
df_grouped_country = df_grouped_article_country.groupBy("country_abbr").agg(
    F.sum("Total_co2emission_per_article_per_country").alias("Total_co2emission_per_country")
)

df_grouped_total_compare = (
    df_grouped_country.join(
        df_world_data_short,
        on=df_grouped_country.country_abbr == df_world_data_short.Abbreviation,
        how="inner",
    )
    .drop("Abbreviation")
    .withColumnRenamed("Total_co2emission_per_country", "co2emission_by_company")
    .withColumnRenamed("Co2-Emissions", "Country-Total-Co2-Emissions")
)
display(df_grouped_total_compare)

country_abbr,co2emission_by_company,Country,Country-Total-Co2-Emissions,Population,GDP
DZ,15.543569515966068,Algeria,150006.0,43053054,"$169,988,236,398"
MM,15.184096644656714,Myanmar,25280.0,54045420,"$76,085,852,617"
LT,11.382430301420928,Lithuania,12963.0,2786844,"$54,219,315,600"
FI,15.763031900586595,Finland,45871.0,5520314,"$268,761,201,365"
AZ,10.485624798788756,Azerbaijan,37620.0,10023318,"$39,207,000,000"
UA,19.13396254540514,Ukraine,202250.0,44385155,"$153,781,069,118"
ZM,11.07851570446694,Zambia,5141.0,17861030,"$23,064,722,446"
SL,13.544961844870263,Sierra Leone,1093.0,7813215,"$3,941,474,311"
SB,3.665530677494708,Solomon Islands,169.0,669823,"$1,425,074,226"
LA,15.64291715369331,Laos,17763.0,7169455,"$18,173,839,128"
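The table above places the company's emissions next to the country total but does not yet compute the relative value named in the introduction. A sketch of that ratio in pandas on made-up numbers follows; in the Spark pipeline the equivalent would be `df_grouped_total_compare.withColumn("co2_share_of_country_total", F.col("co2emission_by_company") / F.col("Country-Total-Co2-Emissions"))`. The column name `co2_share_of_country_total` is an assumption, and the ratio is only meaningful if both columns use the same unit.

```python
import pandas as pd

# Made-up values shaped like df_grouped_total_compare.
compare = pd.DataFrame({
    "country_abbr": ["DZ", "SB"],
    "co2emission_by_company": [15.0, 3.0],
    "Country-Total-Co2-Emissions": [150000.0, 150.0],
})

# Company footprint as a fraction of the country's total emissions.
compare["co2_share_of_country_total"] = (
    compare["co2emission_by_company"] / compare["Country-Total-Co2-Emissions"]
)
print(compare)
```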


In [None]:
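# SQL Server connection properties (placeholders; in production, read these
# from a secret scope or key vault instead of hard-coding them)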
server = ""
database = ""
username = ""
password = ""

url = f"jdbc:sqlserver://{server}:1433;database={database}"
properties = {"user": username, "password": password, "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
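As noted in the introduction, credentials should not live in the notebook. The following sketch builds the same `url` and `properties` from a pluggable secret lookup, assuming hypothetical key names such as `SQL_SERVER`; by default it reads environment variables. On Databricks, the lookup could wrap `dbutils.secrets.get(scope=..., key=...)`, for example `get_jdbc_settings(lambda key: dbutils.secrets.get(scope="etl", key=key))`, where the scope name `etl` is hypothetical.

```python
import os
from typing import Callable, Dict, Optional, Tuple


def get_jdbc_settings(
    get_secret: Optional[Callable[[str], str]] = None,
) -> Tuple[str, Dict[str, str]]:
    """Build the JDBC URL and connection properties from a secret lookup."""

    def from_env(key: str) -> str:
        # Fallback lookup: environment variables (raises KeyError if a key is missing).
        return os.environ[key]

    lookup = get_secret if get_secret is not None else from_env
    url = f"jdbc:sqlserver://{lookup('SQL_SERVER')}:1433;database={lookup('SQL_DATABASE')}"
    properties = {
        "user": lookup("SQL_USER"),
        "password": lookup("SQL_PASSWORD"),
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    return url, properties
```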

In [None]:
# Write both results to SQL Server (mode="append" adds rows on every run)
df_grouped_article_country.write.jdbc(url=url, table="SalesDataAggregated", mode="append", properties=properties)
df_grouped_total_compare.write.jdbc(url=url, table="Co2EmissionsCompare", mode="append", properties=properties)
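With `mode="append"`, every run adds a fresh set of randomly generated rows to both tables, so rerunning the notebook accumulates results from different runs. Whether that is intended is a design choice; `mode="overwrite"` would keep only the latest run. A local illustration of the difference, using pandas `to_sql` against an in-memory SQLite database as a stand-in for the JDBC target:

```python
import sqlite3

import pandas as pd

df = pd.DataFrame({"country_abbr": ["DE", "FR"], "co2": [1.0, 2.0]})
con = sqlite3.connect(":memory:")

# Running the "pipeline" twice in append mode duplicates the rows ...
for _ in range(2):
    df.to_sql("Co2EmissionsCompare", con, if_exists="append", index=False)
appended = con.execute("SELECT COUNT(*) FROM Co2EmissionsCompare").fetchone()[0]

# ... whereas replace (the analogue of overwrite) keeps only the latest run.
for _ in range(2):
    df.to_sql("Co2EmissionsCompare", con, if_exists="replace", index=False)
replaced = con.execute("SELECT COUNT(*) FROM Co2EmissionsCompare").fetchone()[0]

print(appended, replaced)  # 4 2
```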