# practice of parallel processing

In [1]:
import sys
sys.version

'3.8.8 (default, Apr 13 2021, 15:08:03) [MSC v.1916 64 bit (AMD64)]'

In [None]:
pip install joblib

In [1]:
from joblib import parallel, delayed

#並列処理の記述
#結果 = Parallel(n_jobs=-1)(delayed(処理名)(変数)繰り返し構文など)
#n_jobsに-1を指定すると、そのコンピュータのコア数に合わせて処理を分割、1を指定すると逐次処理と同じになる。

In [28]:
#逐次処理とJoblibを使用した並列処理の処理速度比較
from time import time

def proc(n):
    return sum([m * n for m in range(10000)])

# 処理時間計測開始
start = time()

# 時間がかかる処理
total = 0
for i in range(10000):
    total += proc(i)
    
# 処理結果を表示
print(total)
print('逐次処理', (time()-start), "秒")

2499500025000000
逐次処理 11.572292804718018 秒


In [33]:
from joblib import Parallel, delayed
from time import time

def proc(n):
    return sum([m * n for m in range(10000)])

start = time()

sub = Parallel(n_jobs=-1)(delayed(proc)(i) for i in range(10000))
total = sum(sub)

print(total)
print('並列処理', (time()-start), "秒")

2499500025000000
並列処理 5.294717073440552 秒


In [35]:
%%time
def proc(n):
    return sum([m * n for m in range(10000)])

sub = Parallel(n_jobs=-1)(delayed(proc)(i) for i in range(10000))
total = sum(sub)

print(total)

2499500025000000
Wall time: 4.69 s


In [2]:
pip install py7zr

Note: you may need to restart the kernel to use updated packages.


# check the aircon data

In [3]:
import os
os.getcwd()

'C:\\Users\\genta212\\practice'

In [5]:
%%time
import py7zr

os.chdir("C:/Users/genta212/practice")
df = py7zr.unpack_7zarchive("練習用.7z", "practice")

print("decompress is finished") #1min 44s
#今の処理時間の120数倍かかるからなるべく高速化

decompress is finished
Wall time: 1min 53s


In [25]:
%%time
import pandas as pd
import gzip
import os

os.chdir("C:/Users/sugiyama/practice/copy")
pd.set_option("display.max_columns",100) 

with open("test2.csv", "a", newline="") as f:
    with gzip.open("201708010000_aircon_status_info_1.csv.gz", "rt") as t:
        data = t.read()
        print(data, file=f)

print(os.path.getsize("test2.csv")) #7749152byte=7.74MB

3953008
Wall time: 80.1 ms


