In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=2c3fedc41ae2df7071c81dda4342d7f2a0e4c642bf95586def84e7b3c88e1783
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from google.colab import drive
drive.mount('/content/drive')

from pyspark.sql import SparkSession


import pandas as pd

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CoLabExample") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

Mounted at /content/drive


In [3]:
# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [20]:
# imports
from pyspark.sql.functions import when, col, \
to_date, date_format, regexp_replace, lit, min, max,count,  year, month, abs
from functools import reduce
from pyspark.sql.types import StringType, DateType
from datetime import datetime, timedelta

In [5]:
def generate_date_range(from_date, to_date):
    # Parse the input strings to datetime objects
    from_date = datetime.strptime(from_date, '%Y-%m-%d')
    to_date = datetime.strptime(to_date, '%Y-%m-%d')

    # Calculate the number of days between the two dates
    delta = (to_date - from_date).days

    # Generate the date list
    date_list = [(from_date + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(delta + 1)]

    return date_list


def transform_df(df):
    # Convert 'Date' column to date format
    #df = df.withColumn('Date', to_date(col('Date'), 'MMM dd, yyyy'))

    # Create 'withdraw' DataFrame
    withdraw_df = df.select(col('Date'), col('Description'), col('Withdrawals').alias('Amount'))\
                    .withColumn('Type', lit('withdraw')).filter(col('Amount') != 0)

    # Create 'deposits' DataFrame
    deposits_df = df.select(col('Date'), col('Description'), col('Deposits').alias('Amount'))\
                    .withColumn('Type', lit('deposits')).filter(col('Amount') != 0)

    # Union the two DataFrames
    return withdraw_df.union(deposits_df).sort('Date')



def format_finance_df(df, type='banking'):
    """_summary_

    Args:
        df (DataFrame): The DataFrame to format.
        type (str, optional): Wheather is credit or banking account. Defaults to 'banking'.

    Returns:
        _type_: _description_
    """
    res_df = df
    if type == 'banking':
       # Cast Date column in to yyyy-MM-dd format. eg: (Jul 22, 2023) -> (2023-07-22)
       res_df = res_df.withColumn('Date', date_format(to_date('Date', 'MMM dd, yyyy'), 'yyyy-MM-dd'))

       for column in ['Withdrawals', 'Deposits', 'Balance']:
        # For the column in 'Withdrawals', 'Deposits', 'Balance'. Rm the '$' ,',' '' '-'. Cast it into float object.
        res_df = res_df.withColumn(
            column,
            regexp_replace(
                regexp_replace(
                    regexp_replace(col(column), '−', '-'),
                    ',', ''
                ),
                '\$', ''
            ).cast('float')
        )

    else: # credit
        # Cast Date column in to yyyy-MM-dd format. eg: (Jul 22, 2023) -> (2023-07-22)
        res_df = res_df.withColumn('Date', date_format(to_date('Date', 'MMM dd, yyyy'), 'yyyy-MM-dd'))

        # For the Amount column. Rm the '$' ,',' '' '-'. Cast it into float object.
        res_df = res_df.withColumn('Amount',
                            regexp_replace(
                                regexp_replace(
                                    regexp_replace(col('Amount'), '−', '-'),
                                    ',', ''
                                      ),
                                    '\$', ''
                                    ).cast('float')
                            )

    return res_df


def summarize_nas(df):
    """
    Summarizes the missing values in a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame to summarize.

    Returns:
    pd.DataFrame: A summary DataFrame containing the count and percentage of missing values for each column.
    """
    # Calculate the number of missing values per column
    nas = df.isna().sum()

    # Calculate the percentage of missing values per column
    nas_percent = (nas / len(df)) * 100

    # Create a summary DataFrame
    summary_df = pd.DataFrame({
        'Missing Values': nas,
        'Percentage': nas_percent
    }).sort_values(by='Missing Values', ascending=False)

    return summary_df


def spark_df_basic_stats(df):
  """
  """
  # Print schema
  print("Schema:")
  df.printSchema()

  # Count rows
  row_count = df.count()
  print(f"Row count: {row_count}")

  # Summary statistics for numerical columns
  print("Summary statistics:")
  df.describe().show()

  # Non-null count for each column
  print("Non-null counts for each column:")
  non_null_counts = df.select([count(col(c)).alias(c) for c in df.columns]).collect()[0].asDict()
  for column, non_null_count in non_null_counts.items():
      print(f"{column}: {non_null_count}")

  # Show first few rows
  print("First few rows:")
  df.show(5)

The Debit tables

In [6]:
# TD data
td_cheq = spark.read.csv('../content/drive/My Drive/personal_fin_data/TD_EVERY_DAY_CHEQUING_ACCOUNT.csv',
                         header=True,
                         inferSchema=True)

td_save = spark.read.csv('../content/drive/My Drive/personal_fin_data/TD_EVERY_DAY_SAVINGS_ACCOUNT.csv',
                         header=True,
                         inferSchema=True)

td_cheq = format_finance_df(td_cheq)
td_save = format_finance_df(td_save)


# RBC Data
rbc_cheq = spark.read.csv('../content/drive/My Drive/personal_fin_data/RBC_Day_to_Day_Banking.csv',
                         header=True,
                         inferSchema=True)


# For the rbc data. Fix the sign
rbc_cheq = rbc_cheq.withColumn('Withdrawals', col('Withdrawals')*-1)
rbc_cheq = format_finance_df(rbc_cheq)

In [7]:
# Transform all the debit df
debit_dfs = [td_cheq, td_save, rbc_cheq]
trans_debit_dfs = [transform_df(df) for df in debit_dfs]

# Merge all the debit data.
debit_df = reduce(lambda df1, df2: df1.union(df2), trans_debit_dfs)

# Sort by Date in descending order
debit_df = debit_df.orderBy(col('Date').desc())

debit_df = debit_df.withColumn('Category', lit(None))

The credit table

In [8]:
cibc_credit = spark.read.csv('../content/drive/My Drive/personal_fin_data/CIBC_MasterCard_20240714_to_20240418.csv',
                         header=True,
                         inferSchema=True)

cibc_credit = format_finance_df(cibc_credit, 'credit')

# Create the 'Type' column in df fill with value 'credit'
cibc_credit = cibc_credit.withColumn('Type', lit('credit'))

# rename Merchant into Description
cibc_credit = cibc_credit.withColumnRenamed('Merchant', 'Description')

cibc_credit = cibc_credit.select(
    col('Date'),
    col('Description'),
    col('Amount'),
    col('Type'),
    col('Category'),
)


In [10]:
# Stack (concatenate) the DataFrames
stacked_df = debit_df.unionByName(cibc_credit)

In [11]:
stacked_df = stacked_df.orderBy(col('Date'))

stacked_df = stacked_df.fillna({'Category':'Debit_Card'})
#stacked_df = stacked_df.withColumn('Amount', when(col('Type') == 'withdraw', col('Amount') * -1).otherwise(col('Amount')))



In [12]:
# Create a continuely daily range df
max_date , min_date  = stacked_df.select(max('Date')).collect()[0][0],stacked_df.select(min('Date')).collect()[0][0]


date_range = generate_date_range(min_date, max_date)

date_df = spark.createDataFrame(date_range, StringType()).toDF('Date')

# Rename 'Date' column in date_df to avoid ambiguity
date_df = date_df.withColumnRenamed('Date', 'Date_temp')

In [17]:
# Get all unique values from the 'Type' column
unique_types = stacked_df.select(col('Type')).distinct().collect()

# Extract and print the unique values
unique_types_list = [row['Type'] for row in unique_types]
print(unique_types_list)

['deposits', 'withdraw', 'credit']


In [19]:
full_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Amount: float (nullable = false)
 |-- Type: string (nullable = false)
 |-- Category: string (nullable = false)



In [23]:
# merge date df
# Perform a left join to fill missing dates
full_df = date_df.join(stacked_df, date_df['Date_temp'] == stacked_df['Date'], 'left') \
    .fillna({'Amount': 0, 'Description': 'No Transaction', 'Type': 'None', 'Category': 'None'}) \
    .drop('Date') \
    .withColumnRenamed('Date_temp', 'Date')


res_df = full_df.withColumn('Processed_Amount',
    when(col('Type') == 'withdraw', -abs(col('Amount')))
    .when(col('Type') == 'deposit', abs(col('Amount')))
    .when((col('Type') == 'credit') & (col('Amount') > 0), -col('Amount'))
    .when((col('Type') == 'credit') & (col('Amount') <= 0), 0)
    .otherwise(col('Amount'))
)

In [33]:
# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = res_df.toPandas()

# Step 3: Save the CSV file to the local filesystem of Colab
csv_filename = '/content/transactions.csv'
pandas_df.to_csv(csv_filename, index=False)