
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

# ETL Pipeline: Oil Production with PySpark

## 1. Data ingestion
We load the CSV file from DBFS.

In [0]:
# File location and type
file_location = "/FileStore/tables/oil_gas/Oil_and_Gas_1932_2014.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/oil_gas/Oil_and_Gas_1932_2014.csv")

display(df)

cty_name,iso3numeric,id,year,eiacty,oil_prod32_14,oil_price_2000,oil_price_nom,oil_value_nom,oil_value_2000,oil_value_2014,gas_prod55_14,gas_price_2000_mboe,gas_price_2000,gas_price_nom,gas_value_nom,gas_value_2000,gas_value_2014,oil_gas_value_nom,oil_gas_value_2000,oil_gas_value_2014,oil_gas_valuePOP_nom,oil_gas_valuePOP_2000,oil_gas_valuePOP_2014,oil_exports,net_oil_exports,net_oil_exports_mt,net_oil_exports_value,net_oil_exports_valuePOP,gas_exports,net_gas_exports_bcf,net_gas_exports_mboe,net_gas_exports_value,net_gas_exports_valuePOP,net_oil_gas_exports_valuePOP,population,pop_maddison,sovereign,mult_nom_2000,mult_nom_2014,mult_2000_2014
Afghanistan,4,AFG,1932,Afghanistan,,10.86086,0.87,,,,,0.0,,0.06,,,,,,,,,,,,,,,,,,,,,,,1,12.599874,17.32197312,1.3747735192
Afghanistan,4,AFG,1933,Afghanistan,,8.815239,0.67,,,,,0.0,,0.06,,,,,,,,,,,,,,,,,,,,,,,1,13.280064,18.25708032,1.3747735192
Afghanistan,4,AFG,1934,Afghanistan,,12.71579,1.0,,,,,0.0,,0.06,,,,,,,,,,,,,,,,,,,,,,,1,12.853008,17.66997504,1.3747735192
Afghanistan,4,AFG,1935,Afghanistan,,12.03969,0.97,,,,,0.0,,0.06,,,,,,,,,,,,,,,,,,,,,,,1,12.537882,17.23674816,1.3747735192
Afghanistan,4,AFG,1936,Afghanistan,,13.38321,1.09,,,,,0.0,,0.06,,,,,,,,,,,,,,,,,,,,,,,1,12.420786,17.07576768,1.3747735192
Afghanistan,4,AFG,1937,Afghanistan,,13.9813,1.18,,,,,0.0,,0.05,,,,,,,,,,,,,,,,,,,,,,,1,11.986842,16.47919296,1.3747735192
Afghanistan,4,AFG,1938,Afghanistan,,13.66059,1.13,,,,,0.0,,0.05,,,,,,,,,,,,,,,,,,,,,,,1,12.214146,16.79168448,1.3747735192
Afghanistan,4,AFG,1939,Afghanistan,,12.49909,1.02,,,,,0.0,,0.05,,,,,,,,,,,,,,,,,,,,,,,1,12.38979,17.0331552,1.3747735192
Afghanistan,4,AFG,1940,Afghanistan,,12.37774,1.02,,,,,0.0,,0.05,,,,,,,,,,,,,,,,,,,,,,,1,12.270972,16.86980736,1.3747735192
Afghanistan,4,AFG,1941,Afghanistan,,13.17518,1.14,,,,,0.0,,0.05,,,,,,,,,,,,,,,,,,,,,,,1,11.688936,16.06963968,1.3747735192


## 2. Cleaning and transformation
I rename columns and filter by years since 2000.

In [0]:
## 2. Data Cleaning and Transformation
## I rename the columns for better readability, filtered the data to include only records from the year 2000 onwards, and prepared it for aggregation.

from pyspark.sql.functions import col

# Rename columns
df = df.withColumnRenamed("cty_name", "country") \
       .withColumnRenamed("Year", "year") \
       .withColumnRenamed("oil_price_2000", "oil price 2000") \
      

# View schema and sample data
df.printSchema()
df.show(5)

root
 |-- country: string (nullable = true)
 |-- iso3numeric: string (nullable = true)
 |-- id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- eiacty: string (nullable = true)
 |-- oil_prod32_14: double (nullable = true)
 |-- oil price 2000: string (nullable = true)
 |-- oil_price_nom: string (nullable = true)
 |-- oil_value_nom: string (nullable = true)
 |-- oil_value_2000: string (nullable = true)
 |-- oil_value_2014: string (nullable = true)
 |-- gas_prod55_14: string (nullable = true)
 |-- gas_price_2000_mboe: string (nullable = true)
 |-- gas_price_2000: string (nullable = true)
 |-- gas_price_nom: string (nullable = true)
 |-- gas_value_nom: string (nullable = true)
 |-- gas_value_2000: string (nullable = true)
 |-- gas_value_2014: string (nullable = true)
 |-- oil_gas_value_nom: string (nullable = true)
 |-- oil_gas_value_2000: string (nullable = true)
 |-- oil_gas_value_2014: string (nullable = true)
 |-- oil_gas_valuePOP_nom: string (nullable = true)
 |-- oi

In [0]:
df = df.filter(col("year") >= 2000)

## 3. Aggregation
I calculate the average production per country.

In [0]:
# Functions
from pyspark.sql.functions import col, avg, cast
from pyspark.sql.types import DoubleType

# I convert the column from string to number 
df = df.withColumn("oil_prod32_14", col("oil_prod32_14").cast(DoubleType()))

# calculate the average per country
average_by_country = df.groupBy("country").agg(avg("oil_prod32_14").alias("average_production"))

# show results
display(average_by_country.orderBy("country"))

country,average_production
Afghanistan,0.0
Albania,482359.2537333333
Algeria,78974533.97893333
Angola,72022079.1264
Argentina,33726013.014000006
Armenia,0.0
Australia,23992071.52013333
Austria,863376.2910666667
Azerbaijan,33099594.21
"Bahamas, The",0.0


## Storage
I save the final result in optimized Parquet format.

In [0]:
#save results as parquet
average_by_country.write.mode("overwrite").parquet("/FileStore/oil_gas/output/average_by_country")

In [0]:
df_results = spark.read.parquet("/FileStore/oil_gas/output/average_by_country")
df_results.show()

+----------------+--------------------+
|         country|  average_production|
+----------------+--------------------+
|            Chad|        4923759.5652|
|        Paraguay|                 0.0|
|Congo, Dem. Rep.|  1046796.4934666667|
|         Senegal|  29671.996000000003|
|          Sweden|                 0.0|
|  Macedonia, FYR|                 0.0|
|          Guyana|                 0.0|
|         Eritrea|                 0.0|
|     Philippines|  1007000.3785333334|
|        Djibouti|                 0.0|
|        Malaysia|3.1205683614533328E7|
|       Singapore|                 0.0|
|            Fiji|                 0.0|
|          Turkey|  2334516.6529333326|
|          Malawi|                 0.0|
|            Iraq|1.1779921007200001E8|
|         Germany|   2174992.115866666|
|         Comoros|                 0.0|
|     Afghanistan|                 0.0|
|        Cambodia|                 0.0|
+----------------+--------------------+
only showing top 20 rows



In [0]:

# Create a view or table

temp_table_name = "Oil_and_Gas_1932_2014_csv"

df.createOrReplaceTempView(temp_table_name)



In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Oil_and_Gas_1932_2014_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

