<a href="https://colab.research.google.com/github/dnicoleperez/IOD24/blob/main/IOD_Lab_10_3_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lab 10.3 PySpark for Big Data

## Introduction

**Note**: this notebook is to be run in Google Colab on your Google Drive. It will not work locally on your computer.

The purpose of this lab is to gain further exposure to cloud computing, often necessary when datasets become too large to manage on a local machine. You will learn how to work with a large dataset through the PySpark Python library with Google Colaboratory (Colab).

In Google Colab, a virtual machine is automatically set up to execute your code. The maximum lifetime of such a machine is 12 hours. Notebooks are disconnected from their virtual machine if left idle; if this happens, simply click the Connect button to reconnect. If the kernel is restarted (via the Runtime menu), variables are lost, but packages do not need to be reinstalled unless a new machine is assigned.

https://research.google.com/colaboratory/faq.html

Sign in to colab.research.google.com, choose the Upload tab and upload this notebook. This will automatically create a folder called "Colab Notebooks" in your Google Drive (if it does not already exist).

Apache Spark is an open-source cluster-computing framework that works with large datasets quickly by performing in-memory caching and computation. PySpark is the Python API for Spark, commonly used to manipulate big data. For reference, one useful cheat sheet is available at https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf


Fortunately, PySpark is straightforward to set up in Google Colab:

In [4]:
!pip install pyspark py4j



To work with Spark DataFrames we first need to create a SparkSession:

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, avg, round, when
from pyspark import SparkFiles

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [6]:
spark = SparkSession.builder.appName("populationdata").getOrCreate()

In [None]:
# Step 1: Install PySpark if you haven't done so
!pip install pyspark

# Step 2: Import SparkSession
from pyspark.sql import SparkSession

# Step 3: Create a Spark session
spark = SparkSession.builder.appName("populationdata").getOrCreate()

# Optional: Check Spark version
print("Spark Version:", spark.version)


Spark Version: 3.5.3


In [8]:
spark

## Loading the dataset and EDA

The dataset to be analysed in this lab shows population estimates by age and gender:

   - PopMale: Male population for the individual age (thousands)
   - PopFemale: Female population for the individual age (thousands)
   - PopTotal: Total population for the individual age (thousands)

Further details can be found at https://population.un.org/wpp/.

In [9]:
from google.colab import drive

In [10]:
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
# run this cell after uploading the file into your Colab Notebooks folder
df = spark.read.csv(r"/content/drive/MyDrive/Colab Notebooks/WPP2019_PopulationBySingleAgeSex_1950-2019.csv", header=True)

In [12]:
df.head()

Row(LocID='4', Location='Afghanistan', VarID='2', Variant='Medium', Time='1950', MidPeriod='1950.5', AgeGrp='0', AgeGrpStart='0', AgeGrpSpan='1', PopMale='139.669', PopFemale='154.913', PopTotal='294.582')

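Note that, unlike in Pandas, head() does not display a formatted preview of the dataset: it returns only the first row, as a Row object. Evaluating df on its own displays only the schema. This is because Spark performs lazy evaluation, computing rows only when an action asks for them.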

In [13]:
df.columns

['LocID',
 'Location',
 'VarID',
 'Variant',
 'Time',
 'MidPeriod',
 'AgeGrp',
 'AgeGrpStart',
 'AgeGrpSpan',
 'PopMale',
 'PopFemale',
 'PopTotal']

**Exercise**: How many rows does df contain?

In [14]:
# ANSWER
# count() returns the total number of rows; distinct().count() would count unique rows only
df.count()


1604549

In [15]:
df.rdd.getNumPartitions()

2

This corresponds to the number of cores in a free Google Colab instance.

In [16]:
df.printSchema()

root
 |-- LocID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- VarID: string (nullable = true)
 |-- Variant: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- MidPeriod: string (nullable = true)
 |-- AgeGrp: string (nullable = true)
 |-- AgeGrpStart: string (nullable = true)
 |-- AgeGrpSpan: string (nullable = true)
 |-- PopMale: string (nullable = true)
 |-- PopFemale: string (nullable = true)
 |-- PopTotal: string (nullable = true)



df.show() plays the role of df.head() in Pandas: it prints the first 20 rows by default.

In [17]:
df.show()

+-----+-----------+-----+-------+----+---------+------+-----------+----------+-------+---------+--------+
|LocID|   Location|VarID|Variant|Time|MidPeriod|AgeGrp|AgeGrpStart|AgeGrpSpan|PopMale|PopFemale|PopTotal|
+-----+-----------+-----+-------+----+---------+------+-----------+----------+-------+---------+--------+
|    4|Afghanistan|    2| Medium|1950|   1950.5|     0|          0|         1|139.669|  154.913| 294.582|
|    4|Afghanistan|    2| Medium|1950|   1950.5|     1|          1|         1|131.916|  141.851| 273.767|
|    4|Afghanistan|    2| Medium|1950|   1950.5|     2|          2|         1|125.127|  130.632| 255.759|
|    4|Afghanistan|    2| Medium|1950|   1950.5|     3|          3|         1| 119.22|  121.097| 240.317|
|    4|Afghanistan|    2| Medium|1950|   1950.5|     4|          4|         1|114.112|  113.085| 227.197|
|    4|Afghanistan|    2| Medium|1950|   1950.5|     5|          5|         1|109.723|  106.436| 216.159|
|    4|Afghanistan|    2| Medium|1950|   1950.

In [18]:
df.describe()

DataFrame[summary: string, LocID: string, Location: string, VarID: string, Variant: string, Time: string, MidPeriod: string, AgeGrp: string, AgeGrpStart: string, AgeGrpSpan: string, PopMale: string, PopFemale: string, PopTotal: string]

Once again, describe() is evaluated lazily, so we use show() to display the results:

In [19]:
df.describe().show()

+-------+------------------+--------------------+-------+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+
|summary|             LocID|            Location|  VarID|Variant|              Time|         MidPeriod|            AgeGrp|       AgeGrpStart|         AgeGrpSpan|           PopMale|         PopFemale|          PopTotal|
+-------+------------------+--------------------+-------+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+
|  count|           1604549|             1604549|1604549|1604549|           1604549|           1604549|           1604549|           1604549|            1604549|           1604549|           1604549|           1604548|
|   mean|1037.6430660578144|                NULL|    2.0|   NULL|1984.4929279193095|1984.9929279193095| 49.99925399598267| 4

Let us remove the columns we do not need.

In [20]:
df.select('VarID', 'Variant').distinct().show()

+-----+-------+
|VarID|Variant|
+-----+-------+
|    2| Medium|
+-----+-------+



As the dataset contains only the Medium variant, we can safely drop these columns.

In [21]:
df = df.drop('VarID', 'Variant')

In [22]:
df.show()

+-----+-----------+----+---------+------+-----------+----------+-------+---------+--------+
|LocID|   Location|Time|MidPeriod|AgeGrp|AgeGrpStart|AgeGrpSpan|PopMale|PopFemale|PopTotal|
+-----+-----------+----+---------+------+-----------+----------+-------+---------+--------+
|    4|Afghanistan|1950|   1950.5|     0|          0|         1|139.669|  154.913| 294.582|
|    4|Afghanistan|1950|   1950.5|     1|          1|         1|131.916|  141.851| 273.767|
|    4|Afghanistan|1950|   1950.5|     2|          2|         1|125.127|  130.632| 255.759|
|    4|Afghanistan|1950|   1950.5|     3|          3|         1| 119.22|  121.097| 240.317|
|    4|Afghanistan|1950|   1950.5|     4|          4|         1|114.112|  113.085| 227.197|
|    4|Afghanistan|1950|   1950.5|     5|          5|         1|109.723|  106.436| 216.159|
|    4|Afghanistan|1950|   1950.5|     6|          6|         1|105.969|   100.99| 206.959|
|    4|Afghanistan|1950|   1950.5|     7|          7|         1|102.769|   96.58

In [23]:
df.select('Location').distinct().count()

225

In [24]:
df.select('Location').distinct().show()

+--------------------+
|            Location|
+--------------------+
|       Côte d'Ivoire|
|                Chad|
|          Cabo Verde|
|       ECE: UNECE-52|
|ESCAP: ADB Develo...|
|ECLAC: The Caribbean|
|            Djibouti|
|African, Caribbea...|
|             Comoros|
|         Afghanistan|
|Belt-Road Initiat...|
|            Cambodia|
|Commonwealth of I...|
|       African Group|
|Countries with Ac...|
|Countries with Ac...|
|Central European ...|
|              Africa|
|African Union: We...|
|Countries with Ac...|
+--------------------+
only showing top 20 rows



**Exercise**: Repeat the above query, this time ordering the results by Location and using the truncate=False option to display results in full. Show all 225 results.

In [25]:
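# ANSWER
# take(225) returns every location as a list of Row objects, but in no particular order.
# To order by Location and print the names in full, as the exercise asks, use instead:
# df.select('Location').distinct().orderBy('Location').show(225, truncate=False)
df.select('Location').distinct().take(225)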

[Row(Location="Côte d'Ivoire"),
 Row(Location='Chad'),
 Row(Location='Cabo Verde'),
 Row(Location='ECE: UNECE-52'),
 Row(Location='ESCAP: ADB Developing member countries (DMCs)'),
 Row(Location='ECLAC: The Caribbean'),
 Row(Location='Djibouti'),
 Row(Location='African, Caribbean and Pacific (ACP) Group of States'),
 Row(Location='Comoros'),
 Row(Location='Afghanistan'),
 Row(Location='Belt-Road Initiative: Latin America and the Caribbean'),
 Row(Location='Cambodia'),
 Row(Location='Commonwealth of Independent States (CIS)'),
 Row(Location='African Group'),
 Row(Location='Countries with Access to the Sea'),
 Row(Location='Countries with Access to the Sea: Asia'),
 Row(Location='Central European Free Trade Agreement (CEFTA)'),
 Row(Location='Africa'),
 Row(Location='African Union: Western Africa'),
 Row(Location='Countries with Access to the Sea: Northern America'),
 Row(Location='Algeria'),
 Row(Location='Countries with Access to the Sea: Oceania'),
 Row(Location='Argentina'),
 Row(Loca

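Next run the following to check for missing values. The result is the number of rows containing at least one null; here it is 1, consistent with the PopTotal count of 1604548 in the describe() output above: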

In [26]:
df.count() - df.na.drop().count()

1

## Convert types

We saw above that all columns are in the form of strings. The following cell converts some of the columns to type float.

In [38]:
floatcols = ['MidPeriod', 'PopMale', 'PopFemale', 'PopTotal']

for col_name in floatcols:
    df = df.withColumn(col_name, trim(col(col_name)).cast('float'))

**Exercise**: Similarly convert the four columns listed below into **integers**.

In [28]:
intcols = ['Time', 'AgeGrp', 'AgeGrpStart', 'AgeGrpSpan']

#ANSWER
for col_name in intcols:
    df = df.withColumn(col_name, trim(col(col_name)).cast('int'))

In [29]:
df.printSchema()

root
 |-- LocID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- MidPeriod: string (nullable = true)
 |-- AgeGrp: integer (nullable = true)
 |-- AgeGrpStart: integer (nullable = true)
 |-- AgeGrpSpan: integer (nullable = true)
 |-- PopMale: string (nullable = true)
 |-- PopFemale: string (nullable = true)
 |-- PopTotal: string (nullable = true)

The schema output above was recorded before the float conversion cell was executed, which is why MidPeriod, PopMale, PopFemale and PopTotal still appear as string; once that cell has run they are reported as float.



## Answering some queries about the data

We use filter to select a subset of rows satisfying a True/False condition:

Example: What was the population breakdown by age and gender in Australia in 1970?

In [30]:
df.filter((df.Location == 'Australia') & (df.Time == 1970)).show(101)

+-----+---------+----+---------+------+-----------+----------+-------+---------+--------+
|LocID| Location|Time|MidPeriod|AgeGrp|AgeGrpStart|AgeGrpSpan|PopMale|PopFemale|PopTotal|
+-----+---------+----+---------+------+-----------+----------+-------+---------+--------+
|   36|Australia|1970|   1970.5|     0|          0|         1|126.482|   121.47| 247.952|
|   36|Australia|1970|   1970.5|     1|          1|         1|126.326|  120.997| 247.323|
|   36|Australia|1970|   1970.5|     2|          2|         1|126.439|  120.829| 247.268|
|   36|Australia|1970|   1970.5|     3|          3|         1|126.751|  120.899|  247.65|
|   36|Australia|1970|   1970.5|     4|          4|         1|127.193|   121.14| 248.333|
|   36|Australia|1970|   1970.5|     5|          5|         1|127.698|  121.488| 249.186|
|   36|Australia|1970|   1970.5|     6|          6|         1|128.196|  121.877| 250.073|
|   36|Australia|1970|   1970.5|     7|          7|         1|128.619|  122.239| 250.858|
|   36|Aus

The select function can select a subset of columns.

**Exercise**: What was the population of 45-year-old females in India in 1960?

In [35]:
# ANSWER
df.filter((df.Location == 'India') & (df.Time == 1960) & (df.AgeGrp == 45)).show(101)

+-----+--------+----+---------+------+-----------+----------+--------+---------+--------+
|LocID|Location|Time|MidPeriod|AgeGrp|AgeGrpStart|AgeGrpSpan| PopMale|PopFemale|PopTotal|
+-----+--------+----+---------+------+-----------+----------+--------+---------+--------+
|  356|   India|1960|   1960.5|    45|         45|         1|2210.417| 1987.196|4197.613|
+-----+--------+----+---------+------+-----------+----------+--------+---------+--------+



**Exercise**: Write a filter query to show the distinct locations starting with 'UN'.

In [36]:
#ANSWER
df.filter(col('Location').startswith('UN')).select('Location').distinct().show(truncate=False)


+--------+
|Location|
+--------+
+--------+



Another example: what is the population of each location in 1950 and 2019?

In [39]:
populations_in_1950 = df.filter(df.Time == 1950)\
                    .groupBy('Location')\
                    .sum('PopTotal')\
                    .withColumnRenamed('sum(PopTotal)', 'Population_1950')\
                    .withColumn('Population_1950', round('Population_1950', 3))\
                    .orderBy(col('Population_1950').desc())
populations_in_1950.show()

+--------------------+---------------+
|            Location|Population_1950|
+--------------------+---------------+
|Countries with Ac...|    2384540.117|
|Group of Twenty (...|    1731945.275|
|   Group of 77 (G77)|    1618241.347|
|ESCAP: income groups|    1490837.911|
|   ESCAP: HDI groups|    1490837.911|
|ESCAP: WB income ...|    1490818.181|
|                Asia|    1404908.989|
|  Asia-Pacific Group|    1397562.214|
|Countries with Ac...|     1364302.65|
|ESCAP: ADB Develo...|    1223407.377|
|Asia-Pacific Econ...|    1136577.758|
|               BRICS|    1101146.274|
|              Europe|    1098657.721|
|                BRIC|    1087517.847|
|Belt-Road Initiat...|     934148.834|
|Eastern and South...|     842669.337|
|       ECE: UNECE-52|     770020.389|
|ESCAP: WB Upper m...|     737341.529|
|Europe and Northe...|      721931.47|
|ESCAP: ADB Group ...|     718170.833|
+--------------------+---------------+
only showing top 20 rows



In [40]:
populations_in_2019 = df.filter(df.Time == 2019)\
                    .groupBy('Location')\
                    .sum('PopTotal')\
                    .withColumnRenamed('sum(PopTotal)', 'Population_2019')\
                    .withColumn('Population_2019', round('Population_2019', 3))\
                    .orderBy(col('Population_2019').desc())


In [41]:
populations_in_1950.join(populations_in_2019, 'Location').orderBy('Location').show()

+--------------------+---------------+---------------+
|            Location|Population_1950|Population_2019|
+--------------------+---------------+---------------+
|         Afghanistan|       7752.117|      38041.757|
|              Africa|     227794.137|    1308064.174|
|       African Group|     227511.873|    1306320.571|
|       African Union|     227525.637|    1306903.022|
|African Union: Ce...|      24214.661|     154013.705|
|African Union: Ea...|      55336.034|     382531.218|
|African Union: No...|      43705.252|     203493.227|
|African Union: So...|      34051.198|     179956.492|
|African Union: We...|       70218.49|     386908.388|
|African, Caribbea...|     199979.667|    1150190.577|
|             Albania|       1263.164|       2880.913|
|             Algeria|        8872.25|      43053.054|
|    Andean Community|      26311.024|     111736.664|
|              Angola|       4548.021|      31825.299|
| Antigua and Barbuda|         45.515|         97.115|
|         

**Exercise**: Which locations had the largest percentage change in population from 1950 to 2019?

In [None]:
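# ANSWER (one possible approach)
pop_percentage = populations_in_1950.join(populations_in_2019, 'Location')\
                    .withColumn('PercentChange', round((col('Population_2019') / col('Population_1950') - 1) * 100, 2))\
                    .orderBy(col('PercentChange').desc())
pop_percentage.show(truncate=False)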


**Bonus Exercise**: In 2019 which locations have the highest percentage of seniors (age 80+) relative to their total population?

(Hint: if you find Japan, Greece and Italy amongst them you are right!)

In [None]:
# ANSWER


## Population line plot

In this section, we use the result of a query to create a visualisation with matplotlib. We shall plot population against year for three countries.

In [None]:
df_3countries = df[df.Location.isin("Australia", "New Zealand", "Singapore")]\
                  .select('Location', 'MidPeriod', 'PopTotal') \
                  .groupBy('Location', 'MidPeriod')\
                  .sum('PopTotal')\
                  .withColumnRenamed('sum(PopTotal)', 'Population')\
                  .withColumn('Population', round('Population', 3))
df_3countries.show()

Now that this dataset is of a manageable size, we convert it to a Pandas DataFrame for plotting.

In [None]:
dataforplotting = df_3countries.toPandas()

In [None]:
dataforplotting.info()

In [None]:
dataforplotting.pivot_table(index=['MidPeriod'], columns='Location', values='Population')

In [None]:
ax = dataforplotting.pivot_table(index=['MidPeriod'], columns='Location', values='Population').plot()
ax.set_xlabel('Year');
ax.set_ylabel('Population in thousands');

**Bonus Exercise**: Plot the world population of children (age 0-17) along with those of the age groups 18-39 and 40+ from 1950 to 2019. You should display three lines (one for the population of each age group vs year) on the same plot. Note that 'World' is one of the locations.

Hint: one approach is to create a new column 'AgeCategory' based on AgeGrpStart using the 'when' function.

In [None]:
# ANSWER


In [None]:
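# world_data is the Spark DataFrame built in your answer above,
# with the columns MidPeriod, AgeCategory and Population
agegrp_df = world_data.toPandas()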

In [None]:
ax = agegrp_df.pivot_table(index=['MidPeriod'], columns='AgeCategory', values='Population').plot()
ax.set_xlabel('Year');
ax.set_ylabel('Population in thousands');

## Population Pyramid

In this section we show how a population pyramid may be created. We look at China in the year 1980.

In [None]:
china_1980 = df.filter((df.Location == 'China (and dependencies)') & (df.Time == 1980)).toPandas()

In [None]:
china_1980

In [None]:
china_1980.describe()

Create age brackets in multiples of 5 - such as 0-4, 5-9, ...

In [None]:
lower = china_1980['AgeGrpStart'] - (china_1980['AgeGrpStart'] % 5)

In [None]:
agebrackets = [f'{x:02d}-{(x+4):02d}' for x in lower.values]
agebrackets[-1] = '100+'

In [None]:
agebrackets[:10]
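
The bracket labelling can be checked on a handful of ages without any DataFrame. This plain-Python sketch repeats the same arithmetic on a made-up list of ages (`ages` is illustrative): subtracting `age % 5` rounds each age down to the start of its five-year bracket.

```python
# A few sample ages (made up for illustration), ending with the open-ended 100+ group
ages = [0, 3, 4, 5, 9, 10, 99, 100]

# Round each age down to the nearest multiple of 5
lower = [age - (age % 5) for age in ages]

# Zero-padded labels such as '00-04' and '05-09'
labels = [f"{x:02d}-{x + 4:02d}" for x in lower]

# The final group has no upper bound
labels[-1] = "100+"
print(labels)
```

Zero-padding keeps the two-digit labels in age order when they are sorted as strings; '100+' is the exception, which is one reason the plot further down is given an explicit order.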

In [None]:
china_1980['AgeRange'] = agebrackets

Next find the populations by age range.

In [None]:
# Select the numeric columns before summing; dividing by 1000 converts thousands to millions
agg_china_1980 = (china_1980.groupby('AgeRange')[['PopMale', 'PopFemale']].sum() / 1000).reset_index()

In [None]:
rev_age = list(dict.fromkeys(agebrackets[::-1])) #reversed list of ages

In [None]:
agg_china_1980['NegPopMale'] = -agg_china_1980['PopMale']

We are now ready to plot the population pyramid:

In [None]:
sns.set(rc={'figure.figsize':(12,8)})
bar_plot = sns.barplot(x='NegPopMale', y='AgeRange', data=agg_china_1980, order = rev_age, color='red')
bar_plot = sns.barplot(x='PopFemale', y='AgeRange', data=agg_china_1980, order = rev_age, color='green')

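# Fix the tick positions before labelling them so that every label matches its tick;
# male values are plotted as negatives, so the labels show absolute values
ticks = list(range(-80, 81, 20))
bar_plot.set_xticks(ticks)
bar_plot.set_xticklabels([str(abs(t)) for t in ticks])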

h = [bar_plot.bar(x=.1, height=.1, color = c) for c in ['red', 'green']] #used to set colour of bar in legend
bar_plot.legend(handles = h, labels=['Male', 'Female'], fontsize=20)

bar_plot.axes.set_title("Population Pyramid for China (1980)", fontsize=20);
bar_plot.set_xlabel("Population (millions)", fontsize=20);
bar_plot.set_ylabel("Age Group", fontsize=20);
bar_plot.tick_params(axis='both', labelsize=15);

## Prediction

Finally we use Spark's MLlib library for a linear regression problem. We shall predict the proportion of a country's population that is of a particular age in a given year.

Inputs:
- year
- age

Target variable:
- proportion of a country's population to be of that age, in that year
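
Written out, the target variable is the population of one age divided by the total over all ages for the same location and year. The symbols are notation introduced here for illustration: $P_{l,t,a}$ stands for PopTotal at location $l$, year $t$ and age $a$.

```latex
\text{Proportion}_{l,t,a} \;=\; \frac{P_{l,t,a}}{\sum_{a'} P_{l,t,a'}}
```

By construction, the proportions for a given location and year sum to 1 across all ages.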


In [None]:
pop_by_loc_and_year = df.groupBy('Location', 'Time')\
                        .sum('PopTotal')\
                        .withColumnRenamed('sum(PopTotal)', 'Location_year_pop')

In [None]:
pop_by_loc_and_year.show()

In [None]:
trainingset = df.join(pop_by_loc_and_year, ['Location', 'Time'])\
  .withColumn('Proportion',
              df.PopTotal/pop_by_loc_and_year.Location_year_pop)\
  .select('Time', 'AgeGrpStart', 'Proportion')

**Exercise**: Use the stat.corr function to find the correlation between AgeGrpStart and Proportion.

In [None]:
# ANSWER


Next we create a simple linear regression model to predict Proportion from Time and AgeGrpStart.
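
In equation form, the model being fitted is a plane in the two inputs. The $\beta$ symbols are notation introduced here: once the model has been trained, $\beta_0$ corresponds to `regressor.intercept` and $(\beta_1, \beta_2)$ to `regressor.coefficients`, in the same order as the input columns.

```latex
\widehat{\text{Proportion}} \;=\; \beta_0 + \beta_1 \cdot \text{Time} + \beta_2 \cdot \text{AgeGrpStart}
```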

In [None]:
from pyspark.ml.regression import LinearRegression

MLlib takes input in vector form. Hence we need to create vectors from features.

In [None]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['Time', 'AgeGrpStart'],
                                 outputCol='Features')

In [None]:
final_data_for_regression = featureassembler.transform(trainingset).select('Features', 'Proportion')
final_data_for_regression.show()

In [None]:
train, test = final_data_for_regression.randomSplit([0.9,0.1])

In [None]:
train

Next we cache these dataframes into memory:

In [None]:
train = train.cache()

In [None]:
test = test.cache()

We are ready to train the model. This may take a minute or so due to the size of the training set.

In [None]:
regressor = LinearRegression(featuresCol='Features', labelCol='Proportion')
regressor = regressor.fit(train)

In [None]:
regressor.coefficients

In [None]:
regressor.intercept

In [None]:
predicted_results = regressor.evaluate(test)

Here we can compare some predicted proportions with actual proportions:

In [None]:
predicted_results.predictions.show()

**Exercise**: Find R-squared and the mean squared error using the 'r2' and 'meanSquaredError' attributes.
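
For intuition about what the two metrics measure, here is a plain-Python sketch using the standard definitions: the mean squared error is the average squared residual, and R-squared is one minus the ratio of the residual sum of squares to the total sum of squares. The numbers are made up for illustration and are not output from the model above.

```python
# Made-up actual and predicted proportions
actual = [0.030, 0.020, 0.012, 0.006]
predicted = [0.028, 0.021, 0.014, 0.005]

n = len(actual)
mean_actual = sum(actual) / n

# Residual sum of squares: how far predictions are from the actual values
ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))

# Total sum of squares: how far actual values are from their own mean
ss_tot = sum((a - mean_actual) ** 2 for a in actual)

mse = ss_res / n
r2 = 1 - ss_res / ss_tot
print(mse, r2)
```

An R-squared near 1 means the model explains most of the variation in the target, while a value near 0 means it does little better than always predicting the mean.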

In [None]:
# ANSWER


## Conclusion

We have seen how to use PySpark to query a large dataset, how to convert smaller results into Pandas DataFrames for plotting, and how to use MLlib for machine learning on a large dataset.



---

© 2024 Institute of Data



