In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, avg, isnan, when, lit, monotonically_increasing_id

In [None]:
# Print the path of the Python interpreter that is running this kernel.
import sys
print(sys.executable)

------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Starting a session with Ophelia

In [None]:
import visualize
from ophelia.ophelib.OpheliaMain import Ophelia

In [None]:
# Build the Ophelia wrapper object; later cells read the Spark session and the
# reader/array/frame/writer helpers from it.
# NOTE(review): the meaning of the second positional argument (False) is not visible
# in this notebook -- confirm against the Ophelia constructor.
ophelia = Ophelia("A wrapper for pyspark", False)

# Loading the CSV file of daily fund prices.

In [None]:
def read_portfolio_data(path_file, source, date_col, withSchema=True):
    """Read a portfolio file through the Ophelia session, optionally applying the portfolio schema.

    Parameters
    ----------
    path_file
        Location of the file, forwarded to ``ophelia.Read.read_file``.
    source
        Source identifier forwarded to ``ophelia.Read.read_file`` (this notebook passes ``"csv"``).
    date_col
        Name of the date column, forwarded to ``ophelia.Read.build_portfolio_schema``.
    withSchema
        When truthy (the default) the data that was read is passed through
        ``ophelia.Read.build_portfolio_schema`` before being returned.

    Returns
    -------
    The result of ``build_portfolio_schema`` when ``withSchema`` is truthy, otherwise the
    object returned by ``read_file`` unchanged.
    """
    portfolio_data = ophelia.Read.read_file(ophelia.SparkSession, path_file, source)
    # Plain truthiness test: the flag is a switch, not an identity check against True.
    if withSchema:
        return ophelia.Read.build_portfolio_schema(portfolio_data, date_col)
    return portfolio_data

## Fix the raw ("impure") schema of the portfolio input data.

In [None]:
# Read the raw CSV; withSchema is left at its default (True), so the result of
# build_portfolio_schema is what ends up in portfolio_df.
portfolio_df = read_portfolio_data(
    path_file="data/raw/csv/data.csv",
    source="csv",
    date_col="operation_date",
)

# Preview the first five rows as a pandas frame.
portfolio_df.limit(5).toPandas()

In [None]:
from com.ophelia.wrapper import SparkWrapper
from com.ophelia.utils import ListUtils

In [None]:
class BackMetadata(object):
    """Column-name helpers that group a DataFrame's columns by inferred type.

    Each ``meta_*`` helper indexes the mapping returned by ``ListUtils.feature_picking``
    with one or two type keys and, when a partition column is supplied, places that
    column name first in the returned list.
    """

    @staticmethod
    def _with_partition(partition_by, columns):
        """Return ``columns`` as is, or prefixed with ``partition_by`` when it is not None."""
        if partition_by is None:
            return columns
        return [partition_by] + columns

    @staticmethod
    def meta_numeric(df, partition_by):
        """List the ``'double'`` and ``'float'`` columns of ``df`` (doubles first).

        ``partition_by`` is prepended to the list when it is not None.
        """
        infer_type_dict = ListUtils.feature_picking(df)
        append_numerics = infer_type_dict['double'] + infer_type_dict['float']
        return BackMetadata._with_partition(partition_by, append_numerics)

    @staticmethod
    def meta_date(df, partition_by):
        """List the ``'date'`` columns of ``df``; ``partition_by`` is prepended when not None."""
        infer_type_dict = ListUtils.feature_picking(df)
        return BackMetadata._with_partition(partition_by, infer_type_dict['date'])

    @staticmethod
    def meta_partition(df, partition_by):
        """List the ``'long'`` and ``'int'`` columns of ``df`` (longs first).

        ``partition_by`` is prepended to the list when it is not None.
        """
        infer_type_dict = ListUtils.feature_picking(df)
        append_partitioners = infer_type_dict['long'] + infer_type_dict['int']
        return BackMetadata._with_partition(partition_by, append_partitioners)

    @staticmethod
    def meta_string(self):
        """List the ``'string'`` columns of ``self.df``.

        This is a static method, so ``self`` is an ordinary argument: any object exposing
        ``df`` and ``partition_by`` attributes. ``self.partition_by`` is prepended to the
        list when it is not None.
        """
        infer_type_dict = ListUtils.feature_picking(self.df)
        # The partition column must be read from ``self``; there is no bare
        # ``partition_by`` name in this scope.
        return BackMetadata._with_partition(self.partition_by, infer_type_dict['string'])

    @staticmethod
    def generate_partition(df, select_list: list, partition_by: str):
        """Project ``df`` onto a partition column followed by ``select_list``.

        When ``partition_by`` is not None it is used as the leading column; otherwise a
        column named ``'partition_id'`` is generated with ``monotonically_increasing_id()``.
        ``select_list`` should not already contain ``partition_by``, or that column is
        selected twice.
        """
        if partition_by is not None:
            return df.select(partition_by, *select_list)
        return df.select(monotonically_increasing_id().alias('partition_id'), *select_list)


