In [None]:
### SETUP ###

In [1]:
import pyspark 
# Reuse the already-running SparkContext when this cell is re-executed;
# constructing a second SparkContext in the same kernel raises an error.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))
# Smoke test: distribute 0..999 and draw 5 elements without replacement.
# No seed is passed, so the sample differs from run to run.
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)


[29, 821, 807, 23, 980]

In [2]:
from pyspark.sql import SparkSession
# Obtain the SparkSession used for all DataFrame work in this notebook.
# getOrCreate() returns an existing session if one is already active.
spark = (
    SparkSession.builder
    .appName("Read CSV in Jupyter with PySpark")
    .getOrCreate()
)

In [3]:
# Load the accounts table: semicolon-delimited, first row is the header,
# column types inferred from the data.
accounts = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("files/accounts.csv")
)
# Row count as a quick sanity check that the file was read.
print(accounts.count())

500000


In [4]:
# Load the country lookup table: semicolon-delimited, first row is the header,
# column types inferred from the data.
country_abbrev = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("files/country_abbreviation.csv")
)
# Row count as a quick sanity check that the file was read.
print(country_abbrev.count())

121


In [5]:
# Load the transactions table: semicolon-delimited, first row is the header,
# column types inferred from the data.
transactions = (
    spark.read
    .options(header=True, inferSchema=True, sep=";")
    .csv("files/transactions.csv")
)
# Row count as a quick sanity check that the file was read.
print(transactions.count())

5000000


In [None]:
### TASK 1 ###

In [37]:
from pyspark.sql.functions import count, sum, max, col, round

In [7]:
# Show the DataFrame's repr (column names and inferred types only, no rows)
# to check the schema the later aggregations rely on.
display(transactions)

DataFrame[id: int, amount: double, account_type: string, transaction_date: date, country: string]

In [8]:
# Number of transaction rows per account type.
# `count` is pyspark.sql.functions.count; count("id") tallies rows with a non-null id.
account_counts = (
    transactions
    .groupBy("account_type")
    .agg(count("id").alias("account_type_count"))
)
account_counts.show()

+------------+------------------+
|account_type|account_type_count|
+------------+------------------+
|    Personal|           1667072|
|Professional|           1667358|
|    Business|           1665570|
+------------+------------------+



In [17]:
# Per-id summary: total amount rounded to 2 decimals and the most recent
# transaction date. The balance is then converted to a string column.
# NOTE: `sum`, `max` and `round` are the pyspark.sql.functions versions imported
# in this notebook, not the Python builtins of the same name.
balance_and_date = (
    transactions
    .groupBy("id")
    .agg(
        round(sum("amount"), 2).alias("balance"),
        max("transaction_date").alias("latest_date"),
    )
    .withColumn("balance", col("balance").cast("string"))
)

balance_and_date.show()

+------+--------+-----------+
|    id| balance|latest_date|
+------+--------+-----------+
|482333|27174.07| 2020-07-17|
|222048|48004.81| 2020-07-20|
|328078|36948.25| 2020-02-01|
|192401|36736.98| 2020-01-30|
|273916|47475.38| 2021-05-30|
|485103|62198.93| 2021-05-22|
|300282|55103.62| 2021-05-01|
| 20683|56448.72| 2021-10-27|
| 15846|58671.91| 2020-12-23|
|446783|98085.51| 2021-12-11|
| 92182| 42335.3| 2020-08-08|
|477485|22114.03| 2020-05-23|
|171142| 40428.9| 2021-04-07|
|317762|40025.55| 2021-12-02|
| 65478| 57941.9| 2021-10-06|
|306768|26566.93| 2019-12-19|
|380411|43652.94| 2020-06-02|
|304681|37827.69| 2021-03-26|
|475638| 44509.1| 2021-11-23|
| 97413|39611.24| 2018-05-01|
+------+--------+-----------+
only showing top 20 rows



In [None]:
### TASK 2 ###

In [40]:
from pyspark.sql.functions import year, concat_ws

