# imports

In [7]:
import requests
import zipfile
import io
import os 
import shutil
import pandas as pd
import logging
import configparser
import shutil


from pathlib import Path
from datetime import datetime
from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine, text, NullPool

import warnings
# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation warnings raised by the libraries used
# below; consider narrowing the filter to specific categories.
warnings.filterwarnings("ignore")

In [16]:
import configparser

# Load SSH / database credentials and the Yandex.Disk endpoints from config.ini,
# which is looked up relative to the current working directory.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed, so fail early with a clear message instead of hitting
# an opaque KeyError on the first lookup below.
if not config.read('config.ini'):
    raise FileNotFoundError('config.ini was not found in the current working directory')

db_settings = config['database']
ssh_host = db_settings['ssh_host']
ssh_port = int(db_settings['ssh_port'])
ssh_username = db_settings['ssh_username']
ssh_password = db_settings['ssh_password']
database_username = db_settings['database_username']
database_password = db_settings['database_password']
database_name = db_settings['database_name']
localhost = db_settings['localhost']
localhost_port = int(db_settings['localhost_port'])
table_name = db_settings['table_name']

yandex_settings = config['yandex']
ya_api = yandex_settings['ya_api']
ya_link = yandex_settings['ya_link']

# define saving dir

In [42]:
def create_load_save_dir():
    """Ensure the ``saved_csv`` download directory exists and return its path.

    The directory is created inside the current working directory.

    Returns:
        str: POSIX-style (forward-slash) path of the ``saved_csv`` directory.

    Raises:
        OSError: if the directory cannot be created, e.g. permission denied or a
            regular file named ``saved_csv`` is in the way (``FileExistsError``).
    """
    save_path = Path.cwd() / 'saved_csv'
    # exist_ok=True tolerates only "already exists as a directory"; any other
    # failure is raised here instead of being swallowed and resurfacing later
    # as a confusing error when the first file is written.
    save_path.mkdir(exist_ok=True)
    return save_path.as_posix()

In [43]:
# Create the download directory if needed and keep its path for the later steps.
local_save_dir = create_load_save_dir()

In [36]:
# Display the resolved download directory.
local_save_dir

'C:/Users/pnknv/Documents/GitHub/yadisk_loader/saved_csv'

# get list of files already in database table test_avito_parsed

In [6]:
def get_today_date():
    """Return the current local date and time as a ``DD/MM/YYYY, HH:MM:SS`` string."""
    now = datetime.today()
    return now.strftime("%d/%m/%Y, %H:%M:%S")

In [7]:
def get_sql_engine(ssh_host, ssh_port, ssh_username, ssh_password, localhost,
                   localhost_port, database_username, database_password, database_name):
    """Open an SSH tunnel to the database host and build a SQLAlchemy engine over it.

    Args:
        ssh_host, ssh_port: address of the SSH server to tunnel through.
        ssh_username, ssh_password: SSH credentials.
        localhost, localhost_port: database address as seen from the SSH server
            (the tunnel's remote bind address).
        database_username, database_password: database credentials.
        database_name: schema to connect to.

    Returns:
        tuple: ``(sql_server, sql_engine)`` - the started ``SSHTunnelForwarder`` and
        an engine created with ``NullPool``. The caller owns both and must stop the
        tunnel and dispose the engine when done.

    Raises:
        Exception: anything raised while starting the tunnel or building the engine;
            if the engine cannot be built, the tunnel is stopped before re-raising.
    """
    # Imported locally so this function brings its own dependency into scope.
    from sqlalchemy.engine import URL

    sql_server = SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_username,
        ssh_password=ssh_password,
        remote_bind_address=(localhost, localhost_port)
    )
    sql_server.start()

    try:
        # URL.create escapes each component, so credentials containing characters
        # such as '@', ':' or '/' no longer corrupt the connection string.
        # NOTE(review): as in the original, `localhost` doubles as the host the engine
        # connects to on this machine; that presumably relies on it being a loopback
        # address - confirm against config.ini.
        connection_url = URL.create(
            'mariadb+pymysql',
            username=database_username,
            password=database_password,
            host=localhost,
            port=int(sql_server.local_bind_port),
            database=database_name,
        )
        sql_engine = create_engine(connection_url, poolclass=NullPool)
    except Exception:
        # Do not leave a started tunnel behind when the engine cannot be created.
        sql_server.stop()
        raise

    return sql_server, sql_engine

In [8]:
# Start the SSH tunnel and create the engine used by the cells below; both stay
# open until close_sql_connection() is called.
sql_server, sql_engine = get_sql_engine(ssh_host, ssh_port, ssh_username, ssh_password, localhost, 
                                        localhost_port, database_username, database_password, database_name)

