# Task 2:

####  Objective: Perform Sector and Subsector level analysis and Identify the sources/ assets resposible for the emissions.

#### Initiating Spark Session

In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\spark-3.2.4-bin-hadoop2.7\\spark-3.2.4-bin-hadoop2.7'

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Check_User").getOrCreate()

# Get the user running the Spark job
user = spark.sparkContext.sparkUser()
print("Spark job user:", user)


Spark job user: Surya


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType,IntegerType

import os

# Create a Spark session
spark = SparkSession.builder.appName("Task2") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

In [4]:
# Define the schema for the provided data
schema_ownership = StructType([
    StructField("asset_id", StringType(), True),
    StructField("asset_name", StringType(), True),
    StructField("owner_name", StringType(), True),
    StructField("owner_classification", StringType(), True),
    StructField("percentage_of_ownership", StringType(), True),
    StructField("owner_direct_parent", StringType(), True),
    StructField("owner_grouping", StringType(), True),
    StructField("operator_name", StringType(), True),
    StructField("percentage_of_operation", StringType(), True),
    StructField("data_source", StringType(), True),
    StructField("url", StringType(), True),
    StructField("recency", TimestampType(), True),
    StructField("created_date", StringType(), True),
    StructField("original_inventory_sector", StringType(), True)
])

In [5]:
from hdfs import InsecureClient

# Define the directory path in HDFS
hdfs_directory = '/user/username/project_data/Data/'

# Define the HDFS path
hdfs_path = "hdfs://localhost:9000/user/username/project_data/Data/"


#Method to obtain the file_names

def obtain_file_names(folder_path):
    try:
        # Initialize HDFS client
        client = InsecureClient('http://localhost:50070', user='surya') 
        # List files in the directory
        file_names = client.list(folder_path)
        return file_names
    except subprocess.CalledProcessError as e:
        return print(f"Error executing command: {e}")
    

<b>The below code will create a data frame which stores the file names, type of the file namely, emission and ownership files and their respective HDFS paths.</b>

In [6]:
from pyspark.sql import Row

folders = ["agriculture", "buildings", "fluorinated_gases", "fossil_fuel_operations", 
           "manufacturing", "mineral_extraction", "power", "waste"] 

# Define the schema for the DataFrame
schema_1 = StructType([
    StructField("folder", StringType(), True),
    StructField("file_type", StringType(), True),
    StructField("file_name", StringType(), True),
    StructField("file_path", StringType(), True)  # Adding the file_path column
])

# List to store all rows
all_rows = []

for folder in folders:
    folder_dir_path = os.path.join(hdfs_directory, folder)
    file_names = obtain_file_names(folder_dir_path)
    #print(folder, folder_dir_path, file_names, '\n')
    
    # Filter file paths for emissions and ownership files
    emission_files = [name for name in file_names if name.startswith('asset_') and name.endswith('_emissions.csv')]
    ownership_files = [name for name in file_names if name.startswith('asset_') and name.endswith('_ownership.csv')]
    
    # Collect data for emission and ownership files in the list
    all_rows.extend([Row(folder, 'emission_files', name, f"hdfs://localhost:9000{folder_dir_path}/{name}") for name in emission_files])
    all_rows.extend([Row(folder, 'ownership_files', name, f"hdfs://localhost:9000{folder_dir_path}/{name}") for name in ownership_files])

# Create DataFrame from the collected rows
all_files_df = spark.createDataFrame(all_rows, schema_1)


In [7]:
all_files_df.show()

+--------------------+---------------+--------------------+--------------------+
|              folder|      file_type|           file_name|           file_path|
+--------------------+---------------+--------------------+--------------------+
|         agriculture| emission_files|asset_cropland-fi...|hdfs://localhost:...|
|         agriculture| emission_files|asset_enteric-fer...|hdfs://localhost:...|
|         agriculture| emission_files|asset_manure-mana...|hdfs://localhost:...|
|         agriculture| emission_files|asset_synthetic-f...|hdfs://localhost:...|
|         agriculture|ownership_files|asset_enteric-fer...|hdfs://localhost:...|
|         agriculture|ownership_files|asset_manure-mana...|hdfs://localhost:...|
|fossil_fuel_opera...| emission_files|asset_coal-mining...|hdfs://localhost:...|
|fossil_fuel_opera...| emission_files|asset_oil-and-gas...|hdfs://localhost:...|
|fossil_fuel_opera...| emission_files|asset_oil-and-gas...|hdfs://localhost:...|
|fossil_fuel_opera...|owners

<b> Create a dictionary by merging all the individual emission and ownership files for each sector</b>

In [8]:

from pyspark.sql.functions import lit

# Create an empty dictionary to store DataFrames for each folder and file type
folder_dfs = {}

# Create separate dataframes for each folder and file type combination
folders = all_files_df.select("folder").distinct().rdd.flatMap(lambda x: x).collect()

