In [1]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

In [2]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!pip install -q pyspark==3.0.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m12.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# 1. Configuração

In [4]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

!pip install -q findspark==1.4.2

import findspark

findspark.init()

# 2. Conexão

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

In [6]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

In [7]:
data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [8]:
data.show()

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [9]:
data.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

# 3. Data Wrangling

# 3.1 Exploração

In [10]:
data.count()

841

In [11]:
data.columns

['Description',
 'Real GDP of England at market prices',
 'Real GDP of England at factor cost ',
 'Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders',
 'Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders',
 'Index of real UK GDP at factor cost - based on changing political boundaries, ',
 'Composite estimate of English and (geographically-consistent) UK real GDP at factor cost',
 'HP-filter of log of real composite estimate of English and UK real GDP at factor cost',
 'Real UK gross disposable national income at market prices, constant border estimate',
 'Real consumption',
 'Real investment',
 'Stockbuilding contribution',
 'Real government consumption of goods and services',
 'Export volumes',
 'Import volumes',
 'Nominal GDP of England at market prices',
 'Nominal UK GDP at market prices',
 'Nominal UK GDP at market prices.1',
 'Population (GB+NI)',
 'Population (England)',
 'Employment',
 'Unemployment

In [12]:
len(data.columns)

77

# 3.2 Limpeza

In [13]:
data = data.select([ "Description", "Population (GB+NI)", "Unemployment rate" ])

In [14]:
data = data.\
  withColumnRenamed ( "Description", 'year' ).\
  withColumnRenamed( "Population (GB+NI)", "population" ).\
  withColumnRenamed( "Unemployment rate", "unemployment_rate")

In [15]:
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



In [16]:
data_description = data.filter(data['year'] == 'Units')

In [17]:
data_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



# 3.3 Junção

In [18]:
(data.count(), len(data.columns))

(841, 3)

In [19]:
(data_description.count(), len(data_description.columns))

(1, 3)

In [20]:
from pyspark.sql.functions import broadcast

In [21]:
data = data.join( other=broadcast(data_description) , on=['year'], how='left_anti')

In [22]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



In [23]:
data = data.dropna()

In [24]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



In [25]:
data = data.withColumn('century', 1 + (data['year']/100).cast('int'))

In [26]:
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



In [27]:
timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [28]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [29]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

# 3.4 Escrita

In [31]:
data.repartition('century').write.csv(path="uk-macroeconomic-data-clean" , sep=",", header=True, mode="overwrite")

# Visualização

In [32]:
import pandas as pd

In [36]:
df = pd.read_csv('/content/uk-macroeconomic-data-clean/part-00053-b61ff244-9c7f-491d-ac89-6c82f22d9265-c000.csv')

In [37]:
df.head()

Unnamed: 0,year,population,unemployment_rate,century
0,1900,37923,3.67,20
1,1901,38328,4.86,20
2,1902,38696,5.13,20
3,1903,39068,5.59,20
4,1904,39444,6.89,20


In [39]:
df.sort_values(['year'], ascending=True)

Unnamed: 0,year,population,unemployment_rate,century
0,1900,37923,3.67,20
1,1901,38328,4.86,20
2,1902,38696,5.13,20
3,1903,39068,5.59,20
4,1904,39444,6.89,20
...,...,...,...,...
95,1995,58025,8.62,20
96,1996,58164,8.10,20
97,1997,58314,6.97,20
98,1998,58475,6.26,20