In [42]:
def earnings_pivot_table(accounts, country_abbreviation, transactions,
                         country_name="Switzerland"):
    """Build a per-person, per-year pivot of earnings made in one country.

    Earnings are transactions with a strictly positive ``amount``. Only
    transactions whose ``country`` code maps to ``country_name`` in the lookup
    table are kept. Each result row is one full name ("first_name last_name");
    each remaining column is a calendar year holding that person's summed
    earnings rounded to 2 decimals, with 0 where there were none.

    Parameters
    ----------
    accounts : pyspark.sql.DataFrame
        Must provide ``id``, ``first_name`` and ``last_name``.
    country_abbreviation : pyspark.sql.DataFrame
        Lookup table; must provide ``abbreviation`` and ``country_full_name``.
    transactions : pyspark.sql.DataFrame
        Must provide ``id`` (joined against ``accounts.id``), ``amount``,
        ``transaction_date`` and ``country`` (joined against ``abbreviation``).
    country_name : str, optional
        Full country name to report on. Defaults to ``"Switzerland"``, which
        was previously hard-coded.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns: ``full_name`` followed by one column per year present in the
        filtered data.
    """
    # Function-scope import so the function does not depend on notebook-level
    # `from pyspark.sql.functions import sum, round, ...` cells (which shadow
    # Python builtins) having been executed first.
    from pyspark.sql import functions as F

    # Abbreviation(s) of the requested country. Renamed so the join condition
    # below cannot clash with the transactions' own `country` column.
    country_codes = (
        country_abbreviation
        .filter(F.col("country_full_name") == country_name)
        .select(F.col("abbreviation").alias("country_code"))
    )

    positive = transactions.filter(F.col("amount") > 0)

    # Semi join: keeps matching transactions without adding lookup columns and
    # without duplicating rows if the lookup lists an abbreviation twice.
    country_earnings = positive.join(
        country_codes,
        positive["country"] == country_codes["country_code"],
        "left_semi",
    )

    # Only the join key and the display name are needed from accounts.
    holder_names = accounts.select(
        F.col("id").alias("account_id"),
        F.concat_ws(" ", "first_name", "last_name").alias("full_name"),
    )

    named_earnings = (
        country_earnings
        .join(
            holder_names,
            country_earnings["id"] == holder_names["account_id"],
            "inner",
        )
        .withColumn("year", F.year("transaction_date"))
    )

    # NOTE(review): grouping is by full name, so distinct accounts that share
    # the same first and last name are merged into one row -- confirm intended.
    # A single pivot aggregation replaces the former group-then-pivot double pass.
    pivot_df = (
        named_earnings
        .groupBy("full_name")
        .pivot("year")
        .agg(F.round(F.sum("amount"), 2))
        .fillna(0)
    )

    return pivot_df

In [43]:
# Build the Swiss earnings pivot from the three DataFrames loaded from CSV in the setup cells.
result_df = earnings_pivot_table(accounts, country_abbrev, transactions)

# Print the first rows of the pivot as a text table.
result_df.show()

+------------------+-------+-------+-------+--------+-------+-------+--------+-------+-------+-------+-------+
|         full_name|   2011|   2012|   2013|    2014|   2015|   2016|    2017|   2018|   2019|   2020|   2021|
+------------------+-------+-------+-------+--------+-------+-------+--------+-------+-------+-------+-------+
|     Lenny Spencer|2050.35|    0.0|    0.0|     0.0|    0.0| 509.08|16116.58|    0.0|8693.52| 923.65|8797.15|
|      Lucia Watson|    0.0|2173.26|    0.0|10047.25|    0.0|    0.0|     0.0|    0.0|4500.61|    0.0|    0.0|
|      Jessica West|    0.0|8673.72|    0.0|     0.0|    0.0|    0.0|     0.0|    0.0|    0.0|4797.03|1294.49|
|     Aston Andrews|    0.0|    0.0|    0.0|     0.0|    0.0|    0.0| 4598.25|    0.0|    0.0|    0.0|    0.0|
|   Kirsten Stevens| 8932.7|    0.0|5753.21|     0.0|    0.0|3134.12|     0.0|    0.0|    0.0|    0.0|    0.0|
|       Luke Carter|    0.0|1585.41|  93.69|     0.0|    0.0|    0.0| 7029.37|8340.16|    0.0|    0.0|    0.0|
|