## 1. Create databases (AWS Cloud)

In [0]:
# Print the result of SHOW DATABASES before any schema is created below.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()

+------------------+
|      databaseName|
+------------------+
|           default|
|information_schema|
|        wdi_curate|
+------------------+



In [0]:
# Print the result of SHOW CATALOGS.
catalogs_df = spark.sql("SHOW CATALOGS")
catalogs_df.show()

+--------------+
|       catalog|
+--------------+
|     data_demo|
|hive_metastore|
|       samples|
|        system|
+--------------+



In [0]:
%sql
-- Create the two curated schemas inside the data_demo catalog when they do not exist yet.
-- SCHEMA and DATABASE are interchangeable keywords here.
CREATE SCHEMA IF NOT EXISTS data_demo.wdi_curate;
CREATE SCHEMA IF NOT EXISTS data_demo.eea_curated;

In [0]:
# Print the result of SHOW DATABASES again, now that the CREATE statements have run.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()

+------------------+
|      databaseName|
+------------------+
|           default|
|       eea_curated|
|information_schema|
|        wdi_curate|
+------------------+



In [0]:
# Make data_demo.wdi_curate the session's current catalog and schema.
spark.sql("USE data_demo.wdi_curate")


DataFrame[]

In [0]:
%sql
-- Show which schema the session is currently using.

SELECT current_schema();

current_database()
wdi_curate


## 2. World Development Indicators data

## 2.1 Read into DataFrames


In [0]:
from datetime import datetime

# Today's date as YYYYMMDD; it names the raw-layer folder that is read below.
current_date = datetime.now().strftime("%Y%m%d")

# The three WDI extracts sit in the same dated folder and are read with the same settings.
wdi_raw_dir = f"dbfs:/datalake/raw/world_development_indicators/date={current_date}"