In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12242 entries, 0 to 12241
Data columns (total 71 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   retrieve_date  12242 non-null  object 
 1   mac            12242 non-null  object 
 2   model          12242 non-null  object 
 3   pow            12242 non-null  bool   
 4   mode           12242 non-null  object 
 5   adv            12242 non-null  object 
 6   stemp          12242 non-null  object 
 7   shum           12242 non-null  object 
 8   alert          12240 non-null  float64
 9   f_dir          12242 non-null  object 
 10  htemp          12242 non-null  float64
 11  otemp          12242 non-null  float64
 12  tap            12242 non-null  object 
 13  rawrtmp        12242 non-null  float64
 14  humid          12215 non-null  float64
 15  trtmp          12242 non-null  float64
 16  fangl          12242 non-null  int64  
 17  hetmp          12242 non-null  float64
 18  itelc 

# practice using by aircon data

## copy gzip files that are included multiple folers to once folder

In [7]:
# copying gzip files that are included multiple folder to once folder 

from time import time
import os
import shutil
import glob

def copy_files(path):
    os.makedirs("copy", exist_ok=True)
    for curdir, subdir, subfile in os.walk(path, topdown=False):
        for file in subfile:
            file_path = os.path.join(curdir, file)
            try:
                shutil.copy(file_path, "copy")
            except Exception as err:
                print("発生したエラーの種類: {}".format(err))
                pass
                           
start = time()
os.chdir("C:/users/genta212/practice")
current_path = os.getcwd()

copy_files(current_path)
print("file copy is finished")
print('逐次処理', (time()-start), "秒") #逐次処理 73.7738618850708 秒

発生したエラーの種類: 'C:\\users\\genta212\\practice\\copy\\practice_unzip&concat_aircondata-checkpoint.ipynb' and 'copy\\practice_unzip&concat_aircondata-checkpoint.ipynb' are the same file
file copy is finished
逐次処理 73.7738618850708 秒


In [20]:
# copying gzip files that are included multiple folder to once folder(Parallel)

from time import time
import os
import shutil
import glob

def copy_para(curdir,subdir,subfile):
        for file in subfile:
            file_path = os.path.join(curdir, file)
            try:
                shutil.copy(file_path, "copy2")
            except Exception as err:
                print("発生したエラーの種類: {}".format(err))
                pass
                           
start = time()
os.chdir("C:/users/sugiyama/practice")
current_path = os.getcwd()
os.makedirs("copy2", exist_ok=True)

Parallel(n_jobs=-1)(delayed(copy_para)(curdir, subdir, subfile) for curdir, subdir, subfile in os.walk(current_path, topdown=False))

print("file copy is finished")
print('並列処理', (time()-start), "秒")#並列処理 329.5281240940094 秒

file copy is finished
並列処理 329.5281240940094 秒


## decompress and concat all csv files

In [8]:
# decompress and concat .gz files at once
import os
import pandas as pd
import gzip
from time import time
import glob

start = time()

os.chdir("C:/Users/genta212/practice/copy")

gz_files = glob.glob("*.csv.gz")
with open("total.csv", "a", newline='') as f:
    for file in gz_files:
        with gzip.open(file, "rt") as t:
            next(t)
            data = t.read()
            print(data, file=f)
        
print("decompress is finished")
print('逐次処理', (time()-start), '秒') #逐次処理 508.65018010139465 秒, 34GB

#各ファイルのデータが1行にまとまってデータ数分の列ができてる＝バイナリーで１文で解凍されたデータがそのまま縦に結合されてる
#→ヘッダーを設定して改行して70列×数万行のCSVを作れるのが理想→テキスト形式で読み込んだら普通にできた。
#→各データのヘッダーを飛ばしてるので、再設定する必要がある。
#並列化による処理時間の短縮と欠損値の型変換によるデータ量の削減作業(np.)もやる

decompress is finished
逐次処理 508.65018010139465 秒


In [127]:
# decompress and concat .gz files at once(Parallel)

from joblib import Parallel, delayed
from time import time
import gzip
import glob
import os
from itertools import islice
import shutil

       
#リストを指定したチャンクサイズに分割
def group_elements(n, iterable, padvalue=""):
    return zip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
        
        
#カレントディレクトリ以下のgzipファイルのリストを作成し指定数で分割したタプルを返す。
def gz_chunk_list(cwd_path, n):
    for curdir, subdir, subfile in os.walk(cwd_path, topdown=False):
        path_list = [os.path.join(curdir, file) for file in subfile]
        gz_chunks = [group_elements(n, iterable) for iterable in path_list]
    return gz_chunks
        

#同じ名前のファイル名が存在するかチェック
def duplicate_rename(file_path):
    if os.pat.exists(file_path): #存在した場合
        name, ext = os.path.splitext(file_path) #パスからファイル名と拡張子を分割」
        i = 1
        while True:
            new_name = "{}({:0=3}){}".format(name,i, ext) #数値を4桁にしたい場合は({:0=4})とする
            if not os.path.exitsts(new_name):
                return new_name
            i += 1
    
    else:
        return file_paths

        
#gzipを解凍・結合して別ディレクトリに集約
def decomp(**file_path):
    with gzip.open(file_path, "rt") as t:
        data = t.read()
    with open("./aircon_info_data(001).csv", "a", newline='') as f:
        print(data, file=f)
        
        file_path = f
        new_path = duplicate_rename(file_path)
        shutil.move(filePath, new_path)

        
start = time()

os.chdir("C:/Users/genta212/practice/practice/")
cwd_path = os.getcwd()

new_dirname = "C:/Users/genta212/practice/copy3"
os.makedirs(new_dirname, exist_ok=True)
moved_path = new_dirname

gz_chunks = gz_chunk_list(cwd_path, 10)

Parallel(n_jobs=-1)(delayed(decomp)(c) for c in gz_chunks)

print(gz_chunks)
print('decompress is finished')
print('並列処理', (time()-start), "秒") #並列処理 165.63339948654175 秒 21GB

#なんか結合はしてるけどデータが減ってる？ファイルの数が8928個で、上のコードはファイル数＝行数になってる。
#このコードだと5754行になって10GB以上サイズも違うのでなんか抜けてる可能性あり


[]
decompress is finished
並列処理 0.060547590255737305 秒


'\n#辞書を指定したチャンクサイズに分割\ndef dict_chunks(data, size):\n    it = iter(data)\n    for i in range(0, len(data), size):\n        yield{k:data[k] for k in islice(it, size)}\n        \n        \n#現在のディレクトリ以下の全てのファイル名をキー、絶対パスを値とした辞書の作成\ndef mkdic(cwd_path):\n    for curdir, subdir, subfile in os.walk(path, topdown=False):\n        files_name = [i for i in subfile]\n        files_path = [os.path.join(curdir, file) for file in subfile]\n        gz_dict = dict(zip(files_name, files_path))\n        return gz_dict\n        \n\n#以下参考コード(散乱したファイルの移動・集約）←上の処理に組み込んで省略予定\ndef copy_files(path):\n    os.makedirs("copy", exist_ok=True)\n    for curdir, subdir, subfile in os.walk(path, topdown=False):\n        for file in subfile:\n            file_path = os.path.join(curdir, file)\n            try:\n                shutil.copy(file_path, "copy")\n            except Exception as err:\n                print("発生したエラーの種類: {}".format(err))\n                pass\n\n\nstart = time()\nos.chdir("C:/users/genta212/pr

In [110]:
import gzip
import os
import glob
from pathlib import Path

os.chdir("C:/Users/genta212/practice/practice/")
path = os.getcwd()

for curdir, subdir, subfile in os.walk(path, topdown=False):
    files_name = [i for i in subfile]
    files_path = [os.path.join(curdir, file) for file in subfile]
    gz_dict = dict(zip(files_name, files_path))
    print(gz_dict)

{'201708010000_aircon_status_info_1.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010000_aircon_status_info_1.csv.gz', '201708010005_aircon_status_info_2.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010005_aircon_status_info_2.csv.gz', '201708010010_aircon_status_info_1.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010010_aircon_status_info_1.csv.gz', '201708010015_aircon_status_info_2.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010015_aircon_status_info_2.csv.gz', '201708010020_aircon_status_info_1.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010020_aircon_status_info_1.csv.gz', '201708010025_aircon_status_info_2.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010025_aircon_status_info_2.csv.gz', '201708010030_aircon_status_info_1.csv.gz': 'C:\\Users\\genta212\\practice\\practice\\201708\\20170801\\201708010030_

In [114]:
import os
import re
import shutil

file_path = 'C:/Users/genta212/practice'

#同じ名前のファイル名が存在するかチェック
def duplicate_rename(file_path):
    if os.pat.exists(file_path): #存在した場合
        name, ext = os.path.splitext(file_path) #パスからファイル名と拡張子を分割」
        i = 1
        while True:
            #数値を3桁にしたい場合は({:0=3})とする
            new_name = "{}({:0=3}){}".format(name,i, ext)
            if not os.path.exitsts(new_name):
                return new_name
            i += 1
    
    else:
        return file_path
    
new_path = duplicate_rename(file_path)

shutil.move(file_path, new_path)

## To import concating csv data to database(SQL)

In [112]:
# practice of connecting SQLite database
import sqlite3

db_path = "C:/Users/sugiyama/practice/test.db"
conn = sqlite3.connect(db_path)
conn.close()

In [40]:
# want to import all csv which include each directry to SQLite

import os
from os.path import join, split
from glob import glob

os.chdir("C:/Users/sugiyama/practice/practice/201708/201700801")
gz0_files =glob.glob("*.gz") for file in zip_files:
    with gzip.open(file, mode="rb") as fp:
        fp.extractall()

csv_path = r"C:/Users/sugiyama/practice/practice/201708/20170801"
db_path = r"C:/Users/sugiyama/practice/test.db"

def insert_csv():
    # csv ファイルのディレクトリ取得
    csv_files = glob(join(csv_path, "*.csv"))
    
    for csv in csv_files:
        table_name = split(csv)[1] # テーブル名作成
        table_name = table_name[0:-4] # テーブル名から拡張子を削除
        df = pd.read_csv(csv, dtype=object) # CSV読み込み
        with sqlite3.connect(db_path) as conn:
            df.to_sql(table_name, con=conn) # SQLiteにCSVをインポート

if __name__ == '__main__':
    insert_csv()

SyntaxError: invalid syntax (<ipython-input-40-6e2c6ffc87c3>, line 7)

## 以下没コード

In [2]:
import glob
import pandas as pd
from time import time
import gzip
import os

start = time()

os.chdir("C:/Users/sugiyama/practice/copy")

# 同じフォルダのCSVファイルの一覧を取得
files = glob.glob('*.gz')

# gzファイルの数を取得
file_number = len(files) #8928

# gzファイルを解凍しつつ、欠損値を置換したcsvを順次結合
df0 = None
for file in files:
    df = pd.read_csv(file, sep=" ", na_values="None")
    if df0 is None:
        df0 = df
    else:
        df0 = dd.concat([df0,df])
  

# CSVファイルとして出力
df0.to_csv('merge.csv', encoding='utf_8',index=False)

# 結合完了のメッセージ
print(file_number,' 個のCSVファイルを結合して、merge.csvを作成しました。')

print('逐次処理', (time()-start), '秒')

#pdでdfとして解凍して都度結合してるからメモリに乗りきらずMemory Error、できても時間がかかりすぎる

KeyboardInterrupt: 

In [None]:
# decompress and concat .gz files at once(multiprocessing)

from joblib import Parallel, delayed
from time import time
import gzip
import glob
from multiprocessing import Process
import os

def decomp(file):
    with gzip.open(file, "rt") as t:
        data = t.read()
    with open("total2.csv", "a", newline='') as f:
        print(data, file=f)

if __name__ == "__main__":
    os.chdir("C:/Users/genta212/practice/copy")
    
    start = time()

    gz_files = glob.glob("*.gz")
    p = Process(target=decomp, args=gz_files)

    p.start()
    print("Process started.")

    p.join()

    print('decompress is finished')
    print('並列処理', (time()-start), "秒") #並列処理  秒 GB


In [None]:
# decompress and concat .gz files at once(multiprocessing)

from joblib import Parallel, delayed
from time import time
import gzip
import glob
from multiprocessing import Pool
import os

os.chdir("C:/Users/genta212/practice/copy")

def decomp(file):
    with gzip.open(file, "rt") as t:
        data = t.read()
    with open("total3.csv", "a", newline='') as f:
        print(data, file=f)
            
start = time()

gz_files = glob.glob("*.gz")
pool = Pool()

print("Process started.")
pool.map(decomp, gz_files)

pool.close()
pool.join()

print('decompress is finished')
print('並列処理', (time()-start), "秒") #並列処理  秒 21GB

Process started.


In [None]:

"""
#辞書を指定したチャンクサイズに分割
def dict_chunks(data, size):
    it = iter(data)
    for i in range(0, len(data), size):
        yield{k:data[k] for k in islice(it, size)}
        
        
#現在のディレクトリ以下の全てのファイル名をキー、絶対パスを値とした辞書の作成
def mkdic(cwd_path):
    for curdir, subdir, subfile in os.walk(path, topdown=False):
        files_name = [i for i in subfile]
        files_path = [os.path.join(curdir, file) for file in subfile]
        gz_dict = dict(zip(files_name, files_path))
        return gz_dict
        

#以下参考コード(散乱したファイルの移動・集約）←上の処理に組み込んで省略予定
def copy_files(path):
    os.makedirs("copy", exist_ok=True)
    for curdir, subdir, subfile in os.walk(path, topdown=False):
        for file in subfile:
            file_path = os.path.join(curdir, file)
            try:
                shutil.copy(file_path, "copy")
            except Exception as err:
                print("発生したエラーの種類: {}".format(err))
                pass


start = time()
os.chdir("C:/users/genta212/practice")
current_path = os.getcwd()

copy_files(current_path)
print("file copy is finished")
print('逐次処理', (time()-start), "秒") #逐次処理 73.7738618850708 秒
"""