In [None]:
import io
import ftplib
import pandas as pd
import datetime as dt
import logging
import pytz
import time
import threading

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from settings.db import WAREHOSUE_URL
from settings.ftp import METER_DATA_FTP

In [None]:
def make_ftp(ftp_settings):
    """Open an FTP session, log in and change into the configured working directory.

    Args:
        ftp_settings: mapping with the keys 'host', 'port', 'user', 'passwd'
            and 'working_dir'.

    Returns:
        A connected, logged-in ``ftplib.FTP`` instance. The caller owns it and
        should ``quit()``/``close()`` it when finished.

    Raises:
        KeyError: if a required settings key is missing.
        ftplib.all_errors: if connecting, logging in or changing directory fails.
        In every failure case the underlying socket is closed before re-raising.
    """
    ftp = ftplib.FTP()
    try:
        ftp.connect(ftp_settings['host'], ftp_settings['port'])
        # ftplib already uses passive mode by default, so set_pasv(True) is not needed.
        ftp.login(ftp_settings['user'], ftp_settings['passwd'])
        ftp.cwd(ftp_settings['working_dir'])
    except Exception:
        # Don't leak an open control connection when login/cwd fails.
        ftp.close()
        raise
    return ftp


def _parse_mdtm(reply):
    """Parse an FTP ``MDTM`` reply such as ``'213 20240101120000'`` into a datetime.

    RFC 3659 allows an optional fractional-seconds suffix
    (``'213 20240101120000.123'``); it is kept with microsecond precision.
    """
    stamp, _, fraction = reply[4:].strip().partition('.')
    parsed = dt.datetime.strptime(stamp, '%Y%m%d%H%M%S')
    if fraction:
        parsed = parsed.replace(microsecond=int(fraction[:6].ljust(6, '0')))
    return parsed


def get_forecast_files(ftp):
    """Yield ``(file_date, modified_at, file_name)`` for each ``.txt`` file in the cwd.

    ``file_date`` is parsed from characters 5-10 of the name as ``DDMMYY``;
    ``modified_at`` comes from the server's ``MDTM`` reply. Results are ordered
    by ``file_date``, then by ``modified_at``. When two files share both values,
    only the one listed last by ``nlst()`` is kept.

    Raises:
        ValueError: if a ``.txt`` name lacks a ``DDMMYY`` date at positions 5-10,
            or the ``MDTM`` reply is not a timestamp.
    """
    files = {}
    for file_name in ftp.nlst():
        if not file_name.endswith('.txt'):
            continue
        file_date = dt.datetime.strptime(file_name[5:11], '%d%m%y')
        modified_at = _parse_mdtm(ftp.sendcmd('MDTM ' + file_name))
        files.setdefault(file_date, {})[modified_at] = file_name
    # Everything is listed up front; yield in chronological order.
    for file_date in sorted(files):
        by_time = files[file_date]
        for modified_at in sorted(by_time):
            yield file_date, modified_at, by_time[modified_at]


def read_meter_data(ftp, file_name):
    """Download ``file_name`` into memory.

    Returns:
        An ``io.BytesIO`` holding the raw file bytes, rewound to offset 0 so it
        can be handed straight to a parser.
    """
    command = f'RETR {file_name}'
    payload = io.BytesIO()
    # retrbinary streams the file in chunks; each chunk is appended to the buffer.
    ftp.retrbinary(command, payload.write)
    payload.seek(0)
    return payload


def move_seen_file(ftp, old_filename, new_filename, results_dict):
    """Rename ``old_filename`` to ``new_filename`` on the server and record it.

    On success ``results_dict[old_filename]`` is set to True and True is
    returned. If ``ftp.rename`` raises, the exception propagates and nothing is
    recorded.
    """
    ftp.rename(old_filename, new_filename)
    results_dict[old_filename] = True
    return True

def get_dim_metering_poins(date, connection):
    """Load the ``dim_metering_point`` rows valid on ``date``.

    A row is selected when ``valid_from <= date <= valid_to`` (compared on the
    calendar day, ``date.date()``).

    Args:
        date: a ``datetime``; only its date part is used.
        connection: a DB connection accepted by ``pandas.read_sql_query``.

    Returns:
        A DataFrame restricted to the known dimension columns, plus a
        ``metering_point_parameters`` key (serial number followed by parameter
        number, both as strings). That key is also used as the index.
    """
    day = date.date()
    sql = f"""
    select * from dim_metering_point
    where valid_from <= '{day}'
    and valid_to >= '{day}'
    """
    expected_columns = [
        'id', 'parameter_number', 'serial_number', 'name', 'site_id',
        'parameter', 'z_code', 'valid_from', 'valid_to', 'is_active',
        'is_border', 'is_inverter', 'weight_factor',
    ]
    # Reindexing to expected_columns drops unexpected DB columns and keeps the order stable.
    frame = pd.DataFrame(pd.read_sql_query(sql, connection), columns=expected_columns)
    lookup_key = frame['serial_number'].astype('str') + frame['parameter_number'].astype('str')
    frame['metering_point_parameters'] = lookup_key
    return frame.set_index('metering_point_parameters', drop=False)


