In [1]:
# Connection settings for the PostgreSQL instance queried throughout this notebook.
# Each value can be overridden through the standard libpq environment variables
# (PGHOST, PGUSER, PGPASSWORD, PGPORT, PGDATABASE) so real credentials never have to
# be saved in the notebook; the fallbacks are the original local-development values.
import os

hostname = os.environ.get('PGHOST', 'localhost')
username = os.environ.get('PGUSER', 'postgres')
password = os.environ.get('PGPASSWORD', 'postgres')
port_id = int(os.environ.get('PGPORT', '5432'))  # kept as an int, as before
database = os.environ.get('PGDATABASE', 'pyspark_exercise_database')

In [16]:
import psycopg2
import psycopg2.extras
import pandas as pd

In [47]:
# Collect the distinct values of doc -> 'government' ->> 'GovernmentForm' from
# pyspark_test.country_info into a one-column DataFrame.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            distinct_government_forms_query = '''SELECT DISTINCT doc -> 'government' ->> 'GovernmentForm' AS GF
                                                 FROM pyspark_test.country_info;'''

            cur.execute(distinct_government_forms_query)
            distinct_government_form_df = pd.DataFrame(cur.fetchall(), columns=['Government Form'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()

In [48]:
# Preview the first rows of the distinct government forms loaded by the query cell.
distinct_government_form_df.head()

Unnamed: 0,Government Form
0,Occupied by Marocco
1,Territory of Australia
2,Dependent Territory of Norway
3,Parlementary Monarchy
4,Overseas Department of France


In [43]:
# Find the continent that occurs most often in pyspark_test.country_info, two ways:
# entirely in SQL, and with SQL doing the counting while pandas picks the maximum.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            # Option 1 (full SQL): count per continent, order descending, keep one row.
            the_most_frequent_continenet_query_full_sql = '''  SELECT doc -> 'geography' ->> 'Continent' AS Con,
                                                               COUNT(doc -> 'geography' ->> 'Continent') AS Cardinality
                                                               FROM pyspark_test.country_info
                                                               GROUP BY Con
                                                               ORDER BY Cardinality DESC
                                                               LIMIT 1;'''

            cur.execute(the_most_frequent_continenet_query_full_sql)
            most_freqent_continent_df_full_sql = pd.DataFrame(cur.fetchall(), columns=['Continent', 'Cardinality'])

            # Option 2 (SQL + pandas): SQL returns every continent's count; the
            # ranking is done afterwards on the DataFrame.
            the_most_frequent_continenet_query_sql_pandas = ''' SELECT doc -> 'geography' ->> 'Continent' AS Con,
                                                                COUNT(doc -> 'geography' ->> 'Continent') AS Cardinality
                                                                FROM pyspark_test.country_info
                                                                GROUP BY Con;'''

            cur.execute(the_most_frequent_continenet_query_sql_pandas)
            most_freqent_continent_df_sql_pandas = pd.DataFrame(cur.fetchall(), columns=['Continent', 'Cardinality'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()


In [44]:
# Option 1 result: the single row kept by ORDER BY Cardinality DESC LIMIT 1.
most_freqent_continent_df_full_sql

Unnamed: 0,Continent,Cardinality
0,Africa,58


In [45]:
# Option 2 result: rank the per-continent counts in pandas and keep the top row.
(
    most_freqent_continent_df_sql_pandas
    .sort_values('Cardinality', ascending=False)
    .iloc[:1]
)

Unnamed: 0,Continent,Cardinality
3,Africa,58


In [81]:
# List every country's name with its independence year (cast to INTEGER), newest
# first; rows whose year is NULL are placed at the end by NULLS LAST.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            independence_year_query = '''SELECT doc -> 'Name' AS Name,
                                         CAST (doc ->> 'IndepYear' AS INTEGER) AS Independence_Year
                                         FROM pyspark_test.country_info
                                         ORDER BY Independence_Year DESC NULLS LAST
                                         ;'''

            cur.execute(independence_year_query)
            independence_year_query_df = pd.DataFrame(cur.fetchall(), columns=['Name', 'Independence Year'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()

In [82]:
# Countries ordered by independence year, newest first; missing years sort last.
independence_year_query_df

Unnamed: 0,Name,Independence Year
0,Palau,1994.0
1,Czech Republic,1993.0
2,Eritrea,1993.0
3,Slovakia,1993.0
4,Bosnia and Herzegovina,1992.0
...,...,...
234,South Georgia and the South Sandwich Islands,
235,Saint Helena,
236,Svalbard and Jan Mayen,
237,Guadeloupe,


In [117]:
# Count, per language, the rows of pyspark_test.country_language flagged
# isofficial = 'T', ordered from the highest count down.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            languages_query = '''SELECT language, COUNT(isofficial) AS card
                                 FROM pyspark_test.country_language
                                 WHERE isofficial = 'T'
                                 GROUP BY language
                                 ORDER BY card DESC
                                 ;'''

            cur.execute(languages_query)
            language_query_df = pd.DataFrame(cur.fetchall(), columns=['Language', 'IsOfficial Cardinality'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()


In [118]:
# Top ten languages by number of rows flagged isofficial = 'T'.
language_query_df.head(10)

Unnamed: 0,Language,IsOfficial Cardinality
0,English,44
1,Arabic,22
2,Spanish,20
3,French,18
4,German,6
5,Portuguese,6
6,Italian,4
7,Malay,4
8,Dutch,4
9,Serbo-Croatian,3


In [141]:
# Estimate the number of speakers per language: join country_language to
# country_info on the country code and sum Population * (percentage / 100) per
# language, largest total first.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            languages_query_2 = '''
                                SELECT language, SUM(CAST(doc -> 'demographics' ->> 'Population' AS DOUBLE PRECISION) * (percentage / 100))  AS Population_sum
                                FROM pyspark_test.country_language
                                INNER JOIN pyspark_test.country_info
                                ON pyspark_test.country_info.doc ->> '_id' = pyspark_test.country_language.countrycode
                                GROUP BY language
                                ORDER BY Population_sum DESC
                                ;'''

            cur.execute(languages_query_2)
            language_query_df_2 = pd.DataFrame(cur.fetchall(), columns=['Language', 'Population Sum'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()

In [142]:
# Summed Population * (percentage / 100) per language, largest total first.
language_query_df_2

Unnamed: 0,Language,Population Sum
0,Chinese,1.191844e+09
1,Hindi,4.056331e+08
2,Spanish,3.550295e+08
3,English,3.470779e+08
4,Arabic,2.338392e+08
...,...,...
452,Pitcairnese,0.000000e+00
453,Tokelau,0.000000e+00
454,Futuna,0.000000e+00
455,Soqutri,0.000000e+00


In [144]:
# Keep only the languages whose summed population exceeds 100 million (1e8).
language_query_df_2[language_query_df_2['Population Sum'] > 1e8]

Unnamed: 0,Language,Population Sum
0,Chinese,1191844000.0
1,Hindi,405633100.0
2,Spanish,355029500.0
3,English,347077900.0
4,Arabic,233839200.0
5,Bengali,209304700.0
6,Portuguese,177595300.0
7,Russian,160807600.0
8,Japanese,126814100.0
9,Punjabi,104025400.0


In [187]:
# Build two top-20 rankings of countries, one by GNP and one by life expectancy.
# Both queries join country_info to country on the country code to pick up the name.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            # Top 20 by GNP; NULL values are pushed to the end before the LIMIT.
            highest_GNP = '''
                                SELECT CAST(doc ->> 'GNP' AS DOUBLE PRECISION) AS GNP,
                                name AS country_name
                                FROM pyspark_test.country_info
                                INNER JOIN pyspark_test.country
                                ON pyspark_test.country_info.doc ->> '_id' = pyspark_test.country.code
                                ORDER BY GNP DESC NULLS LAST
                                LIMIT 20
                                ;'''

            # Top 20 by life expectancy, same join and NULL handling.
            highest_life_expectancy = '''
                                SELECT CAST(doc -> 'demographics' ->> 'LifeExpectancy' AS DOUBLE PRECISION) AS Life_expectancy,
                                name AS country_name
                                FROM pyspark_test.country_info
                                INNER JOIN pyspark_test.country
                                ON pyspark_test.country_info.doc ->> '_id' = pyspark_test.country.code
                                ORDER BY Life_expectancy DESC NULLS LAST
                                LIMIT 20
                                ;'''

            cur.execute(highest_GNP)
            highest_GNP_df = pd.DataFrame(cur.fetchall(), columns=['GNP', 'country name'])

            cur.execute(highest_life_expectancy)
            highest_life_expectancy_df = pd.DataFrame(cur.fetchall(), columns=['Life expectancy', 'country name'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()

In [188]:
# Countries ranked by GNP, highest first (the query keeps at most 20 rows).
highest_GNP_df

Unnamed: 0,GNP,country name
0,8510700.0,United States
1,3787042.0,Japan
2,2133367.0,Germany
3,1424285.0,France
4,1378330.0,United Kingdom
5,1161755.0,Italy
6,982268.0,China
7,776739.0,Brazil
8,598862.0,Canada
9,553233.0,Spain


In [189]:
# Countries ranked by life expectancy, highest first (the query keeps at most 20 rows).
highest_life_expectancy_df

Unnamed: 0,Life expectancy,country name
0,83.5,Andorra
1,81.599998,Macao
2,81.099998,San Marino
3,80.699997,Japan
4,80.099998,Singapore
5,79.800003,Australia
6,79.599998,Switzerland
7,79.599998,Sweden
8,79.5,Hong Kong
9,79.400002,Canada


In [192]:
# Countries that appear in both rankings: inner-join the two frames on the country
# name, then put the columns in reading order.
result = (
    highest_GNP_df
    .merge(highest_life_expectancy_df, on='country name')
    .loc[:, ['country name', 'GNP', 'Life expectancy']]
)
result

Unnamed: 0,country name,GNP,Life expectancy
0,Japan,3787042.0,80.699997
1,France,1424285.0,78.800003
2,Italy,1161755.0,79.0
3,Canada,598862.0,79.400002
4,Spain,553233.0,78.800003
5,Australia,351182.0,79.800003
6,Switzerland,264478.0,79.599998


### The same result can also be obtained using SQL alone

In [195]:
# SQL-only version of the GNP / life-expectancy intersection: two top-20 subqueries
# (T1 by GNP, T2 by life expectancy) are inner-joined on the country name.

# Bind conn before the try: if psycopg2.connect() itself raises, the name would
# otherwise be unbound (NameError on a fresh kernel) or still point at a connection
# left over from an earlier cell when the finally clause runs.
conn = None

try:
    with psycopg2.connect(
        host=hostname,
        dbname=database,
        user=username,
        password=password,
        port=port_id,
    ) as conn:

        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:

            full_sql_gnp_life_expectancy = '''

                                SELECT country_name_1, GNP, Life_expectancy FROM

                                (SELECT CAST(doc ->> 'GNP' AS DOUBLE PRECISION) AS GNP,
                                name AS country_name_1
                                FROM pyspark_test.country_info
                                INNER JOIN pyspark_test.country
                                ON pyspark_test.country_info.doc ->> '_id' = pyspark_test.country.code
                                ORDER BY GNP DESC NULLS LAST
                                LIMIT 20) AS T1

                                INNER JOIN

                                (SELECT CAST(doc -> 'demographics' ->> 'LifeExpectancy' AS DOUBLE PRECISION) AS Life_expectancy,
                                name AS country_name_2
                                FROM pyspark_test.country_info
                                INNER JOIN pyspark_test.country
                                ON pyspark_test.country_info.doc ->> '_id' = pyspark_test.country.code
                                ORDER BY Life_expectancy DESC NULLS LAST
                                LIMIT 20) AS T2

                                ON T1.country_name_1 = T2.country_name_2
                                
                                ;'''

            cur.execute(full_sql_gnp_life_expectancy)
            full_sql_gnp_life_expectancy_df = pd.DataFrame(cur.fetchall(), columns=['Country name', 'GNP', 'Life expectancy'])

except Exception as error:
    # Best-effort cell: report the failure instead of raising.
    print(error)

finally:
    # psycopg2's connection context manager ends the transaction but leaves the
    # connection open, so it is closed explicitly here.
    if conn is not None:
        conn.close()

In [196]:
# Countries present in both top-20 subqueries, as returned by the SQL-only query.
full_sql_gnp_life_expectancy_df

Unnamed: 0,Country name,GNP,Life expectancy
0,Japan,3787042.0,80.699997
1,France,1424285.0,78.800003
2,Italy,1161755.0,79.0
3,Canada,598862.0,79.400002
4,Spain,553233.0,78.800003
5,Australia,351182.0,79.800003
6,Switzerland,264478.0,79.599998