for folder in folders:
    folder_df = all_files_df.filter(all_files_df.folder == folder)
    
    # Define DataFrame names dynamically based on folder name and file type
    emission_df_name = f"{folder}_emission_df"
    ownership_df_name = f"{folder}_ownership_df"
    
    # Separate emission and ownership files
    emission_files = folder_df.filter(folder_df.file_type == "emission_files").collect()
    ownership_files = folder_df.filter(folder_df.file_type == "ownership_files").collect()
    
    # Load data from CSV files into respective DataFrames
    if emission_files:
        emission_paths = [row.file_path for row in emission_files]
        temp_emission_df = spark.read.option("header", "true").csv(emission_paths)
        # Add folder and file_type columns to temp_emission_df
        temp_emission_df = temp_emission_df.withColumn("folder", lit(folder)).withColumn("file_type", lit("emission_files"))
        
        # Store the DataFrame in the dictionary with the dynamic name
        folder_dfs[emission_df_name] = temp_emission_df
    
    if ownership_files:
        ownership_paths = [row.file_path for row in ownership_files]
        temp_ownership_df = spark.read.option("header", "true").csv(ownership_paths)
        # Add folder and file_type columns to temp_ownership_df
        temp_ownership_df = temp_ownership_df.withColumn("folder", lit(folder)).withColumn("file_type", lit("ownership_files"))
        
        # Store the DataFrame in the dictionary with the dynamic name
        folder_dfs[ownership_df_name] = temp_ownership_df

# Display or perform operations on the separate DataFrames for each folder and file type
for df_name, df in folder_dfs.items():
    print(f"DataFrame Name: {df_name}")


DataFrame Name: agriculture_emission_df
DataFrame Name: agriculture_ownership_df
DataFrame Name: fossil_fuel_operations_emission_df
DataFrame Name: fossil_fuel_operations_ownership_df
DataFrame Name: manufacturing_emission_df
DataFrame Name: manufacturing_ownership_df
DataFrame Name: mineral_extraction_emission_df
DataFrame Name: power_emission_df
DataFrame Name: power_ownership_df
DataFrame Name: waste_emission_df


## Sector wise Analysis - Agriculture

In [9]:
folder_dfs['agriculture_emission_df'].show(5)

+----------+------------+-------------------------+-------------------+-------------------+--------------------+----------+------------------+----------------+----------------------+--------+--------------+---------------+--------+--------------+-------------------+-------------+------------------+----------+--------------------+-----------+--------------+
|  asset_id|iso3_country|original_inventory_sector|         start_time|           end_time|temporal_granularity|       gas|emissions_quantity|emissions_factor|emissions_factor_units|capacity|capacity_units|capacity_factor|activity|activity_units|       created_date|modified_date|        asset_name|asset_type|           st_astext|     folder|     file_type|
+----------+------------+-------------------------+-------------------+-------------------+--------------------+----------+------------------+----------------+----------------------+--------+--------------+---------------+--------+--------------+-------------------+-------------+--

<b>Initial cleaning - droping the unwanted columns, removing and filling the null values with the averages.</b>

In [10]:
columns_to_drop = ['start_time', 'end_time', 'temporal_granularity','created_date','modified_date']

folder_dfs['agriculture_emission_df'] = folder_dfs['agriculture_emission_df'].drop(*columns_to_drop)

In [11]:
from pyspark.sql.functions import when, count,col

null_counts = folder_dfs['agriculture_emission_df'] .agg(*[
    count(when(col(c).isNull(), c)).alias(c)
    for c in folder_dfs['agriculture_emission_df'] .columns
])

#null_counts.show()

In [12]:
from pyspark.sql.functions import  avg
columns_to_check = ['emissions_quantity','emissions_factor','capacity','capacity_factor','activity']
# Calculate averages for specified columns
avg_values = folder_dfs['agriculture_emission_df'].agg(*(avg(col(c)).alias(c) for c in columns_to_check))

# Extract the averages
avg_dict = avg_values.collect()[0].asDict()

# Fill null values with averages
agriculture_emission_df_new= folder_dfs['agriculture_emission_df'].fillna(avg_dict)


In [13]:
agriculture_emission_df_new.printSchema()

root
 |-- asset_id: string (nullable = true)
 |-- iso3_country: string (nullable = true)
 |-- original_inventory_sector: string (nullable = true)
 |-- gas: string (nullable = true)
 |-- emissions_quantity: string (nullable = false)
 |-- emissions_factor: string (nullable = false)
 |-- emissions_factor_units: string (nullable = true)
 |-- capacity: string (nullable = false)
 |-- capacity_units: string (nullable = true)
 |-- capacity_factor: string (nullable = false)
 |-- activity: string (nullable = false)
 |-- activity_units: string (nullable = true)
 |-- asset_name: string (nullable = true)
 |-- asset_type: string (nullable = true)
 |-- st_astext: string (nullable = true)
 |-- folder: string (nullable = false)
 |-- file_type: string (nullable = false)



In [14]:
#checking the null count after cleaning

null_counts_ = agriculture_emission_df_new .agg(*[
    count(when(col(c).isNull(), c)).alias(c)
    for c in agriculture_emission_df_new .columns
])

#null_counts_.show()

In [15]:
#creating new metrics 