class NullDebug(object):
    """Null-handling helpers written as static methods that take the DataFrame as ``self``.

    ``null_clean`` is meant to be attached to ``DataFrame`` (see ``DataFrame.nullClean``),
    which is why the first parameter of these static methods is named ``self``.
    """

    @staticmethod
    def cleansing_list(self, partition_by: str, offset: float = 0.5):
        """Return the ids whose share of null/NaN values is at most ``offset``.

        Parameters
        ----------
        self
            The frame to inspect. It must provide ``toPanel``, ``Shape`` and the chained
            ``uniqueRow`` -- none of them is defined in this notebook.
            NOTE(review): presumably Ophelia extensions of ``DataFrame``; ``toPanel`` looks
            like a wide-to-long reshape into ``id``/``value`` columns -- confirm.
        partition_by
            Column handed to ``toPanel``; required.
        offset
            Largest accepted ratio ``null_count / self.Shape[0]`` for an id to be kept.

        Raises
        ------
        TypeError
            If ``partition_by`` is None.
        """
        if partition_by is None:
            raise TypeError(f"'partition_by' required parameter, invalid {partition_by} input.")
        # ``when`` without ``otherwise`` yields null for non-matching rows, so ``count``
        # only counts the rows whose value is null or NaN.
        null_or_nan = isnan('value') | col('value').isNull()
        clean_list = (
            self.toPanel(partition_by, ['id', 'value'])
            .groupBy('id')
            .agg(count(when(null_or_nan, 'value')).alias('null_count'))
            .select('*', (col('null_count') / self.Shape[0]).alias('null_pct'))
            .where(col('null_pct') <= offset)
            .uniqueRow('id')
        )
        return clean_list

    @staticmethod
    def mean_imput():
        """Placeholder: not implemented yet."""
        pass

    @staticmethod
    def median_imput():
        """Placeholder: not implemented yet."""
        pass

    @staticmethod
    def moving_imput():
        """Placeholder: not implemented yet."""
        pass

    @staticmethod
    def weight_imput():
        """Placeholder: not implemented yet."""
        pass

    @staticmethod
    def null_clean(self, partition_by=None, offset=0.5):
        """Keep only the numeric columns whose null/NaN share is at most ``offset``.

        Parameters
        ----------
        self
            The frame to clean.
        partition_by
            Existing column to use as partition column. When None, a ``'partition_id'``
            column is generated and used instead.
        offset
            Threshold forwarded to ``cleansing_list``.

        Returns
        -------
        With ``partition_by`` given: ``self`` projected onto ``partition_by`` plus the
        surviving columns. Without it: the generated frame projected onto
        ``'partition_id'`` plus the surviving columns.
        """
        # Ask for the numeric columns only: ``generate_partition`` adds the partition
        # column itself, so passing it in the list as well would select it twice.
        numerics_list = BackMetadata.meta_numeric(self, None)
        gen_partition = BackMetadata.generate_partition(self, numerics_list, partition_by)
        if partition_by is None:
            cleansing_list = NullDebug.cleansing_list(gen_partition, 'partition_id', offset)
            return gen_partition.select('partition_id', *cleansing_list)
        cleansing_list = NullDebug.cleansing_list(gen_partition, partition_by, offset)
        return self.select(partition_by, *cleansing_list)

# Attach null_clean to DataFrame so it can be called as df.nullClean(...);
# the frame itself is then passed as the `self` argument.
DataFrame.nullClean = NullDebug.null_clean

In [None]:
# Ids of portfolio_df that pass the default null-share threshold (offset=0.5),
# with operation_date as the partition column.
NullDebug.cleansing_list(portfolio_df, 'operation_date')

In [None]:
# No partition column is passed, so null_clean works on a generated 'partition_id' column.
portfolio_df.nullClean(offset=0.5).show()

In [None]:
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import (when, row_number, lit, count, first, sum as spark_sum,
                                   min as spark_min, max as spark_max, mean, stddev, variance)