def _read_wdi_csv(file_name):
    """Read one header-bearing CSV file from the dated WDI raw folder."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .load(f"{wdi_raw_dir}/{file_name}")
    )


df_world_data = _read_wdi_csv("WDIData.csv")
df_world_country = _read_wdi_csv("WDICountry.csv")
df_world_series = _read_wdi_csv("WDISeries.csv")

## 3 Rename columns

In [0]:
def rename_columns(df):
    """Return ``df`` with every space in its column names replaced by ``_``.

    Only the space character is replaced; any other character in a column
    name (for example a hyphen) is left as it is. All columns are renamed in
    a single ``toDF`` call, so the query plan gets one projection instead of
    one ``withColumnRenamed`` step per column.

    Args:
        df: DataFrame whose column names should be normalised.

    Returns:
        A DataFrame with the same columns in the same order, with spaces in
        the column names replaced by underscores.
    """
    new_names = [name.replace(" ", "_") for name in df.columns]
    return df.toDF(*new_names)

In [0]:
# Normalise the column names of all three WDI DataFrames.
df_world_data, df_world_country, df_world_series = [
    rename_columns(frame)
    for frame in (df_world_data, df_world_country, df_world_series)
]

In [0]:
# Peek at the first five column names of the country DataFrame.
df_world_country.columns[:5]

['Country_Code', 'Short_Name', 'Table_Name', 'Long_Name', '2-alpha_code']

In [0]:


# Peek at the first five column names of the series DataFrame.
df_world_series.columns[:5]

['Series_Code',
 'Topic',
 'Indicator_Name',
 'Short_definition',
 'Long_definition']

## 4.1 Drop rows where every column is null

In [0]:
# Remove the rows in which every column is null from each WDI DataFrame.
df_world_data, df_world_country, df_world_series = [
    frame.na.drop(how='all')
    for frame in (df_world_data, df_world_country, df_world_series)
]



## 4.2 Record counts before dropping duplicates

In [0]:
# Row counts taken after the all-null rows were dropped and before
# dropDuplicates() runs, so they can be compared with the counts printed
# after de-duplication.
dataframes = {
  'WDI_Data'   : df_world_data,
  'WDI_Country': df_world_country,
  'WDI_Series' : df_world_series
}
for df_name, df in dataframes.items():
  # The label says "duplicates": the null rows are already gone at this point.
  print(f'Number of records of dataframe {df_name} before dropping duplicates: {df.count()}')
 

Number of records of dataframe WDI_Data before dropping nulls: 395276
Number of records of dataframe WDI_Country before dropping nulls: 275
Number of records of dataframe WDI_Series before dropping nulls: 4454


## 4.3 Filtering DataFrame & Drop Duplicates

In [0]:

# Remove exact duplicate rows from each WDI DataFrame.
df_world_data = df_world_data.dropDuplicates()
df_world_country = df_world_country.dropDuplicates()
df_world_series = df_world_series.dropDuplicates()

# Rebuild the lookup so it points at the de-duplicated DataFrames.
dataframes = {
  'WDI_Data'   : df_world_data,
  'WDI_Country': df_world_country,
  'WDI_Series' : df_world_series
}
for df_name, df in dataframes.items():
  # This count is taken right after dropDuplicates(), so the label says "duplicates".
  print(f'Number of records of dataframe {df_name} after dropping duplicates: {df.count()}')

Number of records of dataframe WDI_Data after dropping nulls: 395276
Number of records of dataframe WDI_Country after dropping nulls: 275
Number of records of dataframe WDI_Series after dropping nulls: 2399


## For the WDICountry.csv and WDIData.csv files

In [0]:
from pyspark.sql.functions import length, col

# Keep only the rows whose Country_Code is exactly three characters long.
country_code_has_3_chars = length(col("Country_Code")) == 3
df_world_data = df_world_data.filter(country_code_has_3_chars)
df_world_country = df_world_country.filter(country_code_has_3_chars)


## For WDISeries.csv

In [0]:

# Discard the series rows whose Series_Code contains a space.
series_code_has_space = col("Series_Code").contains(" ")
df_world_series = df_world_series.filter(~series_code_has_space)

## 5. Write the data to the data lake’s curated layer on DBFS

In [0]:
from datetime import datetime


# The date components of "now" become the year=/month=/day= output folders.
current_date = datetime.now()
year, month, day = current_date.strftime("%Y %m %d").split(" ")

# Example target: dbfs:/datalake/curated/wdi/data/year=2024/month=03/day=13/
dbfs_base_path   = 'dbfs:/datalake/curated/wdi'
output_partition = f'year={year}/month={month}/day={day}/'

# Sub-folder name -> DataFrame written under it.
data_wdi_curate = dict(
  data=df_world_data,
  country=df_world_country,
  series=df_world_series,
)

# One Parquet dataset per DataFrame, coalesced to a single partition before the
# write; whatever already exists at the target path is overwritten.
for name, df in data_wdi_curate.items():
  target_path = f'{dbfs_base_path}/{name}/{output_partition}'
  df.coalesce(1).write.format('parquet').mode("overwrite").save(target_path)


In [0]:
# Print the result of SHOW CATALOGS before the tables are registered.
catalogs_df = spark.sql("SHOW CATALOGS")
catalogs_df.show()

+--------------+
|       catalog|
+--------------+
|     data_demo|
|hive_metastore|
|       samples|
|        system|
+--------------+



In [0]:
# Print the result of SHOW DATABASES before the tables are registered.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()

+------------------+
|      databaseName|
+------------------+
|           default|
|       eea_curated|
|information_schema|
|        wdi_curate|
+------------------+



In [0]:
# Table name -> DataFrame that backs it.
data_wdi_curate = dict(
  data=df_world_data,
  country=df_world_country,
  series=df_world_series,
)
# S3 bucket and key prefix under which the table files are written
s3_bucket = "databricks-workspace-stack-10fab-bucket"
s3_path = "unity-catalog/1803637943354536"


# The unqualified table names used below resolve against this catalog.schema
spark.sql("USE data_demo.wdi_curate")
for name, df in data_wdi_curate.items():

    # Write the DataFrame to S3 as Parquet, replacing whatever is at that location
    df_temp_path = f"s3a://{s3_bucket}/{s3_path}/temp_{name}"
    df.write.format("parquet").mode("overwrite").save(df_temp_path)

    # Register a table over those files unless a table with this name already exists
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {name}
    USING PARQUET
    LOCATION '{df_temp_path}'
    """)