agriculture_emission_df_new= agriculture_emission_df_new.withColumn('Total Emissions', col('emissions_factor') * col('activity'))
agriculture_emission_df_new= agriculture_emission_df_new.withColumn('Utilization', col('activity') * col('capacity'))


In [16]:
#agriculture_emission_df_new.show()

In [17]:
folder_dfs['agriculture_ownership_df'].show(5)

+--------+---------------+--------------------+--------------------+-----------------------+-------------------+--------------------+-------------+-----------------------+--------------------+--------------------+-------------------+--------------------+-------------------------+-----------+---------------+
|asset_id|     asset_name|          owner_name|owner_classification|percentage_of_ownership|owner_direct_parent|      owner_grouping|operator_name|percentage_of_operation|         data_source|                 url|            recency|        created_date|original_inventory_sector|     folder|      file_type|
+--------+---------------+--------------------+--------------------+-----------------------+-------------------+--------------------+-------------+-----------------------+--------------------+--------------------+-------------------+--------------------+-------------------------+-----------+---------------+
| 5099205|USA_CA_beef_275|Harris Feeding Co...|                null|     

<b>Performing Left join between agriculture emission and ownership files, hence obtaining the owner_names for respective assets</b>

In [18]:
agriculture_df_final = agriculture_emission_df_new.join(folder_dfs['agriculture_ownership_df'], ['asset_id', 'asset_name','original_inventory_sector','folder'], 'left')

In [19]:
#agriculture_df_final.show()

In [20]:
agriculture_df_final.select("original_inventory_sector").distinct().show()


+-------------------------+
|original_inventory_sector|
+-------------------------+
|           cropland-fires|
|     synthetic-fertili...|
|        manure-management|
|     enteric-fermentation|
+-------------------------+



### Agriculture - Synthetic fertilizers

In [21]:
# Filter data based on original_inventory_sector
filtered_df1 =  agriculture_df_final.filter(col('original_inventory_sector') == 'synthetic-fertilizer-application-top500')

#filtered_df1.show()

In [22]:
filtered_df1.select("asset_name").distinct().show()


+--------------------+
|          asset_name|
+--------------------+
|China_Liaoning_Hu...|
|China_Jiangsu_Nan...|
|Spain_Castilla-La...|
|Spain_Castilla-La...|
|China_Shandong_Ji...|
|India_West Bengal...|
|Canada_Manitoba_D...|
|India_Uttar Prade...|
|China_Liaoning_Pa...|
|India_Uttar Prade...|
|United States_Cal...|
|Indonesia_Kaliman...|
|India_Uttar Prade...|
|China_Hunan_Hengyang|
|China_Jiangsu_Lia...|
|China_Shandong_Li...|
|China_Jilin_Liaoyuan|
|India_Uttar Prade...|
|China_Fujian_Nanping|
| China_Jilin_Tonghua|
+--------------------+
only showing top 20 rows



In [23]:
from pyspark.sql.functions import col, split

# Split the 'asset_name' column by underscore and get the first part i.e., country name
grouped_df = filtered_df1.withColumn("Asset_Group", split(col("asset_name"), "_").getItem(0))

# Group by the 'Group' column and count occurrences
grouped_result = grouped_df.groupBy("Asset_Group").count().orderBy("count", ascending=False)

grouped_result.show()


+--------------+--------+
|   Asset_Group|   count|
+--------------+--------+
|         China|19234810|
|         India| 8295805|
|        Canada| 3113390|
|      Pakistan| 1909950|
|         Spain|  763865|
|United Kingdom|  479780|
|        France|  428085|
|     Indonesia|  303835|
| United States|  183400|
|  South Africa|  127750|
|    Bangladesh|  116830|
|         Egypt|   14175|
+--------------+--------+



In [24]:
#grouped_df.show()

In [25]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
grouped_avg = grouped_df.groupBy("Asset_Group", "asset_type" , "gas","original_inventory_sector","asset_name").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),
        avg(col("Total Emissions")).alias("Total Emissions"),
        avg(col("Utilization")).alias("Utilization")
    
)

grouped_avg.show(5)

+-----------+------------+----------+-------------------------+--------------------+--------------------+--------------------+------------------+---------------+------------------+------------------+------------------+
|Asset_Group|  asset_type|       gas|original_inventory_sector|          asset_name|  emissions_quantity|    emissions_factor|          capacity|capacity_factor|          activity|   Total Emissions|       Utilization|
+-----------+------------+----------+-------------------------+--------------------+--------------------+--------------------+------------------+---------------+------------------+------------------+------------------+
|     France|  Vegetables|       n2o|     synthetic-fertili...|France_Grand Est_...|0.006638373513446747|0.015714285714285695|  8.14886603282457|         2089.0| 50.20292144499993|0.7889030512785705|411.75182069456383|
|      India|       Maize|co2e_100yr|     synthetic-fertili...|India_Madhya Prad...|  10.450277673450199| 0.0157142857142857

### Agriculture - Enteric-fermentation

In [26]:
# Filter data based on original_inventory_sector (example: filtering for 'sector_name')
filtered_df2 =  agriculture_df_final.filter(col('original_inventory_sector') == 'enteric-fermentation')