def spark_methods():
    """Build the lookup table from rolling-method name to Spark aggregate function.

    A new dictionary is created on every call. The keys are ``'sum'``, ``'min'``,
    ``'max'``, ``'mean'``, ``'stddev'``, ``'var'``, ``'first'`` and ``'count'``.
    """
    method_names = ('sum', 'min', 'max', 'mean', 'stddev', 'var', 'first', 'count')
    aggregates = (spark_sum, spark_min, spark_max, mean, stddev, variance, first, count)
    return dict(zip(method_names, aggregates))

def rolling_down(self, op_col, nat_order, min_p=2, window=2, method='sum'):
    """Add a rolling aggregate computed over a trailing row window.

    Parameters
    ----------
    self
        The frame to aggregate (the function is written so it can be attached to DataFrame).
    op_col
        A column name, or a list of column names.
    nat_order
        Column that orders the rows of the window.
    min_p
        Minimum row number (1-based, in ``nat_order`` order) from which a value is
        produced; earlier rows get null. Only applied to a single column and a method
        other than ``'count'``.
    window
        Number of rows in the window, the current row included.
    method
        Key of ``spark_methods()`` selecting the aggregate.

    Returns
    -------
    * ``op_col`` is a list: a frame holding ONLY the ``<col>_rolling_<method>`` columns
      (``min_p`` is not applied).
    * ``op_col`` is a single name: every input column plus ``<op_col>_rolling_<method>``.

    Raises
    ------
    KeyError
        If ``method`` is not a key of ``spark_methods()``.
    """
    aggregate = spark_methods()[method]
    # Trailing frame: the (window - 1) preceding rows plus the current one.
    w = Window.orderBy(nat_order).rowsBetween(
        Window.currentRow - (window - 1), Window.currentRow)

    # Lists are handled here for every method, so no later branch needs a list check.
    if isinstance(op_col, list):
        rolling = [aggregate(c).over(w).alias(f'{c}_rolling_{method}') for c in op_col]
        return self.select(*rolling)

    rolling_name = f'{op_col}_rolling_{method}'
    if method == 'count':
        # Counts are reported from the first row on; min_p is not used.
        return self.select('*', aggregate(op_col).over(w).alias(rolling_name))

    # Running row number, used to blank out the rows that come before min_p.
    _unbounded_w = Window.orderBy(nat_order).rowsBetween(
        Window.unboundedPreceding, Window.currentRow)
    rolling = when(
        row_number().over(_unbounded_w) >= min_p,
        aggregate(op_col).over(w),
    ).otherwise(lit(None)).alias(rolling_name)
    return self.select('*', rolling)

In [None]:
# Rolling mean of one column over the default 2-row window, ordered by operation_date,
# computed on the first four columns of portfolio_df. With min_p left at 2 the first row is null.
sel = portfolio_df.select(portfolio_df.columns[:4])
rolling_down(sel, op_col='AXESCP', nat_order='operation_date', method='mean').show()

In [None]:
# Same call with op_col given as a list: rolling_down then returns only the rolling
# columns, without the input columns.
sel = portfolio_df.select(portfolio_df.columns[:4])
rolling_down(sel, op_col=['SCOTIAG'], nat_order='operation_date', method='mean').show()

In [None]:
#portfolio_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in portfolio_df.columns[1:]]).show()

## Defining the input array of year parameters:

# Cleaning the data and structuring the analytical base table.

In [None]:
def portfolio_date_window(df, from_year, to_year, col_date):
    """Filter ``df`` to a range of years and append a date-index column.

    The years come from ``ophelia.ophelia_array.year_array(from_year, to_year)`` and are
    matched against the ``<col_date>_year`` column of the frame returned by
    ``ophelia.ophelia_df.split_date``. The appended column is named after the first nine
    characters of ``col_date`` plus ``"_id"`` and is produced by the callable returned
    from ``dates_index``, which is built on the sorted date list of the unfiltered ``df``.
    """
    arrays = ophelia.ophelia_array
    selected_years = arrays.year_array(from_year, to_year)
    dated_frame = ophelia.ophelia_df.split_date(df, col_date)
    ordered_dates = arrays.sorted_date_list(df, col_date)
    to_date_index = arrays.dates_index(ordered_dates)

    year_filter = col(col_date + "_year").isin(selected_years)
    index_column = to_date_index(col(col_date)).alias(col_date[:9] + "_id")
    return dated_frame.where(year_filter).select('*', index_column)

