In [1]:
import os
import findspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window


findspark.init()
findspark.find()

os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"

# Start session
http://localhost:4040

In [2]:
sp = SparkSession.builder.master('local[*]').config("spark.driver.memory", "15g").appName('spark_test_tables').getOrCreate()

# Preprocessing
Create spark datasets from all CSV and JSON files

In [3]:
# Open CSV file as spark dataframe
def spark_csv_open(file):
    path = "data/" + file
    
    # open file
    df = sp.read.format("csv") \
            .option("mode", "FAILFAST") \
            .option("inferSchema", "true") \
            .option("header","true") \
            .option("escapeQuotes", "true") \
            .option("path", path) \
            .load()
    
    # return spark dataframe
    return df

# Open JSON file with format {key:value} as pandas dataframe (key = 'country_code'), transform to spark dataframe
def spark_json_open(file,colname):
    path = "data/" + file
    
    # open file - pandas dataframe
    sf = pd.read_json(path, typ='series')
    df = pd.DataFrame({'country_code':sf.index, colname:sf.values})

    # create spark dataframe from a pandas dataframe
    df = sp.createDataFrame(df)
    
    # return spark dataframe
    return df


# Create spark datasets from files
column1 = 'country_code'
df_nobel_orig = spark_csv_open('nobel.csv')
df_countries_orig = spark_csv_open('countries of the world.csv')
df_cities_orig = spark_csv_open('worldcitiespop.csv')
df_capitals_orig = spark_json_open('capital.json','capital_name')
df_continents_orig = spark_json_open('continent.json','continent_code')
df_currencies_orig = spark_json_open('currency.json','currency_code')
df_iso3_orig = spark_json_open('iso3.json','iso3_code')
df_names_orig = spark_json_open('names.json','country_name')
df_phones_orig = spark_json_open('phone.json','phone_code')

In [None]:
# Create a database in hive - hadoop

# spark.sql("drop database if exists wh cascade")
# spark.sql("create database whdb")

# Create spark datasets from the staging database in hive
# df_nobel_orig = spark.sql("select * from staging.nobel_laureates")
# df_countries_orig = spark.sql("select * from staging.countries_of_the_world")
# df_cities_orig = spark.sql("select * from staging.cities")
# df_capitals_orig = spark.sql("select * from staging.country_capital")
# df_continents_orig = spark.sql("select * from staging.country_continents")
# df_currencies_orig = spark.sql("select * from staging.country_currencies")
# df_iso3_orig = spark.sql("select * from staging.country_iso3")
# df_names_orig = spark.sql("select * from staging.country_names")
# df_phones_orig = spark.sql("select * from staging.country_phones")

# Analysis
Check relationships between original datasets

## ISO 2 Country Codes
Check names.json dataset

In [4]:
print("Count of records: ", df_names_orig.count())
print("Count of unique ISO Country Names: ", df_names_orig.select("country_name").distinct().count())
print("Count of unique ISO2: ", df_names_orig.select("country_code").distinct().count())

Count of records:  250
Count of unique ISO Country Names:  250
Count of unique ISO2:  250


 Check that 1 country code contains no more than 1 country name and there are no duplicates in country codes.

In [5]:
df_names_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



## Countries + Country Codes ISO2
Analysis relationship between datasets.

Join df_names_orig dataset with country codes from the names.json file to the df_countries_orig dataframe<br>
Review countries without matches in the ISO2 codes dataset.

In [6]:
join_expr = f.trim(df_names_orig["country_name"]) == f.trim(df_countries_orig["Country"])
df_tmp = df_countries_orig.join(df_names_orig, join_expr, "left_anti").select("Country")

df_tmp.show(100,truncate=False)

+---------------------+
|Country              |
+---------------------+
|Congo, Repub. of the |
|Korea, North         |
|Congo, Dem. Rep.     |
|Burma                |
|Central African Rep. |
|Macau                |
|West Bank            |
|Micronesia, Fed. St. |
|Virgin Islands       |
|N. Mariana Islands   |
|Netherlands Antilles |
|Gambia, The          |
|Gaza Strip           |
|Cote d'Ivoire        |
|St Pierre & Miquelon |
|British Virgin Is.   |
|Korea, South         |
|Bahamas, The         |
|Trinidad & Tobago    |
|Bosnia & Herzegovina |
|Antigua & Barbuda    |
|Sao Tome & Principe  |
|Saint Kitts & Nevis  |
|Turks & Caicos Is    |
+---------------------+



Check the ISO2 codes for all these countries and find them in the Country Codes dataset with different country names.<br>
There are issues with the following codes:<br>
Netherlands Antilles has 3 country codes = BQ (Bonaire, Saint Eustatius and Saba), CW (Curacao) and SX (Sint Maarten).<br>
West Bank and Gaza Strip have the same country code = PS (Palestinian Territory)<br>
This means that the relationship Countries - Country Codes is "M-to-N" and 3 entities should be created: "Countries", "Country Codes", "County Codes in Countries".

Update the Country Names from the Countries dataset with the country names from the ISO2 dataset.<br>
For example, if Country Name = 'Korea, South', the name from the ISO2 dataframe =  'South Korea'

In [7]:
df_countries_orig = df_countries_orig.withColumn("Country", \
                                   f.when(f.trim(f.col("Country")) == 'Congo, Repub. of the', 'Republic of the Congo') \
                                    .when(f.trim(f.col("Country")) == 'Korea, North', 'North Korea') \
                                    .when(f.trim(f.col("Country")) == 'Congo, Dem. Rep.', 'Democratic Republic of the Congo') \
                                    .when(f.trim(f.col("Country")) == 'Burma', 'Myanmar') \
                                    .when(f.trim(f.col("Country")) == 'Central African Rep.', 'Central African Republic') \
                                    .when(f.trim(f.col("Country")) == 'Macau', 'Macao') \
                                    .when(f.trim(f.col("Country")) == 'Micronesia, Fed. St.', 'Micronesia') \
                                    .when(f.trim(f.col("Country")) == 'Virgin Islands', 'U.S. Virgin Islands') \
                                    .when(f.trim(f.col("Country")) == 'N. Mariana Islands', 'Northern Mariana Islands') \
                                    .when(f.trim(f.col("Country")) == 'Gambia, The', 'Gambia') \
                                    .when(f.trim(f.col("Country")) == 'Cote d\'Ivoire', 'Ivory Coast') \
                                    .when(f.trim(f.col("Country")) == 'St Pierre & Miquelon', 'Saint Pierre and Miquelon') \
                                    .when(f.trim(f.col("Country")) == 'British Virgin Is.', 'British Virgin Islands') \
                                    .when(f.trim(f.col("Country")) == 'Korea, South', 'South Korea') \
                                    .when(f.trim(f.col("Country")) == 'Bahamas, The', 'Bahamas') \
                                    .when(f.trim(f.col("Country")) == 'Trinidad & Tobago', 'Trinidad and Tobago') \
                                    .when(f.trim(f.col("Country")) == 'Bosnia & Herzegovina', 'Bosnia and Herzegovina') \
                                    .when(f.trim(f.col("Country")) == 'Antigua & Barbuda', 'Antigua and Barbuda') \
                                    .when(f.trim(f.col("Country")) == 'Sao Tome & Principe', 'Sao Tome and Principe') \
                                    .when(f.trim(f.col("Country")) == 'Saint Kitts & Nevis', 'Saint Kitts and Nevis') \
                                    .when(f.trim(f.col("Country")) == 'Turks & Caicos Is', 'Turks and Caicos Islands') \
                                    .otherwise(df_countries_orig["Country"]))

Check that there are records in the Country Codes dataset that are not included in the Countries dataset.<br>
Exclude countries shown below (Korea, Congo, etc.)

In [8]:
join_expr = f.trim(df_countries_orig["Country"]) == f.trim(df_names_orig["country_name"])
df_tmp = df_names_orig.join(df_countries_orig, join_expr, "left_anti") \
                        .select("country_name", "country_code") \
                        .where("country_code not in ('BQ','CW','SX','PS')")

df_tmp.show(100, truncate=False)

+--------------------------------------------+------------+
|country_name                                |country_code|
+--------------------------------------------+------------+
|Saint Barthelemy                            |BL          |
|Bouvet Island                               |BV          |
|Tokelau                                     |TK          |
|South Georgia and the South Sandwich Islands|GS          |
|Heard Island and McDonald Islands           |HM          |
|Svalbard and Jan Mayen                      |SJ          |
|Pitcairn                                    |PN          |
|Montenegro                                  |ME          |
|Saint Martin                                |MF          |
|British Indian Ocean Territory              |IO          |
|Falkland Islands                            |FK          |
|Norfolk Island                              |NF          |
|Niue                                        |NU          |
|Kosovo                                 

There are many country codes without information in the Countries dataset.<br>
Information about these countries and the codes below should also be loaded into the Countries dataset.

In [9]:
#save the list of missing countries to a temp dataset
df_tmp_mis_cntr = df_tmp.select("country_name", "country_code")

## Countries - Nobel Laureates
Review countries from the Nobel Laureates dataset that are not in the Countries and ISO2 datasets.<br>
Columns in the Nobel Laureates dataset: "Birth Country", "Death Country", "Organization Country"