#filtered_df2.show()

In [27]:
from pyspark.sql.functions import when, col, split

# Split the 'asset_name' column by underscore and get the first part
grouped_df2 = filtered_df2.withColumn("Asset_Group", split(col("asset_type"), "_").getItem(0))

# Create a new column 'asset_category' based on the below conditions in 'asset_name'
grouped_df2 = grouped_df2.withColumn(
    "asset_category",
    when(col("asset_type").contains("beef"), "beef")
    .when(col("asset_type").contains("dairy"), "dairy")
    .otherwise("asset_type")
)

# Group by the 'Asset_Group' and 'asset_category' columns and count occurrences
grouped_result2 = grouped_df2.groupBy("Asset_Group", "asset_category").count().orderBy("count", ascending=False)

grouped_result2.show()


+-----------+--------------+-----+
|Asset_Group|asset_category|count|
+-----------+--------------+-----+
|        USA|         dairy| 7480|
|        USA|          beef| 7095|
|        ARG|          beef|  690|
|        ARG|         dairy|   40|
+-----------+--------------+-----+



In [28]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
grouped_avg2 = grouped_df2.groupBy("Asset_Group","asset_category","asset_type" , "gas","asset_name","original_inventory_sector").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),
        avg(col("Total Emissions")).alias("Total Emissions"),
        avg(col("Utilization")).alias("Utilization")
    
)

#grouped_avg2.show()

In [29]:
from pyspark.sql.functions import col, when
# Provided asset names and owner names
asset_owner_data = [
    ("USA_CA_beef_275", "Harris Feeding Company"),
    ("USA_CA_beef_276", "Brandt Cattle"),
    ("USA_TX_beef_459", "Barrett and Crofoot"),
    ("USA_TX_beef_460", "Five Rivers Cattle Feeding"),
    ("USA_TX_beef_461", "Cactus Feeders"),
    ("USA_TX_beef_464", "Champion Feeders"),
    ("USA_TX_beef_465", "Bar-G Feedyard"),
    ("USA_TX_beef_467", "Friona Industries")
]

# Create a dictionary using comprehension
asset_owner_dict = {asset_type: owner_name for asset_type, owner_name in asset_owner_data}

# Create a DataFrame from the dictionary
dict_df = spark.createDataFrame(list(asset_owner_dict.items()), ["asset_type", "owner_name"])
# Join 'grouped_avg2' with the DataFrame created from the dictionary
updated_df = grouped_avg2.join(
    dict_df,
    on=["asset_type"],
    how="left"
)

In [30]:
updated_df.show(5)

+----------------+-----------+--------------+----+----------+-------------------------+------------------+------------------+--------+---------------+-----------------+------------------+-----------+----------+
|      asset_type|Asset_Group|asset_category| gas|asset_name|original_inventory_sector|emissions_quantity|  emissions_factor|capacity|capacity_factor|         activity|   Total Emissions|Utilization|owner_name|
+----------------+-----------+--------------+----+----------+-------------------------+------------------+------------------+--------+---------------+-----------------+------------------+-----------+----------+
|     ARG_beef_12|        ARG|          beef|null|      null|     enteric-fermentation|              null|         1698.1328|    null|           null|72.89330596466804|123782.51375903845|       null|      null|
|USA_CA_dairy_410|        USA|         dairy|null|      null|     enteric-fermentation|              null|           1883.52|    null|           null|72.893

### Agriculture - Manure Management

In [31]:
# Filter data based on original_inventory_sector (example: filtering for 'sector_name')
filtered_df3 =  agriculture_df_final.filter(col('original_inventory_sector') == 'manure-management')

#filtered_df3.show()


In [32]:
from pyspark.sql.functions import when, col, split

# Split the 'asset_name' column by underscore and get the first part
grouped_df3 = filtered_df3.withColumn("Asset_Group", split(col("asset_type"), "_").getItem(0))

# Create a new column 'asset_category' based on conditions in 'asset_name'
grouped_df3 = grouped_df3.withColumn(
    "asset_category",
    when(col("asset_type").contains("beef"), "beef")
    .when(col("asset_type").contains("dairy"), "dairy")
    .otherwise("asset_type")
)

# Group by the 'Asset_Group' and 'asset_category' columns and count occurrences
grouped_result3 = grouped_df3.groupBy("Asset_Group", "asset_category").count().orderBy("count", ascending=False)

grouped_result3.show()


+-----------+--------------+-----+
|Asset_Group|asset_category|count|
+-----------+--------------+-----+
|        USA|         dairy| 7480|
|        USA|          beef| 7035|
|        ARG|          beef|  690|
|        ARG|         dairy|   40|
+-----------+--------------+-----+



In [33]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
grouped_avg3 = grouped_df3.groupBy("Asset_Group","asset_category","asset_type" , "gas","asset_name","owner_name","original_inventory_sector").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),
        avg(col("Total Emissions")).alias("Total Emissions"),
        avg(col("Utilization")).alias("Utilization")
    
)

#grouped_avg3.show()

