In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, StringType
from pyspark.sql.functions import *


spark = SparkSession.builder.master("local[1]")\
          .appName("task1")\
          .getOrCreate()

df_accounts = spark.read.option("header",True) \
     .option("delimiter", ";")\
     .csv("work/accounts.csv")

df_country = spark.read.option("header",True) \
    .option("delimiter", ";")\
     .csv("work/country_abbreviation.csv")

df_trans = spark.read.option("header",True) \
    .option("delimiter", ";")\
     .csv("work/transactions.csv")

In [2]:
df_trans = df_trans.withColumnRenamed("id", "account_id")

In [3]:
df_trans.head(2)

[Row(account_id='179528', amount='-730.86', account_type='Business', transaction_date='2013-07-10', country='SV'),
 Row(account_id='378343', amount='-946.98', account_type='Personal', transaction_date='2018-04-06', country='YE')]

In [4]:
df_accounts = df_accounts.where(df_accounts.country == "CH")

In [5]:
df_accounts = df_accounts.withColumn("full_name", concat(df_accounts.first_name,lit(" "),df_accounts.last_name)) 

In [6]:
df_accounts = df_accounts.drop(*["first_name", "last_name", "age"])

In [7]:
df_accounts.take(3)

[Row(id='23', country='CH', full_name='Frederick Morrison'),
 Row(id='108', country='CH', full_name='Maximilian Chapman'),
 Row(id='158', country='CH', full_name='Spike Taylor')]

In [8]:
df_trans = df_trans.where(df_trans.amount > 0)
df_trans = df_trans.drop(*["account_type", "country"])
df_trans = df_trans.withColumn("amount", df_trans["amount"].cast(IntegerType()))

In [9]:
df_trans.take(5)

[Row(account_id='75450', amount=7816, transaction_date='2016-11-20'),
 Row(account_id='357719', amount=704, transaction_date='2016-11-06'),
 Row(account_id='110511', amount=3462, transaction_date='2018-01-18'),
 Row(account_id='461830', amount=762, transaction_date='2017-06-20'),
 Row(account_id='30180', amount=5390, transaction_date='2021-05-26')]

In [10]:
df_trans = df_trans.withColumn("year", date_trunc("year", df_trans.transaction_date))

In [11]:
df_trans.take(5)

[Row(account_id='75450', amount=7816, transaction_date='2016-11-20', year=datetime.datetime(2016, 1, 1, 0, 0)),
 Row(account_id='357719', amount=704, transaction_date='2016-11-06', year=datetime.datetime(2016, 1, 1, 0, 0)),
 Row(account_id='110511', amount=3462, transaction_date='2018-01-18', year=datetime.datetime(2018, 1, 1, 0, 0)),
 Row(account_id='461830', amount=762, transaction_date='2017-06-20', year=datetime.datetime(2017, 1, 1, 0, 0)),
 Row(account_id='30180', amount=5390, transaction_date='2021-05-26', year=datetime.datetime(2021, 1, 1, 0, 0))]

In [12]:
df_final = df_accounts.join(df_trans, df_accounts.id == df_trans.account_id)\
                        .groupby("year", "full_name", "id")\
                        .sum("amount")\
                        .groupby("full_name")\
                        .pivot("year")\
                        .sum('sum(amount)')

In [None]:
df_final.show(1)

In [None]:
df_final.describe()