In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Build (or reuse) the session first and derive the SparkContext from it.
# Constructing pyspark.SparkContext(...) directly fails with
# "Cannot run multiple SparkContexts at once" when this cell is re-run in a
# live kernel; getOrCreate() makes the cell safe to execute repeatedly.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark getting-started example -- replace it with a real setting or drop it.

# Low-level context used for RDD operations such as sc.parallelize(...).
sc = spark.sparkContext

In [None]:
# Sample input for fill_blank_dates: one (account id, saldo, date) record.
# The id and the date are quoted because bare 999-999 / 10-10-2014 would be
# evaluated as integer subtraction; .toDF(...) turns the RDD into a DataFrame
# with the column names that fill_blank_dates expects.
df = sc.parallelize([
    ('999-999', 100.00, '10-10-2014'),
]).toDF(['next_account_id', 'saldo', 'data'])

In [None]:
def fill_blank_dates(self, df, today_date, days):
    '''
    Pad ``df`` with placeholder dates, pivot saldo by date, and expand the
    pivot back into one row per (account, date).

    The stated intent is that dates with no saldo receive the previous saldo.
    That fill is delegated to ``create_temporal``, which is defined outside
    this block, so the exact fill rule cannot be confirmed from here.

    Example taken from the original author (intended effect):

        >>> df.show()
        +---------------+-------------+-------------+
        |next_account_id|        saldo|         data|
        +---------------+-------------+-------------+
        |        999-999|       100.00|   10-10-2014|
        |        999-999|       150.00|   15-10-2014|
        +---------------+-------------+-------------+

        Expected rows after filling (values only):

        |        999-999|       100.00|   10-10-2014|
        |        999-999|       100.00|   11-10-2014|
        |        999-999|       100.00|   12-10-2014|
        |        999-999|       100.00|   13-10-2014|
        |        999-999|       100.00|   14-10-2014|
        |        999-999|       150.00|   15-10-2014|

    NOTE(review): the example dates are written dd-mm-YYYY, while the
    placeholder dates generated below are formatted '%Y-%m-%d'. Confirm which
    format the 'data' column really uses; mixed formats produce separate
    pivot columns.

    :param self: object exposing a SparkContext as ``self.sc``
    :param df: dataframe with columns ['next_account_id', 'saldo', 'data']
    :param today_date: last date of the window; must support subtraction of a
        ``timedelta`` and ``strftime`` (e.g. ``datetime.date``)
    :param days: number of consecutive dates, counting back from
        ``today_date`` inclusive, to force into the pivot; when ``days <= 0``
        no placeholder dates are added
    :return: dataframe with columns ['next_account_id', 'data', 'saldo']
        (note the order: 'data' comes before 'saldo')
    '''
    # Imported locally: the notebook's import cell does not bring `timedelta`
    # into scope, so the original body raised NameError here.
    from datetime import timedelta

    # One placeholder row per date in the window, all under a sentinel account
    # id with saldo -1, so that every date becomes a pivot column even when no
    # real account has a row for it.
    # NOTE(review): whether the sentinel account is removed from the result
    # depends on create_temporal -- verify.
    data_to_append = [
        ['99999_99999', -1, (today_date - timedelta(days=offset)).strftime('%Y-%m-%d')]
        for offset in range(days)
    ]

    if data_to_append:
        df_to_append = self.sc.parallelize(data_to_append) \
                              .toDF(['next_account_id', 'saldo', 'data'])
        # union() replaces the deprecated unionAll(); both match columns by
        # position, which is why the placeholder frame uses df's column order.
        new_df = df.union(df_to_append)
    else:
        # An empty RDD cannot be converted with toDF (no row to infer the
        # schema from), so skip the padding when there is nothing to add.
        new_df = df

    # One row per account, one column per distinct date; missing cells become 0.
    pivot_table = new_df.groupBy('next_account_id') \
                        .pivot('data') \
                        .max('saldo') \
                        .na.fill(0)
    columns_list = [field.name for field in pivot_table.schema.fields]

    # create_temporal is expected to yield (account_id, iterable of
    # (date, saldo) pairs); flatMapValues then emits one record per pair and
    # the final map flattens it into a 3-tuple.
    temporal_data = pivot_table.rdd \
                               .map(lambda row: create_temporal(row, columns_list)) \
                               .flatMapValues(lambda pairs: pairs) \
                               .map(lambda rec: (rec[0], rec[1][0], rec[1][1])) \
                               .toDF(['next_account_id', 'data', 'saldo'])
    return temporal_data