Maclay Teefey (mjt6vj)
Data Project #2 Final

# DS-2002 – Data Project 2 100 points

The goal of this project is to demonstrate (1) an understanding of and (2) competence creating and 
implementing basic data science systems such as pipelines, scripts, data transformations, APIs, databases 
and cloud services. Submit your project in your GitHub Repo or file drop on Collab. 
Data Projects must be done individually.

## ETL Data Processor

You project should demonstrate your understanding of the differing types of data systems (OLTP/OLAP), 
and how data can be extracted from various source systems (structured, semi-structured, unstructured), 
transformed (cleansed, integrated), and then loaded into a destination system that’s optimized for post 
hoc diagnostic analysis.

# Deliverable

## 1. Design a dimensional data mart that represents a simple business process of your choosing.

a. Examples might include retail sales, inventory management, procurement, order 
management, transportation or hospitality bookings, medical appointments, student 
registration and/or attendance.

b. You may select any business process that interests you, but remember that a 
dimensional data mart provides for the post hoc summarization and historic analysis of 
business transactions that reflect the interaction between various entities (e.g., patients 
& doctors, retailers & customers, students & schools/classes, travelers & airlines/hotels).

In [0]:
import os
import json
#import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

The three data sets I will be using and their forms:

1. World sample data set from Oracle in SQL

