# 1. Read the following three CSV files "transactions.csv", "country_abbreviation.csv", and "accounts.csv".

In [1]:
from pyspark.sql import SparkSession

# Prefix prepended to every CSV file name below.
path_to = "input_csv_files/"

# getOrCreate() returns the already-active session when there is one,
# otherwise it starts a new session under this application name.
spark = SparkSession.builder.appName("Spark CSV Analysis").getOrCreate()


def _load_csv(file_name):
    """Read ``path_to + file_name`` into a DataFrame.

    All three inputs share the same layout options: the first row is the
    header, column types are inferred from the data, and fields are
    separated by ';'.
    """
    reader = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ";")
    )
    return reader.csv(path_to + file_name)


transactionsDF = _load_csv("transactions.csv")
countryAbbrDF = _load_csv("country_abbreviation.csv")
accountsDF = _load_csv("accounts.csv")


# Yearly earnings (positive transaction amounts) per user from Switzerland, pivoted by year.

In [None]:
from pyspark.sql.functions import col, concat, lit, year, sum

# Pair every transaction with its account row by matching 'id', then keep only
# the rows whose amount is positive (the earnings). The accounts-side 'id' and
# 'country' columns are dropped so those names are unambiguous afterwards;
# 'country' is then the transactions-side column (adjust if the account's
# country is the one you need).
ids_match = transactionsDF["id"] == accountsDF["id"]
earningsDF = (
    transactionsDF.join(accountsDF, ids_match)
    .where(col("amount") > 0)
    .drop(accountsDF["id"])
    .drop(accountsDF["country"])
)

# Look up the full country name for each abbreviation. This is Spark's default
# (inner) join, so rows whose abbreviation has no match are not kept.
abbreviation_matches = earningsDF["country"] == countryAbbrDF["abbreviation"]
withFullCountryDF = earningsDF.join(countryAbbrDF, abbreviation_matches)

# Keep only the rows for Switzerland.
swissEarningsDF = withFullCountryDF.where(col("country_full_name") == "Switzerland")

# Total earnings per (full name, calendar year).
# NOTE: `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
full_name_expr = concat(col("first_name"), lit(" "), col("last_name")).alias("full_name")
year_expr = year(col("transaction_date")).alias("year")
earnings_expr = sum("amount").alias("earnings")
yearlyEarningsDF = swissEarningsDF.groupBy(full_name_expr, year_expr).agg(earnings_expr)

# One row per person, one column per year.
earnings_by_person = yearlyEarningsDF.groupBy("full_name")
pivotDF = earnings_by_person.pivot("year").agg(sum("earnings"))

pivotDF.show()
