## PySpark config and imports

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [0]:
# Create the Spark session for this pipeline, or reuse the one that is already active
spark = (
    SparkSession.builder
    .appName("OilDataPipeline")
    .getOrCreate()
)

## Data Extraction

In [0]:
# Load the raw oil production statistics from the catalog table, then preview them
raw_table_name = "workspace.default.oil_production_statistics_large"
df = spark.read.table(raw_table_name)
display(df)


## Data Exploration

In [0]:
# Print the first rows of the raw DataFrame as a plain-text table
df.show()

In [0]:
# Print the column names, data types and nullability of the raw DataFrame
df.printSchema()

In [0]:
# Compute and print summary statistics for the raw DataFrame's columns
df.describe().show()


## Data Quality Assessment

In [0]:
# Total number of rows in the raw data (baseline for the duplicate count in the next cell)
df.count()

In [0]:
# Number of fully duplicated rows = all rows minus distinct rows
total_rows = df.count()
distinct_rows = df.distinct().count()
total_rows - distinct_rows

##### Completeness check (Finding Null / Missing Values)

In [0]:
# Show the rows whose 'value' column is null (only this one column is inspected here)
df.where(col("value").isNull()).show()


In [0]:
# Count the nulls in every column: one output column per input column.
# when() yields a non-null marker only for null cells, and count() counts those markers.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

## Consistency / Accuracy Checks (finding inconsistent values)

In [0]:
# Show the rows whose 'value' is negative
df.where(col("value") < 0).show()


In [0]:
# Count the rows with a negative 'value' and display that number
negative_values = df.where(col("value") < 0).count()
display(negative_values)

## Data Cleaning and Transformation

In [0]:
# Remove exact duplicate rows, then report how many rows remain
clean_df = df.drop_duplicates()
clean_df.count()


In [0]:
# Replace missing values with 0.
# NOTE(review): per the PySpark docs a numeric fill value is applied to numeric
# columns only, so nulls in string columns are presumably left as-is — confirm
# that this is the intended behaviour.
clean_df = clean_df.na.fill(0)


In [0]:
# Alternative null-handling strategies, kept for reference (not executed):
# clean_df = clean_df.fillna({"value": 0, "type": "unknown"})
# clean_df = clean_df.dropna(subset=["value", "country"])
# NOTE(review): at this point the column is still named country_name; the rename to
# country happens in a later cell, so the dropna variant would need adjusting.

In [0]:
# Verify: recount the nulls per column on the cleaned data.
# Iterate over clean_df's own columns (not the raw df's) so this check keeps
# working once clean_df's schema diverges from df's, e.g. after country_name
# has been renamed to country.
clean_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in clean_df.columns]
).show()

In [0]:
# Keep only the rows whose 'value' is zero or positive
clean_df = clean_df.where(col("value") >= 0)


In [0]:
# Rename the country_name column to country, then list the resulting column names
clean_df = clean_df.withColumnRenamed("country_name", "country")
clean_df.columns

## Aggregations/Analysis

In [0]:
# Calculate the total production for each country.
# alias() must be applied to the aggregate column inside agg(); calling it on the
# DataFrame returned by agg() only aliases the DataFrame and leaves the column
# named "sum(value)".
clean_df.groupBy("country").agg(sum("value").alias("total_production")).show()



In [0]:
# Total production per country, sorted from highest to lowest.
# The alias and the orderBy reference use the same spelling, so the sort does not
# depend on Spark resolving column names case-insensitively.
(
    clean_df.groupBy("country")
    .agg(sum("value").alias("total_production"))
    .orderBy(col("total_production").desc())
    .show()
)

`rank() over (partition by year order by value desc)`

In [0]:
# Rank every row within its year by production value, highest value first.
# NOTE(review): the earlier comment described ranking the top 20 countries by total
# production, but the code ranks individual rows per year by 'value' — confirm intent.
windowSpec = (
    Window
    .partitionBy("year")
    .orderBy(col("value").desc())
)

ranked_df = clean_df.withColumn("rank", rank().over(windowSpec))

display(ranked_df)

In [0]:
# Keep only the rows ranked 1 to 3 within each year, then preview them
top_3_per_year = ranked_df.where(col("rank") <= 3)
display(top_3_per_year)

In [0]:
# Save the cleaned data as the table oil_production_clean,
# replacing any existing data and schema
(
    clean_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("oil_production_clean")
)

In [0]:
# Save the top-3-per-year rows as the table top_3_oil_production,
# replacing any existing data and schema
(
    top_3_per_year.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("top_3_oil_production")
)

## SparkSQL

In [0]:
# Register the cleaned DataFrame as the temp view "oil_data" so the SQL cells can query it
clean_df.createOrReplaceTempView("oil_data")

In [0]:
%sql
-- Preview all rows of the oil_data temp view
 SELECT * FROM oil_data;

In [0]:
%sql
-- Total production per country, largest producers first
SELECT
  country,
  SUM(value) AS total_production
FROM oil_data
GROUP BY country
ORDER BY total_production DESC


In [0]:
# Query the saved top_3_oil_production table and display its rows
spark.sql("SELECT * FROM top_3_oil_production").display()

In [0]:

# Average production of crude oil vs natural gas, computed over the oil_data temp view
avg_production_query = """SELECT
  product,
  AVG(value) AS avg_production
FROM oil_data
WHERE product IN ('Crude oil', 'Natural gas')
GROUP BY product"""
spark.sql(avg_production_query).display()