## 6. For the CO2 emissions data

In [0]:
# Load the raw CO2 emissions JSON with the reader's "multiline" option switched on.
input_path = "/datalake/raw/co2_passenger_cars_emissions"
co2_json_reader = spark.read.format("json").option("multiline", "true")
df_co2_emissions = co2_json_reader.load(input_path)


## 7. Replace spaces in column names with underscores (“_”) and remove parentheses

In [0]:
# Renaming the columns: spaces become "_" and parentheses are removed.
# All cleaned names are built first and applied with one toDF() call, which adds
# a single projection instead of one withColumnRenamed() step per column.
cleaned_col_names = [
    col_name.replace(' ', '_').replace('(', '').replace(')', '')
    for col_name in df_co2_emissions.columns
]
df_co2_emissions = df_co2_emissions.toDF(*cleaned_col_names)


In [0]:
# Peek at the first five column names of the CO2 emissions DataFrame.
df_co2_emissions.columns[:5]

['At1_mm', 'At2_mm', 'Cn', 'Cr', 'Ct']

In [0]:
# Row count before the all-null rows are dropped.
co2_rows_before = df_co2_emissions.count()
print(f'Number of records of dataframe co2_emissions before dropping nulls: {co2_rows_before}')



Number of records of dataframe co2_emissions before dropping nulls: 300000


In [0]:

# Drop the rows in which every column is null, then report how many rows remain.
df_co2_emissions = df_co2_emissions.na.drop(how='all')

co2_rows_after = df_co2_emissions.count()
print(f'Number of records of dataframe co2_emissions after dropping nulls: {co2_rows_after}')

Number of records of dataframe co2_emissions after dropping nulls: 300000


## 8. Apply data quality filters on the CO2 emissions DataFrame

In [0]:

# De-duplicate, then keep only the rows whose MS value is two characters long
# and matches two upper-case A-Z letters.
ms_code = col("MS")
df_co2_emissions = (
    df_co2_emissions
    .dropDuplicates()
    .filter(length(ms_code) == 2)
    .where(ms_code.rlike("^[A-Z]{2}$"))
)


## 9.1 Write the data to the data lake’s curated layer on DBFS

In [0]:

# Destination folder in the curated layer on DBFS
path_to_write = "dbfs:/datalake/curated/co2_emissions/"

# Overwrite the destination with Parquet output partitioned by the 'year' column
co2_writer = df_co2_emissions.write.mode("overwrite").partitionBy("year")
co2_writer.format("parquet").save(path_to_write)

## 9.2 Create external table 

In [0]:
# S3 bucket and key prefix under which the table files are written
s3_bucket = "databricks-workspace-stack-10fab-bucket"
s3_path = "unity-catalog/1803637943354536"


# The unqualified table name used below resolves against this catalog.schema
spark.sql("USE data_demo.eea_curated")

# Write the DataFrame to S3 as Parquet, replacing whatever is at that location
df_temp_path = f"s3a://{s3_bucket}/{s3_path}/temp0"
df_co2_emissions.write.format("parquet").mode("overwrite").save(df_temp_path)

# Register a table over those files unless co2_emissions already exists
create_co2_table_sql = f"""
CREATE TABLE IF NOT EXISTS co2_emissions
USING PARQUET
LOCATION '{df_temp_path}'
"""
spark.sql(create_co2_table_sql)


DataFrame[]