# 世界银行数据集ETL实现

## 项目目标：
* 在这个项目中我从世界银行提取数据、转换数据、加载数据这一整套流程（ETL）

为什么要做这样的项目？
* 在公司中工作，每天都要创建新的数据。当新数据到来的时候，你需要编写软件来周期性和自动化地提取、转换和加载数据。

项目的步骤：
1. 你需要每次读入一行 GDP 数据
2. 然后将这行数据做转换
3. 然后加载到 SQLite 数据库里。

以下是本 Jupyter notebook 目录：
1. 第一个单元格连接到一个叫做worldbank.db 的 SQLite 数据库，创建一个表，用于保存 GDP 数据。你只需要运行这个单元格即可。
2. 第二个单元格有一个函数，叫做 extract_line()。你只需要运行这个单元格即可。该函数是一个 [Python 生成器](https://wiki.python.org/moin/Generators)。完成这个单元格不需要你理解生成器的原理。本质上讲，生成器和函数类似，除了 return 语句不同，生成器使用 yield 语句。生成器让你可以在 for 循环里使用函数。这个函数允许你每次读入一行数据，对该行数据进行转换，然后移向文件的下一行。
3. 第三个单元格有一个函数，叫做 transform_indicator_data()。该函数从 csv 文件接收一行，然后对该行进行转换，以备加载步骤使用。
4. 第四个单元格包含一个函数，叫做 load_indicator_data()，将转换后的数据加载到worldbank.db 数据库的 GDP 表格中。
5. 第五个单元格运行这个 ETL 管道。
6. 第六个单元格运行查询语句，确保数据库运行正常。

你需要修改第三个和第四个单元格。

## 创建表单
* 使用SQLite库创建一个表，用于保存GDP数据
> SQLite 是一个C语言库，它可以提供一种轻量级的基于磁盘的数据库，这种数据库不需要独立的服务器进程，也允许需要使用一种非标准的 SQL 查询语言来访问它。一些应用程序可以使用 SQLite 作为内部数据存储。可以用它来创建一个应用程序原型，然后再迁移到更大的数据库，比如 PostgreSQL 或 Oracle。

In [35]:
import sqlite3
conn = sqlite3.connect('worldbank_etl.db')

# 获取游标
cur = conn.cursor()

# 创建表格
cur.execute("DROP TABLE IF EXISTS gdp")
cur.execute("CREATE TABLE gdp (countryname TEXT, countrycode TEXT, year INTEGER, gdp REAL, PRIMARY KEY (countrycode, year));")

conn.commit()
conn.close()

## 提取数据

In [36]:
# 生成器对于大型数据适应内存很有用
def extract_lines(file):
    while True:
        line = file.readline()
        if not line:
            break
        yield line

In [37]:
import pandas as pd
import numpy as np


def transform_indicator_data(data, colnames):
    """转换清理数据
    inputs: data row of data from the csv file
            colnames: colnames (list) column names from the csv file

    output: list of [countryname, countrycode, year, gdp]
    """
    # 去除双引号
    for i, datum in enumerate(data):
        data[i] = datum.replace('"', '')
    country = data[0]

    # 不是真正国家的名字列表
    non_countries = ['World',
                     'High income',
                     'OECD members',
                     'Post-demographic dividend',
                     'IDA & IBRD total',
                     'Low & middle income',
                     'Middle income',
                     'IBRD only',
                     'East Asia & Pacific',
                     'Europe & Central Asia',
                     'North America',
                     'Upper middle income',
                     'Late-demographic dividend',
                     'European Union',
                     'East Asia & Pacific (excluding high income)',
                     'East Asia & Pacific (IDA & IBRD countries)',
                     'Euro area',
                     'Early-demographic dividend',
                     'Lower middle income',
                     'Latin America & Caribbean',
                     'Latin America & the Caribbean (IDA & IBRD countries)',
                     'Latin America & Caribbean (excluding high income)',
                     'Europe & Central Asia (IDA & IBRD countries)',
                     'Middle East & North Africa',
                     'Europe & Central Asia (excluding high income)',
                     'South Asia (IDA & IBRD)',
                     'South Asia',
                     'Arab World',
                     'IDA total',
                     'Sub-Saharan Africa',
                     'Sub-Saharan Africa (IDA & IBRD countries)',
                     'Sub-Saharan Africa (excluding high income)',
                     'Middle East & North Africa (excluding high income)',
                     'Middle East & North Africa (IDA & IBRD countries)',
                     'Central Europe and the Baltics',
                     'Pre-demographic dividend',
                     'IDA only',
                     'Least developed countries: UN classification',
                     'IDA blend',
                     'Fragile and conflict affected situations',
                     'Heavily indebted poor countries (HIPC)',
                     'Low income',
                     'Small states',
                     'Other small states',
                     'Not classified',
                     'Caribbean small states',
                     'Pacific island small states']

    # 过滤国家名称
    if country not in non_countries:
        data_array = np.array(data, ndmin=2)
        data_array.reshape(1, 63)
        df = pd.DataFrame(data_array, columns=colnames).replace('', np.nan)
        df.drop(['\n', 'Indicator Name', 'Indicator Code'],
                inplace=True, axis=1)
        df_melt = df.melt(id_vars=['Country Name', 'Country Code'],
                          var_name='year',
                          value_name='gdp')

        # 遍历数据，对每一行提取数据生成list[country, countrycode, year, gdp]
        results = []
        # 遍历dataframe的方法iterrows()
        for i, row in df_melt.iterrows():
            country, countrycode, year, gdp = row
            if str(gdp) != 'nan':
                results.append([country, countrycode, year, gdp])

        return results

## 加载数据

In [38]:
def load_indicator_data(results):
    """遍历数据，加载到数据库的表中
    inputs: results (list) looks like[Aruba, ABW, 1995, 1.320670e+09]
    outputs: None
    """
    conn = sqlite3.connect('worldbank_etl.db')
    cur = conn.cursor()

    if results:
        for result in results:
            countryname, countrycode, year, gdp = result
            sql_string = 'INSERT INTO gdp (countryname, countrycode, year, gdp) VALUES ("{}", "{}", {}, {});'.format(
                countryname, countrycode, year, gdp)
            try:
                cur.execute(sql_string)
            except Exception as e:
                print('error occurred:', e, result)
    conn.commit()
    conn.close()

    return None

## ETL测试

In [39]:
with open('data/gdp_data.csv') as f:
    # 使用生成器按行读取数据
    for line in extract_lines(f):
        data = line.split(',')
        # 前几列不是数据本身
        if len(data) == 63:
            if data[0] == '"Country Name"':
                colnames = []
                for i, datum in enumerate(data):
                    colnames.append(datum.replace('"', ''))
            else:
                # 对数据本身进行转换
                results = transform_indicator_data(data, colnames)
                load_indicator_data(results)

In [45]:
# 查看数据库是否已加载数据
conn = sqlite3.connect('worldbank_etl.db')
cur = conn.cursor()

# 测试查询
df = pd.read_sql("SELECT * FROM gdp", con=conn)

conn.commit()
conn.close()

In [46]:
df.head(20)

Unnamed: 0,countryname,countrycode,year,gdp
0,Aruba,ABW,1994,1330168000.0
1,Aruba,ABW,1995,1320670000.0
2,Aruba,ABW,1996,1379888000.0
3,Aruba,ABW,1997,1531844000.0
4,Aruba,ABW,1998,1665363000.0
5,Aruba,ABW,1999,1722799000.0
6,Aruba,ABW,2000,1873453000.0
7,Aruba,ABW,2001,1920263000.0
8,Aruba,ABW,2002,1941095000.0
9,Aruba,ABW,2003,2021302000.0


# 总结
* 在提取数据时应注意，数据开头的行并不是数据本身，而是列名，对不同的数据部分需使用不同的处理方式。
* **ETL**管道包含从数据源提取数据（这是里从 csv 文件里）、将数据转换成可用性更高的形式、将数据存储到另一个地方这三步。目的是将企业中的分散、零乱、标准不统一的数据整合到一起，为企业的决策提供分析依据。ETL是BI项目重要的一个环节，其设计的好坏影响生成数据的质量，直接关系到BI项目的成败。
* 在这个项目中，我们熟悉了**ETL**的整体流程，在实际工作中，我们需要用到一些ETL的工具，其原因在与：
1. 当数据来自不同的物理主机，这时候如使用SQL语句去处理的话，就显得比较吃力且开销也更大。
2. 数据来源可以是各种不同的数据库或者文件，这时候需要先把他们整理成统一的格式后才可以进行数据的处理，这一过程用代码实现显然有些麻烦。
3. 在数据库中我们当然可以使用存储过程去处理数据，但是处理海量数据的时候存储过程显然比较吃力，而且会占用较多数据库的资源，这可能会导致数据资源不足，进而影响数据库的性能。
* **ETL**工具的优点在于：
1. 支持多种异构数据源的连接（部分）
2. 图形化的界面操作十分方便
3. 处理海量数据速度快、流程更清晰等
* 常用的**ETL**工具有：**Kettle**，**Datastage**，**Informatica**等，对于**ETL**工具的使用还有待学习