In [34]:
agriculture_df_3 =  updated_df.union(grouped_avg3)
#agriculture_df_3.show(5)

In [35]:
agriculture_df_1 = grouped_avg

### Agriculture - Cropland-fires

In [36]:
# Filter data based on original_inventory_sector (example: filtering for 'sector_name')
filtered_df =  agriculture_df_final.filter(col('original_inventory_sector') == 'cropland-fires')
#filtered_df.show()

In [37]:
# Group by the 'Group' column and count occurrences
grouped_result5 = filtered_df.groupBy("asset_name").count().orderBy("count", ascending=False)
#grouped_result5.show()

In [38]:

# Split the 'asset_name' column by underscore and get the first part
grouped_df4 = filtered_df.withColumn("Asset_Group", split(col("asset_name"), "_").getItem(0))
# Split the 'asset_name' column by underscore and get the first part
grouped_df4 = grouped_df4.withColumn("Asset_SubGroup", split(col("asset_name"), "_").getItem(1))

#grouped_df4.show()

In [39]:

# Group by the 'Group' column and count occurrences
grouped_result6 = grouped_df4.groupBy("Asset_Group","Asset_SubGroup").count().orderBy("count", ascending=False)

grouped_result6.show()

+-----------+---------------+-----+
|Asset_Group| Asset_SubGroup|count|
+-----------+---------------+-----+
|      India|    Maharashtra|95280|
|      India|  Uttar Pradesh|88515|
|      China|   Heilongjiang|84360|
|      India| Madhya Pradesh|76980|
|   Pakistan|         Punjab|73910|
|      China|     Nei Mongol|69640|
|     Brazil|   Minas Gerais|61605|
|      China|          Hebei|60995|
|     Brazil|      São Paulo|58135|
|     Brazil|          Goiás|54900|
|     Brazil|    Mato Grosso|54055|
|      China|         Yunnan|52490|
|      China|       Shandong|51865|
|      India|      Karnataka|51600|
|      India| Andhra Pradesh|49485|
|  Australia|New South Wales|46760|
|      China|       Liaoning|46530|
|      India|      Telangana|46465|
|      China|          Jilin|45465|
|     Brazil|         Paraná|44580|
+-----------+---------------+-----+
only showing top 20 rows



In [40]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
grouped_avg4 = grouped_df4.groupBy("Asset_Group","Asset_SubGroup","asset_type","gas","original_inventory_sector").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),
        avg(col("Total Emissions")).alias("Total Emissions"),
        avg(col("Utilization")).alias("Utilization")
    
)

grouped_avg4.show(10)

+-------------+--------------------+----------+----------+-------------------------+--------------------+-------------------+------------------+---------------+------------------+-------------------+------------------+
|  Asset_Group|      Asset_SubGroup|asset_type|       gas|original_inventory_sector|  emissions_quantity|   emissions_factor|          capacity|capacity_factor|          activity|    Total Emissions|       Utilization|
+-------------+--------------------+----------+----------+-------------------------+--------------------+-------------------+------------------+---------------+------------------+-------------------+------------------+
|     Thailand|            Yasothon|  Cropland|       n2o|           cropland-fires| 0.06560131349387752|0.09999999999999991|63.431450437317785|         2089.0| 10.26773566569485| 1.0267735665694848| 1540.835151745222|
|     Ethiopia|    Benshangul-Gumaz|  Cropland|       n2o|           cropland-fires| 0.08639054476776177|0.09999999999999984

In [41]:
agriculture_df_2 =grouped_avg4

## Sector 2:  Power - Electricity Generation

In [42]:
folder_dfs['power_emission_df'].show(5)

+--------+------------+-------------------------+-------------------+-------------------+--------------------+----------+------------------+----------------+----------------------+--------+-------------------+---------------+--------+--------------+--------------------+--------------------+--------------------+----------+--------------------+------+--------------+
|asset_id|iso3_country|original_inventory_sector|         start_time|           end_time|temporal_granularity|       gas|emissions_quantity|emissions_factor|emissions_factor_units|capacity|     capacity_units|capacity_factor|activity|activity_units|        created_date|       modified_date|          asset_name|asset_type|           st_astext|folder|     file_type|
+--------+------------+-------------------------+-------------------+-------------------+--------------------+----------+------------------+----------------+----------------------+--------+-------------------+---------------+--------+--------------+-----------------

In [43]:
folder_dfs['power_emission_df'].printSchema()

root
 |-- asset_id: string (nullable = true)
 |-- iso3_country: string (nullable = true)
 |-- original_inventory_sector: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- temporal_granularity: string (nullable = true)
 |-- gas: string (nullable = true)
 |-- emissions_quantity: string (nullable = true)
 |-- emissions_factor: string (nullable = true)
 |-- emissions_factor_units: string (nullable = true)
 |-- capacity: string (nullable = true)
 |-- capacity_units: string (nullable = true)
 |-- capacity_factor: string (nullable = true)
 |-- activity: string (nullable = true)
 |-- activity_units: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- modified_date: string (nullable = true)
 |-- asset_name: string (nullable = true)
 |-- asset_type: string (nullable = true)
 |-- st_astext: string (nullable = true)
 |-- folder: string (nullable = false)
 |-- file_type: string (nullable = false)