In [10]:
# Records without Country names for Birth Country
join_expr = f.trim(df_countries_orig["Country"]) == f.trim(df_nobel_orig["Birth Country"])
df_tmp = df_nobel_orig.join(df_countries_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Birth Country", "country_name") \
                        .where("country_name is not null") \
                        .select("country_name") \
                        .distinct()

# Records without Country names for Death Country
join_expr = f.trim(df_countries_orig["Country"]) == f.trim(df_nobel_orig["Death Country"])
df_tmp2 = df_nobel_orig.join(df_countries_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Death Country", "country_name") \
                        .where("country_name is not null") \
                        .select("country_name") \
                        .distinct()

# Records without Country names for Organization Country
join_expr = f.trim(df_countries_orig["Country"]) == f.trim(df_nobel_orig["Organization Country"])
df_tmp3 = df_nobel_orig.join(df_countries_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Organization Country", "country_name") \
                        .where("country_name is not null") \
                        .select("country_name") \
                        .distinct()

# Union 3 dataframes with missing columns, select unique values, save result to the temp dataframe
df_tmp_mis_hist_cntr = df_tmp.union(df_tmp2).union(df_tmp3).distinct()

# Check results
df_tmp_mis_hist_cntr.show(100,truncate=False)

+---------------------------------------------+
|country_name                                 |
+---------------------------------------------+
|British Protectorate of Palestine (Israel)   |
|Germany (Poland)                             |
|W&uuml;rttemberg (Germany)                   |
|Czechoslovakia (Czech Republic)              |
|Russian Empire (Latvia)                      |
|Trinidad                                     |
|Austria-Hungary (Poland)                     |
|Schleswig (Germany)                          |
|Russian Empire (Poland)                      |
|British India (India)                        |
|Germany (France)                             |
|British West Indies (Saint Lucia)            |
|Tuscany (Italy)                              |
|Bosnia (Bosnia and Herzegovina)              |
|India (Pakistan)                             |
|Poland (Ukraine)                             |
|Austria-Hungary (Ukraine)                    |
|Russian Empire (Lithuania)             

The following countries exist on this list, and the names can be fixed in the original Nobel Laureates dataset:<br>
- "United States of America" is "United States"
- "Guadeloupe Island" is "Guadeloupe"
- "Germany (Poland)", "Germany (France)", "Germany (Russia)" should be fixed to "Germany"
- "Poland (Ukraine)", "Poland (Lithuania)", "Poland (Belarus)" should be fixed to "Poland"
- "Hungary (Slovakia)" should be fixed to "Hungary"
- "India (Pakistan)" should be fixed to "India"<br>
Fix the name "W&uuml;rttemberg (Germany)" to "Württemberg (Germany)"<br>
Update the Nobel Laureates original dataset.

In [11]:
df_nobel_orig = df_nobel_orig.withColumn("Birth Country",  
         f.when(f.col("Birth Country") == 'United States of America', 'United States') \
          .when(f.col("Birth Country") == 'Guadeloupe Island', 'Guadeloupe') \
          .when((f.col("Birth Country") == 'Germany (Poland)') | (f.col("Birth Country") == 'Germany (France)') \
                | (f.col("Birth Country") == 'Germany (Russia)'), 'Germany') \
          .when((f.col("Birth Country") == 'Poland (Ukraine)') | (f.col("Birth Country") == 'Poland (Lithuania)') \
                | (f.col("Birth Country") == 'Poland (Belarus)'), 'Poland') \
          .when(f.col("Birth Country") == 'Hungary (Slovakia)', 'Hungary') \
          .when(f.col("Birth Country") == 'India (Pakistan)', 'India') \
          .when(f.col("Birth Country") == 'W&uuml;rttemberg (Germany)', \
                'Württemberg (Germany)') \
          .otherwise(df_nobel_orig["Birth Country"])) \
     .withColumn("Death Country",  
         f.when(f.col("Death Country") == 'United States of America', 'United States') \
          .when(f.col("Death Country") == 'Guadeloupe Island', 'Guadeloupe') \
          .when((f.col("Death Country") == 'Germany (Poland)') | (f.col("Death Country") == 'Germany (France)') \
                | (f.col("Death Country") == 'Germany (Russia)'), 'Germany') \
          .when((f.col("Death Country") == 'Poland (Ukraine)') | (f.col("Death Country") == 'Poland (Lithuania)') \
                | (f.col("Death Country") == 'Poland (Belarus)'), 'Poland') \
          .when(f.col("Death Country") == 'Hungary (Slovakia)', 'Hungary') \
          .when(f.col("Death Country") == 'India (Pakistan)', 'India') \
          .otherwise(df_nobel_orig["Death Country"])) \
     .withColumn("Organization Country",  
         f.when(f.col("Organization Country") == 'United States of America', 'United States') \
          .when(f.col("Organization Country") == 'Guadeloupe Island', 'Guadeloupe') \
          .when((f.col("Organization Country") == 'Germany (Poland)') | (f.col("Organization Country") == 'Germany (France)') \
                | (f.col("Organization Country") == 'Germany (Russia)'), 'Germany') \
          .when((f.col("Organization Country") == 'Poland (Ukraine)') | (f.col("Organization Country") == 'Poland (Lithuania)') \
                | (f.col("Organization Country") == 'Poland (Belarus)'), 'Poland') \
          .when(f.col("Organization Country") == 'Hungary (Slovakia)', 'Hungary') \
          .when(f.col("Organization Country") == 'India (Pakistan)', 'India') \
          .otherwise(df_nobel_orig["Organization Country"]))

Delete records with updated names from df_tmp_mis_hist_cntr, update "Württemberg"

In [12]:
df_tmp_mis_hist_cntr = df_tmp_mis_hist_cntr.where("country_name not in ('United States of America', 'Guadeloupe Island', \
                                                 'Germany (Poland)', 'Germany (France)', 'Germany (Russia)', \
                                                 'Poland (Ukraine)', 'Poland (Lithuania)', 'Poland (Belarus)', \
                                                 'Hungary (Slovakia)', 'India (Pakistan)')") \
                            .withColumn("country_name", 
                                f.when(f.col("country_name") == 'W&uuml;rttemberg (Germany)', 'Württemberg (Germany)') \
                                 .otherwise(df_tmp_mis_hist_cntr["country_name"]))

## Cities - Nobel Laureates
Review countries from the Nobel Laureates dataset that are not in the Cities dataset.<br>
Columns in the dataset of Nobel Laureates: "Birth City", "Death City", "Organization City"

In [13]:
# City columns in the Nobel Laureate dataframe contain a state for the US country

print("Nobel Prizes - US city format:")
df_nobel_orig.withColumnRenamed("Birth City", "birth_city") \
             .withColumnRenamed("Birth Country", "birth_country") \
             .where("birth_city like '%New York%' or birth_city like '%Clinton%'") \
             .select("birth_city", "birth_country") \
             .distinct().show()

print("Cities - US city format:")
df_cities_orig.where("(AccentCity = 'New York' or AccentCity = 'Clinton') and Country = 'us' and Region = 'NY'") \
              .distinct().show()

Nobel Prizes - US city format:
+------------+-------------+
|  birth_city|birth_country|
+------------+-------------+
| Clinton, NY|United States|
|New York, NY|United States|
+------------+-------------+

Cities - US city format:
+-------+--------+----------+------+----------+------------------+-----------+
|Country|    City|AccentCity|Region|Population|          Latitude|  Longitude|
+-------+--------+----------+------+----------+------------------+-----------+
|     us|new york|  New York|    NY| 8107916.0|        40.7141667|-74.0063889|
|     us| clinton|   Clinton|    NY|      null|43.048333299999996|-75.3788889|
+-------+--------+----------+------+----------+------------------+-----------+



The Nobel dataset has the following format for US cities: "city, STATE". The Cities dataset contains only the City name in the AccentCity column and the state name in the Region column.<br>
Create a new column in the format "AccentCity, Region" for US cities and "AccentCity" for other cities in the Cities dataset.

In [14]:
df_cities_orig = df_cities_orig.withColumn("CityUpd", \
                              f.when(f.col("Country") == 'us', f.concat_ws(', ','AccentCity','Region')) \
                               .otherwise(f.col("AccentCity")))

Review missing city names in the Cities dataset

In [15]:
# Records without City names for Birth Ciry
join_expr = f.trim(df_cities_orig["CityUpd"]) == f.trim(df_nobel_orig["Birth City"])
df_tmp = df_nobel_orig.join(df_cities_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Birth City", "mis_city_name") \
                        .where("mis_city_name is not null") \
                        .select("mis_city_name") \
                        .distinct()

# Records without City names for Death Ciry
join_expr = f.trim(df_cities_orig["CityUpd"]) == f.trim(df_nobel_orig["Death City"])
df_tmp2 = df_nobel_orig.join(df_cities_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Death City", "mis_city_name") \
                        .where("mis_city_name is not null") \
                        .select("mis_city_name") \
                        .distinct()

# Records without City names for Organization Ciry
join_expr = f.trim(df_cities_orig["CityUpd"]) == f.trim(df_nobel_orig["Organization City"])
df_tmp3 = df_nobel_orig.join(df_cities_orig, join_expr, "left_anti") \
                        .withColumnRenamed("Organization City", "mis_city_name") \
                        .where("mis_city_name is not null") \
                        .select("mis_city_name") \
                        .distinct()

# Union 3 dataframes with missing columns, select unique values, save result to the temp dataframe
df_tmp_mis_cities = df_tmp.union(df_tmp2).union(df_tmp3).distinct()

In [16]:
df_tmp_mis_cities.show(500,truncate=False)

+--------------------------------+
|mis_city_name                   |
+--------------------------------+
|Dabrovica                       |
|Langford Grove, Maldon, Essex   |
|Danzig (Gdansk)                 |
|Nam Ha province                 |
|Lethbridge, Alberta             |
|Nizhny Tagil                    |
|Champaign-Urbana, IL            |
|Mürzzuschlag                    |
|Zelvas                          |
|Yamanashi Prefecture            |
|Nitzkydorf, Banat               |
|Zhejiang Ningbo                 |
|Ta'izz                          |
|Sorau (Zory)                    |
|Waltersdorf (Niegoslawice)      |
|Priluka (Nova Pryluka)          |
|Corteno                         |
|Neisse (Nysa)                   |
|Gjesdal                         |
|Fleräng                         |
|Strehlen (Strzelin)             |
|Val di Castello                 |
|Gaffken (Parusnoye)             |
|Toyama City                     |
|Uskup (Skopje)                  |
|Clausthal (Claustha

All of these cities were found in the original Cities dataset. No need to load additional cities.<br>
Some of the cities have historical names.

## ISO3 Codes and relationship ISO2 - ISO3
Check iso3.json dataset

In [17]:
print("Count of records: ", df_iso3_orig.count())
print("Count of unique ISO3: ", df_iso3_orig.select("iso3_code").distinct().count())
print("Count of unique ISO2: ", df_iso3_orig.select("country_code").distinct().count())

Count of records:  250
Count of unique ISO3:  250
Count of unique ISO2:  250


Check that 1 ISO2 country code contains no more than 1 ISO3 country code and there are no duplicates in country codes.

In [18]:
df_iso3_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



Check relationship between ISO2 and ISO3 datasets

In [19]:
join_expr = f.trim(df_names_orig["country_code"]) == f.trim(df_iso3_orig["country_code"])
df_names_orig.join(df_iso3_orig, join_expr, "outer") \
                .where("country_name is null or country_name = '' or iso3_code is null or iso3_code = ''") \
                .show(100, truncate=False)

+------------+------------+------------+---------+
|country_code|country_name|country_code|iso3_code|
+------------+------------+------------+---------+
+------------+------------+------------+---------+



"1-to-1" relationship, both codes can be included in 1 entity "Country Codes".

## Phone Codes and relationship ISO2 - Phone
Check phones.json dataset

In [20]:
print("Count of records: ", df_phones_orig.count())
print("Count of unique Phone codes: ", df_phones_orig.select("phone_code").distinct().count())
print("Count of unique ISO2: ", df_phones_orig.select("country_code").distinct().count())

Count of records:  250
Count of unique Phone codes:  234
Count of unique ISO2:  250


Check that 1 ISO2 country code contains no more than 1 phone country code and there are no duplicates in country codes.

In [21]:
df_phones_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



Check relationship between ISO2 and Phone datasets

In [22]:
join_expr = f.trim(df_names_orig["country_code"]) == f.trim(df_phones_orig["country_code"])
df_names_orig.join(df_phones_orig, join_expr, "outer") \
                .where("country_name is null or country_name = '' or phone_code is null or phone_code = ''") \
                .show(100, truncate=False)

+------------+--------------------------------------------+------------+----------+
|country_code|country_name                                |country_code|phone_code|
+------------+--------------------------------------------+------------+----------+
|AQ          |Antarctica                                  |AQ          |          |
|BV          |Bouvet Island                               |BV          |          |
|GS          |South Georgia and the South Sandwich Islands|GS          |          |
|TF          |French Southern Territories                 |TF          |          |
|XK          |Kosovo                                      |XK          |          |
+------------+--------------------------------------------+------------+----------+



"1-to-M" relationship, both codes can be included in 1 entity "Country Codes"

## Currency Codes and relationship ISO2 - Currencies
Check currency.json dataset

In [23]:
print("Count of records: ", df_currencies_orig.count())
print("Count of unique Currency codes: ", df_currencies_orig.select("currency_code").distinct().count())
print("Count of unique ISO2: ", df_currencies_orig.select("country_code").distinct().count())

Count of records:  250
Count of unique Currency codes:  157
Count of unique ISO2:  250


Check that 1 ISO2 country code contains no more than 1 currency country code and there are no duplicates in country codes.

In [24]:
df_currencies_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



Check relationship between ISO2 and Currency datasets

In [25]:
join_expr = f.trim(df_names_orig["country_code"]) == f.trim(df_currencies_orig["country_code"])
df_names_orig.join(df_currencies_orig, join_expr, "outer") \
                .where("country_name is null or country_name = '' or currency_code is null or currency_code = ''") \
                .show(100, truncate=False)

+------------+------------+------------+-------------+
|country_code|country_name|country_code|currency_code|
+------------+------------+------------+-------------+
|AQ          |Antarctica  |AQ          |             |
+------------+------------+------------+-------------+



"1-to-M" relationship, both codes can be included in 1 entity "Country Codes".

## Capitals
Check capitals.json dataset<br>
Capitals should be included in Countries dataset.

In [26]:
print("Count of records: ", df_capitals_orig.count())
print("Count of unique Capitals: ", df_capitals_orig.select("capital_name").distinct().count())
print("Count of unique ISO2: ", df_capitals_orig.select("country_code").distinct().count())

Count of records:  250
Count of unique Capitals:  244
Count of unique ISO2:  250


Check the Netherlands Antilles country information with 3 different country codes BQ, CW and SX.

In [27]:
df_capitals_orig.where("country_code in('BQ', 'CW', 'SX')").show()

+------------+------------+
|country_code|capital_name|
+------------+------------+
|          BQ|            |
|          CW|  Willemstad|
|          SX| Philipsburg|
+------------+------------+



Capitals are different. The capital of the Netherlands Antilles is Willemstad, it will be added manually.
Check that 1 country code contains no more than 1 capital and there are no duplicates in country codes.

In [28]:
df_capitals_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



## Continents
Check continent.json dataset<br>
Continents should be included in Countries dataset.

Check the Netherlands Antilles country information with 3 different country codes BQ, CW and SX.

In [29]:
df_continents_orig.where("country_code in('BQ', 'CW', 'SX')").show()

+------------+--------------+
|country_code|continent_code|
+------------+--------------+
|          BQ|            NA|
|          CW|            NA|
|          SX|            NA|
+------------+--------------+



Continents are the same.<br>
Check that 1 country code contains no more than 1 continent and there are no duplicates in country codes.

In [30]:
df_continents_orig.groupBy("country_code").count().where("count > 1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



Continent information will be included in Countries dataset.

### Results

The following entities will be created from the original datasets "Countries" and "Country Data" (JSON files):<br>
1. "Currency Codes"
2. "Continents"
3. "Country Codes" with ISO2 codes, ISO3 codes, Phone codes and a link to the "Currency Codes" entity
4. "Countries" with with information about countries and a link to the "Continents" entity. Information will be loaded in 3 steps:
    - from the original dataset "Countries";
    - from datasets with ISO2 codes and Country Names - for countries not included in the original "Countries" dataset;
    - from the original dataset "Nobel Prizes" - historical countries that do not exist today, with links to a modern country.
5. "Codes in Countries" with links to the "Countries" and "Country Codes" entities
6. "Regions" with a list of regions from the original dataset "Countries"<br>

The following entities will be created from the original datasets "Cities" and "Nobel Prizes":
1. "Cities" - with the link to entity "Countries"
2. "Laureate Types" 
3. "Categories"
4. "Prize Types"
5. "Genders"
6. "Laureate Persons" - with links to the entities "Genders" and "Sities". For Laureates with the "Individual" type
7. "Organizations" - with link to the "Cities" entity. For the list of organizations where the laureates worked
8. "Societies" - for Laureates with the "Organization" type
9. "Persons in Organizations" - for relationship "Laureate Persons" - "Societies" as "M-to-N", with links to these entities
10. "Nobel Prizes" - entity with a fact, contains links to the entities "Laureate Types", "Categories", "Prize Types", "Laureate Persons"

# Creating tables

## Laureate Types table
Contains a list of types of Nobel laureates

In [31]:
df_laureate_types = df_nobel_orig.select("Laureate Type") \
                                 .distinct() \
                                 .withColumn("laureate_type_id", f.monotonically_increasing_id()) \
                                 .select("laureate_type_id", "Laureate Type") \
                                 .withColumnRenamed("Laureate Type", "laureate_type")

Check dataframe

In [32]:
df_laureate_types.show()

+----------------+-------------+
|laureate_type_id|laureate_type|
+----------------+-------------+
|               0|   Individual|
|               1| Organization|
+----------------+-------------+



In [None]:
# Save to hive - hadoop
# df_laureate_types.write.format("parquet").mode("overwrite").saveAsTable("whdb.laureate_types")

## Categories table
Contains a list of Nobel Prize categories

In [33]:
df_categories = df_nobel_orig.select("Category") \
                             .distinct() \
                             .withColumn("category_id", f.monotonically_increasing_id()) \
                             .select("category_id", "Category") \
                             .withColumnRenamed("Category", "category")

Check dataframe

In [34]:
df_categories.show()

+-----------+----------+
|category_id|  category|
+-----------+----------+
|          0| Chemistry|
|          1|  Medicine|
|          2|   Physics|
|          3|Literature|
|          4| Economics|
|          5|     Peace|
+-----------+----------+



In [None]:
# Save to hive - hadoop
# df_categories.write.format("parquet").mode("overwrite").saveAsTable("whdb.categories")

## Prize Types table
Contains the list of Prize Types of Nobel Prizes

In [35]:
df_prize_types = df_nobel_orig.select("Prize") \
                              .distinct() \
                              .withColumn("prize_type_id", f.monotonically_increasing_id()) \
                              .select("prize_type_id", "Prize") \
                              .withColumnRenamed("Prize", "prize_type")

Check the dataframe

In [36]:
df_prize_types.show(5,truncate=False)

+-------------+-----------------------------------------------------+
|prize_type_id|prize_type                                           |
+-------------+-----------------------------------------------------+
|0            |The Nobel Prize in Physics 1908                      |
|1            |The Nobel Peace Prize 1938                           |
|2            |The Nobel Prize in Physics 1950                      |
|3            |The Nobel Prize in Chemistry 1982                    |
|4            |The Sveriges Riksbank Prize in Economic Sciences 1999|
+-------------+-----------------------------------------------------+
only showing top 5 rows



In [None]:
# Save to hive - hadoop
# df_prize_types.write.format("parquet").mode("overwrite").saveAsTable("whdb.prize_types")

## Genders table
Contains a list of the genders of individual Nobel Prize Laureates

In [37]:
df_genders = df_nobel_orig.select("Sex") \
                .where("Sex is not null") \
                .distinct() \
                .withColumn("gender_id", f.monotonically_increasing_id()) \
                .select("gender_id", "Sex") \
                .withColumnRenamed("Sex", "gender")

Check dataframe

In [38]:
df_genders.show()

+---------+------+
|gender_id|gender|
+---------+------+
|        0|Female|
|        1|  Male|
+---------+------+



In [None]:
# Save to hive - hadoop
# df_genders.write.format("parquet").mode("overwrite").saveAsTable("whdb.genders")

## Regions table
Contains a list of country regions

In [39]:
df_regions = df_countries_orig.select("Region") \
                .distinct() \
                .withColumn("region_id", f.monotonically_increasing_id()) \
                .select("region_id", "Region") \
                .withColumnRenamed("Region", "region_name")

Check dataframe

In [40]:
df_regions.show(truncate=False)

+---------+-----------------------------------+
|region_id|region_name                        |
+---------+-----------------------------------+
|0        |BALTICS                            |
|1        |C.W. OF IND. STATES                |
|2        |ASIA (EX. NEAR EAST)               |
|3        |WESTERN EUROPE                     |
|4        |NORTHERN AMERICA                   |
|5        |NEAR EAST                          |
|6        |EASTERN EUROPE                     |
|7        |OCEANIA                            |
|8        |SUB-SAHARAN AFRICA                 |
|9        |NORTHERN AFRICA                    |
|10       |LATIN AMER. & CARIB                |
+---------+-----------------------------------+



In [None]:
# Save to hive - hadoop
# df_regions.write.format("parquet").mode("overwrite").saveAsTable("whdb.regions")

## Continents table
Contains a list of countries continents

In [41]:
df_continents = df_continents_orig.select("continent_code") \
                .distinct() \
                .withColumn("continent_id", f.monotonically_increasing_id()) \
                .withColumn("continent_name",  f.when(f.col("continent_code") == 'NA', 'NORTH AMERICA') \
                                                .when(f.col("continent_code") == 'SA', 'SOUTH AMERICA') \
                                                .when(f.col("continent_code") == 'AS', 'ASIA') \
                                                .when(f.col("continent_code") == 'AN', 'ANTARCTICA') \
                                                .when(f.col("continent_code") == 'OC', 'AUSTRALIA/OCEANIA') \
                                                .when(f.col("continent_code") == 'EU', 'EUROPE') \
                                                .when(f.col("continent_code") == 'AF', 'AFRICA') \
                                                .otherwise('')) \
                .select("continent_id", "continent_name", "continent_code")

Check dataframe

In [42]:
df_continents.show()

+------------+-----------------+--------------+
|continent_id|   continent_name|continent_code|
+------------+-----------------+--------------+
|           0|    NORTH AMERICA|            NA|
|           1|    SOUTH AMERICA|            SA|
|           2|             ASIA|            AS|
|           3|       ANTARCTICA|            AN|
|           4|AUSTRALIA/OCEANIA|            OC|
|           5|           EUROPE|            EU|
|           6|           AFRICA|            AF|
+------------+-----------------+--------------+



In [None]:
# Save to hive - hadoop
# df_continents.write.format("parquet").mode("overwrite").saveAsTable("whdb.continents")

## Currency Codes table
Contains a list of currency codes

In [43]:
df_currencies = df_currencies_orig.select("currency_code") \
                .where("currency_code != ''") \
                .distinct() \
                .withColumn("currency_code_id", f.monotonically_increasing_id()) \
                .select("currency_code_id", "currency_code")

Check dataframe

In [44]:
df_currencies.show(5)

+----------------+-------------+
|currency_code_id|currency_code|
+----------------+-------------+
|               0|          DKK|
|               1|          XPF|
|               2|          NZD|
|               3|          HUF|
|               4|          BDT|
+----------------+-------------+
only showing top 5 rows



In [None]:
# Save to hive - hadoop
# df_currencies.write.format("parquet").mode("overwrite").saveAsTable("whdb.currencies")

## Countries table
Contains a list of countries. Information will be loaded from 3 dataframes: 
1. df_countries_orig - the original Countries dataset
2. df_tmp_mis_cntr - data that does not exist in the Countries dataset and exists in the ISO2 codes names.json dataset
3. df_tmp_mis_hist_cntr - data with historical names of countries that do not exist today (with added parent_country_id)

### Union data from 3 dataframes

In [45]:
# Countries dataframe structure
df_countries_orig.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Area (sq. mi.): integer (nullable = true)
 |-- Pop. Density (per sq. mi.): string (nullable = true)
 |-- Coastline (coast/area ratio): string (nullable = true)
 |-- Net migration: string (nullable = true)
 |-- Infant mortality (per 1000 births): string (nullable = true)
 |-- GDP ($ per capita): integer (nullable = true)
 |-- Literacy (%): string (nullable = true)
 |-- Phones (per 1000): string (nullable = true)
 |-- Arable (%): string (nullable = true)
 |-- Crops (%): string (nullable = true)
 |-- Other (%): string (nullable = true)
 |-- Climate: string (nullable = true)
 |-- Birthrate: string (nullable = true)
 |-- Deathrate: string (nullable = true)
 |-- Agriculture: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Service: string (nullable = true)



Union dataframes 1 and 2 (original Countries + missing countries from ISO2)

In [46]:
# Prepare dataframe df_tmp_mis_cntr with missing country names
# Create the same structure
df_tmp_mis_cntr = df_tmp_mis_cntr.select("country_name") \
    .withColumn("1", f.lit(None)).withColumn("2", f.lit(None)).withColumn("3", f.lit(None)).withColumn("4", f.lit(None)) \
    .withColumn("5", f.lit(None)).withColumn("6", f.lit(None)).withColumn("7", f.lit(None)).withColumn("8", f.lit(None)) \
    .withColumn("9", f.lit(None)).withColumn("10", f.lit(None)).withColumn("11", f.lit(None)).withColumn("12", f.lit(None)) \
    .withColumn("13", f.lit(None)).withColumn("14", f.lit(None)).withColumn("15", f.lit(None)).withColumn("16", f.lit(None)) \
    .withColumn("17", f.lit(None)).withColumn("18", f.lit(None)).withColumn("19", f.lit(None))

# Union dataframes
df_countries = df_countries_orig.union(df_tmp_mis_cntr)

# Add an ID column to the union dataframes 1 and 2, add null column Parent Country ID
w = Window.orderBy("Country")
df_countries = df_countries.withColumn("country_id", f.row_number().over(w) - 1) \
                            .withColumn("parent_country_id", f.lit(None).cast('integer'))

# Check the max value of country_id
max_count_id = df_countries.select(f.max("country_id")).collect()[0][0]
print("Max country_id after union dataframes 1 and 2 = ", max_count_id)

# Rename columns, update column order
df_countries = df_countries.withColumnRenamed("Country", "country_name") \
                           .withColumnRenamed("Population", "population") \
                           .withColumnRenamed("Region", "region") \
                           .withColumnRenamed("Area (sq. mi.)", "area_sq_miles") \
                           .withColumnRenamed("Pop. Density (per sq. mi.)", "pop_dencity_per_sq_mile") \
                           .withColumnRenamed("Coastline (coast/area ratio)", "coastline") \
                           .withColumnRenamed("Net migration", "net_migration") \
                           .withColumnRenamed("Infant mortality (per 1000 births)", "infant_mortality_per_1000") \
                           .withColumnRenamed("GDP ($ per capita)", "gdb_dollar_per_capita") \
                           .withColumnRenamed("Literacy (%)", "percent_literacy") \
                           .withColumnRenamed("Phones (per 1000)", "phones_per_1000") \
                           .withColumnRenamed("Arable (%)", "percent_arable") \
                           .withColumnRenamed("Crops (%)", "percent_crops") \
                           .withColumnRenamed("Other (%)", "percent_other") \
                           .withColumnRenamed("Climate", "climate") \
                           .withColumnRenamed("Birthrate", "birthrate") \
                           .withColumnRenamed("Deathrate", "deathrate") \
                           .withColumnRenamed("Agriculture", "agriculture") \
                           .withColumnRenamed("Industry", "industry") \
                           .withColumnRenamed("Service", "service") \
                           .select("country_id", "parent_country_id", "country_name", "population", "region", "area_sq_miles", \
                                   "pop_dencity_per_sq_mile", "coastline", "net_migration", "infant_mortality_per_1000", \
                                   "gdb_dollar_per_capita", "percent_literacy", "phones_per_1000", "percent_arable", \
                                   "percent_crops", "percent_other", "climate", "birthrate", "deathrate", "agriculture", \
                                   "industry", "service")

Max country_id after union dataframes 1 and 2 =  248


Add Country ID and Parent Country ID to dataframe 3 (historical records)

In [47]:
# Add Country ID - starts from the next country_id = 249
w = Window.orderBy("country_name")
df_tmp_mis_hist_cntr = df_tmp_mis_hist_cntr.withColumn("country_id", f.row_number().over(w) + max_count_id)

# Add Parent Country ID = Country ID from the dataframe df_countries created below
df_countries.createOrReplaceTempView("ORIG_CNT")
df_tmp_mis_hist_cntr.createOrReplaceTempView("HIST_CNT")
df_tmp = sp.sql("select country_id, \
       case \
       when country_name like '%(Poland)' then (select country_id from ORIG_CNT where trim(country_name) = 'Poland') \
       when country_name like '%(Austria)' then (select country_id from ORIG_CNT where trim(country_name) = 'Austria') \
       when country_name like '%(Croatia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Croatia') \
       when country_name like '%(Czech Republic)' or country_name like '%Czechoslovakia%' \
            then (select country_id from ORIG_CNT where trim(country_name) = 'Czech Republic') \
       when country_name like '%(Hungary)' then (select country_id from ORIG_CNT where trim(country_name) = 'Hungary') \
       when country_name like '%(Slovenia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Slovenia') \
       when country_name like '%(Ukraine)' then (select country_id from ORIG_CNT where trim(country_name) = 'Ukraine') \
       when country_name like '%(Italy)' then (select country_id from ORIG_CNT where trim(country_name) = 'Italy') \
       when country_name like '%(Germany)' or trim(country_name) in ('East Germany', 'Federal Republic of Germany')\
            then (select country_id from ORIG_CNT where trim(country_name) = 'Germany') \
       when country_name like '%(Israel)' then (select country_id from ORIG_CNT where trim(country_name) = 'Israel') \
       when country_name like '%(Bangladesh)' then (select country_id from ORIG_CNT where trim(country_name) = 'Bangladesh') \
       when country_name like '%(India)' then (select country_id from ORIG_CNT where trim(country_name) = 'India') \
       when country_name like '%(Saint Lucia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Saint Lucia') \
       when country_name like '%(Greece)' then (select country_id from ORIG_CNT where trim(country_name) = 'Greece') \
       when country_name like '%(Denmark)' then (select country_id from ORIG_CNT where trim(country_name) = 'Denmark') \
       when country_name like '%(Algeria)' then (select country_id from ORIG_CNT where trim(country_name) = 'Algeria') \
       when country_name like '%(Ghana)' then (select country_id from ORIG_CNT where trim(country_name) = 'Ghana') \
       when country_name like '%(Slovakia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Slovakia') \
       when country_name like '%(Turkey)' then (select country_id from ORIG_CNT where trim(country_name) = 'Turkey') \
       when country_name like '%(Zambia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Zambia') \
       when country_name like '%(Iran)' then (select country_id from ORIG_CNT where trim(country_name) = 'Iran') \
       when country_name like '%(Belarus)' then (select country_id from ORIG_CNT where trim(country_name) = 'Belarus') \
       when country_name like '%(Russia)' or country_name like '%Union of Soviet Socialist Republics%' \
            then (select country_id from ORIG_CNT where trim(country_name) = 'Russia') \
       when country_name like '%(Lithuania)' then (select country_id from ORIG_CNT where trim(country_name) = 'Lithuania') \
       when country_name like '%(Azerbaijan)' then (select country_id from ORIG_CNT where trim(country_name) = 'Azerbaijan') \
       when country_name like '%(Finland)' then (select country_id from ORIG_CNT where trim(country_name) = 'Finland') \
       when country_name like '%(Latvia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Latvia') \
       when country_name like '%France)' then (select country_id from ORIG_CNT where trim(country_name) = 'France') \
       when country_name like '%(Zimbabwe)' then (select country_id from ORIG_CNT where trim(country_name) = 'Zimbabwe') \
       when country_name like '%(Pakistan)' then (select country_id from ORIG_CNT where trim(country_name) = 'Pakistan') \
       when country_name like '%China)' then (select country_id from ORIG_CNT where trim(country_name) = 'China') \
       when country_name like '%(Indonesia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Indonesia') \
       when country_name like '%(Serbia)' then (select country_id from ORIG_CNT where trim(country_name) = 'Serbia') \
       when country_name like '%(Republic of Macedonia)' \
            then (select country_id from ORIG_CNT where trim(country_name) = 'Macedonia') \
       when country_name like '%(Bosnia%' \
            then (select country_id from ORIG_CNT where trim(country_name) = 'Bosnia and Herzegovina') \
       when trim(country_name) in ('Northern Ireland', 'Scotland') \
            then (select country_id from ORIG_CNT where trim(country_name) = 'United Kingdom') \
       when country_name like '%(South Korea)' then (select country_id from ORIG_CNT where trim(country_name) = 'South Korea') \
       when trim(country_name) = 'Trinidad' \
            then (select country_id from ORIG_CNT where trim(country_name) = 'Trinidad and Tobago') \
       when country_name like '%(Myanmar)' then (select country_id from ORIG_CNT where trim(country_name) = 'Myanmar') \
       else null end as parent_country_id, \
       country_name \
       from HIST_CNT")

Union dataframes

In [48]:
# Create the same structure of dataframe 3 with historical names
df_tmp = df_tmp.withColumn("1", f.lit(None)).withColumn("2", f.lit(None)).withColumn("3", f.lit(None)) \
    .withColumn("4", f.lit(None)).withColumn("5", f.lit(None)).withColumn("6", f.lit(None)).withColumn("7", f.lit(None)) \
    .withColumn("8", f.lit(None)).withColumn("9", f.lit(None)).withColumn("10", f.lit(None)).withColumn("11", f.lit(None)) \
    .withColumn("12", f.lit(None)).withColumn("13", f.lit(None)).withColumn("14", f.lit(None)).withColumn("15", f.lit(None)) \
    .withColumn("16", f.lit(None)).withColumn("17", f.lit(None)).withColumn("18", f.lit(None)).withColumn("19", f.lit(None))

# Union dataframes
df_countries = df_countries.union(df_tmp)

# Check results
df_countries.select("country_id", "parent_country_id", "country_name", "population", "service") \
            .where("country_id in (0,1,2,248,249,250)") \
            .show(truncate=False)

+----------+-----------------+---------------------------------+----------+-------+
|country_id|parent_country_id|country_name                     |population|service|
+----------+-----------------+---------------------------------+----------+-------+
|0         |null             |Afghanistan                      |31056997  |0,38   |
|1         |null             |Aland Islands                    |null      |null   |
|2         |null             |Albania                          |3581655   |0,579  |
|248       |null             |Zimbabwe                         |12236805  |0,579  |
|249       |73               |Alsace (then Germany, now France)|null      |null   |
|250       |14               |Austria-Hungary (Austria)        |null      |null   |
+----------+-----------------+---------------------------------+----------+-------+



### Add region_id - link to the Regions table

In [49]:
join_expr = f.trim(df_countries["region"]) == f.trim(df_regions["region_name"])
df_countries = df_countries.join(df_regions, join_expr, "left_outer").drop("region", "region_name")

In [50]:
# Check null region_id values - records from the ISO2 countries dataframe. Should be 22 records
df_countries.select("country_id", "parent_country_id", "country_name", "region_id") \
            .where("region_id is null and parent_country_id is null") \
            .count()

22

### Add continent_id - link to the Continents table

In [51]:
# Join dataframes Countries, Continents, ISO2 Code
join_expr = f.trim(df_names_orig["country_code"]) == f.trim(df_continents_orig["country_code"])
df_tmp = df_names_orig.join(df_continents_orig, join_expr, "left_outer") \
                        .select("country_name", "continent_code") \
                        .withColumnRenamed("continent_code", "c_code") \
                        .withColumnRenamed("country_name", "c_name")

join_expr = f.trim(df_tmp["c_code"]) == f.trim(df_continents["continent_code"])
df_tmp = df_tmp.join(df_continents, join_expr, "left_outer").select("c_name", "continent_id")

join_expr = f.trim(df_tmp["c_name"]) == f.trim(df_countries["country_name"])
df_countries = df_countries.join(df_tmp, join_expr, "left_outer")

In [52]:
# Check null values for all countries except historical names
df_countries.select("country_id", "country_name", "continent_id") \
            .where("continent_id is null and parent_country_id is null").show(truncate=False)

+----------+---------------------+------------+
|country_id|country_name         |continent_id|
+----------+---------------------+------------+
|79        |Gaza Strip           |null        |
|155       |Netherlands Antilles |null        |
|244       |West Bank            |null        |
+----------+---------------------+------------+



Selected countries with no matches in the ISO2 dataframe. Add values manually:
- Gaza Strip and West Bank have the ISO2 code = 'PS' and continent = 'ASIA'
- Netherlands Antilles have 3 ISO2 codes, but 1 continent = 'NORTH AMERICA'

In [53]:
df_continents.createOrReplaceTempView("CONTINENTS")
asia_id = sp.sql("select continent_id from CONTINENTS where continent_name = 'ASIA'").collect()[0][0]
na_id = sp.sql("select continent_id from CONTINENTS where continent_name = 'NORTH AMERICA'").collect()[0][0]

df_countries = df_countries.withColumn("continent_id", \
                              f.when(f.trim(f.col("country_name")) == 'Gaza Strip', asia_id) \
                               .when(f.trim(f.col("country_name")) == 'Netherlands Antilles', na_id) \
                               .when(f.trim(f.col("country_name")) == 'West Bank', asia_id) \
                               .otherwise(df_tmp["continent_id"]))

In [54]:
# Check results
df_countries.select("country_id", "country_name", "continent_id") \
            .where("trim(country_name) in ('Gaza Strip', 'Netherlands Antilles', 'West Bank', 'United States')") \
            .show(truncate=False)

+----------+---------------------+------------+
|country_id|country_name         |continent_id|
+----------+---------------------+------------+
|79        |Gaza Strip           |2           |
|155       |Netherlands Antilles |0           |
|235       |United States        |0           |
|244       |West Bank            |2           |
+----------+---------------------+------------+



### Add Capitals

In [55]:
# Join dataframes Countries, ISO2 Code, Capitals
join_expr = f.trim(df_names_orig["country_code"]) == f.trim(df_capitals_orig["country_code"])
df_tmp = df_names_orig.join(df_capitals_orig, join_expr, "left_outer") \
                        .select("country_name", "capital_name") \
                        .withColumnRenamed("country_name", "c_name")

# + add missing capitals
join_expr = f.trim(df_tmp["c_name"]) == f.trim(df_countries["country_name"])
df_countries = df_countries.join(df_tmp, join_expr, "left_outer") \
                           .withColumn("capital_name", \
                                f.when(f.col("capital_name") == '', f.lit(None)) \
                                 .when(f.trim(f.col("country_name")) == 'Scotland', 'Edinburgh') \
                                 .when(f.trim(f.col("country_name")) == 'Northern Ireland', 'Belfast') \
                                 .when(f.trim(f.col("country_name")) == 'Netherlands Antilles', 'Willemstad') \
                                 .when(f.trim(f.col("country_name")) == 'West Bank', 'East Jerusalem') \
                                 .when(f.trim(f.col("country_name")) == 'Gaza Strip', 'East Jerusalem') \
                                 .otherwise(f.col("capital_name")))

In [56]:
# Update Countries dataset
df_countries = df_countries.select("country_id", "country_name", "region_id", "continent_id", "parent_country_id", \
                                   "capital_name", "population", "area_sq_miles", "pop_dencity_per_sq_mile", "coastline", \
                                   "net_migration", "infant_mortality_per_1000", "gdb_dollar_per_capita", "percent_literacy", \
                                   "phones_per_1000", "percent_arable", "percent_crops", "percent_other", "climate", \
                                   "birthrate", "deathrate", "agriculture", "industry", "service") \
                           .sort("country_id")

Check dataframe

In [57]:
df_countries.show(1, truncate=False)

+----------+------------+---------+------------+-----------------+------------+----------+-------------+-----------------------+---------+-------------+-------------------------+---------------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------+
|country_id|country_name|region_id|continent_id|parent_country_id|capital_name|population|area_sq_miles|pop_dencity_per_sq_mile|coastline|net_migration|infant_mortality_per_1000|gdb_dollar_per_capita|percent_literacy|phones_per_1000|percent_arable|percent_crops|percent_other|climate|birthrate|deathrate|agriculture|industry|service|
+----------+------------+---------+------------+-----------------+------------+----------+-------------+-----------------------+---------+-------------+-------------------------+---------------------+----------------+---------------+--------------+-------------+-------------+-------+---------+---------+-----------+--------+-------

In [None]:
# Save to hive - hadoop
# df_countries.write.format("parquet").mode("overwrite").saveAsTable("whdb.countries")

## Country Codes table
Contains a list of all possible Country Codes: ISO2, ISO3, country code names, phone codes, link to currency codes<br>
In most cases, country code names are the same as country names in the Countries dataset. 

In [58]:
# Join ISO3 Codes
df_country_codes = df_names_orig.withColumnRenamed("country_code", "country_code1")
join_expr = f.trim(df_country_codes["country_code1"]) == f.trim(df_iso3_orig["country_code"])
df_country_codes = df_country_codes.join(df_iso3_orig, join_expr, "left_outer")

# Join Curency Codes
df_country_codes = df_country_codes.withColumnRenamed("country_code", "country_code2")
join_expr = f.trim(df_country_codes["country_code1"]) == f.trim(df_currencies_orig["country_code"])
df_country_codes = df_country_codes.join(df_currencies_orig, join_expr, "left_outer")

# Join Currency ID
df_country_codes = df_country_codes.withColumnRenamed("currency_code", "currency_code1")
join_expr = f.trim(df_country_codes["currency_code1"]) == f.trim(df_currencies["currency_code"])
df_country_codes = df_country_codes.join(df_currencies, join_expr, "left_outer")

# Join Phone Codes
df_country_codes = df_country_codes.withColumnRenamed("country_code", "country_code3")
join_expr = f.trim(df_country_codes["country_code3"]) == f.trim(df_phones_orig["country_code"])
df_country_codes = df_country_codes.join(df_phones_orig, join_expr, "left_outer")

# Add ID, select and rename columns
df_country_codes = df_country_codes.sort("country_name") \
                                   .withColumn("country_code_id", f.monotonically_increasing_id()) \
                                   .withColumnRenamed("country_name", "code_name") \
                                   .withColumnRenamed("country_code", "iso2_code") \
                                   .select("country_code_id", "code_name", "iso2_code", "iso3_code", \
                                           "currency_code_id", "phone_code")

Check dataframe

In [59]:
df_country_codes.show(3,truncate=False)

+---------------+-------------+---------+---------+----------------+----------+
|country_code_id|code_name    |iso2_code|iso3_code|currency_code_id|phone_code|
+---------------+-------------+---------+---------+----------------+----------+
|0              |Afghanistan  |AF       |AFG      |147             |93        |
|1              |Aland Islands|AX       |ALA      |17              |+358-18   |
|2              |Albania      |AL       |ALB      |121             |355       |
+---------------+-------------+---------+---------+----------------+----------+
only showing top 3 rows



In [None]:
# Save to hive - hadoop
# df_country_codes.write.format("parquet").mode("overwrite").saveAsTable("whdb.country_codes")

## Codes in Countries table
Contains matches between countries and country codes

Join dataframes Country Codes with Countries (not historical records), add matches

In [60]:
join_expr = f.trim(df_country_codes["code_name"]) == f.trim(df_countries["country_name"])
df_codes_in_countries = df_country_codes.join(df_countries.where("parent_country_id is null"), join_expr, "outer") \
                                        .withColumn("iso2_code_tmp", \
                                           f.when(f.trim(f.col("country_name")) == 'Gaza Strip', 'PS') \
                                            .when(f.trim(f.col("country_name")) == 'West Bank', 'PS') \
                                            .otherwise(f.col("iso2_code"))) \
                                        .withColumn("country_name_tmp", \
                                           f.when(f.trim(f.col("iso2_code")) == 'BQ', 'Netherlands Antilles') \
                                            .when(f.trim(f.col("iso2_code")) == 'CW', 'Netherlands Antilles') \
                                            .when(f.trim(f.col("iso2_code")) == 'SX', 'Netherlands Antilles') \
                                            .otherwise(f.col("country_name"))) \
                                        .select("iso2_code_tmp", "country_name_tmp") \
                                        .where("country_name_tmp is not null") \
                                        .where("iso2_code_tmp is not null")

# Check results
df_codes_in_countries.where("iso2_code_tmp in ('BQ', 'CW', 'SX', 'PS')").show()

+-------------+--------------------+
|iso2_code_tmp|    country_name_tmp|
+-------------+--------------------+
|           BQ|Netherlands Antilles|
|           CW|Netherlands Antilles|
|           PS|         Gaza Strip |
|           SX|Netherlands Antilles|
|           PS|          West Bank |
+-------------+--------------------+



Add ID, Country ID, Country Code ID

In [61]:
join_expr = f.trim(df_codes_in_countries["iso2_code_tmp"]) == f.trim(df_country_codes["iso2_code"])
join_expr1 = f.trim(df_codes_in_countries["country_name_tmp"]) == f.trim(df_countries["country_name"])

# df_cc_tmp will be used in the section Cities
df_cc_tmp = df_codes_in_countries.join(df_country_codes, join_expr, "inner") \
                                 .join(df_countries, join_expr1, "inner")

df_codes_in_countries = df_cc_tmp.sort("country_code_id") \
                                 .withColumn("code_in_country_id", f.monotonically_increasing_id()) \
                                 .select("code_in_country_id", "country_code_id", "country_id")

Check dataframe

In [62]:
df_codes_in_countries.show(3,False)

+------------------+---------------+----------+
|code_in_country_id|country_code_id|country_id|
+------------------+---------------+----------+
|0                 |0              |0         |
|1                 |1              |1         |
|2                 |2              |2         |
+------------------+---------------+----------+
only showing top 3 rows



In [None]:
# Save to hive - hadoop
# df_codes_in_countries.write.format("parquet").mode("overwrite").saveAsTable("whdb.codes_in_countries")

## Cities table
Contains a list of all cities. Created from the original Cities dataset.
All cities in the Nobel Laureate dataset exist in the Cities dataset. No need to load additional cities.

### Create Cities dataset, add Country ID
Use the temp dataset Code in Countries because the list of countries in the Cities dataset is very similar to ISO2 codes

Prepare a temp dataframe with codes in countries to join.

In [63]:
df_cc_tmp = df_cc_tmp.withColumn("iso2_code", f.lower(f.col("iso2_code"))) \
                     .select("iso2_code", "country_name", "country_id")

Check the list of countries in the original Cities dataset - look for values that are not in the Country Code dataset.

In [64]:
join_expr = f.trim(df_cities_orig["Country"]) == f.trim(df_cc_tmp["iso2_code"])
df_tmp = df_cities_orig.join(df_cc_tmp, join_expr, "left_anti").select("Country").distinct()
df_tmp.show()

+-------+
|Country|
+-------+
|     zr|
|     an|
+-------+



Country 'zr' is 'Democratic Republic of the Congo', country code = 'CD'<br>
Country 'an' is 'Netherlands Antilles' with country codes = 'BQ', 'CW', 'SX'<br>

Check the cities with country codes 'zr' and 'cd' in the original dataset.

In [65]:
df_tmp1 = df_cities_orig.where("Country = 'zr'") \
                        .withColumn("City_zr", \
                           (f.concat(f.col("City"),f.lit('_'),f.col("AccentCity"),f.lit('_'),f.col("Region")))) \
                        .select("City_zr")
df_tmp2 = df_cities_orig.where("Country = 'cd'") \
                        .withColumn("City_cd", \
                           (f.concat(f.col("City"),f.lit('_'),f.col("AccentCity"),f.lit('_'),f.col("Region")))) \
                        .select("City_cd")

join_expr = f.trim(df_tmp1["City_zr"]) == f.trim(df_tmp2["City_cd"])
df_tmp = df_tmp1.join(df_tmp2, join_expr, "left_anti")

df_tmp.count()

0

Lists of cities in countries with codes 'zr' and 'cd' are identical.
Delete records with country = 'zr'

In [66]:
df_cities = df_cities_orig.where("Country != 'zr'")

Check if there are cities in the Netherlands Antilles with country codes "bq", "cw", "sx"

In [67]:
df_cities.where("Country in ('bq', 'cw', 'sx')").show()

+-------+----+----------+------+----------+--------+---------+-------+
|Country|City|AccentCity|Region|Population|Latitude|Longitude|CityUpd|
+-------+----+----------+------+----------+--------+---------+-------+
+-------+----+----------+------+----------+--------+---------+-------+



There are no Cities.<br>
Update the temp Code in the Country dataset - change the codes 'bq', 'cw', 'sx' to 'an', group it into one record

In [68]:
df_cc_tmp = df_cc_tmp.withColumn("iso2_code", f.when(f.col("iso2_code") == 'bq', 'an') \
                                               .when(f.col("iso2_code") == 'cw', 'an') \
                                               .when(f.col("iso2_code") == 'sx', 'an') \
                                               .otherwise(f.col("iso2_code"))) \
                     .distinct()

# Check results
df_cc_tmp.where("iso2_code = 'an'").show(5,False)

+---------+---------------------+----------+
|iso2_code|country_name         |country_id|
+---------+---------------------+----------+
|an       |Netherlands Antilles |155       |
+---------+---------------------+----------+



Check for duplicate country codes in the temp Code in Country dataframe - code 'ps'

In [69]:
df_cc_tmp.groupBy("iso2_code").count().where("count > 1").show()
df_cc_tmp.where("iso2_code = 'ps'").show()

+---------+-----+
|iso2_code|count|
+---------+-----+
|       ps|    2|
+---------+-----+

+---------+------------+----------+
|iso2_code|country_name|country_id|
+---------+------------+----------+
|       ps|  West Bank |       244|
|       ps| Gaza Strip |        79|
+---------+------------+----------+



Country 'ps' has 2 records in the Countries dataframe: 'West Bank' and 'Gaza Strip'<br>
Separate the codes in the temp Codes in Countries dataframe

In [70]:
df_cc_tmp = df_cc_tmp.withColumn("iso2_code", f.when(f.trim(f.col("country_name")) == 'West Bank', 'ps_wb') \
                                               .when(f.trim(f.col("country_name")) == 'Gaza Strip', 'ps_gs') \
                                               .otherwise(f.col("iso2_code")))

# Check results
df_cc_tmp.where("iso2_code like 'ps%'").show()

+---------+------------+----------+
|iso2_code|country_name|country_id|
+---------+------------+----------+
|    ps_wb|  West Bank |       244|
|    ps_gs| Gaza Strip |        79|
+---------+------------+----------+



Divide the codes in the original Cities dataframe by longitude.

In [71]:
df_cities = df_cities.withColumn("CountryUpd", 
                        f.when((f.col("Country") == 'ps') & (f.col("Longitude") <= 34.8), 'ps_gs') \
                         .when((f.col("Country") == 'ps') & (f.col("Longitude") > 34.8), 'ps_wb') \
                         .otherwise(f.col("Country")))

# Check results
df_cities.where("Country = 'ps'").select("AccentCity", "CountryUpd", "Longitude").show(5,False)

+---------------+----------+------------------+
|AccentCity     |CountryUpd|Longitude         |
+---------------+----------+------------------+
|Abasan         |ps_gs     |34.346111         |
|Abasan al Kabir|ps_gs     |34.346111         |
|Abasân el-Kabîr|ps_gs     |34.346111         |
|Abu al `Ajaj   |ps_wb     |35.488333000000004|
|`Abud          |ps_wb     |35.066944         |
+---------------+----------+------------------+
only showing top 5 rows



Join dataframes - add a Country ID to the Cities dataframe

In [72]:
# Check count of records in the Cities dataset before join
print("Count or City records before join = ", df_cities.count())

# Join dataframes
join_expr = f.trim(df_cities["CountryUpd"]) == f.trim(df_cc_tmp["iso2_code"])
df_cities = df_cities.join(df_cc_tmp, join_expr, "left_outer")

# Check count of records in the Cities dataset after join
print("Count or City records after join = ", df_cities.count())

Count or City records before join =  3150946
Count or City records after join =  3150946


### Add ID, rename or delete columns

In [73]:
df_cities = df_cities.sort("country_id", "AccentCity") \
                     .withColumn("city_id", f.monotonically_increasing_id()) \
                     .withColumnRenamed("City", "city_name") \
                     .withColumnRenamed("AccentCity", "accent_city_name") \
                     .withColumnRenamed("Region", "city_region") \
                     .withColumnRenamed("Population", "population") \
                     .withColumnRenamed("Latitude", "latitude") \
                     .withColumnRenamed("Longitude", "longitude") \
                     .select("city_id", "city_name", "accent_city_name", "country_id", "city_region", "population", \
                             "latitude", "longitude", "CityUpd", "Country")

Dataframe Cities completed.<br>
The CityUpd and Country columns are required for future joining the Nobel Laureates data frame. It will be removed later.<br>
Check dataframe:

In [74]:
df_cities.show(3,False)

+-------+---------------+----------------+----------+-----------+----------+---------+---------+---------------+-------+
|city_id|city_name      |accent_city_name|country_id|city_region|population|latitude |longitude|CityUpd        |Country|
+-------+---------------+----------------+----------+-----------+----------+---------+---------+---------------+-------+
|0      |"dekh""iykh'ya"|"Dekh""iykh'ya" |0         |13         |null      |34.60345 |69.2405  |"Dekh""iykh'ya"|af     |
|1      |"dekh""yak"    |"Dekh""yak"     |0         |10         |null      |32.739348|64.994051|"Dekh""yak"    |af     |
|2      |"dekh""yak"    |"Dekh""yak"     |0         |39         |null      |32.533033|65.892138|"Dekh""yak"    |af     |
+-------+---------------+----------------+----------+-----------+----------+---------+---------+---------------+-------+
only showing top 3 rows



In [None]:
# Save to hive - hadoop
# df_cities.write.format("parquet").mode("overwrite").saveAsTable("whdb.cities")

### Create a temp dataframe with a list of missing city names
Creating a dataframe with mismatches in city names to join the dataframes of laureates persons and organizations

In [75]:
data = [{"mis_city_name": "Casteldàwson", "orig_city_name": "Castledawson", "orig_region": "S7", "orig_country": "gb"},
{"mis_city_name": "Grand Valley, CO", "orig_city_name": "Green Valley Acres", "orig_region": "CO", "orig_country": "us"},
{"mis_city_name": "Yamanashi Prefecture", "orig_city_name": "Yamanashi", "orig_region": "37", "orig_country": "jp"},
{"mis_city_name": "Amherst, NS", "orig_city_name": "Amherst", "orig_region": "07", "orig_country": "ca"},
{"mis_city_name": "Champaign-Urbana, IL", "orig_city_name": "Champaign", "orig_region": "IL", "orig_country": "us"},
{"mis_city_name": "Danzig (Gdansk)", "orig_city_name": "Danzig", "orig_region": "82", "orig_country": "pl"},
{"mis_city_name": "Fleräng", "orig_city_name": "Uppsala", "orig_region": "21", "orig_country": "se"},
{"mis_city_name": "Kattowitz (Katowice)", "orig_city_name": "Kattowitz", "orig_region": "83", "orig_country": "pl"},
{"mis_city_name": "Hobart, Tasmania", "orig_city_name": "Hobart", "orig_region": "06", "orig_country": "au"},
{"mis_city_name": "Leningrad (Saint Petersburg)", "orig_city_name": "Leningrad", "orig_region": "66", "orig_country": "ru"},
{"mis_city_name": "Gaffken (Parusnoye)", "orig_city_name": "Gaffken", "orig_region": "23", "orig_country": "ru"},
{"mis_city_name": "Rufford, near Chesterfield", "orig_city_name": "Rufford", "orig_region": "H2", "orig_country": "gb"},
{"mis_city_name": "Goldschmieden, near Breslau", "orig_city_name": "Breslau", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "Kibbutz Sde-Nahum", "orig_city_name": "Sede Nahum", "orig_region": "03", "orig_country": "il"},
{"mis_city_name": "Hansdorf (Lawice)", "orig_city_name": "Hansdorf", "orig_region": "85", "orig_country": "pl"},
{"mis_city_name": "Toyama City", "orig_city_name": "Toyama", "orig_region": "08", "orig_country": "jp"},
{"mis_city_name": "St. Petersburg", "orig_city_name": "Sankt-Peterburg", "orig_region": "66", "orig_country": "ru"},
{"mis_city_name": "Aldea Chimel", "orig_city_name": "Chimel", "orig_region": "01", "orig_country": "gt"},
{"mis_city_name": "Vicuña", "orig_city_name": "Vicuna", "orig_region": "07", "orig_country": "cl"},
{"mis_city_name": "Dippenhall", "orig_city_name": "Farnham", "orig_region": "E4", "orig_country": "gb"},
{"mis_city_name": "Königshütte (Chorzów)", "orig_city_name": "Königshütte", "orig_region": "14", "orig_country": "de"},
{"mis_city_name": "Skedsmo", "orig_city_name": "Skedsmokorset", "orig_region": "01", "orig_country": "no"},
{"mis_city_name": "Kingston, ON", "orig_city_name": "Kingston", "orig_region": "08", "orig_country": "ca"},
{"mis_city_name": "Leggiuno-Sangiano", "orig_city_name": "Varese", "orig_region": "09", "orig_country": "it"},
{"mis_city_name": "Taktser", "orig_city_name": "Qinghaihu", "orig_region": "06", "orig_country": "cn"},
{"mis_city_name": "Gränichen", "orig_city_name": "Granichen", "orig_region": "01", "orig_country": "ch"},
{"mis_city_name": "Neisse (Nysa)", "orig_city_name": "Neisse", "orig_region": "79", "orig_country": "pl"},
{"mis_city_name": "St. Columb Minor", "orig_city_name": "Saint Columb Minor", "orig_region": "C6", "orig_country": "gb"},
{"mis_city_name": "Zelvas", "orig_city_name": "Zelva", "orig_region": "60", "orig_country": "lt"},
{"mis_city_name": "Olympus, TN", "orig_city_name": "Byrdstown", "orig_region": "TN", "orig_country": "us"},
{"mis_city_name": "Sorau (Zory)", "orig_city_name": "Sorau in Nieder Lausitz", "orig_region": "76", "orig_country": "pl"},
{"mis_city_name": "Gjesdal", "orig_city_name": "Ålgård", "orig_region": "14", "orig_country": "no"},
{"mis_city_name": "Viipuri (Vyborg)", "orig_city_name": "Viipuri", "orig_region": "42", "orig_country": "ru"},
{"mis_city_name": "Kvikne", "orig_city_name": "Kviknepladsen", "orig_region": "06", "orig_country": "no"},
{"mis_city_name": "Nuoro, Sardinia", "orig_city_name": "Nuoro", "orig_region": "14", "orig_country": "it"},
{"mis_city_name": "Priluka (Nova Pryluka)", "orig_city_name": "Priluka Novaya", "orig_region": "23", "orig_country": "ua"},
{"mis_city_name": "Laibach (Ljubljana)", "orig_city_name": "Laibach", "orig_region": "04", "orig_country": "si"},
{"mis_city_name": "Smyrna (Izmir)", "orig_city_name": "Smyrna", "orig_region": "35", "orig_country": "tr"},
{"mis_city_name": "Mexico City", "orig_city_name": "Mexico", "orig_region": "09", "orig_country": "mx"},
{"mis_city_name": "Timmins, ON", "orig_city_name": "Timmins", "orig_region": "08", "orig_country": "ca"},
{"mis_city_name": "San José, CA", "orig_city_name": "San Jose", "orig_region": "CA", "orig_country": "us"},
{"mis_city_name": "Jamaica Plain, MA (Boston)", "orig_city_name": "Jamaica Plain", "orig_region": "MA", "orig_country": "us"},
{"mis_city_name": "Nitzkydorf, Banat", "orig_city_name": "Nitzkydorf", "orig_region": "36", "orig_country": "ro"},
{"mis_city_name": "Waltersdorf (Niegoslawice)", "orig_city_name": "Waltersdorf", "orig_region": "76", "orig_country": "pl"},
{"mis_city_name": "Agrigento, Sicily", "orig_city_name": "Agrigento", "orig_region": "15", "orig_country": "it"},
{"mis_city_name": "Medicine Hat, Alberta", "orig_city_name": "Medicine Hat", "orig_region": "01", "orig_country": "ca"},
{"mis_city_name": "&#346;eteniai", "orig_city_name": "Kedainiai", "orig_region": "60", "orig_country": "lt"},
{"mis_city_name": "Kharkov (Kharkiv)", "orig_city_name": "Kharkov", "orig_region": "07", "orig_country": "ua"},
{"mis_city_name": "Wailacama", "orig_city_name": "Dukuh Kupang Timur", "orig_region": "08", "orig_country": "id"},
{"mis_city_name": "Pressburg (Bratislava)", "orig_city_name": "Pressburg", "orig_region": "02", "orig_country": "sk"},
{"mis_city_name": "Lennep (Remscheid)", "orig_city_name": "Lennep", "orig_region": "07", "orig_country": "de"},
{"mis_city_name": "Higashimatsuyama", "orig_city_name": "Higashi-Matsuyama", "orig_region": "34", "orig_country": "jp"},
{"mis_city_name": "Nam Ha province", "orig_city_name": "Phu Ly", "orig_region": "80", "orig_country": "vn"},
{"mis_city_name": "Cheetham Hill, near Manchester  ", "orig_city_name": "Manchester", "orig_region": "I2", "orig_country": "gb"},
{"mis_city_name": "Kingston Hill", "orig_city_name": "London", "orig_region": "H9", "orig_country": "gb"},
{"mis_city_name": "Hofei, Anhwei", "orig_city_name": "Hefei", "orig_region": "01", "orig_country": "cn"},
{"mis_city_name": "St. Louis, MO", "orig_city_name": "Saint Louis", "orig_region": "MO", "orig_country": "us"},
{"mis_city_name": "Dabrovica", "orig_city_name": "Dabrowica", "orig_region": "77", "orig_country": "pl"},
{"mis_city_name": "Mount Verno, NY", "orig_city_name": "Mount Vernon", "orig_region": "NY", "orig_country": "us"},
{"mis_city_name": "Ta'izz", "orig_city_name": "Taiz", "orig_region": "11", "orig_country": "ye"},
{"mis_city_name": "Mit Abu al-Kawm", "orig_city_name": "Mit Abu al Kawm", "orig_region": "09", "orig_country": "eg"},
{"mis_city_name": "Chidambaram, Tamil Nadu", "orig_city_name": "Chidambaram", "orig_region": "25", "orig_country": "in"},
{"mis_city_name": "Tananarive (Antananarivo)", "orig_city_name": "Tananarive", "orig_region": "05", "orig_country": "mg"},
{"mis_city_name": "Frankfurt-on-the-Main", "orig_city_name": "Frankfurt am Main", "orig_region": "05", "orig_country": "de"},
{"mis_city_name": "Hoechst", "orig_city_name": "Frankfurt am Main", "orig_region": "05", "orig_country": "de"},
{"mis_city_name": "Jhang Maghi&#257;na", "orig_city_name": "Jhang", "orig_region": "12", "orig_country": "in"},
{"mis_city_name": "Zhejiang Ningbo", "orig_city_name": "Ningbo", "orig_region": "14", "orig_country": "cn"},
{"mis_city_name": "Wilno (Vilnius)", "orig_city_name": "Wilno", "orig_region": "65", "orig_country": "lt"},
{"mis_city_name": "Langford Grove, Maldon, Essex", "orig_city_name": "Langford", "orig_region": "E4  ", "orig_country": "gb"},
{"mis_city_name": "Clausthal (Clausthal-Zellerfeld)", "orig_city_name": "Clausthal", "orig_region": "06", "orig_country": "de"},
{"mis_city_name": "Strassburg (Strasbourg)", "orig_city_name": "Strassburg", "orig_region": "C1  ", "orig_country": "fr"},
{"mis_city_name": "Rangoon (Yangon)", "orig_city_name": "Rangoon", "orig_region": "17", "orig_country": "mm"},
{"mis_city_name": "Strehlen (Strzelin)", "orig_city_name": "Strehlen", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "Buczacz (Buchach)", "orig_city_name": "Buczacz", "orig_region": "22", "orig_country": "ua"},
{"mis_city_name": "Kalgoorlie", "orig_city_name": "Boulder", "orig_region": "08", "orig_country": "au"},
{"mis_city_name": "Nizhny Tagil", "orig_city_name": "Nizhnii Tagil", "orig_region": "71", "orig_country": "ru"},
{"mis_city_name": "Bremerhaven-Lehe", "orig_city_name": "Bremerhaven", "orig_region": "03", "orig_country": "de"},
{"mis_city_name": "Corteno", "orig_city_name": "Brescia", "orig_region": "09", "orig_country": "it"},
{"mis_city_name": "Windsor, ON", "orig_city_name": "Windsor", "orig_region": "08", "orig_country": "ca"},
{"mis_city_name": "Bnin (Kórnik)", "orig_city_name": "Bnin", "orig_region": "86", "orig_country": "pl"},
{"mis_city_name": "Iria Flavia", "orig_city_name": "Iria", "orig_region": "58", "orig_country": "es"},
{"mis_city_name": "Breslau (Wroclaw)", "orig_city_name": "Breslau", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "Uskup (Skopje)", "orig_city_name": "Üsküp", "orig_region": "39", "orig_country": "tr"},
{"mis_city_name": "Koenigsberg (Kaliningrad)", "orig_city_name": "Königsberg", "orig_region": "23", "orig_country": "ru"},
{"mis_city_name": "Reykjavik", "orig_city_name": "Reykjavík", "orig_region": "10", "orig_country": "is"},
{"mis_city_name": "Petilla de Aragó", "orig_city_name": "Petilla de Aragón", "orig_region": "32", "orig_country": "es"},
{"mis_city_name": "'s Graveland", "orig_city_name": "'s-Graveland", "orig_region": "07", "orig_country": "nl"},
{"mis_city_name": "Zloczov", "orig_city_name": "Zloczow", "orig_region": "15", "orig_country": "ua"},
{"mis_city_name": "Vishneva", "orig_city_name": "Wisniowa", "orig_region": "77", "orig_country": "pl"},
{"mis_city_name": "St. Paul, MN", "orig_city_name": "Saint Paul", "orig_region": "MN", "orig_country": "us"},
{"mis_city_name": "Kristiania (Oslo)", "orig_city_name": "Kristiania", "orig_region": "12", "orig_country": "no"},
{"mis_city_name": "Rendcombe", "orig_city_name": "Cirencester", "orig_region": "E6", "orig_country": "gb"},
{"mis_city_name": "Vitebsk, Belorussia", "orig_city_name": "Vitebsk", "orig_region": "07", "orig_country": "by"},
{"mis_city_name": "Mürzzuschlag", "orig_city_name": "Murzzuschlag", "orig_region": "06", "orig_country": "at"},
{"mis_city_name": "Val di Castello", "orig_city_name": "Pietrasanta", "orig_region": "16", "orig_country": "it"},
{"mis_city_name": "Glencorse", "orig_city_name": "Midlothian", "orig_region": "V5", "orig_country": "gb"},
{"mis_city_name": "Strelno (Strzelno)", "orig_city_name": "Strelno", "orig_region": "73", "orig_country": "pl"},
{"mis_city_name": "Lochfield", "orig_city_name": "Lichfield", "orig_region": "N1", "orig_country": "gb"},
{"mis_city_name": "the Hague", "orig_city_name": "The Hague", "orig_region": "11", "orig_country": "nl"},
{"mis_city_name": "Ivano-Frankivsk", "orig_city_name": "IvanoFrankivsk", "orig_region": "06", "orig_country": "ua"},
{"mis_city_name": "Victoria, BC", "orig_city_name": "Victoria", "orig_region": "02", "orig_country": "ca"},
{"mis_city_name": "Lethbridge, Alberta", "orig_city_name": "Lethbridge", "orig_region": "01", "orig_country": "ca"},
{"mis_city_name": "Wickenberg, AZ", "orig_city_name": "Wickenburg", "orig_region": "AZ", "orig_country": "us"},
{"mis_city_name": "Grantchester", "orig_city_name": "Cambridgeshire", "orig_region": "C3", "orig_country": "gb"},
{"mis_city_name": "Wakulla Springs State Park, FL  ", "orig_city_name": "Wakulla Springs", "orig_region": "FL", "orig_country": "us"},
{"mis_city_name": "Presqu'île-de-Giens", "orig_city_name": "Giens", "orig_region": "B8", "orig_country": "fr"},
{"mis_city_name": "Newfoundland", "orig_city_name": "Saint John's", "orig_region": "05", "orig_country": "ca"},
{"mis_city_name": "New Jersey, NJ", "orig_city_name": "Jersey City", "orig_region": "NJ", "orig_country": "us"},
{"mis_city_name": "Penrhyndeudraeth", "orig_city_name": "Penrhyndeudreath", "orig_region": "Y2", "orig_country": "gb"},
{"mis_city_name": "Perranarworthal", "orig_city_name": "County of Cornwall", "orig_region": "C6", "orig_country": "gb"},
{"mis_city_name": "Llangarron", "orig_city_name": "Ross on Wye", "orig_region": "F7", "orig_country": "gb"},
{"mis_city_name": "Kraków", "orig_city_name": "Krakow", "orig_region": "77", "orig_country": "pl"},
{"mis_city_name": "Hamilton, Ontario", "orig_city_name": "Hamilton", "orig_region": "08", "orig_country": "ca"},
{"mis_city_name": "Putney Heath", "orig_city_name": "London", "orig_region": "H9", "orig_country": "gb"},
{"mis_city_name": "Shipston-on-Stour", "orig_city_name": "Shipston on Stour", "orig_region": "P3", "orig_country": "gb"},
{"mis_city_name": "Brive-Corrèze", "orig_city_name": "Brive-la-Gaillarde", "orig_region": "B1", "orig_country": "fr"},
{"mis_city_name": "Övralid", "orig_city_name": "Motala", "orig_region": "16", "orig_country": "se"},
{"mis_city_name": "Bornheim-Merten", "orig_city_name": "Merten", "orig_region": "07", "orig_country": "de"},
{"mis_city_name": "Truro, NS", "orig_city_name": "Truro", "orig_region": "07", "orig_country": "ca"},
{"mis_city_name": "Waterford, CT", "orig_city_name": "New London", "orig_region": "CT", "orig_country": "us"},
{"mis_city_name": "West Berlin", "orig_city_name": "Berlin", "orig_region": "16", "orig_country": "de"},
{"mis_city_name": "Palma, Majorca", "orig_city_name": "Palma de Mallorca", "orig_region": "07", "orig_country": "es"},
{"mis_city_name": "Ayot St. Lawrence", "orig_city_name": "Ayot Saint Lawrence", "orig_region": "F8", "orig_country": "gb"},
{"mis_city_name": "Moffett Field, CA", "orig_city_name": "Santa Clara", "orig_region": "CA", "orig_country": "us"},
{"mis_city_name": "Lewes, East Sussex", "orig_city_name": "Lewes", "orig_region": "E2", "orig_country": "gb"},
{"mis_city_name": "Agnetendorf (Jagniatków)", "orig_city_name": "Jelenia Gora", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "Research Triangle Park, NC", "orig_city_name": "Raleigh", "orig_region": "NC", "orig_country": "us"},
{"mis_city_name": "Bucksburn (Scotland)", "orig_city_name": "Bucksburn", "orig_region": "T5", "orig_country": "gb"},
{"mis_city_name": "Lidingö-Stockholm", "orig_city_name": "Lidingö", "orig_region": "26", "orig_country": "se"},
{"mis_city_name": "Breisgau", "orig_city_name": "Freiburg im Breisgau", "orig_region": "01", "orig_country": "de"},
{"mis_city_name": "J&uuml;lich", "orig_city_name": "Jülich", "orig_region": "07", "orig_country": "de"},
{"mis_city_name": "Mannheim-Rheinau", "orig_city_name": "Rheinau", "orig_region": "01", "orig_country": "de"},
{"mis_city_name": "Sapporo", "orig_city_name": "Sapporo-shi", "orig_region": "12", "orig_country": "jp"},
{"mis_city_name": "Berlin-Dahlem", "orig_city_name": "Dahlem", "orig_region": "16", "orig_country": "de"},
{"mis_city_name": "Long Island, New York, NY", "orig_city_name": "Long Island City", "orig_region": "NY", "orig_country": "us"},
{"mis_city_name": "Harwell, Berkshire", "orig_city_name": "Harwell", "orig_region": "K2", "orig_country": "gb"},
{"mis_city_name": "Argonne, IL", "orig_city_name": "Lemont", "orig_region": "IL", "orig_country": "us"},
{"mis_city_name": "Altenberg; Grünau im Almtal", "orig_city_name": "Altenberg", "orig_region": "04", "orig_country": "at"},
{"mis_city_name": "Massachusetts, MA", "orig_city_name": "Boston", "orig_region": "MA", "orig_country": "us"},
{"mis_city_name": "Mülheim/Ruhr", "orig_city_name": "Mülheim an der Ruhr", "orig_region": "07", "orig_country": "de"},
{"mis_city_name": "Rüschlikon", "orig_city_name": "Zurich", "orig_region": "25", "orig_country": "ch"},    
{"mis_city_name": "Kyoto", "orig_city_name": "Kyoto-shi", "orig_region": "22", "orig_country": "jp"},
{"mis_city_name": "Guebwiller", "orig_city_name": "Guebwiller", "orig_region": "C1", "orig_country": "fr"},
{"mis_city_name": "Lahore", "orig_city_name": "Lahore", "orig_region": "02", "orig_country": "pk"},
{"mis_city_name": "Thorshavn", "orig_city_name": "Thorshavn", "orig_region": "00", "orig_country": "fo"},
{"mis_city_name": "Cluny", "orig_city_name": "Offord Cluny", "orig_region": "C3", "orig_country": "gb"},
{"mis_city_name": "Lagow", "orig_city_name": "Lagow", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "Neuchâtel", "orig_city_name": "Neuchâtel", "orig_region": "A6", "orig_country": "fr"},
{"mis_city_name": "Kaysersberg", "orig_city_name": "Kaysersberg", "orig_region": "C1", "orig_country": "fr"},
{"mis_city_name": "Dili", "orig_city_name": "Dukuh Kupang Timur", "orig_region": "08", "orig_country": "id"},
{"mis_city_name": "Bad Salzbrunn", "orig_city_name": "Bad Salzbrunn", "orig_region": "72", "orig_country": "pl"},
{"mis_city_name": "San Juan", "orig_city_name": "San Juan", "orig_region": "PR", "orig_country": "us"},
{"mis_city_name": "Lanzarote", "orig_city_name": "Arrecife", "orig_region": "53", "orig_country": "es"},
{"mis_city_name": "Hong Kong", "orig_city_name": "Hong Kong", "orig_region": "00", "orig_country": "hk"}]

df_tmp_mis_cities = sp.createDataFrame(data)

In [76]:
join_expr = (f.trim(df_tmp_mis_cities["orig_city_name"]) == f.trim(df_cities["accent_city_name"])) \
            & (f.trim(df_tmp_mis_cities["orig_region"]) == f.trim(df_cities["city_region"])) \
            & (f.trim(df_tmp_mis_cities["orig_country"]) == f.trim(df_cities["Country"]))

df_tmp_mis_cities = df_tmp_mis_cities.join(df_cities, join_expr, "left_outer") \
                                     .select("city_id", "mis_city_name", "orig_city_name", "orig_region", "orig_country") \
                                     .sort("city_id")

## Societies table
Contains a list of Societies: laureates with laureate type = "Organization"

Check the Laureate ID column: check that 1 laureate ID matches 1 laureate name

In [77]:
df_nobel_orig.select("Laureate ID", "Full Name").distinct().sort("Laureate ID") \
             .groupBy("Laureate ID").count().where("count > 1").show()

+-----------+-----+
|Laureate ID|count|
+-----------+-----+
+-----------+-----+



1 ID - 1 Name. The original ID will be preserved for the Societies and Laureate Persons dataframes.

In [78]:
df_societies = df_nobel_orig.withColumnRenamed("Laureate Type", "laureate_type") \
                            .withColumnRenamed("Laureate ID", "laureate_id") \
                            .withColumnRenamed("Full Name", "society_name") \
                            .where("laureate_type = 'Organization'") \
                            .select("laureate_id", "society_name") \
                            .distinct() \
                            .sort("laureate_id")
    
# Check results
df_societies.show(3,False)

+-----------+---------------------------------------------------------------------------------+
|laureate_id|society_name                                                                     |
+-----------+---------------------------------------------------------------------------------+
|467        |Institut de droit international (Institute of International Law)                 |
|477        |Bureau international permanent de la Paix (Permanent International Peace Bureau) |
|482        |Comité international de la Croix Rouge (International Committee of the Red Cross)|
+-----------+---------------------------------------------------------------------------------+
only showing top 3 rows



In [None]:
# Save to hive - hadoop
# df_societies.write.format("parquet").mode("overwrite").saveAsTable("whdb.societies")

## Laureate Persons table
Contains a list of laureates persons: laureates with laureate type = "Individual"<br>
The original ID will be preserved for the Societies and Laureate Persons tables.

In [79]:
df_laureates_persons = df_nobel_orig.withColumnRenamed("Laureate Type", "laureate_type") \
                                    .withColumnRenamed("Laureate ID", "laureate_id") \
                                    .withColumnRenamed("Full Name", "full_name") \
                                    .withColumnRenamed("Birth Date", "birth_date") \
                                    .withColumnRenamed("Birth City", "birth_city") \
                                    .withColumnRenamed("Birth Country", "birth_country") \
                                    .withColumnRenamed("Death Date", "death_date") \
                                    .withColumnRenamed("Death City", "death_city") \
                                    .withColumnRenamed("Death Country", "death_country") \
                                    .where("laureate_type = 'Individual'") \
                                    .select("laureate_id", "full_name", "Sex", "birth_date", "birth_city", \
                                            "birth_country", "death_date", "death_city", "death_country") \
                                    .distinct()

### Add Gender ID

In [80]:
join_expr = f.trim(df_laureates_persons["Sex"]) == f.trim(df_genders["gender"])
df_laureates_persons = df_laureates_persons.join(df_genders, join_expr, "left_outer") \
                                           .select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city", \
                                                   "birth_country", "death_date", "death_city", "death_country")

### Add Birth Country ID

In [81]:
join_expr = f.trim(df_laureates_persons["birth_country"]) == f.trim(df_countries["country_name"])
df_laureates_persons = df_laureates_persons.join(df_countries, join_expr, "left_outer") \
                                           .withColumnRenamed("country_id", "birth_country_id") \
                                           .withColumnRenamed("parent_country_id", "birth_parent_country_id") \
                                           .select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city", \
                                                   "birth_country_id", "death_date", "death_city", "death_country", \
                                                   "birth_country", "birth_parent_country_id")

In [82]:
# Check null values
df_laureates_persons.where("birth_country_id is null and birth_country is not null").count()

0

### Add Death Country ID

In [83]:
join_expr = f.trim(df_laureates_persons["death_country"]) == f.trim(df_countries["country_name"])
df_laureates_persons = df_laureates_persons.join(df_countries, join_expr, "left_outer") \
                                           .withColumnRenamed("country_id", "death_country_id") \
                                           .withColumnRenamed("parent_country_id", "death_parent_country_id") \
                                           .select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city", \
                                                   "birth_country_id", "death_date", "death_city", "death_country_id", \
                                                   "birth_parent_country_id", "death_parent_country_id", "death_country")

In [84]:
# Check null values
df_laureates_persons.where("death_country_id is null and death_country is not null").count()

0

### Add Birth City ID

Add existing in the Cities dataframe names  - current countries<br>
Join Cities dataframe by city name + country ID

In [85]:
join_expr = (f.trim(df_laureates_persons["birth_city"]) == f.trim(df_cities["CityUpd"])) \
            & (df_laureates_persons["birth_country_id"] == df_cities["country_id"])
df_laureates_persons = df_laureates_persons.join(df_cities, join_expr, "left_outer") \
                                           .withColumnRenamed("city_id", "birth_city_id") \
                                           .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                                 "latitude", "longitude", "CityUpd", "Country")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("laureate_id").orderBy(f.col("laureate_id"))
df_laureates_persons = df_laureates_persons.withColumn("row", f.row_number().over(w)) \
                                           .filter(f.col("row") == 1) \
                                           .drop("row")

Add existing in the Cities dataframe names - historical countries<br>
Join Cities dataframe by city name + parent country ID

In [86]:
join_expr = (f.trim(df_laureates_persons["birth_city"]) == f.trim(df_cities["CityUpd"])) \
            & (df_laureates_persons["birth_parent_country_id"] == df_cities["country_id"])
df_laureates_persons = df_laureates_persons.join(df_cities, join_expr, "left_outer") \
                                           .withColumn("birth_city_id", \
                                              f.when(f.col("birth_city_id") >= 0, f.col("birth_city_id")) \
                                               .otherwise(f.col("city_id"))) \
                                           .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                                 "latitude", "longitude", "CityUpd", "Country", "city_id")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("laureate_id").orderBy(f.col("laureate_id"))
df_laureates_persons = df_laureates_persons.withColumn("row", f.row_number().over(w)) \
                                           .filter(f.col("row") == 1) \
                                           .drop("row")

Add missing in the Cities dataframe names<br>
Join missing city names to the Laureates dataframe - use the dataframe df_tmp_mis_cities created above

In [87]:
join_expr = f.trim(df_laureates_persons["birth_city"]) == f.trim(df_tmp_mis_cities["mis_city_name"])
df_laureates_persons = df_laureates_persons.join(df_tmp_mis_cities, join_expr, "left_outer") \
                                           .withColumn("birth_city_id", \
                                              f.when(f.col("birth_city_id") >= 0, f.col("birth_city_id")) \
                                               .otherwise(f.col("city_id"))) \
                                               .select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city_id", \
                                                       "birth_country_id", "death_date", "death_city", "death_country_id", \
                                                       "birth_parent_country_id", "death_parent_country_id", "birth_city")

In [88]:
# Check null values
df_laureates_persons.where("birth_city_id is null and birth_city is not null").count()

0

### Add Death City ID

Add existing in the Cities dataframe names  - current countries<br>
Join Cities dataframe by city name + country ID

In [89]:
join_expr = (f.trim(df_laureates_persons["death_city"]) == f.trim(df_cities["CityUpd"])) \
            & (df_laureates_persons["death_country_id"] == df_cities["country_id"])
df_laureates_persons = df_laureates_persons.join(df_cities, join_expr, "left_outer") \
                                           .withColumnRenamed("city_id", "death_city_id") \
                                           .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                                 "latitude", "longitude", "CityUpd", "Country")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("laureate_id").orderBy(f.col("laureate_id"))
df_laureates_persons = df_laureates_persons.withColumn("row", f.row_number().over(w)) \
                                           .filter(f.col("row") == 1) \
                                           .drop("row")

Add existing in the Cities dataframe names - historical countries<br>
Join Cities dataframe by city name + parent country ID

In [90]:
join_expr = (f.trim(df_laureates_persons["death_city"]) == f.trim(df_cities["CityUpd"])) \
            & (df_laureates_persons["death_parent_country_id"] == df_cities["country_id"])
df_laureates_persons = df_laureates_persons.join(df_cities, join_expr, "left_outer") \
                                           .withColumn("death_city_id", \
                                              f.when(f.col("death_city_id") >= 0, f.col("death_city_id")) \
                                               .otherwise(f.col("city_id"))) \
                                           .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                                 "latitude", "longitude", "CityUpd", "Country", "city_id")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("laureate_id").orderBy(f.col("laureate_id"))
df_laureates_persons = df_laureates_persons.withColumn("row", f.row_number().over(w)) \
                                           .filter(f.col("row") == 1) \
                                           .drop("row")

Add missing in the Cities dataframe names<br>
Join missing city names to the Laureates dataframe - use the dataframe df_tmp_mis_cities created above

In [91]:
join_expr = f.trim(df_laureates_persons["death_city"]) == f.trim(df_tmp_mis_cities["mis_city_name"])
df_laureates_persons = df_laureates_persons.join(df_tmp_mis_cities, join_expr, "left_outer") \
                                           .withColumn("death_city_id", \
                                              f.when(f.col("death_city_id") >= 0, f.col("death_city_id")) \
                                               .otherwise(f.col("city_id"))) \
                                           .select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city_id", \
                                                   "birth_country_id", "death_date", "death_city_id", "death_country_id", \
                                                   "birth_parent_country_id", "death_parent_country_id", "death_city")

In [92]:
# Check null values
df_laureates_persons.where("death_city_id is null and death_city is not null").count()

0

In [93]:
df_laureates_persons = df_laureates_persons.select("laureate_id", "full_name", "gender_id", "birth_date", "birth_city_id", \
                                                   "birth_country_id", "death_date", "death_city_id", "death_country_id") \
                                           .sort("laureate_id")

Dataframe is ready.<br>
Check dataframe

In [94]:
df_laureates_persons.show(3,False)

+-----------+----------------------+---------+----------+-------------+----------------+----------+-------------+----------------+
|laureate_id|full_name             |gender_id|birth_date|birth_city_id|birth_country_id|death_date|death_city_id|death_country_id|
+-----------+----------------------+---------+----------+-------------+----------------+----------+-------------+----------------+
|1          |Wilhelm Conrad Röntgen|1        |1845-03-27|8590091352   |289             |1923-02-10|8590071893   |81              |
|2          |Hendrik Antoon Lorentz|1        |1853-07-18|17180141601  |154             |1928-02-04|null         |154             |
|3          |Pieter Zeeman         |1        |1865-05-25|17180150094  |154             |1943-10-09|17180141534  |154             |
+-----------+----------------------+---------+----------+-------------+----------------+----------+-------------+----------------+
only showing top 3 rows



In [None]:
# Save to hive - hadoop
# df_laureates_persons.write.format("parquet").mode("overwrite").saveAsTable("whdb.laureates_persons")

## Organizations table
Contains a list of organizations where the laureates ("individual") worked

In [95]:
# Update null City ID and Country ID for organizations
df_nobel_orig = df_nobel_orig.withColumn("Organization City", \
                                f.when(f.trim(f.col("Organization Name")) == 'Howard Hughes Medical Institute', 'Durham, NC') \
                                 .when(f.trim(f.col("Organization Name")) == 'University of Delaware', 'Newark, DE') \
                                 .otherwise(f.col("Organization City"))) \
                             .withColumn("Organization Country", \
                                f.when(f.trim(f.col("Organization Name")) == 'Howard Hughes Medical Institute',\
                                       'United States') \
                                 .when((f.trim(f.col("Organization Name")) == 'Institut Pasteur') \
                                       & (f.trim(f.col("Organization City")) == 'Tunis'), \
                                       'Tunisia') \
                                 .otherwise(f.col("Organization Country")))

In [96]:
df_organizations = df_nobel_orig.withColumnRenamed("Laureate Type", "laureate_type") \
                                .withColumnRenamed("Organization Name", "organization_name") \
                                .withColumnRenamed("Organization Country", "organization_country") \
                                .withColumnRenamed("Organization City", "organization_city") \
                                .select("organization_name", "organization_country", "organization_city") \
                                .where("laureate_type = 'Individual' and organization_name is not null") \
                                .distinct() \
                                .sort("organization_name")

# Add Organization ID
w = Window.orderBy("organization_name")
df_organizations = df_organizations.withColumn("organization_id", f.row_number().over(w) - 1) \
                                   .select("organization_id", "organization_name", "organization_country", "organization_city")

In [97]:
# Check updated country and city
df_organizations.where("organization_name in ('University of Delaware', 'Institut Pasteur')").show(10,False)

+---------------+----------------------+--------------------+-----------------+
|organization_id|organization_name     |organization_country|organization_city|
+---------------+----------------------+--------------------+-----------------+
|96             |Institut Pasteur      |France              |Paris            |
|97             |Institut Pasteur      |Tunisia             |Tunis            |
|266            |University of Delaware|United States       |Newark, DE       |
+---------------+----------------------+--------------------+-----------------+



Add parent organizations that are not in the list of organizations

In [98]:
# Parent organization not listed in the Nobel Prizes dataframe
data = [{"org_name": "Duke University", "org_country": "United States", "org_city": "Durham, NC"},
{"org_name": "Max Planck Society", "org_country": "Germany", "org_city": "Munich"},
{"org_name": "The University of California", "org_country": "United States", "org_city": "Berkeley, CA"}]

df_tmp_mis_org = sp.createDataFrame(data)

# Check the max value of organization_id
max_count_id = df_organizations.select(f.max("organization_id")).collect()[0][0]

# Add organization_id - starts from the next organization_id
w = Window.orderBy("org_name")
df_tmp_mis_org = df_tmp_mis_org.withColumn("org_id", f.row_number().over(w) + max_count_id) \
                               .select("org_id", "org_name", "org_country", "org_city")

In [99]:
# Union dataframes
df_organizations = df_organizations.union(df_tmp_mis_org)

### Add Parent Organization ID

In [100]:
df_organizations.createOrReplaceTempView("ORG")
df_organizations = sp.sql("select organization_id, organization_name, organization_city, organization_country, \
       case \
       when trim(organization_name) = 'Australian National University' and trim(organization_city) = 'Weston Creek' \
            then (select organization_id from ORG where trim(organization_name) = 'Australian National University' \
            and trim(organization_city) = 'Canberra') \
        when (trim(organization_name) = 'Bell Laboratories' and trim(organization_city) = 'Holmdel, NJ') \
        or trim(organization_name) = 'Bell Telephone Laboratories' \
            then (select organization_id from ORG where trim(organization_name) = 'Bell Laboratories' \
            and trim(organization_city) = 'Murray Hill, NJ') \
        when organization_name like 'Columbia University Division%'  \
            then (select organization_id from ORG where trim(organization_name) = 'Columbia University') \
        when organization_name like 'Duke University %'  \
            then (select organization_id from ORG where trim(organization_name) = 'Duke University') \
        when organization_name like '%Fritz-Haber-Institut%' or organization_name like '%Max-Planck%' \
            or organization_name like '%Max Planck Institute%' \
            then (select organization_id from ORG where trim(organization_name) = 'Max Planck Society') \
        when organization_name like 'Harvard University,%'  \
            then (select organization_id from ORG where trim(organization_name) = 'Harvard University') \
        when trim(organization_name) = 'I.G. Farbenindustrie A.G.' and trim(organization_city) = 'Heidelberg' \
            then (select organization_id from ORG where trim(organization_name) = 'I.G. Farbenindustrie A.G.' \
            and trim(organization_city) = 'Mannheim-Rheinau') \
        when trim(organization_name) = 'Imperial Cancer Research Fund Laboratory' \
            then (select organization_id from ORG where trim(organization_name) = 'Imperial Cancer Research Fund') \
        when trim(organization_name) = 'Johns Hopkins University School of Medicine' \
            then (select organization_id from ORG where trim(organization_name) = 'Johns Hopkins University') \
        when trim(organization_name) = 'Karolinska Institutet, Nobel Medical Institute' \
            then (select organization_id from ORG where trim(organization_name) = 'Karolinska Institutet') \
        when trim(organization_name) = 'Kiel University' and trim(organization_country) = 'Federal Republic of Germany' \
            then (select organization_id from ORG where trim(organization_name) = 'Kiel University' \
            and trim(organization_country) = 'Germany') \
        when organization_name like 'London University,%' \
            then (select organization_id from ORG where trim(organization_name) = 'London University') \
        when organization_name like 'Massachusetts Institute of Technology (MIT),%' \
            then (select organization_id from ORG \
            where trim(organization_name) = 'Massachusetts Institute of Technology (MIT)') \
        when trim(organization_name) = 'New York University, College of Medicine' \
            then (select organization_id from ORG where trim(organization_name) = 'New York University') \
        when trim(organization_name) = 'P.N. Lebedev Physical Institute' \
        and trim(organization_country) = 'Union of Soviet Socialist Republics' \
            then (select organization_id from ORG where trim(organization_name) = 'P.N. Lebedev Physical Institute' \
            and trim(organization_country) = 'Russia') \
        when trim(organization_name) = 'Rockefeller Institute for Medical Research' \
            then (select organization_id from ORG where trim(organization_name) = 'Rockefeller University') \
        when trim(organization_name) = 'Sorbonne University, Institut Henri Poincaré' \
            then (select organization_id from ORG where trim(organization_name) = 'Sorbonne University') \
        when trim(organization_name) = 'Stanford University School of Medicine' \
            then (select organization_id from ORG where trim(organization_name) = 'Stanford University') \
        when organization_name like 'University of California%' and trim(organization_city) != 'Berkeley, CA' \
            then (select organization_id from ORG where trim(organization_name) = 'University of California' \
            and trim(organization_city) = 'Berkeley, CA') \
        when trim(organization_name) = 'University of Chicago, Ben May Laboratory for Cancer Research' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Chicago') \
        when trim(organization_name) = 'University of Colorado, JILA' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Colorado') \
        when trim(organization_name) = 'University of Freiburg' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Freiburg im Breisgau') \
        when trim(organization_name) = 'University of Heidelberg' and trim(organization_country) = 'Federal Republic of Germany' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Heidelberg' \
            and trim(organization_country) = 'Germany') \
        when trim(organization_name) = 'University of Oxford, Royal Society' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Oxford') \
        when organization_name like 'University of Texas %' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Texas') \
        when trim(organization_name) = 'University of Tokyo' and trim(organization_city) = 'Kashiwa' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Tokyo' \
            and trim(organization_city) = 'Tokyo') \
        when trim(organization_name) = 'University of Zurich, Institute of Experimental Immunology' \
            then (select organization_id from ORG where trim(organization_name) = 'University of Zurich') \
        when trim(organization_name) = 'Vanderbilt University School of Medicine' \
            then (select organization_id from ORG where trim(organization_name) = 'Vanderbilt Universitye') \
        when trim(organization_name) = 'Veterans Administration Hospital' and trim(organization_city) = 'New Orleans, LA' \
            then (select organization_id from ORG where trim(organization_name) = 'Veterans Administration Hospital' \
            and trim(organization_city) = 'Bronx, NY') \
        when trim(organization_name) = 'Yale University, School of Medicine' \
            then (select organization_id from ORG where trim(organization_name) = 'Yale University') \
       else null end as parent_organization_id \
       from ORG")

In [101]:
# Check results
df_organizations.select("organization_name", "parent_organization_id").where("parent_organization_id is not null").show(2,False)

+------------------------------+----------------------+
|organization_name             |parent_organization_id|
+------------------------------+----------------------+
|Australian National University|9                     |
|Bell Laboratories             |13                    |
+------------------------------+----------------------+
only showing top 2 rows



### Add Organization Country ID

In [102]:
join_expr = f.trim(df_organizations["organization_country"]) == f.trim(df_countries["country_name"])
df_organizations = df_organizations.join(df_countries, join_expr, "left_outer") \
                            .withColumnRenamed("country_id", "organization_country_id") \
                            .withColumnRenamed("parent_country_id", "organization_parent_country_id") \
                            .select("organization_id", "organization_name", "organization_country", "organization_country_id", \
                                    "organization_city", "organization_parent_country_id", "parent_organization_id")

In [103]:
# Check null values
df_organizations.where("organization_country_id is null and organization_country is not null").count()

0

### Add Organization City ID

Add existing in the Cities dataframe names  - current countries<br>
Join Cities dataframe by city name + country ID

In [104]:
join_expr = (f.trim(df_organizations["organization_city"]) == f.trim(df_cities["CityUpd"])) \
                & (df_organizations["organization_country_id"] == df_cities["country_id"])
df_organizations = df_organizations.join(df_cities, join_expr, "left_outer") \
                                   .withColumnRenamed("city_id", "organization_city_id") \
                                   .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                         "latitude", "longitude", "CityUpd", "Country")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("organization_id").orderBy(f.col("organization_id"))
df_organizations = df_organizations.withColumn("row", f.row_number().over(w)) \
                                   .filter(f.col("row") == 1).drop("row")

Add existing in the Cities dataframe names - historical countries<br>
Join Cities dataframe by city name + parent country ID

In [105]:
join_expr = (f.trim(df_organizations["organization_city"]) == f.trim(df_cities["CityUpd"])) \
                & (df_organizations["organization_parent_country_id"] == df_cities["country_id"])
df_organizations = df_organizations.join(df_cities, join_expr, "left_outer") \
                                   .withColumn("organization_city_id", \
                                      f.when(f.col("organization_city_id") >= 0, f.col("organization_city_id")) \
                                       .otherwise(f.col("city_id"))) \
                                   .drop("city_name", "accent_city_name", "country_id", "city_region", "population", \
                                         "latitude", "longitude", "CityUpd", "Country", "city_id")

# Some cities are not unique by "City Name" + "Country". Remove duplicates from dataframe, use first City ID number
w = Window.partitionBy("organization_id").orderBy(f.col("organization_id"))
df_organizations = df_organizations.withColumn("row", f.row_number().over(w)) \
                                   .filter(f.col("row") == 1).drop("row")

Add missing in the Cities dataframe names<br>
Join missing city names to the Laureates dataframe - use the dataframe df_tmp_mis_cities created above

In [106]:
join_expr = f.trim(df_organizations["organization_city"]) == f.trim(df_tmp_mis_cities["mis_city_name"])
df_organizations = df_organizations.join(df_tmp_mis_cities, join_expr, "left_outer") \
                                   .withColumn("organization_city_id", \
                                       f.when(f.col("organization_city_id") >= 0, f.col("organization_city_id")) \
                                        .otherwise(f.col("city_id"))) \
                                   .select("organization_id", "organization_name", "organization_country_id", \
                                           "organization_city_id", "parent_organization_id", \
                                           "organization_country", "organization_city") \
                                   .sort("organization_id")

In [107]:
# Check null values
df_organizations.where("organization_city_id is not null and organization_city is null").count()

0

The Organization Country and Organization City columns will be removed later - these columns are required for the Persons in Organizations data frame.

Check dataframe

In [108]:
df_organizations.show(3,False)

+---------------+--------------------------------------+-----------------------+--------------------+----------------------+-----------------------------------+-----------------+
|organization_id|organization_name                     |organization_country_id|organization_city_id|parent_organization_id|organization_country               |organization_city|
+---------------+--------------------------------------+-----------------------+--------------------+----------------------+-----------------------------------+-----------------+
|0              |A.F. Ioffe Physico-Technical Institute|182                    |25769846536         |null                  |Russia                             |St. Petersburg   |
|1              |Aarhus University                     |57                     |745653              |null                  |Denmark                            |Aarhus           |
|2              |Academy of Sciences                   |306                    |17180652633         |null

In [None]:
# Save to hive - hadoop
# df_organizations.write.format("parquet").mode("overwrite").saveAsTable("whdb.organizations")

## Persons in Organizations table
Contains the matches of the Laureate Persons and Organizations in which the laureates worked.

In [109]:
# Use the original Nobel Prizes dataset
df_persons_in_orgs = df_nobel_orig.withColumnRenamed("Laureate Type", "laureate_type") \
                                  .withColumnRenamed("Laureate ID", "laureate_id") \
                                  .withColumnRenamed("Organization Name", "org_name") \
                                  .withColumnRenamed("Organization Country", "org_country") \
                                  .withColumnRenamed("Organization City", "org_city") \
                                  .where("laureate_type = 'Individual' and org_name is not null") \
                                  .select("laureate_id", "org_name", "org_country", "org_city") \
                                  .distinct() \
                                  .sort("laureate_id", "org_name") \
                                  .withColumn("per_org_id", f.monotonically_increasing_id())

### Add Organization Id

In [110]:
join_expr = (f.trim(df_organizations["organization_name"]) == f.trim(df_persons_in_orgs["org_name"])) \
        & (f.trim(df_organizations["organization_country"]) == f.trim(df_persons_in_orgs["org_country"])) \
        & (f.trim(df_organizations["organization_city"]) == f.trim(df_persons_in_orgs["org_city"]))
df_persons_in_orgs = df_persons_in_orgs.join(df_organizations, join_expr, "inner") \
                                       .select("per_org_id", "laureate_id", "organization_id") \
                                       .sort("per_org_id")

In [111]:
# Check null values
df_persons_in_orgs.where("organization_id is null").count()

0

Check the dataframe

In [112]:
df_persons_in_orgs.show(5)

+----------+-----------+---------------+
|per_org_id|laureate_id|organization_id|
+----------+-----------+---------------+
|         0|          1|            166|
|         1|          2|            133|
|         2|          3|              4|
|         3|          4|            333|
|         4|          5|            335|
+----------+-----------+---------------+
only showing top 5 rows



In [None]:
# Save to hive - hadoop
# df_persons_in_orgs.write.format("parquet").mode("overwrite").saveAsTable("whdb.persons_in_orgs")

## Nobel Prizes table
Contains a list of Nobel Prizes<br>
Data analysis shows that the logical key of the Nobel Prize is "Laureate ID" + "Prize"

In [113]:
df_nobel_orig.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Prize: string (nullable = true)
 |-- Motivation: string (nullable = true)
 |-- Prize Share: string (nullable = true)
 |-- Laureate ID: integer (nullable = true)
 |-- Laureate Type: string (nullable = true)
 |-- Full Name: string (nullable = true)
 |-- Birth Date: string (nullable = true)
 |-- Birth City: string (nullable = true)
 |-- Birth Country: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Organization Name: string (nullable = true)
 |-- Organization City: string (nullable = true)
 |-- Organization Country: string (nullable = true)
 |-- Death Date: string (nullable = true)
 |-- Death City: string (nullable = true)
 |-- Death Country: string (nullable = true)



In [114]:
# Columns: Year, Category, Prize, Motivation, Prize Share, Laureate ID, Laureate Type
df_nobel_prizes = df_nobel_orig.withColumnRenamed("Prize Share", "prize_share") \
                               .withColumnRenamed("Motivation", "motivation") \
                               .withColumnRenamed("Year", "year") \
                               .withColumnRenamed("Laureate ID", "laureate_id") \
                               .withColumnRenamed("Laureate Type", "laureate_type_tmp") \
                               .withColumnRenamed("Category", "category_tmp") \
                               .withColumnRenamed("Prize", "prize_tmp") \
                               .select("laureate_id", "year", "motivation", "prize_share", \
                                       "category_tmp", "laureate_type_tmp", "prize_tmp") \
                               .distinct()

### Add Category ID

In [115]:
join_expr = f.trim(df_nobel_prizes["category_tmp"]) == f.trim(df_categories["category"])
df_nobel_prizes = df_nobel_prizes.join(df_categories, join_expr, "left_outer")

In [116]:
# Check null values
df_nobel_prizes.select("category_tmp", "category_id").where("category_id is null").count()

0

### Add Prize Type ID

In [117]:
join_expr = f.trim(df_nobel_prizes["prize_tmp"]) == f.trim(df_prize_types["prize_type"])
df_nobel_prizes = df_nobel_prizes.join(df_prize_types, join_expr, "left_outer")

In [118]:
# Check null values
df_nobel_prizes.select("prize_tmp", "prize_type_id").where("prize_type_id is null").count()

0

### Add Laureate Type ID

In [119]:
join_expr = f.trim(df_nobel_prizes["laureate_type_tmp"]) == f.trim(df_laureate_types["laureate_type"])
df_nobel_prizes = df_nobel_prizes.join(df_laureate_types, join_expr, "left_outer")

In [120]:
# Check null values
df_nobel_prizes.select("laureate_type_tmp", "laureate_type_id").where("laureate_type_id is null").count()

0

### Add ID

In [121]:
df_nobel_prizes = df_nobel_prizes.sort("year", "laureate_id") \
                                 .withColumn("nobel_prize_id", f.monotonically_increasing_id()) \
                                 .select("nobel_prize_id", "year", "category_id", "laureate_id", "laureate_type_id", \
                                         "prize_type_id", "prize_share", "motivation")

Check dataframe

In [122]:
df_nobel_prizes.show(2,False)

+--------------+----+-----------+-----------+----------------+-------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|nobel_prize_id|year|category_id|laureate_id|laureate_type_id|prize_type_id|prize_share|motivation                                                                                                                                      |
+--------------+----+-----------+-----------+----------------+-------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|0             |1901|2          |1          |0               |506          |1/1        |in recognition of the extraordinary services he has rendered by the discovery of the remarkable rays subsequently named after him               |
|1             |1901|0          |160        |0               |58

In [None]:
# Save to hive - hadoop
# df_nobel_prizes.write.format("parquet").mode("overwrite").saveAsTable("whdb.nobel_prizes")

### Remove additional columns from the dataframes: Cities, Organizations

In [123]:
df_organizations = df_organizations.select("organization_id", "organization_name", "organization_country_id", \
                                           "organization_city_id", "parent_organization_id")

df_cities = df_cities.select("city_id", "city_name", "accent_city_name", "country_id", "city_region", "population", \
                             "latitude", "longitude")

In [124]:
df_cities.printSchema()

root
 |-- city_id: long (nullable = false)
 |-- city_name: string (nullable = true)
 |-- accent_city_name: string (nullable = true)
 |-- country_id: integer (nullable = true)
 |-- city_region: string (nullable = true)
 |-- population: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [125]:
df_organizations.printSchema()

root
 |-- organization_id: integer (nullable = false)
 |-- organization_name: string (nullable = true)
 |-- organization_country_id: integer (nullable = true)
 |-- organization_city_id: long (nullable = true)
 |-- parent_organization_id: integer (nullable = true)



In [126]:
sp.stop()