In [11]:
def get_db_saved_files(engine):
    """Return the distinct ``file_name`` values already stored in ``test_avito_parsed``.

    Args:
        engine: object whose ``connect()`` returns a connection usable by
            ``pandas.read_sql`` (a SQLAlchemy engine in this notebook).

    Returns:
        tuple: ``(filenames_list, exc_code)``.
        ``filenames_list`` is the list of stored file names, or ``[]`` on any failure.
        ``exc_code`` is ``None`` on success and when the error carries code
        ``'f405'`` (treated as "table does not exist yet"). Otherwise it is the
        exception's ``code`` attribute, falling back to the exception class name
        when there is no such attribute, so a failure never looks like success.
    """
    filenames_query = 'SELECT DISTINCT file_name FROM test_avito_parsed'
    try:
        # The context manager closes the connection even when the query fails.
        with engine.connect() as con_obj:
            filenames_df = pd.read_sql(sql=text(filenames_query), con=con_obj)
        return filenames_df.file_name.to_list(), None
    except Exception as exc:
        print(exc)
        # Not every exception has a `.code` attribute; reading it directly used to
        # raise AttributeError from inside this handler.
        exc_code = getattr(exc, 'code', None)
        if exc_code == 'f405':
            # The message below says: the table does not exist, which is fine - it is
            # created when the CSV data is written.
            print('\nТаблица не существует и это нормально, она создастся при записи csv-файла!')
            return [], None
        return [], exc_code or type(exc).__name__

In [12]:
# File names already present in the table, plus an error code (None means no error).
files_in_db, error_with_db_con = get_db_saved_files(sql_engine)

In [13]:
# True when the database lookup above reported no error.
error_with_db_con is None

True

In [15]:
# sql_server.stop()
# sql_engine.dispose()

# load files

