# ETL Pipeline: Joining Geographic Data (Countries, Regions, Sub-Regions)

In [0]:
""" 
             ┌──────────────────────────────┐
             │         df_regions
             │  (load and select)           │
             │  id        |   region        │
             └────────────┴─────────────────┘
                       ▲
                       │ (df_countries.region_id = df_regions.id)
                       │
┌─────────────────────────────┐
│        df_countries         │ 
│   (load and select)         │
│  country_id   |   country   │
│  region_id    |   sub_region_id
│  population   |   area_km2  │
└───────────────┴─────────────┘
                       │
                       │ (df_countries.sub_region_id = df_sub_regions.id)
                       ▼
             ┌─────────────────────────────┐
             │       df_sub_regions        │
             │  id        |   sub_region   │
             └────────────┴────────────────┘
 """
print("The following cells execute this scheme given above ^^^^")

First, load a CSV of country data, perform a clean schema (it is efficient and defines types for each column), select only useful columns, rename some of them for clarity, and display the result in a notebook

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicitely define the country schema -- a schema is useful since it provides significant performance improvements 
# (PySpark does not infer the schema; saving time), ensures (explicit) data quality, and prevents errors during processing 
schema_countries = StructType([
  StructField("country_id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("nationality", StringType(), True),
  StructField("country_code", StringType(), True),
  StructField("iso_alpha2", StringType(), True),
  StructField("capital", StringType(), True),
  StructField("population", IntegerType(), True),
  StructField("area_km2", IntegerType(), True),
  StructField("region_id", IntegerType(), True),
  StructField("sub_region_id", IntegerType(), True)])


df_countries = spark.read.csv("/Volumes/customer_orders/default/countries/countries_population.csv", header=True, schema=schema_countries).\
  select("country_id", col("name").alias("country"), "region_id", "sub_region_id", "population", "area_km2")
 
df_countries.show(5)


In [0]:
schema_regions = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True)
])
df_regions =  spark.read.csv("/Volumes/customer_orders/default/countries/country_regions.csv", header=True, schema=schema_regions).select("id", col("name").alias("region"))

df_regions.show(5)


In [0]:
schema_sub_regions = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True)
])

df_sub_regions = spark.read.csv("/Volumes/customer_orders/default/countries/country_sub_regions.csv", header=True,
                               schema = schema_sub_regions). \
                                 select("id", col("name").alias("sub_region"))
df_sub_regions.show()


The main cell of the notebook joins the three DataFrames by means of the region_id and sub_region_id keys.

This operation, denoted by the dot (.\), allows us to chain multiple PySpark operations together.

In [0]:

df_joined = df_countries.join(df_regions, df_countries.region_id == df_regions.id, "left").\
  select("country_id", "country", "region", "population", "area_km2", "sub_region_id").\
    join(df_sub_regions, df_countries.sub_region_id == df_sub_regions.id, "left").\
      select("country_id", "country", "region", "sub_region", "population", "area_km2")

In [0]:
df_joined.show(5)


In [0]:
#df_joined.write.saveAsTable("countries_consolidated")
