**Practical Work 4. Designing an end-to-end ETL pipeline with Python and Airflow.**

**Task 4.2.** Basic pipeline ETL

4.2.1. **Part 1.** Build a data pipeline based on Basic pipeline ETL.rar

**Part 2.** Use the dataset from Practical Work 3. Working with APIs. Kaggle API test datasets

4.2.2. Submit the results as a file named ФИО.ipynb (ФИО being your full name) and upload it to the Moodle learning portal.

## **Task 4.2.1. Part 1.**
Build a data pipeline based on Basic pipeline ETL.rar

### What is ETL?

ETL stands for Extract, Transform and Load: a process in which data is acquired, changed or processed, and finally loaded into a data warehouse or database.

You can extract data from sources such as files, websites or databases, transform the acquired data, and then load the final version into a database for business use.

Why ETL? Many of you may already be doing what ETL does, one way or another, by writing separate functions and scripts for each task. One of the main advantages of an ETL pipeline is that it makes the entire data flow visible, which helps you make decisions about it.

Let's start by building our own ETL pipeline:
* Extract data from a CSV file
* Transform/manipulate the data
* Load the data into a SQLite database
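
Before working with the real file, the three stages can be sketched as three small functions chained together. This is only an illustration under stated assumptions: the function names `extract`, `transform` and `load`, the `prices` table and the two sample lines are hypothetical, and an in-memory SQLite database stands in for the target store.

```python
import sqlite3

def extract(lines):
    """Extract: split raw comma-separated lines into lists of fields."""
    return [line.split(",") for line in lines]

def transform(rows, rate=0.75):
    """Transform: convert the price field to float and apply a fixed rate."""
    return [(name, float(price) * rate) for name, price in rows]

def load(records, conn):
    """Load: store the transformed records in a database table."""
    conn.execute("CREATE TABLE IF NOT EXISTS prices (name TEXT, price REAL)")
    conn.executemany("INSERT INTO prices VALUES (?, ?)", records)
    conn.commit()

# Hypothetical sample input, not taken from the dataset
raw_lines = ["BTC,100.0", "ETH,20.0"]

conn = sqlite3.connect(":memory:")  # temporary database that lives only in memory
load(transform(extract(raw_lines)), conn)
stored = conn.execute("SELECT name, price FROM prices ORDER BY name").fetchall()
print(stored)
conn.close()
```

Keeping each stage in its own function makes it easy to test the stages separately and to swap the source or the target later.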

In [None]:
# Python provides the csv module for reading data from CSV files
import csv

To work with files in Python, we use the built-in open() function. In its basic form it takes two arguments, always in the following order:
* the name of the file (as a string)
* the mode in which to open the file (as a string; optional, defaults to 'r' for reading)

The syntax to open a file in Python is:

file_obj = open("filename", "mode")
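
As a small illustration of the mode argument, the sketch below writes, appends to and reads back a throwaway file. The file name `example.txt` and its contents are made up for the example, and the file is created in a temporary directory so nothing in the working folder is touched.

```python
import os
import tempfile

# Hypothetical scratch file inside a fresh temporary directory
path = os.path.join(tempfile.mkdtemp(), "example.txt")

# 'w' creates the file, or truncates it if it already exists
with open(path, "w", encoding="utf-8") as f:
    f.write("first line\n")

# 'a' appends to the end of the file
with open(path, "a", encoding="utf-8") as f:
    f.write("second line\n")

# 'r' opens the file for reading
with open(path, "r", encoding="utf-8") as f:
    lines = f.read().splitlines()

print(lines)
```

The `with` statement closes the file automatically when the block ends, even if an error occurs inside it.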

In [None]:
import csv

with open('/content/crypto-markets.csv', 'r') as f:
    csv_reader = csv.reader(f)
    row_count = 0
    for row in csv_reader:
        if row_count < 14:
            print(row)
            row_count += 1
        else:
            break

['slug', 'symbol', 'name', 'date', 'ranknow', 'open', 'high', 'low', 'close', 'volume', 'market', 'close_ratio', 'spread']
['bitcoin', 'BTC', 'Bitcoin', '2013-04-28', '1', '135.3', '135.98', '132.1', '134.21', '0', '1488566728', '0.5438', '3.88']
['bitcoin', 'BTC', 'Bitcoin', '2013-04-29', '1', '134.44', '147.49', '134', '144.54', '0', '1603768865', '0.7813', '13.49']
['bitcoin', 'BTC', 'Bitcoin', '2013-04-30', '1', '144', '146.93', '134.05', '139', '0', '1542813125', '0.3843', '12.88']
['bitcoin', 'BTC', 'Bitcoin', '2013-05-01', '1', '139', '139.89', '107.72', '116.99', '0', '1298954594', '0.2882', '32.17']
['bitcoin', 'BTC', 'Bitcoin', '2013-05-02', '1', '116.38', '125.6', '92.28', '105.21', '0', '1168517495', '0.3881', '33.32']
['bitcoin', 'BTC', 'Bitcoin', '2013-05-03', '1', '106.25', '108.13', '79.1', '97.75', '0', '1085995169', '0.6424', '29.03']
['bitcoin', 'BTC', 'Bitcoin', '2013-05-04', '1', '98.1', '115', '92.5', '112.5', '0', '1250316563', '0.8889', '22.5']
['bitcoin', 'BTC'

In [None]:
f = open(r'/content/crypto-markets.csv')
# 'f' is a file object (file handle) here; it stays open because the reader is used in the next cell

csv_reader = csv.reader(f)
print(csv_reader)


<_csv.reader object at 0x7c51da5c3d80>
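
The output is a reader object rather than the rows themselves because csv.reader is an iterator: it yields rows one at a time and can be consumed only once. A minimal sketch of that behaviour, using an in-memory string instead of the real file (the two data rows are a made-up sample):

```python
import csv
import io

# In-memory stand-in for a CSV file; sample values only
sample = io.StringIO("symbol,close\nBTC,134.21\nETH,0.75\n")
reader = csv.reader(sample)

header = next(reader)     # the first call returns the header row
rows = list(reader)       # consumes all remaining rows
leftover = list(reader)   # the iterator is now exhausted

print(header)
print(rows)
print(leftover)
```

This is why a reader that has already been looped over must be recreated (by reopening the file) before the data can be read again.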


Transforming/Changing the data.

In [None]:
assetsCode = ['BTC','ETH','XRP','LTC']

# initialize empty list
crypto_data = []

next(csv_reader, None)  # skips the headers

# read csv data row wise
for row in csv_reader:
    if row[1] in assetsCode:  # keep only rows for the 'BTC', 'ETH', 'XRP' or 'LTC' currencies
        # convert the open, high, low and close amounts from str to float, then into GBP
        # (0.75 is the fixed conversion rate used in this exercise)
        row[5] = float(row[5]) * 0.75
        row[6] = float(row[6]) * 0.75
        row[7] = float(row[7]) * 0.75
        row[8] = float(row[8]) * 0.75
        crypto_data.append(row)

# print(csv_reader.line_num)
print(len(crypto_data))
print(crypto_data[0:2])

7239
[['bitcoin', 'BTC', 'Bitcoin', '2013-04-28', '1', 101.47500000000001, 101.98499999999999, 99.07499999999999, 100.6575, '0', '1488566728', '0.5438', '3.88'], ['bitcoin', 'BTC', 'Bitcoin', '2013-04-29', '1', 100.83, 110.6175, 100.5, 108.405, '0', '1603768865', '0.7813', '13.49']]
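
The transformation above addresses columns by position (row[1], row[5] and so on). One possible alternative, shown here only as a sketch, is csv.DictReader, which lets the same filter and conversion refer to columns by name. The in-memory sample keeps just a few of the columns; the Bitcoin row reuses values from the output above, while the Dogecoin row is invented for the example.

```python
import csv
import io

# Reduced in-memory sample; the second data row is made up
sample = io.StringIO(
    "slug,symbol,name,date,open,close\n"
    "bitcoin,BTC,Bitcoin,2013-04-28,135.3,134.21\n"
    "dogecoin,DOGE,Dogecoin,2013-12-15,0.0006,0.0002\n"
)

ASSETS = {"BTC", "ETH", "XRP", "LTC"}
USD_TO_GBP = 0.75  # same fixed rate as in the cell above

selected = []
for row in csv.DictReader(sample):   # each row is a dict keyed by the header names
    if row["symbol"] in ASSETS:
        row["open"] = float(row["open"]) * USD_TO_GBP
        row["close"] = float(row["close"]) * USD_TO_GBP
        selected.append(row)

print(len(selected), selected[0]["symbol"], selected[0]["close"])
```

Named access keeps working if the column order of the file changes, at the cost of slightly more memory per row.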


Loading the data into SQL DB

In [None]:
import sqlite3

# connect() opens a connection to the SQLite database file (creating the file if it does not exist)
conn = sqlite3.connect('session.db')
# Connections to other databases such as Oracle or DB2 are made in a similar way, using their own driver modules.

In [None]:
# Drop the table named Crypto if it already exists
try:
    conn.execute('DROP TABLE IF EXISTS `Crypto` ')
except Exception as e:
    print(str(e))

In [None]:
# Create a new table named Crypto
try:
    conn.execute('''
         CREATE TABLE Crypto
         (ID         INTEGER PRIMARY KEY,
         NAME        TEXT    NOT NULL,
         Date        datetime,
         Open        Float DEFAULT 0,
         High        Float DEFAULT 0,
         Low         Float DEFAULT 0,
         Close       Float DEFAULT 0);''')
    print("Table created successfully")
except Exception as e:
    print(str(e))
    print('Table Creation Failed!!!!!')
finally:
    conn.close() # this closes the database connection

Table created successfully


In [None]:
# Our crypto data contains more information than required, so we need to drop some of it.
print(crypto_data[0])

['bitcoin', 'BTC', 'Bitcoin', '2013-04-28', '1', 101.47500000000001, 101.98499999999999, 99.07499999999999, 100.6575, '0', '1488566728', '0.5438', '3.88']


In [None]:
# Some more transformations
crypto_sql_data = [(row[2], row[3], row[5], row[6], row[7], row[8]) for row in crypto_data]
crypto_sql_data[:2]

[('Bitcoin',
  '2013-04-28',
  101.47500000000001,
  101.98499999999999,
  99.07499999999999,
  100.6575),
 ('Bitcoin', '2013-04-29', 100.83, 110.6175, 100.5, 108.405)]

In [None]:
# Let's make a new connection to insert the crypto data into the SQL DB
conn = sqlite3.connect('session.db')
cur = conn.cursor()
try:
    cur.executemany("INSERT INTO Crypto(NAME, Date, Open, High, Low, Close) VALUES (?,?,?,?,?,?)", crypto_sql_data)
    conn.commit()
    print('Data Inserted Successfully')
except Exception as e:
    print(str(e))
    print('Data Insertion Failed')
finally:
    conn.close()

Data Inserted Successfully
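
As a variation on the insert cell, a sqlite3 connection can also be used as a context manager: the block commits if it finishes normally and rolls back if an exception is raised. The sketch below is an assumption-laden illustration, not part of the original notebook: it uses an in-memory database, a shortened Crypto table, and an invented batch containing a row that violates the NOT NULL constraint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Crypto (ID INTEGER PRIMARY KEY, NAME TEXT NOT NULL, Close REAL)"
)

good_rows = [("Bitcoin", 100.6575), ("Bitcoin", 108.405)]
bad_rows = [("Ethereum", 1.0), (None, 2.0)]  # made-up batch; None violates NOT NULL

with conn:  # commits automatically when the block succeeds
    conn.executemany("INSERT INTO Crypto(NAME, Close) VALUES (?, ?)", good_rows)

try:
    with conn:  # rolls back the whole batch when an exception is raised
        conn.executemany("INSERT INTO Crypto(NAME, Close) VALUES (?, ?)", bad_rows)
except sqlite3.IntegrityError as e:
    print("Batch rejected:", e)

count = conn.execute("SELECT COUNT(*) FROM Crypto").fetchone()[0]
print(count)
conn.close()  # the context manager does not close the connection itself
```

The point of the pattern is that a failed batch leaves no partially inserted rows behind.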


In [None]:
# Let's read the data back from the DB to verify it
conn = sqlite3.connect('session.db')
rows = conn.cursor().execute('SELECT * FROM Crypto').fetchall()
conn.close()  # all rows are already in memory, so the connection can be closed

# print only the first 15 rows to keep the output short
for row in rows[:15]:
    print(row)

print(f"Total rows: {len(rows)}")

(1, 'Bitcoin', '2013-04-28', 101.47500000000001, 101.98499999999999, 99.07499999999999, 100.6575)
(2, 'Bitcoin', '2013-04-29', 100.83, 110.6175, 100.5, 108.405)
(3, 'Bitcoin', '2013-04-30', 108.0, 110.1975, 100.53750000000001, 104.25)
(4, 'Bitcoin', '2013-05-01', 104.25, 104.91749999999999, 80.78999999999999, 87.74249999999999)
(5, 'Bitcoin', '2013-05-02', 87.285, 94.19999999999999, 69.21000000000001, 78.9075)
(6, 'Bitcoin', '2013-05-03', 79.6875, 81.0975, 59.324999999999996, 73.3125)
(7, 'Bitcoin', '2013-05-04', 73.57499999999999, 86.25, 69.375, 84.375)
(8, 'Bitcoin', '2013-05-05', 84.67500000000001, 89.1, 80.355, 86.9325)
(9, 'Bitcoin', '2013-05-06', 86.985, 93.495, 79.98, 84.225)
(10, 'Bitcoin', '2013-05-07', 84.1875, 85.08, 73.275, 83.625)
(11, 'Bitcoin', '2013-05-08', 82.19999999999999, 86.83500000000001, 82.19999999999999, 85.1775)
(12, 'Bitcoin', '2013-05-09', 84.9, 85.095, 81.94500000000001, 84.5025)
(13, 'Bitcoin', '2013-05-10', 84.6, 91.5, 83.6625, 87.9)
(14, 'Bitcoin', '2013

Writing the data to a CSV file

In [None]:
# newline='' lets the csv module control the line endings itself
with open('Crypto.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    # There are two methods for writing data:
    # writerow() writes a single row, given as one-dimensional data such as a list: [1, 'Jerry', 95]
    # writerows() writes several rows at once, given as a list of lists: [[1, 'Jerry', 95], [2, 'Tom', 80], [3, 'Scooby', 90]]
    csv_writer.writerow(['Name', 'Date', 'Open', 'High', 'Low', 'Close'])
    csv_writer.writerows(crypto_sql_data)
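
The difference between writerow() and writerows() can be checked without touching the disk by writing into an in-memory buffer. This is a small sketch; the two data rows reuse values shown earlier, with only three of the columns.

```python
import csv
import io

buffer = io.StringIO(newline="")   # in-memory stand-in for a file opened with newline=''
writer = csv.writer(buffer)

writer.writerow(["Name", "Date", "Close"])       # one row
writer.writerows([                               # several rows in one call
    ("Bitcoin", "2013-04-28", 100.6575),
    ("Bitcoin", "2013-04-29", 108.405),
])

text = buffer.getvalue()
rows_back = list(csv.reader(text.splitlines()))  # parse the text again to verify it
print(rows_back)
```

Note that every value comes back as a string: CSV has no notion of types, so numbers have to be converted again after reading.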

## **Task 4.2.1. Part 2.**
Build a data pipeline using the dataset from "Practical Work 3. Working with APIs. Kaggle API test datasets"

Variant 10. Home & Living


Practical Work 3 produced the file clear.csv.
We use it here:

Let's view the contents of the file:

In [None]:
import csv
with open('clear.csv', newline='') as csvfile:
    csv_reader1 = csv.reader(csvfile, delimiter=',')

    for row in csv_reader1:
        print(row)

['HDI rank', 'Country', 'HUMAN DEVELOPMENT', 'Human Development Index (HDI) ', 'Life expectancy at birth', 'Expected years of schooling', 'Mean years of schooling', 'Gross national income (GNI) per capita']
['1', 'Switzerland', 'VERY HIGH ', '0.962', '84.0', '16.5', '13.9', '66933.0']
['2', 'Norway', 'VERY HIGH ', '0.961', '83.2', '18.2', '13.0', '64660.0']
['3', 'Iceland', 'VERY HIGH ', '0.959', '82.7', '19.2', '13.8', '55782.0']
['4', 'Hong Kong, China (SAR)', 'VERY HIGH ', '0.952', '85.5', '17.3', '12.2', '62607.0']
['5', 'Australia', 'VERY HIGH ', '0.951', '84.5', '21.1', '12.7', '49238.0']
['6', 'Denmark', 'VERY HIGH ', '0.948', '81.4', '18.7', '13.0', '60365.0']
['7', 'Sweden', 'VERY HIGH ', '0.947', '83.0', '19.4', '12.6', '54489.0']
['8', 'Ireland', 'VERY HIGH ', '0.945', '82.0', '18.9', '11.6', '76169.0']
['9', 'Germany', 'VERY HIGH ', '0.942', '80.6', '17.0', '14.1', '54534.0']
['10', 'Netherlands', 'VERY HIGH ', '0.941', '81.7', '18.7', '12.6', '55979.0']
['11', 'Finland', '

In [None]:
f = open(r'/content/clear.csv')

csv_reader1 = csv.reader(f)
print(csv_reader1)

<_csv.reader object at 0x7c51bd5844a0>


In [None]:
#list of human development categories to select
#(the trailing space in 'MEDIUM ' matches the value as it is stored in the file)
human_development = ['LOW','MEDIUM ']

#initialize an empty list
human_data = []

#skip the header row
next(csv_reader1, None)

#read the CSV data row by row
for row in csv_reader1:
    if(row[2] in human_development):  #check whether the current row belongs to a selected category
        row[3] = '{:.1f}%'.format(float(row[3]) * 100) #convert the human development index to a percentage string
        human_data.append(row)

#number of rows that match the condition
print(len(human_data))

#first 2 rows of human_data
print(human_data[0:2])

76
[['116', 'Philippines', 'MEDIUM ', '69.9%', '69.3', '13.1', '9.0', '8920.0'], ['117', 'Botswana', 'MEDIUM ', '69.3%', '61.1', '12.3', '10.3', '16198.0']]
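
The filter above depends on the exact spelling of each category, including the trailing space in 'MEDIUM '. A possible way to make it less fragile, sketched here on a reduced in-memory sample, is to strip whitespace before comparing. The sample keeps only the first four columns, and the value 0.699 for the Philippines is inferred from the 69.9% shown in the output above.

```python
import csv
import io

# Reduced in-memory sample with only the first four columns
sample = io.StringIO(
    "HDI rank,Country,HUMAN DEVELOPMENT,Human Development Index (HDI) \n"
    "1,Switzerland,VERY HIGH ,0.962\n"
    "116,Philippines,MEDIUM ,0.699\n"
)

wanted = {"LOW", "MEDIUM"}   # categories written without stray spaces

reader = csv.reader(sample)
next(reader, None)           # skip the header row

human_rows = []
for row in reader:
    if row[2].strip() in wanted:   # compare the category without surrounding whitespace
        row[3] = "{:.1f}%".format(float(row[3]) * 100)
        human_rows.append(row)

print(human_rows)
```

Only the comparison uses the stripped value; the stored row keeps the category exactly as it appears in the file.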


Loading the data into SQL DB

In [None]:
import sqlite3

#open a connection to the SQLite database file
conn = sqlite3.connect('session.db')

In [None]:
#drop the existing Life table
try:
    conn.execute('DROP TABLE IF EXISTS `Life`')
except Exception as e:
    print(str(e))

In [None]:
#create a new table named life
try:
    conn.execute('''
         CREATE TABLE life
         (ID         INTEGER PRIMARY KEY,
         country        TEXT    NOT NULL,
         rank        INTEGER DEFAULT 0,
         index_h        Float DEFAULT 0,
         birth        Float DEFAULT 0);''')
    print("Table created successfully")
except Exception as e:
    print(str(e))
    print('Table Creation Failed!!!!!')
finally:
    conn.close() #close the database connection

Table created successfully


In [None]:
import sqlite3

#connect to the database
conn = sqlite3.connect('session.db')
cursor = conn.cursor()

#run an SQL query to get the list of tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()

#print the list of tables
print("List of tables:")
for table in tables:
    print(table[0])

#close the database connection
conn.close()

List of tables:
Crypto
life
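
Besides the table names, the column definitions can be inspected as well. A minimal sketch using SQLite's `PRAGMA table_info`, run here against an in-memory copy of the life table definition rather than session.db:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('''
    CREATE TABLE life
    (ID       INTEGER PRIMARY KEY,
     country  TEXT    NOT NULL,
     rank     INTEGER DEFAULT 0,
     index_h  Float   DEFAULT 0,
     birth    Float   DEFAULT 0);''')

# Each result row is (cid, name, type, notnull, dflt_value, pk)
columns = conn.execute("PRAGMA table_info(life)").fetchall()
names = [col[1] for col in columns]
types = [col[2] for col in columns]

for name, col_type in zip(names, types):
    print(name, col_type)

conn.close()
```

This is a quick way to confirm that a table was created with the intended columns before inserting data into it.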


In [None]:
#the data contains more information than required, so we drop part of it
print(human_data[0])

['116', 'Philippines', 'MEDIUM ', '69.9%', '69.3', '13.1', '9.0', '8920.0']


In [None]:
#additional transformations: keep only country, rank, index and life expectancy
crypto_sql_data = [(row[1], row[0], row[3], row[4]) for row in human_data]
crypto_sql_data[:2]

[('Philippines', '116', '69.9%', '69.3'), ('Botswana', '117', '69.3%', '61.1')]

In [None]:
#create a new connection to insert the new data into the SQL database
conn = sqlite3.connect('session.db')
cur = conn.cursor()
try:
    cur.executemany("INSERT INTO life(country, rank, index_h, birth) VALUES (?,?,?,?)", crypto_sql_data)
    conn.commit()
    print('Data Inserted Successfully')
except Exception as e:
    print(str(e))
    print('Data Insertion Failed')
finally:
    conn.close()

Data Inserted Successfully
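
Because index_h receives strings such as '69.9%', SQLite cannot convert them to numbers and keeps them as text, which makes numeric sorting and aggregation on that column unreliable. One possible alternative, sketched below on an in-memory database with the two rows shown above, is to store the percentage as a plain number and add the '%' sign only when displaying it. This is a design suggestion, not what the notebook does.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('''
    CREATE TABLE life
    (ID       INTEGER PRIMARY KEY,
     country  TEXT    NOT NULL,
     rank     INTEGER DEFAULT 0,
     index_h  REAL    DEFAULT 0,
     birth    REAL    DEFAULT 0);''')

# Percentages stored as numbers, without the '%' sign
rows = [("Philippines", 116, 69.9, 69.3), ("Botswana", 117, 69.3, 61.1)]
with conn:
    conn.executemany(
        "INSERT INTO life(country, rank, index_h, birth) VALUES (?,?,?,?)", rows
    )

# Numeric aggregation and ordering now behave as expected
avg_index = conn.execute("SELECT AVG(index_h) FROM life").fetchone()[0]
top = conn.execute(
    "SELECT country, index_h FROM life ORDER BY index_h DESC LIMIT 1"
).fetchone()

label = "{}: {:.1f}%".format(*top)   # formatting happens only at display time
print(label, round(avg_index, 2))
conn.close()
```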


In [None]:
#read the data back from the DB to verify it
conn = sqlite3.connect('session.db')
rows = conn.cursor().execute('Select * from life')

for row in rows:
    print(row)

(1, 'Philippines', 116, '69.9%', 69.3)
(2, 'Botswana', 117, '69.3%', 61.1)
(3, 'Bolivia (Plurinational State of)', 118, '69.2%', 63.6)
(4, 'Kyrgyzstan', 118, '69.2%', 70.0)
(5, 'Venezuela (Bolivarian Republic of)', 120, '69.1%', 70.6)
(6, 'Iraq', 121, '68.6%', 70.4)
(7, 'Tajikistan', 122, '68.5%', 71.6)
(8, 'Belize', 123, '68.3%', 70.5)
(9, 'Morocco', 123, '68.3%', 74.0)
(10, 'El Salvador', 125, '67.5%', 70.7)
(11, 'Nicaragua', 126, '66.7%', 73.8)
(12, 'Bhutan', 127, '66.6%', 71.8)
(13, 'Cabo Verde', 128, '66.2%', 74.1)
(14, 'Bangladesh', 129, '66.1%', 72.4)
(15, 'Tuvalu', 130, '64.1%', 64.5)
(16, 'Marshall Islands', 131, '63.9%', 65.3)
(17, 'India', 132, '63.3%', 67.2)
(18, 'Ghana', 133, '63.2%', 63.8)
(19, 'Micronesia (Federated States of)', 134, '62.8%', 70.7)
(20, 'Guatemala', 135, '62.7%', 69.2)
(21, 'Kiribati', 136, '62.4%', 67.4)
(22, 'Honduras', 137, '62.1%', 70.1)
(23, 'Sao Tome and Principe', 138, '61.8%', 67.6)
(24, 'Namibia', 139, '61.5%', 59.3)
(25, "Lao People's Democrati

In [None]:
#newline='' lets the csv module control the line endings itself
with open('life.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    #write the header and the data to the file
    csv_writer.writerow(['country', 'rank', 'index_h', 'birth'])
    csv_writer.writerows(crypto_sql_data)