2. Nobel Prizes downloaded and loaded in as a JSON file (local file system) (link: https://api.nobelprize.org/v1/prize.json)

3. Nobel Prize Laureates from the Nobel Prizes API (taken in as JSON)

However, I have found it impossible to deal with JSON files with spark dataframes. Due to Spark not having a json_normalize function and not allowing edits to the spark dataframes without creating a new dataframe I have found it impossible to do the complex data manipulations I did to make the Nobel Prize API Work. Therefore I will take in both Laureates and Prizes as csv files saved from the pandas dataframes from the stage before I joined it with the dim_date dimension in my midterm project.

The relationship being warehoused is between the countries and the nobel prize winners. The key joining point will be the country name and birth country of the laureate. The required date portion will be the Birth Date section.

## 2. Develop an ETL pipeline that extracts, transforms, and loads data into your data mart.

a. Extract data from one or more SQL database tables; hosted locally or in the Cloud.

b. Retrieve a data file, either from a remote or local file system, converting its original 
format (e.g., CSV, JSON) into a SQL database table.

c. Modify the number of columns from each source to the destination.

d. Provide error messages wherever an operation fails (i.e., Try/Except error handlers).

Getting the Countries table from World Data

Functions for Getting Data From and Setting Data Into Databases Taken from Lab 03

In [0]:
# Azure SQL Server Connection Information #####################
jdbc_hostname = "mjt6vj-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "world"

connection_properties = {
  "user" : "mjt6vj",
  "password" : "GB8m0Sjy0J8Q",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "sandbox"
atlas_database_name = "adventure_works"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

# Data Files (JSON) Information ###############################
dst_database = "nobel_prize_info"

base_dir = "dbfs:/FileStore/default"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/prizesSplit"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"
checkpoint_dir = f"{base_dir}/checkpoints/"

output_bronze = f"{database_dir}/fact_nobel/bronze"
output_silver = f"{database_dir}/fact_nobel/silver"
output_gold   = f"{database_dir}/fact_nobel/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_nobel", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[131]: False

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

First we will get the world data, and the date dimension data

In [0]:
df_world = get_sql_dataframe(jdbc_hostname, jdbc_port, src_database, connection_properties, "world.country")
display(df_world)

Code,Name,Continent,Region,SurfaceArea,IndepYear,Population,LifeExpectancy,GNP,GNPOld,LocalName,GovernmentForm,HeadOfState,Capital,Code2
ABW,Aruba,North America,Caribbean,193.0,,103000,78.4,828.0,793.0,Aruba,Nonmetropolitan Territory of The Netherlands,Beatrix,129.0,AW
AFG,Afghanistan,Asia,Southern and Central Asia,652090.0,1919.0,22720000,45.9,5976.0,,Afganistan/Afqanestan,Islamic Emirate,Mohammad Omar,1.0,AF
AGO,Angola,Africa,Central Africa,1246700.0,1975.0,12878000,38.3,6648.0,7984.0,Angola,Republic,José Eduardo dos Santos,56.0,AO
AIA,Anguilla,North America,Caribbean,96.0,,8000,76.1,63.2,,Anguilla,Dependent Territory of the UK,Elisabeth II,62.0,AI
ALB,Albania,Europe,Southern Europe,28748.0,1912.0,3401200,71.6,3205.0,2500.0,Shqipëria,Republic,Rexhep Mejdani,34.0,AL
AND,Andorra,Europe,Southern Europe,468.0,1278.0,78000,83.5,1630.0,,Andorra,Parliamentary Coprincipality,,55.0,AD
ANT,Netherlands Antilles,North America,Caribbean,800.0,,217000,74.7,1941.0,,Nederlandse Antillen,Nonmetropolitan Territory of The Netherlands,Beatrix,33.0,AN
ARE,United Arab Emirates,Asia,Middle East,83600.0,1971.0,2441000,74.1,37966.0,36846.0,Al-Imarat al-´Arabiya al-Muttahida,Emirate Federation,Zayid bin Sultan al-Nahayan,65.0,AE
ARG,Argentina,South America,South America,2780400.0,1816.0,37032000,75.1,340238.0,323310.0,Argentina,Federal Republic,Fernando de la Rúa,69.0,AR
ARM,Armenia,Asia,Middle East,29800.0,1991.0,3520000,66.4,1813.0,1627.0,Hajastan,Republic,Robert Kotšarjan,126.0,AM


In [0]:
df_date_dim = get_sql_dataframe(jdbc_hostname, jdbc_port, src_database, connection_properties, "world.dim_date")
display(df_date_dim)

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
18171130,1817-11-30,1817/11/30,11/30/1817,30/11/1817,1,Sunday,30,334,Weekend,48,November,11,Y,4,1817,1817-11,1817Q4,5,2,1818,1818-05,1818Q2
18171201,1817-12-01,1817/12/01,12/01/1817,01/12/1817,2,Monday,1,335,Weekday,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171202,1817-12-02,1817/12/02,12/02/1817,02/12/1817,3,Tuesday,2,336,Weekday,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171203,1817-12-03,1817/12/03,12/03/1817,03/12/1817,4,Wednesday,3,337,Weekday,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171204,1817-12-04,1817/12/04,12/04/1817,04/12/1817,5,Thursday,4,338,Weekday,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171205,1817-12-05,1817/12/05,12/05/1817,05/12/1817,6,Friday,5,339,Weekday,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171206,1817-12-06,1817/12/06,12/06/1817,06/12/1817,7,Saturday,6,340,Weekend,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171207,1817-12-07,1817/12/07,12/07/1817,07/12/1817,1,Sunday,7,341,Weekend,49,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171208,1817-12-08,1817/12/08,12/08/1817,08/12/1817,2,Monday,8,342,Weekday,50,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2
18171209,1817-12-09,1817/12/09,12/09/1817,09/12/1817,3,Tuesday,9,343,Weekday,50,December,12,N,4,1817,1817-12,1817Q4,6,2,1818,1818-06,1818Q2


For the date dimension, I was able to use the provided code from the lab. I did not use it for the midterm, because my mysql and sqlengine would crash trying to load dates from 1817 to 2021, but for the azure mysql server it didnt time out even with taking over 700s

Now to get Laureate data, Country Conversion Data, and Prize Data

In [0]:
address_laureates = f"{base_dir}/laureates.csv"

df_laureates = spark.read.format('csv').options(header='true', inferSchema='true').load(address_laureates)
display(df_laureates)


_c0,LaureateKey,firstname,surname,BirthDate,Country,CountryCode,BirthLocation,gender
0,0,Wilhelm Conrad,Röntgen,1845-03-27,Prussia (now Germany),DE,Lennep (now Remscheid),male
1,1,Hendrik A.,Lorentz,1853-07-18,the Netherlands,NL,Arnhem,male
2,2,Pieter,Zeeman,1865-05-25,the Netherlands,NL,Zonnemaire,male
3,3,Henri,Becquerel,1852-12-15,France,FR,Paris,male
4,4,Pierre,Curie,1859-05-15,France,FR,Paris,male
5,5,Marie,Curie,1867-11-07,Russian Empire (now Poland),PL,Warsaw,female
6,6,Lord,Rayleigh,1842-11-12,United Kingdom,GB,"Langford Grove, Maldon, Essex",male
7,7,Philipp,Lenard,1862-06-07,Hungary (now Slovakia),SK,Pressburg (now Bratislava),male
8,8,J.J.,Thomson,1856-12-18,United Kingdom,GB,Cheetham Hill,male
9,9,Albert A.,Michelson,1852-12-19,Prussia (now Poland),PL,Strelno (now Strzelno),male


In [0]:
address_country = f"{base_dir}/country.csv"

df_countries = spark.read.format('csv').options(header='true', inferSchema='true').load(address_country)
display(df_countries)

_c0,id,CountryCode2,CountryCode3,CountryName
0,4,AF,AFG,Afghanistan
1,8,AL,ALB,Albania
2,12,DZ,DZA,Algeria
3,20,AD,AND,Andorra
4,24,AO,AGO,Angola
5,28,AG,ATG,Antigua and Barbuda
6,32,AR,ARG,Argentina
7,51,AM,ARM,Armenia
8,36,AU,AUS,Australia
9,40,AT,AUT,Austria


For prize data I will use an autoloader, but first I will make dataframes for each fix to the world and laureate dataframes that will be combined together into bronze silver and gold views for combining laureate data, country data, and then selecting important features for analysis.

In [0]:
from pyspark.sql.functions import col

joinType = "inner"
joinCondition = df_date_dim["full_date"] == df_laureates["birthDate"]
df_bronze = df_laureates.join(df_date_dim, joinCondition, joinType)
df_bronze = df_bronze.select(col("LaureateKey"),col("firstname"), col("surname"), col("gender"), col("CountryCode"), col("date_key"), col("calendar_year"))
df_bronze = df_bronze.withColumnRenamed(
  "date_key", "birth_date_key").withColumnRenamed("calendar_year", "birth_year")
display(df_bronze)

LaureateKey,firstname,surname,gender,CountryCode,birth_date_key,birth_year
559,Theodor,Mommsen,male,DE,18171130,1817
457,Frédéric,Passy,male,FR,18220520,1822
460,Randal,Cremer,male,GB,18280318,1828
456,Henry,Dunant,male,CH,18280508,1828
468,Auguste,Beernaert,male,BE,18290726,1829
568,Paul,Heyse,male,DE,18300315,1830
561,Frédéric,Mistral,male,FR,18300908,1830
562,José,Echegaray,male,ES,18320419,1832
560,Bjørnstjerne,Bjørnson,male,NO,18321208,1832
458,Élie,Ducommun,male,CH,18330219,1833


In [0]:
joinType = "inner"
joinCondition = df_world["Code"] == df_countries["CountryCode3"]
df_silver = df_world.join(df_countries, joinCondition, joinType)
df_silver = df_silver.select(col("CountryCode2"),col("name"), col("Population"), col("GNP"), col("Continent"))
df_silver = df_silver.withColumnRenamed("CountryCode2", "CountryKey2").withColumnRenamed("name", "Country")
display(df_silver)

CountryKey2,Country,Population,GNP,Continent
AF,Afghanistan,22720000,5976.0,Asia
AO,Angola,12878000,6648.0,Africa
AL,Albania,3401200,3205.0,Europe
AD,Andorra,78000,1630.0,Europe
AE,United Arab Emirates,2441000,37966.0,Asia
AR,Argentina,37032000,340238.0,South America
AM,Armenia,3520000,1813.0,Asia
AG,Antigua and Barbuda,68000,612.0,North America
AU,Australia,18886000,351182.0,Oceania
AT,Austria,8091800,211860.0,Europe


In [0]:
joinType = "inner"
joinCondition = df_bronze["CountryCode"] == df_silver["CountryKey2"]
df_silver_final = df_bronze.join(df_silver, joinCondition, joinType)
df_silver_final = df_silver_final.select(col("LaureateKey"),col("firstname"), col("surname"), col("gender"), col("CountryCode"), col("birth_date_key"), col("birth_year"),col("Country"), col("Population"), col("GNP"), col("Continent"))
df_silver_final = df_silver_final.withColumnRenamed("CountryCode", "CountryKey").withColumnRenamed("birth_date_key", "birthDateKey").withColumnRenamed("birth_year", "birthYear").withColumnRenamed("LaureateKey", "laureate_key")
display(df_silver_final)

laureate_key,firstname,surname,gender,CountryKey,birthDateKey,birthYear,Country,Population,GNP,Continent
559,Theodor,Mommsen,male,DE,18171130,1817,Germany,82164700,2133367.0,Europe
457,Frédéric,Passy,male,FR,18220520,1822,France,59225700,1424285.0,Europe
460,Randal,Cremer,male,GB,18280318,1828,United Kingdom,59623400,1378330.0,Europe
456,Henry,Dunant,male,CH,18280508,1828,Switzerland,7160400,264478.0,Europe
468,Auguste,Beernaert,male,BE,18290726,1829,Belgium,10239000,249704.0,Europe
568,Paul,Heyse,male,DE,18300315,1830,Germany,82164700,2133367.0,Europe
561,Frédéric,Mistral,male,FR,18300908,1830,France,59225700,1424285.0,Europe
562,José,Echegaray,male,ES,18320419,1832,Spain,39441700,553233.0,Europe
560,Bjørnstjerne,Bjørnson,male,NO,18321208,1832,Norway,4478500,145895.0,Europe
458,Élie,Ducommun,male,CH,18330219,1833,Switzerland,7160400,264478.0,Europe


In [0]:
df_silver_final.write.format("delta").mode("overwrite").saveAsTable("fact_silver")

In [0]:
%sql
SELECT * FROM fact_silver

laureate_key,firstname,surname,gender,CountryKey,birthDateKey,birthYear,Country,Population,GNP,Continent
559,Theodor,Mommsen,male,DE,18171130,1817,Germany,82164700,2133367.0,Europe
457,Frédéric,Passy,male,FR,18220520,1822,France,59225700,1424285.0,Europe
460,Randal,Cremer,male,GB,18280318,1828,United Kingdom,59623400,1378330.0,Europe
456,Henry,Dunant,male,CH,18280508,1828,Switzerland,7160400,264478.0,Europe
468,Auguste,Beernaert,male,BE,18290726,1829,Belgium,10239000,249704.0,Europe
568,Paul,Heyse,male,DE,18300315,1830,Germany,82164700,2133367.0,Europe
561,Frédéric,Mistral,male,FR,18300908,1830,France,59225700,1424285.0,Europe
562,José,Echegaray,male,ES,18320419,1832,Spain,39441700,553233.0,Europe
560,Bjørnstjerne,Bjørnson,male,NO,18321208,1832,Norway,4478500,145895.0,Europe
458,Élie,Ducommun,male,CH,18330219,1833,Switzerland,7160400,264478.0,Europe


Now onto hot-data for the Prizes csv data

In [0]:
def autoload_to_table(data_source, source_format, table_name, checkpoint_directory):
    query = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", source_format)
                  .option("cloudFiles.schemaLocation", checkpoint_directory)
                  .load(data_source)
                  .writeStream
                  .option("checkpointLocation", checkpoint_directory)
                  .option("mergeSchema", "true")
                  .table(table_name))
    return query

In [0]:
query = autoload_to_table(data_source = f"{data_dir}",
                          source_format = "csv",
                          table_name = "target_table",
                          checkpoint_directory = f"{checkpoint_dir}/target_table")


In [0]:
def block_until_stream_is_ready(query, min_batches=2):
    import time
    while len(query.recentProgress) < min_batches:
        time.sleep(5) # Give it a couple of seconds

    print(f"The stream has processed {len(query.recentProgress)} batchs")

block_until_stream_is_ready(query)

The stream has processed 99 batchs


In [0]:
%sql
SELECT * FROM target_table

PrizeKey,date,category,LaureateKey,_rescued_data
400,1986-12-10,chemistry,260,"{""_c0"":""400"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
401,1986-12-10,economics,677,"{""_c0"":""401"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
402,1986-12-10,literature,640,"{""_c0"":""402"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
403,1986-12-10,peace,537,"{""_c0"":""403"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
404,1986-12-10,physics,124,"{""_c0"":""404"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
405,1986-12-10,physics,125,"{""_c0"":""405"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
406,1986-12-10,physics,126,"{""_c0"":""406"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
407,1986-12-10,medicine,428,"{""_c0"":""407"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
408,1986-12-10,medicine,429,"{""_c0"":""408"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"
409,1985-12-10,chemistry,256,"{""_c0"":""409"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_5.csv""}"