In [None]:
# Restrict the portfolio to the years produced by year_array("2016", "2019")
# and add the date-index column.
portfolio_window_df = portfolio_date_window(
    df=portfolio_df, 
    from_year="2016", 
    to_year="2019", 
    col_date="operation_date"
)

In [None]:
def monitoring_empty_vector(df, feature_type):
    """Count the non-null values of every column of one inferred type.

    Parameters
    ----------
    df
        The frame to inspect.
    feature_type
        Key (converted with ``str``) into the mapping returned by
        ``ophelia.ophelia_array.feature_picking(df)``, e.g. ``"float"``.

    Returns
    -------
    A one-row frame with one column per selected input column, named like the input
    column and holding the result of Spark's ``count`` for it.
    """
    typed_cols = ophelia.ophelia_array.feature_picking(df)[str(feature_type)]
    # Uses the ``count`` aggregate imported from pyspark.sql.functions.
    count_by_col = [count(col(name)).alias(str(name)) for name in typed_cols]
    return df.select(*count_by_col)

In [None]:
def debug_null(panel, missing_days, N):
    """List the columns whose count is below ``abs(missing_days - N)``.

    Parameters
    ----------
    panel
        Frame whose first row holds one count per column (``debug_empty_vector`` passes
        the output of ``monitoring_empty_vector``).
    missing_days
        Tolerated number of missing observations.
    N
        Reference row count (``debug_empty_vector`` passes ``df.count()``).

    Returns
    -------
    list
        Names of the columns whose value in the first row is strictly below the
        threshold, in the column order of that row. Despite the ``clean`` in the caller's
        variable name, these are the columns the caller drops.
    """
    threshold = abs(missing_days - N)
    # The first row is read directly; re-selecting every column under its own name
    # would add nothing.
    counts = panel.collect()[0].asDict()
    return [name for name, value in counts.items() if value < threshold]

In [None]:
def debug_empty_vector(df, feature_type, missing_days=10):
    """Drop the columns of one inferred type that hold too few values.

    The per-column counts come from ``monitoring_empty_vector``; ``debug_null`` then
    picks, from those counts, ``missing_days`` and the row count of ``df``, the column
    names that are removed from ``df``.
    """
    total_rows = df.count()
    columns_to_drop = debug_null(
        monitoring_empty_vector(df, feature_type),
        missing_days,
        total_rows,
    )
    return df.drop(*columns_to_drop)

In [None]:
# Drop the float columns with too few values (default missing_days=10), then preview five rows.
remove_none_df = debug_empty_vector(portfolio_window_df, feature_type="float")
remove_none_df.limit(5).toPandas()

In [None]:
def mean_impute(df):
    """Replace the nulls of every float column by that column's average.

    The float columns are the ``"float"`` entry of
    ``ophelia.ophelia_array.feature_picking(df)``, restricted to the columns actually
    present in ``df``.

    Returns
    -------
    The frame returned by ``df.na.fill`` with one average per column. ``df`` itself is
    returned unchanged when there is no float column, or when no column has a defined
    average.
    """
    float_cols = ophelia.ophelia_array.feature_picking(df)["float"]
    targets = [c for c in df.columns if c in float_cols]
    if not targets:
        # Nothing to aggregate; agg() needs at least one expression.
        return df
    averages = df.agg(*(avg(c).alias(c) for c in targets)).collect()[0].asDict()
    # A column holding only nulls has no average; it cannot be used as a fill value.
    fill_values = {name: value for name, value in averages.items() if value is not None}
    if not fill_values:
        return df
    return df.na.fill(fill_values)

# Now we can write our master analytical base table:

In [None]:
# Impute the remaining nulls with column means, then drop the year/month/day columns
# (presumably the ones added by split_date -- confirm).
portfolio_base_table = mean_impute(remove_none_df).drop("operation_date_year", "operation_date_month", "operation_date_day")
# Preview the five most recent operation dates.
portfolio_base_table.orderBy(col("operation_date").desc()).limit(5).toPandas()

In [None]:
# Persist the base table through Ophelia's parquet writer.
# NOTE(review): the return value is presumably the output path, and `part` presumably
# the partition column -- confirm against ophelia_write.write_parquet.
portfolio_path = ophelia.ophelia_write.write_parquet(
    df=portfolio_base_table,
    output_type="engine",
    project="OpheliaPortfolio",
    part="operation_date"
)