In [44]:
columns_to_drop = ['start_time', 'end_time', 'temporal_granularity','created_date','modified_date']

folder_dfs['power_emission_df'] = folder_dfs['power_emission_df'].drop(*columns_to_drop)

In [45]:
from pyspark.sql.functions import when, count,col

null_counts = folder_dfs['power_emission_df'] .agg(*[
    count(when(col(c).isNull(), c)).alias(c)
    for c in folder_dfs['power_emission_df'] .columns
])

#null_counts.show()

In [46]:
power_emission_df_new = folder_dfs['power_emission_df'].dropna(subset=['emissions_quantity'])

In [47]:
power_emission_df_new= power_emission_df_new.withColumn('Total Emissions', col('emissions_factor') * col('activity'))
power_emission_df_new= power_emission_df_new.withColumn('Utilization', col('activity') * col('capacity'))


In [48]:
 power_df_final = power_emission_df_new.join(folder_dfs['power_ownership_df'], ['asset_id', 'asset_name','original_inventory_sector','folder'], 'left')

In [49]:
# Select specific columns using col() function
power_df_output = power_df_final.select(
    col("asset_id"),
    col("asset_name"),
    col("original_inventory_sector"),
    col("folder"),
    col("iso3_country"),
    col("gas"),
    col("emissions_quantity"),
    col("emissions_factor"),
    col("emissions_factor_units"),
    col("capacity"),
    col("capacity_units"),
    col("capacity_factor"),
    col("activity"),
    col("activity_units"),
    col("asset_type"),
    col("st_astext"),
    col("Total Emissions"),
    col("Utilization"),
    col("owner_name"), 
    col("owner_grouping")
)

# Show the resulting DataFrame
power_df_output.show(5)

+--------+--------------------+-------------------------+------+------------+----------+------------------+----------------+----------------------+--------+-------------------+---------------+--------+--------------+----------+--------------------+---------------+------------+--------------------+--------------+
|asset_id|          asset_name|original_inventory_sector|folder|iso3_country|       gas|emissions_quantity|emissions_factor|emissions_factor_units|capacity|     capacity_units|capacity_factor|activity|activity_units|asset_type|           st_astext|Total Emissions| Utilization|          owner_name|owner_grouping|
+--------+--------------------+-------------------------+------+------------+----------+------------------+----------------+----------------------+--------+-------------------+---------------+--------+--------------+----------+--------------------+---------------+------------+--------------------+--------------+
| 1670320|     Mill Creek (KY)|     electricity-gener...| 

In [50]:
# Group by the 'Group' column and count occurrences
Power_group = power_df_output.groupBy("asset_type").count().orderBy("count", ascending=False)
Power_group.show(10)

+--------------------+-----+
|          asset_type|count|
+--------------------+-----+
|                coal| 5286|
|   gas, other_fossil|  238|
|           coal, gas|  232|
|           coal, oil|  205|
|coal, gas, other_...|  145|
|                 gas|  122|
|          gas_or_oil|   67|
|   oil, other_fossil|   66|
|           gas, coal|   58|
|  coal, other_fossil|   49|
+--------------------+-----+
only showing top 10 rows



In [51]:
from pyspark.sql.functions import lower, split, explode, col

# Create a temporary table with the simplified asset types
power_df_output.select(
    explode(split(col("asset_type"), ", ")).alias("Asset_Category")
).createOrReplaceTempView("Asset_categories")

# Read the temporary table into a DataFrame
Asset_categories_counts = spark.sql(
    "SELECT Asset_Category, count(*) AS count FROM Asset_categories GROUP BY Asset_Category ORDER BY count DESC"
)

# Sort the DataFrame by count in descending order
Asset_categories_count = Asset_categories_counts.orderBy("count", ascending=False)


In [52]:
Asset_categories_count.show()


+--------------+-----+
|Asset_Category|count|
+--------------+-----+
|          coal| 6081|
|           gas| 1059|
|  other_fossil|  711|
|           oil|  492|
|    gas_or_oil|  250|
|       biomass|   27|
+--------------+-----+



In [53]:
power_df_group = power_df_output.withColumn("Asset_category", explode(split(col("asset_type"), ", ")))


In [54]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
grouped_avg_power = power_df_group.groupBy("Asset_category","asset_name", "gas","original_inventory_sector","owner_name","owner_grouping","iso3_country").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),
        avg(col("Total Emissions")).alias("Total Emissions"),
        avg(col("Utilization")).alias("Utilization")
    
)

grouped_avg_power.show(10)