In [0]:
%sql
DESCRIBE TABLE target_table

col_name,data_type,comment
PrizeKey,string,
date,string,
category,string,
LaureateKey,string,
_rescued_data,string,


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW nobel_bronze_view AS
  SELECT category, count(*) total_awards
  FROM target_table
  GROUP BY category;
  
SELECT * FROM nobel_bronze_view

category,total_awards
chemistry,190
medicine,224
literature,118
peace,108
economics,92
physics,221


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW nobel_silver_view AS
  SELECT *
  FROM target_table
  INNER JOIN fact_silver
  ON target_table.LaureateKey = fact_silver.laureate_key;


In [0]:
%sql
SELECT * FROM nobel_silver_view;

PrizeKey,date,category,LaureateKey,_rescued_data,laureate_key,firstname,surname,gender,CountryKey,birthDateKey,birthYear,Country,Population,GNP,Continent
941,1902-12-10,literature,559,"{""_c0"":""941"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",559,Theodor,Mommsen,male,DE,18171130,1817,Germany,82164700,2133367.0,Europe
950,1901-12-10,peace,457,"{""_c0"":""950"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",457,Frédéric,Passy,male,FR,18220520,1822,France,59225700,1424285.0,Europe
936,1903-12-10,peace,460,"{""_c0"":""936"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",460,Randal,Cremer,male,GB,18280318,1828,United Kingdom,59623400,1378330.0,Europe
949,1901-12-10,peace,456,"{""_c0"":""949"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",456,Henry,Dunant,male,CH,18280508,1828,Switzerland,7160400,264478.0,Europe
900,1909-12-10,peace,468,"{""_c0"":""900"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",468,Auguste,Beernaert,male,BE,18290726,1829,Belgium,10239000,249704.0,Europe
895,1910-12-10,literature,568,"{""_c0"":""895"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_9.csv""}",568,Paul,Heyse,male,DE,18300315,1830,Germany,82164700,2133367.0,Europe
930,1904-12-10,literature,561,"{""_c0"":""930"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",561,Frédéric,Mistral,male,FR,18300908,1830,France,59225700,1424285.0,Europe
931,1904-12-10,literature,562,"{""_c0"":""931"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",562,José,Echegaray,male,ES,18320419,1832,Spain,39441700,553233.0,Europe
935,1903-12-10,literature,560,"{""_c0"":""935"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",560,Bjørnstjerne,Bjørnson,male,NO,18321208,1832,Norway,4478500,145895.0,Europe
942,1902-12-10,peace,458,"{""_c0"":""942"",""_file_path"":""dbfs:/FileStore/default/prizesSplit/prizes_10.csv""}",458,Élie,Ducommun,male,CH,18330219,1833,Switzerland,7160400,264478.0,Europe


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW nobel_gold_view AS
  SELECT country, COUNT(*) AS num_of_awards
  FROM nobel_silver_view
  GROUP BY country
  ORDER BY num_of_awards DESC;