def add_dim_metering_poins(metering_point_ids, date, connection):
    """Insert placeholder ``dim_metering_point`` rows for unknown ids, then reload.

    Each id is split into a serial number (everything but the last character)
    and a parameter number (the last character). The parameter number is mapped
    '1'/'2'/'3'/'4' -> 'A+'/'A-'/'R+'/'R-'. New rows get the hard-coded values
    below: name 'noname', site_id 100000, z_code '62Z0000000000000', valid from
    ``date`` to 9999-12-31, all flags false, and weight factor 1.0.

    Args:
        metering_point_ids: iterable of id strings (serial number + parameter digit).
        date: ``datetime``/``date`` used as ``valid_from`` and for the reload.
        connection: connection whose ``execute`` accepts a raw SQL string.

    Returns:
        ``get_dim_metering_poins(date, connection)`` after the insert. When no
        ids are given, no INSERT is issued and the dimension is simply reloaded.

    Raises:
        KeyError: if an id ends in a character other than '1'-'4'.
    """
    columns = [
        'id',
        'parameter_number',
        'serial_number',
        'name',
        'site_id',
        'parameter',
        'z_code',
        'valid_from',
        'valid_to',
        'is_active',
        'is_border',
        'is_inverter',
        'weight_factor',
    ]
    parameters = {'1': 'A+', '2': 'A-', '3': 'R+', '4': 'R-'}
    valid_from = date.strftime('%Y-%m-%d')
    row_template = "('{}', '{}', 'noname', 100000, '{}', '62Z0000000000000', '{}', '9999-12-31', false, false, false, 1.0)"
    insert_statements = list()
    for metering_point_id in metering_point_ids:
        parameter_number = metering_point_id[-1]
        # The serial number comes from the downloaded file; double any single quote
        # so it cannot terminate the SQL literal. parameter_number is already
        # validated by the dict lookup below.
        serial_number = metering_point_id[:-1].replace("'", "''")
        parameter = parameters[parameter_number]
        insert_statements.append(row_template.format(parameter_number, serial_number, parameter, valid_from))
    if insert_statements:
        # The id column is left to the database (columns[1:]).
        query = f"""
    insert into dim_metering_point({', '.join(columns[1:])}) VALUES {', '.join(insert_statements)};
    """
        connection.execute(query)
    return get_dim_metering_poins(date, connection)


def prepare_meter_data(raw_meter_data, metering_points, file_info=None):
    """Shape one parsed meter file into rows for ``fact_meter_data``.

    Args:
        raw_meter_data: parsed file with integer column labels. Column 0 is the
            metering-point key (matched against ``metering_points``' index), column 1
            becomes ``daily_total``, and columns 2-51 become ``sub_hourly_values``.
        metering_points: dimension frame indexed by ``metering_point_parameters``
            (as returned by ``get_dim_metering_poins``).
        file_info: ``(file_date, creation_time, file_name)`` as yielded by
            ``get_forecast_files``. When omitted, the module-level name ``file``
            (the notebook loop variable) is used, which keeps the old behaviour.

    Returns:
        DataFrame with columns date_id, site_id, metering_point_id, parameter,
        availability_utc, daily_total and sub_hourly_values. Keys missing from
        ``metering_points`` produce NaN dimension fields (left join).
    """
    if file_info is None:
        # Backward compatibility: the notebook loop variable carries the metadata.
        file_info = file
    file_date, creation_time = file_info[0], file_info[1]

    fact = raw_meter_data.join(metering_points, on=0, how='left')
    fact['metering_point_id'] = fact['id']
    fact['parameter'] = fact['parameter_number']
    fact['date_id'] = int(file_date.strftime('%Y%m%d'))
    # Localize properly. datetime.replace(tzinfo=pytz.timezone(...)) attaches the
    # zone's LMT offset (+02:02 for Kiev) instead of EET/EEST.
    # NOTE(review): creation_time is treated as Europe/Kiev local time; RFC 3659
    # says MDTM replies are UTC. Confirm what this server actually returns.
    fact['availability_utc'] = (
        pd.Timestamp(creation_time)
        .tz_localize('Europe/Kiev', ambiguous=False, nonexistent='shift_forward')
        .tz_convert('UTC')
        .strftime('%Y-%m-%dT%H:%M:%S')
    )
    fact['daily_total'] = fact[1]
    fact['sub_hourly_values'] = fact.iloc[:, 2:52].agg(list, axis=1)
    fact_meter_data_columns = ['date_id', 'site_id', 'metering_point_id', 'parameter', 'availability_utc', 'daily_total', 'sub_hourly_values']
    return fact[fact_meter_data_columns]


