#Install Faker

In [None]:
!pip install Faker


Collecting Faker
  Downloading faker-37.1.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.1.0-py3-none-any.whl (1.9 MB)
[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.9/1.9 MB[0m [31m61.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Faker
Successfully installed Faker-37.1.0


#Mock Data

In [None]:
import pandas as pd
import numpy as np
from faker import Faker
from random import choice, uniform
from datetime import datetime, timedelta
import os

# Setup
fake = Faker()
departments = ['Sales', 'Operations', 'Finance', 'IT', 'Marketing']
department_ids = list(range(1, len(departments)+1))
months = pd.date_range(start='2024-01-01', periods=12, freq='MS')

# Create data folder
os.makedirs("data", exist_ok=True)

# 1. departments.csv
df_dept = pd.DataFrame({
    'department_id': department_ids,
    'department_name': departments
})
df_dept.to_csv("data/departments.csv", index=False)

# 2. transactions.csv
df_txns = pd.DataFrame([{
    'txn_id': i,
    'txn_date': fake.date_between(start_date='-12m', end_date='today'),
    'amount': round(uniform(-10000, 20000), 2),
    'type': choice(['Revenue', 'Expense', 'Adjustment']),
    'department_id': choice(department_ids)
} for i in range(2000)])
df_txns.to_csv("data/transactions.csv", index=False)

# 3. invoices.csv
df_invoices = pd.DataFrame([{
    'invoice_id': i,
    'client': fake.company(),
    'due_date': fake.date_between(start_date='-6m', end_date='+1m'),
    'amount_due': round(uniform(500, 10000), 2),
    'paid_date': fake.date_between(start_date='-5m', end_date='today')
} for i in range(1000)])
df_invoices.to_csv("data/invoices.csv", index=False)

# 4. expenses.csv
df_expenses = pd.DataFrame([{
    'expense_id': i,
    'department_id': choice(department_ids),
    'category': choice(['Supplies', 'Software', 'Utilities', 'Travel']),
    'amount': round(uniform(100, 5000), 2),
    'date': fake.date_between(start_date='-12m', end_date='today')
} for i in range(800)])
df_expenses.to_csv("data/expenses.csv", index=False)

# 5. kpi_targets.csv
df_kpis = pd.DataFrame([{
    'month': m.strftime('%Y-%m'),
    'department_id': did,
    'kpi_type': choice(['Revenue', 'Profit']),
    'target_value': round(uniform(10000, 100000), 2)
} for m in months for did in department_ids])
df_kpis.to_csv("data/kpi_targets.csv", index=False)

print("‚úÖ All 5 mock financial datasets generated in /content/data/")


‚úÖ All 5 mock financial datasets generated in /content/data/


#Set Up PySpark

In [None]:
# Install PySpark
!pip install pyspark




#PySpark ETL Pipeline, Data Cleaning & Aggregation

In [None]:
# üì¶ Install PySpark if not already done (run only once per Colab session)
!pip install pyspark

# üîß Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import os

# üß† Start Spark session
spark = SparkSession.builder \
    .appName("Financial ETL Pipeline") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# üìÇ Path to mock CSV data
base_path = "/content/data"

# üóÉÔ∏è Load CSVs
transactions = spark.read.csv(os.path.join(base_path, "transactions.csv"), header=True, inferSchema=True)
invoices = spark.read.csv(os.path.join(base_path, "invoices.csv"), header=True, inferSchema=True)
expenses = spark.read.csv(os.path.join(base_path, "expenses.csv"), header=True, inferSchema=True)
departments = spark.read.csv(os.path.join(base_path, "departments.csv"), header=True, inferSchema=True)
kpi_targets = spark.read.csv(os.path.join(base_path, "kpi_targets.csv"), header=True, inferSchema=True)

# üßº Clean and convert date columns
transactions = transactions.withColumn("txn_date", to_date("txn_date", "yyyy-MM-dd"))
invoices = invoices.withColumn("due_date", to_date("due_date", "yyyy-MM-dd")) \
                   .withColumn("paid_date", to_date("paid_date", "yyyy-MM-dd"))
expenses = expenses.withColumn("date", to_date("date", "yyyy-MM-dd"))

# üßÆ Aggregations
transactions_agg = transactions.groupBy("department_id", month("txn_date").alias("month")) \
    .agg(sum("amount").alias("net_amount"))

expenses_agg = expenses.groupBy("department_id", month("date").alias("month")) \
    .agg(sum("amount").alias("total_expense"))

# ‚úÖ Fix KPI Targets (convert month string to numeric month)
kpi_targets = kpi_targets \
    .withColumn("month_string", concat(col("month"), lit("-01")).cast(StringType())) \
    .withColumn("month", month(to_date("month_string", "yyyy-MM-dd"))) \
    .drop("month_string")

# üîó Join everything into a unified summary table
summary = kpi_targets \
    .join(transactions_agg, ["department_id", "month"], "left") \
    .join(expenses_agg, ["department_id", "month"], "left") \
    .join(departments, "department_id", "left") \
    .select("month", "department_name", "kpi_type", "target_value", "net_amount", "total_expense")

# üìä Preview results
summary.show(10)

# üíæ Export to CSV
output_dir = "/content/output_kpi_summary"
summary.coalesce(1).write.mode("overwrite").option("header", True).csv(output_dir)

# üì¶ Zip output folder
!zip -r output_kpi_summary.zip output_kpi_summary

# üì• Trigger download
from google.colab import files
files.download("output_kpi_summary.zip")

+-----+---------------+--------+------------+----------+-------------+
|month|department_name|kpi_type|target_value|net_amount|total_expense|
+-----+---------------+--------+------------+----------+-------------+
|    1|          Sales|  Profit|    10905.68|      NULL|         NULL|
|    1|     Operations|  Profit|    38088.67|      NULL|         NULL|
|    1|        Finance| Revenue|    67522.96|      NULL|         NULL|
|    1|             IT|  Profit|    56760.35|      NULL|         NULL|
|    1|      Marketing| Revenue|    46883.49|      NULL|         NULL|
|    2|          Sales|  Profit|    32824.05|      NULL|         NULL|
|    2|     Operations|  Profit|    60488.48|      NULL|         NULL|
|    2|        Finance| Revenue|    60548.59|      NULL|         NULL|
|    2|             IT|  Profit|    85554.17|      NULL|         NULL|
|    2|      Marketing|  Profit|    89826.11|      NULL|         NULL|
+-----+---------------+--------+------------+----------+-------------+
only s

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# üß± Rebuild summary using department_id only (NO department_name)
summary_clean = kpi_targets \
    .join(transactions_agg, ["department_id", "month"], "left") \
    .join(expenses_agg, ["department_id", "month"], "left") \
    .select(
        col("month").cast("int"),
        col("department_id").cast("int"),
        col("kpi_type").cast("string"),
        col("target_value").cast("float"),
        col("net_amount").cast("float"),
        col("total_expense").cast("float")
    )

# üíæ Save to new output folder
summary_clean.coalesce(1).write.mode("overwrite").option("header", True).csv("/content/output_snowflake_final")

# üì¶ Zip for download
!zip -r output_snowflake_final.zip output_snowflake_final

# üì• Download
from google.colab import files
files.download("output_snowflake_final.zip")



  adding: output_snowflake_final/ (stored 0%)
  adding: output_snowflake_final/.part-00000-cd7cedc8-f78f-4cc9-ab30-f315d0840c58-c000.csv.crc (stored 0%)
  adding: output_snowflake_final/_SUCCESS (stored 0%)
  adding: output_snowflake_final/._SUCCESS.crc (stored 0%)
  adding: output_snowflake_final/part-00000-cd7cedc8-f78f-4cc9-ab30-f315d0840c58-c000.csv (deflated 60%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>