In [0]:
%sql
SELECT * FROM nobel_gold_view

country,num_of_awards
United States,285
United Kingdom,104
Germany,84
France,59
Sweden,30
Poland,29
Japan,28
Russian Federation,27
Italy,20
Canada,20


## 3. Author one or more SQL queries (SELECT statements) to demonstrate proper functionality.

a. SELECT data from at least 3 tables (two dimensions; plus the fact table).

b. Perform some type of aggregation (e.g., SUM, COUNT, AVERAGE). This, of course, 
necessitates some form of grouping operation (e.g., GROUP BY <customer.last_name>).

In [0]:
sql_select_statement = """
SELECT `fact_combined`.Country as country_name,
       `world`.Population as population,
       COUNT(*) as count_of_nobel_prizes,
       COUNT(CASE WHEN `laureate`.gender = "male" then 1 ELSE NULL END) as num_to_men,
       COUNT(CASE WHEN `laureate`.gender = "female" then 1 ELSE NULL END) as num_to_women
FROM `data_project_warehouse`.`fact_combined`
INNER JOIN `data_project_warehouse`.`world`
ON  `fact_combined`.`CountryKey` = `world`.`CountryKey`
INNER JOIN `data_project_warehouse`.`laureate`
ON  `fact_combined`.`LaureateKey` = `laureate`.`LaureateKey`
GROUP BY `world`.`country`
ORDER BY count_of_nobel_prizes DESC;
"""
df_result_one = get_dataframe(user_id, pwd, host_name, src_dbname, sql_select_statement)
df_result_one.head()



The 3 Tables used in this select statment are:

1. fact_combined with the country name
2. world with the country population
3. nobel with the gender of the nobel laurette

I also used another table called countries, but it does not show up in the final product because it was only used to convert the 3 letter country intervals in the world sql file to the nobel's 2 letter country intervals