+--------------+--------------------+----------+-------------------------+--------------------+--------------------+------------+--------------------+------------------+--------+-------------------+--------------------+-----------------+--------------------+
|Asset_category|          asset_name|       gas|original_inventory_sector|          owner_name|      owner_grouping|iso3_country|  emissions_quantity|  emissions_factor|capacity|    capacity_factor|            activity|  Total Emissions|         Utilization|
+--------------+--------------------+----------+-------------------------+--------------------+--------------------+------------+--------------------+------------------+--------+-------------------+--------------------+-----------------+--------------------+
|           oil|  Safi power station|       co2|     electricity-gener...| Safi Energy Company|Sofina and Nareva...|         MAR|   6969333.333333333|             1.006|  1388.0| 0.5693333333333332|           6927000.0|    

 ## Sector 3:  Fossil Fuel operations

In [55]:
#folder_dfs['fossil_fuel_operations_emission_df'].show(5)

There is issue in ingesting data for the above df all the fields are being null, so definining individual schema for fossil fuel files and merging them into single dataframe

In [56]:
from functools import reduce
from pyspark.sql import DataFrame
    
    #Filter all_files_df based on conditions
filtered_files_df = all_files_df.filter(
    (all_files_df.folder == "fossil_fuel_operations") & 
    (all_files_df.file_type == "emission_files")
)

# Collect the file paths from the filtered DataFrame
file_paths = filtered_files_df.select("file_path").rdd.flatMap(lambda x: x).collect()
print(file_paths)


['hdfs://localhost:9000/user/username/project_data/Data/fossil_fuel_operations/asset_coal-mining_emissions.csv', 'hdfs://localhost:9000/user/username/project_data/Data/fossil_fuel_operations/asset_oil-and-gas-production-and-transport_emissions.csv', 'hdfs://localhost:9000/user/username/project_data/Data/fossil_fuel_operations/asset_oil-and-gas-refining_emissions.csv']


In [57]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType


# Read CSV files into chunks using the collected file paths and specified schema
dataframes = []
for file_path in file_paths:
    df_chunks = spark.read.format("csv").option("header", "true").load(file_path)
    dataframes.append(df_chunks)

# Show the merged DataFrame
#merged_df.show(5)

### Fossil fuels - Coal Mining

In [58]:
columns_to_drop = ['start_time', 'end_time', 'temporal_granularity','created_date','modified_date']

coal_mining_df = dataframes[0].drop(*columns_to_drop)
coal_mining_df.show(5)

+---------+------------+-------------------------+-------+----------+------------------+----------------+----------------------+--------+---------------+---------------+--------+--------------------+--------------------+----------+--------------------+
| asset_id|iso3_country|original_inventory_sector|lat_lon|       gas|emissions_quantity|emissions_factor|emissions_factor_units|capacity| capacity_units|capacity_factor|activity|      activity_units|          asset_name|asset_type|           st_astext|
+---------+------------+-------------------------+-------+----------+------------------+----------------+----------------------+--------+---------------+---------------+--------+--------------------+--------------------+----------+--------------------+
|136113483|         CHN|              coal-mining|   null|       co2|                 0|               0|  tonnes_gas_per_co...|     1.5|coal_mine_depth|          0.752|   1.128|tonnes_coal_extra...|Tashan Coal Mine ...|Bituminous|POINT(113.

In [59]:
# Group by the 'Group' column and count occurrences
coal_1 = coal_mining_df.groupBy("asset_type").count().orderBy("count", ascending=True)
coal_1.show( )

+-------------+-----+
|   asset_type|count|
+-------------+-----+
|      Lignite| 1160|
|   Anthracite| 1325|
|Subbituminous| 3170|
|   Bituminous| 8125|
+-------------+-----+



In [60]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
coal_2 = coal_mining_df.groupBy("asset_name", "asset_type","gas","original_inventory_sector","iso3_country").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
        avg(col("capacity")).alias("capacity"),
        avg(col("capacity_factor")).alias("capacity_factor"),
        avg(col("activity")).alias("activity"),    
)
coal_2 = coal_2.dropna(subset=['emissions_quantity'])
coal_2.show(10)

+--------------------+-------------+----------+-------------------------+------------+------------------+----------------+--------+---------------+--------+
|          asset_name|   asset_type|       gas|original_inventory_sector|iso3_country|emissions_quantity|emissions_factor|capacity|capacity_factor|activity|
+--------------------+-------------+----------+-------------------------+------------+------------------+----------------+--------+---------------+--------+
|     Gevra Coal Mine|Subbituminous|       co2|              coal-mining|         IND|               0.0|             0.0|    null|           null|    41.0|
|Meihuajing Coal Mine|   Bituminous|co2e_100yr|              coal-mining|         CHN|        3805257.65|             0.0|    12.0|          0.752|   9.024|
|Yujialiang Coal Mine|   Bituminous|       n2o|              coal-mining|         CHN|               0.0|             0.0|    13.0|          0.752|   9.776|
|Jänschwalde Coal ...|      Lignite|       n2o|           

### Fossil Fuels - Oil and gas production

In [61]:
columns_to_drop = ['start_time', 'end_time', 'temporal_granularity','created_date','modified_date']

coal_mining_df1 = dataframes[1].drop(*columns_to_drop)
coal_mining_df1.show(5)

