In [1]:
!pip install s3fs

Collecting s3fs
  Downloading s3fs-2023.12.2-py3-none-any.whl.metadata (1.6 kB)
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Downloading aiobotocore-2.9.1-py3-none-any.whl.metadata (20 kB)
Collecting fsspec==2023.12.2 (from s3fs)
  Downloading fsspec-2023.12.2-py3-none-any.whl.metadata (6.8 kB)
Collecting aiohttp!=4.0.0a0,!=4.0.0a1 (from s3fs)
  Downloading aiohttp-3.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.4 kB)
Collecting botocore<1.33.14,>=1.33.2 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading botocore-1.33.13-py3-none-any.whl.metadata (6.1 kB)
Collecting wrapt<2.0.0,>=1.10.10 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading wrapt-1.16.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting aioitertools<1.0.0,>=0.5.1 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading aioitertools-0.11.0-py3-none-any.whl (23 kB)
Collecting multidict<7.0,>=4.5 (from aiohttp!=4.0.

In [2]:
import requests
import json
import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F

In [3]:
# Define environment variables
os.environ["MINIO_KEY"] = "minio"
os.environ["MINIO_SECRET"] = "minio123"
os.environ["MINIO_ENDPOINT"] = "http://minio1:9000"

In [4]:
# Get data using REST API
def fetch_countries_data(url):
    # Using session is particularly beneficial 
    # if you are making multiple requests to the same server, 
    # as it can reuse the underlying TCP connection, 
    # leading to performance improvements.
    with requests.Session() as session:
        response = session.get(url)
        response.raise_for_status()
        
        if response.status_code == 200:
            return response.json()
        else:
            return f"Error: {response.status_code}"

# Fetch data
countries_data = fetch_countries_data("https://restcountries.com/v3.1/all")

In [6]:
countries_data[:1]

[{'name': {'common': 'Andorra',
   'official': 'Principality of Andorra',
   'nativeName': {'cat': {'official': "Principat d'Andorra",
     'common': 'Andorra'}}},
  'tld': ['.ad'],
  'cca2': 'AD',
  'ccn3': '020',
  'cca3': 'AND',
  'cioc': 'AND',
  'independent': True,
  'status': 'officially-assigned',
  'unMember': True,
  'currencies': {'EUR': {'name': 'Euro', 'symbol': '€'}},
  'idd': {'root': '+3', 'suffixes': ['76']},
  'capital': ['Andorra la Vella'],
  'altSpellings': ['AD', 'Principality of Andorra', "Principat d'Andorra"],
  'region': 'Europe',
  'subregion': 'Southern Europe',
  'languages': {'cat': 'Catalan'},
  'translations': {'ara': {'official': 'إمارة أندورا', 'common': 'أندورا'},
   'bre': {'official': 'Priñselezh Andorra', 'common': 'Andorra'},
   'ces': {'official': 'Andorrské knížectví', 'common': 'Andorra'},
   'cym': {'official': 'Tywysogaeth Andorra', 'common': 'Andorra'},
   'deu': {'official': 'Fürstentum Andorra', 'common': 'Andorra'},
   'est': {'official':

In [7]:
# Write data to minIO as a JSON file

fs = s3fs.S3FileSystem(
    client_kwargs={'endpoint_url': os.environ["MINIO_ENDPOINT"]}, # minio1 = minio container name
    key=os.environ["MINIO_KEY"],
    secret=os.environ["MINIO_SECRET"],
    use_ssl=False  # Set to True if MinIO is set up with SSL
)

with fs.open('mybucket/country_data.json', 'w', encoding='utf-8') as f:
    json.dump(countries_data,f)

In [8]:
# Create Spark session

"""
- `spark.hadoop.fs.s3a.endpoint`: The endpoint URL for minIO.
- `spark.hadoop.fs.s3a.access.key` and `spark.hadoop.fs.s3a.secret.key`: The access key and secret key for minIO.
- `spark.hadoop.fs.s3a.path.style.access`: Set to true to enable path-style access for S3 bucket.
- `spark.hadoop.fs.s3a.impl`: The implementation class for S3A file system.
"""

spark = SparkSession.builder \
    .appName("country_data_analysis") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026") \
    .config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT"]) \
    .config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_KEY"]) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET"]) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .enableHiveSupport() \
    .getOrCreate()

In [9]:
# Check PySpark version
print(pyspark.__version__)

3.5.0


In [10]:
# Check Hadoop version
sc = SparkContext.getOrCreate()
hadoop_version = sc._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion()
print("Hadoop version:", hadoop_version)

Hadoop version: 3.3.4


In [11]:
# Read JSON data using PySpark

df = spark.read.option("inferSchema",True).json("s3a://mybucket/country_data.json")
df.count()

250

In [16]:
df.show(1)

+--------------------+-----+----------+------------------+--------------+--------------+----+----+----+----+--------------------+----------+--------------------+--------------------+----+----+--------------------+----+----------+-----------+----------+--------------------+-----------+--------------------+--------------------+----------+--------------------+------+-----------+-------------------+---------------+-----------+-----+--------------------+--------+
|        altSpellings| area|   borders|           capital|   capitalInfo|           car|cca2|cca3|ccn3|cioc|          coatOfArms|continents|          currencies|            demonyms|fifa|flag|               flags|gini|       idd|independent|landlocked|           languages|     latlng|                maps|                name|population|          postalCode|region|startOfWeek|             status|      subregion|  timezones|  tld|        translations|unMember|
+--------------------+-----+----------+------------------+--------------+-

In [18]:
# df.printSchema()

In [19]:
# Write same data as Parquet and re-read in dataframe

df.write.mode("overwrite").format("parquet").save("s3a://mybucket/country_raw_data.parquet")
country_raw_data = spark.read.parquet("s3a://mybucket/country_raw_data.parquet")
country_raw_data.count()

250

In [20]:
# Perform transformations to raw data
country_trnsfm_data = (
    country_raw_data
    .selectExpr(
        "name.common as cntry_name",
        "area as cntry_area",
        "borders as border_cntry",
        "capital as capital_cities",
        "continents as cntry_continent",
        "landlocked as is_landlocked",
        "population",
        "startOfWeek",
        "timezones as nr_timezones",
        "unMember as is_unmember"
    )
    .withColumn("cntry_area",F.when(F.col("cntry_area") < 0, None).otherwise(F.col("cntry_area")))
    .withColumn("border_cntry",F.when(F.col("border_cntry").isNull(),F.array(F.lit("NA"))).otherwise(F.col("border_cntry")))
    .withColumn("capital_cities",F.when(F.col("capital_cities").isNull(),F.array(F.lit("NA"))).otherwise(F.col("capital_cities")))    
)

# Print schema of transformed data
country_trnsfm_data.printSchema()

root
 |-- cntry_name: string (nullable = true)
 |-- cntry_area: double (nullable = true)
 |-- border_cntry: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- capital_cities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cntry_continent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_landlocked: boolean (nullable = true)
 |-- population: long (nullable = true)
 |-- startOfWeek: string (nullable = true)
 |-- nr_timezones: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_unmember: boolean (nullable = true)



In [21]:
# Write transformed data as PARQUET
country_trnsfm_data.write.mode("overwrite").format("parquet").save("s3a://mybucket/country_trnsfm_data.parquet")

# Create external hive table using PARQUET
spark.sql("""
CREATE EXTERNAL TABLE country_data (
    cntry_name STRING,
    cntry_area DOUBLE,
    border_cntry ARRAY<STRING>,
    capital_cities ARRAY<STRING>,
    cntry_continent ARRAY<STRING>,
    is_landlocked BOOLEAN,
    population BIGINT,
    startOfWeek STRING,
    nr_timezones ARRAY<STRING>,
    is_unmember BOOLEAN
)
STORED AS PARQUET
LOCATION 's3a://mybucket/country_trnsfm_data.parquet';
""").show()

++
||
++
++



In [22]:
# Show table details
spark.sql("DESCRIBE EXTENDED default.country_data").show(100,truncate = False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|cntry_name                  |string                                                        |NULL   |
|cntry_area                  |double                                                        |NULL   |
|border_cntry                |array<string>                                                 |NULL   |
|capital_cities              |array<string>                                                 |NULL   |
|cntry_continent             |array<string>                                                 |NULL   |
|is_landlocked               |boolean                                                       |NULL   |
|population                  |bigint                                              

In [23]:
# Show first 5 records from the table
spark.sql("SELECT * FROM default.country_data LIMIT 5").show(truncate = False)

# Create temporary view using dataframe
spark.table("default.country_data").createOrReplaceTempView("country_data_processed_view")

+-----------------------------------+----------+-------------------------+-------------------+---------------+-------------+----------+-----------+------------------------------------------------------------------+-----------+
|cntry_name                         |cntry_area|border_cntry             |capital_cities     |cntry_continent|is_landlocked|population|startOfWeek|nr_timezones                                                      |is_unmember|
+-----------------------------------+----------+-------------------------+-------------------+---------------+-------------+----------+-----------+------------------------------------------------------------------+-----------+
|Andorra                            |468.0     |[FRA, ESP]               |[Andorra la Vella] |[Europe]       |true         |77265     |monday     |[UTC+01:00]                                                       |true       |
|French Southern and Antarctic Lands|7747.0    |[NA]                     |[Port-aux-Français

In [24]:
# Function to show Spark SQL results
def show_results(sql_string):
    return spark.sql(
        sql_string
    ).show(truncate = False)

In [25]:
# 1. Which are the 10 largest countries in terms of area? (in sq. km.)
sql_string = """
    SELECT cntry_name, cntry_area
    FROM country_data_processed_view
    ORDER BY cntry_area DESC
    LIMIT 10
    """
show_results(sql_string)

+-------------+-----------+
|cntry_name   |cntry_area |
+-------------+-----------+
|Russia       |1.7098242E7|
|Antarctica   |1.4E7      |
|Canada       |9984670.0  |
|China        |9706961.0  |
|United States|9372610.0  |
|Brazil       |8515767.0  |
|Australia    |7692024.0  |
|India        |3287590.0  |
|Argentina    |2780400.0  |
|Kazakhstan   |2724900.0  |
+-------------+-----------+



In [26]:
# 2. Which country has the largest number of neighbouring countries?
sql_string = """
    SELECT cntry_name, border_cntry, array_size(border_cntry) as ngbr_cntry_nr
    FROM country_data_processed_view
    WHERE NOT array_contains(border_cntry,'NA')
    ORDER BY array_size(border_cntry) DESC
    LIMIT 1
    """
show_results(sql_string)

+----------+--------------------------------------------------------------------------------+-------------+
|cntry_name|border_cntry                                                                    |ngbr_cntry_nr|
+----------+--------------------------------------------------------------------------------+-------------+
|China     |[AFG, BTN, MMR, HKG, IND, KAZ, NPL, PRK, KGZ, LAO, MAC, MNG, PAK, RUS, TJK, VNM]|16           |
+----------+--------------------------------------------------------------------------------+-------------+



In [27]:
# 3. Which countries have the highest number of capital cities?
sql_string = """
    SELECT cntry_name, capital_cities, array_size(capital_cities) as total_capital_cities
    FROM country_data_processed_view
    WHERE NOT array_contains(capital_cities,'NA')
    ORDER BY array_size(capital_cities) DESC
    LIMIT 2
    """
show_results(sql_string)

+------------+-----------------------------------+--------------------+
|cntry_name  |capital_cities                     |total_capital_cities|
+------------+-----------------------------------+--------------------+
|South Africa|[Pretoria, Bloemfontein, Cape Town]|3                   |
|Palestine   |[Ramallah, Jerusalem]              |2                   |
+------------+-----------------------------------+--------------------+



In [28]:
# 4. How many countries lie on two or more continents?
sql_string = """
    SELECT cntry_name, cntry_continent, array_size(cntry_continent) as total_continents
    FROM country_data_processed_view
    ORDER BY array_size(cntry_continent) DESC
    LIMIT 3
    """
show_results(sql_string)

+----------+---------------+----------------+
|cntry_name|cntry_continent|total_continents|
+----------+---------------+----------------+
|Turkey    |[Europe, Asia] |2               |
|Russia    |[Europe, Asia] |2               |
|Azerbaijan|[Europe, Asia] |2               |
+----------+---------------+----------------+



In [29]:
# 5. How many landlocked countries per continent?
sql_string = """
    SELECT continent, SUM(is_landlocked) as landlocked_nr
    FROM (SELECT cntry_name, case when is_landlocked then 1 else 0 end as is_landlocked, explode(cntry_continent) as continent
    FROM country_data_processed_view)
    GROUP BY continent
    ORDER BY SUM(is_landlocked) DESC
    """
show_results(sql_string)

+-------------+-------------+
|continent    |landlocked_nr|
+-------------+-------------+
|Europe       |16           |
|Africa       |16           |
|Asia         |12           |
|South America|2            |
|North America|0            |
|Antarctica   |0            |
|Oceania      |0            |
+-------------+-------------+



In [30]:
# 6. Which country has the highest number of time zones?
sql_string = """
    SELECT cntry_name, nr_timezones, array_size(nr_timezones) as total_timezones
    FROM country_data_processed_view
    ORDER BY array_size(nr_timezones) DESC
    LIMIT 1
    """
show_results(sql_string)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|cntry_name|nr_timezones                                                                                                                                              |total_timezones|
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|France    |[UTC-10:00, UTC-09:30, UTC-09:00, UTC-08:00, UTC-04:00, UTC-03:00, UTC+01:00, UTC+02:00, UTC+03:00, UTC+04:00, UTC+05:00, UTC+10:00, UTC+11:00, UTC+12:00]|14             |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+



In [31]:
# 7. How many countries are not UN members?
sql_string = """
    SELECT COUNT(*) AS count
    FROM country_data_processed_view
    WHERE NOT is_unmember
    """
show_results(sql_string)

+-----+
|count|
+-----+
|57   |
+-----+

