In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, round
import pyspark.pandas as ps
import plotly.express as px



In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
path_past_death = r"D:\Documentos\Projects\un-data\WPP2022_DeathsBySingleAgeSex_Medium_1950-2021.csv"
path_future_death = r"D:\Documentos\Projects\un-data\WPP2022_DeathsBySingleAgeSex_Medium_2022-2100.csv"
path_birth = r"D:\Documentos\Projects\un-data\WPP2022_Fertility_by_Age5.csv"

In [37]:
df_past_death = spark.read.csv(path_past_death, header=True)
df_past_death = df_past_death.select('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time', 'AgeGrp', 'DeathMale', 'DeathFemale', 'DeathTotal')
df_past_death = df_past_death.withColumn('Time', col('Time').cast('int'))\
                             .withColumn('DeathMale', col('DeathMale').cast('float'))\
                             .withColumn('DeathFemale', col('DeathFemale').cast('float'))\
                             .withColumn('DeathTotal', col('DeathTotal').cast('float'))
df_past_death = df_past_death.filter(col('LocTypeName') == 'Country/Area')

2072520

In [20]:
df_future_death = spark.read.csv(path_future_death, header=True)
df_future_death = df_future_death.select('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time', 'AgeGrp', 'DeathMale', 'DeathFemale', 'DeathTotal')
df_future_death = df_future_death.withColumn('Time', col('Time').cast('int'))\
                                 .withColumn('DeathMale', col('DeathMale').cast('float'))\
                                 .withColumn('DeathFemale', col('DeathFemale').cast('float'))\
                                 .withColumn('DeathTotal', col('DeathTotal').cast('float'))
df_future_death = df_future_death.filter(col('LocTypeName') == 'Country/Area')

In [34]:
df_past_death_total = df_past_death.withColumn('DeathTotal', col('DeathTotal').cast('float'))
df_past_death_total = df_past_death_total.select(col('Location'), col('Time'), col('DeathTotal')).groupBy('Location', 'Time').agg(sum('DeathTotal').alias('DeathTotal')).orderBy(col('Time'))
df_past_death_total = df_past_death_total.withColumn('DeathTotal', round(col('DeathTotal'), 2))
df_future_death_total = df_future_death.withColumn('DeathTotal', col('DeathTotal').cast('float'))
df_future_death_total = df_future_death_total.select(col('Location'), col('Time'), col('DeathTotal')).groupBy('Location', 'Time').agg(sum('DeathTotal').alias('DeathTotal')).orderBy(col('Time'))
df_future_death_total = df_future_death_total.withColumn('DeathTotal', round(col('DeathTotal'), 2))
df_death_total = df_past_death_total.union(df_future_death_total).orderBy(col('Time'))

year = input('Enter the year: ').title()
threshold = input('Enter the threshold in millions: ')

df_death_total_year = df_death_total.filter(col('Time') == year).orderBy(col('DeathTotal').desc(), col('Location').asc())
df_death_total_year = df_death_total_year.withColumn('DeathTotal', round((col('DeathTotal') / 1000), 2))
df_death_total_year = df_death_total_year.filter(col('DeathTotal') < threshold)
df_death_total_year.show(truncate=False, n=300)

df_death_total_year_pandas = df_death_total_year.pandas_api()

plot = px.choropleth(df_death_total_year_pandas, color='DeathTotal', locations='Location', locationmode='country names', height=500, title=f'Total Deaths in {year}')
plot.show()

+----------------------------------+----+----------+
|Location                          |Time|DeathTotal|
+----------------------------------+----+----------+
|United States of America          |2050|4.11      |
|Nigeria                           |2050|3.71      |
|Indonesia                         |2050|3.38      |
|Pakistan                          |2050|2.55      |
|Brazil                            |2050|2.34      |
|Russian Federation                |2050|1.89      |
|Bangladesh                        |2050|1.52      |
|Japan                             |2050|1.51      |
|Democratic Republic of the Congo  |2050|1.48      |
|Mexico                            |2050|1.36      |
|Philippines                       |2050|1.21      |
|Ethiopia                          |2050|1.2       |
|Viet Nam                          |2050|1.1       |
|Germany                           |2050|1.09      |
|Egypt                             |2050|1.09      |
|Thailand                          |2050|0.89 