In [37]:
def get_direct_link(sharing_link):
    """Resolve a public sharing link into a direct download URL.

    The request URL is built by substituting ``sharing_link`` into the module-level
    ``ya_api`` template.

    Args:
        sharing_link: public link to the shared resource.

    Returns:
        The ``href`` value of the JSON response, or ``None`` when the link cannot be
        resolved (request failed, error status, non-JSON body, or no ``href`` field).
    """
    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        pk_request = requests.get(ya_api.format(sharing_link), timeout=30)
        pk_request.raise_for_status()
        return pk_request.json().get('href')
    except (requests.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON. The caller already treats
        # None as "could not convert the link".
        return None

def download_yadisk_files(sharing_link, list_file_exist, save_dir):
    """Download the shared archive and extract the files that are not yet in the database.

    Archive members are written flat into ``save_dir`` under their base name
    (directory parts inside the archive are dropped). A member is skipped when it is
    a directory entry or when its name without extension is in ``list_file_exist``.

    Args:
        sharing_link: public sharing link to download from.
        list_file_exist: file stems (names without extension) that must be skipped.
        save_dir: existing directory to extract into.

    Returns:
        ``None`` on success, or the string ``'error loading files'`` when the link
        cannot be resolved, the download fails, or the payload is not a zip archive.
    """
    direct_link = get_direct_link(sharing_link)
    if not direct_link:
        print('Failed to download files from "{}"'.format(sharing_link))
        return 'error loading files'

    try:
        download = requests.get(direct_link, timeout=60)
        # Without this check an HTML error page would be handed to ZipFile.
        download.raise_for_status()
        archive = zipfile.ZipFile(io.BytesIO(download.content))
    except (requests.RequestException, zipfile.BadZipFile) as exc:
        print(exc)
        print('Failed to download files from "{}"'.format(sharing_link))
        return 'error loading files'

    already_loaded = set(list_file_exist)
    cnt = 0
    with archive:
        for member in archive.namelist():
            filename = os.path.basename(member)
            # An empty basename means the member is a directory entry.
            if not filename or Path(filename).stem in already_loaded:
                continue
            # Both handles are closed by the with-block even if copying fails.
            with archive.open(member) as src, \
                    open(os.path.join(save_dir, filename), 'wb') as target:
                shutil.copyfileobj(src, target)
            cnt += 1

    print('Succesfully downloaded {} files from "{}"'.format(cnt, sharing_link))
    return None

In [44]:
# Download and extract the new files; the result is None on success or an error string.
error_file_loading = download_yadisk_files(ya_link, files_in_db, local_save_dir)

Succesfully downloaded 1 files from "https://disk.yandex.ru/d/sVgz0GiqjhCsNg"


In [22]:
# True when the download step above reported no error.
error_file_loading is None

True

# upload files to database 

In [23]:
def get_local_files(save_dir):
    """Return the POSIX-style paths of all regular files under ``save_dir``.

    The search is recursive. Paths are returned sorted so that downstream processing
    order does not depend on the file system's listing order.
    """
    return sorted(x.as_posix() for x in Path(save_dir).glob('**/*') if x.is_file())

def create_csv(save_directory):
    """Read every file under ``save_directory`` as a ``;``-delimited UTF-8 CSV and stack them.

    Each row gets a ``file_name`` column holding the source file's name without
    extension.

    Args:
        save_directory: directory that is searched recursively for files.

    Returns:
        pandas.DataFrame: rows of all files with a fresh 0..n-1 index. When the
        directory contains no files an empty DataFrame is returned, so callers
        should check ``.empty`` before uploading.

    Raises:
        Exception: whatever ``pandas.read_csv`` raises for an unreadable file.
    """
    saved_files = get_local_files(save_directory)

    # Collect the frames and concatenate once. The previous version called
    # DataFrame.append (removed in pandas 2.0) inside a bare `except`, so on current
    # pandas every iteration silently overwrote the result with the latest file only.
    frames = []
    for file_path in saved_files:
        frame = pd.read_csv(file_path, delimiter=';', encoding='utf8')
        frame['file_name'] = Path(file_path).stem
        frames.append(frame)

    if frames:
        parsed_csv = pd.concat(frames, ignore_index=True)
    else:
        parsed_csv = pd.DataFrame()
    print('Succesfully opened {} files'.format(len(frames)))

    return parsed_csv

In [24]:
# Combine every downloaded CSV into one DataFrame ready for upload.
csv_upload = create_csv(local_save_dir)

Succesfully opened 1 files


In [25]:
# Append the combined rows to the test_avito_parsed table, in chunks of 7000 rows
# using multi-row INSERT statements.
# NOTE(review): `index` is left at its default, so the DataFrame index is presumably
# written as a column too - confirm this matches the table schema.
csv_upload.to_sql(name='test_avito_parsed', con=sql_engine, if_exists='append', chunksize=7000, method='multi')

25375

# finish: delete downloaded files and close the connection

In [None]:
# NOTE(review): this cell held only the unfinished expression `dat`, which raises
# NameError under Restart & Run All. It is commented out until its intent is known.
# dat

In [46]:
def delete_files(pth):
    """Recursively delete every file under ``pth``, leaving the directories in place.

    Args:
        pth: directory path (``str`` or ``Path``), absolute or relative.
    """
    pth = Path(pth)
    # Materialise the listing first so entries are not removed while the directory
    # is still being iterated.
    for child in list(pth.glob('*')):
        if child.is_file():
            child.unlink()
        else:
            # `child` already includes `pth` as its prefix. Joining it onto `pth`
            # again duplicated the prefix for relative paths, so nested files were
            # never reached.
            delete_files(child)

def close_sql_connection(server, engine):
    """Release the database engine and shut down the SSH tunnel.

    The engine is disposed first, while the tunnel it connects through is still up.
    The tunnel is then stopped even if disposing the engine raises.

    Args:
        server: started tunnel object exposing ``stop()``.
        engine: engine object exposing ``dispose()``.
    """
    try:
        engine.dispose()
    finally:
        server.stop()

In [47]:
# Remove the downloaded CSV files now that they have been uploaded.
delete_files(local_save_dir)

In [32]:
# Stop the SSH tunnel and dispose the engine opened earlier in the notebook.
close_sql_connection(sql_server, sql_engine)

# dump (scratch / debugging cells)

In [None]:
import pandas as pd
# Small three-row frame with a file_name column, used by the scratch cells below.
test_df = pd.DataFrame([[1, 2, '14-02-23 (без дублей)'], [2, 2, '14-02-23 (без дублей)'], [3, 2, '15-02-23 (без дублей)']], columns=['as', 'ad', 'file_name'])

In [None]:
# Scratch: open a second SSH tunnel and engine by hand.
# NOTE(review): this repeats the body of get_sql_engine() but without
# poolclass=NullPool; consider calling that function instead.
server = SSHTunnelForwarder(
    (ssh_host, ssh_port),
    ssh_username=ssh_username,
    ssh_password=ssh_password,
    remote_bind_address=(localhost, localhost_port)
    )

server.start()
local_port = str(server.local_bind_port)
engine = create_engine('mariadb+pymysql://{}:{}@{}:{}/{}'.format(database_username, database_password, 
                                                                 localhost, local_port, database_name))

In [None]:
# Scratch: stop the manually opened tunnel.
server.stop()

In [None]:
# Scratch: dispose the manually created engine.
engine.dispose()

In [None]:
# Display the scratch frame.
test_df

In [None]:
# Scratch: append the test rows to test_avito_parsed through sql_engine.
# NOTE(review): in top-to-bottom order sql_engine has already been closed by
# close_sql_connection() above - confirm which engine this cell is meant to use.
test_df.to_sql(name='test_avito_parsed', con=sql_engine, if_exists='append', chunksize=5000, method='multi')

In [None]:
# Same query text that get_db_saved_files() builds internally.
filenames_query='SELECT DISTINCT file_name FROM test_avito_parsed'

In [None]:
# Scratch: run the query on the manual engine and keep the file names as a list.
# NOTE(review): the connection returned by engine.connect() is never closed here.
test_distinct = pd.read_sql(sql=text(filenames_query), con=engine.connect())
test_distinct = test_distinct.file_name.to_list()

In [None]:
# Display the distinct file names fetched above.
test_distinct

In [None]:
# NOTE(review): saved_files_csv is not defined anywhere in the visible notebook, so
# this cell presumably fails with NameError on a fresh kernel - confirm or remove.
saved_files_csv.to_sql(name='test_avito_parsed', con=engine, if_exists='append', chunksize=5000, method='multi')