+---------+------------+-------------------------+----------+------------------+----------------+----------------------+----------+----------+--------------------+
| asset_id|iso3_country|original_inventory_sector|       gas|emissions_quantity|emissions_factor|emissions_factor_units|asset_name|asset_type|           st_astext|
+---------+------------+-------------------------+----------+------------------+----------------+----------------------+----------+----------+--------------------+
|985525014|         BRA|     oil-and-gas-produ...|       co2|       1061964.698|     0.022535948|                  null|   Iracema|       Oil|POINT(-42.8345 -2...|
|985525014|         BRA|     oil-and-gas-produ...|       ch4|        47032.6431|     0.000998079|                  null|   Iracema|       Oil|POINT(-42.8345 -2...|
|985525014|         BRA|     oil-and-gas-produ...|       n2o|              null|            null|                  null|   Iracema|       Oil|POINT(-42.8345 -2...|
|985525014|     

In [62]:
# Group by the 'Group' column and count occurrences
oil_1 = coal_mining_df1.groupBy("asset_type").count().orderBy("count", ascending=True)
oil_1.show( )

+----------+-----+
|asset_type|count|
+----------+-----+
|       Gas| 7360|
|       Oil|11935|
+----------+-----+



In [63]:
# Group by 'Asset_Group' and 'asset_type', calculate average of all values
oil_2 = coal_mining_df1.groupBy("asset_id","asset_name", "asset_type","gas","original_inventory_sector","iso3_country").agg(
        avg(col("emissions_quantity")).alias("emissions_quantity"),
       avg(col("emissions_factor")).alias("emissions_factor"),
)
oil_2 = oil_2.dropna(subset=['emissions_quantity'])
oil_2.show(10)

+--------+------------+----------+----------+-------------------------+------------+------------------+--------------------+
|asset_id|  asset_name|asset_type|       gas|original_inventory_sector|iso3_country|emissions_quantity|    emissions_factor|
+--------+------------+----------+----------+-------------------------+------------+------------------+--------------------+
| 5110213|     Ekofisk|       Oil|       ch4|     oil-and-gas-produ...|         NOR|20564.179999999997|5.097664285714285E-4|
| 5110530|     Shengli|       Oil|co2e_100yr|     oil-and-gas-produ...|         CHN|1746821.2717142857|                null|
| 5110532|Shwe Complex|       Gas|co2e_100yr|     oil-and-gas-produ...|         MMR|208388.22481428573|                null|
| 5110466|    Pohokura|       Gas|       ch4|     oil-and-gas-produ...|         NZL|13724.184607571427|0.001277858428571...|
| 5110320|      Keshen|       Gas| co2e_20yr|     oil-and-gas-produ...|         CHN| 9494147.830571428|                null|


In [64]:
#folder_dfs['fossil_fuel_operations_ownership_df'].show(5)

In [65]:
oil_and_gas = oil_2.join(folder_dfs['fossil_fuel_operations_ownership_df'], ['asset_id', 'asset_name','original_inventory_sector'], 'left')

In [66]:
#oil_and_gas.show(5)

In [67]:
# Select specific columns using col() function
oil_and_gas_output = oil_and_gas.select(
    col("asset_id"),
    col("asset_name"),
    col("original_inventory_sector"),
    col("folder"),
    col("iso3_country"),
    col("gas"),
    col("emissions_quantity"),
    col("emissions_factor"),
    col("percentage_of_ownership"),
    col("owner_direct_parent"),
    col("asset_type"),
    col("owner_name"), 
    col("owner_grouping")
)

# Show the resulting DataFrame
oil_and_gas_output.show(5)

+--------+----------+-------------------------+--------------------+------------+---+------------------+--------------------+-----------------------+-------------------+----------+-----------+--------------------+
|asset_id|asset_name|original_inventory_sector|              folder|iso3_country|gas|emissions_quantity|    emissions_factor|percentage_of_ownership|owner_direct_parent|asset_type| owner_name|      owner_grouping|
+--------+----------+-------------------------+--------------------+------------+---+------------------+--------------------+-----------------------+-------------------+----------+-----------+--------------------+
| 5110213|   Ekofisk|     oil-and-gas-produ...|fossil_fuel_opera...|         NOR|ch4|20564.179999999997|5.097664285714285E-4|                  12.39|Eni and HitecVision|       Oil|Vaar Energi|TotalEnergies; Co...|
| 5110213|   Ekofisk|     oil-and-gas-produ...|fossil_fuel_opera...|         NOR|ch4|20564.179999999997|5.097664285714285E-4|                  1

In [68]:
def store_the_output(df_name):
    num_files = 1  # Number of output files
    df = globals().get(df_name)  # Get DataFrame by variable name
    if df:
        df.coalesce(num_files) \
            .write \
            .mode("append") \
            .option("header", "true") \
            .csv("hdfs://localhost:9000/user/username/project_data/sector_output")
    else:
        print(f"DataFrame {df_name} not found.")

# Call the function for each DataFrame
output_files = ['agriculture_df_3', 'agriculture_df_1', 'agriculture_df_2', 'grouped_avg_power', 'oil_and_gas_output', 'coal_2']
for file in output_files:
    store_the_output(file)


In [69]:
spark.stop()