`to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.



In [6]:
country = input('Enter the country name: ').title()

df_past_country_death = df_past_death.groupBy('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time')\
                                     .agg(round(sum('DeathMale'), 2).alias('DeathMaleTotal'),\
                                          round(sum('DeathFemale'), 2).alias('DeathFemaleTotal'),\
                                          round(sum('DeathTotal'), 2).alias('DeathGrandTotal'))\
                                     .orderBy('Time', asc=True)
df_past_country_death = df_past_country_death.filter(col('Location') == country)
df_past_country_death.show()
df_past_country_death.printSchema()

df_past_pandas_death = df_past_country_death.pandas_api()

plot_past_country_death = df_past_pandas_death.plot(x='Time', y=['DeathMaleTotal', 'DeathFemaleTotal', 'DeathGrandTotal'], kind='line', title=f'Deaths in {country}')
plot_past_country_death.show()

+---------+---------+------------+--------+----+--------------+----------------+---------------+
|ISO3_code|LocTypeID| LocTypeName|Location|Time|DeathMaleTotal|DeathFemaleTotal|DeathGrandTotal|
+---------+---------+------------+--------+----+--------------+----------------+---------------+
|      COL|        4|Country/Area|Colombia|1950|        110.39|           95.56|         205.96|
|      COL|        4|Country/Area|Colombia|1951|        109.75|            95.5|         205.24|
|      COL|        4|Country/Area|Colombia|1952|         108.2|           94.36|         202.56|
|      COL|        4|Country/Area|Colombia|1953|        106.86|           93.07|         199.93|
|      COL|        4|Country/Area|Colombia|1954|        106.02|           92.05|         198.07|
|      COL|        4|Country/Area|Colombia|1955|        105.67|           91.32|         196.99|
|      COL|        4|Country/Area|Colombia|1956|        105.84|           91.06|         196.89|
|      COL|        4|Country/A

In [7]:
country = input('Enter the country name: ').title()

df_future_country_death = df_future_death.groupBy('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time')\
                                         .agg(round(sum('DeathMale'), 2).alias('DeathMaleTotal'),\
                                              round(sum('DeathFemale'), 2).alias('DeathFemaleTotal'),\
                                              round(sum('DeathTotal'), 2).alias('DeathGrandTotal'))\
                                             .orderBy('Time', asc=True)
df_future_country_death = df_future_country_death.filter(col('Location') == country)
df_future_country_death.show()
df_future_country_death.printSchema()

df_future_pandas_death = df_future_country_death.pandas_api()

plot_future_country_death = df_future_pandas_death.plot(x='Time', y=['DeathMaleTotal', 'DeathFemaleTotal', 'DeathGrandTotal'], kind='line', title=f'Deaths in {country}')
plot_future_country_death.show()

+---------+---------+------------+---------+----+--------------+----------------+---------------+
|ISO3_code|LocTypeID| LocTypeName| Location|Time|DeathMaleTotal|DeathFemaleTotal|DeathGrandTotal|
+---------+---------+------------+---------+----+--------------+----------------+---------------+
|      NIC|        4|Country/Area|Nicaragua|2022|         18.11|           14.24|          32.36|
|      NIC|        4|Country/Area|Nicaragua|2023|         18.48|           14.56|          33.04|
|      NIC|        4|Country/Area|Nicaragua|2024|         18.84|           14.87|          33.72|
|      NIC|        4|Country/Area|Nicaragua|2025|         19.21|           15.18|           34.4|
|      NIC|        4|Country/Area|Nicaragua|2026|         19.58|           15.53|          35.12|
|      NIC|        4|Country/Area|Nicaragua|2027|         19.97|           15.87|          35.84|
|      NIC|        4|Country/Area|Nicaragua|2028|         20.36|           16.23|           36.6|
|      NIC|        4

In [13]:
df_birth_age = spark.read.csv(path_birth, header=True)
# df_birth_age.select('Location').distinct().collect()
df_birth_age = df_birth_age.select('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time', 'AgeGrp', 'AgeGrpStart', 'Births')
df_birth_age = df_birth_age.withColumn('Time', col('Time').cast('int'))\
                   .withColumn('AgeGrpStart', col('AgeGrpStart').cast('int'))\
                   .withColumn('Births', col('Births').cast('float'))
df_birth_age = df_birth_age.orderBy('time', 'AgeGrpStart')

country1 = input('Enter the country name: ').title()
country2 = input('Enter the country name: ').title()
year = input('Enter the year: ')

df_birth_age = df_birth_age.groupBy('ISO3_code', 'LocTypeID', 'LocTypeName', 'Location', 'Time', 'AgeGrp', 'AgeGrpStart')\
                   .agg(round(sum('Births'), 2).alias('Births'))\
                   .orderBy('Time', asc=True)
df_birth_age = df_birth_age.filter((col('Location') == country1) | (col('Location') == country2)).filter(col('Time') == year)
df_birth_age = df_birth_age.groupBy('Time', 'AgeGrp')\
                           .pivot('Location')\
                           .agg({'Births': 'first'})
df_birth_age.show()
df_birth_age.printSchema()

df_pandas_birth = df_birth_age.pandas_api()

plot = px.bar(df_pandas_birth, x='AgeGrp', y=[country1, country2], title=f'Births in {year}',
              labels = {'value': 'Births', 'variable': 'Country', 'AgeGrp': 'Age Group'},
              barmode='group')
plot.show()

+----+------+--------+---------+
|Time|AgeGrp|Colombia|Guatemala|
+----+------+--------+---------+
|2030| 10-14|    47.2|    20.39|
|2030| 15-19|  890.42|    506.2|
|2030| 20-24| 1479.64|   940.16|
|2030| 25-29|  1597.8|   883.59|
|2030| 30-34| 1208.37|   635.38|
|2030| 35-39|  727.56|   335.13|
|2030| 40-44|  272.02|   114.25|
|2030| 45-49|    38.3|    15.78|
|2030| 50-54|    2.12|     0.78|
+----+------+--------+---------+

root
 |-- Time: integer (nullable = true)
 |-- AgeGrp: string (nullable = true)
 |-- Colombia: double (nullable = true)
 |-- Guatemala: double (nullable = true)




`to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.