def put_meter_data_to_warehouse(data, connection):
    """Upsert prepared meter rows into ``fact_meter_data``.

    Builds a single multi-row ``INSERT ... ON CONFLICT (date_id, site_id,
    metering_point_id, parameter) DO UPDATE`` statement from ``data``, using the
    frame's columns in order, and executes it on ``connection``.

    Each row is rendered with ``str(tuple(row))``. Python lists (the
    ``sub_hourly_values`` cells) are turned into quoted array literals ``'{...}'``.
    NOTE(review): values are interpolated, not bound as parameters. NaN cells
    render as ``nan``, and a string containing a quote or bracket would corrupt
    the statement, so pass only frames produced by ``prepare_meter_data``.

    Args:
        data: DataFrame whose column names match ``fact_meter_data`` columns.
        connection: connection whose ``execute`` accepts a raw SQL string.

    Returns:
        True after the statement has executed, or immediately if ``data`` is empty.
    """
    if data.empty:
        # An INSERT with an empty VALUES list is a syntax error; nothing to upsert.
        return True
    rows = [
        str(tuple(row)).replace('[', "'{").replace(']', "}'")
        for row in data.to_numpy()
    ]
    insert = 'INSERT INTO fact_meter_data({}) VALUES \n{}\n'.format(
        ', '.join(data.columns), ', \n'.join(rows)
    )
    upsert = '''
    ON CONFLICT (date_id, site_id, metering_point_id, parameter) 
    DO UPDATE SET
    availability_utc = excluded.availability_utc,
    daily_total = excluded.daily_total,
    sub_hourly_values = excluded.sub_hourly_values;
    '''
    connection.execute(insert + '\n' + upsert)
    return True

In [None]:
def _parse_meter_file(buffer):
    """Parse a downloaded meter file into a DataFrame with integer column labels.

    The file is ':'-delimited and its first and last lines are skipped. The first
    and last character of every column-0 value are stripped (presumably the
    enclosing quotes; TODO confirm against a sample file). Column 52 is dropped
    (presumably an empty field after a trailing ':'; TODO confirm).
    """
    frame = pd.read_csv(buffer, delimiter=':', skiprows=1, header=None, skipfooter=1, engine='python')
    frame[0] = frame[0].apply(lambda value: value[1:-1])
    return frame.drop(columns=[52])


def _drop_duplicate_points(frame):
    """Keep one row per metering-point key (column 0).

    For a key that occurs more than once:
    - if its column-1 values (daily totals) sum to zero, keep the first row;
    - otherwise keep the first row holding the maximum column-1 value.
    All other rows for that key are dropped. Returns a new DataFrame.
    """
    duplicated = frame.loc[frame[0].duplicated(keep=False)]
    to_drop = []
    for _, group in duplicated.groupby(0, sort=False):
        totals = group[1]
        if totals.sum() == 0:
            keep = group.index[0]
        else:
            keep = group.index[(totals == totals.max()).to_numpy()][0]
        to_drop.extend(label for label in group.index if label != keep)
    return frame.drop(to_drop)


engine = create_engine(WAREHOSUE_URL, poolclass=NullPool)
ftp = make_ftp(METER_DATA_FTP)
ftp_move_results = dict()
try:
    # Keep the loop variable named ``file``: prepare_meter_data relies on the
    # module-level name ``file`` when it is not given the file metadata explicitly.
    for file in get_forecast_files(ftp):
        print(file)
        raw_meter_data = _parse_meter_file(read_meter_data(ftp, file[-1]))
        raw_meter_data = _drop_duplicate_points(raw_meter_data)

        with engine.connect() as connection:
            metering_points = get_dim_metering_poins(file[0], connection)
            file_points = set(raw_meter_data[0])
            absent_points = file_points - set(metering_points['metering_point_parameters'])
            if absent_points:
                # Register metering points that appear for the first time.
                print(f'Adding new metering points: {len(absent_points)}')
                metering_points = add_dim_metering_poins(absent_points, file[0], connection)
                still_absent = file_points - set(metering_points['metering_point_parameters'])
                print(f'Still missing after insert: {len(still_absent)}')
            fact_meter_data = prepare_meter_data(raw_meter_data, metering_points)
            result = put_meter_data_to_warehouse(fact_meter_data, connection)

        file_name = file[-1]
        old_filename = METER_DATA_FTP['working_dir'] + file_name
        new_filename = (METER_DATA_FTP['working_dir'] + 'archived/'
                        + file_name[:11] + '000000' + file_name[-4:])
        # Archive synchronously. A background thread would share this FTP control
        # connection with the next RETR, and ftplib.FTP is not safe for concurrent
        # use. Failures stay best-effort, as they were with the thread.
        try:
            move_seen_file(ftp, old_filename, new_filename, ftp_move_results)
        except ftplib.all_errors as err:
            print(f'Could not archive {old_filename}: {err}')
            ftp_move_results[old_filename] = False
        print(result)
        print('-----------------------------------')
        # Pause between files (kept from the original; reason not recorded).
        time.sleep(5)
finally:
    try:
        ftp.quit()
    except ftplib.all_errors:
        ftp.close()

print(ftp_move_results)